From 6ff7d2a841690c62661b69bfcc0443c439bc4a88 Mon Sep 17 00:00:00 2001 From: Johannes Schriewer Date: Sat, 28 Jul 2018 02:30:41 +0200 Subject: [PATCH] Implement packet encoder, start writing tests --- .gitignore | 2 + .vscode/c_cpp_properties.json | 31 ++ .vscode/settings.json | 16 + .vscode/tasks.json | 44 +++ Makefile.linux | 6 +- platform/linux.c | 11 +- src/buffer.h | 210 ++++++++++++++ src/debug.c | 30 ++ src/debug.h | 8 + src/mqtt.c | 17 +- src/mqtt.h | 1 + src/mqtt_internal.h | 1 + src/packet.c | 533 ++++++++++++++++++++++++++++++++++ src/packet.h | 155 ++++++++++ src/protocol.c | 0 src/protocol.h | 0 tests/Makefile | 29 ++ tests/cputime.c | 98 +++++++ tests/cputime.h | 20 ++ tests/encode_packet.c | 209 +++++++++++++ tests/test.h | 97 +++++++ 21 files changed, 1509 insertions(+), 9 deletions(-) create mode 100644 .gitignore create mode 100644 .vscode/c_cpp_properties.json create mode 100644 .vscode/settings.json create mode 100644 .vscode/tasks.json create mode 100644 src/buffer.h create mode 100644 src/packet.c create mode 100644 src/packet.h create mode 100644 src/protocol.c create mode 100644 src/protocol.h create mode 100644 tests/Makefile create mode 100644 tests/cputime.c create mode 100644 tests/cputime.h create mode 100644 tests/encode_packet.c create mode 100644 tests/test.h diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..e0292b1 --- /dev/null +++ b/.gitignore @@ -0,0 +1,2 @@ +*.o +*.a diff --git a/.vscode/c_cpp_properties.json b/.vscode/c_cpp_properties.json new file mode 100644 index 0000000..bba3e19 --- /dev/null +++ b/.vscode/c_cpp_properties.json @@ -0,0 +1,31 @@ +{ + "configurations": [ + { + "name": "Linux", + "includePath": [ + "${workspaceRoot}/src", + "${workspaceRoot}/platform", + "/usr/include" + ], + "defines": [ + "MAX_BUFFER_SIZE=256", + "DEBUG=1" + ], + "intelliSenseMode": "clang-x64", + "browse": { + "path": [ + "${workspaceRoot}", + "${workspaceRoot}/src", + "${workspaceRoot}/platform", + "/usr/include" + ] + }, + "limitSymbolsToIncludedHeaders": true, + "databaseFilename": "", + "compilerPath": "/usr/bin/clang", + "cStandard": "c99", + "cppStandard": "c++17" + } + ], + "version": 4 +} diff --git a/.vscode/settings.json b/.vscode/settings.json new file mode 100644 index 0000000..e886cdc --- /dev/null +++ b/.vscode/settings.json @@ -0,0 +1,16 @@ +{ + "files.associations": { + "stdlib.h": "c", + "mqtt_internal.h": "c", + "platform.h": "c", + "tidyplatform.h": "c", + "type_traits": "cpp", + "istream": "c", + "optional": "c", + "ostream": "c", + "ratio": "c", + "system_error": "c", + "array": "c", + "string_view": "c" + } +} diff --git a/.vscode/tasks.json b/.vscode/tasks.json new file mode 100644 index 0000000..a81ad52 --- /dev/null +++ b/.vscode/tasks.json @@ -0,0 +1,44 @@ +{ + // See https://go.microsoft.com/fwlink/?LinkId=733558 + // for the documentation about the tasks.json format + "version": "2.0.0", + "tasks": [ + { + "label": "Build linux", + "type": "shell", + "command": "make", + "args": [ + "-f", + "Makefile.linux", + "clean", + "all" + ], + "options": { + "cwd": "${workspaceRoot}" + }, + "group": { + "kind": "build", + "isDefault": true + }, + "problemMatcher": [ + "$gcc" + ] + }, + { + "label": "Clean linux", + "type": "shell", + "command": "make", + "args": [ + "-f", + "Makefile.linux", + "clean" + ], + "options": { + "cwd": "${workspaceRoot}" + }, + "problemMatcher": [ + "$gcc" + ] + } + ] +} diff --git a/Makefile.linux b/Makefile.linux index 50f77b2..3c8599e 100644 --- a/Makefile.linux +++ b/Makefile.linux @@ -1,11 +1,13 @@ -SRCS=src/mqtt.c src/debug.c platform/linux.c +SRCS=src/mqtt.c src/packet.c src/protocol.c src/debug.c platform/linux.c OBJS=$(SRCS:%.c=%.o) TARGET=libmqtt.a +PLATFORM_FLAGS=-DMAX_BUFFER_SIZE=256 + AR=ar CC=clang -CFLAGS=-g -Os -Wall -pthread +CFLAGS=-g -Os -Wall -pthread -I./platform -I./src $(PLATFORM_FLAGS) all: $(TARGET) diff --git a/platform/linux.c b/platform/linux.c index 65e4bfd..a8371ed 100644 --- a/platform/linux.c +++ b/platform/linux.c @@ -1,4 +1,5 @@ #include +#include #include "platform.h" @@ -7,17 +8,21 @@ struct _PlatformData { }; void initialize_platform(MQTTHandle *handle) { - handle->platform = calloc(sizeof(struct _PlatformData)); + handle->platform = calloc(sizeof(struct _PlatformData), 1); } MQTTStatus run_read_task(MQTTHandle *handle, Reader reader) { - if (pthread_create(&handle->platform->read_thread, NULL, reader, (void *)handle)) { + if (pthread_create(&handle->platform->read_thread, NULL, (void *(*)(void *))reader, (void *)handle)) { return MQTT_STATUS_ERROR; } + + return MQTT_STATUS_OK; } MQTTStatus join_read_task(MQTTHandle *handle) { - pthread_join(handle->platform->read_thread); + pthread_join(handle->platform->read_thread, NULL); + + return MQTT_STATUS_OK; } void release_platform(MQTTHandle *handle) { diff --git a/src/buffer.h b/src/buffer.h new file mode 100644 index 0000000..5a14505 --- /dev/null +++ b/src/buffer.h @@ -0,0 +1,210 @@ +#ifndef buffer_h__included +#define buffer_h__included + +#include +#include +#include +#include + +#include "mqtt.h" + +typedef struct { + char *data; /**< Pointer to data */ + size_t len; /**< Allocated space in data */ + size_t position; /**< current cursor position in buffer */ +} Buffer; + +/** + * Copy data from current buffer position into dest + * + * This advances the internal buffer position + * + * @param src: Source buffer + * @param dest: Destination memory area + * @param len: Number of bytes to copy + * @returns: Actual number of bytes copied + */ +static inline size_t buffer_copy_out(Buffer *src, char *dest, size_t len) { + size_t sz = (len > src->len - src->position) ? src->len - src->position : len; + memcpy(dest, src->data + src->position, sz); + src->position += sz; + return sz; +} + +/** + * Copy data into the buffer + * + * This advances the internal buffer position + * + * @param src: Source memory area + * @param dest: Destination buffer + * @param len: Number of bytes to copy + * @returns: Actual number of bytes copied + */ +static inline size_t buffer_copy_in(char *src, Buffer *dest, size_t len) { + size_t sz = (len > dest->len - dest->position) ? dest->len - dest->position : len; + memcpy(dest->data + dest->position, src, sz); + dest->position += sz; + return sz; +} + +/** + * Get free space in buffer + * + * @param buffer: Buffer to check + * @returns: Number of free bytes in buffer + */ +static inline size_t buffer_free_space(Buffer *buffer) { + return buffer->len - buffer->position; +} + +/** + * Check if the internal position is at the end of the buffer + * + * @param buffer; Buffer to check + * @returns: True if end of buffer reached + */ +static inline bool buffer_eof(Buffer *buffer) { + return buffer->position == buffer->len; +} + +/** + * Reset internal position of buffer + * + * @param buffer: Buffer to reset + */ +static inline void buffer_reset(Buffer *buffer) { + buffer->position = 0; +} + +/** + * Allocate a new buffer + * + * @param len: Size of new buffer + * @returns: New buffer or NULL if out of memory + */ +static inline Buffer *buffer_allocate(size_t len) { + Buffer *buffer = (Buffer *)malloc(sizeof(Buffer)); + if (buffer == NULL) { + return NULL; + } + buffer->len = len; + buffer->position = 0; + buffer->data = (char *)calloc(1, len); + if (buffer->data == NULL) { + free(buffer); + return NULL; + } + + return buffer; +} + +/** + * Re-allocate buffer size + * + * @param buffer: Buffer to modify + * @param len: Size of new buffer + * @returns: Modified buffer if realloc did work, check if buffer->len == len to verify + */ +static inline Buffer *buffer_reallocate(Buffer *buffer, size_t len) { + char *new_buffer = (char *)realloc(buffer->data, len); + if (new_buffer != NULL) { + buffer->data = new_buffer; + buffer->len = len; + } + return buffer; +} + +/** + * Create a new buffer from a memory area and a size + * + * @param data: Memory area + * @param len: Length of memory area + * @returns: New Buffer + * + * @attention: the data pointer will be owned by the buffer and freed with it! + */ +static inline Buffer *buffer_from_data_no_copy(char *data, size_t len) { + Buffer *buffer = (Buffer *)malloc(sizeof(Buffer)); + buffer->len = len; + buffer->data = data; + buffer->position = 0; + + return buffer; +} + +/** + * Create a new buffer and copy the data + * + * @param data: Data to copy into the buffer + * @param len: Number of bytes to copy + * @returns: New Buffer + */ +static inline Buffer *buffer_from_data_copy(char *data, size_t len) { + Buffer *buffer = buffer_allocate(len); + (void)buffer_copy_in(data, buffer, len); + return buffer; +} + +/** + * Release a buffer + * + * @param buffer: Buffer to release + */ +static inline void buffer_release(Buffer *buffer) { + free(buffer->data); + buffer->data = NULL; + free(buffer); +} + +/** + * Append data to a buffer + * + * @param buffer: Buffer to append data to + * @param data: Memory area to copy to the end of the buffer + * @param len: Number of bytes to copy + * @returns: Numbr of bytes copied + * + * @attention: May come up short if the destination buffer has to be reallocated and + * that reallocation fails + */ +static inline size_t buffer_append_data(Buffer *buffer, char *data, size_t len) { + size_t num_bytes = buffer_copy_in(data, buffer, len); + if (num_bytes != len) { + // reallocate + (void)buffer_reallocate(buffer, buffer->len + (len - num_bytes)); + if (buffer_eof(buffer)) { + // reallocation failed + return num_bytes; + } + (void)buffer_copy_in(data + num_bytes, buffer, (len - num_bytes)); + } + + return len; +} + +/** + * Append a buffer to another buffer + * + * @param dest: Destination buffer + * @param src: Source buffer to append + * @returns: Number of bytes copied + * + * @attention: May come up short if the destination buffer has to be reallocated and + * that reallocation fails + */ +static inline size_t buffer_append_buffer(Buffer *dest, Buffer *src) { + return buffer_append_data(dest, src->data, src->len); +} + +#if DEBUG +#include "debug.h" + +static inline void buffer_hexdump(Buffer *buffer) { + hexdump(buffer->data, buffer->len); +} +#else +#define buffer_hexdump(_buffer) /* */ +#endif + +#endif /* buffer_h__included */ diff --git a/src/debug.c b/src/debug.c index e69de29..85ce8e7 100644 --- a/src/debug.c +++ b/src/debug.c @@ -0,0 +1,30 @@ +#include "stdio.h" + +#include "debug.h" + +void hexdump(char *data, size_t len) { + for (int i = 0; i < len;) { + for (int col = 0; col < 16; col++) { + if (i + col < len) { + fprintf(stdout, "%02x ", data[i + col]); + } else { + fprintf(stdout, " "); + } + } + + fprintf(stdout, " | "); + + for (int col = 0; col < 16; col++) { + if (i + col < len) { + char c = data[i + col]; + if ((c > 127) || (c < 32)) c = '.'; + fprintf(stdout, "%c", c); + } else { + fprintf(stdout, " "); + } + } + + fprintf(stdout, "\n"); + i += 16; + } +} diff --git a/src/debug.h b/src/debug.h index e69de29..c6a0e68 100644 --- a/src/debug.h +++ b/src/debug.h @@ -0,0 +1,8 @@ +#ifndef debug_h__included +#define debug_h__included + +#include + +void hexdump(char *data, size_t len); + +#endif /* debug_h__included */ diff --git a/src/mqtt.c b/src/mqtt.c index 66fc699..ebed082 100644 --- a/src/mqtt.c +++ b/src/mqtt.c @@ -5,25 +5,34 @@ #include #include #include +#include #include "mqtt.h" #include "mqtt_internal.h" #include "platform.h" -// TODO: Make configurable by platform -#define BUF_LEN 256 +#define BUF_LEN MAX_BUFFER_SIZE static void _reader(MQTTHandle *handle) { int num_bytes; char buffer[BUF_LEN]; + handle->reader_alive = true; + while (1) { num_bytes = read(handle->sock, &buffer, BUF_LEN); if (num_bytes == 0) { - /* Socket closed */ + /* Socket closed, coordinated shutdown */ + handle->reader_alive = false; return; } else if (num_bytes < 0) { - /* Error, TODO: Handle */ + if ((errno == EINTR) || (errno == EAGAIN)) { + continue; + } + + /* Set reader task to dead */ + handle->reader_alive = false; + return; } // TODO: Parse and dispatch diff --git a/src/mqtt.h b/src/mqtt.h index e284793..eadbfe8 100644 --- a/src/mqtt.h +++ b/src/mqtt.h @@ -2,6 +2,7 @@ #define mqtt_h__included #include +#include typedef struct _MQTTHandle MQTTHandle; diff --git a/src/mqtt_internal.h b/src/mqtt_internal.h index fa5e565..1157e05 100644 --- a/src/mqtt_internal.h +++ b/src/mqtt_internal.h @@ -17,6 +17,7 @@ struct _MQTTHandle { int num_subscriptions; int sock; + bool reader_alive; // TODO: status queue (Waiting for ACK) PlatformData *platform; diff --git a/src/packet.c b/src/packet.c new file mode 100644 index 0000000..2d68a3d --- /dev/null +++ b/src/packet.c @@ -0,0 +1,533 @@ +#include + +#include "packet.h" + +/* + * Helper functionality + */ + +MQTTPacket *allocate_MQTTPacket(MQTTControlPacketType type) { + MQTTPacket *packet = calloc(1, sizeof(MQTTPacket)); + packet->packet_type = type; + + switch (type) { + case PacketTypeConnect: + packet->payload = calloc(1, sizeof(ConnectPayload)); + break; + case PacketTypeConnAck: + packet->payload = calloc(1, sizeof(ConnAckPayload)); + break; + case PacketTypePublish: + packet->payload = calloc(1, sizeof(PublishPayload)); + break; + case PacketTypePubAck: + packet->payload = calloc(1, sizeof(PubAckPayload)); + break; + case PacketTypePubRec: + packet->payload = calloc(1, sizeof(PubRecPayload)); + break; + case PacketTypePubRel: + packet->payload = calloc(1, sizeof(PubRelPayload)); + break; + case PacketTypePubComp: + packet->payload = calloc(1, sizeof(PubCompPayload)); + break; + case PacketTypeSubscribe: + packet->payload = calloc(1, sizeof(SubscribePayload)); + break; + case PacketTypeSubAck: + packet->payload = calloc(1, sizeof(SubAckPayload)); + break; + case PacketTypeUnsubscribe: + packet->payload = calloc(1, sizeof(UnsubscribePayload)); + break; + case PacketTypeUnsubAck: + packet->payload = calloc(1, sizeof(UnsubAckPayload)); + break; + case PacketTypePingReq: + case PacketTypePingResp: + case PacketTypeDisconnect: + packet->payload = NULL; + break; + } + + return packet; +} + +void free_MQTTPacket(MQTTPacket *packet) { + free(packet->payload); + packet->payload = NULL; + free(packet); +} + + +uint16_t variable_length_int_decode(Buffer *buffer) { + uint16_t result = 0; + while (buffer->data[buffer->position] & 0x80) { + result *= 128; + result += buffer->data[buffer->position] & 0x7f; + buffer->position++; + if (buffer_eof(buffer)) { + break; // bail out, buffer exhausted + } + } + + return result; +} + +char *utf8_string_decode(Buffer *buffer) { + char *result; + + if (buffer_free_space(buffer) < 2) { + return NULL; // buffer too small + } + uint16_t sz = (buffer->data[buffer->position] << 8) + buffer->data[buffer->position + 1]; + if (buffer_free_space(buffer) < sz + 2) { + return NULL; // incomplete buffer + } + buffer->position += 2; + + result = malloc(sz); + buffer_copy_out(buffer, result, sz); + return result; +} + +size_t variable_length_int_encode(uint16_t value, Buffer *buffer) { + if (value == 0) { + buffer->data[buffer->position] = 0; + buffer->position++; + return 1; + } + + size_t len = 0; + while (value > 0) { + if (buffer->position + len > buffer->len) { + buffer->position += len - 1; + return len - 1; // bail out, buffer exhausted + } + buffer->data[buffer->position + len] = value % 128; + value = value / 128; + if (value > 0){ + buffer->data[buffer->position + len] |= 0x80; + } + len++; + } + buffer->position += len; + return len; +} + +size_t variable_length_int_size(uint16_t value) { + if (value == 0) { + return 1; + } + + size_t len = 0; + while (value > 0) { + value = value / 128; + len++; + } + return len; +} + +size_t utf8_string_encode(char *string, Buffer *buffer) { + size_t len = 0; + + if (string != NULL) { + len = strlen(string); + } + + if ((len > UINT16_MAX) || (buffer_free_space(buffer) < len + 2)) { + return 0; // bail out, buffer too small + } + + buffer->data[buffer->position] = (len & 0xff00) >> 8; + buffer->data[buffer->position + 1] = (len & 0xff); + buffer->position += 2; + + if (string != NULL) { + (void)buffer_copy_in(string, buffer, len); + } + + return len + 2; +} + +/* + * Decoder + */ + +ConnectPayload *decode_connect(Buffer *buffer) { + +} + +ConnAckPayload *decode_connack(Buffer *buffer) { + +} + +PublishPayload *decode_publish(Buffer *buffer) { + +} + +PubAckPayload *decode_puback(Buffer *buffer) { + +} + +PubRecPayload *decode_pubrec(Buffer *buffer) { + +} + +PubRelPayload *decode_pubrel(Buffer *buffer) { + +} + +PubCompPayload *decode_pubcomp(Buffer *buffer) { + +} + +SubscribePayload *decode_subscribe(Buffer *buffer) { + +} + +SubAckPayload *decode_suback(Buffer *buffer) { + +} + +UnsubscribePayload *decode_unsubscribe(Buffer *buffer) { + +} + +UnsubAckPayload *decode_unsuback(Buffer *buffer) { + +} + +int decode_pingreq(Buffer *buffer) { + +} + +int decode_pingresp(Buffer *buffer) { + +} + +int decode_disconnect(Buffer *buffer) { + +} + +MQTTPacket *mqtt_packet_decode(Buffer *buffer) { + MQTTControlPacketType type = (buffer->data[0] & 0xf0) >> 4; + MQTTPacket *result =allocate_MQTTPacket(type); + + switch (type) { + case PacketTypeConnect: + result->payload = (void *)decode_connect(buffer); + break; + case PacketTypeConnAck: + result->payload = (void *)decode_connack(buffer); + break; + case PacketTypePublish: + result->payload = (void *)decode_publish(buffer); + break; + case PacketTypePubAck: + result->payload = (void *)decode_puback(buffer); + break; + case PacketTypePubRec: + result->payload = (void *)decode_pubrec(buffer); + break; + case PacketTypePubRel: + result->payload = (void *)decode_pubrel(buffer); + break; + case PacketTypePubComp: + result->payload = (void *)decode_pubcomp(buffer); + break; + case PacketTypeSubscribe: + result->payload = (void *)decode_subscribe(buffer); + break; + case PacketTypeSubAck: + result->payload = (void *)decode_suback(buffer); + break; + case PacketTypeUnsubscribe: + result->payload = (void *)decode_unsubscribe(buffer); + break; + case PacketTypeUnsubAck: + result->payload = (void *)decode_unsuback(buffer); + break; + case PacketTypePingReq: + result->payload = (void *)decode_pingreq(buffer); + break; + case PacketTypePingResp: + result->payload = (void *)decode_pingresp(buffer); + break; + case PacketTypeDisconnect: + result->payload = (void *)decode_disconnect(buffer); + break; + } + + return result; +} + +/* + * Encoder + */ + +Buffer *make_buffer_for_header(size_t sz, MQTTControlPacketType type) { + sz += variable_length_int_size(sz); // size field + sz += 1; // packet type and flags + + Buffer *buffer = buffer_allocate(sz); + buffer->data[0] = type << 4; + buffer->position += 1; + variable_length_int_encode(sz - 2, buffer); + + return buffer; +} + +Buffer *encode_connect(ConnectPayload *payload) { + size_t sz = 10 /* variable header */; + sz += strlen(payload->client_id) + 2; + if (payload->will_topic) { + sz += strlen(payload->will_topic) + 2; + sz += strlen(payload->will_message) + 2; + } + if (payload->username) { + sz += strlen(payload->username) + 2; + } + if (payload->password) { + sz += strlen(payload->password) + 2; + } + + Buffer *buffer = make_buffer_for_header(sz, PacketTypeConnect); + + // variable header + utf8_string_encode("MQTT", buffer); + char *p = buffer->data + buffer->position; + + *(p++) = payload->protocol_level; + + uint8_t flags = ( + ((payload->username) ? (1 << 7) : 0) + + ((payload->password) ? (1 << 6) : 0) + + ((payload->retain_will) ? (1 << 5) : 0) + + ((payload->will_topic) ? (payload->will_qos << 3) : 0) + + ((payload->will_topic) ? (1 << 2) : 0) + + ((payload->clean_session) ? (1 << 1) : 0) + ); + *(p++) = flags; + *(p++) = (payload->keepalive_interval & 0xff00) >> 8; + *(p++) = (payload->keepalive_interval & 0xff); + + buffer->position += 4; + + // payload + utf8_string_encode(payload->client_id, buffer); + if (payload->will_topic) { + utf8_string_encode(payload->will_topic, buffer); + utf8_string_encode(payload->will_message, buffer); + } + if (payload->username) { + utf8_string_encode(payload->username, buffer); + } + if (payload->password) { + utf8_string_encode(payload->password, buffer); + } + + assert(buffer_eof(buffer)); + return buffer; +} + +Buffer *encode_connack(ConnAckPayload *payload) { + size_t sz = 2; // session flag and status + + Buffer *buffer = make_buffer_for_header(sz, PacketTypeConnAck); + buffer->data[buffer->position++] = payload->session_present; + buffer->data[buffer->position++] = payload->status; + + assert(buffer_eof(buffer)); + return buffer; +} + +Buffer *encode_publish(PublishPayload *payload) { + size_t sz = 0; + sz += strlen(payload->topic) + 2; // topic + sz += 2; // packet id + sz += strlen(payload->message); + + Buffer *buffer = make_buffer_for_header(sz, PacketTypePublish); + + // Variable header + utf8_string_encode(payload->topic, buffer); + buffer->data[buffer->position++] = (payload->packet_id & 0xff00) >> 8; + buffer->data[buffer->position++] = (payload->packet_id & 0xff); + + // Payload + buffer_copy_in(payload->message, buffer, strlen(payload->message)); + + assert(buffer_eof(buffer)); + return buffer; +} + +Buffer *encode_puback(PubAckPayload *payload) { + size_t sz = 2; // packet id + + Buffer *buffer = make_buffer_for_header(sz, PacketTypePubAck); + + // Variable header + buffer->data[buffer->position++] = (payload->packet_id & 0xff00) >> 8; + buffer->data[buffer->position++] = (payload->packet_id & 0xff); + + assert(buffer_eof(buffer)); + return buffer; +} + +Buffer *encode_pubrec(PubRecPayload *payload) { + size_t sz = 2; // packet id + + Buffer *buffer = make_buffer_for_header(sz, PacketTypePubRec); + + // Variable header + buffer->data[buffer->position++] = (payload->packet_id & 0xff00) >> 8; + buffer->data[buffer->position++] = (payload->packet_id & 0xff); + + assert(buffer_eof(buffer)); + return buffer; +} + +Buffer *encode_pubrel(PubRelPayload *payload) { + size_t sz = 2; // packet id + + Buffer *buffer = make_buffer_for_header(sz, PacketTypePubRel); + + // Variable header + buffer->data[buffer->position++] = (payload->packet_id & 0xff00) >> 8; + buffer->data[buffer->position++] = (payload->packet_id & 0xff); + + assert(buffer_eof(buffer)); + return buffer; +} + +Buffer *encode_pubcomp(PubCompPayload *payload) { + size_t sz = 2; // packet id + + Buffer *buffer = make_buffer_for_header(sz, PacketTypePubComp); + + // Variable header + buffer->data[buffer->position++] = (payload->packet_id & 0xff00) >> 8; + buffer->data[buffer->position++] = (payload->packet_id & 0xff); + + assert(buffer_eof(buffer)); + return buffer; +} + +Buffer *encode_subscribe(SubscribePayload *payload) { + size_t sz = 2; // packet id + sz += strlen(payload->topic); // topic + sz += 1; // qos level + + Buffer *buffer = make_buffer_for_header(sz, PacketTypeSubscribe); + + // Variable header + buffer->data[buffer->position++] = (payload->packet_id & 0xff00) >> 8; + buffer->data[buffer->position++] = (payload->packet_id & 0xff); + + // Payload + utf8_string_encode(payload->topic, buffer); + buffer->data[buffer->position++] = payload->qos; + + assert(buffer_eof(buffer)); + return buffer; +} + +Buffer *encode_suback(SubAckPayload *payload) { + size_t sz = 2; // packet id + sz += 1; // Status code + + Buffer *buffer = make_buffer_for_header(sz, PacketTypeSubAck); + + // Variable header + buffer->data[buffer->position++] = (payload->packet_id & 0xff00) >> 8; + buffer->data[buffer->position++] = (payload->packet_id & 0xff); + + // Payload + buffer->data[buffer->position++] = payload->status; + + assert(buffer_eof(buffer)); + return buffer; +} + +Buffer *encode_unsubscribe(UnsubscribePayload *payload) { + size_t sz = 2; // packet id + sz += strlen(payload->topic); // topic + + Buffer *buffer = make_buffer_for_header(sz, PacketTypeUnsubscribe); + + // Variable header + buffer->data[buffer->position++] = (payload->packet_id & 0xff00) >> 8; + buffer->data[buffer->position++] = (payload->packet_id & 0xff); + + // Payload + utf8_string_encode(payload->topic, buffer); + + assert(buffer_eof(buffer)); + return buffer; +} + +Buffer *encode_unsuback(UnsubAckPayload *payload) { + size_t sz = 2; // packet id + + Buffer *buffer = make_buffer_for_header(sz, PacketTypeUnsubAck); + + // Variable header + buffer->data[buffer->position++] = (payload->packet_id & 0xff00) >> 8; + buffer->data[buffer->position++] = (payload->packet_id & 0xff); + + assert(buffer_eof(buffer)); + return buffer; +} + +Buffer *encode_pingreq() { + Buffer *buffer = make_buffer_for_header(0, PacketTypePingReq); + assert(buffer_eof(buffer)); + return buffer; +} + +Buffer *encode_pingresp() { + Buffer *buffer = make_buffer_for_header(0, PacketTypePingResp); + assert(buffer_eof(buffer)); + return buffer; +} + +Buffer *encode_disconnect() { + Buffer *buffer = make_buffer_for_header(0, PacketTypeDisconnect); + assert(buffer_eof(buffer)); + return buffer; +} + +Buffer *mqtt_packet_encode(MQTTPacket *packet) { + switch (packet->packet_type) { + case PacketTypeConnect: + return encode_connect((ConnectPayload *)packet->payload); + case PacketTypeConnAck: + return encode_connack((ConnAckPayload *)packet->payload); + case PacketTypePublish: + return encode_publish((PublishPayload *)packet->payload); + case PacketTypePubAck: + return encode_puback((PubAckPayload *)packet->payload); + case PacketTypePubRec: + return encode_pubrec((PubRecPayload *)packet->payload); + case PacketTypePubRel: + return encode_pubrel((PubRelPayload *)packet->payload); + case PacketTypePubComp: + return encode_pubcomp((PubCompPayload *)packet->payload); + case PacketTypeSubscribe: + return encode_subscribe((SubscribePayload *)packet->payload); + case PacketTypeSubAck: + return encode_suback((SubAckPayload *)packet->payload); + case PacketTypeUnsubscribe: + return encode_unsubscribe((UnsubscribePayload *)packet->payload); + case PacketTypeUnsubAck: + return encode_unsuback((UnsubAckPayload *)packet->payload); + case PacketTypePingReq: + return encode_pingreq(); + case PacketTypePingResp: + return encode_pingresp(); + case PacketTypeDisconnect: + return encode_disconnect(); + } +} diff --git a/src/packet.h b/src/packet.h new file mode 100644 index 0000000..d753c72 --- /dev/null +++ b/src/packet.h @@ -0,0 +1,155 @@ +#ifndef packet_h__included +#define packet_h__included + +#include +#include +#include + +#include "mqtt.h" +#include "buffer.h" + +typedef enum { + PacketTypeConnect = 1, + PacketTypeConnAck = 2, + PacketTypePublish = 3, + PacketTypePubAck = 4, + PacketTypePubRec = 5, + PacketTypePubRel = 6, + PacketTypePubComp = 7, + PacketTypeSubscribe = 8, + PacketTypeSubAck = 9, + PacketTypeUnsubscribe = 10, + PacketTypeUnsubAck = 11, + PacketTypePingReq = 12, + PacketTypePingResp = 13, + PacketTypeDisconnect = 14 +} MQTTControlPacketType; + +typedef struct { + // required: + char *client_id; + int protocol_level; + uint16_t keepalive_interval; + + // optional + char *username; + char *password; + char *will_topic; + char *will_message; + MQTTQosLevel will_qos; + bool retain_will; + bool clean_session; +} ConnectPayload; + +typedef enum { + ConnAckStatusAccepted = 0, /**< Connection accepted */ + ConnAckStatusInvalidProtocolLevel = 1, /**< Protocol level not supported */ + ConnAckStatusInvalidIdentifier = 2, /**< Client ID not accepted */ + ConnAckStatusServerUnavailable = 3, /**< Server restarting or too many clients */ + ConnAckStatusAuthenticationError = 4, /**< missing username/password */ + ConnAckStatusNotAuthorized = 5 /**< not authorized */ +} ConnAckStatus; + +typedef struct { + bool session_present; + ConnAckStatus status; +} ConnAckPayload; + +typedef struct { + bool duplicate; + MQTTQosLevel qos; + bool retain; + + char *topic; + uint16_t packet_id; + + char *message; +} PublishPayload; + +typedef struct { + uint16_t packet_id; +} PubAckPayload; + +typedef struct { + uint16_t packet_id; +} PubRecPayload; + +typedef struct { + uint16_t packet_id; +} PubRelPayload; + +typedef struct { + uint16_t packet_id; +} PubCompPayload; + +typedef struct { + uint16_t packet_id; + char *topic; + MQTTQosLevel qos; +} SubscribePayload; + +typedef enum { + SubAckStatusQoS0 = 0, + SubAckStatusQoS1 = 1, + SubAckStatusQoS2 = 2, + SubAckFailure = 0x80 +} SubAckStatus; + +typedef struct { + uint16_t packet_id; + SubAckStatus status; +} SubAckPayload; + +typedef struct { + uint16_t packet_id; + char *topic; +} UnsubscribePayload; + +typedef struct { + uint16_t packet_id; +} UnsubAckPayload; + +typedef struct { + MQTTControlPacketType packet_type; + void *payload; +} MQTTPacket; + +/* + * Decoder + */ +MQTTPacket *mqtt_packet_decode(Buffer *buffer); +void free_MQTTPacket(MQTTPacket *packet); + +/* + * Encoder + */ + +Buffer *mqtt_packet_encode(MQTTPacket *packet); +MQTTPacket *allocate_MQTTPacket(MQTTControlPacketType type); + + +/* + * Internal + */ + +size_t variable_length_int_encode(uint16_t value, Buffer *buffer); +size_t variable_length_int_size(uint16_t value); +size_t utf8_string_encode(char *string, Buffer *buffer); + +Buffer *make_buffer_for_header(size_t sz, MQTTControlPacketType type); +Buffer *encode_connect(ConnectPayload *payload); +Buffer *encode_connack(ConnAckPayload *payload); +Buffer *encode_publish(PublishPayload *payload); +Buffer *encode_puback(PubAckPayload *payload); +Buffer *encode_pubrec(PubRecPayload *payload); +Buffer *encode_pubrel(PubRelPayload *payload); +Buffer *encode_pubcomp(PubCompPayload *payload); +Buffer *encode_subscribe(SubscribePayload *payload); +Buffer *encode_suback(SubAckPayload *payload); +Buffer *encode_unsubscribe(UnsubscribePayload *payload); +Buffer *encode_unsuback(UnsubAckPayload *payload); +Buffer *encode_pingreq(); +Buffer *encode_pingresp(); +Buffer *encode_disconnect(); + +#endif /* packet_h__included */ diff --git a/src/protocol.c b/src/protocol.c new file mode 100644 index 0000000..e69de29 diff --git a/src/protocol.h b/src/protocol.h new file mode 100644 index 0000000..e69de29 diff --git a/tests/Makefile b/tests/Makefile new file mode 100644 index 0000000..c0948ac --- /dev/null +++ b/tests/Makefile @@ -0,0 +1,29 @@ +SRCS=encode_packet.c +OBJS=$(SRCS:%.c=%.o) +TARGETS=$(SRCS:%.c=%.test) + +CC=clang +CFLAGS=-g -Os -Wall -I.. -I../src -I../platform -DDEBUG=1 +# -DTIMETRIAL +LDFLAGS= +LIBS=-L.. -lmqtt + +all: $(TARGETS) + +%.test: %.o cputime.o + $(CC) $(LDFLAGS) -o $@ cputime.o $< $(LIBS) + ./$@ + rm $@ + +%.o: %.c test.h + $(CC) $(CFLAGS) -o $@ -c $< + +%.e: %.c test.h + $(CC) $(CFLAGS) -E -o $@ -c $< + less $@ + rm $@ + +clean: + rm -f $(TARGETS) + rm -f $(OBJS) + rm -f *.e diff --git a/tests/cputime.c b/tests/cputime.c new file mode 100644 index 0000000..f37cf85 --- /dev/null +++ b/tests/cputime.c @@ -0,0 +1,98 @@ +/* + * Author: David Robert Nadeau + * Site: http://NadeauSoftware.com/ + * License: Creative Commons Attribution 3.0 Unported License + * http://creativecommons.org/licenses/by/3.0/deed.en_US + */ +#if defined(_WIN32) +#include + +#elif defined(__unix__) || defined(__unix) || defined(unix) || (defined(__APPLE__) && defined(__MACH__)) +#include +#include +#include +#include + +#else +#error "Unable to define getCPUTime( ) for an unknown OS." +#endif + +/** + * Returns the amount of CPU time used by the current process, + * in seconds, or -1.0 if an error occurred. + */ +double getCPUTime(void) { +#if defined(_WIN32) + /* Windows -------------------------------------------------- */ + FILETIME createTime; + FILETIME exitTime; + FILETIME kernelTime; + FILETIME userTime; + if ( GetProcessTimes( GetCurrentProcess( ), + &createTime, &exitTime, &kernelTime, &userTime ) != -1 ) + { + SYSTEMTIME userSystemTime; + if ( FileTimeToSystemTime( &userTime, &userSystemTime ) != -1 ) + return (double)userSystemTime.wHour * 3600.0 + + (double)userSystemTime.wMinute * 60.0 + + (double)userSystemTime.wSecond + + (double)userSystemTime.wMilliseconds / 1000.0; + } + +#elif defined(__unix__) || defined(__unix) || defined(unix) || (defined(__APPLE__) && defined(__MACH__)) + /* AIX, BSD, Cygwin, HP-UX, Linux, OSX, and Solaris --------- */ + +#if defined(_POSIX_TIMERS) && (_POSIX_TIMERS > 0) + /* Prefer high-res POSIX timers, when available. */ + { + clockid_t id; + struct timespec ts; +#if _POSIX_CPUTIME > 0 + /* Clock ids vary by OS. Query the id, if possible. */ + if ( clock_getcpuclockid( 0, &id ) == -1 ) +#endif +#if defined(CLOCK_PROCESS_CPUTIME_ID) + /* Use known clock id for AIX, Linux, or Solaris. */ + id = CLOCK_PROCESS_CPUTIME_ID; +#elif defined(CLOCK_VIRTUAL) + /* Use known clock id for BSD or HP-UX. */ + id = CLOCK_VIRTUAL; +#else + id = (clockid_t)-1; +#endif + if ( id != (clockid_t)-1 && clock_gettime( id, &ts ) != -1 ) + return (double)ts.tv_sec + + (double)ts.tv_nsec / 1000000000.0; + } +#endif + +#if defined(RUSAGE_SELF) + { + struct rusage rusage; + if ( getrusage( RUSAGE_SELF, &rusage ) != -1 ) + return (double)rusage.ru_utime.tv_sec + + (double)rusage.ru_utime.tv_usec / 1000000.0; + } +#endif + +#if defined(_SC_CLK_TCK) + { + const double ticks = (double)sysconf( _SC_CLK_TCK ); + struct tms tms; + if ( times( &tms ) != (clock_t)-1 ) + return (double)tms.tms_utime / ticks; + } +#endif + +#if defined(CLOCKS_PER_SEC) + { + clock_t cl = clock( ); + if ( cl != (clock_t)-1 ) + return (double)cl / (double)CLOCKS_PER_SEC; + } +#endif + +#endif + + return -1; /* Failed. */ +} \ No newline at end of file diff --git a/tests/cputime.h b/tests/cputime.h new file mode 100644 index 0000000..5785832 --- /dev/null +++ b/tests/cputime.h @@ -0,0 +1,20 @@ +/* + * Author: David Robert Nadeau + * Site: http://NadeauSoftware.com/ + * License: Creative Commons Attribution 3.0 Unported License + * http://creativecommons.org/licenses/by/3.0/deed.en_US + */ + +double getCPUTime(void); + +/* Example usage: + * ============== + * + * double startTime, endTime; + * + * startTime = getCPUTime( ); + * ... + * endTime = getCPUTime( ); + * + * fprintf( stderr, "CPU time used = %lf\n", (endTime - startTime) ); + */ \ No newline at end of file diff --git a/tests/encode_packet.c b/tests/encode_packet.c new file mode 100644 index 0000000..c8826a9 --- /dev/null +++ b/tests/encode_packet.c @@ -0,0 +1,209 @@ +#include "test.h" +#include "packet.h" + +// Variable length int length calculation +TestResult test_vl_int_0(void) { + TESTASSERT(variable_length_int_size(0) == 1, "Values < 128 should fit in one byte"); +} + +TestResult test_vl_int_127(void) { + TESTASSERT(variable_length_int_size(127) == 1, "Values < 128 should fit in one byte"); +} + +TestResult test_vl_int_128(void) { + TESTASSERT(variable_length_int_size(128) == 2, "Values < 16384 should fit in two bytes"); +} + +TestResult test_vl_int_16383(void) { + TESTASSERT(variable_length_int_size(16383) == 2, "Values < 16384 should fit in two bytes"); +} + +TestResult test_vl_int_16384(void) { + TESTASSERT(variable_length_int_size(16384) == 3, "Values < 2097151 should fit in three bytes"); +} + +// Variable length int data check + +TestResult test_vl_int_data_0(void) { + char data[] = { 0 }; + Buffer *buffer = buffer_allocate(5); + variable_length_int_encode(0, buffer); + + TESTASSERT( + ( + (buffer->position == 1) + && + (memcmp(buffer->data, data, 1) == 0) + ), + "Variable length int of 0 should result in [0x00]"); +} + +TestResult test_vl_int_data_127(void) { + char data[] = { 127 }; + Buffer *buffer = buffer_allocate(5); + variable_length_int_encode(127, buffer); + + TESTASSERT( + ( + (buffer->position == 1) + && + (memcmp(buffer->data, data, 1) == 0) + ), + "Variable length int of 127 should result in [0x7f]"); +} + +TestResult test_vl_int_data_128(void) { + char data[] = { 0x80, 0x01 }; + Buffer *buffer = buffer_allocate(5); + variable_length_int_encode(128, buffer); + + TESTASSERT( + ( + (buffer->position == 2) + && + (memcmp(buffer->data, data, 2) == 0) + ), + "Variable length int of 128 should result in [0x80, 0x01]"); +} + +TestResult test_vl_int_data_16383(void) { + char data[] = { 0xff, 0x7f }; + Buffer *buffer = buffer_allocate(5); + variable_length_int_encode(16383, buffer); + + TESTASSERT( + ( + (buffer->position == 2) + && + (memcmp(buffer->data, data, 2) == 0) + ), + "Variable length int of 16383 should result in [0xff, 0x7f]"); +} + +TestResult test_vl_int_data_16384(void) { + char data[] = { 0x80, 0x80, 0x01 }; + Buffer *buffer = buffer_allocate(5); + variable_length_int_encode(16384, buffer); + + TESTASSERT( + ( + (buffer->position == 3) + && + (memcmp(buffer->data, data, 3) == 0) + ), + "Variable length int of 16384 should result in [0x80, 0x80, 0x01]"); +} + +TestResult test_vl_int_data_32767(void) { + char data[] = { 0xff, 0xff, 0x01 }; + Buffer *buffer = buffer_allocate(5); + variable_length_int_encode(32767, buffer); + + TESTASSERT( + ( + (buffer->position == 3) + && + (memcmp(buffer->data, data, 3) == 0) + ), + "Variable length int of 32767 should result in [0xff, 0xff, 0x01]"); +} + +// UTF-8 String encoding + +TestResult test_utf8_string_null(void) { + char data[] = { 0x00, 0x00 }; + Buffer *buffer = buffer_allocate(5); + size_t sz = utf8_string_encode(NULL, buffer); + TESTASSERT( + ( + (buffer->position == 2) + && + (sz == 2) + && + (memcmp(buffer->data, data, 2) == 0) + ), + "NULL String should result in [0x00, 0x00]"); +} + +TestResult test_utf8_string_empty(void) { + char data[] = { 0x00, 0x00 }; + Buffer *buffer = buffer_allocate(5); + size_t sz = utf8_string_encode("", buffer); + TESTASSERT( + ( + (buffer->position == 2) + && + (sz == 2) + && + (memcmp(buffer->data, data, 2) == 0) + ), + "Empty String should result in [0x00, 0x00]"); +} + +TestResult test_utf8_string_hello(void) { + char data[] = { 0x00, 0x05, 'h', 'e', 'l', 'l', 'o' }; + Buffer *buffer = buffer_allocate(10); + size_t sz = utf8_string_encode("hello", buffer); + TESTASSERT( + ( + (buffer->position == 7) + && + (sz == 7) + && + (memcmp(buffer->data, data, 7) == 0) + ), + "\"hello\" String should result in [0x00, 0x05, 'h', 'e', 'l', 'l', 'o']"); +} + +// make header +TestResult test_make_header(void) { + char data[] = { 0x10, 0x05, 0x00, 0x00, 0x00, 0x00, 0x00 }; + Buffer *buffer = make_buffer_for_header(5, PacketTypeConnect); + + TESTASSERT( + ( + (buffer->len == 7) + && + (memcmp(buffer->data, data, 7) == 0) + && + (buffer->position == 2) + ), + "Header should be valid"); +} + +TestResult not_implemented(void) { + TESTRESULT(TestStatusSkipped, "Not implemented"); +} + + +TESTS( + TEST("Variable length int size for 0", test_vl_int_0), + TEST("Variable length int size for 127", test_vl_int_127), + TEST("Variable length int size for 128", test_vl_int_128), + TEST("Variable length int size for 16383", test_vl_int_16383), + TEST("Variable length int size for 16384", test_vl_int_16384), + TEST("Variable length int data for 0", test_vl_int_data_0), + TEST("Variable length int data for 127", test_vl_int_data_127), + TEST("Variable length int data for 128", test_vl_int_data_128), + TEST("Variable length int data for 16383", test_vl_int_data_16383), + TEST("Variable length int data for 16384", test_vl_int_data_16384), + TEST("Variable length int data for 32767", test_vl_int_data_32767), + TEST("UTF-8 string encode NULL", test_utf8_string_null), + TEST("UTF-8 string encode empty string", test_utf8_string_empty), + TEST("UTF-8 string encode \"hello\"", test_utf8_string_hello), + TEST("Make header", test_make_header), + TEST("Encode Connect", not_implemented), + TEST("Encode ConnAck", not_implemented), + TEST("Encode Publish", not_implemented), + TEST("Encode PubAck", not_implemented), + TEST("Encode PubRec", not_implemented), + TEST("Encode PubRel", not_implemented), + TEST("Encode PubComp", not_implemented), + TEST("Encode Subscribe", not_implemented), + TEST("Encode SubAck", not_implemented), + TEST("Encode Unsubscribe", not_implemented), + TEST("Encode UnsubAck", not_implemented), + TEST("Encode PingReq", not_implemented), + TEST("Encode PingResp", not_implemented), + TEST("Encode Disconnect", not_implemented) +); diff --git a/tests/test.h b/tests/test.h new file mode 100644 index 0000000..13d8508 --- /dev/null +++ b/tests/test.h @@ -0,0 +1,97 @@ +#include +#include +#include +#include +#include + +#include + +#include "cputime.h" + +typedef enum { + TestStatusSkipped = 0, + TestStatusFailure, + TestStatusOk, +} TestStatus; + +typedef struct { + TestStatus status; + char *message; +} TestResult; + +typedef TestResult (*TestPointer)(void); + +typedef struct { + char *name; + TestPointer run; + char *file; + int line; +} DefinedTest; + +extern DefinedTest defined_tests[]; + +#define TEST(_name, _function) (DefinedTest){ _name, _function, __FILE__, __LINE__ } + +#define TESTS(...) \ + DefinedTest defined_tests[] = { \ + __VA_ARGS__, \ + TEST(NULL, NULL) \ + } + +#define TESTRESULT(_status, _message) return (TestResult){ _status, _message } + +#ifdef TIMETRIAL +# define TESTASSERT(_assertion, _message) return (TestResult){ (_assertion) ? TestStatusOk : TestStatusFailure, NULL } +#else +# define TESTASSERT(_assertion, _message) return (TestResult){ (_assertion) ? TestStatusOk : TestStatusFailure, _message } +#endif + +void timetrial(DefinedTest *test); + +int main(int argc, char **argv) { + for(DefinedTest *test = defined_tests; test->run != NULL; test++) { + TestResult result = test->run(); + switch (result.status) { + case TestStatusOk: + fprintf(stdout, "info: Test %s suceeded ", test->name); + #ifdef TIMETRIAL + timetrial(test); + #else + fprintf(stdout, "\n"); + #endif + break; + case TestStatusSkipped: + fprintf(stderr, "%s:%d: warning: Skipped test %s\n", test->file, test->line, test->name); + if (result.message) { + fprintf(stderr, " -> %s\n", result.message); + } + break; + case TestStatusFailure: + fprintf(stderr, "%s:%d: error: Test %s failed\n", test->file, test->line, test->name); + if (result.message) { + fprintf(stderr, " -> %s\n", result.message); + } + return 1; + break; + } + } + + return 0; +} + +#ifdef TIMETRIAL +#define ITER 10000000 +void timetrial(DefinedTest *test) { + double start, end; + + start = getCPUTime(); + for(uint64_t i = 0; i < ITER; i++) { + volatile TestResult result = test->run(); + (void)result; + } + end = getCPUTime(); + + double time = (end - start) * 1000.0 /* ms */ * 1000.0 /* us */ * 1000.0 /* ns */ / (double)ITER; + fprintf(stdout, "[ %0.3f ns ]\n", time); +} +#endif