Add more callbacks to the API so we can subscribe after successful connection for example

This commit is contained in:
Johannes Schriewer 2018-07-30 02:56:05 +02:00
parent 5a17278cc9
commit 1d134a23c9
2 changed files with 60 additions and 55 deletions

View file

@ -23,17 +23,6 @@ static inline void disconnect(MQTTHandle *handle) {
// FIXME: Do we have to do anything else? // FIXME: Do we have to do anything else?
} }
static inline bool find_waiting(MQTTHandle *handle, MQTTPacket *packet) {
// TODO: Try to find a waiting task and call its callback
// TODO: Remove waiting task from queue
return false;
}
static inline void send_subscription(MQTTHandle *handle, PublishPayload *payload) {
// TODO: find subscriber and call the callback
}
static inline void parse_packet(MQTTHandle *handle, MQTTPacket *packet) { static inline void parse_packet(MQTTHandle *handle, MQTTPacket *packet) {
switch (packet->packet_type) { switch (packet->packet_type) {
case PacketTypeConnAck: case PacketTypeConnAck:
@ -43,13 +32,20 @@ static inline void parse_packet(MQTTHandle *handle, MQTTPacket *packet) {
case PacketTypePubComp: case PacketTypePubComp:
case PacketTypeSubAck: case PacketTypeSubAck:
case PacketTypeUnsubAck: case PacketTypeUnsubAck:
case PacketTypePingResp: if (!dispatch_packet(handle, packet)) {
if (!find_waiting(handle, packet)) { DEBUG_LOG("Unexpected packet!");
disconnect(handle); disconnect(handle);
} }
break;
case PacketTypePublish: case PacketTypePublish:
send_subscription(handle, packet->payload); dispatch_subscription(handle, packet->payload);
// TODO: Handle QoS
break;
// just for keepalive, do not handle
case PacketTypePingResp:
break;
// client -> server, will not be handled in client // client -> server, will not be handled in client
case PacketTypeConnect: case PacketTypeConnect:
@ -57,18 +53,19 @@ static inline void parse_packet(MQTTHandle *handle, MQTTPacket *packet) {
case PacketTypeUnsubscribe: case PacketTypeUnsubscribe:
case PacketTypePingReq: case PacketTypePingReq:
case PacketTypeDisconnect: case PacketTypeDisconnect:
DEBUG_LOG("Server packet on client connection? What's up with the broker?");
disconnect(handle); disconnect(handle);
break; break;
} }
} }
static void _reader(MQTTHandle *handle) { static void _reader(MQTTHandle *handle) {
int num_bytes; ssize_t num_bytes;
char *read_buffer = malloc(max_receive_buffer_size); char *read_buffer = malloc(max_receive_buffer_size);
uint8_t offset = 0; uint8_t offset = 0;
handle->reader_alive = true; handle->reader_alive = true;
while (1) { while (1) {
num_bytes = read(handle->sock, &read_buffer[offset], max_receive_buffer_size - offset); num_bytes = read(handle->sock, &read_buffer[offset], max_receive_buffer_size - offset);
if (num_bytes == 0) { if (num_bytes == 0) {
@ -78,13 +75,11 @@ static void _reader(MQTTHandle *handle) {
return; return;
} else if (num_bytes < 0) { } else if (num_bytes < 0) {
if ((errno == EINTR) || (errno == EAGAIN)) { if ((errno == EINTR) || (errno == EAGAIN)) {
DEBUG_LOG("Interrupted");
continue; continue;
} }
/* Set reader task to dead */ /* Set reader task to dead */
handle->reader_alive = false; handle->reader_alive = false;
DEBUG_LOG("Read error: %s", strerror(errno));
return; return;
} }
@ -92,13 +87,11 @@ static void _reader(MQTTHandle *handle) {
Buffer *buffer = buffer_from_data_no_copy(read_buffer, num_bytes); Buffer *buffer = buffer_from_data_no_copy(read_buffer, num_bytes);
MQTTPacket *packet = mqtt_packet_decode(buffer); MQTTPacket *packet = mqtt_packet_decode(buffer);
if (packet == NULL) { if (packet == NULL) {
DEBUG_LOG("Invalid packet");
// invalid packet // invalid packet
if (num_bytes < max_receive_buffer_size) { if (num_bytes < max_receive_buffer_size) {
// maybe not long enough, try to fetch the rest // maybe not long enough, try to fetch the rest
offset += num_bytes; offset += num_bytes;
free(buffer); free(buffer);
DEBUG_LOG("Trying to read more...");
break; break;
} else { } else {
// no space in buffer, bail and reconnect // no space in buffer, bail and reconnect
@ -109,12 +102,12 @@ static void _reader(MQTTHandle *handle) {
return; return;
} }
} else { } else {
DEBUG_LOG("Packet parsed"); // hexdump(buffer->data, num_bytes, 2);
parse_packet(handle, packet); parse_packet(handle, packet);
free_MQTTPacket(packet); free_MQTTPacket(packet);
if (!buffer_eof(buffer)) { if (!buffer_eof(buffer)) {
DEBUG_LOG("Residual buffer data");
// Not complete recv buffer was consumed, so we have more than one packet in there // Not complete recv buffer was consumed, so we have more than one packet in there
size_t remaining = max_receive_buffer_size - buffer->position; size_t remaining = max_receive_buffer_size - buffer->position;
memmove(read_buffer, read_buffer + buffer->position, remaining); memmove(read_buffer, read_buffer + buffer->position, remaining);
@ -122,7 +115,6 @@ static void _reader(MQTTHandle *handle) {
num_bytes -= remaining; num_bytes -= remaining;
free(buffer); free(buffer);
} else { } else {
DEBUG_LOG("Buffer consumed");
// buffer consumed completely, read another chunk // buffer consumed completely, read another chunk
offset = 0; offset = 0;
free(buffer); free(buffer);
@ -133,15 +125,15 @@ static void _reader(MQTTHandle *handle) {
} }
} }
static void _mqtt_connect(MQTTHandle *handle) { static void _mqtt_connect(MQTTHandle *handle, MQTTEventHandler callback, void *context) {
int ret; int ret;
struct sockaddr_in servaddr; struct sockaddr_in servaddr;
memset(&servaddr, 0, sizeof(servaddr)); memset(&servaddr, 0, sizeof(servaddr));
handle->sock = socket(AF_INET, SOCK_STREAM, 0); handle->sock = socket(AF_INET, SOCK_STREAM, 0);
servaddr.sin_family = AF_INET; servaddr.sin_family = AF_INET;
servaddr.sin_port = htons(handle->config->port); servaddr.sin_port = htons(handle->config->port);
char ip[40]; char ip[40];
if (!hostname_to_ip(handle->config->hostname, ip)) { if (!hostname_to_ip(handle->config->hostname, ip)) {
bool free_handle = handle->error_handler(handle, MQTT_Error_Host_Not_Found); bool free_handle = handle->error_handler(handle, MQTT_Error_Host_Not_Found);
@ -176,18 +168,20 @@ static void _mqtt_connect(MQTTHandle *handle) {
run_read_task(handle, _reader); run_read_task(handle, _reader);
expect_packet(handle, PacketTypeConnAck, 0, callback, context);
bool result = send_connect_packet(handle); bool result = send_connect_packet(handle);
if (result == false) { if (result == false) {
DEBUG_LOG("Sending connect packet failed, running error handler");
bool free_handle = handle->error_handler(handle, MQTT_Error_Broker_Disconnected); bool free_handle = handle->error_handler(handle, MQTT_Error_Broker_Disconnected);
if (free_handle) { if (free_handle) {
mqtt_free(handle); mqtt_free(handle);
} }
DEBUG_LOG("Sending connect packet failed...");
close(handle->sock); close(handle->sock);
} }
} }
MQTTHandle *mqtt_connect(MQTTConfig *config, MQTTErrorHandler callback) { MQTTHandle *mqtt_connect(MQTTConfig *config, MQTTEventHandler callback, void *context, MQTTErrorHandler error_callback) {
MQTTHandle *handle = calloc(sizeof(struct _MQTTHandle), 1); MQTTHandle *handle = calloc(sizeof(struct _MQTTHandle), 1);
initialize_platform(handle); initialize_platform(handle);
@ -196,32 +190,37 @@ MQTTHandle *mqtt_connect(MQTTConfig *config, MQTTErrorHandler callback) {
} }
handle->config = config; handle->config = config;
handle->error_handler = callback; handle->error_handler = error_callback;
_mqtt_connect(handle); _mqtt_connect(handle, callback, context);
return handle; return handle;
} }
MQTTStatus mqtt_reconnect(MQTTHandle *handle) { MQTTStatus mqtt_reconnect(MQTTHandle *handle, MQTTEventHandler callback, void *context) {
if (handle->reader_alive) { if (handle->reader_alive) {
DEBUG_LOG("Closing old connection"); DEBUG_LOG("Closing old connection");
close(handle->sock); close(handle->sock);
join_read_task(handle); join_read_task(handle);
} }
_mqtt_connect(handle);
// TODO: re-submit unacknowledged packages with QoS > 0
// TODO: clear waiting packets
// TODO: re-subscribe all subscriptions
_mqtt_connect(handle, callback, context);
return MQTT_STATUS_OK; return MQTT_STATUS_OK;
} }
MQTTStatus mqtt_subscribe(MQTTHandle *handle, char *topic, MQTTEventHandler callback) { MQTTStatus mqtt_subscribe(MQTTHandle *handle, char *topic, MQTTPublishEventHandler callback) {
if (!handle->reader_alive) { if (!handle->reader_alive) {
handle->error_handler(handle, MQTT_Error_Connection_Reset); handle->error_handler(handle, MQTT_Error_Connection_Reset);
return MQTT_STATUS_ERROR; return MQTT_STATUS_ERROR;
} }
add_subscription(handle, topic, callback);
return (send_subscribe_packet(handle, topic) ? MQTT_STATUS_OK : MQTT_STATUS_ERROR); return (send_subscribe_packet(handle, topic) ? MQTT_STATUS_OK : MQTT_STATUS_ERROR);
// TODO: add subscription to list
} }
MQTTStatus mqtt_unsubscribe(MQTTHandle *handle, char *topic) { MQTTStatus mqtt_unsubscribe(MQTTHandle *handle, char *topic) {
@ -229,8 +228,8 @@ MQTTStatus mqtt_unsubscribe(MQTTHandle *handle, char *topic) {
handle->error_handler(handle, MQTT_Error_Connection_Reset); handle->error_handler(handle, MQTT_Error_Connection_Reset);
return MQTT_STATUS_ERROR; return MQTT_STATUS_ERROR;
} }
remove_subscription(handle, topic);
return (send_unsubscribe_packet(handle, topic) ? MQTT_STATUS_OK : MQTT_STATUS_ERROR); return (send_unsubscribe_packet(handle, topic) ? MQTT_STATUS_OK : MQTT_STATUS_ERROR);
// TODO: remove subscription from list
} }
MQTTStatus mqtt_publish(MQTTHandle *handle, char *topic, char *payload, MQTTQosLevel qos_level) { MQTTStatus mqtt_publish(MQTTHandle *handle, char *topic, char *payload, MQTTQosLevel qos_level) {
@ -241,13 +240,16 @@ MQTTStatus mqtt_publish(MQTTHandle *handle, char *topic, char *payload, MQTTQosL
return (send_publish_packet(handle, topic, payload, qos_level) ? MQTT_STATUS_OK : MQTT_STATUS_ERROR); return (send_publish_packet(handle, topic, payload, qos_level) ? MQTT_STATUS_OK : MQTT_STATUS_ERROR);
} }
MQTTStatus mqtt_disconnect(MQTTHandle *handle) { MQTTStatus mqtt_disconnect(MQTTHandle *handle, MQTTEventHandler callback, void *callback_context) {
DEBUG_LOG("Disconnecting...") send_disconnect_packet(handle);
if (close(handle->sock)) { if (close(handle->sock)) {
return MQTT_STATUS_ERROR; return MQTT_STATUS_ERROR;
} }
join_read_task(handle); join_read_task(handle);
mqtt_free(handle); mqtt_free(handle);
if (callback) {
callback(NULL, callback_context);
}
return MQTT_STATUS_OK; return MQTT_STATUS_OK;
} }

View file

@ -37,54 +37,57 @@ typedef enum {
} MQTTErrorCode; } MQTTErrorCode;
/** Error handler callback /** Error handler callback
* *
* Return true if the handle should be freed, false to keep it * Return true if the handle should be freed, false to keep it
*/ */
typedef bool (*MQTTErrorHandler)(MQTTHandle *handle, MQTTErrorCode code); typedef bool (*MQTTErrorHandler)(MQTTHandle *handle, MQTTErrorCode code);
/** Event handler callback */ /** Event handler callback */
typedef void (*MQTTEventHandler)(MQTTHandle *handle, char *topic, char *payload); typedef void (*MQTTEventHandler)(MQTTHandle *handle, void *context);
/** publish event callback */
typedef void (*MQTTPublishEventHandler)(MQTTHandle *handle, char *topic, char *payload);
/** /**
* Connect to MQTT broker * Connect to MQTT broker
* *
* @param config: MQTT configuration * @param config: MQTT configuration
* @param callback: Callback function to call on errors * @param callback: Callback function to call on errors
* @returns handle to mqtt connection or NULL on error * @returns handle to mqtt connection or NULL on error
* *
* If the error handler is called with Host not found or Connection refused, * If the error handler is called with Host not found or Connection refused,
* the handler is in charge of freeing the handle by returning true * the handler is in charge of freeing the handle by returning true
* or re-trying by changing settings and calling mqtt_reconnect() and returning false * or re-trying by changing settings and calling mqtt_reconnect() and returning false
*/ */
MQTTHandle *mqtt_connect(MQTTConfig *config, MQTTErrorHandler callback); MQTTHandle *mqtt_connect(MQTTConfig *config, MQTTEventHandler callback, void *callback_context, MQTTErrorHandler error_callback);
/** /**
* Re-Connect to MQTT broker * Re-Connect to MQTT broker
* *
* Usually called in the MQTTErrorHandler callback, if called on a working * Usually called in the MQTTErrorHandler callback, if called on a working
* connection the connection will be disconnected before reconnecting. * connection the connection will be disconnected before reconnecting.
* *
* If there were registered subscriptions they will be re-instated after * If there were registered subscriptions they will be re-instated after
* a successful reconnect. * a successful reconnect.
* *
* @param handle: MQTT Handle from `mqtt_connect` * @param handle: MQTT Handle from `mqtt_connect`
* @returns: Status code * @returns: Status code
*/ */
MQTTStatus mqtt_reconnect(MQTTHandle *handle); MQTTStatus mqtt_reconnect(MQTTHandle *handle, MQTTEventHandler callback, void *callback_context);
/** /**
* Subscribe to a topic * Subscribe to a topic
* *
* @param handle: MQTT Handle from `mqtt_connect` * @param handle: MQTT Handle from `mqtt_connect`
* @param topic: Topic to subscribe * @param topic: Topic to subscribe
* @param callback: Callback function to call when receiving something for that topic * @param callback: Callback function to call when receiving something for that topic
* @returns: Status code * @returns: Status code
*/ */
MQTTStatus mqtt_subscribe(MQTTHandle *handle, char *topic, MQTTEventHandler callback); MQTTStatus mqtt_subscribe(MQTTHandle *handle, char *topic, MQTTPublishEventHandler callback);
/** /**
* Un-Subscribe from a topic * Un-Subscribe from a topic
* *
* @param handle: MQTT Handle from `mqtt_connect` * @param handle: MQTT Handle from `mqtt_connect`
* @param topic: Topic to unsubscribe * @param topic: Topic to unsubscribe
* @returns: Status code * @returns: Status code
@ -93,7 +96,7 @@ MQTTStatus mqtt_unsubscribe(MQTTHandle *handle, char *topic);
/** /**
* Publish something to the broker * Publish something to the broker
* *
* @param handle: MQTT Handle from `mqtt_connect` * @param handle: MQTT Handle from `mqtt_connect`
* @param topic: Topic to publish to * @param topic: Topic to publish to
* @param payload: Message payload to publish * @param payload: Message payload to publish
@ -103,13 +106,13 @@ MQTTStatus mqtt_publish(MQTTHandle *handle, char *topic, char *payload, MQTTQosL
/** /**
* Disconnect from MQTT broker * Disconnect from MQTT broker
* *
* @param handle: MQTT Handle from `mqtt_connect` * @param handle: MQTT Handle from `mqtt_connect`
* @returns: Status code * @returns: Status code
* *
* @attention: do not use the handle after calling this function, * @attention: do not use the handle after calling this function,
* all resources will be freed, this handle is now invalid! * all resources will be freed, this handle is now invalid!
*/ */
MQTTStatus mqtt_disconnect(MQTTHandle *handle); MQTTStatus mqtt_disconnect(MQTTHandle *handle, MQTTEventHandler callback, void *callback_context);
#endif /* mqtt_h__included */ #endif /* mqtt_h__included */