Implement keepalive

currently relatively dumb, as the timer does not get reset when
a regular packet is sent.

Re #9
This commit is contained in:
Johannes Schriewer 2018-08-03 18:42:48 +02:00
parent e508c4e28d
commit 1981e1a104
8 changed files with 187 additions and 2 deletions

View file

@ -55,7 +55,7 @@ set(mqtt-source
# full build library for testing
add_library(mqtt-full STATIC ${mqtt-source})
target_compile_definitions(mqtt-full PRIVATE MQTT_SERVER=1 MQTT_CLIENT=1)
target_compile_definitions(mqtt-full PRIVATE MQTT_SERVER=1 MQTT_CLIENT=1 KEEPALIVE_INTERVAL=4)
set_target_properties(mqtt-full PROPERTIES PUBLIC_HEADER "src/mqtt.h")
target_include_directories(mqtt-full
PUBLIC

View file

@ -18,15 +18,52 @@
const size_t max_receive_buffer_size = 4 * 4096; // 16 KB
#define MAX_TASKS 16
#define MAX_TIMERS 5
typedef struct {
PlatformTimerCallback callback;
int status;
int interval;
} PlatformTimer;
struct _PlatformData {
pthread_t tasks[MAX_TASKS];
PlatformTimer timers[MAX_TIMERS];
int timer_task;
int sock;
};
void *timer_task(MQTTHandle *handle) {
while (1) {
platform_sleep(1000);
bool active = false;
for (uint8_t i = 0; i < MAX_TIMERS; i++) {
PlatformTimer *timer = &handle->platform->timers[i];
if (timer->callback != NULL) {
timer->status--;
if (timer->status == 0) {
timer->callback(handle, i);
timer->status = timer->interval;
}
active = true;
}
}
if (!active) {
return NULL;
}
}
}
PlatformStatusCode platform_init(MQTTHandle *handle) {
handle->platform = (PlatformData *)calloc(1, sizeof(struct _PlatformData));
handle->platform->sock = -1;
handle->platform->timer_task = -1;
if (handle->platform) {
return PlatformStatusOk;
}
@ -37,6 +74,18 @@ PlatformStatusCode platform_init(MQTTHandle *handle) {
PlatformStatusCode platform_release(MQTTHandle *handle) {
PlatformData *p = handle->platform;
// shut down all timers
if (p->timer_task >= 0) {
for (uint8_t free_timer = 0; free_timer < MAX_TIMERS; free_timer++) {
PlatformStatusCode ret = platform_destroy_timer(handle, free_timer);
if (ret != PlatformStatusOk) {
DEBUG_LOG("Could not shut down all timers");
return PlatformStatusError;
}
}
}
// check if there are tasks running
for (uint8_t free_task = 0; free_task < MAX_TASKS; free_task++) {
if (p->tasks[free_task] != 0) {
DEBUG_LOG("Cannot free platform handle, there are tasks running!");
@ -200,3 +249,74 @@ PlatformStatusCode platform_disconnect(MQTTHandle *handle) {
return PlatformStatusOk;
}
PlatformStatusCode platform_create_timer(MQTTHandle *handle, int interval, int *timer_handle, PlatformTimerCallback callback) {
PlatformData *p = handle->platform;
uint8_t free_timer = 0;
for (free_timer = 0; free_timer < MAX_TIMERS; free_timer++) {
DEBUG_LOG("Timer %d: %s", free_timer, p->timers[free_timer].callback ? "Occupied" : "Free");
if (p->timers[free_timer].callback == NULL) {
break;
}
}
if (free_timer == MAX_TASKS) {
return PlatformStatusError;
}
PlatformTimer *timer = &p->timers[free_timer];
timer->callback = callback;
timer->status = interval;
timer->interval = interval;
if (p->timer_task < 0) {
PlatformStatusCode ret = platform_run_task(handle, &p->timer_task, timer_task);
if (ret != PlatformStatusOk) {
DEBUG_LOG("Could not start timer task");
return PlatformStatusError;
}
}
*timer_handle = free_timer;
return PlatformStatusOk;
}
PlatformStatusCode platform_destroy_timer(MQTTHandle *handle, int timer_handle) {
PlatformData *p = handle->platform;
if ((timer_handle < 0) || (timer_handle >= MAX_TIMERS)) {
DEBUG_LOG("Invalid timer handle");
return PlatformStatusError;
}
p->timers[timer_handle].callback = NULL;
// check if there is a timer running
uint8_t free_timer = 0;
for (free_timer = 0; free_timer < MAX_TIMERS; free_timer++) {
if (p->timers[free_timer].callback != NULL) {
break;
}
}
if ((free_timer == MAX_TIMERS) && (p->timer_task >= 0)) {
// if we get here we have no running timers, so we destroy the timer task
PlatformStatusCode ret = platform_cleanup_task(handle, p->timer_task);
if (ret == PlatformStatusOk) {
p->timer_task = -1;
} else {
DEBUG_LOG("Could not finish timer task");
return PlatformStatusError;
}
}
return PlatformStatusOk;
}
PlatformStatusCode platform_sleep(int milliseconds) {
usleep(milliseconds * 1000);
return PlatformStatusOk;
}

View file

@ -4,6 +4,7 @@
#include "mqtt_internal.h"
typedef void *(*PlatformTask)(MQTTHandle *handle);
typedef void (*PlatformTimerCallback)(MQTTHandle *handle, int timer_handle);
/** maximum receiver buffer size, defined by platform */
extern const size_t max_receive_buffer_size;
@ -94,4 +95,32 @@ PlatformStatusCode platform_write(MQTTHandle *handle, Buffer *buffer);
PlatformStatusCode platform_disconnect(MQTTHandle *handle);
/**
* Set a recurring timer
*
* @param handle: State handle
* @param interval: Number of seconds to call the callback in
* @param timer_handle: Timer handle out
* @param callback: Callback to call
* @return Platform status code
*/
PlatformStatusCode platform_create_timer(MQTTHandle *handle, int interval, int *timer_handle, PlatformTimerCallback callback);
/**
* Destroy a recurring timer
* @param handle
* @param timer_handle
* @return Platform status code
*/
PlatformStatusCode platform_destroy_timer(MQTTHandle *handle, int timer_handle);
/**
* Sleep for some milliseconds, may yield the task, so the sleep time could be longer
*
* @param milliseconds: minimum number of milliseconds to sleep
* @return Platform status code
*/
PlatformStatusCode platform_sleep(int milliseconds);
#endif /* platform_h__included */

View file

@ -19,9 +19,27 @@ void mqtt_free(MQTTHandle *handle) {
free(handle);
}
static void _keepalive_callback(MQTTHandle *handle, int timer_handle) {
bool result = send_ping_packet(handle);
if (!result) {
DEBUG_LOG("Sending PINGREQ packet failed!");
}
}
static inline void parse_packet(MQTTHandle *handle, MQTTPacket *packet) {
switch (packet->packet_type) {
case PacketTypeConnAck:
if (!dispatch_packet(handle, packet)) {
DEBUG_LOG("Unexpected packet! (type: CONNACK)");
(void)platform_disconnect(handle);
} else {
if (platform_create_timer(handle, KEEPALIVE_INTERVAL, &handle->keepalive_timer, _keepalive_callback) != PlatformStatusOk) {
DEBUG_LOG("Could not create keepalive timer!");
(void)platform_disconnect(handle);
}
}
break;
case PacketTypePubAck:
case PacketTypePubRec:
case PacketTypePubRel:

View file

@ -21,8 +21,14 @@ struct _MQTTHandle {
MQTTCallbackQueue queue;
PlatformData *platform;
int keepalive_timer;
};
void mqtt_free(MQTTHandle *handle);
#ifndef KEEPALIVE_INTERVAL
#define KEEPALIVE_INTERVAL 60
#endif
#endif /* mqtt_internal_h__included */

View file

@ -62,7 +62,7 @@ bool send_connect_packet(MQTTHandle *handle) {
payload->client_id = handle->config->client_id;
payload->protocol_level = 4;
payload->keepalive_interval = 60;
payload->keepalive_interval = KEEPALIVE_INTERVAL;
payload->clean_session = handle->config->clean_session;
payload->will_topic = handle->config->last_will_topic;
@ -173,6 +173,14 @@ bool send_publish_packet(MQTTHandle *handle, char *topic, char *message, MQTTQos
return true;
}
#if MQTT_CLIENT
bool send_ping_packet(MQTTHandle *handle) {
Buffer *encoded = mqtt_packet_encode(&(MQTTPacket){ PacketTypePingReq, NULL });
encoded->position = 0;
return send_buffer(handle, encoded);
}
#endif /* MQTT_CLIENT */
#if MQTT_CLIENT
bool send_disconnect_packet(MQTTHandle *handle) {
Buffer *encoded = mqtt_packet_encode(&(MQTTPacket){ PacketTypeDisconnect, NULL });

View file

@ -7,6 +7,7 @@
bool send_connect_packet(MQTTHandle *handle);
bool send_subscribe_packet(MQTTHandle *handle, char *topic, MQTTQosLevel qos);
bool send_unsubscribe_packet(MQTTHandle *handle, char *topic);
bool send_ping_packet(MQTTHandle *handle);
bool send_disconnect_packet(MQTTHandle *handle);
#endif /* MQTT_CLIENT */

View file

@ -84,6 +84,9 @@ int main(_unused int argc, _unused char **argv) {
}
}
LOG("Waiting for ping to happen...");
sleep(5);
LOG("Disconnecting...");
MQTTStatus result = mqtt_disconnect(mqtt, NULL, NULL);
if (result != MQTT_STATUS_OK) {