Implement statemachine, QoS untested for now

This commit is contained in:
Johannes Schriewer 2018-07-30 02:53:18 +02:00
parent 4916148749
commit 05b939dfea
6 changed files with 219 additions and 26 deletions

View file

@ -1,4 +1,4 @@
SRCS=src/mqtt.c src/packet.c src/protocol.c src/debug.c platform/linux.c
SRCS=src/mqtt.c src/packet.c src/protocol.c src/state_queue.c src/subscriptions.c src/debug.c platform/linux.c
OBJS=$(SRCS:%.c=%.o)
DEBUG_OBJS=$(SRCS:%.c=%.do)
COVERAGE_FILES=$(SRCS:%.c=%.gcno) $(SRCS:%.c=%.gcda)

View file

@ -3,34 +3,11 @@
#include "mqtt.h"
#include "packet.h"
#include "subscriptions.h"
#include "state_queue.h"
typedef struct _PlatformData PlatformData;
typedef struct {
char *topic;
MQTTEventHandler *handler;
bool pending;
} SubscriptionItem;
typedef struct {
SubscriptionItem *items;
uint8_t num_items;
} Subscriptions;
typedef void (*MQTTCallback)(MQTTHandle *handle, MQTTPacket *packet, void *context);
typedef struct {
MQTTControlPacketType type;
uint16_t packet_id;
void *context;
MQTTCallback callback;
} MQTTCallbackQueueItem;
typedef struct {
MQTTCallbackQueueItem *pending;
uint8_t num_items;
} MQTTCallbackQueue;
struct _MQTTHandle {
MQTTConfig *config;

102
src/state_queue.c Normal file
View file

@ -0,0 +1,102 @@
#include "mqtt_internal.h"
#include "state_queue.h"
#include "debug.h"
static inline void dump_expected(MQTTHandle *handle) {
MQTTCallbackQueueItem *item = handle->queue.pending;
DEBUG_LOG("Expected packets:")
while (item != NULL) {
DEBUG_LOG(" - Type: %d, packet_id: %d", item->type, item->packet_id);
item = item->next;
}
}
void expect_packet(MQTTHandle *handle, MQTTControlPacketType type, uint16_t packet_id, MQTTEventHandler callback, void *context) {
MQTTCallbackQueueItem *item = calloc(1, sizeof(MQTTCallbackQueueItem));
item->type = type;
item->packet_id = packet_id;
item->callback = callback;
item->context = context;
// insert at start
if (handle->queue.pending != NULL) {
item->next = handle->queue.pending;
}
handle->queue.pending = item;
// dump_expected(handle);
}
static uint16_t get_packet_id(MQTTPacket *packet) {
switch(packet->packet_type) {
case PacketTypePublish:
return ((PublishPayload *)packet->payload)->packet_id;
case PacketTypePubAck:
return ((PubAckPayload *)packet->payload)->packet_id;
case PacketTypePubRec:
return ((PubRecPayload *)packet->payload)->packet_id;
case PacketTypePubRel:
return ((PubRelPayload *)packet->payload)->packet_id;
case PacketTypePubComp:
return ((PubCompPayload *)packet->payload)->packet_id;
case PacketTypeSubscribe:
return ((SubscribePayload *)packet->payload)->packet_id;
case PacketTypeSubAck:
return ((SubAckPayload *)packet->payload)->packet_id;
case PacketTypeUnsubscribe:
return ((UnsubscribePayload *)packet->payload)->packet_id;
case PacketTypeUnsubAck:
return ((UnsubAckPayload *)packet->payload)->packet_id;
default:
return 0; // no packet id in payload
}
}
static inline void remove_from_queue(MQTTHandle *handle, MQTTCallbackQueueItem *remove) {
MQTTCallbackQueueItem *item = handle->queue.pending;
MQTTCallbackQueueItem *prev_item = NULL;
while (item != NULL) {
if (item == remove) {
// remove from queue
if (prev_item == NULL) {
// no prev item, attach directly to queue
handle->queue.pending = item->next;
} else {
// attach next item to prev item removing this one
prev_item->next = item->next;
}
free(item);
break;
}
prev_item = item;
item = item->next;
}
}
bool dispatch_packet(MQTTHandle *handle, MQTTPacket *packet) {
MQTTCallbackQueueItem *item = handle->queue.pending;
MQTTCallbackQueueItem *prev_item = NULL;
uint16_t packet_id = get_packet_id(packet);
while (item != NULL) {
if ((item->type == packet->packet_type) && (item->packet_id == packet_id)) {
if (item->callback) {
item->callback(handle, item->context);
}
remove_from_queue(handle, item);
return true;
}
prev_item = item;
item = item->next;
}
// not found
return false;
}

24
src/state_queue.h Normal file
View file

@ -0,0 +1,24 @@
#ifndef state_queue_h__included
#define state_queue_h__included
#include <stdint.h>
#include "mqtt.h"
#include "packet.h"
typedef struct _MQTTCallbackQueueItem {
struct _MQTTCallbackQueueItem *next;
MQTTControlPacketType type;
uint16_t packet_id;
void *context;
MQTTEventHandler callback;
} MQTTCallbackQueueItem;
typedef struct {
MQTTCallbackQueueItem *pending;
} MQTTCallbackQueue;
void expect_packet(MQTTHandle *handle, MQTTControlPacketType type, uint16_t packet_id, MQTTEventHandler callback, void *context);
bool dispatch_packet(MQTTHandle *handle, MQTTPacket *packet);
#endif /* state_queue_h__included */

66
src/subscriptions.c Normal file
View file

@ -0,0 +1,66 @@
#include "mqtt_internal.h"
#include "subscriptions.h"
void add_subscription(MQTTHandle *handle, char *topic, MQTTPublishEventHandler callback) {
SubscriptionItem *item = calloc(1, sizeof(SubscriptionItem));
item->topic = topic;
item->handler = callback;
item->pending = true;
// insert at start
if (handle->subscriptions.items != NULL) {
item->next = handle->subscriptions.items;
}
handle->subscriptions.items = item;
}
void remove_subscription(MQTTHandle *handle, char *topic) {
SubscriptionItem *item = handle->subscriptions.items;
SubscriptionItem *prev = NULL;
while (item != NULL) {
if (strcmp(topic, item->topic) == 0) {
if (prev == NULL) {
handle->subscriptions.items = item->next;
} else {
prev->next = item->next;
}
free(item);
break;
}
prev = item;
item = item->next;
}
}
void subscription_set_pending(MQTTHandle *handle, char *topic, bool pending) {
SubscriptionItem *item = handle->subscriptions.items;
while (item != NULL) {
if (strcmp(topic, item->topic) == 0) {
item->pending = pending;
break;
}
item = item->next;
}
}
void dispatch_subscription(MQTTHandle *handle, PublishPayload *payload) {
SubscriptionItem *item = handle->subscriptions.items;
while (item != NULL) {
if ((item->pending == false) && (strcmp(payload->topic, item->topic) == 0)) {
if (item->handler) {
item->handler(handle, payload->topic, payload->message);
}
break;
}
item = item->next;
}
}

24
src/subscriptions.h Normal file
View file

@ -0,0 +1,24 @@
#ifndef subscriptions_h__included
#define subscriptions_h__included
#include "mqtt.h"
typedef struct _SubscriptionItem {
struct _SubscriptionItem *next;
char *topic;
MQTTPublishEventHandler handler;
bool pending;
} SubscriptionItem;
typedef struct {
SubscriptionItem *items;
} Subscriptions;
void add_subscription(MQTTHandle *handle, char *topic, MQTTPublishEventHandler callback);
void remove_subscription(MQTTHandle *handle, char *topic);
void subscription_set_pending(MQTTHandle *handle, char *topic, bool pending);
void dispatch_subscription(MQTTHandle *handle, PublishPayload *payload);
#endif /* subscription_h__included */