diff --git a/src/protocol.c b/src/protocol.c index 04c4d4f..8d40a28 100644 --- a/src/protocol.c +++ b/src/protocol.c @@ -8,20 +8,14 @@ #include "debug.h" static bool send_buffer(MQTTHandle *handle, Buffer *buffer) { - DEBUG_LOG("Attempting to send:"); - buffer_hexdump(buffer, 2); - - while (buffer_free_space(buffer) > 0) { + while (!buffer_eof(buffer)) { ssize_t bytes = write(handle->sock, buffer->data + buffer->position, buffer_free_space(buffer)); if (bytes <= 0) { - DEBUG_LOG("write error, %s", strerror(errno)); buffer_release(buffer); return false; } - DEBUG_LOG("Wrote %zu bytes...", bytes); buffer->position += bytes; } - DEBUG_LOG("Buffer written..."); buffer_release(buffer); return true; } @@ -41,27 +35,36 @@ bool send_connect_packet(MQTTHandle *handle) { payload->username = handle->config->username; payload->password = handle->config->password; - + Buffer *encoded = mqtt_packet_encode(&(MQTTPacket){ PacketTypeConnect, payload }); free(payload); - // TODO: add waiting ConnAck to queue + // ConnAck waiting packet added to queue from _mqtt_connect encoded->position = 0; return send_buffer(handle, encoded); } +void remove_pending(MQTTHandle *handle, void *context) { + SubscribePayload *payload = (SubscribePayload *)context; + + subscription_set_pending(handle, payload->topic, false); + + free(payload->topic); + free(payload); +} + bool send_subscribe_packet(MQTTHandle *handle, char *topic, MQTTQosLevel qos) { SubscribePayload *payload = calloc(1, sizeof(SubscribePayload)); payload->packet_id = handle->packet_id_counter++; - payload->topic = topic; + payload->topic = strdup(topic); payload->qos = qos; Buffer *encoded = mqtt_packet_encode(&(MQTTPacket){ PacketTypeSubscribe, payload }); - free(payload); - // TODO: add waiting SubAck to queue + // add waiting for SubAck to queue + expect_packet(handle, PacketTypeSubAck, payload->packet_id, remove_pending, payload); encoded->position = 0; return send_buffer(handle, encoded); @@ -74,13 +77,31 @@ bool send_unsubscribe_packet(MQTTHandle *handle, char *topic) { payload->topic = "test/topic"; Buffer *encoded = mqtt_packet_encode(&(MQTTPacket){ PacketTypeUnsubscribe, payload }); + + // add waiting for UnsubAck to queue + expect_packet(handle, PacketTypeUnsubAck, payload->packet_id, NULL, NULL); free(payload); - // TODO: add waiting UnsubAck to queue encoded->position = 0; return send_buffer(handle, encoded); } +void handle_pubrec(MQTTHandle *handle, void *context) { + PublishPayload *payload = (PublishPayload *)context; + + PubRelPayload newPayload = { + .packet_id = payload->packet_id + }; + + Buffer *encoded = mqtt_packet_encode(&(MQTTPacket){ PacketTypePubRel, &newPayload }); + + expect_packet(handle, PacketTypePubComp, payload->packet_id, NULL, NULL); + free(payload); + + encoded->position = 0; + send_buffer(handle, encoded); +} + bool send_publish_packet(MQTTHandle *handle, char *topic, char *message, MQTTQosLevel qos) { PublishPayload *payload = calloc(1, sizeof(PublishPayload)); @@ -91,10 +112,28 @@ bool send_publish_packet(MQTTHandle *handle, char *topic, char *message, MQTTQos payload->message = message; Buffer *encoded = mqtt_packet_encode(&(MQTTPacket){ PacketTypePublish, payload }); - free(payload); - // TODO: Handle QoS and add waiting packets to queue + // Handle QoS and add waiting packets to queue + switch(payload->qos) { + case MQTT_QOS_0: + // fire and forget + free(payload); + break; + case MQTT_QOS_1: + expect_packet(handle, PacketTypePubAck, payload->packet_id, NULL, NULL); + free(payload); + break; + case MQTT_QOS_2: + expect_packet(handle, PacketTypePubRec, payload->packet_id, handle_pubrec, payload); + break; + } encoded->position = 0; return send_buffer(handle, encoded); } + +bool send_disconnect_packet(MQTTHandle *handle) { + Buffer *encoded = mqtt_packet_encode(&(MQTTPacket){ PacketTypeDisconnect, NULL }); + encoded->position = 0; + return send_buffer(handle, encoded); +} diff --git a/src/protocol.h b/src/protocol.h index aaf3267..a83710b 100644 --- a/src/protocol.h +++ b/src/protocol.h @@ -7,5 +7,6 @@ bool send_connect_packet(MQTTHandle *handle); bool send_subscribe_packet(MQTTHandle *handle, char *topic); bool send_unsubscribe_packet(MQTTHandle *handle, char *topic); bool send_publish_packet(MQTTHandle *handle, char *topic, char *message, MQTTQosLevel qos); +bool send_disconnect_packet(MQTTHandle *handle); #endif