Implement server->client QoS

Re #20
This commit is contained in:
Johannes Schriewer 2018-08-06 01:46:58 +02:00
parent 790ddec60e
commit 65f8dd49eb
9 changed files with 240 additions and 8 deletions

View file

@ -133,6 +133,26 @@ add_executable (connect_subscribe.test tests/connect_subscribe.c)
)
add_test(NAME ConnectSubscribe COMMAND ${PROJECT_BINARY_DIR}/connect_subscribe.test)
add_executable (connect_subscribe_qos1.test tests/connect_subscribe_qos1.c)
target_link_libraries (connect_subscribe_qos1.test mqtt-full ${PLATFORM_LIBS})
target_include_directories(connect_subscribe_qos1.test
PRIVATE
${PROJECT_BINARY_DIR}
${CMAKE_CURRENT_SOURCE_DIR}/src
${CMAKE_CURRENT_SOURCE_DIR}/platform
)
add_test(NAME ConnectSubscribeQos1 COMMAND ${PROJECT_BINARY_DIR}/connect_subscribe_qos1.test)
add_executable (connect_subscribe_qos2.test tests/connect_subscribe_qos2.c)
target_link_libraries (connect_subscribe_qos2.test mqtt-full ${PLATFORM_LIBS})
target_include_directories(connect_subscribe_qos2.test
PRIVATE
${PROJECT_BINARY_DIR}
${CMAKE_CURRENT_SOURCE_DIR}/src
${CMAKE_CURRENT_SOURCE_DIR}/platform
)
add_test(NAME ConnectSubscribeQos2 COMMAND ${PROJECT_BINARY_DIR}/connect_subscribe_qos2.test)
add_executable (connect_reconnect.test tests/connect_reconnect.c)
target_link_libraries (connect_reconnect.test mqtt-full ${PLATFORM_LIBS})
target_include_directories(connect_reconnect.test

View file

@ -12,14 +12,12 @@ MQTT library for multiple platforms including embedded targets.
- MQTT connection as a client is working
- All packet types are implemented
- Supports MQTT 3.1.1 (aka. protocol level 4)
- Platform support for linux is working
- Test have a coverage of about 92% (lines) and 97% (functions), untested code is just bail-out error handling for fatal errors (usually programming errors or network failure)
- Platform support for Linux and Windows is working
- Builds on Linux (GCC and Clang) and Windows (MSVC and Clang/c2)
## TODO
- [ ] QoS testing, server -> client does not
- [ ] Reconnect does not work correctly
- [ ] Running in MQTT Broker mode (very low prio)
- [ ] Implement Protocol level 3 (low prio)
- [ ] Implement Draft Protocol level 5

View file

@ -98,11 +98,28 @@ static inline void parse_packet(MQTTHandle *handle, MQTTPacket *packet) {
}
break;
case PacketTypePublish:
case PacketTypePublish: {
// DEBUG_LOG("Publish on %s -> %s", ((PublishPayload *)packet->payload)->topic, ((PublishPayload *)packet->payload)->message);
dispatch_subscription(handle, (PublishPayload *)packet->payload);
// TODO: Handle QoS
PublishPayload *payload = (PublishPayload *)packet->payload;
switch (payload->qos) {
case MQTT_QOS_0:
dispatch_subscription(handle, payload);
break;
case MQTT_QOS_1:
if (send_puback_packet(handle, payload->packet_id)) {
DEBUG_LOG("Dispatching subscription...");
dispatch_subscription(handle, payload);
} else {
DEBUG_LOG("Error sending PubAck");
}
break;
case MQTT_QOS_2:
DEBUG_LOG("Sending PubRec packet");
send_pubrec_packet(handle, payload->packet_id, dispatch_subscription_direct, payload);
break;
}
break;
}
// just for keepalive, do not handle
case PacketTypePingResp:

View file

@ -52,6 +52,26 @@ void handle_pubrec(MQTTHandle *handle, void *context) {
send_buffer(handle, encoded);
}
void handle_pubrel(MQTTHandle *handle, void *context) {
PublishCallback *ctx = (PublishCallback *)context;
PubCompPayload newPayload = {
.packet_id = ctx->payload->packet_id
};
Buffer *encoded = mqtt_packet_encode(&(MQTTPacket){ PacketTypePubComp, &newPayload });
encoded->position = 0;
if (send_buffer(handle, encoded)) {
if (ctx->callback) {
ctx->callback(handle, ctx->payload->topic, ctx->payload->message);
}
}
free(ctx->payload->topic);
free(ctx->payload->message);
free(ctx->payload);
}
/*
* packet constructors
*/
@ -190,3 +210,36 @@ bool send_disconnect_packet(MQTTHandle *handle) {
return send_buffer(handle, encoded);
}
#endif /* MQTT_CLIENT */
#if MQTT_CLIENT
bool send_puback_packet(MQTTHandle *handle, uint16_t packet_id) {
PacketIDPayload *payload = (PacketIDPayload *)calloc(1, sizeof(PacketIDPayload));
payload->packet_id = packet_id;
DEBUG_LOG("Sending PUBACK");
Buffer *encoded = mqtt_packet_encode(&(MQTTPacket){ PacketTypePubAck, payload });
encoded->position = 0;
return send_buffer(handle, encoded);
}
#endif /* MQTT_CLIENT */
#if MQTT_CLIENT
bool send_pubrec_packet(MQTTHandle *handle, uint16_t packet_id, MQTTPublishEventHandler callback, PublishPayload *publish) {
PacketIDPayload *payload = (PacketIDPayload *)calloc(1, sizeof(PacketIDPayload));
payload->packet_id = packet_id;
PublishCallback *ctx = (PublishCallback *)malloc(sizeof(PublishCallback));
ctx->payload = malloc(sizeof(PublishPayload));
memcpy(ctx->payload, publish, sizeof(PublishPayload));
ctx->payload->topic = strdup(publish->topic);
ctx->payload->message = strdup(publish->message);
ctx->callback = callback;
ctx->qos = MQTT_QOS_2;
expect_packet(handle, PacketTypePubRel, packet_id, handle_pubrel, ctx);
Buffer *encoded = mqtt_packet_encode(&(MQTTPacket){ PacketTypePubRec, payload });
encoded->position = 0;
return send_buffer(handle, encoded);
}
#endif /* MQTT_CLIENT */

View file

@ -9,6 +9,8 @@ bool send_subscribe_packet(MQTTHandle *handle, char *topic, MQTTQosLevel qos);
bool send_unsubscribe_packet(MQTTHandle *handle, char *topic);
bool send_ping_packet(MQTTHandle *handle);
bool send_disconnect_packet(MQTTHandle *handle);
bool send_puback_packet(MQTTHandle *handle, uint16_t packet_id);
bool send_pubrec_packet(MQTTHandle *handle, uint16_t packet_id, MQTTPublishEventHandler callback, PublishPayload *publish);
#endif /* MQTT_CLIENT */
bool send_publish_packet(MQTTHandle *handle, char *topic, char *message, MQTTQosLevel qos, MQTTPublishEventHandler callback);

View file

@ -54,8 +54,6 @@ void subscription_set_pending(MQTTHandle *handle, char *topic, bool pending) {
void dispatch_subscription(MQTTHandle *handle, PublishPayload *payload) {
SubscriptionItem *item = handle->subscriptions.items;
// TODO: Handle server Qos
while (item != NULL) {
if ((item->pending == false) && (strcmp(payload->topic, item->topic) == 0)) {
if (item->handler) {
@ -67,3 +65,18 @@ void dispatch_subscription(MQTTHandle *handle, PublishPayload *payload) {
item = item->next;
}
}
void dispatch_subscription_direct(MQTTHandle *handle, char *topic, char *message) {
SubscriptionItem *item = handle->subscriptions.items;
while (item != NULL) {
if ((item->pending == false) && (strcmp(topic, item->topic) == 0)) {
if (item->handler) {
item->handler(handle, topic, message);
}
break;
}
item = item->next;
}
}

View file

@ -21,5 +21,6 @@ void remove_subscription(MQTTHandle *handle, char *topic);
void subscription_set_pending(MQTTHandle *handle, char *topic, bool pending);
void dispatch_subscription(MQTTHandle *handle, PublishPayload *payload);
void dispatch_subscription_direct(MQTTHandle *handle, char *topic, char *message);
#endif /* subscription_h__included */

View file

@ -0,0 +1,64 @@
#include <stdio.h>
#include <stdlib.h>
#include "platform.h"
#include "mqtt.h"
bool leave = false;
#define LOG(fmt, ...) fprintf(stdout, fmt "\n", ## __VA_ARGS__)
bool err_handler(MQTTHandle *handle, MQTTConfig *config, MQTTErrorCode error) {
LOG("Error received: %d", error);
return 1;
}
void callback(MQTTHandle *handle, char *topic, char *payload) {
LOG("Received publish: %s -> %s", topic, payload);
MQTTStatus result = mqtt_unsubscribe(handle, "testsuite/mqtt/test_qos1");
if (result != MQTT_STATUS_OK) {
LOG("Could not unsubscribe test");
exit(1);
}
platform_sleep(1000);
leave = true;
}
void mqtt_connected(MQTTHandle *handle, void *context) {
LOG("Connected!");
LOG("Trying subscribe on testsuite/mqtt/test_qos1...");
MQTTStatus result = mqtt_subscribe(handle, "testsuite/mqtt/test_qos1", MQTT_QOS_1, callback);
if (result != MQTT_STATUS_OK) {
LOG("Could not subscribe test");
exit(1);
}
}
int main(int argc, char **argv) {
MQTTConfig config = { 0 };
config.client_id = "libmqtt_testsuite";
config.hostname = "localhost";
config.clean_session = true;
LOG("Trying to connect to %s...", config.hostname);
MQTTHandle *mqtt = mqtt_connect(&config, mqtt_connected, NULL, err_handler);
if (mqtt == NULL) {
LOG("Connection failed!");
return 1;
}
while (!leave) {
LOG("Waiting...");
platform_sleep(1000);
}
LOG("Disconnecting...");
mqtt_disconnect(mqtt, NULL, NULL);
}

View file

@ -0,0 +1,64 @@
#include <stdio.h>
#include <stdlib.h>
#include "platform.h"
#include "mqtt.h"
bool leave = false;
#define LOG(fmt, ...) fprintf(stdout, fmt "\n", ## __VA_ARGS__)
bool err_handler(MQTTHandle *handle, MQTTConfig *config, MQTTErrorCode error) {
LOG("Error received: %d", error);
return 1;
}
void callback(MQTTHandle *handle, char *topic, char *payload) {
LOG("Received publish: %s -> %s", topic, payload);
MQTTStatus result = mqtt_unsubscribe(handle, "testsuite/mqtt/test_qos2");
if (result != MQTT_STATUS_OK) {
LOG("Could not unsubscribe test");
exit(1);
}
platform_sleep(1000);
leave = true;
}
void mqtt_connected(MQTTHandle *handle, void *context) {
LOG("Connected!");
LOG("Trying subscribe on testsuite/mqtt/test_qos2...");
MQTTStatus result = mqtt_subscribe(handle, "testsuite/mqtt/test_qos2", MQTT_QOS_2, callback);
if (result != MQTT_STATUS_OK) {
LOG("Could not subscribe test");
exit(1);
}
}
int main(int argc, char **argv) {
MQTTConfig config = { 0 };
config.client_id = "libmqtt_testsuite";
config.hostname = "localhost";
config.clean_session = true;
LOG("Trying to connect to %s...", config.hostname);
MQTTHandle *mqtt = mqtt_connect(&config, mqtt_connected, NULL, err_handler);
if (mqtt == NULL) {
LOG("Connection failed!");
return 1;
}
while (!leave) {
LOG("Waiting...");
platform_sleep(1000);
}
LOG("Disconnecting...");
mqtt_disconnect(mqtt, NULL, NULL);
}