Bugfix: Implement last will correctly, fix some QoS handling and allow for clean session

Re #4
This commit is contained in:
Johannes Schriewer 2018-07-30 22:04:54 +02:00
parent 82ea7853e8
commit cfd8934779
7 changed files with 23 additions and 15 deletions

View file

@ -214,13 +214,13 @@ MQTTStatus mqtt_reconnect(MQTTHandle *handle, MQTTEventHandler callback, void *c
return MQTT_STATUS_OK; return MQTT_STATUS_OK;
} }
MQTTStatus mqtt_subscribe(MQTTHandle *handle, char *topic, MQTTPublishEventHandler callback) { MQTTStatus mqtt_subscribe(MQTTHandle *handle, char *topic, MQTTQosLevel qos_level, MQTTPublishEventHandler callback) {
if (!handle->reader_alive) { if (!handle->reader_alive) {
handle->error_handler(handle, MQTT_Error_Connection_Reset); handle->error_handler(handle, MQTT_Error_Connection_Reset);
return MQTT_STATUS_ERROR; return MQTT_STATUS_ERROR;
} }
add_subscription(handle, topic, callback); add_subscription(handle, topic, qos_level, callback);
return (send_subscribe_packet(handle, topic) ? MQTT_STATUS_OK : MQTT_STATUS_ERROR); return (send_subscribe_packet(handle, topic, qos_level) ? MQTT_STATUS_OK : MQTT_STATUS_ERROR);
} }
MQTTStatus mqtt_unsubscribe(MQTTHandle *handle, char *topic) { MQTTStatus mqtt_unsubscribe(MQTTHandle *handle, char *topic) {

View file

@ -11,9 +11,14 @@ typedef struct {
uint16_t port; /**< Port the broker listens on, set to 0 for 1883 default */ uint16_t port; /**< Port the broker listens on, set to 0 for 1883 default */
char *client_id; /**< Client identification */ char *client_id; /**< Client identification */
bool clean_session; /**< Set to true to reset the session on reconnect */
char *username; /**< User name, set to NULL to connect anonymously */ char *username; /**< User name, set to NULL to connect anonymously */
char *password; /**< Password, set to NULL to connect without password */ char *password; /**< Password, set to NULL to connect without password */
char *last_will_topic; /**< last will topic that is automatically published on connection loss */
char *last_will_message; /**< last will message */
bool last_will_retain; /**< tell server to retain last will message */
} MQTTConfig; } MQTTConfig;
typedef enum { typedef enum {
@ -84,10 +89,11 @@ MQTTStatus mqtt_reconnect(MQTTHandle *handle, MQTTEventHandler callback, void *c
* *
* @param handle: MQTT Handle from `mqtt_connect` * @param handle: MQTT Handle from `mqtt_connect`
* @param topic: Topic to subscribe * @param topic: Topic to subscribe
* @param qos_level: Maximum qos level to subscribe to
* @param callback: Callback function to call when receiving something for that topic * @param callback: Callback function to call when receiving something for that topic
* @returns: Status code * @returns: Status code
*/ */
MQTTStatus mqtt_subscribe(MQTTHandle *handle, char *topic, MQTTPublishEventHandler callback); MQTTStatus mqtt_subscribe(MQTTHandle *handle, char *topic, MQTTQosLevel qos_level, MQTTPublishEventHandler callback);
/** /**
* Un-Subscribe from a topic * Un-Subscribe from a topic

View file

@ -26,12 +26,12 @@ bool send_connect_packet(MQTTHandle *handle) {
payload->client_id = handle->config->client_id; payload->client_id = handle->config->client_id;
payload->protocol_level = 4; payload->protocol_level = 4;
payload->keepalive_interval = 60; payload->keepalive_interval = 60;
payload->clean_session = handle->config->clean_session;
// TODO: support last will payload->will_topic = handle->config->last_will_topic;
// payload->will_topic = "test/lastwill"; payload->will_message = handle->config->last_will_message;
// payload->will_message = "disconnected"; payload->will_qos = MQTT_QOS_0;
// payload->will_qos = MQTT_QOS_1; payload->retain_will = handle->config->last_will_retain;
// payload->retain_will = true;
payload->username = handle->config->username; payload->username = handle->config->username;
payload->password = handle->config->password; payload->password = handle->config->password;

View file

@ -4,7 +4,7 @@
#include "mqtt.h" #include "mqtt.h"
bool send_connect_packet(MQTTHandle *handle); bool send_connect_packet(MQTTHandle *handle);
bool send_subscribe_packet(MQTTHandle *handle, char *topic); bool send_subscribe_packet(MQTTHandle *handle, char *topic, MQTTQosLevel qos);
bool send_unsubscribe_packet(MQTTHandle *handle, char *topic); bool send_unsubscribe_packet(MQTTHandle *handle, char *topic);
bool send_publish_packet(MQTTHandle *handle, char *topic, char *message, MQTTQosLevel qos); bool send_publish_packet(MQTTHandle *handle, char *topic, char *message, MQTTQosLevel qos);
bool send_disconnect_packet(MQTTHandle *handle); bool send_disconnect_packet(MQTTHandle *handle);

View file

@ -1,10 +1,11 @@
#include "mqtt_internal.h" #include "mqtt_internal.h"
#include "subscriptions.h" #include "subscriptions.h"
void add_subscription(MQTTHandle *handle, char *topic, MQTTPublishEventHandler callback) { void add_subscription(MQTTHandle *handle, char *topic, MQTTQosLevel qos, MQTTPublishEventHandler callback) {
SubscriptionItem *item = calloc(1, sizeof(SubscriptionItem)); SubscriptionItem *item = calloc(1, sizeof(SubscriptionItem));
item->topic = topic; item->topic = topic;
item->qos = qos;
item->handler = callback; item->handler = callback;
item->pending = true; item->pending = true;

View file

@ -7,6 +7,7 @@ typedef struct _SubscriptionItem {
struct _SubscriptionItem *next; struct _SubscriptionItem *next;
char *topic; char *topic;
MQTTQosLevel qos;
MQTTPublishEventHandler handler; MQTTPublishEventHandler handler;
bool pending; bool pending;
} SubscriptionItem; } SubscriptionItem;
@ -15,7 +16,7 @@ typedef struct {
SubscriptionItem *items; SubscriptionItem *items;
} Subscriptions; } Subscriptions;
void add_subscription(MQTTHandle *handle, char *topic, MQTTPublishEventHandler callback); void add_subscription(MQTTHandle *handle, char *topic, MQTTQosLevel qos, MQTTPublishEventHandler callback);
void remove_subscription(MQTTHandle *handle, char *topic); void remove_subscription(MQTTHandle *handle, char *topic);
void subscription_set_pending(MQTTHandle *handle, char *topic, bool pending); void subscription_set_pending(MQTTHandle *handle, char *topic, bool pending);

View file

@ -37,13 +37,13 @@ void mqtt_connected(MQTTHandle *handle, void *context) {
LOG("Connected!"); LOG("Connected!");
LOG("Trying subscribe on testsuite/mqtt/test..."); LOG("Trying subscribe on testsuite/mqtt/test...");
MQTTStatus result = mqtt_subscribe(handle, "testsuite/mqtt/test", callback); MQTTStatus result = mqtt_subscribe(handle, "testsuite/mqtt/test", MQTT_QOS_0, callback);
if (result != MQTT_STATUS_OK) { if (result != MQTT_STATUS_OK) {
LOG("Could not subscribe test"); LOG("Could not subscribe test");
exit(1); exit(1);
} }
result = mqtt_subscribe(handle, "testsuite/mqtt/test2", callback); result = mqtt_subscribe(handle, "testsuite/mqtt/test2", MQTT_QOS_0, callback);
if (result != MQTT_STATUS_OK) { if (result != MQTT_STATUS_OK) {
LOG("Could not subscribe test 2"); LOG("Could not subscribe test 2");
exit(1); exit(1);
@ -55,7 +55,7 @@ int main(int argc, char **argv) {
config.client_id = "libmqtt_testsuite"; config.client_id = "libmqtt_testsuite";
config.hostname = "localhost"; config.hostname = "localhost";
config.port = 1883; config.clean_session = true;
LOG("Trying to connect to %s...", config.hostname); LOG("Trying to connect to %s...", config.hostname);
MQTTHandle *mqtt = mqtt_connect(&config, mqtt_connected, NULL, err_handler); MQTTHandle *mqtt = mqtt_connect(&config, mqtt_connected, NULL, err_handler);