commit 4a32facdb5feda6908827450e0785f0f3415b93b Author: Johannes Schriewer Date: Fri Jul 27 02:25:06 2018 +0200 Initial commit diff --git a/Makefile.linux b/Makefile.linux new file mode 100644 index 0000000..50f77b2 --- /dev/null +++ b/Makefile.linux @@ -0,0 +1,27 @@ +SRCS=src/mqtt.c src/debug.c platform/linux.c +OBJS=$(SRCS:%.c=%.o) + +TARGET=libmqtt.a + +AR=ar +CC=clang +CFLAGS=-g -Os -Wall -pthread + +all: $(TARGET) + +test: $(TARGET) + $(MAKE) -C tests + +$(TARGET): $(OBJS) + $(AR) -cr $(TARGET) $(OBJS) + +%.o: %.c + $(CC) $(CFLAGS) -o $@ -c $< + +mqtt.o: mqtt.h + +clean: + $(MAKE) -C tests clean + rm -f $(TARGET) + rm -f $(OBJS) + rm -rf docs/ diff --git a/doc/.gitkeep b/doc/.gitkeep new file mode 100644 index 0000000..e69de29 diff --git a/platform/esp32.c b/platform/esp32.c new file mode 100644 index 0000000..e69de29 diff --git a/platform/esp8266.c b/platform/esp8266.c new file mode 100644 index 0000000..e69de29 diff --git a/platform/linux.c b/platform/linux.c new file mode 100644 index 0000000..65e4bfd --- /dev/null +++ b/platform/linux.c @@ -0,0 +1,25 @@ +#include + +#include "platform.h" + +struct _PlatformData { + pthread_t read_thread; +}; + +void initialize_platform(MQTTHandle *handle) { + handle->platform = calloc(sizeof(struct _PlatformData)); +} + +MQTTStatus run_read_task(MQTTHandle *handle, Reader reader) { + if (pthread_create(&handle->platform->read_thread, NULL, reader, (void *)handle)) { + return MQTT_STATUS_ERROR; + } +} + +MQTTStatus join_read_task(MQTTHandle *handle) { + pthread_join(handle->platform->read_thread); +} + +void release_platform(MQTTHandle *handle) { + free(handle->platform); +} diff --git a/platform/platform.h b/platform/platform.h new file mode 100644 index 0000000..58b4577 --- /dev/null +++ b/platform/platform.h @@ -0,0 +1,37 @@ +#ifndef platform_h__included +#define platform_h__included + +#include "mqtt_internal.h" + +typedef void (*Reader)(MQTTHandle *handle); + +/** + * Initialize platform specific data + * + * @param handle: The handle to initialize + */ +void initialize_platform(MQTTHandle *handle); + +/** + * Platform specific function to start a reading thread + * + * @param handle: The broker connection handle + * @param reader: callback to run in the thread + */ +MQTTStatus run_read_task(MQTTHandle *handle, Reader reader); + +/** + * Platform specific function to clean up the reading thread + * + * @param handle: The broker connection handle + */ +MQTTStatus join_read_task(MQTTHandle *handle); + +/** + * Platform specific function to release resources associated with a MQTTHandle + * + * @param handle: The handle to clean up + */ +void release_platform(MQTTHandle *handle); + +#endif /* platform_h__included */ diff --git a/src/debug.c b/src/debug.c new file mode 100644 index 0000000..e69de29 diff --git a/src/debug.h b/src/debug.h new file mode 100644 index 0000000..e69de29 diff --git a/src/mqtt.c b/src/mqtt.c new file mode 100644 index 0000000..66fc699 --- /dev/null +++ b/src/mqtt.c @@ -0,0 +1,89 @@ +#include +#include +#include +#include +#include +#include +#include + +#include "mqtt.h" +#include "mqtt_internal.h" +#include "platform.h" + +// TODO: Make configurable by platform +#define BUF_LEN 256 + +static void _reader(MQTTHandle *handle) { + int num_bytes; + char buffer[BUF_LEN]; + + while (1) { + num_bytes = read(handle->sock, &buffer, BUF_LEN); + if (num_bytes == 0) { + /* Socket closed */ + return; + } else if (num_bytes < 0) { + /* Error, TODO: Handle */ + } + + // TODO: Parse and dispatch + } +} + +MQTTHandle *mqtt_connect(MQTTConfig *config, MQTTErrorHandler callback) { + int ret; + MQTTHandle *handle = calloc(sizeof(struct _MQTTHandle), 1); + initialize_platform(handle); + + struct sockaddr_in servaddr; + memset(&servaddr, 0, sizeof(servaddr)); + + if (config->port == 0) { + config->port = 1883; + } + + handle->sock = socket(AF_INET, SOCK_STREAM, 0); + servaddr.sin_family = AF_INET; + servaddr.sin_port = htons(config->port); + + ret = inet_pton(AF_INET, config->hostname, &(servaddr.sin_addr)); + if (ret == 0) { + callback(handle, MQTT_Error_Host_Not_Found); + close(handle->sock); + free(handle); + return NULL; + } + + ret = connect(handle->sock, (struct sockaddr *)&servaddr, sizeof(servaddr)); + if (ret != 0) { + callback(handle, MQTT_Error_Connection_Refused); + close(handle->sock); + free(handle); + return NULL; + } + + run_read_task(handle, _reader); + + return handle; +} + +MQTTStatus mqtt_reconnect(MQTTHandle *handle) { + // TODO: reconnect +} + +MQTTStatus mqtt_subscribe(MQTTHandle *handle, char *topic, MQTTEventHandler callback) { + // TODO: subscribe +} + +MQTTStatus mqtt_unsubscribe(MQTTHandle *handle, char *topic) { + // TODO: unsubscribe +} + +MQTTStatus mqtt_publish(MQTTHandle *handle, char *topic, char *payload, MQTTQosLevel qos_level) { + // TODO: publish +} + +MQTTStatus mqtt_disconnect(MQTTHandle *handle) { + release_platform(handle); + free(handle); +} diff --git a/src/mqtt.h b/src/mqtt.h new file mode 100644 index 0000000..e284793 --- /dev/null +++ b/src/mqtt.h @@ -0,0 +1,107 @@ +#ifndef mqtt_h__included +#define mqtt_h__included + +#include + +typedef struct _MQTTHandle MQTTHandle; + +typedef struct { + char *hostname; /**< Hostname to connect to, will do DNS resolution */ + uint16_t port; /**< Port the broker listens on, set to 0 for 1883 default */ + + char *clientID; /**< Client identification */ + + char *username; /**< User name, set to NULL to connect anonymously */ + char *password; /**< Password, set to NULL to connect without password */ +} MQTTConfig; + +typedef enum { + MQTT_STATUS_OK = 0, /**< All ok, no error */ + MQTT_STATUS_ERROR /**< Error, action did not succeed, error handler will be called */ +} MQTTStatus; + +typedef enum { + MQTT_QOS_0 = 0, /**< At most once, drop message if nobody is listening, no ACK */ + MQTT_QOS_1, /**< At least once, wait for ACK from broker */ + MQTT_QOS_2, /**< Exactly once, do triple way handshake with broker */ +} MQTTQosLevel; + +typedef enum { + MQTT_Error_Host_Not_Found, /**< Host could not be resolved */ + MQTT_Error_Connection_Refused, /**< Connection was refused, wrong port? */ + MQTT_Error_Broker_Disconnected, /**< Broker went down, perhaps restart? */ + MQTT_Error_Authentication, /**< Authentication error, wrong or missing username/password? */ + MQTT_Error_Protocol_Not_Supported, /**< Broker does not speak MQTT protocol 3.1.1 (aka version 4) */ + MQTT_Error_Connection_Reset /**< Network connection reset, perhaps network went down? */ +} MQTTErrorCode; + +/** Error handler callback */ +typedef void (*MQTTErrorHandler)(MQTTHandle *handle, MQTTErrorCode code); + +/** Event handler callback */ +typedef void (*MQTTEventHandler)(MQTTHandle *handle, char *topic, char *payload); + +/** + * Connect to MQTT broker + * + * @param config: MQTT configuration + * @param callback: Callback function to call on errors + * @returns handle to mqtt connection or NULL on error + */ +MQTTHandle *mqtt_connect(MQTTConfig *config, MQTTErrorHandler callback); + +/** + * Re-Connect to MQTT broker + * + * Usually called in the MQTTErrorHandler callback, if called on a working + * connection the connection will be disconnected before reconnecting. + * + * If there were registered subscriptions they will be re-instated after + * a successful reconnect. + * + * @param handle: MQTT Handle from `mqtt_connect` + * @returns: Status code + */ +MQTTStatus mqtt_reconnect(MQTTHandle *handle); + +/** + * Subscribe to a topic + * + * @param handle: MQTT Handle from `mqtt_connect` + * @param topic: Topic to subscribe + * @param callback: Callback function to call when receiving something for that topic + * @returns: Status code + */ +MQTTStatus mqtt_subscribe(MQTTHandle *handle, char *topic, MQTTEventHandler callback); + +/** + * Un-Subscribe from a topic + * + * @param handle: MQTT Handle from `mqtt_connect` + * @param topic: Topic to unsubscribe + * @returns: Status code + */ +MQTTStatus mqtt_unsubscribe(MQTTHandle *handle, char *topic); + +/** + * Publish something to the broker + * + * @param handle: MQTT Handle from `mqtt_connect` + * @param topic: Topic to publish to + * @param payload: Message payload to publish + * @returns: Status code + */ +MQTTStatus mqtt_publish(MQTTHandle *handle, char *topic, char *payload, MQTTQosLevel qos_level); + +/** + * Disconnect from MQTT broker + * + * @param handle: MQTT Handle from `mqtt_connect` + * @returns: Status code + * + * @attention: do not use the handle after calling this function, + * all resources will be freed, this handle is now invalid! + */ +MQTTStatus mqtt_disconnect(MQTTHandle *handle); + +#endif /* mqtt_h__included */ diff --git a/src/mqtt_internal.h b/src/mqtt_internal.h new file mode 100644 index 0000000..fa5e565 --- /dev/null +++ b/src/mqtt_internal.h @@ -0,0 +1,25 @@ +#ifndef mqtt_internal_h__included +#define mqtt_internal_h__included + +#include "mqtt.h" + +typedef struct _PlatformData PlatformData; + +typedef struct { + char *topic; + MQTTEventHandler *handler; + bool pending; +} Subscriptions; + +struct _MQTTHandle { + MQTTErrorHandler *errorHandler; + Subscriptions *subscriptions; + int num_subscriptions; + + int sock; + + // TODO: status queue (Waiting for ACK) + PlatformData *platform; +}; + +#endif /* mqtt_internal_h__included */ diff --git a/tests/.gitkeep b/tests/.gitkeep new file mode 100644 index 0000000..e69de29