From 05b939dfea2047a7c7aae4b801ea9a9d0b72a6ab Mon Sep 17 00:00:00 2001 From: Johannes Schriewer Date: Mon, 30 Jul 2018 02:53:18 +0200 Subject: [PATCH] Implement statemachine, QoS untested for now --- Makefile.linux | 2 +- src/mqtt_internal.h | 27 +----------- src/state_queue.c | 102 ++++++++++++++++++++++++++++++++++++++++++++ src/state_queue.h | 24 +++++++++++ src/subscriptions.c | 66 ++++++++++++++++++++++++++++ src/subscriptions.h | 24 +++++++++++ 6 files changed, 219 insertions(+), 26 deletions(-) create mode 100644 src/state_queue.c create mode 100644 src/state_queue.h create mode 100644 src/subscriptions.c create mode 100644 src/subscriptions.h diff --git a/Makefile.linux b/Makefile.linux index 0bd782a..1e04a90 100644 --- a/Makefile.linux +++ b/Makefile.linux @@ -1,4 +1,4 @@ -SRCS=src/mqtt.c src/packet.c src/protocol.c src/debug.c platform/linux.c +SRCS=src/mqtt.c src/packet.c src/protocol.c src/state_queue.c src/subscriptions.c src/debug.c platform/linux.c OBJS=$(SRCS:%.c=%.o) DEBUG_OBJS=$(SRCS:%.c=%.do) COVERAGE_FILES=$(SRCS:%.c=%.gcno) $(SRCS:%.c=%.gcda) diff --git a/src/mqtt_internal.h b/src/mqtt_internal.h index 5ee673b..972487f 100644 --- a/src/mqtt_internal.h +++ b/src/mqtt_internal.h @@ -3,34 +3,11 @@ #include "mqtt.h" #include "packet.h" +#include "subscriptions.h" +#include "state_queue.h" typedef struct _PlatformData PlatformData; -typedef struct { - char *topic; - MQTTEventHandler *handler; - bool pending; -} SubscriptionItem; - -typedef struct { - SubscriptionItem *items; - uint8_t num_items; -} Subscriptions; - -typedef void (*MQTTCallback)(MQTTHandle *handle, MQTTPacket *packet, void *context); - -typedef struct { - MQTTControlPacketType type; - uint16_t packet_id; - void *context; - MQTTCallback callback; -} MQTTCallbackQueueItem; - -typedef struct { - MQTTCallbackQueueItem *pending; - uint8_t num_items; -} MQTTCallbackQueue; - struct _MQTTHandle { MQTTConfig *config; diff --git a/src/state_queue.c b/src/state_queue.c new file mode 100644 index 0000000..8cce625 --- /dev/null +++ b/src/state_queue.c @@ -0,0 +1,102 @@ +#include "mqtt_internal.h" +#include "state_queue.h" +#include "debug.h" + +static inline void dump_expected(MQTTHandle *handle) { + MQTTCallbackQueueItem *item = handle->queue.pending; + + DEBUG_LOG("Expected packets:") + + while (item != NULL) { + DEBUG_LOG(" - Type: %d, packet_id: %d", item->type, item->packet_id); + + item = item->next; + } +} + +void expect_packet(MQTTHandle *handle, MQTTControlPacketType type, uint16_t packet_id, MQTTEventHandler callback, void *context) { + MQTTCallbackQueueItem *item = calloc(1, sizeof(MQTTCallbackQueueItem)); + + item->type = type; + item->packet_id = packet_id; + item->callback = callback; + item->context = context; + + // insert at start + if (handle->queue.pending != NULL) { + item->next = handle->queue.pending; + } + + handle->queue.pending = item; + // dump_expected(handle); +} + +static uint16_t get_packet_id(MQTTPacket *packet) { + switch(packet->packet_type) { + case PacketTypePublish: + return ((PublishPayload *)packet->payload)->packet_id; + case PacketTypePubAck: + return ((PubAckPayload *)packet->payload)->packet_id; + case PacketTypePubRec: + return ((PubRecPayload *)packet->payload)->packet_id; + case PacketTypePubRel: + return ((PubRelPayload *)packet->payload)->packet_id; + case PacketTypePubComp: + return ((PubCompPayload *)packet->payload)->packet_id; + case PacketTypeSubscribe: + return ((SubscribePayload *)packet->payload)->packet_id; + case PacketTypeSubAck: + return ((SubAckPayload *)packet->payload)->packet_id; + case PacketTypeUnsubscribe: + return ((UnsubscribePayload *)packet->payload)->packet_id; + case PacketTypeUnsubAck: + return ((UnsubAckPayload *)packet->payload)->packet_id; + default: + return 0; // no packet id in payload + } +} + +static inline void remove_from_queue(MQTTHandle *handle, MQTTCallbackQueueItem *remove) { + MQTTCallbackQueueItem *item = handle->queue.pending; + MQTTCallbackQueueItem *prev_item = NULL; + + while (item != NULL) { + if (item == remove) { + // remove from queue + if (prev_item == NULL) { + // no prev item, attach directly to queue + handle->queue.pending = item->next; + } else { + // attach next item to prev item removing this one + prev_item->next = item->next; + } + free(item); + + break; + } + prev_item = item; + item = item->next; + } +} + +bool dispatch_packet(MQTTHandle *handle, MQTTPacket *packet) { + MQTTCallbackQueueItem *item = handle->queue.pending; + MQTTCallbackQueueItem *prev_item = NULL; + uint16_t packet_id = get_packet_id(packet); + + while (item != NULL) { + if ((item->type == packet->packet_type) && (item->packet_id == packet_id)) { + if (item->callback) { + item->callback(handle, item->context); + } + + remove_from_queue(handle, item); + return true; + } + prev_item = item; + item = item->next; + } + + // not found + return false; +} diff --git a/src/state_queue.h b/src/state_queue.h new file mode 100644 index 0000000..9657ebd --- /dev/null +++ b/src/state_queue.h @@ -0,0 +1,24 @@ +#ifndef state_queue_h__included +#define state_queue_h__included + +#include +#include "mqtt.h" +#include "packet.h" + +typedef struct _MQTTCallbackQueueItem { + struct _MQTTCallbackQueueItem *next; + + MQTTControlPacketType type; + uint16_t packet_id; + void *context; + MQTTEventHandler callback; +} MQTTCallbackQueueItem; + +typedef struct { + MQTTCallbackQueueItem *pending; +} MQTTCallbackQueue; + +void expect_packet(MQTTHandle *handle, MQTTControlPacketType type, uint16_t packet_id, MQTTEventHandler callback, void *context); +bool dispatch_packet(MQTTHandle *handle, MQTTPacket *packet); + +#endif /* state_queue_h__included */ diff --git a/src/subscriptions.c b/src/subscriptions.c new file mode 100644 index 0000000..fe06479 --- /dev/null +++ b/src/subscriptions.c @@ -0,0 +1,66 @@ +#include "mqtt_internal.h" +#include "subscriptions.h" + +void add_subscription(MQTTHandle *handle, char *topic, MQTTPublishEventHandler callback) { + SubscriptionItem *item = calloc(1, sizeof(SubscriptionItem)); + + item->topic = topic; + item->handler = callback; + item->pending = true; + + // insert at start + if (handle->subscriptions.items != NULL) { + item->next = handle->subscriptions.items; + } + + handle->subscriptions.items = item; +} + +void remove_subscription(MQTTHandle *handle, char *topic) { + SubscriptionItem *item = handle->subscriptions.items; + SubscriptionItem *prev = NULL; + + while (item != NULL) { + if (strcmp(topic, item->topic) == 0) { + if (prev == NULL) { + handle->subscriptions.items = item->next; + } else { + prev->next = item->next; + } + + free(item); + break; + } + + prev = item; + item = item->next; + } +} + +void subscription_set_pending(MQTTHandle *handle, char *topic, bool pending) { + SubscriptionItem *item = handle->subscriptions.items; + + while (item != NULL) { + if (strcmp(topic, item->topic) == 0) { + item->pending = pending; + break; + } + + item = item->next; + } +} + +void dispatch_subscription(MQTTHandle *handle, PublishPayload *payload) { + SubscriptionItem *item = handle->subscriptions.items; + + while (item != NULL) { + if ((item->pending == false) && (strcmp(payload->topic, item->topic) == 0)) { + if (item->handler) { + item->handler(handle, payload->topic, payload->message); + } + break; + } + + item = item->next; + } +} diff --git a/src/subscriptions.h b/src/subscriptions.h new file mode 100644 index 0000000..1c706bd --- /dev/null +++ b/src/subscriptions.h @@ -0,0 +1,24 @@ +#ifndef subscriptions_h__included +#define subscriptions_h__included + +#include "mqtt.h" + +typedef struct _SubscriptionItem { + struct _SubscriptionItem *next; + + char *topic; + MQTTPublishEventHandler handler; + bool pending; +} SubscriptionItem; + +typedef struct { + SubscriptionItem *items; +} Subscriptions; + +void add_subscription(MQTTHandle *handle, char *topic, MQTTPublishEventHandler callback); +void remove_subscription(MQTTHandle *handle, char *topic); +void subscription_set_pending(MQTTHandle *handle, char *topic, bool pending); + +void dispatch_subscription(MQTTHandle *handle, PublishPayload *payload); + +#endif /* subscription_h__included */