QoS handling and packet expectations in protocol implementation

This commit is contained in:
Johannes Schriewer 2018-07-30 02:55:23 +02:00
parent 31cf6d965d
commit 5a17278cc9
2 changed files with 55 additions and 15 deletions

View file

@ -8,20 +8,14 @@
#include "debug.h" #include "debug.h"
static bool send_buffer(MQTTHandle *handle, Buffer *buffer) { static bool send_buffer(MQTTHandle *handle, Buffer *buffer) {
DEBUG_LOG("Attempting to send:"); while (!buffer_eof(buffer)) {
buffer_hexdump(buffer, 2);
while (buffer_free_space(buffer) > 0) {
ssize_t bytes = write(handle->sock, buffer->data + buffer->position, buffer_free_space(buffer)); ssize_t bytes = write(handle->sock, buffer->data + buffer->position, buffer_free_space(buffer));
if (bytes <= 0) { if (bytes <= 0) {
DEBUG_LOG("write error, %s", strerror(errno));
buffer_release(buffer); buffer_release(buffer);
return false; return false;
} }
DEBUG_LOG("Wrote %zu bytes...", bytes);
buffer->position += bytes; buffer->position += bytes;
} }
DEBUG_LOG("Buffer written...");
buffer_release(buffer); buffer_release(buffer);
return true; return true;
} }
@ -41,27 +35,36 @@ bool send_connect_packet(MQTTHandle *handle) {
payload->username = handle->config->username; payload->username = handle->config->username;
payload->password = handle->config->password; payload->password = handle->config->password;
Buffer *encoded = mqtt_packet_encode(&(MQTTPacket){ PacketTypeConnect, payload }); Buffer *encoded = mqtt_packet_encode(&(MQTTPacket){ PacketTypeConnect, payload });
free(payload); free(payload);
// TODO: add waiting ConnAck to queue // ConnAck waiting packet added to queue from _mqtt_connect
encoded->position = 0; encoded->position = 0;
return send_buffer(handle, encoded); return send_buffer(handle, encoded);
} }
void remove_pending(MQTTHandle *handle, void *context) {
SubscribePayload *payload = (SubscribePayload *)context;
subscription_set_pending(handle, payload->topic, false);
free(payload->topic);
free(payload);
}
bool send_subscribe_packet(MQTTHandle *handle, char *topic, MQTTQosLevel qos) { bool send_subscribe_packet(MQTTHandle *handle, char *topic, MQTTQosLevel qos) {
SubscribePayload *payload = calloc(1, sizeof(SubscribePayload)); SubscribePayload *payload = calloc(1, sizeof(SubscribePayload));
payload->packet_id = handle->packet_id_counter++; payload->packet_id = handle->packet_id_counter++;
payload->topic = topic; payload->topic = strdup(topic);
payload->qos = qos; payload->qos = qos;
Buffer *encoded = mqtt_packet_encode(&(MQTTPacket){ PacketTypeSubscribe, payload }); Buffer *encoded = mqtt_packet_encode(&(MQTTPacket){ PacketTypeSubscribe, payload });
free(payload);
// TODO: add waiting SubAck to queue // add waiting for SubAck to queue
expect_packet(handle, PacketTypeSubAck, payload->packet_id, remove_pending, payload);
encoded->position = 0; encoded->position = 0;
return send_buffer(handle, encoded); return send_buffer(handle, encoded);
@ -74,13 +77,31 @@ bool send_unsubscribe_packet(MQTTHandle *handle, char *topic) {
payload->topic = "test/topic"; payload->topic = "test/topic";
Buffer *encoded = mqtt_packet_encode(&(MQTTPacket){ PacketTypeUnsubscribe, payload }); Buffer *encoded = mqtt_packet_encode(&(MQTTPacket){ PacketTypeUnsubscribe, payload });
// add waiting for UnsubAck to queue
expect_packet(handle, PacketTypeUnsubAck, payload->packet_id, NULL, NULL);
free(payload); free(payload);
// TODO: add waiting UnsubAck to queue
encoded->position = 0; encoded->position = 0;
return send_buffer(handle, encoded); return send_buffer(handle, encoded);
} }
void handle_pubrec(MQTTHandle *handle, void *context) {
PublishPayload *payload = (PublishPayload *)context;
PubRelPayload newPayload = {
.packet_id = payload->packet_id
};
Buffer *encoded = mqtt_packet_encode(&(MQTTPacket){ PacketTypePubRel, &newPayload });
expect_packet(handle, PacketTypePubComp, payload->packet_id, NULL, NULL);
free(payload);
encoded->position = 0;
send_buffer(handle, encoded);
}
bool send_publish_packet(MQTTHandle *handle, char *topic, char *message, MQTTQosLevel qos) { bool send_publish_packet(MQTTHandle *handle, char *topic, char *message, MQTTQosLevel qos) {
PublishPayload *payload = calloc(1, sizeof(PublishPayload)); PublishPayload *payload = calloc(1, sizeof(PublishPayload));
@ -91,10 +112,28 @@ bool send_publish_packet(MQTTHandle *handle, char *topic, char *message, MQTTQos
payload->message = message; payload->message = message;
Buffer *encoded = mqtt_packet_encode(&(MQTTPacket){ PacketTypePublish, payload }); Buffer *encoded = mqtt_packet_encode(&(MQTTPacket){ PacketTypePublish, payload });
free(payload);
// TODO: Handle QoS and add waiting packets to queue // Handle QoS and add waiting packets to queue
switch(payload->qos) {
case MQTT_QOS_0:
// fire and forget
free(payload);
break;
case MQTT_QOS_1:
expect_packet(handle, PacketTypePubAck, payload->packet_id, NULL, NULL);
free(payload);
break;
case MQTT_QOS_2:
expect_packet(handle, PacketTypePubRec, payload->packet_id, handle_pubrec, payload);
break;
}
encoded->position = 0; encoded->position = 0;
return send_buffer(handle, encoded); return send_buffer(handle, encoded);
} }
bool send_disconnect_packet(MQTTHandle *handle) {
Buffer *encoded = mqtt_packet_encode(&(MQTTPacket){ PacketTypeDisconnect, NULL });
encoded->position = 0;
return send_buffer(handle, encoded);
}

View file

@ -7,5 +7,6 @@ bool send_connect_packet(MQTTHandle *handle);
bool send_subscribe_packet(MQTTHandle *handle, char *topic); bool send_subscribe_packet(MQTTHandle *handle, char *topic);
bool send_unsubscribe_packet(MQTTHandle *handle, char *topic); bool send_unsubscribe_packet(MQTTHandle *handle, char *topic);
bool send_publish_packet(MQTTHandle *handle, char *topic, char *message, MQTTQosLevel qos); bool send_publish_packet(MQTTHandle *handle, char *topic, char *message, MQTTQosLevel qos);
bool send_disconnect_packet(MQTTHandle *handle);
#endif #endif