From db273fc8a41442bbe337e787175ceb75cb48e15e Mon Sep 17 00:00:00 2001
From: Johannes Schriewer <hallo@dunkelstern.de>
Date: Sun, 29 Jul 2018 03:43:01 +0200
Subject: [PATCH] Start implementation of protocol statemachine

---
 src/mqtt.c                | 209 +++++++++++++++++++++++++++++++++-----
 src/mqtt_internal.h       |  31 +++++-
 src/protocol.c            | 100 ++++++++++++++++++
 src/protocol.h            |  11 ++
 tests/connect_publish.c   |  51 ++++++++++
 tests/connect_subscribe.c |  53 ++++++++++
 6 files changed, 424 insertions(+), 31 deletions(-)
 create mode 100644 tests/connect_publish.c
 create mode 100644 tests/connect_subscribe.c

diff --git a/src/mqtt.c b/src/mqtt.c
index ebed082..02c2767 100644
--- a/src/mqtt.c
+++ b/src/mqtt.c
@@ -10,89 +10,244 @@
 #include "mqtt.h"
 #include "mqtt_internal.h"
 #include "platform.h"
+#include "protocol.h"
+#include "debug.h"
 
-#define BUF_LEN MAX_BUFFER_SIZE
+static inline void mqtt_free(MQTTHandle *handle) {
+    release_platform(handle);
+    free(handle);
+}
+
+static inline void disconnect(MQTTHandle *handle) {
+    close(handle->sock);
+    // FIXME: Do we have to do anything else?
+}
+
+static inline bool find_waiting(MQTTHandle *handle, MQTTPacket *packet) {
+    // TODO: Try to find a waiting task and call its callback
+    // TODO: Remove waiting task from queue
+
+    return false;
+}
+
+static inline void send_subscription(MQTTHandle *handle, PublishPayload *payload) {
+    // TODO: find subscriber and call the callback
+}
+
+static inline void parse_packet(MQTTHandle *handle, MQTTPacket *packet) {
+    switch (packet->packet_type) {
+        case PacketTypeConnAck:
+        case PacketTypePubAck:
+        case PacketTypePubRec:
+        case PacketTypePubRel:
+        case PacketTypePubComp:
+        case PacketTypeSubAck:
+        case PacketTypeUnsubAck:
+        case PacketTypePingResp:
+            if (!find_waiting(handle, packet)) {
+                disconnect(handle);
+            }
+
+        case PacketTypePublish:
+            send_subscription(handle, packet->payload);
+
+        // client -> server, will not be handled in client
+        case PacketTypeConnect:
+        case PacketTypeSubscribe:
+        case PacketTypeUnsubscribe:
+        case PacketTypePingReq:
+        case PacketTypeDisconnect:
+            disconnect(handle);
+            break;
+    }
+}
 
 static void _reader(MQTTHandle *handle) {
     int num_bytes;
-    char buffer[BUF_LEN];
+    char *read_buffer = malloc(max_receive_buffer_size);
+    uint8_t offset = 0;
 
     handle->reader_alive = true;
     
     while (1) {
-        num_bytes = read(handle->sock, &buffer, BUF_LEN);
+        num_bytes = read(handle->sock, &read_buffer[offset], max_receive_buffer_size - offset);
         if (num_bytes == 0) {
             /* Socket closed, coordinated shutdown */
+            DEBUG_LOG("Socket closed");
             handle->reader_alive = false;
             return;
         } else if (num_bytes < 0) {
             if ((errno == EINTR) || (errno == EAGAIN)) {
+                DEBUG_LOG("Interrupted");
                 continue;
             }
 
             /* Set reader task to dead */
             handle->reader_alive = false;
+            DEBUG_LOG("Read error: %s", strerror(errno));
             return;
         }
 
-        // TODO: Parse and dispatch
+        while (1) {
+            Buffer *buffer = buffer_from_data_no_copy(read_buffer, num_bytes);
+            MQTTPacket *packet = mqtt_packet_decode(buffer);
+            if (packet == NULL) {
+                DEBUG_LOG("Invalid packet");
+                // invalid packet
+                if (num_bytes < max_receive_buffer_size) {
+                    // maybe not long enough, try to fetch the rest
+                    offset += num_bytes;
+                    free(buffer);
+                    DEBUG_LOG("Trying to read more...");
+                    break;
+                } else {
+                    // no space in buffer, bail and reconnect
+                    DEBUG_LOG("Buffer overflow!");
+                    disconnect(handle);
+                    handle->reader_alive = false;
+                    free(buffer);
+                    return;
+                }
+            } else {
+                DEBUG_LOG("Packet parsed");
+                parse_packet(handle, packet);
+                free_MQTTPacket(packet);
+                
+                if (!buffer_eof(buffer)) {
+                    DEBUG_LOG("Residual buffer data");
+                    // Not complete recv buffer was consumed, so we have more than one packet in there
+                    size_t remaining = max_receive_buffer_size - buffer->position;
+                    memmove(read_buffer, read_buffer + buffer->position, remaining);
+                    offset -= remaining;
+                    num_bytes -= remaining;
+                    free(buffer);
+                } else {
+                    DEBUG_LOG("Buffer consumed");
+                    // buffer consumed completely, read another chunk
+                    offset = 0;
+                    free(buffer);
+                    break;
+                }
+            }
+        }
     }
 }
 
-MQTTHandle *mqtt_connect(MQTTConfig *config, MQTTErrorHandler callback) {
+static void _mqtt_connect(MQTTHandle *handle) {
     int ret;
-    MQTTHandle *handle = calloc(sizeof(struct _MQTTHandle), 1);
-    initialize_platform(handle);
-
     struct sockaddr_in servaddr;
     memset(&servaddr, 0, sizeof(servaddr));
  
-    if (config->port == 0) {
-        config->port = 1883;
-    }
-
     handle->sock = socket(AF_INET, SOCK_STREAM, 0); 
     servaddr.sin_family = AF_INET;
-    servaddr.sin_port = htons(config->port);
+    servaddr.sin_port = htons(handle->config->port);
  
-    ret = inet_pton(AF_INET, config->hostname, &(servaddr.sin_addr));
-    if (ret == 0) {
-        callback(handle, MQTT_Error_Host_Not_Found);
+    char ip[40];
+    if (!hostname_to_ip(handle->config->hostname, ip)) {
+        bool free_handle = handle->error_handler(handle, MQTT_Error_Host_Not_Found);
+        if (free_handle) {
+            mqtt_free(handle);
+        }
+        DEBUG_LOG("Resolving hostname failed: %s", strerror(errno));
         close(handle->sock);
-        free(handle);
-        return NULL;
+        return;
+    }
+    ret = inet_pton(AF_INET, ip, &(servaddr.sin_addr));
+    if (ret == 0) {
+        bool free_handle = handle->error_handler(handle, MQTT_Error_Host_Not_Found);
+        if (free_handle) {
+            mqtt_free(handle);
+        }
+        DEBUG_LOG("Converting to servaddr failed: %s", strerror(errno));
+        close(handle->sock);
+        return;
     }
 
     ret = connect(handle->sock, (struct sockaddr *)&servaddr, sizeof(servaddr));
     if (ret != 0) {
-        callback(handle, MQTT_Error_Connection_Refused);
+        bool free_handle = handle->error_handler(handle, MQTT_Error_Connection_Refused);
+        if (free_handle) {
+            mqtt_free(handle);
+        }
+        DEBUG_LOG("Connection failed: %s", strerror(errno));
         close(handle->sock);
-        free(handle);
-        return NULL;
+        return;
     }
 
     run_read_task(handle, _reader);
 
+    bool result = send_connect_packet(handle);
+    if (result == false) {
+        bool free_handle = handle->error_handler(handle, MQTT_Error_Broker_Disconnected);
+        if (free_handle) {
+            mqtt_free(handle);
+        }
+        DEBUG_LOG("Sending connect packet failed...");
+        close(handle->sock);
+    }
+}
+
+MQTTHandle *mqtt_connect(MQTTConfig *config, MQTTErrorHandler callback) {
+    MQTTHandle *handle = calloc(sizeof(struct _MQTTHandle), 1);
+    initialize_platform(handle);
+
+    if (config->port == 0) {
+        config->port = 1883;
+    }
+
+    handle->config = config;
+    handle->error_handler = callback;
+
+    _mqtt_connect(handle);
+
     return handle;
 }
 
+
 MQTTStatus mqtt_reconnect(MQTTHandle *handle) {
-    // TODO: reconnect
+    if (handle->reader_alive) {
+        DEBUG_LOG("Closing old connection");
+        close(handle->sock);
+        join_read_task(handle);
+    }
+    _mqtt_connect(handle);
+
+    return MQTT_STATUS_OK;
 }
 
 MQTTStatus mqtt_subscribe(MQTTHandle *handle, char *topic, MQTTEventHandler callback) {
-    // TODO: subscribe
+    if (!handle->reader_alive) {
+        handle->error_handler(handle, MQTT_Error_Connection_Reset);
+        return MQTT_STATUS_ERROR;
+    }
+    return (send_subscribe_packet(handle, topic) ? MQTT_STATUS_OK : MQTT_STATUS_ERROR);
+    // TODO: add subscription to list
 }
 
 MQTTStatus mqtt_unsubscribe(MQTTHandle *handle, char *topic) {
-    // TODO: unsubscribe
+    if (!handle->reader_alive) {
+        handle->error_handler(handle, MQTT_Error_Connection_Reset);
+        return MQTT_STATUS_ERROR;
+    }
+    return (send_unsubscribe_packet(handle, topic) ? MQTT_STATUS_OK : MQTT_STATUS_ERROR);
+    // TODO: remove subscription from list
 }
 
 MQTTStatus mqtt_publish(MQTTHandle *handle, char *topic, char *payload, MQTTQosLevel qos_level) {
-    // TODO: publish
+    if (!handle->reader_alive) {
+        handle->error_handler(handle, MQTT_Error_Connection_Reset);
+        return MQTT_STATUS_ERROR;
+    }
+    return (send_publish_packet(handle, topic, payload, qos_level) ? MQTT_STATUS_OK : MQTT_STATUS_ERROR);
 }
 
 MQTTStatus mqtt_disconnect(MQTTHandle *handle) {
-    release_platform(handle);
-    free(handle);
+    DEBUG_LOG("Disconnecting...")
+    if (close(handle->sock)) {
+        return MQTT_STATUS_ERROR;
+    }
+    join_read_task(handle);
+    mqtt_free(handle);
+
+    return MQTT_STATUS_OK;
 }
diff --git a/src/mqtt_internal.h b/src/mqtt_internal.h
index 1157e05..5ee673b 100644
--- a/src/mqtt_internal.h
+++ b/src/mqtt_internal.h
@@ -2,6 +2,7 @@
 #define mqtt_internal_h__included
 
 #include "mqtt.h"
+#include "packet.h"
 
 typedef struct _PlatformData PlatformData;
 
@@ -9,17 +10,39 @@ typedef struct {
     char *topic;
     MQTTEventHandler *handler;
     bool pending;
+} SubscriptionItem;
+
+typedef struct {
+    SubscriptionItem *items;
+    uint8_t num_items;
 } Subscriptions;
 
+typedef void (*MQTTCallback)(MQTTHandle *handle, MQTTPacket *packet, void *context);
+
+typedef struct {
+    MQTTControlPacketType type;
+    uint16_t packet_id;
+    void *context;
+    MQTTCallback callback;
+} MQTTCallbackQueueItem;
+
+typedef struct {
+    MQTTCallbackQueueItem *pending;
+    uint8_t num_items;
+} MQTTCallbackQueue;
+
 struct _MQTTHandle {
-    MQTTErrorHandler *errorHandler;
-    Subscriptions *subscriptions;
-    int num_subscriptions;
+    MQTTConfig *config;
+
+    MQTTErrorHandler error_handler;
+    Subscriptions subscriptions;
 
     int sock;
     bool reader_alive;
 
-    // TODO: status queue (Waiting for ACK)
+    uint16_t packet_id_counter;
+
+    MQTTCallbackQueue queue;
     PlatformData *platform;
 };
 
diff --git a/src/protocol.c b/src/protocol.c
index e69de29..04c4d4f 100644
--- a/src/protocol.c
+++ b/src/protocol.c
@@ -0,0 +1,100 @@
+#include <unistd.h>
+#include <stdio.h>
+
+#include "mqtt_internal.h"
+#include "packet.h"
+#include "buffer.h"
+
+#include "debug.h"
+
+static bool send_buffer(MQTTHandle *handle, Buffer *buffer) {
+    DEBUG_LOG("Attempting to send:");
+    buffer_hexdump(buffer, 2);
+
+    while (buffer_free_space(buffer) > 0) {
+        ssize_t bytes = write(handle->sock, buffer->data + buffer->position, buffer_free_space(buffer));
+        if (bytes <= 0) {
+            DEBUG_LOG("write error, %s", strerror(errno));
+            buffer_release(buffer);
+            return false;
+        }
+        DEBUG_LOG("Wrote %zu bytes...", bytes);
+        buffer->position += bytes;
+    }
+    DEBUG_LOG("Buffer written...");
+    buffer_release(buffer);
+    return true;
+}
+
+bool send_connect_packet(MQTTHandle *handle) {
+    ConnectPayload *payload = calloc(1, sizeof(ConnectPayload));
+
+    payload->client_id = handle->config->client_id;
+    payload->protocol_level = 4;
+    payload->keepalive_interval = 60;
+
+    // TODO: support last will
+    // payload->will_topic = "test/lastwill";
+    // payload->will_message = "disconnected";
+    // payload->will_qos = MQTT_QOS_1;
+    // payload->retain_will = true;
+
+    payload->username = handle->config->username;
+    payload->password = handle->config->password;
+    
+    Buffer *encoded = mqtt_packet_encode(&(MQTTPacket){ PacketTypeConnect, payload });
+    free(payload);
+
+    // TODO: add waiting ConnAck to queue
+
+    encoded->position = 0;
+    return send_buffer(handle, encoded);
+}
+
+bool send_subscribe_packet(MQTTHandle *handle, char *topic, MQTTQosLevel qos) {
+    SubscribePayload *payload = calloc(1, sizeof(SubscribePayload));
+
+    payload->packet_id = handle->packet_id_counter++;
+    payload->topic = topic;
+    payload->qos = qos;
+
+    Buffer *encoded = mqtt_packet_encode(&(MQTTPacket){ PacketTypeSubscribe, payload });
+    free(payload);
+
+    // TODO: add waiting SubAck to queue
+
+    encoded->position = 0;
+    return send_buffer(handle, encoded);
+}
+
+bool send_unsubscribe_packet(MQTTHandle *handle, char *topic) {
+    UnsubscribePayload *payload = calloc(1, sizeof(UnsubscribePayload));
+
+    payload->packet_id = 10;
+    payload->topic = "test/topic";
+
+    Buffer *encoded = mqtt_packet_encode(&(MQTTPacket){ PacketTypeUnsubscribe, payload });
+    free(payload);
+
+    // TODO: add waiting UnsubAck to queue
+    encoded->position = 0;
+    return send_buffer(handle, encoded);
+}
+
+bool send_publish_packet(MQTTHandle *handle, char *topic, char *message, MQTTQosLevel qos) {
+    PublishPayload *payload = calloc(1, sizeof(PublishPayload));
+
+    payload->qos = qos;
+    payload->retain = true;
+    payload->topic = topic;
+    payload->packet_id = handle->packet_id_counter++;
+    payload->message = message;
+
+    Buffer *encoded = mqtt_packet_encode(&(MQTTPacket){ PacketTypePublish, payload });
+    free(payload);
+
+    // TODO: Handle QoS and add waiting packets to queue
+
+    encoded->position = 0;
+    return send_buffer(handle, encoded);
+}
diff --git a/src/protocol.h b/src/protocol.h
index e69de29..aaf3267 100644
--- a/src/protocol.h
+++ b/src/protocol.h
@@ -0,0 +1,11 @@
+#ifndef protocol_h__included
+#define protocol_h__included
+
+#include "mqtt.h"
+
+bool send_connect_packet(MQTTHandle *handle);
+bool send_subscribe_packet(MQTTHandle *handle, char *topic);
+bool send_unsubscribe_packet(MQTTHandle *handle, char *topic);
+bool send_publish_packet(MQTTHandle *handle, char *topic, char *message, MQTTQosLevel qos);
+
+#endif
diff --git a/tests/connect_publish.c b/tests/connect_publish.c
new file mode 100644
index 0000000..4d2d43b
--- /dev/null
+++ b/tests/connect_publish.c
@@ -0,0 +1,51 @@
+#include <stdio.h>
+#include <stdlib.h>
+#include <unistd.h>
+
+#include "mqtt.h"
+
+#define LOG(fmt, ...) fprintf(stdout, fmt "\n", ## __VA_ARGS__)
+
+bool err_handler(MQTTHandle *handle, MQTTErrorCode error) {
+    LOG("Error received: %d", error);
+    exit(1);
+
+    return true;
+}
+
+int main(int argc, char **argv) {
+    MQTTConfig config = { 0 };
+
+    config.client_id = "libmqtt_testsuite";
+    config.hostname = "localhost";
+    config.port = 1883;
+
+    LOG("Trying to connect to %s", config.hostname);
+    MQTTHandle *mqtt = mqtt_connect(&config, err_handler);
+
+    if (mqtt == NULL) {
+        LOG("Connection failed!");
+        return 1;
+    }
+
+    LOG("Connected!");
+
+    sleep(5);
+
+    LOG("Trying publish to testsuite/mqtt/test...");
+    MQTTStatus result = mqtt_publish(mqtt, "testsuite/mqtt/test", "payload", MQTT_QOS_0);
+    if (result != MQTT_STATUS_OK) {
+        LOG("Could not publish");
+        return 1;
+    }
+
+    sleep(5);
+
+    LOG("Disconnecting...");
+    result = mqtt_disconnect(mqtt);
+    if (result != MQTT_STATUS_OK) {
+        LOG("Could not disconnect");
+        return 1;
+    }
+    return 0;
+}
diff --git a/tests/connect_subscribe.c b/tests/connect_subscribe.c
new file mode 100644
index 0000000..2c87cc1
--- /dev/null
+++ b/tests/connect_subscribe.c
@@ -0,0 +1,53 @@
+#include <stdio.h>
+#include <unistd.h>
+
+#include "mqtt.h"
+
+#define LOG(fmt, ...) fprintf(stdout, fmt "\n", ## __VA_ARGS__)
+
+bool err_handler(MQTTHandle *handle, MQTTErrorCode error) {
+    LOG("Error received: %d", error);
+    
+    return 1;
+}
+
+bool leave = false;
+void callback(MQTTHandle *handle, char *topic, char *payload) {
+    LOG("Received publish: %s -> %s", topic, payload);
+    leave = true;
+}
+
+int main(int argc, char **argv) {
+    MQTTConfig config;
+
+    config.client_id = "libmqtt_testsuite";
+    config.hostname = "test.mosquitto.org";
+    config.port = 1883;
+
+    LOG("Trying to connect to test.mosquitto.org");
+    MQTTHandle *mqtt = mqtt_connect(&config, err_handler);
+
+    if (mqtt == NULL) {
+        LOG("Connection failed!");
+        return 1;
+    }
+    LOG("Connected!");
+    
+    sleep(5);
+
+    LOG("Trying subscribe on testsuite/mqtt/test...");
+    MQTTStatus result = mqtt_subscribe(mqtt, "testsuite/mqtt/test", callback);
+    if (result != MQTT_STATUS_OK) {
+        LOG("Could not publish");
+        return 1;
+    }
+
+    while (!leave) {
+        LOG("Waiting...");
+        sleep(1);
+    }
+
+    LOG("Disconnecting...");
+    mqtt_disconnect(mqtt);
+    return 0;
+}