Implement packet encoder, start writing tests

This commit is contained in:
Johannes Schriewer 2018-07-28 02:30:41 +02:00
parent 4a32facdb5
commit 6ff7d2a841
21 changed files with 1509 additions and 9 deletions

2
.gitignore vendored Normal file
View file

@ -0,0 +1,2 @@
*.o
*.a

31
.vscode/c_cpp_properties.json vendored Normal file
View file

@ -0,0 +1,31 @@
{
"configurations": [
{
"name": "Linux",
"includePath": [
"${workspaceRoot}/src",
"${workspaceRoot}/platform",
"/usr/include"
],
"defines": [
"MAX_BUFFER_SIZE=256",
"DEBUG=1"
],
"intelliSenseMode": "clang-x64",
"browse": {
"path": [
"${workspaceRoot}",
"${workspaceRoot}/src",
"${workspaceRoot}/platform",
"/usr/include"
]
},
"limitSymbolsToIncludedHeaders": true,
"databaseFilename": "",
"compilerPath": "/usr/bin/clang",
"cStandard": "c99",
"cppStandard": "c++17"
}
],
"version": 4
}

16
.vscode/settings.json vendored Normal file
View file

@ -0,0 +1,16 @@
{
"files.associations": {
"stdlib.h": "c",
"mqtt_internal.h": "c",
"platform.h": "c",
"tidyplatform.h": "c",
"type_traits": "cpp",
"istream": "c",
"optional": "c",
"ostream": "c",
"ratio": "c",
"system_error": "c",
"array": "c",
"string_view": "c"
}
}

44
.vscode/tasks.json vendored Normal file
View file

@ -0,0 +1,44 @@
{
// See https://go.microsoft.com/fwlink/?LinkId=733558
// for the documentation about the tasks.json format
"version": "2.0.0",
"tasks": [
{
"label": "Build linux",
"type": "shell",
"command": "make",
"args": [
"-f",
"Makefile.linux",
"clean",
"all"
],
"options": {
"cwd": "${workspaceRoot}"
},
"group": {
"kind": "build",
"isDefault": true
},
"problemMatcher": [
"$gcc"
]
},
{
"label": "Clean linux",
"type": "shell",
"command": "make",
"args": [
"-f",
"Makefile.linux",
"clean"
],
"options": {
"cwd": "${workspaceRoot}"
},
"problemMatcher": [
"$gcc"
]
}
]
}

View file

@ -1,11 +1,13 @@
SRCS=src/mqtt.c src/debug.c platform/linux.c
SRCS=src/mqtt.c src/packet.c src/protocol.c src/debug.c platform/linux.c
OBJS=$(SRCS:%.c=%.o)
TARGET=libmqtt.a
PLATFORM_FLAGS=-DMAX_BUFFER_SIZE=256
AR=ar
CC=clang
CFLAGS=-g -Os -Wall -pthread
CFLAGS=-g -Os -Wall -pthread -I./platform -I./src $(PLATFORM_FLAGS)
all: $(TARGET)

View file

@ -1,4 +1,5 @@
#include <pthread.h>
#include <stdlib.h>
#include "platform.h"
@ -7,17 +8,21 @@ struct _PlatformData {
};
void initialize_platform(MQTTHandle *handle) {
handle->platform = calloc(sizeof(struct _PlatformData));
handle->platform = calloc(sizeof(struct _PlatformData), 1);
}
MQTTStatus run_read_task(MQTTHandle *handle, Reader reader) {
if (pthread_create(&handle->platform->read_thread, NULL, reader, (void *)handle)) {
if (pthread_create(&handle->platform->read_thread, NULL, (void *(*)(void *))reader, (void *)handle)) {
return MQTT_STATUS_ERROR;
}
return MQTT_STATUS_OK;
}
MQTTStatus join_read_task(MQTTHandle *handle) {
pthread_join(handle->platform->read_thread);
pthread_join(handle->platform->read_thread, NULL);
return MQTT_STATUS_OK;
}
void release_platform(MQTTHandle *handle) {

210
src/buffer.h Normal file
View file

@ -0,0 +1,210 @@
#ifndef buffer_h__included
#define buffer_h__included
#include <string.h>
#include <stdlib.h>
#include <stdint.h>
#include <stdbool.h>
#include "mqtt.h"
typedef struct {
char *data; /**< Pointer to data */
size_t len; /**< Allocated space in data */
size_t position; /**< current cursor position in buffer */
} Buffer;
/**
* Copy data from current buffer position into dest
*
* This advances the internal buffer position
*
* @param src: Source buffer
* @param dest: Destination memory area
* @param len: Number of bytes to copy
* @returns: Actual number of bytes copied
*/
static inline size_t buffer_copy_out(Buffer *src, char *dest, size_t len) {
size_t sz = (len > src->len - src->position) ? src->len - src->position : len;
memcpy(dest, src->data + src->position, sz);
src->position += sz;
return sz;
}
/**
* Copy data into the buffer
*
* This advances the internal buffer position
*
* @param src: Source memory area
* @param dest: Destination buffer
* @param len: Number of bytes to copy
* @returns: Actual number of bytes copied
*/
static inline size_t buffer_copy_in(char *src, Buffer *dest, size_t len) {
size_t sz = (len > dest->len - dest->position) ? dest->len - dest->position : len;
memcpy(dest->data + dest->position, src, sz);
dest->position += sz;
return sz;
}
/**
* Get free space in buffer
*
* @param buffer: Buffer to check
* @returns: Number of free bytes in buffer
*/
static inline size_t buffer_free_space(Buffer *buffer) {
return buffer->len - buffer->position;
}
/**
* Check if the internal position is at the end of the buffer
*
* @param buffer; Buffer to check
* @returns: True if end of buffer reached
*/
static inline bool buffer_eof(Buffer *buffer) {
return buffer->position == buffer->len;
}
/**
* Reset internal position of buffer
*
* @param buffer: Buffer to reset
*/
static inline void buffer_reset(Buffer *buffer) {
buffer->position = 0;
}
/**
* Allocate a new buffer
*
* @param len: Size of new buffer
* @returns: New buffer or NULL if out of memory
*/
static inline Buffer *buffer_allocate(size_t len) {
Buffer *buffer = (Buffer *)malloc(sizeof(Buffer));
if (buffer == NULL) {
return NULL;
}
buffer->len = len;
buffer->position = 0;
buffer->data = (char *)calloc(1, len);
if (buffer->data == NULL) {
free(buffer);
return NULL;
}
return buffer;
}
/**
* Re-allocate buffer size
*
* @param buffer: Buffer to modify
* @param len: Size of new buffer
* @returns: Modified buffer if realloc did work, check if buffer->len == len to verify
*/
static inline Buffer *buffer_reallocate(Buffer *buffer, size_t len) {
char *new_buffer = (char *)realloc(buffer->data, len);
if (new_buffer != NULL) {
buffer->data = new_buffer;
buffer->len = len;
}
return buffer;
}
/**
* Create a new buffer from a memory area and a size
*
* @param data: Memory area
* @param len: Length of memory area
* @returns: New Buffer
*
* @attention: the data pointer will be owned by the buffer and freed with it!
*/
static inline Buffer *buffer_from_data_no_copy(char *data, size_t len) {
Buffer *buffer = (Buffer *)malloc(sizeof(Buffer));
buffer->len = len;
buffer->data = data;
buffer->position = 0;
return buffer;
}
/**
* Create a new buffer and copy the data
*
* @param data: Data to copy into the buffer
* @param len: Number of bytes to copy
* @returns: New Buffer
*/
static inline Buffer *buffer_from_data_copy(char *data, size_t len) {
Buffer *buffer = buffer_allocate(len);
(void)buffer_copy_in(data, buffer, len);
return buffer;
}
/**
* Release a buffer
*
* @param buffer: Buffer to release
*/
static inline void buffer_release(Buffer *buffer) {
free(buffer->data);
buffer->data = NULL;
free(buffer);
}
/**
* Append data to a buffer
*
* @param buffer: Buffer to append data to
* @param data: Memory area to copy to the end of the buffer
* @param len: Number of bytes to copy
* @returns: Numbr of bytes copied
*
* @attention: May come up short if the destination buffer has to be reallocated and
* that reallocation fails
*/
static inline size_t buffer_append_data(Buffer *buffer, char *data, size_t len) {
size_t num_bytes = buffer_copy_in(data, buffer, len);
if (num_bytes != len) {
// reallocate
(void)buffer_reallocate(buffer, buffer->len + (len - num_bytes));
if (buffer_eof(buffer)) {
// reallocation failed
return num_bytes;
}
(void)buffer_copy_in(data + num_bytes, buffer, (len - num_bytes));
}
return len;
}
/**
* Append a buffer to another buffer
*
* @param dest: Destination buffer
* @param src: Source buffer to append
* @returns: Number of bytes copied
*
* @attention: May come up short if the destination buffer has to be reallocated and
* that reallocation fails
*/
static inline size_t buffer_append_buffer(Buffer *dest, Buffer *src) {
return buffer_append_data(dest, src->data, src->len);
}
#if DEBUG
#include "debug.h"
static inline void buffer_hexdump(Buffer *buffer) {
hexdump(buffer->data, buffer->len);
}
#else
#define buffer_hexdump(_buffer) /* */
#endif
#endif /* buffer_h__included */

View file

@ -0,0 +1,30 @@
#include "stdio.h"
#include "debug.h"
void hexdump(char *data, size_t len) {
for (int i = 0; i < len;) {
for (int col = 0; col < 16; col++) {
if (i + col < len) {
fprintf(stdout, "%02x ", data[i + col]);
} else {
fprintf(stdout, " ");
}
}
fprintf(stdout, " | ");
for (int col = 0; col < 16; col++) {
if (i + col < len) {
char c = data[i + col];
if ((c > 127) || (c < 32)) c = '.';
fprintf(stdout, "%c", c);
} else {
fprintf(stdout, " ");
}
}
fprintf(stdout, "\n");
i += 16;
}
}

View file

@ -0,0 +1,8 @@
#ifndef debug_h__included
#define debug_h__included
#include <stdlib.h>
void hexdump(char *data, size_t len);
#endif /* debug_h__included */

View file

@ -5,25 +5,34 @@
#include <unistd.h>
#include <stdlib.h>
#include <stdint.h>
#include <errno.h>
#include "mqtt.h"
#include "mqtt_internal.h"
#include "platform.h"
// TODO: Make configurable by platform
#define BUF_LEN 256
#define BUF_LEN MAX_BUFFER_SIZE
static void _reader(MQTTHandle *handle) {
int num_bytes;
char buffer[BUF_LEN];
handle->reader_alive = true;
while (1) {
num_bytes = read(handle->sock, &buffer, BUF_LEN);
if (num_bytes == 0) {
/* Socket closed */
/* Socket closed, coordinated shutdown */
handle->reader_alive = false;
return;
} else if (num_bytes < 0) {
/* Error, TODO: Handle */
if ((errno == EINTR) || (errno == EAGAIN)) {
continue;
}
/* Set reader task to dead */
handle->reader_alive = false;
return;
}
// TODO: Parse and dispatch

View file

@ -2,6 +2,7 @@
#define mqtt_h__included
#include <stdint.h>
#include <stdbool.h>
typedef struct _MQTTHandle MQTTHandle;

View file

@ -17,6 +17,7 @@ struct _MQTTHandle {
int num_subscriptions;
int sock;
bool reader_alive;
// TODO: status queue (Waiting for ACK)
PlatformData *platform;

533
src/packet.c Normal file
View file

@ -0,0 +1,533 @@
#include <assert.h>
#include "packet.h"
/*
* Helper functionality
*/
MQTTPacket *allocate_MQTTPacket(MQTTControlPacketType type) {
MQTTPacket *packet = calloc(1, sizeof(MQTTPacket));
packet->packet_type = type;
switch (type) {
case PacketTypeConnect:
packet->payload = calloc(1, sizeof(ConnectPayload));
break;
case PacketTypeConnAck:
packet->payload = calloc(1, sizeof(ConnAckPayload));
break;
case PacketTypePublish:
packet->payload = calloc(1, sizeof(PublishPayload));
break;
case PacketTypePubAck:
packet->payload = calloc(1, sizeof(PubAckPayload));
break;
case PacketTypePubRec:
packet->payload = calloc(1, sizeof(PubRecPayload));
break;
case PacketTypePubRel:
packet->payload = calloc(1, sizeof(PubRelPayload));
break;
case PacketTypePubComp:
packet->payload = calloc(1, sizeof(PubCompPayload));
break;
case PacketTypeSubscribe:
packet->payload = calloc(1, sizeof(SubscribePayload));
break;
case PacketTypeSubAck:
packet->payload = calloc(1, sizeof(SubAckPayload));
break;
case PacketTypeUnsubscribe:
packet->payload = calloc(1, sizeof(UnsubscribePayload));
break;
case PacketTypeUnsubAck:
packet->payload = calloc(1, sizeof(UnsubAckPayload));
break;
case PacketTypePingReq:
case PacketTypePingResp:
case PacketTypeDisconnect:
packet->payload = NULL;
break;
}
return packet;
}
void free_MQTTPacket(MQTTPacket *packet) {
free(packet->payload);
packet->payload = NULL;
free(packet);
}
uint16_t variable_length_int_decode(Buffer *buffer) {
uint16_t result = 0;
while (buffer->data[buffer->position] & 0x80) {
result *= 128;
result += buffer->data[buffer->position] & 0x7f;
buffer->position++;
if (buffer_eof(buffer)) {
break; // bail out, buffer exhausted
}
}
return result;
}
char *utf8_string_decode(Buffer *buffer) {
char *result;
if (buffer_free_space(buffer) < 2) {
return NULL; // buffer too small
}
uint16_t sz = (buffer->data[buffer->position] << 8) + buffer->data[buffer->position + 1];
if (buffer_free_space(buffer) < sz + 2) {
return NULL; // incomplete buffer
}
buffer->position += 2;
result = malloc(sz);
buffer_copy_out(buffer, result, sz);
return result;
}
size_t variable_length_int_encode(uint16_t value, Buffer *buffer) {
if (value == 0) {
buffer->data[buffer->position] = 0;
buffer->position++;
return 1;
}
size_t len = 0;
while (value > 0) {
if (buffer->position + len > buffer->len) {
buffer->position += len - 1;
return len - 1; // bail out, buffer exhausted
}
buffer->data[buffer->position + len] = value % 128;
value = value / 128;
if (value > 0){
buffer->data[buffer->position + len] |= 0x80;
}
len++;
}
buffer->position += len;
return len;
}
size_t variable_length_int_size(uint16_t value) {
if (value == 0) {
return 1;
}
size_t len = 0;
while (value > 0) {
value = value / 128;
len++;
}
return len;
}
size_t utf8_string_encode(char *string, Buffer *buffer) {
size_t len = 0;
if (string != NULL) {
len = strlen(string);
}
if ((len > UINT16_MAX) || (buffer_free_space(buffer) < len + 2)) {
return 0; // bail out, buffer too small
}
buffer->data[buffer->position] = (len & 0xff00) >> 8;
buffer->data[buffer->position + 1] = (len & 0xff);
buffer->position += 2;
if (string != NULL) {
(void)buffer_copy_in(string, buffer, len);
}
return len + 2;
}
/*
* Decoder
*/
ConnectPayload *decode_connect(Buffer *buffer) {
}
ConnAckPayload *decode_connack(Buffer *buffer) {
}
PublishPayload *decode_publish(Buffer *buffer) {
}
PubAckPayload *decode_puback(Buffer *buffer) {
}
PubRecPayload *decode_pubrec(Buffer *buffer) {
}
PubRelPayload *decode_pubrel(Buffer *buffer) {
}
PubCompPayload *decode_pubcomp(Buffer *buffer) {
}
SubscribePayload *decode_subscribe(Buffer *buffer) {
}
SubAckPayload *decode_suback(Buffer *buffer) {
}
UnsubscribePayload *decode_unsubscribe(Buffer *buffer) {
}
UnsubAckPayload *decode_unsuback(Buffer *buffer) {
}
int decode_pingreq(Buffer *buffer) {
}
int decode_pingresp(Buffer *buffer) {
}
int decode_disconnect(Buffer *buffer) {
}
MQTTPacket *mqtt_packet_decode(Buffer *buffer) {
MQTTControlPacketType type = (buffer->data[0] & 0xf0) >> 4;
MQTTPacket *result =allocate_MQTTPacket(type);
switch (type) {
case PacketTypeConnect:
result->payload = (void *)decode_connect(buffer);
break;
case PacketTypeConnAck:
result->payload = (void *)decode_connack(buffer);
break;
case PacketTypePublish:
result->payload = (void *)decode_publish(buffer);
break;
case PacketTypePubAck:
result->payload = (void *)decode_puback(buffer);
break;
case PacketTypePubRec:
result->payload = (void *)decode_pubrec(buffer);
break;
case PacketTypePubRel:
result->payload = (void *)decode_pubrel(buffer);
break;
case PacketTypePubComp:
result->payload = (void *)decode_pubcomp(buffer);
break;
case PacketTypeSubscribe:
result->payload = (void *)decode_subscribe(buffer);
break;
case PacketTypeSubAck:
result->payload = (void *)decode_suback(buffer);
break;
case PacketTypeUnsubscribe:
result->payload = (void *)decode_unsubscribe(buffer);
break;
case PacketTypeUnsubAck:
result->payload = (void *)decode_unsuback(buffer);
break;
case PacketTypePingReq:
result->payload = (void *)decode_pingreq(buffer);
break;
case PacketTypePingResp:
result->payload = (void *)decode_pingresp(buffer);
break;
case PacketTypeDisconnect:
result->payload = (void *)decode_disconnect(buffer);
break;
}
return result;
}
/*
* Encoder
*/
Buffer *make_buffer_for_header(size_t sz, MQTTControlPacketType type) {
sz += variable_length_int_size(sz); // size field
sz += 1; // packet type and flags
Buffer *buffer = buffer_allocate(sz);
buffer->data[0] = type << 4;
buffer->position += 1;
variable_length_int_encode(sz - 2, buffer);
return buffer;
}
Buffer *encode_connect(ConnectPayload *payload) {
size_t sz = 10 /* variable header */;
sz += strlen(payload->client_id) + 2;
if (payload->will_topic) {
sz += strlen(payload->will_topic) + 2;
sz += strlen(payload->will_message) + 2;
}
if (payload->username) {
sz += strlen(payload->username) + 2;
}
if (payload->password) {
sz += strlen(payload->password) + 2;
}
Buffer *buffer = make_buffer_for_header(sz, PacketTypeConnect);
// variable header
utf8_string_encode("MQTT", buffer);
char *p = buffer->data + buffer->position;
*(p++) = payload->protocol_level;
uint8_t flags = (
((payload->username) ? (1 << 7) : 0)
+ ((payload->password) ? (1 << 6) : 0)
+ ((payload->retain_will) ? (1 << 5) : 0)
+ ((payload->will_topic) ? (payload->will_qos << 3) : 0)
+ ((payload->will_topic) ? (1 << 2) : 0)
+ ((payload->clean_session) ? (1 << 1) : 0)
);
*(p++) = flags;
*(p++) = (payload->keepalive_interval & 0xff00) >> 8;
*(p++) = (payload->keepalive_interval & 0xff);
buffer->position += 4;
// payload
utf8_string_encode(payload->client_id, buffer);
if (payload->will_topic) {
utf8_string_encode(payload->will_topic, buffer);
utf8_string_encode(payload->will_message, buffer);
}
if (payload->username) {
utf8_string_encode(payload->username, buffer);
}
if (payload->password) {
utf8_string_encode(payload->password, buffer);
}
assert(buffer_eof(buffer));
return buffer;
}
Buffer *encode_connack(ConnAckPayload *payload) {
size_t sz = 2; // session flag and status
Buffer *buffer = make_buffer_for_header(sz, PacketTypeConnAck);
buffer->data[buffer->position++] = payload->session_present;
buffer->data[buffer->position++] = payload->status;
assert(buffer_eof(buffer));
return buffer;
}
Buffer *encode_publish(PublishPayload *payload) {
size_t sz = 0;
sz += strlen(payload->topic) + 2; // topic
sz += 2; // packet id
sz += strlen(payload->message);
Buffer *buffer = make_buffer_for_header(sz, PacketTypePublish);
// Variable header
utf8_string_encode(payload->topic, buffer);
buffer->data[buffer->position++] = (payload->packet_id & 0xff00) >> 8;
buffer->data[buffer->position++] = (payload->packet_id & 0xff);
// Payload
buffer_copy_in(payload->message, buffer, strlen(payload->message));
assert(buffer_eof(buffer));
return buffer;
}
Buffer *encode_puback(PubAckPayload *payload) {
size_t sz = 2; // packet id
Buffer *buffer = make_buffer_for_header(sz, PacketTypePubAck);
// Variable header
buffer->data[buffer->position++] = (payload->packet_id & 0xff00) >> 8;
buffer->data[buffer->position++] = (payload->packet_id & 0xff);
assert(buffer_eof(buffer));
return buffer;
}
Buffer *encode_pubrec(PubRecPayload *payload) {
size_t sz = 2; // packet id
Buffer *buffer = make_buffer_for_header(sz, PacketTypePubRec);
// Variable header
buffer->data[buffer->position++] = (payload->packet_id & 0xff00) >> 8;
buffer->data[buffer->position++] = (payload->packet_id & 0xff);
assert(buffer_eof(buffer));
return buffer;
}
Buffer *encode_pubrel(PubRelPayload *payload) {
size_t sz = 2; // packet id
Buffer *buffer = make_buffer_for_header(sz, PacketTypePubRel);
// Variable header
buffer->data[buffer->position++] = (payload->packet_id & 0xff00) >> 8;
buffer->data[buffer->position++] = (payload->packet_id & 0xff);
assert(buffer_eof(buffer));
return buffer;
}
Buffer *encode_pubcomp(PubCompPayload *payload) {
size_t sz = 2; // packet id
Buffer *buffer = make_buffer_for_header(sz, PacketTypePubComp);
// Variable header
buffer->data[buffer->position++] = (payload->packet_id & 0xff00) >> 8;
buffer->data[buffer->position++] = (payload->packet_id & 0xff);
assert(buffer_eof(buffer));
return buffer;
}
Buffer *encode_subscribe(SubscribePayload *payload) {
size_t sz = 2; // packet id
sz += strlen(payload->topic); // topic
sz += 1; // qos level
Buffer *buffer = make_buffer_for_header(sz, PacketTypeSubscribe);
// Variable header
buffer->data[buffer->position++] = (payload->packet_id & 0xff00) >> 8;
buffer->data[buffer->position++] = (payload->packet_id & 0xff);
// Payload
utf8_string_encode(payload->topic, buffer);
buffer->data[buffer->position++] = payload->qos;
assert(buffer_eof(buffer));
return buffer;
}
Buffer *encode_suback(SubAckPayload *payload) {
size_t sz = 2; // packet id
sz += 1; // Status code
Buffer *buffer = make_buffer_for_header(sz, PacketTypeSubAck);
// Variable header
buffer->data[buffer->position++] = (payload->packet_id & 0xff00) >> 8;
buffer->data[buffer->position++] = (payload->packet_id & 0xff);
// Payload
buffer->data[buffer->position++] = payload->status;
assert(buffer_eof(buffer));
return buffer;
}
Buffer *encode_unsubscribe(UnsubscribePayload *payload) {
size_t sz = 2; // packet id
sz += strlen(payload->topic); // topic
Buffer *buffer = make_buffer_for_header(sz, PacketTypeUnsubscribe);
// Variable header
buffer->data[buffer->position++] = (payload->packet_id & 0xff00) >> 8;
buffer->data[buffer->position++] = (payload->packet_id & 0xff);
// Payload
utf8_string_encode(payload->topic, buffer);
assert(buffer_eof(buffer));
return buffer;
}
Buffer *encode_unsuback(UnsubAckPayload *payload) {
size_t sz = 2; // packet id
Buffer *buffer = make_buffer_for_header(sz, PacketTypeUnsubAck);
// Variable header
buffer->data[buffer->position++] = (payload->packet_id & 0xff00) >> 8;
buffer->data[buffer->position++] = (payload->packet_id & 0xff);
assert(buffer_eof(buffer));
return buffer;
}
Buffer *encode_pingreq() {
Buffer *buffer = make_buffer_for_header(0, PacketTypePingReq);
assert(buffer_eof(buffer));
return buffer;
}
Buffer *encode_pingresp() {
Buffer *buffer = make_buffer_for_header(0, PacketTypePingResp);
assert(buffer_eof(buffer));
return buffer;
}
Buffer *encode_disconnect() {
Buffer *buffer = make_buffer_for_header(0, PacketTypeDisconnect);
assert(buffer_eof(buffer));
return buffer;
}
Buffer *mqtt_packet_encode(MQTTPacket *packet) {
switch (packet->packet_type) {
case PacketTypeConnect:
return encode_connect((ConnectPayload *)packet->payload);
case PacketTypeConnAck:
return encode_connack((ConnAckPayload *)packet->payload);
case PacketTypePublish:
return encode_publish((PublishPayload *)packet->payload);
case PacketTypePubAck:
return encode_puback((PubAckPayload *)packet->payload);
case PacketTypePubRec:
return encode_pubrec((PubRecPayload *)packet->payload);
case PacketTypePubRel:
return encode_pubrel((PubRelPayload *)packet->payload);
case PacketTypePubComp:
return encode_pubcomp((PubCompPayload *)packet->payload);
case PacketTypeSubscribe:
return encode_subscribe((SubscribePayload *)packet->payload);
case PacketTypeSubAck:
return encode_suback((SubAckPayload *)packet->payload);
case PacketTypeUnsubscribe:
return encode_unsubscribe((UnsubscribePayload *)packet->payload);
case PacketTypeUnsubAck:
return encode_unsuback((UnsubAckPayload *)packet->payload);
case PacketTypePingReq:
return encode_pingreq();
case PacketTypePingResp:
return encode_pingresp();
case PacketTypeDisconnect:
return encode_disconnect();
}
}

155
src/packet.h Normal file
View file

@ -0,0 +1,155 @@
#ifndef packet_h__included
#define packet_h__included
#include <stdlib.h>
#include <stdint.h>
#include <stdbool.h>
#include "mqtt.h"
#include "buffer.h"
typedef enum {
PacketTypeConnect = 1,
PacketTypeConnAck = 2,
PacketTypePublish = 3,
PacketTypePubAck = 4,
PacketTypePubRec = 5,
PacketTypePubRel = 6,
PacketTypePubComp = 7,
PacketTypeSubscribe = 8,
PacketTypeSubAck = 9,
PacketTypeUnsubscribe = 10,
PacketTypeUnsubAck = 11,
PacketTypePingReq = 12,
PacketTypePingResp = 13,
PacketTypeDisconnect = 14
} MQTTControlPacketType;
typedef struct {
// required:
char *client_id;
int protocol_level;
uint16_t keepalive_interval;
// optional
char *username;
char *password;
char *will_topic;
char *will_message;
MQTTQosLevel will_qos;
bool retain_will;
bool clean_session;
} ConnectPayload;
typedef enum {
ConnAckStatusAccepted = 0, /**< Connection accepted */
ConnAckStatusInvalidProtocolLevel = 1, /**< Protocol level not supported */
ConnAckStatusInvalidIdentifier = 2, /**< Client ID not accepted */
ConnAckStatusServerUnavailable = 3, /**< Server restarting or too many clients */
ConnAckStatusAuthenticationError = 4, /**< missing username/password */
ConnAckStatusNotAuthorized = 5 /**< not authorized */
} ConnAckStatus;
typedef struct {
bool session_present;
ConnAckStatus status;
} ConnAckPayload;
typedef struct {
bool duplicate;
MQTTQosLevel qos;
bool retain;
char *topic;
uint16_t packet_id;
char *message;
} PublishPayload;
typedef struct {
uint16_t packet_id;
} PubAckPayload;
typedef struct {
uint16_t packet_id;
} PubRecPayload;
typedef struct {
uint16_t packet_id;
} PubRelPayload;
typedef struct {
uint16_t packet_id;
} PubCompPayload;
typedef struct {
uint16_t packet_id;
char *topic;
MQTTQosLevel qos;
} SubscribePayload;
typedef enum {
SubAckStatusQoS0 = 0,
SubAckStatusQoS1 = 1,
SubAckStatusQoS2 = 2,
SubAckFailure = 0x80
} SubAckStatus;
typedef struct {
uint16_t packet_id;
SubAckStatus status;
} SubAckPayload;
typedef struct {
uint16_t packet_id;
char *topic;
} UnsubscribePayload;
typedef struct {
uint16_t packet_id;
} UnsubAckPayload;
typedef struct {
MQTTControlPacketType packet_type;
void *payload;
} MQTTPacket;
/*
* Decoder
*/
MQTTPacket *mqtt_packet_decode(Buffer *buffer);
void free_MQTTPacket(MQTTPacket *packet);
/*
* Encoder
*/
Buffer *mqtt_packet_encode(MQTTPacket *packet);
MQTTPacket *allocate_MQTTPacket(MQTTControlPacketType type);
/*
* Internal
*/
size_t variable_length_int_encode(uint16_t value, Buffer *buffer);
size_t variable_length_int_size(uint16_t value);
size_t utf8_string_encode(char *string, Buffer *buffer);
Buffer *make_buffer_for_header(size_t sz, MQTTControlPacketType type);
Buffer *encode_connect(ConnectPayload *payload);
Buffer *encode_connack(ConnAckPayload *payload);
Buffer *encode_publish(PublishPayload *payload);
Buffer *encode_puback(PubAckPayload *payload);
Buffer *encode_pubrec(PubRecPayload *payload);
Buffer *encode_pubrel(PubRelPayload *payload);
Buffer *encode_pubcomp(PubCompPayload *payload);
Buffer *encode_subscribe(SubscribePayload *payload);
Buffer *encode_suback(SubAckPayload *payload);
Buffer *encode_unsubscribe(UnsubscribePayload *payload);
Buffer *encode_unsuback(UnsubAckPayload *payload);
Buffer *encode_pingreq();
Buffer *encode_pingresp();
Buffer *encode_disconnect();
#endif /* packet_h__included */

0
src/protocol.c Normal file
View file

0
src/protocol.h Normal file
View file

29
tests/Makefile Normal file
View file

@ -0,0 +1,29 @@
SRCS=encode_packet.c
OBJS=$(SRCS:%.c=%.o)
TARGETS=$(SRCS:%.c=%.test)
CC=clang
CFLAGS=-g -Os -Wall -I.. -I../src -I../platform -DDEBUG=1
# -DTIMETRIAL
LDFLAGS=
LIBS=-L.. -lmqtt
all: $(TARGETS)
%.test: %.o cputime.o
$(CC) $(LDFLAGS) -o $@ cputime.o $< $(LIBS)
./$@
rm $@
%.o: %.c test.h
$(CC) $(CFLAGS) -o $@ -c $<
%.e: %.c test.h
$(CC) $(CFLAGS) -E -o $@ -c $<
less $@
rm $@
clean:
rm -f $(TARGETS)
rm -f $(OBJS)
rm -f *.e

98
tests/cputime.c Normal file
View file

@ -0,0 +1,98 @@
/*
* Author: David Robert Nadeau
* Site: http://NadeauSoftware.com/
* License: Creative Commons Attribution 3.0 Unported License
* http://creativecommons.org/licenses/by/3.0/deed.en_US
*/
#if defined(_WIN32)
#include <Windows.h>
#elif defined(__unix__) || defined(__unix) || defined(unix) || (defined(__APPLE__) && defined(__MACH__))
#include <unistd.h>
#include <sys/resource.h>
#include <sys/times.h>
#include <time.h>
#else
#error "Unable to define getCPUTime( ) for an unknown OS."
#endif
/**
* Returns the amount of CPU time used by the current process,
* in seconds, or -1.0 if an error occurred.
*/
double getCPUTime(void) {
#if defined(_WIN32)
/* Windows -------------------------------------------------- */
FILETIME createTime;
FILETIME exitTime;
FILETIME kernelTime;
FILETIME userTime;
if ( GetProcessTimes( GetCurrentProcess( ),
&createTime, &exitTime, &kernelTime, &userTime ) != -1 )
{
SYSTEMTIME userSystemTime;
if ( FileTimeToSystemTime( &userTime, &userSystemTime ) != -1 )
return (double)userSystemTime.wHour * 3600.0 +
(double)userSystemTime.wMinute * 60.0 +
(double)userSystemTime.wSecond +
(double)userSystemTime.wMilliseconds / 1000.0;
}
#elif defined(__unix__) || defined(__unix) || defined(unix) || (defined(__APPLE__) && defined(__MACH__))
/* AIX, BSD, Cygwin, HP-UX, Linux, OSX, and Solaris --------- */
#if defined(_POSIX_TIMERS) && (_POSIX_TIMERS > 0)
/* Prefer high-res POSIX timers, when available. */
{
clockid_t id;
struct timespec ts;
#if _POSIX_CPUTIME > 0
/* Clock ids vary by OS. Query the id, if possible. */
if ( clock_getcpuclockid( 0, &id ) == -1 )
#endif
#if defined(CLOCK_PROCESS_CPUTIME_ID)
/* Use known clock id for AIX, Linux, or Solaris. */
id = CLOCK_PROCESS_CPUTIME_ID;
#elif defined(CLOCK_VIRTUAL)
/* Use known clock id for BSD or HP-UX. */
id = CLOCK_VIRTUAL;
#else
id = (clockid_t)-1;
#endif
if ( id != (clockid_t)-1 && clock_gettime( id, &ts ) != -1 )
return (double)ts.tv_sec +
(double)ts.tv_nsec / 1000000000.0;
}
#endif
#if defined(RUSAGE_SELF)
{
struct rusage rusage;
if ( getrusage( RUSAGE_SELF, &rusage ) != -1 )
return (double)rusage.ru_utime.tv_sec +
(double)rusage.ru_utime.tv_usec / 1000000.0;
}
#endif
#if defined(_SC_CLK_TCK)
{
const double ticks = (double)sysconf( _SC_CLK_TCK );
struct tms tms;
if ( times( &tms ) != (clock_t)-1 )
return (double)tms.tms_utime / ticks;
}
#endif
#if defined(CLOCKS_PER_SEC)
{
clock_t cl = clock( );
if ( cl != (clock_t)-1 )
return (double)cl / (double)CLOCKS_PER_SEC;
}
#endif
#endif
return -1; /* Failed. */
}

20
tests/cputime.h Normal file
View file

@ -0,0 +1,20 @@
/*
* Author: David Robert Nadeau
* Site: http://NadeauSoftware.com/
* License: Creative Commons Attribution 3.0 Unported License
* http://creativecommons.org/licenses/by/3.0/deed.en_US
*/
double getCPUTime(void);
/* Example usage:
* ==============
*
* double startTime, endTime;
*
* startTime = getCPUTime( );
* ...
* endTime = getCPUTime( );
*
* fprintf( stderr, "CPU time used = %lf\n", (endTime - startTime) );
*/

209
tests/encode_packet.c Normal file
View file

@ -0,0 +1,209 @@
#include "test.h"
#include "packet.h"
// Variable length int length calculation
TestResult test_vl_int_0(void) {
TESTASSERT(variable_length_int_size(0) == 1, "Values < 128 should fit in one byte");
}
TestResult test_vl_int_127(void) {
TESTASSERT(variable_length_int_size(127) == 1, "Values < 128 should fit in one byte");
}
TestResult test_vl_int_128(void) {
TESTASSERT(variable_length_int_size(128) == 2, "Values < 16384 should fit in two bytes");
}
TestResult test_vl_int_16383(void) {
TESTASSERT(variable_length_int_size(16383) == 2, "Values < 16384 should fit in two bytes");
}
TestResult test_vl_int_16384(void) {
TESTASSERT(variable_length_int_size(16384) == 3, "Values < 2097151 should fit in three bytes");
}
// Variable length int data check
TestResult test_vl_int_data_0(void) {
char data[] = { 0 };
Buffer *buffer = buffer_allocate(5);
variable_length_int_encode(0, buffer);
TESTASSERT(
(
(buffer->position == 1)
&&
(memcmp(buffer->data, data, 1) == 0)
),
"Variable length int of 0 should result in [0x00]");
}
TestResult test_vl_int_data_127(void) {
char data[] = { 127 };
Buffer *buffer = buffer_allocate(5);
variable_length_int_encode(127, buffer);
TESTASSERT(
(
(buffer->position == 1)
&&
(memcmp(buffer->data, data, 1) == 0)
),
"Variable length int of 127 should result in [0x7f]");
}
TestResult test_vl_int_data_128(void) {
char data[] = { 0x80, 0x01 };
Buffer *buffer = buffer_allocate(5);
variable_length_int_encode(128, buffer);
TESTASSERT(
(
(buffer->position == 2)
&&
(memcmp(buffer->data, data, 2) == 0)
),
"Variable length int of 128 should result in [0x80, 0x01]");
}
TestResult test_vl_int_data_16383(void) {
char data[] = { 0xff, 0x7f };
Buffer *buffer = buffer_allocate(5);
variable_length_int_encode(16383, buffer);
TESTASSERT(
(
(buffer->position == 2)
&&
(memcmp(buffer->data, data, 2) == 0)
),
"Variable length int of 16383 should result in [0xff, 0x7f]");
}
TestResult test_vl_int_data_16384(void) {
char data[] = { 0x80, 0x80, 0x01 };
Buffer *buffer = buffer_allocate(5);
variable_length_int_encode(16384, buffer);
TESTASSERT(
(
(buffer->position == 3)
&&
(memcmp(buffer->data, data, 3) == 0)
),
"Variable length int of 16384 should result in [0x80, 0x80, 0x01]");
}
TestResult test_vl_int_data_32767(void) {
char data[] = { 0xff, 0xff, 0x01 };
Buffer *buffer = buffer_allocate(5);
variable_length_int_encode(32767, buffer);
TESTASSERT(
(
(buffer->position == 3)
&&
(memcmp(buffer->data, data, 3) == 0)
),
"Variable length int of 32767 should result in [0xff, 0xff, 0x01]");
}
// UTF-8 String encoding
TestResult test_utf8_string_null(void) {
char data[] = { 0x00, 0x00 };
Buffer *buffer = buffer_allocate(5);
size_t sz = utf8_string_encode(NULL, buffer);
TESTASSERT(
(
(buffer->position == 2)
&&
(sz == 2)
&&
(memcmp(buffer->data, data, 2) == 0)
),
"NULL String should result in [0x00, 0x00]");
}
TestResult test_utf8_string_empty(void) {
char data[] = { 0x00, 0x00 };
Buffer *buffer = buffer_allocate(5);
size_t sz = utf8_string_encode("", buffer);
TESTASSERT(
(
(buffer->position == 2)
&&
(sz == 2)
&&
(memcmp(buffer->data, data, 2) == 0)
),
"Empty String should result in [0x00, 0x00]");
}
TestResult test_utf8_string_hello(void) {
char data[] = { 0x00, 0x05, 'h', 'e', 'l', 'l', 'o' };
Buffer *buffer = buffer_allocate(10);
size_t sz = utf8_string_encode("hello", buffer);
TESTASSERT(
(
(buffer->position == 7)
&&
(sz == 7)
&&
(memcmp(buffer->data, data, 7) == 0)
),
"\"hello\" String should result in [0x00, 0x05, 'h', 'e', 'l', 'l', 'o']");
}
// make header
TestResult test_make_header(void) {
char data[] = { 0x10, 0x05, 0x00, 0x00, 0x00, 0x00, 0x00 };
Buffer *buffer = make_buffer_for_header(5, PacketTypeConnect);
TESTASSERT(
(
(buffer->len == 7)
&&
(memcmp(buffer->data, data, 7) == 0)
&&
(buffer->position == 2)
),
"Header should be valid");
}
TestResult not_implemented(void) {
TESTRESULT(TestStatusSkipped, "Not implemented");
}
TESTS(
TEST("Variable length int size for 0", test_vl_int_0),
TEST("Variable length int size for 127", test_vl_int_127),
TEST("Variable length int size for 128", test_vl_int_128),
TEST("Variable length int size for 16383", test_vl_int_16383),
TEST("Variable length int size for 16384", test_vl_int_16384),
TEST("Variable length int data for 0", test_vl_int_data_0),
TEST("Variable length int data for 127", test_vl_int_data_127),
TEST("Variable length int data for 128", test_vl_int_data_128),
TEST("Variable length int data for 16383", test_vl_int_data_16383),
TEST("Variable length int data for 16384", test_vl_int_data_16384),
TEST("Variable length int data for 32767", test_vl_int_data_32767),
TEST("UTF-8 string encode NULL", test_utf8_string_null),
TEST("UTF-8 string encode empty string", test_utf8_string_empty),
TEST("UTF-8 string encode \"hello\"", test_utf8_string_hello),
TEST("Make header", test_make_header),
TEST("Encode Connect", not_implemented),
TEST("Encode ConnAck", not_implemented),
TEST("Encode Publish", not_implemented),
TEST("Encode PubAck", not_implemented),
TEST("Encode PubRec", not_implemented),
TEST("Encode PubRel", not_implemented),
TEST("Encode PubComp", not_implemented),
TEST("Encode Subscribe", not_implemented),
TEST("Encode SubAck", not_implemented),
TEST("Encode Unsubscribe", not_implemented),
TEST("Encode UnsubAck", not_implemented),
TEST("Encode PingReq", not_implemented),
TEST("Encode PingResp", not_implemented),
TEST("Encode Disconnect", not_implemented)
);

97
tests/test.h Normal file
View file

@ -0,0 +1,97 @@
#include <stdio.h>
#include <stdint.h>
#include <stdbool.h>
#include <math.h>
#include <libgen.h>
#include <string.h>
#include "cputime.h"
typedef enum {
TestStatusSkipped = 0,
TestStatusFailure,
TestStatusOk,
} TestStatus;
typedef struct {
TestStatus status;
char *message;
} TestResult;
typedef TestResult (*TestPointer)(void);
typedef struct {
char *name;
TestPointer run;
char *file;
int line;
} DefinedTest;
extern DefinedTest defined_tests[];
#define TEST(_name, _function) (DefinedTest){ _name, _function, __FILE__, __LINE__ }
#define TESTS(...) \
DefinedTest defined_tests[] = { \
__VA_ARGS__, \
TEST(NULL, NULL) \
}
#define TESTRESULT(_status, _message) return (TestResult){ _status, _message }
#ifdef TIMETRIAL
# define TESTASSERT(_assertion, _message) return (TestResult){ (_assertion) ? TestStatusOk : TestStatusFailure, NULL }
#else
# define TESTASSERT(_assertion, _message) return (TestResult){ (_assertion) ? TestStatusOk : TestStatusFailure, _message }
#endif
void timetrial(DefinedTest *test);
int main(int argc, char **argv) {
for(DefinedTest *test = defined_tests; test->run != NULL; test++) {
TestResult result = test->run();
switch (result.status) {
case TestStatusOk:
fprintf(stdout, "info: Test %s suceeded ", test->name);
#ifdef TIMETRIAL
timetrial(test);
#else
fprintf(stdout, "\n");
#endif
break;
case TestStatusSkipped:
fprintf(stderr, "%s:%d: warning: Skipped test %s\n", test->file, test->line, test->name);
if (result.message) {
fprintf(stderr, " -> %s\n", result.message);
}
break;
case TestStatusFailure:
fprintf(stderr, "%s:%d: error: Test %s failed\n", test->file, test->line, test->name);
if (result.message) {
fprintf(stderr, " -> %s\n", result.message);
}
return 1;
break;
}
}
return 0;
}
#ifdef TIMETRIAL
#define ITER 10000000
void timetrial(DefinedTest *test) {
double start, end;
start = getCPUTime();
for(uint64_t i = 0; i < ITER; i++) {
volatile TestResult result = test->run();
(void)result;
}
end = getCPUTime();
double time = (end - start) * 1000.0 /* ms */ * 1000.0 /* us */ * 1000.0 /* ns */ / (double)ITER;
fprintf(stdout, "[ %0.3f ns ]\n", time);
}
#endif