diff --git a/CMakeLists.txt b/CMakeLists.txt index 4104cd1..e03ee83 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -133,6 +133,16 @@ add_executable (connect_subscribe.test tests/connect_subscribe.c) ) add_test(NAME ConnectSubscribe COMMAND ${PROJECT_BINARY_DIR}/connect_subscribe.test) +add_executable (connect_reconnect.test tests/connect_reconnect.c) + target_link_libraries (connect_reconnect.test mqtt-full ${PLATFORM_LIBS}) + target_include_directories(connect_reconnect.test + PRIVATE + ${PROJECT_BINARY_DIR} + ${CMAKE_CURRENT_SOURCE_DIR}/src + ${CMAKE_CURRENT_SOURCE_DIR}/platform + ) + add_test(NAME ConnectReconnect COMMAND ${PROJECT_BINARY_DIR}/connect_reconnect.test) + add_executable (decode_packet.test tests/decode_packet.c) target_link_libraries (decode_packet.test mqtt-full ${PLATFORM_LIBS}) target_include_directories(decode_packet.test diff --git a/src/mqtt.c b/src/mqtt.c index 0ae4be1..d4294fa 100644 --- a/src/mqtt.c +++ b/src/mqtt.c @@ -16,6 +16,33 @@ void mqtt_free(MQTTHandle *handle) { free(handle); } +/* + * State handling + */ + + void cleanup_session(MQTTHandle *handle) { + // Remove all waiting packets + clear_packet_queue(handle); +} + +static MQTTStatus resubscribe(MQTTHandle *handle) { + // re-subscribe to all topics + SubscriptionItem *item = handle->subscriptions.items; + while (item != NULL) { + if (!send_subscribe_packet(handle, item->topic, item->qos)) { + DEBUG_LOG("Error sending subscribe packet"); + return MQTT_STATUS_ERROR; + } + item = item->next; + } + + return MQTT_STATUS_OK; +} + +/* + * Keepalive + */ + static void _keepalive_callback(MQTTHandle *handle, int timer_handle) { bool result = send_ping_packet(handle); if (!result) { @@ -23,15 +50,35 @@ static void _keepalive_callback(MQTTHandle *handle, int timer_handle) { } } +/* + * Packet parser + */ + static inline void parse_packet(MQTTHandle *handle, MQTTPacket *packet) { + // DEBUG_LOG("Packet, type: %s, packet_id: %d", get_packet_name(packet), get_packet_id(packet)); + switch (packet->packet_type) { case PacketTypeConnAck: if (!dispatch_packet(handle, packet)) { DEBUG_LOG("Unexpected packet! (type: CONNACK)"); + (void)platform_destroy_timer(handle, handle->keepalive_timer); + handle->keepalive_timer = -1; (void)platform_disconnect(handle); } else { + ConnAckPayload *payload = (ConnAckPayload *)packet->payload; + if ((!payload->session_present) && (handle->reconnecting)) { + cleanup_session(handle); + if (resubscribe(handle) != MQTT_STATUS_OK) { + DEBUG_LOG("Could not re-subscribe to all topics!"); + (void)platform_destroy_timer(handle, handle->keepalive_timer); + handle->keepalive_timer = -1; + (void)platform_disconnect(handle); + } + } if (platform_create_timer(handle, KEEPALIVE_INTERVAL, &handle->keepalive_timer, _keepalive_callback) != PlatformStatusOk) { DEBUG_LOG("Could not create keepalive timer!"); + (void)platform_destroy_timer(handle, handle->keepalive_timer); + handle->keepalive_timer = -1; (void)platform_disconnect(handle); } } @@ -45,11 +92,14 @@ static inline void parse_packet(MQTTHandle *handle, MQTTPacket *packet) { case PacketTypeUnsubAck: if (!dispatch_packet(handle, packet)) { DEBUG_LOG("Unexpected packet! (type: %s, packet_id: %d)", get_packet_name(packet), get_packet_id(packet)); + (void)platform_destroy_timer(handle, handle->keepalive_timer); + handle->keepalive_timer = -1; (void)platform_disconnect(handle); } break; case PacketTypePublish: + // DEBUG_LOG("Publish on %s -> %s", ((PublishPayload *)packet->payload)->topic, ((PublishPayload *)packet->payload)->message); dispatch_subscription(handle, (PublishPayload *)packet->payload); // TODO: Handle QoS break; @@ -65,11 +115,17 @@ static inline void parse_packet(MQTTHandle *handle, MQTTPacket *packet) { case PacketTypePingReq: case PacketTypeDisconnect: DEBUG_LOG("Server packet on client connection? What's up with the broker?"); + (void)platform_destroy_timer(handle, handle->keepalive_timer); + handle->keepalive_timer = -1; (void)platform_disconnect(handle); break; } } +/* + * Reading loop + */ + PlatformTaskFunc(_reader) { MQTTHandle *handle = (MQTTHandle *)context; Buffer *buffer = buffer_allocate(max_receive_buffer_size); @@ -98,7 +154,9 @@ PlatformTaskFunc(_reader) { } else { // no space in buffer, bail and reconnect DEBUG_LOG("Buffer overflow!"); - platform_disconnect(handle); + (void)platform_destroy_timer(handle, handle->keepalive_timer); + handle->keepalive_timer = -1; + (void)platform_disconnect(handle); handle->reader_alive = false; buffer_release(buffer); return 0; @@ -136,10 +194,16 @@ static void _mqtt_connect(MQTTHandle *handle, MQTTEventHandler callback, void *c return; } - ret = platform_run_task(handle, &handle->read_task_handle, _reader); - if (ret == PlatformStatusError) { - DEBUG_LOG("Could not start read task"); - return; + if (!handle->reader_alive) { + if (handle->read_task_handle >= 0) { + platform_cleanup_task(handle, handle->read_task_handle); + handle->read_task_handle = -1; + } + ret = platform_run_task(handle, &handle->read_task_handle, _reader); + if (ret == PlatformStatusError) { + DEBUG_LOG("Could not start read task"); + return; + } } expect_packet(handle, PacketTypeConnAck, 0, callback, context); @@ -151,11 +215,16 @@ static void _mqtt_connect(MQTTHandle *handle, MQTTEventHandler callback, void *c platform_disconnect(handle); if (free_handle) { platform_cleanup_task(handle, handle->read_task_handle); + handle->read_task_handle = -1; mqtt_free(handle); } } } +/* + * API + */ + MQTTHandle *mqtt_connect(MQTTConfig *config, MQTTEventHandler callback, void *context, MQTTErrorHandler error_callback) { // sanity check if ((config->client_id != NULL) && (strlen(config->client_id) > 23)) { @@ -185,15 +254,15 @@ MQTTHandle *mqtt_connect(MQTTConfig *config, MQTTEventHandler callback, void *co MQTTStatus mqtt_reconnect(MQTTHandle *handle, MQTTEventHandler callback, void *context) { if (handle->reader_alive) { - DEBUG_LOG("Closing old connection"); - platform_disconnect(handle); - platform_cleanup_task(handle, handle->read_task_handle); + (void)platform_destroy_timer(handle, handle->keepalive_timer); + handle->keepalive_timer = -1; + (void)platform_disconnect(handle); + // DEBUG_LOG("Waiting for reader to exit"); + // platform_cleanup_task(handle, handle->read_task_handle); } - // TODO: re-submit unacknowledged packages with QoS > 0 - // TODO: clear waiting packets - // TODO: re-subscribe all subscriptions - + handle->config->clean_session = false; + handle->reconnecting = true; _mqtt_connect(handle, callback, context); return MQTT_STATUS_OK; @@ -226,9 +295,12 @@ MQTTStatus mqtt_publish(MQTTHandle *handle, char *topic, char *payload, MQTTQosL } MQTTStatus mqtt_disconnect(MQTTHandle *handle, MQTTEventHandler callback, void *callback_context) { - send_disconnect_packet(handle); - platform_disconnect(handle); - platform_cleanup_task(handle, handle->read_task_handle); + (void)send_disconnect_packet(handle); + (void)platform_destroy_timer(handle, handle->keepalive_timer); + handle->keepalive_timer = -1; + (void)platform_disconnect(handle); + (void)platform_cleanup_task(handle, handle->read_task_handle); + handle->read_task_handle = -1; mqtt_free(handle); if (callback) { diff --git a/src/mqtt_internal.h b/src/mqtt_internal.h index adbb9cf..a0feeb5 100644 --- a/src/mqtt_internal.h +++ b/src/mqtt_internal.h @@ -22,6 +22,7 @@ struct _MQTTHandle { MQTTCallbackQueue queue; PlatformData *platform; + bool reconnecting; int keepalive_timer; }; diff --git a/src/protocol.c b/src/protocol.c index b4cac7f..97f8987 100644 --- a/src/protocol.c +++ b/src/protocol.c @@ -10,6 +10,7 @@ typedef struct { PublishPayload *payload; MQTTPublishEventHandler callback; + MQTTQosLevel qos; } PublishCallback; /* @@ -157,6 +158,7 @@ bool send_publish_packet(MQTTHandle *handle, char *topic, char *message, MQTTQos PublishCallback *ctx = (PublishCallback *)malloc(sizeof(PublishCallback)); ctx->payload = payload; ctx->callback = callback; + ctx->qos = payload->qos; expect_packet(handle, PacketTypePubAck, payload->packet_id, handle_puback_pubcomp, ctx); break; } @@ -164,6 +166,7 @@ bool send_publish_packet(MQTTHandle *handle, char *topic, char *message, MQTTQos PublishCallback *ctx = (PublishCallback *)malloc(sizeof(PublishCallback)); ctx->payload = payload; ctx->callback = callback; + ctx->qos = payload->qos; expect_packet(handle, PacketTypePubRec, payload->packet_id, handle_pubrec, ctx); break; } diff --git a/src/state_queue.c b/src/state_queue.c index 391a2c2..9db56fa 100644 --- a/src/state_queue.c +++ b/src/state_queue.c @@ -55,6 +55,18 @@ void remove_from_queue(MQTTHandle *handle, MQTTCallbackQueueItem *remove) { } } +void clear_packet_queue(MQTTHandle *handle) { + MQTTCallbackQueueItem *item = handle->queue.pending; + handle->queue.pending = NULL; + + while (item != NULL) { + MQTTCallbackQueueItem *current = item; + item = item->next; + + free(current); + } +} + bool dispatch_packet(MQTTHandle *handle, MQTTPacket *packet) { MQTTCallbackQueueItem *item = handle->queue.pending; uint16_t packet_id = get_packet_id(packet); diff --git a/src/state_queue.h b/src/state_queue.h index 9657ebd..f5e8e30 100644 --- a/src/state_queue.h +++ b/src/state_queue.h @@ -20,5 +20,6 @@ typedef struct { void expect_packet(MQTTHandle *handle, MQTTControlPacketType type, uint16_t packet_id, MQTTEventHandler callback, void *context); bool dispatch_packet(MQTTHandle *handle, MQTTPacket *packet); +void clear_packet_queue(MQTTHandle *handle); #endif /* state_queue_h__included */ diff --git a/tests/connect_publish.c b/tests/connect_publish.c index b013901..4e96e98 100644 --- a/tests/connect_publish.c +++ b/tests/connect_publish.c @@ -45,8 +45,6 @@ void mqtt_connected(MQTTHandle *handle, void *context) { LOG("Could not publish"); exit(1); } - - leave = true; } int main(int argc, char **argv) { @@ -80,7 +78,8 @@ int main(int argc, char **argv) { platform_sleep(1000); cancel++; if (cancel == 10) { - break; + LOG("Giving up!"); + return 1; } } diff --git a/tests/connect_reconnect.c b/tests/connect_reconnect.c new file mode 100644 index 0000000..d0098d6 --- /dev/null +++ b/tests/connect_reconnect.c @@ -0,0 +1,66 @@ +#include +#include + +#include "platform.h" +#include "mqtt.h" + +int leave = 0; + +#define LOG(fmt, ...) fprintf(stdout, fmt "\n", ## __VA_ARGS__) + +bool err_handler(MQTTHandle *handle, MQTTConfig *config, MQTTErrorCode error) { + LOG("Error received: %d", error); + + return 1; +} + +void mqtt_reconnected(MQTTHandle *handle, void *context) { + LOG("Reconnected!"); +} + +void callback(MQTTHandle *handle, char *topic, char *payload) { + LOG("Received publish: %s -> %s", topic, payload); + + leave++; +} + +void mqtt_connected(MQTTHandle *handle, void *context) { + LOG("Connected!"); + + LOG("Trying subscribe on testsuite/mqtt/test..."); + MQTTStatus result = mqtt_subscribe(handle, "testsuite/mqtt/test", MQTT_QOS_0, callback); + if (result != MQTT_STATUS_OK) { + LOG("Could not subscribe test"); + exit(1); + } +} + +int main(int argc, char **argv) { + MQTTConfig config = { 0 }; + + config.client_id = "libmqtt_testsuite"; + config.hostname = "localhost"; + config.clean_session = true; + + LOG("Trying to connect to %s...", config.hostname); + MQTTHandle *mqtt = mqtt_connect(&config, mqtt_connected, NULL, err_handler); + + if (mqtt == NULL) { + LOG("Connection failed!"); + return 1; + } + + while (leave < 1) { + LOG("Waiting for first publish..."); + platform_sleep(1000); + } + mqtt_reconnect(mqtt, mqtt_reconnected, NULL); + + while (leave < 2) { + LOG("Waiting for second publish..."); + platform_sleep(1000); + } + + LOG("Disconnecting..."); + mqtt_disconnect(mqtt, NULL, NULL); +}