From cfd893477958afb66abe6024144e62d8de32fa5d Mon Sep 17 00:00:00 2001 From: Johannes Schriewer Date: Mon, 30 Jul 2018 22:04:54 +0200 Subject: [PATCH] Bugfix: Implement last will correctly, fix some QoS handling and allow for clean session Re #4 --- src/mqtt.c | 6 +++--- src/mqtt.h | 8 +++++++- src/protocol.c | 10 +++++----- src/protocol.h | 2 +- src/subscriptions.c | 3 ++- src/subscriptions.h | 3 ++- tests/connect_subscribe.c | 6 +++--- 7 files changed, 23 insertions(+), 15 deletions(-) diff --git a/src/mqtt.c b/src/mqtt.c index a585224..f0c0f60 100644 --- a/src/mqtt.c +++ b/src/mqtt.c @@ -214,13 +214,13 @@ MQTTStatus mqtt_reconnect(MQTTHandle *handle, MQTTEventHandler callback, void *c return MQTT_STATUS_OK; } -MQTTStatus mqtt_subscribe(MQTTHandle *handle, char *topic, MQTTPublishEventHandler callback) { +MQTTStatus mqtt_subscribe(MQTTHandle *handle, char *topic, MQTTQosLevel qos_level, MQTTPublishEventHandler callback) { if (!handle->reader_alive) { handle->error_handler(handle, MQTT_Error_Connection_Reset); return MQTT_STATUS_ERROR; } - add_subscription(handle, topic, callback); - return (send_subscribe_packet(handle, topic) ? MQTT_STATUS_OK : MQTT_STATUS_ERROR); + add_subscription(handle, topic, qos_level, callback); + return (send_subscribe_packet(handle, topic, qos_level) ? MQTT_STATUS_OK : MQTT_STATUS_ERROR); } MQTTStatus mqtt_unsubscribe(MQTTHandle *handle, char *topic) { diff --git a/src/mqtt.h b/src/mqtt.h index b939c8a..3f5c218 100644 --- a/src/mqtt.h +++ b/src/mqtt.h @@ -11,9 +11,14 @@ typedef struct { uint16_t port; /**< Port the broker listens on, set to 0 for 1883 default */ char *client_id; /**< Client identification */ + bool clean_session; /**< Set to true to reset the session on reconnect */ char *username; /**< User name, set to NULL to connect anonymously */ char *password; /**< Password, set to NULL to connect without password */ + + char *last_will_topic; /**< last will topic that is automatically published on connection loss */ + char *last_will_message; /**< last will message */ + bool last_will_retain; /**< tell server to retain last will message */ } MQTTConfig; typedef enum { @@ -84,10 +89,11 @@ MQTTStatus mqtt_reconnect(MQTTHandle *handle, MQTTEventHandler callback, void *c * * @param handle: MQTT Handle from `mqtt_connect` * @param topic: Topic to subscribe + * @param qos_level: Maximum qos level to subscribe to * @param callback: Callback function to call when receiving something for that topic * @returns: Status code */ -MQTTStatus mqtt_subscribe(MQTTHandle *handle, char *topic, MQTTPublishEventHandler callback); +MQTTStatus mqtt_subscribe(MQTTHandle *handle, char *topic, MQTTQosLevel qos_level, MQTTPublishEventHandler callback); /** * Un-Subscribe from a topic diff --git a/src/protocol.c b/src/protocol.c index 7a816f7..8f80b9e 100644 --- a/src/protocol.c +++ b/src/protocol.c @@ -26,12 +26,12 @@ bool send_connect_packet(MQTTHandle *handle) { payload->client_id = handle->config->client_id; payload->protocol_level = 4; payload->keepalive_interval = 60; + payload->clean_session = handle->config->clean_session; - // TODO: support last will - // payload->will_topic = "test/lastwill"; - // payload->will_message = "disconnected"; - // payload->will_qos = MQTT_QOS_1; - // payload->retain_will = true; + payload->will_topic = handle->config->last_will_topic; + payload->will_message = handle->config->last_will_message; + payload->will_qos = MQTT_QOS_0; + payload->retain_will = handle->config->last_will_retain; payload->username = handle->config->username; payload->password = handle->config->password; diff --git a/src/protocol.h b/src/protocol.h index a83710b..f7ae3e8 100644 --- a/src/protocol.h +++ b/src/protocol.h @@ -4,7 +4,7 @@ #include "mqtt.h" bool send_connect_packet(MQTTHandle *handle); -bool send_subscribe_packet(MQTTHandle *handle, char *topic); +bool send_subscribe_packet(MQTTHandle *handle, char *topic, MQTTQosLevel qos); bool send_unsubscribe_packet(MQTTHandle *handle, char *topic); bool send_publish_packet(MQTTHandle *handle, char *topic, char *message, MQTTQosLevel qos); bool send_disconnect_packet(MQTTHandle *handle); diff --git a/src/subscriptions.c b/src/subscriptions.c index fe06479..2ee76ca 100644 --- a/src/subscriptions.c +++ b/src/subscriptions.c @@ -1,10 +1,11 @@ #include "mqtt_internal.h" #include "subscriptions.h" -void add_subscription(MQTTHandle *handle, char *topic, MQTTPublishEventHandler callback) { +void add_subscription(MQTTHandle *handle, char *topic, MQTTQosLevel qos, MQTTPublishEventHandler callback) { SubscriptionItem *item = calloc(1, sizeof(SubscriptionItem)); item->topic = topic; + item->qos = qos; item->handler = callback; item->pending = true; diff --git a/src/subscriptions.h b/src/subscriptions.h index 1c706bd..8184448 100644 --- a/src/subscriptions.h +++ b/src/subscriptions.h @@ -7,6 +7,7 @@ typedef struct _SubscriptionItem { struct _SubscriptionItem *next; char *topic; + MQTTQosLevel qos; MQTTPublishEventHandler handler; bool pending; } SubscriptionItem; @@ -15,7 +16,7 @@ typedef struct { SubscriptionItem *items; } Subscriptions; -void add_subscription(MQTTHandle *handle, char *topic, MQTTPublishEventHandler callback); +void add_subscription(MQTTHandle *handle, char *topic, MQTTQosLevel qos, MQTTPublishEventHandler callback); void remove_subscription(MQTTHandle *handle, char *topic); void subscription_set_pending(MQTTHandle *handle, char *topic, bool pending); diff --git a/tests/connect_subscribe.c b/tests/connect_subscribe.c index f8e208a..3cdaa40 100644 --- a/tests/connect_subscribe.c +++ b/tests/connect_subscribe.c @@ -37,13 +37,13 @@ void mqtt_connected(MQTTHandle *handle, void *context) { LOG("Connected!"); LOG("Trying subscribe on testsuite/mqtt/test..."); - MQTTStatus result = mqtt_subscribe(handle, "testsuite/mqtt/test", callback); + MQTTStatus result = mqtt_subscribe(handle, "testsuite/mqtt/test", MQTT_QOS_0, callback); if (result != MQTT_STATUS_OK) { LOG("Could not subscribe test"); exit(1); } - result = mqtt_subscribe(handle, "testsuite/mqtt/test2", callback); + result = mqtt_subscribe(handle, "testsuite/mqtt/test2", MQTT_QOS_0, callback); if (result != MQTT_STATUS_OK) { LOG("Could not subscribe test 2"); exit(1); @@ -55,7 +55,7 @@ int main(int argc, char **argv) { config.client_id = "libmqtt_testsuite"; config.hostname = "localhost"; - config.port = 1883; + config.clean_session = true; LOG("Trying to connect to %s...", config.hostname); MQTTHandle *mqtt = mqtt_connect(&config, mqtt_connected, NULL, err_handler);