Initial commit

This commit is contained in:
Johannes Schriewer 2018-07-27 02:25:06 +02:00
commit 4a32facdb5
12 changed files with 310 additions and 0 deletions

27
Makefile.linux Normal file
View file

@ -0,0 +1,27 @@
SRCS=src/mqtt.c src/debug.c platform/linux.c
OBJS=$(SRCS:%.c=%.o)
TARGET=libmqtt.a
AR=ar
CC=clang
CFLAGS=-g -Os -Wall -pthread
all: $(TARGET)
test: $(TARGET)
$(MAKE) -C tests
$(TARGET): $(OBJS)
$(AR) -cr $(TARGET) $(OBJS)
%.o: %.c
$(CC) $(CFLAGS) -o $@ -c $<
mqtt.o: mqtt.h
clean:
$(MAKE) -C tests clean
rm -f $(TARGET)
rm -f $(OBJS)
rm -rf docs/

0
doc/.gitkeep Normal file
View file

0
platform/esp32.c Normal file
View file

0
platform/esp8266.c Normal file
View file

25
platform/linux.c Normal file
View file

@ -0,0 +1,25 @@
#include <pthread.h>
#include "platform.h"
struct _PlatformData {
pthread_t read_thread;
};
void initialize_platform(MQTTHandle *handle) {
handle->platform = calloc(sizeof(struct _PlatformData));
}
MQTTStatus run_read_task(MQTTHandle *handle, Reader reader) {
if (pthread_create(&handle->platform->read_thread, NULL, reader, (void *)handle)) {
return MQTT_STATUS_ERROR;
}
}
MQTTStatus join_read_task(MQTTHandle *handle) {
pthread_join(handle->platform->read_thread);
}
void release_platform(MQTTHandle *handle) {
free(handle->platform);
}

37
platform/platform.h Normal file
View file

@ -0,0 +1,37 @@
#ifndef platform_h__included
#define platform_h__included
#include "mqtt_internal.h"
typedef void (*Reader)(MQTTHandle *handle);
/**
* Initialize platform specific data
*
* @param handle: The handle to initialize
*/
void initialize_platform(MQTTHandle *handle);
/**
* Platform specific function to start a reading thread
*
* @param handle: The broker connection handle
* @param reader: callback to run in the thread
*/
MQTTStatus run_read_task(MQTTHandle *handle, Reader reader);
/**
* Platform specific function to clean up the reading thread
*
* @param handle: The broker connection handle
*/
MQTTStatus join_read_task(MQTTHandle *handle);
/**
* Platform specific function to release resources associated with a MQTTHandle
*
* @param handle: The handle to clean up
*/
void release_platform(MQTTHandle *handle);
#endif /* platform_h__included */

0
src/debug.c Normal file
View file

0
src/debug.h Normal file
View file

89
src/mqtt.c Normal file
View file

@ -0,0 +1,89 @@
#include <arpa/inet.h>
#include <stdio.h>
#include <string.h>
#include <sys/socket.h>
#include <unistd.h>
#include <stdlib.h>
#include <stdint.h>
#include "mqtt.h"
#include "mqtt_internal.h"
#include "platform.h"
// TODO: Make configurable by platform
#define BUF_LEN 256
static void _reader(MQTTHandle *handle) {
int num_bytes;
char buffer[BUF_LEN];
while (1) {
num_bytes = read(handle->sock, &buffer, BUF_LEN);
if (num_bytes == 0) {
/* Socket closed */
return;
} else if (num_bytes < 0) {
/* Error, TODO: Handle */
}
// TODO: Parse and dispatch
}
}
MQTTHandle *mqtt_connect(MQTTConfig *config, MQTTErrorHandler callback) {
int ret;
MQTTHandle *handle = calloc(sizeof(struct _MQTTHandle), 1);
initialize_platform(handle);
struct sockaddr_in servaddr;
memset(&servaddr, 0, sizeof(servaddr));
if (config->port == 0) {
config->port = 1883;
}
handle->sock = socket(AF_INET, SOCK_STREAM, 0);
servaddr.sin_family = AF_INET;
servaddr.sin_port = htons(config->port);
ret = inet_pton(AF_INET, config->hostname, &(servaddr.sin_addr));
if (ret == 0) {
callback(handle, MQTT_Error_Host_Not_Found);
close(handle->sock);
free(handle);
return NULL;
}
ret = connect(handle->sock, (struct sockaddr *)&servaddr, sizeof(servaddr));
if (ret != 0) {
callback(handle, MQTT_Error_Connection_Refused);
close(handle->sock);
free(handle);
return NULL;
}
run_read_task(handle, _reader);
return handle;
}
MQTTStatus mqtt_reconnect(MQTTHandle *handle) {
// TODO: reconnect
}
MQTTStatus mqtt_subscribe(MQTTHandle *handle, char *topic, MQTTEventHandler callback) {
// TODO: subscribe
}
MQTTStatus mqtt_unsubscribe(MQTTHandle *handle, char *topic) {
// TODO: unsubscribe
}
MQTTStatus mqtt_publish(MQTTHandle *handle, char *topic, char *payload, MQTTQosLevel qos_level) {
// TODO: publish
}
MQTTStatus mqtt_disconnect(MQTTHandle *handle) {
release_platform(handle);
free(handle);
}

107
src/mqtt.h Normal file
View file

@ -0,0 +1,107 @@
#ifndef mqtt_h__included
#define mqtt_h__included
#include <stdint.h>
typedef struct _MQTTHandle MQTTHandle;
typedef struct {
char *hostname; /**< Hostname to connect to, will do DNS resolution */
uint16_t port; /**< Port the broker listens on, set to 0 for 1883 default */
char *clientID; /**< Client identification */
char *username; /**< User name, set to NULL to connect anonymously */
char *password; /**< Password, set to NULL to connect without password */
} MQTTConfig;
typedef enum {
MQTT_STATUS_OK = 0, /**< All ok, no error */
MQTT_STATUS_ERROR /**< Error, action did not succeed, error handler will be called */
} MQTTStatus;
typedef enum {
MQTT_QOS_0 = 0, /**< At most once, drop message if nobody is listening, no ACK */
MQTT_QOS_1, /**< At least once, wait for ACK from broker */
MQTT_QOS_2, /**< Exactly once, do triple way handshake with broker */
} MQTTQosLevel;
typedef enum {
MQTT_Error_Host_Not_Found, /**< Host could not be resolved */
MQTT_Error_Connection_Refused, /**< Connection was refused, wrong port? */
MQTT_Error_Broker_Disconnected, /**< Broker went down, perhaps restart? */
MQTT_Error_Authentication, /**< Authentication error, wrong or missing username/password? */
MQTT_Error_Protocol_Not_Supported, /**< Broker does not speak MQTT protocol 3.1.1 (aka version 4) */
MQTT_Error_Connection_Reset /**< Network connection reset, perhaps network went down? */
} MQTTErrorCode;
/** Error handler callback */
typedef void (*MQTTErrorHandler)(MQTTHandle *handle, MQTTErrorCode code);
/** Event handler callback */
typedef void (*MQTTEventHandler)(MQTTHandle *handle, char *topic, char *payload);
/**
* Connect to MQTT broker
*
* @param config: MQTT configuration
* @param callback: Callback function to call on errors
* @returns handle to mqtt connection or NULL on error
*/
MQTTHandle *mqtt_connect(MQTTConfig *config, MQTTErrorHandler callback);
/**
* Re-Connect to MQTT broker
*
* Usually called in the MQTTErrorHandler callback, if called on a working
* connection the connection will be disconnected before reconnecting.
*
* If there were registered subscriptions they will be re-instated after
* a successful reconnect.
*
* @param handle: MQTT Handle from `mqtt_connect`
* @returns: Status code
*/
MQTTStatus mqtt_reconnect(MQTTHandle *handle);
/**
* Subscribe to a topic
*
* @param handle: MQTT Handle from `mqtt_connect`
* @param topic: Topic to subscribe
* @param callback: Callback function to call when receiving something for that topic
* @returns: Status code
*/
MQTTStatus mqtt_subscribe(MQTTHandle *handle, char *topic, MQTTEventHandler callback);
/**
* Un-Subscribe from a topic
*
* @param handle: MQTT Handle from `mqtt_connect`
* @param topic: Topic to unsubscribe
* @returns: Status code
*/
MQTTStatus mqtt_unsubscribe(MQTTHandle *handle, char *topic);
/**
* Publish something to the broker
*
* @param handle: MQTT Handle from `mqtt_connect`
* @param topic: Topic to publish to
* @param payload: Message payload to publish
* @returns: Status code
*/
MQTTStatus mqtt_publish(MQTTHandle *handle, char *topic, char *payload, MQTTQosLevel qos_level);
/**
* Disconnect from MQTT broker
*
* @param handle: MQTT Handle from `mqtt_connect`
* @returns: Status code
*
* @attention: do not use the handle after calling this function,
* all resources will be freed, this handle is now invalid!
*/
MQTTStatus mqtt_disconnect(MQTTHandle *handle);
#endif /* mqtt_h__included */

25
src/mqtt_internal.h Normal file
View file

@ -0,0 +1,25 @@
#ifndef mqtt_internal_h__included
#define mqtt_internal_h__included
#include "mqtt.h"
typedef struct _PlatformData PlatformData;
typedef struct {
char *topic;
MQTTEventHandler *handler;
bool pending;
} Subscriptions;
struct _MQTTHandle {
MQTTErrorHandler *errorHandler;
Subscriptions *subscriptions;
int num_subscriptions;
int sock;
// TODO: status queue (Waiting for ACK)
PlatformData *platform;
};
#endif /* mqtt_internal_h__included */

0
tests/.gitkeep Normal file
View file