Implement server reconnect

Re #2
This commit is contained in:
Johannes Schriewer 2018-08-05 23:58:50 +02:00
parent ec578f0dbd
commit 790ddec60e
8 changed files with 182 additions and 18 deletions

View file

@ -133,6 +133,16 @@ add_executable (connect_subscribe.test tests/connect_subscribe.c)
)
add_test(NAME ConnectSubscribe COMMAND ${PROJECT_BINARY_DIR}/connect_subscribe.test)
add_executable (connect_reconnect.test tests/connect_reconnect.c)
target_link_libraries (connect_reconnect.test mqtt-full ${PLATFORM_LIBS})
target_include_directories(connect_reconnect.test
PRIVATE
${PROJECT_BINARY_DIR}
${CMAKE_CURRENT_SOURCE_DIR}/src
${CMAKE_CURRENT_SOURCE_DIR}/platform
)
add_test(NAME ConnectReconnect COMMAND ${PROJECT_BINARY_DIR}/connect_reconnect.test)
add_executable (decode_packet.test tests/decode_packet.c)
target_link_libraries (decode_packet.test mqtt-full ${PLATFORM_LIBS})
target_include_directories(decode_packet.test

View file

@ -16,6 +16,33 @@ void mqtt_free(MQTTHandle *handle) {
free(handle);
}
/*
* State handling
*/
void cleanup_session(MQTTHandle *handle) {
// Remove all waiting packets
clear_packet_queue(handle);
}
static MQTTStatus resubscribe(MQTTHandle *handle) {
// re-subscribe to all topics
SubscriptionItem *item = handle->subscriptions.items;
while (item != NULL) {
if (!send_subscribe_packet(handle, item->topic, item->qos)) {
DEBUG_LOG("Error sending subscribe packet");
return MQTT_STATUS_ERROR;
}
item = item->next;
}
return MQTT_STATUS_OK;
}
/*
* Keepalive
*/
static void _keepalive_callback(MQTTHandle *handle, int timer_handle) {
bool result = send_ping_packet(handle);
if (!result) {
@ -23,15 +50,35 @@ static void _keepalive_callback(MQTTHandle *handle, int timer_handle) {
}
}
/*
* Packet parser
*/
static inline void parse_packet(MQTTHandle *handle, MQTTPacket *packet) {
// DEBUG_LOG("Packet, type: %s, packet_id: %d", get_packet_name(packet), get_packet_id(packet));
switch (packet->packet_type) {
case PacketTypeConnAck:
if (!dispatch_packet(handle, packet)) {
DEBUG_LOG("Unexpected packet! (type: CONNACK)");
(void)platform_destroy_timer(handle, handle->keepalive_timer);
handle->keepalive_timer = -1;
(void)platform_disconnect(handle);
} else {
ConnAckPayload *payload = (ConnAckPayload *)packet->payload;
if ((!payload->session_present) && (handle->reconnecting)) {
cleanup_session(handle);
if (resubscribe(handle) != MQTT_STATUS_OK) {
DEBUG_LOG("Could not re-subscribe to all topics!");
(void)platform_destroy_timer(handle, handle->keepalive_timer);
handle->keepalive_timer = -1;
(void)platform_disconnect(handle);
}
}
if (platform_create_timer(handle, KEEPALIVE_INTERVAL, &handle->keepalive_timer, _keepalive_callback) != PlatformStatusOk) {
DEBUG_LOG("Could not create keepalive timer!");
(void)platform_destroy_timer(handle, handle->keepalive_timer);
handle->keepalive_timer = -1;
(void)platform_disconnect(handle);
}
}
@ -45,11 +92,14 @@ static inline void parse_packet(MQTTHandle *handle, MQTTPacket *packet) {
case PacketTypeUnsubAck:
if (!dispatch_packet(handle, packet)) {
DEBUG_LOG("Unexpected packet! (type: %s, packet_id: %d)", get_packet_name(packet), get_packet_id(packet));
(void)platform_destroy_timer(handle, handle->keepalive_timer);
handle->keepalive_timer = -1;
(void)platform_disconnect(handle);
}
break;
case PacketTypePublish:
// DEBUG_LOG("Publish on %s -> %s", ((PublishPayload *)packet->payload)->topic, ((PublishPayload *)packet->payload)->message);
dispatch_subscription(handle, (PublishPayload *)packet->payload);
// TODO: Handle QoS
break;
@ -65,11 +115,17 @@ static inline void parse_packet(MQTTHandle *handle, MQTTPacket *packet) {
case PacketTypePingReq:
case PacketTypeDisconnect:
DEBUG_LOG("Server packet on client connection? What's up with the broker?");
(void)platform_destroy_timer(handle, handle->keepalive_timer);
handle->keepalive_timer = -1;
(void)platform_disconnect(handle);
break;
}
}
/*
* Reading loop
*/
PlatformTaskFunc(_reader) {
MQTTHandle *handle = (MQTTHandle *)context;
Buffer *buffer = buffer_allocate(max_receive_buffer_size);
@ -98,7 +154,9 @@ PlatformTaskFunc(_reader) {
} else {
// no space in buffer, bail and reconnect
DEBUG_LOG("Buffer overflow!");
platform_disconnect(handle);
(void)platform_destroy_timer(handle, handle->keepalive_timer);
handle->keepalive_timer = -1;
(void)platform_disconnect(handle);
handle->reader_alive = false;
buffer_release(buffer);
return 0;
@ -136,10 +194,16 @@ static void _mqtt_connect(MQTTHandle *handle, MQTTEventHandler callback, void *c
return;
}
ret = platform_run_task(handle, &handle->read_task_handle, _reader);
if (ret == PlatformStatusError) {
DEBUG_LOG("Could not start read task");
return;
if (!handle->reader_alive) {
if (handle->read_task_handle >= 0) {
platform_cleanup_task(handle, handle->read_task_handle);
handle->read_task_handle = -1;
}
ret = platform_run_task(handle, &handle->read_task_handle, _reader);
if (ret == PlatformStatusError) {
DEBUG_LOG("Could not start read task");
return;
}
}
expect_packet(handle, PacketTypeConnAck, 0, callback, context);
@ -151,11 +215,16 @@ static void _mqtt_connect(MQTTHandle *handle, MQTTEventHandler callback, void *c
platform_disconnect(handle);
if (free_handle) {
platform_cleanup_task(handle, handle->read_task_handle);
handle->read_task_handle = -1;
mqtt_free(handle);
}
}
}
/*
* API
*/
MQTTHandle *mqtt_connect(MQTTConfig *config, MQTTEventHandler callback, void *context, MQTTErrorHandler error_callback) {
// sanity check
if ((config->client_id != NULL) && (strlen(config->client_id) > 23)) {
@ -185,15 +254,15 @@ MQTTHandle *mqtt_connect(MQTTConfig *config, MQTTEventHandler callback, void *co
MQTTStatus mqtt_reconnect(MQTTHandle *handle, MQTTEventHandler callback, void *context) {
if (handle->reader_alive) {
DEBUG_LOG("Closing old connection");
platform_disconnect(handle);
platform_cleanup_task(handle, handle->read_task_handle);
(void)platform_destroy_timer(handle, handle->keepalive_timer);
handle->keepalive_timer = -1;
(void)platform_disconnect(handle);
// DEBUG_LOG("Waiting for reader to exit");
// platform_cleanup_task(handle, handle->read_task_handle);
}
// TODO: re-submit unacknowledged packages with QoS > 0
// TODO: clear waiting packets
// TODO: re-subscribe all subscriptions
handle->config->clean_session = false;
handle->reconnecting = true;
_mqtt_connect(handle, callback, context);
return MQTT_STATUS_OK;
@ -226,9 +295,12 @@ MQTTStatus mqtt_publish(MQTTHandle *handle, char *topic, char *payload, MQTTQosL
}
MQTTStatus mqtt_disconnect(MQTTHandle *handle, MQTTEventHandler callback, void *callback_context) {
send_disconnect_packet(handle);
platform_disconnect(handle);
platform_cleanup_task(handle, handle->read_task_handle);
(void)send_disconnect_packet(handle);
(void)platform_destroy_timer(handle, handle->keepalive_timer);
handle->keepalive_timer = -1;
(void)platform_disconnect(handle);
(void)platform_cleanup_task(handle, handle->read_task_handle);
handle->read_task_handle = -1;
mqtt_free(handle);
if (callback) {

View file

@ -22,6 +22,7 @@ struct _MQTTHandle {
MQTTCallbackQueue queue;
PlatformData *platform;
bool reconnecting;
int keepalive_timer;
};

View file

@ -10,6 +10,7 @@
typedef struct {
PublishPayload *payload;
MQTTPublishEventHandler callback;
MQTTQosLevel qos;
} PublishCallback;
/*
@ -157,6 +158,7 @@ bool send_publish_packet(MQTTHandle *handle, char *topic, char *message, MQTTQos
PublishCallback *ctx = (PublishCallback *)malloc(sizeof(PublishCallback));
ctx->payload = payload;
ctx->callback = callback;
ctx->qos = payload->qos;
expect_packet(handle, PacketTypePubAck, payload->packet_id, handle_puback_pubcomp, ctx);
break;
}
@ -164,6 +166,7 @@ bool send_publish_packet(MQTTHandle *handle, char *topic, char *message, MQTTQos
PublishCallback *ctx = (PublishCallback *)malloc(sizeof(PublishCallback));
ctx->payload = payload;
ctx->callback = callback;
ctx->qos = payload->qos;
expect_packet(handle, PacketTypePubRec, payload->packet_id, handle_pubrec, ctx);
break;
}

View file

@ -55,6 +55,18 @@ void remove_from_queue(MQTTHandle *handle, MQTTCallbackQueueItem *remove) {
}
}
void clear_packet_queue(MQTTHandle *handle) {
MQTTCallbackQueueItem *item = handle->queue.pending;
handle->queue.pending = NULL;
while (item != NULL) {
MQTTCallbackQueueItem *current = item;
item = item->next;
free(current);
}
}
bool dispatch_packet(MQTTHandle *handle, MQTTPacket *packet) {
MQTTCallbackQueueItem *item = handle->queue.pending;
uint16_t packet_id = get_packet_id(packet);

View file

@ -20,5 +20,6 @@ typedef struct {
void expect_packet(MQTTHandle *handle, MQTTControlPacketType type, uint16_t packet_id, MQTTEventHandler callback, void *context);
bool dispatch_packet(MQTTHandle *handle, MQTTPacket *packet);
void clear_packet_queue(MQTTHandle *handle);
#endif /* state_queue_h__included */

View file

@ -45,8 +45,6 @@ void mqtt_connected(MQTTHandle *handle, void *context) {
LOG("Could not publish");
exit(1);
}
leave = true;
}
int main(int argc, char **argv) {
@ -80,7 +78,8 @@ int main(int argc, char **argv) {
platform_sleep(1000);
cancel++;
if (cancel == 10) {
break;
LOG("Giving up!");
return 1;
}
}

66
tests/connect_reconnect.c Normal file
View file

@ -0,0 +1,66 @@
#include <stdio.h>
#include <stdlib.h>
#include "platform.h"
#include "mqtt.h"
int leave = 0;
#define LOG(fmt, ...) fprintf(stdout, fmt "\n", ## __VA_ARGS__)
bool err_handler(MQTTHandle *handle, MQTTConfig *config, MQTTErrorCode error) {
LOG("Error received: %d", error);
return 1;
}
void mqtt_reconnected(MQTTHandle *handle, void *context) {
LOG("Reconnected!");
}
void callback(MQTTHandle *handle, char *topic, char *payload) {
LOG("Received publish: %s -> %s", topic, payload);
leave++;
}
void mqtt_connected(MQTTHandle *handle, void *context) {
LOG("Connected!");
LOG("Trying subscribe on testsuite/mqtt/test...");
MQTTStatus result = mqtt_subscribe(handle, "testsuite/mqtt/test", MQTT_QOS_0, callback);
if (result != MQTT_STATUS_OK) {
LOG("Could not subscribe test");
exit(1);
}
}
int main(int argc, char **argv) {
MQTTConfig config = { 0 };
config.client_id = "libmqtt_testsuite";
config.hostname = "localhost";
config.clean_session = true;
LOG("Trying to connect to %s...", config.hostname);
MQTTHandle *mqtt = mqtt_connect(&config, mqtt_connected, NULL, err_handler);
if (mqtt == NULL) {
LOG("Connection failed!");
return 1;
}
while (leave < 1) {
LOG("Waiting for first publish...");
platform_sleep(1000);
}
mqtt_reconnect(mqtt, mqtt_reconnected, NULL);
while (leave < 2) {
LOG("Waiting for second publish...");
platform_sleep(1000);
}
LOG("Disconnecting...");
mqtt_disconnect(mqtt, NULL, NULL);
}