From 1d134a23c9144209e32991e75f5a15b2cb8f0bd2 Mon Sep 17 00:00:00 2001 From: Johannes Schriewer Date: Mon, 30 Jul 2018 02:56:05 +0200 Subject: [PATCH] Add more callbacks to the API so we can subscribe after successful connection for example --- src/mqtt.c | 80 ++++++++++++++++++++++++++++-------------------------- src/mqtt.h | 35 +++++++++++++----------- 2 files changed, 60 insertions(+), 55 deletions(-) diff --git a/src/mqtt.c b/src/mqtt.c index 02c2767..fd579e7 100644 --- a/src/mqtt.c +++ b/src/mqtt.c @@ -23,17 +23,6 @@ static inline void disconnect(MQTTHandle *handle) { // FIXME: Do we have to do anything else? } -static inline bool find_waiting(MQTTHandle *handle, MQTTPacket *packet) { - // TODO: Try to find a waiting task and call its callback - // TODO: Remove waiting task from queue - - return false; -} - -static inline void send_subscription(MQTTHandle *handle, PublishPayload *payload) { - // TODO: find subscriber and call the callback -} - static inline void parse_packet(MQTTHandle *handle, MQTTPacket *packet) { switch (packet->packet_type) { case PacketTypeConnAck: @@ -43,13 +32,20 @@ static inline void parse_packet(MQTTHandle *handle, MQTTPacket *packet) { case PacketTypePubComp: case PacketTypeSubAck: case PacketTypeUnsubAck: - case PacketTypePingResp: - if (!find_waiting(handle, packet)) { + if (!dispatch_packet(handle, packet)) { + DEBUG_LOG("Unexpected packet!"); disconnect(handle); } + break; case PacketTypePublish: - send_subscription(handle, packet->payload); + dispatch_subscription(handle, packet->payload); + // TODO: Handle QoS + break; + + // just for keepalive, do not handle + case PacketTypePingResp: + break; // client -> server, will not be handled in client case PacketTypeConnect: @@ -57,18 +53,19 @@ static inline void parse_packet(MQTTHandle *handle, MQTTPacket *packet) { case PacketTypeUnsubscribe: case PacketTypePingReq: case PacketTypeDisconnect: + DEBUG_LOG("Server packet on client connection? What's up with the broker?"); disconnect(handle); break; } } static void _reader(MQTTHandle *handle) { - int num_bytes; + ssize_t num_bytes; char *read_buffer = malloc(max_receive_buffer_size); uint8_t offset = 0; handle->reader_alive = true; - + while (1) { num_bytes = read(handle->sock, &read_buffer[offset], max_receive_buffer_size - offset); if (num_bytes == 0) { @@ -78,13 +75,11 @@ static void _reader(MQTTHandle *handle) { return; } else if (num_bytes < 0) { if ((errno == EINTR) || (errno == EAGAIN)) { - DEBUG_LOG("Interrupted"); continue; } /* Set reader task to dead */ handle->reader_alive = false; - DEBUG_LOG("Read error: %s", strerror(errno)); return; } @@ -92,13 +87,11 @@ static void _reader(MQTTHandle *handle) { Buffer *buffer = buffer_from_data_no_copy(read_buffer, num_bytes); MQTTPacket *packet = mqtt_packet_decode(buffer); if (packet == NULL) { - DEBUG_LOG("Invalid packet"); // invalid packet if (num_bytes < max_receive_buffer_size) { // maybe not long enough, try to fetch the rest offset += num_bytes; free(buffer); - DEBUG_LOG("Trying to read more..."); break; } else { // no space in buffer, bail and reconnect @@ -109,12 +102,12 @@ static void _reader(MQTTHandle *handle) { return; } } else { - DEBUG_LOG("Packet parsed"); + // hexdump(buffer->data, num_bytes, 2); + parse_packet(handle, packet); free_MQTTPacket(packet); - + if (!buffer_eof(buffer)) { - DEBUG_LOG("Residual buffer data"); // Not complete recv buffer was consumed, so we have more than one packet in there size_t remaining = max_receive_buffer_size - buffer->position; memmove(read_buffer, read_buffer + buffer->position, remaining); @@ -122,7 +115,6 @@ static void _reader(MQTTHandle *handle) { num_bytes -= remaining; free(buffer); } else { - DEBUG_LOG("Buffer consumed"); // buffer consumed completely, read another chunk offset = 0; free(buffer); @@ -133,15 +125,15 @@ static void _reader(MQTTHandle *handle) { } } -static void _mqtt_connect(MQTTHandle *handle) { +static void _mqtt_connect(MQTTHandle *handle, MQTTEventHandler callback, void *context) { int ret; struct sockaddr_in servaddr; memset(&servaddr, 0, sizeof(servaddr)); - - handle->sock = socket(AF_INET, SOCK_STREAM, 0); + + handle->sock = socket(AF_INET, SOCK_STREAM, 0); servaddr.sin_family = AF_INET; servaddr.sin_port = htons(handle->config->port); - + char ip[40]; if (!hostname_to_ip(handle->config->hostname, ip)) { bool free_handle = handle->error_handler(handle, MQTT_Error_Host_Not_Found); @@ -176,18 +168,20 @@ static void _mqtt_connect(MQTTHandle *handle) { run_read_task(handle, _reader); + expect_packet(handle, PacketTypeConnAck, 0, callback, context); + bool result = send_connect_packet(handle); if (result == false) { + DEBUG_LOG("Sending connect packet failed, running error handler"); bool free_handle = handle->error_handler(handle, MQTT_Error_Broker_Disconnected); if (free_handle) { mqtt_free(handle); } - DEBUG_LOG("Sending connect packet failed..."); close(handle->sock); } } -MQTTHandle *mqtt_connect(MQTTConfig *config, MQTTErrorHandler callback) { +MQTTHandle *mqtt_connect(MQTTConfig *config, MQTTEventHandler callback, void *context, MQTTErrorHandler error_callback) { MQTTHandle *handle = calloc(sizeof(struct _MQTTHandle), 1); initialize_platform(handle); @@ -196,32 +190,37 @@ MQTTHandle *mqtt_connect(MQTTConfig *config, MQTTErrorHandler callback) { } handle->config = config; - handle->error_handler = callback; + handle->error_handler = error_callback; - _mqtt_connect(handle); + _mqtt_connect(handle, callback, context); return handle; } -MQTTStatus mqtt_reconnect(MQTTHandle *handle) { +MQTTStatus mqtt_reconnect(MQTTHandle *handle, MQTTEventHandler callback, void *context) { if (handle->reader_alive) { DEBUG_LOG("Closing old connection"); close(handle->sock); join_read_task(handle); } - _mqtt_connect(handle); + + // TODO: re-submit unacknowledged packages with QoS > 0 + // TODO: clear waiting packets + // TODO: re-subscribe all subscriptions + + _mqtt_connect(handle, callback, context); return MQTT_STATUS_OK; } -MQTTStatus mqtt_subscribe(MQTTHandle *handle, char *topic, MQTTEventHandler callback) { +MQTTStatus mqtt_subscribe(MQTTHandle *handle, char *topic, MQTTPublishEventHandler callback) { if (!handle->reader_alive) { handle->error_handler(handle, MQTT_Error_Connection_Reset); return MQTT_STATUS_ERROR; } + add_subscription(handle, topic, callback); return (send_subscribe_packet(handle, topic) ? MQTT_STATUS_OK : MQTT_STATUS_ERROR); - // TODO: add subscription to list } MQTTStatus mqtt_unsubscribe(MQTTHandle *handle, char *topic) { @@ -229,8 +228,8 @@ MQTTStatus mqtt_unsubscribe(MQTTHandle *handle, char *topic) { handle->error_handler(handle, MQTT_Error_Connection_Reset); return MQTT_STATUS_ERROR; } + remove_subscription(handle, topic); return (send_unsubscribe_packet(handle, topic) ? MQTT_STATUS_OK : MQTT_STATUS_ERROR); - // TODO: remove subscription from list } MQTTStatus mqtt_publish(MQTTHandle *handle, char *topic, char *payload, MQTTQosLevel qos_level) { @@ -241,13 +240,16 @@ MQTTStatus mqtt_publish(MQTTHandle *handle, char *topic, char *payload, MQTTQosL return (send_publish_packet(handle, topic, payload, qos_level) ? MQTT_STATUS_OK : MQTT_STATUS_ERROR); } -MQTTStatus mqtt_disconnect(MQTTHandle *handle) { - DEBUG_LOG("Disconnecting...") +MQTTStatus mqtt_disconnect(MQTTHandle *handle, MQTTEventHandler callback, void *callback_context) { + send_disconnect_packet(handle); if (close(handle->sock)) { return MQTT_STATUS_ERROR; } join_read_task(handle); mqtt_free(handle); + if (callback) { + callback(NULL, callback_context); + } return MQTT_STATUS_OK; } diff --git a/src/mqtt.h b/src/mqtt.h index 51e536c..9b6e645 100644 --- a/src/mqtt.h +++ b/src/mqtt.h @@ -37,54 +37,57 @@ typedef enum { } MQTTErrorCode; /** Error handler callback - * + * * Return true if the handle should be freed, false to keep it */ typedef bool (*MQTTErrorHandler)(MQTTHandle *handle, MQTTErrorCode code); /** Event handler callback */ -typedef void (*MQTTEventHandler)(MQTTHandle *handle, char *topic, char *payload); +typedef void (*MQTTEventHandler)(MQTTHandle *handle, void *context); + +/** publish event callback */ +typedef void (*MQTTPublishEventHandler)(MQTTHandle *handle, char *topic, char *payload); /** * Connect to MQTT broker - * + * * @param config: MQTT configuration * @param callback: Callback function to call on errors * @returns handle to mqtt connection or NULL on error - * + * * If the error handler is called with Host not found or Connection refused, * the handler is in charge of freeing the handle by returning true * or re-trying by changing settings and calling mqtt_reconnect() and returning false */ -MQTTHandle *mqtt_connect(MQTTConfig *config, MQTTErrorHandler callback); +MQTTHandle *mqtt_connect(MQTTConfig *config, MQTTEventHandler callback, void *callback_context, MQTTErrorHandler error_callback); /** * Re-Connect to MQTT broker - * + * * Usually called in the MQTTErrorHandler callback, if called on a working * connection the connection will be disconnected before reconnecting. - * + * * If there were registered subscriptions they will be re-instated after * a successful reconnect. - * + * * @param handle: MQTT Handle from `mqtt_connect` * @returns: Status code */ -MQTTStatus mqtt_reconnect(MQTTHandle *handle); +MQTTStatus mqtt_reconnect(MQTTHandle *handle, MQTTEventHandler callback, void *callback_context); /** * Subscribe to a topic - * + * * @param handle: MQTT Handle from `mqtt_connect` * @param topic: Topic to subscribe * @param callback: Callback function to call when receiving something for that topic * @returns: Status code */ -MQTTStatus mqtt_subscribe(MQTTHandle *handle, char *topic, MQTTEventHandler callback); +MQTTStatus mqtt_subscribe(MQTTHandle *handle, char *topic, MQTTPublishEventHandler callback); /** * Un-Subscribe from a topic - * + * * @param handle: MQTT Handle from `mqtt_connect` * @param topic: Topic to unsubscribe * @returns: Status code @@ -93,7 +96,7 @@ MQTTStatus mqtt_unsubscribe(MQTTHandle *handle, char *topic); /** * Publish something to the broker - * + * * @param handle: MQTT Handle from `mqtt_connect` * @param topic: Topic to publish to * @param payload: Message payload to publish @@ -103,13 +106,13 @@ MQTTStatus mqtt_publish(MQTTHandle *handle, char *topic, char *payload, MQTTQosL /** * Disconnect from MQTT broker - * + * * @param handle: MQTT Handle from `mqtt_connect` * @returns: Status code - * + * * @attention: do not use the handle after calling this function, * all resources will be freed, this handle is now invalid! */ -MQTTStatus mqtt_disconnect(MQTTHandle *handle); +MQTTStatus mqtt_disconnect(MQTTHandle *handle, MQTTEventHandler callback, void *callback_context); #endif /* mqtt_h__included */