diff --git a/src/mqtt.c b/src/mqtt.c index ebed082..02c2767 100644 --- a/src/mqtt.c +++ b/src/mqtt.c @@ -10,89 +10,244 @@ #include "mqtt.h" #include "mqtt_internal.h" #include "platform.h" +#include "protocol.h" +#include "debug.h" -#define BUF_LEN MAX_BUFFER_SIZE +static inline void mqtt_free(MQTTHandle *handle) { + release_platform(handle); + free(handle); +} + +static inline void disconnect(MQTTHandle *handle) { + close(handle->sock); + // FIXME: Do we have to do anything else? +} + +static inline bool find_waiting(MQTTHandle *handle, MQTTPacket *packet) { + // TODO: Try to find a waiting task and call its callback + // TODO: Remove waiting task from queue + + return false; +} + +static inline void send_subscription(MQTTHandle *handle, PublishPayload *payload) { + // TODO: find subscriber and call the callback +} + +static inline void parse_packet(MQTTHandle *handle, MQTTPacket *packet) { + switch (packet->packet_type) { + case PacketTypeConnAck: + case PacketTypePubAck: + case PacketTypePubRec: + case PacketTypePubRel: + case PacketTypePubComp: + case PacketTypeSubAck: + case PacketTypeUnsubAck: + case PacketTypePingResp: + if (!find_waiting(handle, packet)) { + disconnect(handle); + } + + case PacketTypePublish: + send_subscription(handle, packet->payload); + + // client -> server, will not be handled in client + case PacketTypeConnect: + case PacketTypeSubscribe: + case PacketTypeUnsubscribe: + case PacketTypePingReq: + case PacketTypeDisconnect: + disconnect(handle); + break; + } +} static void _reader(MQTTHandle *handle) { int num_bytes; - char buffer[BUF_LEN]; + char *read_buffer = malloc(max_receive_buffer_size); + uint8_t offset = 0; handle->reader_alive = true; while (1) { - num_bytes = read(handle->sock, &buffer, BUF_LEN); + num_bytes = read(handle->sock, &read_buffer[offset], max_receive_buffer_size - offset); if (num_bytes == 0) { /* Socket closed, coordinated shutdown */ + DEBUG_LOG("Socket closed"); handle->reader_alive = false; return; } else if (num_bytes < 0) { if ((errno == EINTR) || (errno == EAGAIN)) { + DEBUG_LOG("Interrupted"); continue; } /* Set reader task to dead */ handle->reader_alive = false; + DEBUG_LOG("Read error: %s", strerror(errno)); return; } - // TODO: Parse and dispatch + while (1) { + Buffer *buffer = buffer_from_data_no_copy(read_buffer, num_bytes); + MQTTPacket *packet = mqtt_packet_decode(buffer); + if (packet == NULL) { + DEBUG_LOG("Invalid packet"); + // invalid packet + if (num_bytes < max_receive_buffer_size) { + // maybe not long enough, try to fetch the rest + offset += num_bytes; + free(buffer); + DEBUG_LOG("Trying to read more..."); + break; + } else { + // no space in buffer, bail and reconnect + DEBUG_LOG("Buffer overflow!"); + disconnect(handle); + handle->reader_alive = false; + free(buffer); + return; + } + } else { + DEBUG_LOG("Packet parsed"); + parse_packet(handle, packet); + free_MQTTPacket(packet); + + if (!buffer_eof(buffer)) { + DEBUG_LOG("Residual buffer data"); + // Not complete recv buffer was consumed, so we have more than one packet in there + size_t remaining = max_receive_buffer_size - buffer->position; + memmove(read_buffer, read_buffer + buffer->position, remaining); + offset -= remaining; + num_bytes -= remaining; + free(buffer); + } else { + DEBUG_LOG("Buffer consumed"); + // buffer consumed completely, read another chunk + offset = 0; + free(buffer); + break; + } + } + } } } -MQTTHandle *mqtt_connect(MQTTConfig *config, MQTTErrorHandler callback) { +static void _mqtt_connect(MQTTHandle *handle) { int ret; - MQTTHandle *handle = calloc(sizeof(struct _MQTTHandle), 1); - initialize_platform(handle); - struct sockaddr_in servaddr; memset(&servaddr, 0, sizeof(servaddr)); - if (config->port == 0) { - config->port = 1883; - } - handle->sock = socket(AF_INET, SOCK_STREAM, 0); servaddr.sin_family = AF_INET; - servaddr.sin_port = htons(config->port); + servaddr.sin_port = htons(handle->config->port); - ret = inet_pton(AF_INET, config->hostname, &(servaddr.sin_addr)); - if (ret == 0) { - callback(handle, MQTT_Error_Host_Not_Found); + char ip[40]; + if (!hostname_to_ip(handle->config->hostname, ip)) { + bool free_handle = handle->error_handler(handle, MQTT_Error_Host_Not_Found); + if (free_handle) { + mqtt_free(handle); + } + DEBUG_LOG("Resolving hostname failed: %s", strerror(errno)); close(handle->sock); - free(handle); - return NULL; + return; + } + ret = inet_pton(AF_INET, ip, &(servaddr.sin_addr)); + if (ret == 0) { + bool free_handle = handle->error_handler(handle, MQTT_Error_Host_Not_Found); + if (free_handle) { + mqtt_free(handle); + } + DEBUG_LOG("Converting to servaddr failed: %s", strerror(errno)); + close(handle->sock); + return; } ret = connect(handle->sock, (struct sockaddr *)&servaddr, sizeof(servaddr)); if (ret != 0) { - callback(handle, MQTT_Error_Connection_Refused); + bool free_handle = handle->error_handler(handle, MQTT_Error_Connection_Refused); + if (free_handle) { + mqtt_free(handle); + } + DEBUG_LOG("Connection failed: %s", strerror(errno)); close(handle->sock); - free(handle); - return NULL; + return; } run_read_task(handle, _reader); + bool result = send_connect_packet(handle); + if (result == false) { + bool free_handle = handle->error_handler(handle, MQTT_Error_Broker_Disconnected); + if (free_handle) { + mqtt_free(handle); + } + DEBUG_LOG("Sending connect packet failed..."); + close(handle->sock); + } +} + +MQTTHandle *mqtt_connect(MQTTConfig *config, MQTTErrorHandler callback) { + MQTTHandle *handle = calloc(sizeof(struct _MQTTHandle), 1); + initialize_platform(handle); + + if (config->port == 0) { + config->port = 1883; + } + + handle->config = config; + handle->error_handler = callback; + + _mqtt_connect(handle); + return handle; } + MQTTStatus mqtt_reconnect(MQTTHandle *handle) { - // TODO: reconnect + if (handle->reader_alive) { + DEBUG_LOG("Closing old connection"); + close(handle->sock); + join_read_task(handle); + } + _mqtt_connect(handle); + + return MQTT_STATUS_OK; } MQTTStatus mqtt_subscribe(MQTTHandle *handle, char *topic, MQTTEventHandler callback) { - // TODO: subscribe + if (!handle->reader_alive) { + handle->error_handler(handle, MQTT_Error_Connection_Reset); + return MQTT_STATUS_ERROR; + } + return (send_subscribe_packet(handle, topic) ? MQTT_STATUS_OK : MQTT_STATUS_ERROR); + // TODO: add subscription to list } MQTTStatus mqtt_unsubscribe(MQTTHandle *handle, char *topic) { - // TODO: unsubscribe + if (!handle->reader_alive) { + handle->error_handler(handle, MQTT_Error_Connection_Reset); + return MQTT_STATUS_ERROR; + } + return (send_unsubscribe_packet(handle, topic) ? MQTT_STATUS_OK : MQTT_STATUS_ERROR); + // TODO: remove subscription from list } MQTTStatus mqtt_publish(MQTTHandle *handle, char *topic, char *payload, MQTTQosLevel qos_level) { - // TODO: publish + if (!handle->reader_alive) { + handle->error_handler(handle, MQTT_Error_Connection_Reset); + return MQTT_STATUS_ERROR; + } + return (send_publish_packet(handle, topic, payload, qos_level) ? MQTT_STATUS_OK : MQTT_STATUS_ERROR); } MQTTStatus mqtt_disconnect(MQTTHandle *handle) { - release_platform(handle); - free(handle); + DEBUG_LOG("Disconnecting...") + if (close(handle->sock)) { + return MQTT_STATUS_ERROR; + } + join_read_task(handle); + mqtt_free(handle); + + return MQTT_STATUS_OK; } diff --git a/src/mqtt_internal.h b/src/mqtt_internal.h index 1157e05..5ee673b 100644 --- a/src/mqtt_internal.h +++ b/src/mqtt_internal.h @@ -2,6 +2,7 @@ #define mqtt_internal_h__included #include "mqtt.h" +#include "packet.h" typedef struct _PlatformData PlatformData; @@ -9,17 +10,39 @@ typedef struct { char *topic; MQTTEventHandler *handler; bool pending; +} SubscriptionItem; + +typedef struct { + SubscriptionItem *items; + uint8_t num_items; } Subscriptions; +typedef void (*MQTTCallback)(MQTTHandle *handle, MQTTPacket *packet, void *context); + +typedef struct { + MQTTControlPacketType type; + uint16_t packet_id; + void *context; + MQTTCallback callback; +} MQTTCallbackQueueItem; + +typedef struct { + MQTTCallbackQueueItem *pending; + uint8_t num_items; +} MQTTCallbackQueue; + struct _MQTTHandle { - MQTTErrorHandler *errorHandler; - Subscriptions *subscriptions; - int num_subscriptions; + MQTTConfig *config; + + MQTTErrorHandler error_handler; + Subscriptions subscriptions; int sock; bool reader_alive; - // TODO: status queue (Waiting for ACK) + uint16_t packet_id_counter; + + MQTTCallbackQueue queue; PlatformData *platform; }; diff --git a/src/protocol.c b/src/protocol.c index e69de29..04c4d4f 100644 --- a/src/protocol.c +++ b/src/protocol.c @@ -0,0 +1,100 @@ +#include +#include + +#include "mqtt_internal.h" +#include "packet.h" +#include "buffer.h" + +#include "debug.h" + +static bool send_buffer(MQTTHandle *handle, Buffer *buffer) { + DEBUG_LOG("Attempting to send:"); + buffer_hexdump(buffer, 2); + + while (buffer_free_space(buffer) > 0) { + ssize_t bytes = write(handle->sock, buffer->data + buffer->position, buffer_free_space(buffer)); + if (bytes <= 0) { + DEBUG_LOG("write error, %s", strerror(errno)); + buffer_release(buffer); + return false; + } + DEBUG_LOG("Wrote %zu bytes...", bytes); + buffer->position += bytes; + } + DEBUG_LOG("Buffer written..."); + buffer_release(buffer); + return true; +} + +bool send_connect_packet(MQTTHandle *handle) { + ConnectPayload *payload = calloc(1, sizeof(ConnectPayload)); + + payload->client_id = handle->config->client_id; + payload->protocol_level = 4; + payload->keepalive_interval = 60; + + // TODO: support last will + // payload->will_topic = "test/lastwill"; + // payload->will_message = "disconnected"; + // payload->will_qos = MQTT_QOS_1; + // payload->retain_will = true; + + payload->username = handle->config->username; + payload->password = handle->config->password; + + Buffer *encoded = mqtt_packet_encode(&(MQTTPacket){ PacketTypeConnect, payload }); + free(payload); + + // TODO: add waiting ConnAck to queue + + encoded->position = 0; + return send_buffer(handle, encoded); +} + +bool send_subscribe_packet(MQTTHandle *handle, char *topic, MQTTQosLevel qos) { + SubscribePayload *payload = calloc(1, sizeof(SubscribePayload)); + + payload->packet_id = handle->packet_id_counter++; + payload->topic = topic; + payload->qos = qos; + + Buffer *encoded = mqtt_packet_encode(&(MQTTPacket){ PacketTypeSubscribe, payload }); + free(payload); + + // TODO: add waiting SubAck to queue + + encoded->position = 0; + return send_buffer(handle, encoded); +} + +bool send_unsubscribe_packet(MQTTHandle *handle, char *topic) { + UnsubscribePayload *payload = calloc(1, sizeof(UnsubscribePayload)); + + payload->packet_id = 10; + payload->topic = "test/topic"; + + Buffer *encoded = mqtt_packet_encode(&(MQTTPacket){ PacketTypeUnsubscribe, payload }); + free(payload); + + // TODO: add waiting UnsubAck to queue + encoded->position = 0; + return send_buffer(handle, encoded); +} + +bool send_publish_packet(MQTTHandle *handle, char *topic, char *message, MQTTQosLevel qos) { + PublishPayload *payload = calloc(1, sizeof(PublishPayload)); + + payload->qos = qos; + payload->retain = true; + payload->topic = topic; + payload->packet_id = handle->packet_id_counter++; + payload->message = message; + + Buffer *encoded = mqtt_packet_encode(&(MQTTPacket){ PacketTypePublish, payload }); + free(payload); + + // TODO: Handle QoS and add waiting packets to queue + + encoded->position = 0; + return send_buffer(handle, encoded); +} diff --git a/src/protocol.h b/src/protocol.h index e69de29..aaf3267 100644 --- a/src/protocol.h +++ b/src/protocol.h @@ -0,0 +1,11 @@ +#ifndef protocol_h__included +#define protocol_h__included + +#include "mqtt.h" + +bool send_connect_packet(MQTTHandle *handle); +bool send_subscribe_packet(MQTTHandle *handle, char *topic); +bool send_unsubscribe_packet(MQTTHandle *handle, char *topic); +bool send_publish_packet(MQTTHandle *handle, char *topic, char *message, MQTTQosLevel qos); + +#endif diff --git a/tests/connect_publish.c b/tests/connect_publish.c new file mode 100644 index 0000000..4d2d43b --- /dev/null +++ b/tests/connect_publish.c @@ -0,0 +1,51 @@ +#include +#include +#include + +#include "mqtt.h" + +#define LOG(fmt, ...) fprintf(stdout, fmt "\n", ## __VA_ARGS__) + +bool err_handler(MQTTHandle *handle, MQTTErrorCode error) { + LOG("Error received: %d", error); + exit(1); + + return true; +} + +int main(int argc, char **argv) { + MQTTConfig config = { 0 }; + + config.client_id = "libmqtt_testsuite"; + config.hostname = "localhost"; + config.port = 1883; + + LOG("Trying to connect to %s", config.hostname); + MQTTHandle *mqtt = mqtt_connect(&config, err_handler); + + if (mqtt == NULL) { + LOG("Connection failed!"); + return 1; + } + + LOG("Connected!"); + + sleep(5); + + LOG("Trying publish to testsuite/mqtt/test..."); + MQTTStatus result = mqtt_publish(mqtt, "testsuite/mqtt/test", "payload", MQTT_QOS_0); + if (result != MQTT_STATUS_OK) { + LOG("Could not publish"); + return 1; + } + + sleep(5); + + LOG("Disconnecting..."); + result = mqtt_disconnect(mqtt); + if (result != MQTT_STATUS_OK) { + LOG("Could not disconnect"); + return 1; + } + return 0; +} diff --git a/tests/connect_subscribe.c b/tests/connect_subscribe.c new file mode 100644 index 0000000..2c87cc1 --- /dev/null +++ b/tests/connect_subscribe.c @@ -0,0 +1,53 @@ +#include +#include + +#include "mqtt.h" + +#define LOG(fmt, ...) fprintf(stdout, fmt "\n", ## __VA_ARGS__) + +bool err_handler(MQTTHandle *handle, MQTTErrorCode error) { + LOG("Error received: %d", error); + + return 1; +} + +bool leave = false; +void callback(MQTTHandle *handle, char *topic, char *payload) { + LOG("Received publish: %s -> %s", topic, payload); + leave = true; +} + +int main(int argc, char **argv) { + MQTTConfig config; + + config.client_id = "libmqtt_testsuite"; + config.hostname = "test.mosquitto.org"; + config.port = 1883; + + LOG("Trying to connect to test.mosquitto.org"); + MQTTHandle *mqtt = mqtt_connect(&config, err_handler); + + if (mqtt == NULL) { + LOG("Connection failed!"); + return 1; + } + LOG("Connected!"); + + sleep(5); + + LOG("Trying subscribe on testsuite/mqtt/test..."); + MQTTStatus result = mqtt_subscribe(mqtt, "testsuite/mqtt/test", callback); + if (result != MQTT_STATUS_OK) { + LOG("Could not publish"); + return 1; + } + + while (!leave) { + LOG("Waiting..."); + sleep(1); + } + + LOG("Disconnecting..."); + mqtt_disconnect(mqtt); + return 0; +}