parent
cd89a74ff6
commit
0be28a49d4
5 changed files with 273 additions and 128 deletions
183
platform/linux.c
183
platform/linux.c
|
@ -1,46 +1,87 @@
|
||||||
#include<string.h>
|
#include <arpa/inet.h>
|
||||||
#include<sys/socket.h>
|
#include <netdb.h>
|
||||||
#include<errno.h>
|
#include <stdio.h>
|
||||||
#include<netdb.h>
|
#include <string.h>
|
||||||
#include<arpa/inet.h>
|
#include <sys/socket.h>
|
||||||
|
#include <unistd.h>
|
||||||
|
#include <stdlib.h>
|
||||||
|
#include <stdint.h>
|
||||||
|
#include <errno.h>
|
||||||
|
|
||||||
#include <pthread.h>
|
#include <pthread.h>
|
||||||
#include <stdlib.h>
|
#include <stdlib.h>
|
||||||
|
|
||||||
|
#include "mqtt_internal.h"
|
||||||
#include "platform.h"
|
#include "platform.h"
|
||||||
|
|
||||||
const size_t max_receive_buffer_size = 4 * 4096; // 16 KB
|
const size_t max_receive_buffer_size = 4 * 4096; // 16 KB
|
||||||
|
|
||||||
|
#define MAX_TASKS 16
|
||||||
|
|
||||||
struct _PlatformData {
|
struct _PlatformData {
|
||||||
pthread_t read_thread;
|
pthread_t tasks[MAX_TASKS];
|
||||||
|
int sock;
|
||||||
};
|
};
|
||||||
|
|
||||||
void initialize_platform(MQTTHandle *handle) {
|
PlatformStatusCode platform_init(MQTTHandle *handle) {
|
||||||
handle->platform = calloc(1, sizeof(struct _PlatformData));
|
handle->platform = calloc(1, sizeof(struct _PlatformData));
|
||||||
}
|
handle->platform->sock = -1;
|
||||||
|
if (handle->platform) {
|
||||||
MQTTStatus run_read_task(MQTTHandle *handle, Reader reader) {
|
return PlatformStatusOk;
|
||||||
if (pthread_create(&handle->platform->read_thread, NULL, (void *(*)(void *))reader, (void *)handle)) {
|
|
||||||
return MQTT_STATUS_ERROR;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
return MQTT_STATUS_OK;
|
return PlatformStatusError;
|
||||||
}
|
}
|
||||||
|
|
||||||
MQTTStatus join_read_task(MQTTHandle *handle) {
|
PlatformStatusCode platform_release(MQTTHandle *handle) {
|
||||||
if (handle->platform->read_thread) {
|
PlatformData *p = handle->platform;
|
||||||
pthread_join(handle->platform->read_thread, NULL);
|
|
||||||
handle->platform->read_thread = 0;
|
for (uint8_t free_task = 0; free_task < MAX_TASKS; free_task++) {
|
||||||
|
if (p->tasks[free_task] != 0) {
|
||||||
|
DEBUG_LOG("Cannot free platform handle, there are tasks running!");
|
||||||
|
return PlatformStatusError;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
return MQTT_STATUS_OK;
|
|
||||||
}
|
|
||||||
|
|
||||||
void release_platform(MQTTHandle *handle) {
|
|
||||||
free(handle->platform);
|
free(handle->platform);
|
||||||
|
return PlatformStatusOk;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
PlatformStatusCode platform_run_task(MQTTHandle *handle, int *task_handle, PlatformTask callback) {
|
||||||
|
PlatformData *p = handle->platform;
|
||||||
|
uint8_t free_task = 0;
|
||||||
|
|
||||||
bool hostname_to_ip(char *hostname , char *ip) {
|
for (free_task = 0; free_task < MAX_TASKS; free_task++) {
|
||||||
|
if (p->tasks[free_task] == 0) {
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if (free_task == MAX_TASKS) {
|
||||||
|
return PlatformStatusError;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (pthread_create(&p->tasks[free_task], NULL, (void *(*)(void *))callback, (void *)handle)) {
|
||||||
|
return PlatformStatusError;
|
||||||
|
}
|
||||||
|
|
||||||
|
return PlatformStatusOk;
|
||||||
|
}
|
||||||
|
|
||||||
|
PlatformStatusCode platform_cleanup_task(MQTTHandle *handle, int task_handle) {
|
||||||
|
PlatformData *p = handle->platform;
|
||||||
|
|
||||||
|
if ((task_handle < 0) || (task_handle >= MAX_TASKS)) {
|
||||||
|
return PlatformStatusError;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (p->tasks[task_handle]) {
|
||||||
|
pthread_join(p->tasks[task_handle], NULL);
|
||||||
|
p->tasks[task_handle] = 0;
|
||||||
|
}
|
||||||
|
return PlatformStatusOk;
|
||||||
|
}
|
||||||
|
|
||||||
|
PlatformStatusCode platform_resolve_host(char *hostname , char *ip) {
|
||||||
struct addrinfo hints, *servinfo;
|
struct addrinfo hints, *servinfo;
|
||||||
struct sockaddr_in *h;
|
struct sockaddr_in *h;
|
||||||
|
|
||||||
|
@ -50,8 +91,8 @@ bool hostname_to_ip(char *hostname , char *ip) {
|
||||||
|
|
||||||
int ret = getaddrinfo(hostname, NULL, &hints, &servinfo);
|
int ret = getaddrinfo(hostname, NULL, &hints, &servinfo);
|
||||||
if (ret != 0) {
|
if (ret != 0) {
|
||||||
// fprintf(stderr, "getaddrinfo: %s\n", gai_strerror(rv));
|
DEBUG_LOG("Resolving host failed: %s", gai_strerror(ret));
|
||||||
return false;
|
return PlatformStatusError;
|
||||||
}
|
}
|
||||||
|
|
||||||
// loop through all the results and connect to the first we can
|
// loop through all the results and connect to the first we can
|
||||||
|
@ -61,5 +102,99 @@ bool hostname_to_ip(char *hostname , char *ip) {
|
||||||
}
|
}
|
||||||
|
|
||||||
freeaddrinfo(servinfo); // all done with this structure
|
freeaddrinfo(servinfo); // all done with this structure
|
||||||
return true;
|
return PlatformStatusOk;
|
||||||
|
}
|
||||||
|
|
||||||
|
PlatformStatusCode platform_connect(MQTTHandle *handle) {
|
||||||
|
PlatformData *p = handle->platform;
|
||||||
|
|
||||||
|
int ret;
|
||||||
|
struct sockaddr_in servaddr;
|
||||||
|
memset(&servaddr, 0, sizeof(servaddr));
|
||||||
|
|
||||||
|
p->sock = socket(AF_INET, SOCK_STREAM, 0);
|
||||||
|
servaddr.sin_family = AF_INET;
|
||||||
|
servaddr.sin_port = htons(handle->config->port);
|
||||||
|
|
||||||
|
char ip[40];
|
||||||
|
if (platform_resolve_host(handle->config->hostname, ip) != PlatformStatusOk) {
|
||||||
|
bool free_handle = handle->error_handler(handle, handle->config, MQTT_Error_Host_Not_Found);
|
||||||
|
if (free_handle) {
|
||||||
|
mqtt_free(handle);
|
||||||
|
}
|
||||||
|
DEBUG_LOG("Resolving hostname failed: %s", strerror(errno));
|
||||||
|
close(p->sock);
|
||||||
|
return PlatformStatusError;
|
||||||
|
}
|
||||||
|
ret = inet_pton(AF_INET, ip, &(servaddr.sin_addr));
|
||||||
|
if (ret == 0) {
|
||||||
|
bool free_handle = handle->error_handler(handle, handle->config, MQTT_Error_Host_Not_Found);
|
||||||
|
if (free_handle) {
|
||||||
|
mqtt_free(handle);
|
||||||
|
}
|
||||||
|
DEBUG_LOG("Converting to servaddr failed: %s", strerror(errno));
|
||||||
|
close(p->sock);
|
||||||
|
return PlatformStatusError;
|
||||||
|
}
|
||||||
|
|
||||||
|
ret = connect(p->sock, (struct sockaddr *)&servaddr, sizeof(servaddr));
|
||||||
|
if (ret != 0) {
|
||||||
|
bool free_handle = handle->error_handler(handle, handle->config, MQTT_Error_Connection_Refused);
|
||||||
|
if (free_handle) {
|
||||||
|
mqtt_free(handle);
|
||||||
|
}
|
||||||
|
DEBUG_LOG("Connection failed: %s", strerror(errno));
|
||||||
|
close(p->sock);
|
||||||
|
return PlatformStatusError;
|
||||||
|
}
|
||||||
|
|
||||||
|
return PlatformStatusOk;
|
||||||
|
}
|
||||||
|
|
||||||
|
PlatformStatusCode platform_read(MQTTHandle *handle, Buffer *buffer) {
|
||||||
|
PlatformData *p = handle->platform;
|
||||||
|
|
||||||
|
while (1) {
|
||||||
|
ssize_t num_bytes = read(p->sock, &buffer->data[buffer->position], buffer_free_space(buffer));
|
||||||
|
if (num_bytes == 0) {
|
||||||
|
/* Socket closed, coordinated shutdown */
|
||||||
|
DEBUG_LOG("Socket closed");
|
||||||
|
return PlatformStatusError;
|
||||||
|
} else if (num_bytes < 0) {
|
||||||
|
if ((errno == EINTR) || (errno == EAGAIN)) {
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
/* Set reader task to dead */
|
||||||
|
handle->reader_alive = false;
|
||||||
|
return PlatformStatusError;
|
||||||
|
}
|
||||||
|
|
||||||
|
buffer->position += num_bytes;
|
||||||
|
return PlatformStatusOk;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
PlatformStatusCode platform_write(MQTTHandle *handle, Buffer *buffer) {
|
||||||
|
PlatformData *p = handle->platform;
|
||||||
|
|
||||||
|
while (!buffer_eof(buffer)) {
|
||||||
|
ssize_t bytes = write(p->sock, buffer->data + buffer->position, buffer_free_space(buffer));
|
||||||
|
if (bytes <= 0) {
|
||||||
|
return PlatformStatusError;
|
||||||
|
}
|
||||||
|
buffer->position += bytes;
|
||||||
|
}
|
||||||
|
|
||||||
|
return PlatformStatusOk;
|
||||||
|
}
|
||||||
|
|
||||||
|
PlatformStatusCode platform_disconnect(MQTTHandle *handle) {
|
||||||
|
PlatformData *p = handle->platform;
|
||||||
|
if (p->sock >= 0) {
|
||||||
|
close(p->sock);
|
||||||
|
p->sock = -1;
|
||||||
|
}
|
||||||
|
|
||||||
|
return PlatformStatusOk;
|
||||||
}
|
}
|
||||||
|
|
|
@ -3,41 +3,95 @@
|
||||||
|
|
||||||
#include "mqtt_internal.h"
|
#include "mqtt_internal.h"
|
||||||
|
|
||||||
typedef void (*Reader)(MQTTHandle *handle);
|
typedef void (*PlatformTask)(MQTTHandle *handle);
|
||||||
|
|
||||||
/** maximum receiver buffer size, defined by platform */
|
/** maximum receiver buffer size, defined by platform */
|
||||||
extern const size_t max_receive_buffer_size;
|
extern const size_t max_receive_buffer_size;
|
||||||
|
|
||||||
|
typedef enum {
|
||||||
|
PlatformStatusOk, /**< Everything ok */
|
||||||
|
PlatformStatusError, /**< Non-recoverable error */
|
||||||
|
PlatformStatusRetry /**< Recoverable error */
|
||||||
|
} PlatformStatusCode;
|
||||||
|
|
||||||
bool hostname_to_ip(char *hostname, char *ip);
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Initialize platform specific data
|
* Initialize platform specific data
|
||||||
*
|
*
|
||||||
* @param handle: The handle to initialize
|
* @param handle: The handle to initialize
|
||||||
|
* @return Platform status code
|
||||||
*/
|
*/
|
||||||
void initialize_platform(MQTTHandle *handle);
|
PlatformStatusCode platform_init(MQTTHandle *handle);
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Platform specific function to release resources associated with a MQTTHandle
|
||||||
|
*
|
||||||
|
* @param handle: The handle to clean up
|
||||||
|
* @return Platform status code
|
||||||
|
*/
|
||||||
|
PlatformStatusCode platform_release(MQTTHandle *handle);
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Platform specific function to start a reading thread
|
* Platform specific function to start a reading thread
|
||||||
*
|
*
|
||||||
* @param handle: The broker connection handle
|
* @param handle: The broker connection handle
|
||||||
* @param reader: callback to run in the thread
|
* @param task_handle: Task handle output
|
||||||
|
* @param callback: callback to run in the thread
|
||||||
|
* @return Platform status code
|
||||||
*/
|
*/
|
||||||
MQTTStatus run_read_task(MQTTHandle *handle, Reader reader);
|
PlatformStatusCode platform_run_task(MQTTHandle *handle, int *task_handle, PlatformTask callback);
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Platform specific function to clean up the reading thread
|
* Platform specific function to clean up the reading thread
|
||||||
*
|
*
|
||||||
* @param handle: The broker connection handle
|
* @param handle: State handle
|
||||||
|
* @param task_handle: Task handle to clean up
|
||||||
|
* @return Platform status code
|
||||||
*/
|
*/
|
||||||
MQTTStatus join_read_task(MQTTHandle *handle);
|
PlatformStatusCode platform_cleanup_task(MQTTHandle *handle, int task_handle);
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Platform specific function to release resources associated with a MQTTHandle
|
* Resolve host
|
||||||
*
|
*
|
||||||
* @param handle: The handle to clean up
|
* @param hostname: Hostname to resolve
|
||||||
|
* @param ip_out: resulting IP address if no error occured
|
||||||
|
* @return Platform status code
|
||||||
*/
|
*/
|
||||||
void release_platform(MQTTHandle *handle);
|
PlatformStatusCode platform_resolve_host(char *hostname, char *ip_out);
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Connect to host from configuration
|
||||||
|
*
|
||||||
|
* @param handle: The configuration
|
||||||
|
* @return Platform status code
|
||||||
|
*/
|
||||||
|
PlatformStatusCode platform_connect(MQTTHandle *handle);
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Read from the "socket" in the handle
|
||||||
|
*
|
||||||
|
* @param handle: State handle
|
||||||
|
* @param buffer: Read target
|
||||||
|
* @return Platform status code
|
||||||
|
*/
|
||||||
|
PlatformStatusCode platform_read(MQTTHandle *handle, Buffer *buffer);
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Write to the "socket" in the handle
|
||||||
|
*
|
||||||
|
* @param handle: State handle
|
||||||
|
* @param buffer: Write source
|
||||||
|
* @return Platform status code
|
||||||
|
*/
|
||||||
|
PlatformStatusCode platform_write(MQTTHandle *handle, Buffer *buffer);
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Disconnect the "socket" in the handle
|
||||||
|
*
|
||||||
|
* @param handle: State handle
|
||||||
|
* @return Platform status code
|
||||||
|
*/
|
||||||
|
PlatformStatusCode platform_disconnect(MQTTHandle *handle);
|
||||||
|
|
||||||
|
|
||||||
#endif /* platform_h__included */
|
#endif /* platform_h__included */
|
||||||
|
|
124
src/mqtt.c
124
src/mqtt.c
|
@ -14,16 +14,11 @@
|
||||||
#include "protocol.h"
|
#include "protocol.h"
|
||||||
#include "debug.h"
|
#include "debug.h"
|
||||||
|
|
||||||
static inline void mqtt_free(MQTTHandle *handle) {
|
void mqtt_free(MQTTHandle *handle) {
|
||||||
release_platform(handle);
|
platform_release(handle);
|
||||||
free(handle);
|
free(handle);
|
||||||
}
|
}
|
||||||
|
|
||||||
static inline void disconnect(MQTTHandle *handle) {
|
|
||||||
close(handle->sock);
|
|
||||||
// FIXME: Do we have to do anything else?
|
|
||||||
}
|
|
||||||
|
|
||||||
static inline void parse_packet(MQTTHandle *handle, MQTTPacket *packet) {
|
static inline void parse_packet(MQTTHandle *handle, MQTTPacket *packet) {
|
||||||
switch (packet->packet_type) {
|
switch (packet->packet_type) {
|
||||||
case PacketTypeConnAck:
|
case PacketTypeConnAck:
|
||||||
|
@ -35,7 +30,7 @@ static inline void parse_packet(MQTTHandle *handle, MQTTPacket *packet) {
|
||||||
case PacketTypeUnsubAck:
|
case PacketTypeUnsubAck:
|
||||||
if (!dispatch_packet(handle, packet)) {
|
if (!dispatch_packet(handle, packet)) {
|
||||||
DEBUG_LOG("Unexpected packet! (type: %s, packet_id: %d)", get_packet_name(packet), get_packet_id(packet));
|
DEBUG_LOG("Unexpected packet! (type: %s, packet_id: %d)", get_packet_name(packet), get_packet_id(packet));
|
||||||
disconnect(handle);
|
(void)platform_disconnect(handle);
|
||||||
}
|
}
|
||||||
break;
|
break;
|
||||||
|
|
||||||
|
@ -55,70 +50,61 @@ static inline void parse_packet(MQTTHandle *handle, MQTTPacket *packet) {
|
||||||
case PacketTypePingReq:
|
case PacketTypePingReq:
|
||||||
case PacketTypeDisconnect:
|
case PacketTypeDisconnect:
|
||||||
DEBUG_LOG("Server packet on client connection? What's up with the broker?");
|
DEBUG_LOG("Server packet on client connection? What's up with the broker?");
|
||||||
disconnect(handle);
|
(void)platform_disconnect(handle);
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
static void _reader(MQTTHandle *handle) {
|
static void _reader(MQTTHandle *handle) {
|
||||||
ssize_t num_bytes;
|
Buffer *buffer = buffer_allocate(max_receive_buffer_size);
|
||||||
char *read_buffer = malloc(max_receive_buffer_size);
|
|
||||||
uint8_t offset = 0;
|
|
||||||
|
|
||||||
handle->reader_alive = true;
|
handle->reader_alive = true;
|
||||||
|
|
||||||
while (1) {
|
while (1) {
|
||||||
num_bytes = read(handle->sock, &read_buffer[offset], max_receive_buffer_size - offset);
|
PlatformStatusCode ret = platform_read(handle, buffer);
|
||||||
if (num_bytes == 0) {
|
if (ret == PlatformStatusError) {
|
||||||
/* Socket closed, coordinated shutdown */
|
|
||||||
DEBUG_LOG("Socket closed");
|
|
||||||
handle->reader_alive = false;
|
|
||||||
return;
|
|
||||||
} else if (num_bytes < 0) {
|
|
||||||
if ((errno == EINTR) || (errno == EAGAIN)) {
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
|
|
||||||
/* Set reader task to dead */
|
|
||||||
handle->reader_alive = false;
|
handle->reader_alive = false;
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
while (1) {
|
while (1) {
|
||||||
Buffer *buffer = buffer_from_data_no_copy(read_buffer, num_bytes);
|
buffer->len = buffer->position;
|
||||||
|
buffer->position = 0;
|
||||||
|
|
||||||
MQTTPacket *packet = mqtt_packet_decode(buffer);
|
MQTTPacket *packet = mqtt_packet_decode(buffer);
|
||||||
if (packet == NULL) {
|
if (packet == NULL) {
|
||||||
// invalid packet
|
// invalid packet
|
||||||
if (num_bytes < max_receive_buffer_size) {
|
if (buffer_free_space(buffer) > 0) {
|
||||||
// maybe not long enough, try to fetch the rest
|
// half packet, fetch more
|
||||||
offset += num_bytes;
|
buffer->position = buffer->len;
|
||||||
free(buffer);
|
buffer->len = max_receive_buffer_size;
|
||||||
break;
|
break;
|
||||||
} else {
|
} else {
|
||||||
// no space in buffer, bail and reconnect
|
// no space in buffer, bail and reconnect
|
||||||
DEBUG_LOG("Buffer overflow!");
|
DEBUG_LOG("Buffer overflow!");
|
||||||
disconnect(handle);
|
platform_disconnect(handle);
|
||||||
handle->reader_alive = false;
|
handle->reader_alive = false;
|
||||||
free(buffer);
|
buffer_release(buffer);
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
// hexdump(buffer->data, num_bytes, 2);
|
hexdump(buffer->data, num_bytes, 2);
|
||||||
|
|
||||||
parse_packet(handle, packet);
|
parse_packet(handle, packet);
|
||||||
free_MQTTPacket(packet);
|
free_MQTTPacket(packet);
|
||||||
|
|
||||||
if (!buffer_eof(buffer)) {
|
if (!buffer_eof(buffer)) {
|
||||||
|
buffer->position = buffer->len;
|
||||||
|
buffer->len = max_receive_buffer_size;
|
||||||
|
|
||||||
// Not complete recv buffer was consumed, so we have more than one packet in there
|
// Not complete recv buffer was consumed, so we have more than one packet in there
|
||||||
size_t remaining = max_receive_buffer_size - buffer->position;
|
size_t remaining = max_receive_buffer_size - buffer->position;
|
||||||
memmove(read_buffer, read_buffer + buffer->position, remaining);
|
memmove(buffer->data, buffer->data + buffer->position, remaining);
|
||||||
offset -= buffer->position;
|
buffer->position = 0;
|
||||||
num_bytes -= buffer->position;
|
break;
|
||||||
free(buffer);
|
|
||||||
} else {
|
} else {
|
||||||
// buffer consumed completely, read another chunk
|
// buffer consumed completely, read another chunk
|
||||||
offset = 0;
|
buffer->position = 0;
|
||||||
free(buffer);
|
buffer->len = max_receive_buffer_size;
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -127,58 +113,30 @@ static void _reader(MQTTHandle *handle) {
|
||||||
}
|
}
|
||||||
|
|
||||||
static void _mqtt_connect(MQTTHandle *handle, MQTTEventHandler callback, void *context) {
|
static void _mqtt_connect(MQTTHandle *handle, MQTTEventHandler callback, void *context) {
|
||||||
int ret;
|
PlatformStatusCode ret = platform_connect(handle);
|
||||||
struct sockaddr_in servaddr;
|
|
||||||
memset(&servaddr, 0, sizeof(servaddr));
|
|
||||||
|
|
||||||
handle->sock = socket(AF_INET, SOCK_STREAM, 0);
|
if (ret == PlatformStatusError) {
|
||||||
servaddr.sin_family = AF_INET;
|
DEBUG_LOG("Could not connect");
|
||||||
servaddr.sin_port = htons(handle->config->port);
|
|
||||||
|
|
||||||
char ip[40];
|
|
||||||
if (!hostname_to_ip(handle->config->hostname, ip)) {
|
|
||||||
bool free_handle = handle->error_handler(handle, handle->config, MQTT_Error_Host_Not_Found);
|
|
||||||
if (free_handle) {
|
|
||||||
mqtt_free(handle);
|
|
||||||
}
|
|
||||||
DEBUG_LOG("Resolving hostname failed: %s", strerror(errno));
|
|
||||||
close(handle->sock);
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
ret = inet_pton(AF_INET, ip, &(servaddr.sin_addr));
|
|
||||||
if (ret == 0) {
|
|
||||||
bool free_handle = handle->error_handler(handle, handle->config, MQTT_Error_Host_Not_Found);
|
|
||||||
if (free_handle) {
|
|
||||||
mqtt_free(handle);
|
|
||||||
}
|
|
||||||
DEBUG_LOG("Converting to servaddr failed: %s", strerror(errno));
|
|
||||||
close(handle->sock);
|
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
ret = connect(handle->sock, (struct sockaddr *)&servaddr, sizeof(servaddr));
|
ret = platform_run_task(handle, &handle->read_task_handle, _reader);
|
||||||
if (ret != 0) {
|
if (ret == PlatformStatusError) {
|
||||||
bool free_handle = handle->error_handler(handle, handle->config, MQTT_Error_Connection_Refused);
|
DEBUG_LOG("Could not start read task");
|
||||||
if (free_handle) {
|
|
||||||
mqtt_free(handle);
|
|
||||||
}
|
|
||||||
DEBUG_LOG("Connection failed: %s", strerror(errno));
|
|
||||||
close(handle->sock);
|
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
run_read_task(handle, _reader);
|
|
||||||
|
|
||||||
expect_packet(handle, PacketTypeConnAck, 0, callback, context);
|
expect_packet(handle, PacketTypeConnAck, 0, callback, context);
|
||||||
|
|
||||||
bool result = send_connect_packet(handle);
|
bool result = send_connect_packet(handle);
|
||||||
if (result == false) {
|
if (result == false) {
|
||||||
DEBUG_LOG("Sending connect packet failed, running error handler");
|
DEBUG_LOG("Sending connect packet failed, running error handler");
|
||||||
bool free_handle = handle->error_handler(handle, handle->config, MQTT_Error_Broker_Disconnected);
|
bool free_handle = handle->error_handler(handle, handle->config, MQTT_Error_Broker_Disconnected);
|
||||||
|
platform_disconnect(handle);
|
||||||
if (free_handle) {
|
if (free_handle) {
|
||||||
|
platform_cleanup_task(handle, handle->read_task_handle);
|
||||||
mqtt_free(handle);
|
mqtt_free(handle);
|
||||||
}
|
}
|
||||||
close(handle->sock);
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -190,7 +148,11 @@ MQTTHandle *mqtt_connect(MQTTConfig *config, MQTTEventHandler callback, void *co
|
||||||
}
|
}
|
||||||
|
|
||||||
MQTTHandle *handle = calloc(sizeof(struct _MQTTHandle), 1);
|
MQTTHandle *handle = calloc(sizeof(struct _MQTTHandle), 1);
|
||||||
initialize_platform(handle);
|
PlatformStatusCode ret = platform_init(handle);
|
||||||
|
if (ret == PlatformStatusError) {
|
||||||
|
free(handle);
|
||||||
|
return NULL;
|
||||||
|
}
|
||||||
|
|
||||||
if (config->port == 0) {
|
if (config->port == 0) {
|
||||||
config->port = 1883;
|
config->port = 1883;
|
||||||
|
@ -208,8 +170,8 @@ MQTTHandle *mqtt_connect(MQTTConfig *config, MQTTEventHandler callback, void *co
|
||||||
MQTTStatus mqtt_reconnect(MQTTHandle *handle, MQTTEventHandler callback, void *context) {
|
MQTTStatus mqtt_reconnect(MQTTHandle *handle, MQTTEventHandler callback, void *context) {
|
||||||
if (handle->reader_alive) {
|
if (handle->reader_alive) {
|
||||||
DEBUG_LOG("Closing old connection");
|
DEBUG_LOG("Closing old connection");
|
||||||
close(handle->sock);
|
platform_disconnect(handle);
|
||||||
join_read_task(handle);
|
platform_cleanup_task(handle, handle->read_task_handle);
|
||||||
}
|
}
|
||||||
|
|
||||||
// TODO: re-submit unacknowledged packages with QoS > 0
|
// TODO: re-submit unacknowledged packages with QoS > 0
|
||||||
|
@ -249,10 +211,8 @@ MQTTStatus mqtt_publish(MQTTHandle *handle, char *topic, char *payload, MQTTQosL
|
||||||
|
|
||||||
MQTTStatus mqtt_disconnect(MQTTHandle *handle, MQTTEventHandler callback, void *callback_context) {
|
MQTTStatus mqtt_disconnect(MQTTHandle *handle, MQTTEventHandler callback, void *callback_context) {
|
||||||
send_disconnect_packet(handle);
|
send_disconnect_packet(handle);
|
||||||
if (close(handle->sock)) {
|
platform_disconnect(handle);
|
||||||
return MQTT_STATUS_ERROR;
|
platform_cleanup_task(handle, handle->read_task_handle);
|
||||||
}
|
|
||||||
join_read_task(handle);
|
|
||||||
mqtt_free(handle);
|
mqtt_free(handle);
|
||||||
|
|
||||||
if (callback) {
|
if (callback) {
|
||||||
|
|
|
@ -14,8 +14,8 @@ struct _MQTTHandle {
|
||||||
MQTTErrorHandler error_handler;
|
MQTTErrorHandler error_handler;
|
||||||
Subscriptions subscriptions;
|
Subscriptions subscriptions;
|
||||||
|
|
||||||
int sock;
|
|
||||||
bool reader_alive;
|
bool reader_alive;
|
||||||
|
int read_task_handle;
|
||||||
|
|
||||||
uint16_t packet_id_counter;
|
uint16_t packet_id_counter;
|
||||||
|
|
||||||
|
@ -23,4 +23,6 @@ struct _MQTTHandle {
|
||||||
PlatformData *platform;
|
PlatformData *platform;
|
||||||
};
|
};
|
||||||
|
|
||||||
|
void mqtt_free(MQTTHandle *handle);
|
||||||
|
|
||||||
#endif /* mqtt_internal_h__included */
|
#endif /* mqtt_internal_h__included */
|
||||||
|
|
|
@ -1,5 +1,6 @@
|
||||||
#include <unistd.h>
|
#include <unistd.h>
|
||||||
#include <stdio.h>
|
#include <stdio.h>
|
||||||
|
#include <platform.h>
|
||||||
|
|
||||||
#include "mqtt_internal.h"
|
#include "mqtt_internal.h"
|
||||||
#include "packet.h"
|
#include "packet.h"
|
||||||
|
@ -17,16 +18,9 @@ typedef struct {
|
||||||
*/
|
*/
|
||||||
|
|
||||||
bool send_buffer(MQTTHandle *handle, Buffer *buffer) {
|
bool send_buffer(MQTTHandle *handle, Buffer *buffer) {
|
||||||
while (!buffer_eof(buffer)) {
|
PlatformStatusCode ret = platform_write(handle, buffer);
|
||||||
ssize_t bytes = write(handle->sock, buffer->data + buffer->position, buffer_free_space(buffer));
|
|
||||||
if (bytes <= 0) {
|
|
||||||
buffer_release(buffer);
|
|
||||||
return false;
|
|
||||||
}
|
|
||||||
buffer->position += bytes;
|
|
||||||
}
|
|
||||||
buffer_release(buffer);
|
buffer_release(buffer);
|
||||||
return true;
|
return (ret == PlatformStatusOk);
|
||||||
}
|
}
|
||||||
|
|
||||||
/*
|
/*
|
||||||
|
|
Loading…
Reference in a new issue