From 1981e1a104059ff044c10257ccb131e409f0acf5 Mon Sep 17 00:00:00 2001 From: Johannes Schriewer Date: Fri, 3 Aug 2018 18:42:48 +0200 Subject: [PATCH] Implement keepalive currently relatively dumb, as the timer does not get reset when a regular packet is sent. Re #9 --- CMakeLists.txt | 2 +- platform/linux.c | 120 ++++++++++++++++++++++++++++++++++++++++ platform/platform.h | 29 ++++++++++ src/mqtt.c | 18 ++++++ src/mqtt_internal.h | 6 ++ src/protocol.c | 10 +++- src/protocol.h | 1 + tests/connect_publish.c | 3 + 8 files changed, 187 insertions(+), 2 deletions(-) diff --git a/CMakeLists.txt b/CMakeLists.txt index 06e424c..074098f 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -55,7 +55,7 @@ set(mqtt-source # full build library for testing add_library(mqtt-full STATIC ${mqtt-source}) - target_compile_definitions(mqtt-full PRIVATE MQTT_SERVER=1 MQTT_CLIENT=1) + target_compile_definitions(mqtt-full PRIVATE MQTT_SERVER=1 MQTT_CLIENT=1 KEEPALIVE_INTERVAL=4) set_target_properties(mqtt-full PROPERTIES PUBLIC_HEADER "src/mqtt.h") target_include_directories(mqtt-full PUBLIC diff --git a/platform/linux.c b/platform/linux.c index fe38487..55d279d 100644 --- a/platform/linux.c +++ b/platform/linux.c @@ -18,15 +18,52 @@ const size_t max_receive_buffer_size = 4 * 4096; // 16 KB #define MAX_TASKS 16 +#define MAX_TIMERS 5 + +typedef struct { + PlatformTimerCallback callback; + int status; + int interval; + +} PlatformTimer; struct _PlatformData { pthread_t tasks[MAX_TASKS]; + PlatformTimer timers[MAX_TIMERS]; + int timer_task; int sock; }; +void *timer_task(MQTTHandle *handle) { + while (1) { + platform_sleep(1000); + + bool active = false; + for (uint8_t i = 0; i < MAX_TIMERS; i++) { + PlatformTimer *timer = &handle->platform->timers[i]; + + if (timer->callback != NULL) { + timer->status--; + + if (timer->status == 0) { + timer->callback(handle, i); + timer->status = timer->interval; + } + + active = true; + } + } + + if (!active) { + return NULL; + } + } +} + PlatformStatusCode platform_init(MQTTHandle *handle) { handle->platform = (PlatformData *)calloc(1, sizeof(struct _PlatformData)); handle->platform->sock = -1; + handle->platform->timer_task = -1; if (handle->platform) { return PlatformStatusOk; } @@ -37,6 +74,18 @@ PlatformStatusCode platform_init(MQTTHandle *handle) { PlatformStatusCode platform_release(MQTTHandle *handle) { PlatformData *p = handle->platform; + // shut down all timers + if (p->timer_task >= 0) { + for (uint8_t free_timer = 0; free_timer < MAX_TIMERS; free_timer++) { + PlatformStatusCode ret = platform_destroy_timer(handle, free_timer); + if (ret != PlatformStatusOk) { + DEBUG_LOG("Could not shut down all timers"); + return PlatformStatusError; + } + } + } + + // check if there are tasks running for (uint8_t free_task = 0; free_task < MAX_TASKS; free_task++) { if (p->tasks[free_task] != 0) { DEBUG_LOG("Cannot free platform handle, there are tasks running!"); @@ -200,3 +249,74 @@ PlatformStatusCode platform_disconnect(MQTTHandle *handle) { return PlatformStatusOk; } + +PlatformStatusCode platform_create_timer(MQTTHandle *handle, int interval, int *timer_handle, PlatformTimerCallback callback) { + PlatformData *p = handle->platform; + uint8_t free_timer = 0; + + for (free_timer = 0; free_timer < MAX_TIMERS; free_timer++) { + DEBUG_LOG("Timer %d: %s", free_timer, p->timers[free_timer].callback ? "Occupied" : "Free"); + if (p->timers[free_timer].callback == NULL) { + break; + } + } + if (free_timer == MAX_TASKS) { + return PlatformStatusError; + } + + PlatformTimer *timer = &p->timers[free_timer]; + + timer->callback = callback; + timer->status = interval; + timer->interval = interval; + + if (p->timer_task < 0) { + PlatformStatusCode ret = platform_run_task(handle, &p->timer_task, timer_task); + if (ret != PlatformStatusOk) { + DEBUG_LOG("Could not start timer task"); + return PlatformStatusError; + } + } + + *timer_handle = free_timer; + return PlatformStatusOk; +} + +PlatformStatusCode platform_destroy_timer(MQTTHandle *handle, int timer_handle) { + PlatformData *p = handle->platform; + + if ((timer_handle < 0) || (timer_handle >= MAX_TIMERS)) { + DEBUG_LOG("Invalid timer handle"); + return PlatformStatusError; + } + + p->timers[timer_handle].callback = NULL; + + + // check if there is a timer running + uint8_t free_timer = 0; + + for (free_timer = 0; free_timer < MAX_TIMERS; free_timer++) { + if (p->timers[free_timer].callback != NULL) { + break; + } + } + if ((free_timer == MAX_TIMERS) && (p->timer_task >= 0)) { + // if we get here we have no running timers, so we destroy the timer task + PlatformStatusCode ret = platform_cleanup_task(handle, p->timer_task); + if (ret == PlatformStatusOk) { + p->timer_task = -1; + } else { + DEBUG_LOG("Could not finish timer task"); + return PlatformStatusError; + } + } + + return PlatformStatusOk; +} + + +PlatformStatusCode platform_sleep(int milliseconds) { + usleep(milliseconds * 1000); + return PlatformStatusOk; +} \ No newline at end of file diff --git a/platform/platform.h b/platform/platform.h index 34006c8..3c8d5c3 100644 --- a/platform/platform.h +++ b/platform/platform.h @@ -4,6 +4,7 @@ #include "mqtt_internal.h" typedef void *(*PlatformTask)(MQTTHandle *handle); +typedef void (*PlatformTimerCallback)(MQTTHandle *handle, int timer_handle); /** maximum receiver buffer size, defined by platform */ extern const size_t max_receive_buffer_size; @@ -94,4 +95,32 @@ PlatformStatusCode platform_write(MQTTHandle *handle, Buffer *buffer); PlatformStatusCode platform_disconnect(MQTTHandle *handle); +/** + * Set a recurring timer + * + * @param handle: State handle + * @param interval: Number of seconds to call the callback in + * @param timer_handle: Timer handle out + * @param callback: Callback to call + * @return Platform status code + */ +PlatformStatusCode platform_create_timer(MQTTHandle *handle, int interval, int *timer_handle, PlatformTimerCallback callback); + +/** + * Destroy a recurring timer + * @param handle + * @param timer_handle + * @return Platform status code + */ +PlatformStatusCode platform_destroy_timer(MQTTHandle *handle, int timer_handle); + + +/** + * Sleep for some milliseconds, may yield the task, so the sleep time could be longer + * + * @param milliseconds: minimum number of milliseconds to sleep + * @return Platform status code + */ +PlatformStatusCode platform_sleep(int milliseconds); + #endif /* platform_h__included */ diff --git a/src/mqtt.c b/src/mqtt.c index e881874..495bc74 100644 --- a/src/mqtt.c +++ b/src/mqtt.c @@ -19,9 +19,27 @@ void mqtt_free(MQTTHandle *handle) { free(handle); } +static void _keepalive_callback(MQTTHandle *handle, int timer_handle) { + bool result = send_ping_packet(handle); + if (!result) { + DEBUG_LOG("Sending PINGREQ packet failed!"); + } +} + static inline void parse_packet(MQTTHandle *handle, MQTTPacket *packet) { switch (packet->packet_type) { case PacketTypeConnAck: + if (!dispatch_packet(handle, packet)) { + DEBUG_LOG("Unexpected packet! (type: CONNACK)"); + (void)platform_disconnect(handle); + } else { + if (platform_create_timer(handle, KEEPALIVE_INTERVAL, &handle->keepalive_timer, _keepalive_callback) != PlatformStatusOk) { + DEBUG_LOG("Could not create keepalive timer!"); + (void)platform_disconnect(handle); + } + } + break; + case PacketTypePubAck: case PacketTypePubRec: case PacketTypePubRel: diff --git a/src/mqtt_internal.h b/src/mqtt_internal.h index fefeb92..adbb9cf 100644 --- a/src/mqtt_internal.h +++ b/src/mqtt_internal.h @@ -21,8 +21,14 @@ struct _MQTTHandle { MQTTCallbackQueue queue; PlatformData *platform; + + int keepalive_timer; }; void mqtt_free(MQTTHandle *handle); +#ifndef KEEPALIVE_INTERVAL +#define KEEPALIVE_INTERVAL 60 +#endif + #endif /* mqtt_internal_h__included */ diff --git a/src/protocol.c b/src/protocol.c index c8ec60b..3295072 100644 --- a/src/protocol.c +++ b/src/protocol.c @@ -62,7 +62,7 @@ bool send_connect_packet(MQTTHandle *handle) { payload->client_id = handle->config->client_id; payload->protocol_level = 4; - payload->keepalive_interval = 60; + payload->keepalive_interval = KEEPALIVE_INTERVAL; payload->clean_session = handle->config->clean_session; payload->will_topic = handle->config->last_will_topic; @@ -173,6 +173,14 @@ bool send_publish_packet(MQTTHandle *handle, char *topic, char *message, MQTTQos return true; } +#if MQTT_CLIENT +bool send_ping_packet(MQTTHandle *handle) { + Buffer *encoded = mqtt_packet_encode(&(MQTTPacket){ PacketTypePingReq, NULL }); + encoded->position = 0; + return send_buffer(handle, encoded); +} +#endif /* MQTT_CLIENT */ + #if MQTT_CLIENT bool send_disconnect_packet(MQTTHandle *handle) { Buffer *encoded = mqtt_packet_encode(&(MQTTPacket){ PacketTypeDisconnect, NULL }); diff --git a/src/protocol.h b/src/protocol.h index 6e28647..00e6fc2 100644 --- a/src/protocol.h +++ b/src/protocol.h @@ -7,6 +7,7 @@ bool send_connect_packet(MQTTHandle *handle); bool send_subscribe_packet(MQTTHandle *handle, char *topic, MQTTQosLevel qos); bool send_unsubscribe_packet(MQTTHandle *handle, char *topic); +bool send_ping_packet(MQTTHandle *handle); bool send_disconnect_packet(MQTTHandle *handle); #endif /* MQTT_CLIENT */ diff --git a/tests/connect_publish.c b/tests/connect_publish.c index a67827d..6dcb720 100644 --- a/tests/connect_publish.c +++ b/tests/connect_publish.c @@ -84,6 +84,9 @@ int main(_unused int argc, _unused char **argv) { } } + LOG("Waiting for ping to happen..."); + sleep(5); + LOG("Disconnecting..."); MQTTStatus result = mqtt_disconnect(mqtt, NULL, NULL); if (result != MQTT_STATUS_OK) {