Start implementation of protocol statemachine
This commit is contained in:
parent
8938d6ae18
commit
db273fc8a4
6 changed files with 424 additions and 31 deletions
223
src/mqtt.c
223
src/mqtt.c
|
@ -10,89 +10,244 @@
|
||||||
#include "mqtt.h"
|
#include "mqtt.h"
|
||||||
#include "mqtt_internal.h"
|
#include "mqtt_internal.h"
|
||||||
#include "platform.h"
|
#include "platform.h"
|
||||||
|
#include "protocol.h"
|
||||||
|
#include "debug.h"
|
||||||
|
|
||||||
#define BUF_LEN MAX_BUFFER_SIZE
|
static inline void mqtt_free(MQTTHandle *handle) {
|
||||||
|
release_platform(handle);
|
||||||
|
free(handle);
|
||||||
|
}
|
||||||
|
|
||||||
|
static inline void disconnect(MQTTHandle *handle) {
|
||||||
|
close(handle->sock);
|
||||||
|
// FIXME: Do we have to do anything else?
|
||||||
|
}
|
||||||
|
|
||||||
|
static inline bool find_waiting(MQTTHandle *handle, MQTTPacket *packet) {
|
||||||
|
// TODO: Try to find a waiting task and call its callback
|
||||||
|
// TODO: Remove waiting task from queue
|
||||||
|
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
|
static inline void send_subscription(MQTTHandle *handle, PublishPayload *payload) {
|
||||||
|
// TODO: find subscriber and call the callback
|
||||||
|
}
|
||||||
|
|
||||||
|
static inline void parse_packet(MQTTHandle *handle, MQTTPacket *packet) {
|
||||||
|
switch (packet->packet_type) {
|
||||||
|
case PacketTypeConnAck:
|
||||||
|
case PacketTypePubAck:
|
||||||
|
case PacketTypePubRec:
|
||||||
|
case PacketTypePubRel:
|
||||||
|
case PacketTypePubComp:
|
||||||
|
case PacketTypeSubAck:
|
||||||
|
case PacketTypeUnsubAck:
|
||||||
|
case PacketTypePingResp:
|
||||||
|
if (!find_waiting(handle, packet)) {
|
||||||
|
disconnect(handle);
|
||||||
|
}
|
||||||
|
|
||||||
|
case PacketTypePublish:
|
||||||
|
send_subscription(handle, packet->payload);
|
||||||
|
|
||||||
|
// client -> server, will not be handled in client
|
||||||
|
case PacketTypeConnect:
|
||||||
|
case PacketTypeSubscribe:
|
||||||
|
case PacketTypeUnsubscribe:
|
||||||
|
case PacketTypePingReq:
|
||||||
|
case PacketTypeDisconnect:
|
||||||
|
disconnect(handle);
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
static void _reader(MQTTHandle *handle) {
|
static void _reader(MQTTHandle *handle) {
|
||||||
int num_bytes;
|
int num_bytes;
|
||||||
char buffer[BUF_LEN];
|
char *read_buffer = malloc(max_receive_buffer_size);
|
||||||
|
uint8_t offset = 0;
|
||||||
|
|
||||||
handle->reader_alive = true;
|
handle->reader_alive = true;
|
||||||
|
|
||||||
while (1) {
|
while (1) {
|
||||||
num_bytes = read(handle->sock, &buffer, BUF_LEN);
|
num_bytes = read(handle->sock, &read_buffer[offset], max_receive_buffer_size - offset);
|
||||||
if (num_bytes == 0) {
|
if (num_bytes == 0) {
|
||||||
/* Socket closed, coordinated shutdown */
|
/* Socket closed, coordinated shutdown */
|
||||||
|
DEBUG_LOG("Socket closed");
|
||||||
handle->reader_alive = false;
|
handle->reader_alive = false;
|
||||||
return;
|
return;
|
||||||
} else if (num_bytes < 0) {
|
} else if (num_bytes < 0) {
|
||||||
if ((errno == EINTR) || (errno == EAGAIN)) {
|
if ((errno == EINTR) || (errno == EAGAIN)) {
|
||||||
|
DEBUG_LOG("Interrupted");
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
|
||||||
/* Set reader task to dead */
|
/* Set reader task to dead */
|
||||||
handle->reader_alive = false;
|
handle->reader_alive = false;
|
||||||
|
DEBUG_LOG("Read error: %s", strerror(errno));
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
// TODO: Parse and dispatch
|
while (1) {
|
||||||
|
Buffer *buffer = buffer_from_data_no_copy(read_buffer, num_bytes);
|
||||||
|
MQTTPacket *packet = mqtt_packet_decode(buffer);
|
||||||
|
if (packet == NULL) {
|
||||||
|
DEBUG_LOG("Invalid packet");
|
||||||
|
// invalid packet
|
||||||
|
if (num_bytes < max_receive_buffer_size) {
|
||||||
|
// maybe not long enough, try to fetch the rest
|
||||||
|
offset += num_bytes;
|
||||||
|
free(buffer);
|
||||||
|
DEBUG_LOG("Trying to read more...");
|
||||||
|
break;
|
||||||
|
} else {
|
||||||
|
// no space in buffer, bail and reconnect
|
||||||
|
DEBUG_LOG("Buffer overflow!");
|
||||||
|
disconnect(handle);
|
||||||
|
handle->reader_alive = false;
|
||||||
|
free(buffer);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
DEBUG_LOG("Packet parsed");
|
||||||
|
parse_packet(handle, packet);
|
||||||
|
free_MQTTPacket(packet);
|
||||||
|
|
||||||
|
if (!buffer_eof(buffer)) {
|
||||||
|
DEBUG_LOG("Residual buffer data");
|
||||||
|
// Not complete recv buffer was consumed, so we have more than one packet in there
|
||||||
|
size_t remaining = max_receive_buffer_size - buffer->position;
|
||||||
|
memmove(read_buffer, read_buffer + buffer->position, remaining);
|
||||||
|
offset -= remaining;
|
||||||
|
num_bytes -= remaining;
|
||||||
|
free(buffer);
|
||||||
|
} else {
|
||||||
|
DEBUG_LOG("Buffer consumed");
|
||||||
|
// buffer consumed completely, read another chunk
|
||||||
|
offset = 0;
|
||||||
|
free(buffer);
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
static void _mqtt_connect(MQTTHandle *handle) {
|
||||||
|
int ret;
|
||||||
|
struct sockaddr_in servaddr;
|
||||||
|
memset(&servaddr, 0, sizeof(servaddr));
|
||||||
|
|
||||||
|
handle->sock = socket(AF_INET, SOCK_STREAM, 0);
|
||||||
|
servaddr.sin_family = AF_INET;
|
||||||
|
servaddr.sin_port = htons(handle->config->port);
|
||||||
|
|
||||||
|
char ip[40];
|
||||||
|
if (!hostname_to_ip(handle->config->hostname, ip)) {
|
||||||
|
bool free_handle = handle->error_handler(handle, MQTT_Error_Host_Not_Found);
|
||||||
|
if (free_handle) {
|
||||||
|
mqtt_free(handle);
|
||||||
|
}
|
||||||
|
DEBUG_LOG("Resolving hostname failed: %s", strerror(errno));
|
||||||
|
close(handle->sock);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
ret = inet_pton(AF_INET, ip, &(servaddr.sin_addr));
|
||||||
|
if (ret == 0) {
|
||||||
|
bool free_handle = handle->error_handler(handle, MQTT_Error_Host_Not_Found);
|
||||||
|
if (free_handle) {
|
||||||
|
mqtt_free(handle);
|
||||||
|
}
|
||||||
|
DEBUG_LOG("Converting to servaddr failed: %s", strerror(errno));
|
||||||
|
close(handle->sock);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
ret = connect(handle->sock, (struct sockaddr *)&servaddr, sizeof(servaddr));
|
||||||
|
if (ret != 0) {
|
||||||
|
bool free_handle = handle->error_handler(handle, MQTT_Error_Connection_Refused);
|
||||||
|
if (free_handle) {
|
||||||
|
mqtt_free(handle);
|
||||||
|
}
|
||||||
|
DEBUG_LOG("Connection failed: %s", strerror(errno));
|
||||||
|
close(handle->sock);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
run_read_task(handle, _reader);
|
||||||
|
|
||||||
|
bool result = send_connect_packet(handle);
|
||||||
|
if (result == false) {
|
||||||
|
bool free_handle = handle->error_handler(handle, MQTT_Error_Broker_Disconnected);
|
||||||
|
if (free_handle) {
|
||||||
|
mqtt_free(handle);
|
||||||
|
}
|
||||||
|
DEBUG_LOG("Sending connect packet failed...");
|
||||||
|
close(handle->sock);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
MQTTHandle *mqtt_connect(MQTTConfig *config, MQTTErrorHandler callback) {
|
MQTTHandle *mqtt_connect(MQTTConfig *config, MQTTErrorHandler callback) {
|
||||||
int ret;
|
|
||||||
MQTTHandle *handle = calloc(sizeof(struct _MQTTHandle), 1);
|
MQTTHandle *handle = calloc(sizeof(struct _MQTTHandle), 1);
|
||||||
initialize_platform(handle);
|
initialize_platform(handle);
|
||||||
|
|
||||||
struct sockaddr_in servaddr;
|
|
||||||
memset(&servaddr, 0, sizeof(servaddr));
|
|
||||||
|
|
||||||
if (config->port == 0) {
|
if (config->port == 0) {
|
||||||
config->port = 1883;
|
config->port = 1883;
|
||||||
}
|
}
|
||||||
|
|
||||||
handle->sock = socket(AF_INET, SOCK_STREAM, 0);
|
handle->config = config;
|
||||||
servaddr.sin_family = AF_INET;
|
handle->error_handler = callback;
|
||||||
servaddr.sin_port = htons(config->port);
|
|
||||||
|
|
||||||
ret = inet_pton(AF_INET, config->hostname, &(servaddr.sin_addr));
|
_mqtt_connect(handle);
|
||||||
if (ret == 0) {
|
|
||||||
callback(handle, MQTT_Error_Host_Not_Found);
|
|
||||||
close(handle->sock);
|
|
||||||
free(handle);
|
|
||||||
return NULL;
|
|
||||||
}
|
|
||||||
|
|
||||||
ret = connect(handle->sock, (struct sockaddr *)&servaddr, sizeof(servaddr));
|
|
||||||
if (ret != 0) {
|
|
||||||
callback(handle, MQTT_Error_Connection_Refused);
|
|
||||||
close(handle->sock);
|
|
||||||
free(handle);
|
|
||||||
return NULL;
|
|
||||||
}
|
|
||||||
|
|
||||||
run_read_task(handle, _reader);
|
|
||||||
|
|
||||||
return handle;
|
return handle;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
MQTTStatus mqtt_reconnect(MQTTHandle *handle) {
|
MQTTStatus mqtt_reconnect(MQTTHandle *handle) {
|
||||||
// TODO: reconnect
|
if (handle->reader_alive) {
|
||||||
|
DEBUG_LOG("Closing old connection");
|
||||||
|
close(handle->sock);
|
||||||
|
join_read_task(handle);
|
||||||
|
}
|
||||||
|
_mqtt_connect(handle);
|
||||||
|
|
||||||
|
return MQTT_STATUS_OK;
|
||||||
}
|
}
|
||||||
|
|
||||||
MQTTStatus mqtt_subscribe(MQTTHandle *handle, char *topic, MQTTEventHandler callback) {
|
MQTTStatus mqtt_subscribe(MQTTHandle *handle, char *topic, MQTTEventHandler callback) {
|
||||||
// TODO: subscribe
|
if (!handle->reader_alive) {
|
||||||
|
handle->error_handler(handle, MQTT_Error_Connection_Reset);
|
||||||
|
return MQTT_STATUS_ERROR;
|
||||||
|
}
|
||||||
|
return (send_subscribe_packet(handle, topic) ? MQTT_STATUS_OK : MQTT_STATUS_ERROR);
|
||||||
|
// TODO: add subscription to list
|
||||||
}
|
}
|
||||||
|
|
||||||
MQTTStatus mqtt_unsubscribe(MQTTHandle *handle, char *topic) {
|
MQTTStatus mqtt_unsubscribe(MQTTHandle *handle, char *topic) {
|
||||||
// TODO: unsubscribe
|
if (!handle->reader_alive) {
|
||||||
|
handle->error_handler(handle, MQTT_Error_Connection_Reset);
|
||||||
|
return MQTT_STATUS_ERROR;
|
||||||
|
}
|
||||||
|
return (send_unsubscribe_packet(handle, topic) ? MQTT_STATUS_OK : MQTT_STATUS_ERROR);
|
||||||
|
// TODO: remove subscription from list
|
||||||
}
|
}
|
||||||
|
|
||||||
MQTTStatus mqtt_publish(MQTTHandle *handle, char *topic, char *payload, MQTTQosLevel qos_level) {
|
MQTTStatus mqtt_publish(MQTTHandle *handle, char *topic, char *payload, MQTTQosLevel qos_level) {
|
||||||
// TODO: publish
|
if (!handle->reader_alive) {
|
||||||
|
handle->error_handler(handle, MQTT_Error_Connection_Reset);
|
||||||
|
return MQTT_STATUS_ERROR;
|
||||||
|
}
|
||||||
|
return (send_publish_packet(handle, topic, payload, qos_level) ? MQTT_STATUS_OK : MQTT_STATUS_ERROR);
|
||||||
}
|
}
|
||||||
|
|
||||||
MQTTStatus mqtt_disconnect(MQTTHandle *handle) {
|
MQTTStatus mqtt_disconnect(MQTTHandle *handle) {
|
||||||
release_platform(handle);
|
DEBUG_LOG("Disconnecting...")
|
||||||
free(handle);
|
if (close(handle->sock)) {
|
||||||
|
return MQTT_STATUS_ERROR;
|
||||||
|
}
|
||||||
|
join_read_task(handle);
|
||||||
|
mqtt_free(handle);
|
||||||
|
|
||||||
|
return MQTT_STATUS_OK;
|
||||||
}
|
}
|
||||||
|
|
|
@ -2,6 +2,7 @@
|
||||||
#define mqtt_internal_h__included
|
#define mqtt_internal_h__included
|
||||||
|
|
||||||
#include "mqtt.h"
|
#include "mqtt.h"
|
||||||
|
#include "packet.h"
|
||||||
|
|
||||||
typedef struct _PlatformData PlatformData;
|
typedef struct _PlatformData PlatformData;
|
||||||
|
|
||||||
|
@ -9,17 +10,39 @@ typedef struct {
|
||||||
char *topic;
|
char *topic;
|
||||||
MQTTEventHandler *handler;
|
MQTTEventHandler *handler;
|
||||||
bool pending;
|
bool pending;
|
||||||
|
} SubscriptionItem;
|
||||||
|
|
||||||
|
typedef struct {
|
||||||
|
SubscriptionItem *items;
|
||||||
|
uint8_t num_items;
|
||||||
} Subscriptions;
|
} Subscriptions;
|
||||||
|
|
||||||
|
typedef void (*MQTTCallback)(MQTTHandle *handle, MQTTPacket *packet, void *context);
|
||||||
|
|
||||||
|
typedef struct {
|
||||||
|
MQTTControlPacketType type;
|
||||||
|
uint16_t packet_id;
|
||||||
|
void *context;
|
||||||
|
MQTTCallback callback;
|
||||||
|
} MQTTCallbackQueueItem;
|
||||||
|
|
||||||
|
typedef struct {
|
||||||
|
MQTTCallbackQueueItem *pending;
|
||||||
|
uint8_t num_items;
|
||||||
|
} MQTTCallbackQueue;
|
||||||
|
|
||||||
struct _MQTTHandle {
|
struct _MQTTHandle {
|
||||||
MQTTErrorHandler *errorHandler;
|
MQTTConfig *config;
|
||||||
Subscriptions *subscriptions;
|
|
||||||
int num_subscriptions;
|
MQTTErrorHandler error_handler;
|
||||||
|
Subscriptions subscriptions;
|
||||||
|
|
||||||
int sock;
|
int sock;
|
||||||
bool reader_alive;
|
bool reader_alive;
|
||||||
|
|
||||||
// TODO: status queue (Waiting for ACK)
|
uint16_t packet_id_counter;
|
||||||
|
|
||||||
|
MQTTCallbackQueue queue;
|
||||||
PlatformData *platform;
|
PlatformData *platform;
|
||||||
};
|
};
|
||||||
|
|
||||||
|
|
100
src/protocol.c
100
src/protocol.c
|
@ -0,0 +1,100 @@
|
||||||
|
#include <unistd.h>
|
||||||
|
#include <stdio.h>
|
||||||
|
|
||||||
|
#include "mqtt_internal.h"
|
||||||
|
#include "packet.h"
|
||||||
|
#include "buffer.h"
|
||||||
|
|
||||||
|
#include "debug.h"
|
||||||
|
|
||||||
|
static bool send_buffer(MQTTHandle *handle, Buffer *buffer) {
|
||||||
|
DEBUG_LOG("Attempting to send:");
|
||||||
|
buffer_hexdump(buffer, 2);
|
||||||
|
|
||||||
|
while (buffer_free_space(buffer) > 0) {
|
||||||
|
ssize_t bytes = write(handle->sock, buffer->data + buffer->position, buffer_free_space(buffer));
|
||||||
|
if (bytes <= 0) {
|
||||||
|
DEBUG_LOG("write error, %s", strerror(errno));
|
||||||
|
buffer_release(buffer);
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
DEBUG_LOG("Wrote %zu bytes...", bytes);
|
||||||
|
buffer->position += bytes;
|
||||||
|
}
|
||||||
|
DEBUG_LOG("Buffer written...");
|
||||||
|
buffer_release(buffer);
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
|
||||||
|
bool send_connect_packet(MQTTHandle *handle) {
|
||||||
|
ConnectPayload *payload = calloc(1, sizeof(ConnectPayload));
|
||||||
|
|
||||||
|
payload->client_id = handle->config->client_id;
|
||||||
|
payload->protocol_level = 4;
|
||||||
|
payload->keepalive_interval = 60;
|
||||||
|
|
||||||
|
// TODO: support last will
|
||||||
|
// payload->will_topic = "test/lastwill";
|
||||||
|
// payload->will_message = "disconnected";
|
||||||
|
// payload->will_qos = MQTT_QOS_1;
|
||||||
|
// payload->retain_will = true;
|
||||||
|
|
||||||
|
payload->username = handle->config->username;
|
||||||
|
payload->password = handle->config->password;
|
||||||
|
|
||||||
|
Buffer *encoded = mqtt_packet_encode(&(MQTTPacket){ PacketTypeConnect, payload });
|
||||||
|
free(payload);
|
||||||
|
|
||||||
|
// TODO: add waiting ConnAck to queue
|
||||||
|
|
||||||
|
encoded->position = 0;
|
||||||
|
return send_buffer(handle, encoded);
|
||||||
|
}
|
||||||
|
|
||||||
|
bool send_subscribe_packet(MQTTHandle *handle, char *topic, MQTTQosLevel qos) {
|
||||||
|
SubscribePayload *payload = calloc(1, sizeof(SubscribePayload));
|
||||||
|
|
||||||
|
payload->packet_id = handle->packet_id_counter++;
|
||||||
|
payload->topic = topic;
|
||||||
|
payload->qos = qos;
|
||||||
|
|
||||||
|
Buffer *encoded = mqtt_packet_encode(&(MQTTPacket){ PacketTypeSubscribe, payload });
|
||||||
|
free(payload);
|
||||||
|
|
||||||
|
// TODO: add waiting SubAck to queue
|
||||||
|
|
||||||
|
encoded->position = 0;
|
||||||
|
return send_buffer(handle, encoded);
|
||||||
|
}
|
||||||
|
|
||||||
|
bool send_unsubscribe_packet(MQTTHandle *handle, char *topic) {
|
||||||
|
UnsubscribePayload *payload = calloc(1, sizeof(UnsubscribePayload));
|
||||||
|
|
||||||
|
payload->packet_id = 10;
|
||||||
|
payload->topic = "test/topic";
|
||||||
|
|
||||||
|
Buffer *encoded = mqtt_packet_encode(&(MQTTPacket){ PacketTypeUnsubscribe, payload });
|
||||||
|
free(payload);
|
||||||
|
|
||||||
|
// TODO: add waiting UnsubAck to queue
|
||||||
|
encoded->position = 0;
|
||||||
|
return send_buffer(handle, encoded);
|
||||||
|
}
|
||||||
|
|
||||||
|
bool send_publish_packet(MQTTHandle *handle, char *topic, char *message, MQTTQosLevel qos) {
|
||||||
|
PublishPayload *payload = calloc(1, sizeof(PublishPayload));
|
||||||
|
|
||||||
|
payload->qos = qos;
|
||||||
|
payload->retain = true;
|
||||||
|
payload->topic = topic;
|
||||||
|
payload->packet_id = handle->packet_id_counter++;
|
||||||
|
payload->message = message;
|
||||||
|
|
||||||
|
Buffer *encoded = mqtt_packet_encode(&(MQTTPacket){ PacketTypePublish, payload });
|
||||||
|
free(payload);
|
||||||
|
|
||||||
|
// TODO: Handle QoS and add waiting packets to queue
|
||||||
|
|
||||||
|
encoded->position = 0;
|
||||||
|
return send_buffer(handle, encoded);
|
||||||
|
}
|
|
@ -0,0 +1,11 @@
|
||||||
|
#ifndef protocol_h__included
|
||||||
|
#define protocol_h__included
|
||||||
|
|
||||||
|
#include "mqtt.h"
|
||||||
|
|
||||||
|
bool send_connect_packet(MQTTHandle *handle);
|
||||||
|
bool send_subscribe_packet(MQTTHandle *handle, char *topic);
|
||||||
|
bool send_unsubscribe_packet(MQTTHandle *handle, char *topic);
|
||||||
|
bool send_publish_packet(MQTTHandle *handle, char *topic, char *message, MQTTQosLevel qos);
|
||||||
|
|
||||||
|
#endif
|
51
tests/connect_publish.c
Normal file
51
tests/connect_publish.c
Normal file
|
@ -0,0 +1,51 @@
|
||||||
|
#include <stdio.h>
|
||||||
|
#include <stdlib.h>
|
||||||
|
#include <unistd.h>
|
||||||
|
|
||||||
|
#include "mqtt.h"
|
||||||
|
|
||||||
|
#define LOG(fmt, ...) fprintf(stdout, fmt "\n", ## __VA_ARGS__)
|
||||||
|
|
||||||
|
bool err_handler(MQTTHandle *handle, MQTTErrorCode error) {
|
||||||
|
LOG("Error received: %d", error);
|
||||||
|
exit(1);
|
||||||
|
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
|
||||||
|
int main(int argc, char **argv) {
|
||||||
|
MQTTConfig config = { 0 };
|
||||||
|
|
||||||
|
config.client_id = "libmqtt_testsuite";
|
||||||
|
config.hostname = "localhost";
|
||||||
|
config.port = 1883;
|
||||||
|
|
||||||
|
LOG("Trying to connect to %s", config.hostname);
|
||||||
|
MQTTHandle *mqtt = mqtt_connect(&config, err_handler);
|
||||||
|
|
||||||
|
if (mqtt == NULL) {
|
||||||
|
LOG("Connection failed!");
|
||||||
|
return 1;
|
||||||
|
}
|
||||||
|
|
||||||
|
LOG("Connected!");
|
||||||
|
|
||||||
|
sleep(5);
|
||||||
|
|
||||||
|
LOG("Trying publish to testsuite/mqtt/test...");
|
||||||
|
MQTTStatus result = mqtt_publish(mqtt, "testsuite/mqtt/test", "payload", MQTT_QOS_0);
|
||||||
|
if (result != MQTT_STATUS_OK) {
|
||||||
|
LOG("Could not publish");
|
||||||
|
return 1;
|
||||||
|
}
|
||||||
|
|
||||||
|
sleep(5);
|
||||||
|
|
||||||
|
LOG("Disconnecting...");
|
||||||
|
result = mqtt_disconnect(mqtt);
|
||||||
|
if (result != MQTT_STATUS_OK) {
|
||||||
|
LOG("Could not disconnect");
|
||||||
|
return 1;
|
||||||
|
}
|
||||||
|
return 0;
|
||||||
|
}
|
53
tests/connect_subscribe.c
Normal file
53
tests/connect_subscribe.c
Normal file
|
@ -0,0 +1,53 @@
|
||||||
|
#include <stdio.h>
|
||||||
|
#include <unistd.h>
|
||||||
|
|
||||||
|
#include "mqtt.h"
|
||||||
|
|
||||||
|
#define LOG(fmt, ...) fprintf(stdout, fmt "\n", ## __VA_ARGS__)
|
||||||
|
|
||||||
|
bool err_handler(MQTTHandle *handle, MQTTErrorCode error) {
|
||||||
|
LOG("Error received: %d", error);
|
||||||
|
|
||||||
|
return 1;
|
||||||
|
}
|
||||||
|
|
||||||
|
bool leave = false;
|
||||||
|
void callback(MQTTHandle *handle, char *topic, char *payload) {
|
||||||
|
LOG("Received publish: %s -> %s", topic, payload);
|
||||||
|
leave = true;
|
||||||
|
}
|
||||||
|
|
||||||
|
int main(int argc, char **argv) {
|
||||||
|
MQTTConfig config;
|
||||||
|
|
||||||
|
config.client_id = "libmqtt_testsuite";
|
||||||
|
config.hostname = "test.mosquitto.org";
|
||||||
|
config.port = 1883;
|
||||||
|
|
||||||
|
LOG("Trying to connect to test.mosquitto.org");
|
||||||
|
MQTTHandle *mqtt = mqtt_connect(&config, err_handler);
|
||||||
|
|
||||||
|
if (mqtt == NULL) {
|
||||||
|
LOG("Connection failed!");
|
||||||
|
return 1;
|
||||||
|
}
|
||||||
|
LOG("Connected!");
|
||||||
|
|
||||||
|
sleep(5);
|
||||||
|
|
||||||
|
LOG("Trying subscribe on testsuite/mqtt/test...");
|
||||||
|
MQTTStatus result = mqtt_subscribe(mqtt, "testsuite/mqtt/test", callback);
|
||||||
|
if (result != MQTT_STATUS_OK) {
|
||||||
|
LOG("Could not publish");
|
||||||
|
return 1;
|
||||||
|
}
|
||||||
|
|
||||||
|
while (!leave) {
|
||||||
|
LOG("Waiting...");
|
||||||
|
sleep(1);
|
||||||
|
}
|
||||||
|
|
||||||
|
LOG("Disconnecting...");
|
||||||
|
mqtt_disconnect(mqtt);
|
||||||
|
return 0;
|
||||||
|
}
|
Loading…
Reference in a new issue