Implement tests for publish QoS handling and fix found bugs.

Re #1
This commit is contained in:
Johannes Schriewer 2018-07-31 01:13:08 +02:00
parent 77c86c2f52
commit 7c3215efcf
7 changed files with 110 additions and 40 deletions

View file

@ -173,13 +173,14 @@ MQTTStatus mqtt_unsubscribe(MQTTHandle *handle, char *topic);
#### Publish something to the broker #### Publish something to the broker
```c ```c
MQTTStatus mqtt_publish(MQTTHandle *handle, char *topic, char *payload, MQTTQosLevel qos_level); MQTTStatus mqtt_publish(MQTTHandle *handle, char *topic, char *payload, MQTTQosLevel qos_level, MQTTPublishEventCallback callback);
``` ```
- `handle`: MQTT Handle from `mqtt_connect` - `handle`: MQTT Handle from `mqtt_connect`
- `topic`: Topic to publish to - `topic`: Topic to publish to
- `payload`: Message payload to publish - `payload`: Message payload to publish
- `qos_level`: QoS Level for the publish (0 = Fire and forget, 1 = At least once, 2 = One time for sure) - `qos_level`: QoS Level for the publish (0 = Fire and forget, 1 = At least once, 2 = One time for sure)
- `callback`: Callback function that is called when publish cleared the QoS handlers
- Returns status code - Returns status code
This uses a c-string as the payload, theoretically the protocol would allow for binary payloads, but this is currently This uses a c-string as the payload, theoretically the protocol would allow for binary payloads, but this is currently

View file

@ -239,12 +239,12 @@ MQTTStatus mqtt_unsubscribe(MQTTHandle *handle, char *topic) {
return (send_unsubscribe_packet(handle, topic) ? MQTT_STATUS_OK : MQTT_STATUS_ERROR); return (send_unsubscribe_packet(handle, topic) ? MQTT_STATUS_OK : MQTT_STATUS_ERROR);
} }
MQTTStatus mqtt_publish(MQTTHandle *handle, char *topic, char *payload, MQTTQosLevel qos_level) { MQTTStatus mqtt_publish(MQTTHandle *handle, char *topic, char *payload, MQTTQosLevel qos_level, MQTTPublishEventHandler callback) {
if (!handle->reader_alive) { if (!handle->reader_alive) {
handle->error_handler(handle, handle->config, MQTT_Error_Connection_Reset); handle->error_handler(handle, handle->config, MQTT_Error_Connection_Reset);
return MQTT_STATUS_ERROR; return MQTT_STATUS_ERROR;
} }
return (send_publish_packet(handle, topic, payload, qos_level) ? MQTT_STATUS_OK : MQTT_STATUS_ERROR); return (send_publish_packet(handle, topic, payload, qos_level, callback) ? MQTT_STATUS_OK : MQTT_STATUS_ERROR);
} }
MQTTStatus mqtt_disconnect(MQTTHandle *handle, MQTTEventHandler callback, void *callback_context) { MQTTStatus mqtt_disconnect(MQTTHandle *handle, MQTTEventHandler callback, void *callback_context) {

View file

@ -121,9 +121,11 @@ MQTTStatus mqtt_unsubscribe(MQTTHandle *handle, char *topic);
* @param handle: MQTT Handle from `mqtt_connect` * @param handle: MQTT Handle from `mqtt_connect`
* @param topic: Topic to publish to * @param topic: Topic to publish to
* @param payload: Message payload to publish * @param payload: Message payload to publish
* @param qos_level: QoS level to use
* @param callback: finish callback
* @returns: Status code * @returns: Status code
*/ */
MQTTStatus mqtt_publish(MQTTHandle *handle, char *topic, char *payload, MQTTQosLevel qos_level); MQTTStatus mqtt_publish(MQTTHandle *handle, char *topic, char *payload, MQTTQosLevel qos_level, MQTTPublishEventHandler callback);
/** /**
* Disconnect from MQTT broker * Disconnect from MQTT broker

View file

@ -7,6 +7,15 @@
#include "debug.h" #include "debug.h"
typedef struct {
PublishPayload *payload;
MQTTPublishEventHandler callback;
} PublishCallback;
/*
* Utility
*/
bool send_buffer(MQTTHandle *handle, Buffer *buffer) { bool send_buffer(MQTTHandle *handle, Buffer *buffer) {
while (!buffer_eof(buffer)) { while (!buffer_eof(buffer)) {
ssize_t bytes = write(handle->sock, buffer->data + buffer->position, buffer_free_space(buffer)); ssize_t bytes = write(handle->sock, buffer->data + buffer->position, buffer_free_space(buffer));
@ -20,6 +29,39 @@ bool send_buffer(MQTTHandle *handle, Buffer *buffer) {
return true; return true;
} }
/*
* QoS event handlers
*/
void handle_puback_pubcomp(MQTTHandle *handle, void *context) {
PublishCallback *ctx = (PublishCallback *)context;
if (ctx->callback) {
ctx->callback(handle, ctx->payload->topic, ctx->payload->message);
}
free(ctx->payload);
free(ctx);
}
void handle_pubrec(MQTTHandle *handle, void *context) {
PublishCallback *ctx = (PublishCallback *)context;
PubRelPayload newPayload = {
.packet_id = ctx->payload->packet_id
};
Buffer *encoded = mqtt_packet_encode(&(MQTTPacket){ PacketTypePubRel, &newPayload });
expect_packet(handle, PacketTypePubComp, ctx->payload->packet_id, handle_puback_pubcomp, context);
encoded->position = 0;
send_buffer(handle, encoded);
}
/*
* packet constructors
*/
#if MQTT_CLIENT #if MQTT_CLIENT
bool send_connect_packet(MQTTHandle *handle) { bool send_connect_packet(MQTTHandle *handle) {
ConnectPayload *payload = calloc(1, sizeof(ConnectPayload)); ConnectPayload *payload = calloc(1, sizeof(ConnectPayload));
@ -92,23 +134,7 @@ bool send_unsubscribe_packet(MQTTHandle *handle, char *topic) {
} }
#endif /* MQTT_CLIENT */ #endif /* MQTT_CLIENT */
void handle_pubrec(MQTTHandle *handle, void *context) { bool send_publish_packet(MQTTHandle *handle, char *topic, char *message, MQTTQosLevel qos, MQTTPublishEventHandler callback) {
PublishPayload *payload = (PublishPayload *)context;
PubRelPayload newPayload = {
.packet_id = payload->packet_id
};
Buffer *encoded = mqtt_packet_encode(&(MQTTPacket){ PacketTypePubRel, &newPayload });
expect_packet(handle, PacketTypePubComp, payload->packet_id, NULL, NULL);
free(payload);
encoded->position = 0;
send_buffer(handle, encoded);
}
bool send_publish_packet(MQTTHandle *handle, char *topic, char *message, MQTTQosLevel qos) {
PublishPayload *payload = calloc(1, sizeof(PublishPayload)); PublishPayload *payload = calloc(1, sizeof(PublishPayload));
payload->qos = qos; payload->qos = qos;
@ -118,24 +144,39 @@ bool send_publish_packet(MQTTHandle *handle, char *topic, char *message, MQTTQos
payload->message = message; payload->message = message;
Buffer *encoded = mqtt_packet_encode(&(MQTTPacket){ PacketTypePublish, payload }); Buffer *encoded = mqtt_packet_encode(&(MQTTPacket){ PacketTypePublish, payload });
encoded->position = 0;
bool result = send_buffer(handle, encoded);
if (!result) {
free(payload);
return false;
}
// Handle QoS and add waiting packets to queue // Handle QoS and add waiting packets to queue
switch(payload->qos) { switch(payload->qos) {
case MQTT_QOS_0: case MQTT_QOS_0:
// fire and forget // fire and forget
if (callback) {
callback(handle, payload->topic, payload->message);
}
free(payload); free(payload);
break; break;
case MQTT_QOS_1: case MQTT_QOS_1: {
expect_packet(handle, PacketTypePubAck, payload->packet_id, NULL, NULL); PublishCallback *ctx = malloc(sizeof(PublishCallback));
free(payload); ctx->payload = payload;
ctx->callback = callback;
expect_packet(handle, PacketTypePubAck, payload->packet_id, handle_puback_pubcomp, ctx);
break; break;
case MQTT_QOS_2: }
expect_packet(handle, PacketTypePubRec, payload->packet_id, handle_pubrec, payload); case MQTT_QOS_2: {
PublishCallback *ctx = malloc(sizeof(PublishCallback));
ctx->payload = payload;
ctx->callback = callback;
expect_packet(handle, PacketTypePubRec, payload->packet_id, handle_pubrec, ctx);
break; break;
}
} }
encoded->position = 0; return true;
return send_buffer(handle, encoded);
} }
#if MQTT_CLIENT #if MQTT_CLIENT

View file

@ -10,6 +10,6 @@ bool send_unsubscribe_packet(MQTTHandle *handle, char *topic);
bool send_disconnect_packet(MQTTHandle *handle); bool send_disconnect_packet(MQTTHandle *handle);
#endif /* MQTT_CLIENT */ #endif /* MQTT_CLIENT */
bool send_publish_packet(MQTTHandle *handle, char *topic, char *message, MQTTQosLevel qos); bool send_publish_packet(MQTTHandle *handle, char *topic, char *message, MQTTQosLevel qos, MQTTPublishEventHandler callback);
#endif #endif

View file

@ -2,6 +2,7 @@
#include "state_queue.h" #include "state_queue.h"
#include "debug.h" #include "debug.h"
#if 0
static inline void dump_expected(MQTTHandle *handle) { static inline void dump_expected(MQTTHandle *handle) {
MQTTCallbackQueueItem *item = handle->queue.pending; MQTTCallbackQueueItem *item = handle->queue.pending;
@ -13,6 +14,7 @@ static inline void dump_expected(MQTTHandle *handle) {
item = item->next; item = item->next;
} }
} }
#endif
void expect_packet(MQTTHandle *handle, MQTTControlPacketType type, uint16_t packet_id, MQTTEventHandler callback, void *context) { void expect_packet(MQTTHandle *handle, MQTTControlPacketType type, uint16_t packet_id, MQTTEventHandler callback, void *context) {
MQTTCallbackQueueItem *item = calloc(1, sizeof(MQTTCallbackQueueItem)); MQTTCallbackQueueItem *item = calloc(1, sizeof(MQTTCallbackQueueItem));
@ -45,7 +47,6 @@ void remove_from_queue(MQTTHandle *handle, MQTTCallbackQueueItem *remove) {
// attach next item to prev item removing this one // attach next item to prev item removing this one
prev_item->next = item->next; prev_item->next = item->next;
} }
free(item);
break; break;
} }
@ -60,11 +61,12 @@ bool dispatch_packet(MQTTHandle *handle, MQTTPacket *packet) {
while (item != NULL) { while (item != NULL) {
if ((item->type == packet->packet_type) && (item->packet_id == packet_id)) { if ((item->type == packet->packet_type) && (item->packet_id == packet_id)) {
remove_from_queue(handle, item);
if (item->callback) { if (item->callback) {
item->callback(handle, item->context); item->callback(handle, item->context);
} }
free(item);
remove_from_queue(handle, item);
return true; return true;
} }
item = item->next; item = item->next;

View file

@ -4,7 +4,7 @@
#include "mqtt.h" #include "mqtt.h"
bool leave = false; int leave = 0;
#define LOG(fmt, ...) fprintf(stdout, fmt "\n", ## __VA_ARGS__) #define LOG(fmt, ...) fprintf(stdout, fmt "\n", ## __VA_ARGS__)
@ -15,26 +15,38 @@ bool err_handler(MQTTHandle *handle, MQTTConfig *config, MQTTErrorCode error) {
return true; return true;
} }
void publish_handler(MQTTHandle *handle, char *topic, char *message) {
LOG("Published %s -> %s", topic, message);
leave++;
}
void mqtt_connected(MQTTHandle *handle, void *context) { void mqtt_connected(MQTTHandle *handle, void *context) {
LOG("Connected!"); LOG("Connected!");
MQTTStatus result;
LOG("Trying publish to testsuite/mqtt/test..."); LOG("Trying publish to testsuite/mqtt/test...");
MQTTStatus result = mqtt_publish(handle, "testsuite/mqtt/test", "payload", MQTT_QOS_0); result = mqtt_publish(handle, "testsuite/mqtt/test", "payload1", MQTT_QOS_0, publish_handler);
if (result != MQTT_STATUS_OK) { if (result != MQTT_STATUS_OK) {
LOG("Could not publish"); LOG("Could not publish");
exit(1); exit(1);
} }
sleep(1); LOG("Trying publish to testsuite/mqtt/test_qos1...");
result = mqtt_publish(handle, "testsuite/mqtt/test_qos1", "payload2", MQTT_QOS_1, publish_handler);
LOG("Disconnecting...");
result = mqtt_disconnect(handle, NULL, NULL);
if (result != MQTT_STATUS_OK) { if (result != MQTT_STATUS_OK) {
LOG("Could not disconnect"); LOG("Could not publish");
exit(1); exit(1);
} }
exit(0); LOG("Trying publish to testsuite/mqtt/test_qos2...");
result = mqtt_publish(handle, "testsuite/mqtt/test_qos2", "payload3", MQTT_QOS_2, publish_handler);
if (result != MQTT_STATUS_OK) {
LOG("Could not publish");
exit(1);
}
leave = true;
} }
int main(int argc, char **argv) { int main(int argc, char **argv) {
@ -62,8 +74,20 @@ int main(int argc, char **argv) {
return 1; return 1;
} }
while (!leave) { int cancel = 0;
while (leave < 3) {
LOG("Waiting..."); LOG("Waiting...");
sleep(1); sleep(1);
cancel++;
if (cancel == 10) {
break;
}
}
LOG("Disconnecting...");
MQTTStatus result = mqtt_disconnect(mqtt, NULL, NULL);
if (result != MQTT_STATUS_OK) {
LOG("Could not disconnect");
exit(1);
} }
} }