diff --git a/Readme.md b/Readme.md index 8aeede2..ffeb849 100644 --- a/Readme.md +++ b/Readme.md @@ -173,13 +173,14 @@ MQTTStatus mqtt_unsubscribe(MQTTHandle *handle, char *topic); #### Publish something to the broker ```c -MQTTStatus mqtt_publish(MQTTHandle *handle, char *topic, char *payload, MQTTQosLevel qos_level); +MQTTStatus mqtt_publish(MQTTHandle *handle, char *topic, char *payload, MQTTQosLevel qos_level, MQTTPublishEventCallback callback); ``` - `handle`: MQTT Handle from `mqtt_connect` - `topic`: Topic to publish to - `payload`: Message payload to publish - `qos_level`: QoS Level for the publish (0 = Fire and forget, 1 = At least once, 2 = One time for sure) +- `callback`: Callback function that is called when publish cleared the QoS handlers - Returns status code This uses a c-string as the payload, theoretically the protocol would allow for binary payloads, but this is currently diff --git a/src/mqtt.c b/src/mqtt.c index dcd5536..17a68ce 100644 --- a/src/mqtt.c +++ b/src/mqtt.c @@ -239,12 +239,12 @@ MQTTStatus mqtt_unsubscribe(MQTTHandle *handle, char *topic) { return (send_unsubscribe_packet(handle, topic) ? MQTT_STATUS_OK : MQTT_STATUS_ERROR); } -MQTTStatus mqtt_publish(MQTTHandle *handle, char *topic, char *payload, MQTTQosLevel qos_level) { +MQTTStatus mqtt_publish(MQTTHandle *handle, char *topic, char *payload, MQTTQosLevel qos_level, MQTTPublishEventHandler callback) { if (!handle->reader_alive) { handle->error_handler(handle, handle->config, MQTT_Error_Connection_Reset); return MQTT_STATUS_ERROR; } - return (send_publish_packet(handle, topic, payload, qos_level) ? MQTT_STATUS_OK : MQTT_STATUS_ERROR); + return (send_publish_packet(handle, topic, payload, qos_level, callback) ? MQTT_STATUS_OK : MQTT_STATUS_ERROR); } MQTTStatus mqtt_disconnect(MQTTHandle *handle, MQTTEventHandler callback, void *callback_context) { diff --git a/src/mqtt.h b/src/mqtt.h index 6dfa4df..e51d12d 100644 --- a/src/mqtt.h +++ b/src/mqtt.h @@ -121,9 +121,11 @@ MQTTStatus mqtt_unsubscribe(MQTTHandle *handle, char *topic); * @param handle: MQTT Handle from `mqtt_connect` * @param topic: Topic to publish to * @param payload: Message payload to publish + * @param qos_level: QoS level to use + * @param callback: finish callback * @returns: Status code */ -MQTTStatus mqtt_publish(MQTTHandle *handle, char *topic, char *payload, MQTTQosLevel qos_level); +MQTTStatus mqtt_publish(MQTTHandle *handle, char *topic, char *payload, MQTTQosLevel qos_level, MQTTPublishEventHandler callback); /** * Disconnect from MQTT broker diff --git a/src/protocol.c b/src/protocol.c index 523fa09..c685c34 100644 --- a/src/protocol.c +++ b/src/protocol.c @@ -7,6 +7,15 @@ #include "debug.h" +typedef struct { + PublishPayload *payload; + MQTTPublishEventHandler callback; +} PublishCallback; + +/* + * Utility + */ + bool send_buffer(MQTTHandle *handle, Buffer *buffer) { while (!buffer_eof(buffer)) { ssize_t bytes = write(handle->sock, buffer->data + buffer->position, buffer_free_space(buffer)); @@ -20,6 +29,39 @@ bool send_buffer(MQTTHandle *handle, Buffer *buffer) { return true; } +/* + * QoS event handlers + */ + +void handle_puback_pubcomp(MQTTHandle *handle, void *context) { + PublishCallback *ctx = (PublishCallback *)context; + + if (ctx->callback) { + ctx->callback(handle, ctx->payload->topic, ctx->payload->message); + } + + free(ctx->payload); + free(ctx); +} + +void handle_pubrec(MQTTHandle *handle, void *context) { + PublishCallback *ctx = (PublishCallback *)context; + + PubRelPayload newPayload = { + .packet_id = ctx->payload->packet_id + }; + + Buffer *encoded = mqtt_packet_encode(&(MQTTPacket){ PacketTypePubRel, &newPayload }); + expect_packet(handle, PacketTypePubComp, ctx->payload->packet_id, handle_puback_pubcomp, context); + + encoded->position = 0; + send_buffer(handle, encoded); +} + +/* + * packet constructors + */ + #if MQTT_CLIENT bool send_connect_packet(MQTTHandle *handle) { ConnectPayload *payload = calloc(1, sizeof(ConnectPayload)); @@ -92,23 +134,7 @@ bool send_unsubscribe_packet(MQTTHandle *handle, char *topic) { } #endif /* MQTT_CLIENT */ -void handle_pubrec(MQTTHandle *handle, void *context) { - PublishPayload *payload = (PublishPayload *)context; - - PubRelPayload newPayload = { - .packet_id = payload->packet_id - }; - - Buffer *encoded = mqtt_packet_encode(&(MQTTPacket){ PacketTypePubRel, &newPayload }); - - expect_packet(handle, PacketTypePubComp, payload->packet_id, NULL, NULL); - free(payload); - - encoded->position = 0; - send_buffer(handle, encoded); -} - -bool send_publish_packet(MQTTHandle *handle, char *topic, char *message, MQTTQosLevel qos) { +bool send_publish_packet(MQTTHandle *handle, char *topic, char *message, MQTTQosLevel qos, MQTTPublishEventHandler callback) { PublishPayload *payload = calloc(1, sizeof(PublishPayload)); payload->qos = qos; @@ -118,24 +144,39 @@ bool send_publish_packet(MQTTHandle *handle, char *topic, char *message, MQTTQos payload->message = message; Buffer *encoded = mqtt_packet_encode(&(MQTTPacket){ PacketTypePublish, payload }); + encoded->position = 0; + bool result = send_buffer(handle, encoded); + if (!result) { + free(payload); + return false; + } // Handle QoS and add waiting packets to queue switch(payload->qos) { case MQTT_QOS_0: // fire and forget + if (callback) { + callback(handle, payload->topic, payload->message); + } free(payload); break; - case MQTT_QOS_1: - expect_packet(handle, PacketTypePubAck, payload->packet_id, NULL, NULL); - free(payload); + case MQTT_QOS_1: { + PublishCallback *ctx = malloc(sizeof(PublishCallback)); + ctx->payload = payload; + ctx->callback = callback; + expect_packet(handle, PacketTypePubAck, payload->packet_id, handle_puback_pubcomp, ctx); break; - case MQTT_QOS_2: - expect_packet(handle, PacketTypePubRec, payload->packet_id, handle_pubrec, payload); + } + case MQTT_QOS_2: { + PublishCallback *ctx = malloc(sizeof(PublishCallback)); + ctx->payload = payload; + ctx->callback = callback; + expect_packet(handle, PacketTypePubRec, payload->packet_id, handle_pubrec, ctx); break; + } } - encoded->position = 0; - return send_buffer(handle, encoded); + return true; } #if MQTT_CLIENT diff --git a/src/protocol.h b/src/protocol.h index 800554a..6e28647 100644 --- a/src/protocol.h +++ b/src/protocol.h @@ -10,6 +10,6 @@ bool send_unsubscribe_packet(MQTTHandle *handle, char *topic); bool send_disconnect_packet(MQTTHandle *handle); #endif /* MQTT_CLIENT */ -bool send_publish_packet(MQTTHandle *handle, char *topic, char *message, MQTTQosLevel qos); +bool send_publish_packet(MQTTHandle *handle, char *topic, char *message, MQTTQosLevel qos, MQTTPublishEventHandler callback); #endif diff --git a/src/state_queue.c b/src/state_queue.c index 6ba5187..f6d40e6 100644 --- a/src/state_queue.c +++ b/src/state_queue.c @@ -2,6 +2,7 @@ #include "state_queue.h" #include "debug.h" +#if 0 static inline void dump_expected(MQTTHandle *handle) { MQTTCallbackQueueItem *item = handle->queue.pending; @@ -13,6 +14,7 @@ static inline void dump_expected(MQTTHandle *handle) { item = item->next; } } +#endif void expect_packet(MQTTHandle *handle, MQTTControlPacketType type, uint16_t packet_id, MQTTEventHandler callback, void *context) { MQTTCallbackQueueItem *item = calloc(1, sizeof(MQTTCallbackQueueItem)); @@ -45,7 +47,6 @@ void remove_from_queue(MQTTHandle *handle, MQTTCallbackQueueItem *remove) { // attach next item to prev item removing this one prev_item->next = item->next; } - free(item); break; } @@ -60,11 +61,12 @@ bool dispatch_packet(MQTTHandle *handle, MQTTPacket *packet) { while (item != NULL) { if ((item->type == packet->packet_type) && (item->packet_id == packet_id)) { + remove_from_queue(handle, item); if (item->callback) { item->callback(handle, item->context); } + free(item); - remove_from_queue(handle, item); return true; } item = item->next; diff --git a/tests/connect_publish.c b/tests/connect_publish.c index 6d872dd..2695f71 100644 --- a/tests/connect_publish.c +++ b/tests/connect_publish.c @@ -4,7 +4,7 @@ #include "mqtt.h" -bool leave = false; +int leave = 0; #define LOG(fmt, ...) fprintf(stdout, fmt "\n", ## __VA_ARGS__) @@ -15,26 +15,38 @@ bool err_handler(MQTTHandle *handle, MQTTConfig *config, MQTTErrorCode error) { return true; } +void publish_handler(MQTTHandle *handle, char *topic, char *message) { + LOG("Published %s -> %s", topic, message); + + leave++; +} + void mqtt_connected(MQTTHandle *handle, void *context) { LOG("Connected!"); + MQTTStatus result; LOG("Trying publish to testsuite/mqtt/test..."); - MQTTStatus result = mqtt_publish(handle, "testsuite/mqtt/test", "payload", MQTT_QOS_0); + result = mqtt_publish(handle, "testsuite/mqtt/test", "payload1", MQTT_QOS_0, publish_handler); if (result != MQTT_STATUS_OK) { LOG("Could not publish"); exit(1); } - sleep(1); - - LOG("Disconnecting..."); - result = mqtt_disconnect(handle, NULL, NULL); + LOG("Trying publish to testsuite/mqtt/test_qos1..."); + result = mqtt_publish(handle, "testsuite/mqtt/test_qos1", "payload2", MQTT_QOS_1, publish_handler); if (result != MQTT_STATUS_OK) { - LOG("Could not disconnect"); + LOG("Could not publish"); exit(1); } - exit(0); + LOG("Trying publish to testsuite/mqtt/test_qos2..."); + result = mqtt_publish(handle, "testsuite/mqtt/test_qos2", "payload3", MQTT_QOS_2, publish_handler); + if (result != MQTT_STATUS_OK) { + LOG("Could not publish"); + exit(1); + } + + leave = true; } int main(int argc, char **argv) { @@ -62,8 +74,20 @@ int main(int argc, char **argv) { return 1; } - while (!leave) { + int cancel = 0; + while (leave < 3) { LOG("Waiting..."); sleep(1); + cancel++; + if (cancel == 10) { + break; + } + } + + LOG("Disconnecting..."); + MQTTStatus result = mqtt_disconnect(mqtt, NULL, NULL); + if (result != MQTT_STATUS_OK) { + LOG("Could not disconnect"); + exit(1); } }