From ccd0ea3bf8642601e3ad8187a046b0da29fefc92 Mon Sep 17 00:00:00 2001 From: Johannes Schriewer Date: Sun, 5 Aug 2018 02:28:38 +0200 Subject: [PATCH] Port to MSVC/Windows The encode and decode packet tests do not work currently as the compiler does not like the test harnish. connect - publish and connect - subscribe work though and seem to not crash for now. It is possible that the mqtt.dll is not working correctly right now, possibly the exports are not right. Re #21 --- CMakeLists.txt | 25 ++- platform/windows.c | 341 ++++++++++++++++++++++++++++++++++++++ src/mqtt.c | 3 - src/mqtt.h.in | 1 + src/protocol.c | 1 - tests/connect_publish.c | 18 +- tests/connect_subscribe.c | 16 +- tests/test.h | 7 +- 8 files changed, 378 insertions(+), 34 deletions(-) create mode 100644 platform/windows.c diff --git a/CMakeLists.txt b/CMakeLists.txt index 074098f..a5a4954 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -17,13 +17,29 @@ configure_file ( # # Build flags # -set(CMAKE_C_FLAGS_DEBUG "${CMAKE_C_FLAGS_DEBUG} -Wall -Wextra -fprofile-arcs -ftest-coverage -O0 -pthread -DDEBUG=1") -set(CMAKE_C_FLAGS_RELEASE "${CMAKE_C_FLAGS_RELEASE} -Wall -Os -pthread") -set(CMAKE_EXE_LINKER_FLAGS_DEBUG "-lgcov") +if (UNIX) + set(CMAKE_C_FLAGS_DEBUG "${CMAKE_C_FLAGS_DEBUG} -Wall -Wextra -fprofile-arcs -ftest-coverage -O0 -pthread -DDEBUG=1") + set(CMAKE_C_FLAGS_RELEASE "${CMAKE_C_FLAGS_RELEASE} -Wall -Os -pthread") + set(CMAKE_EXE_LINKER_FLAGS_DEBUG "-lgcov") +endif() # UNIX + +if (MSVC) + set(CMAKE_C_FLAGS_DEBUG "${CMAKE_C_FLAGS_DEBUG} -DDEBUG=1 -DMSVC") + set(CMAKE_C_FLAGS_RELEASE "${CMAKE_C_FLAGS_RELEASE} -DMSVC") +endif() # # Platform abstraction # +if (MSVC) + set(PLATFORM_CODE + platform/windows.c + platform/platform.h + ) + find_package(Threads REQUIRED) + set(PLATFORM_LIBS wsock32 ws2_32 ${CMAKE_THREAD_LIBS_INIT}) +endif() + if (UNIX) set(PLATFORM_CODE platform/linux.c @@ -84,6 +100,7 @@ add_library(mqtt SHARED ${mqtt-source}) VERSION "${LIBMQTT_VERSION_MAJOR}.${LIBMQTT_VERSION_MINOR}" SOVERSION ${LIBMQTT_VERSION_MAJOR} ) + target_link_libraries(mqtt ${PLATFORM_LIBS}) target_include_directories(mqtt PUBLIC $ @@ -100,6 +117,7 @@ add_executable (connect_publish.test tests/connect_publish.c) PRIVATE ${PROJECT_BINARY_DIR} ${CMAKE_CURRENT_SOURCE_DIR}/src + ${CMAKE_CURRENT_SOURCE_DIR}/platform ) add_test(NAME ConnectPublish COMMAND ${PROJECT_BINARY_DIR}/connect_publish.test) @@ -109,6 +127,7 @@ add_executable (connect_subscribe.test tests/connect_subscribe.c) PRIVATE ${PROJECT_BINARY_DIR} ${CMAKE_CURRENT_SOURCE_DIR}/src + ${CMAKE_CURRENT_SOURCE_DIR}/platform ) add_test(NAME ConnectSubscribe COMMAND ${PROJECT_BINARY_DIR}/connect_subscribe.test) diff --git a/platform/windows.c b/platform/windows.c new file mode 100644 index 0000000..0113b34 --- /dev/null +++ b/platform/windows.c @@ -0,0 +1,341 @@ +#include +//#include +#include +#include +#include +#include +#include + +#include + +#include "debug.h" +#include "mqtt_internal.h" +#include "platform.h" + +const size_t max_receive_buffer_size = 4 * 4096; // 16 KB + +#define MAX_TASKS 16 +#define MAX_TIMERS 5 + +typedef struct { + PlatformTimerCallback callback; + int status; + int interval; + +} PlatformTimer; + +struct _PlatformData { + HANDLE tasks[MAX_TASKS]; + PlatformTimer timers[MAX_TIMERS]; + WSADATA wsa; + SOCKET sock; + int timer_task; +}; + +void *timer_task(MQTTHandle *handle) { + while (1) { + platform_sleep(1000); + + bool active = false; + for (uint8_t i = 0; i < MAX_TIMERS; i++) { + PlatformTimer *timer = &handle->platform->timers[i]; + + if (timer->callback != NULL) { + timer->status--; + + if (timer->status == 0) { + timer->callback(handle, i); + timer->status = timer->interval; + } + + active = true; + } + } + + if (!active) { + return NULL; + } + } +} + +PlatformStatusCode platform_init(MQTTHandle *handle) { + handle->platform = (PlatformData *)calloc(1, sizeof(struct _PlatformData)); + handle->platform->sock = -1; + handle->platform->timer_task = -1; + if (!handle->platform) { + return PlatformStatusError; + } + + if (WSAStartup(MAKEWORD(2,2),&handle->platform->wsa) != 0) { + DEBUG_LOG("Winsock init failed. Error Code : %d", WSAGetLastError()); + return PlatformStatusError; + } + + return PlatformStatusOk; +} + +PlatformStatusCode platform_release(MQTTHandle *handle) { + PlatformData *p = handle->platform; + + // shut down all timers + if (p->timer_task >= 0) { + for (uint8_t free_timer = 0; free_timer < MAX_TIMERS; free_timer++) { + PlatformStatusCode ret = platform_destroy_timer(handle, free_timer); + if (ret != PlatformStatusOk) { + DEBUG_LOG("Could not shut down all timers"); + return PlatformStatusError; + } + } + } + + // check if there are tasks running + for (uint8_t free_task = 0; free_task < MAX_TASKS; free_task++) { + if (p->tasks[free_task] != 0) { + DEBUG_LOG("Cannot free platform handle, there are tasks running!"); + return PlatformStatusError; + } + } + + free(handle->platform); + return PlatformStatusOk; +} + +PlatformStatusCode platform_run_task(MQTTHandle *handle, int *task_handle, PlatformTask callback) { + PlatformData *p = handle->platform; + uint8_t free_task = 0; + + for (free_task = 0; free_task < MAX_TASKS; free_task++) { + if (p->tasks[free_task] == 0) { + break; + } + } + if (free_task == MAX_TASKS) { + return PlatformStatusError; + } + + DWORD thread_id; + p->tasks[free_task] = CreateThread( + NULL, // default security attributes + 0, // use default stack size + callback, // thread function name + handle, // argument to thread function + 0, // use default creation flags + &thread_id); // returns the thread identifier + + if (p->tasks[free_task] == NULL) { + return PlatformStatusError; + } + + *task_handle = free_task; + return PlatformStatusOk; +} + +PlatformStatusCode platform_cleanup_task(MQTTHandle *handle, int task_handle) { + PlatformData *p = handle->platform; + + if ((task_handle < 0) || (task_handle >= MAX_TASKS)) { + return PlatformStatusError; + } + + if (p->tasks[task_handle]) { + HANDLE wait[] = { p->tasks[task_handle] }; + WaitForMultipleObjects(1, wait, TRUE, INFINITE); + CloseHandle(p->tasks[task_handle]); + p->tasks[task_handle] = 0; + } + return PlatformStatusOk; +} + +PlatformStatusCode platform_resolve_host(char *hostname , char *ip) { + struct in_addr **addr_list; + int i; + + struct hostent *he = gethostbyname(hostname); + if (he == NULL) { + DEBUG_LOG("Resolving host failed: %d", WSAGetLastError()); + return PlatformStatusError; + } + + addr_list = (struct in_addr **) he->h_addr_list; + + for(i = 0; addr_list[i] != NULL; i++) { + strcpy(ip , inet_ntoa(*addr_list[i])); + break; + } + + return PlatformStatusOk; +} + +PlatformStatusCode platform_connect(MQTTHandle *handle) { + PlatformData *p = handle->platform; + + int ret; + struct sockaddr_in servaddr; + memset(&servaddr, 0, sizeof(servaddr)); + + p->sock = socket(AF_INET, SOCK_STREAM, 0); + if (p->sock == INVALID_SOCKET) { + bool free_handle = handle->error_handler(handle, handle->config, MQTT_Error_Internal); + if (free_handle) { + mqtt_free(handle); + } + DEBUG_LOG("Resolving hostname failed: %s", strerror(errno)); + return PlatformStatusError; + } + servaddr.sin_family = AF_INET; + servaddr.sin_port = htons(handle->config->port); + + char ip[40]; + if (platform_resolve_host(handle->config->hostname, ip) != PlatformStatusOk) { + bool free_handle = handle->error_handler(handle, handle->config, MQTT_Error_Host_Not_Found); + if (free_handle) { + mqtt_free(handle); + } + DEBUG_LOG("Resolving hostname failed: %s", strerror(errno)); + closesocket(p->sock); + return PlatformStatusError; + } + servaddr.sin_addr.s_addr = inet_addr(ip); + + ret = connect(p->sock, (struct sockaddr *)&servaddr, sizeof(servaddr)); + if (ret != 0) { + bool free_handle = handle->error_handler(handle, handle->config, MQTT_Error_Connection_Refused); + if (free_handle) { + mqtt_free(handle); + } + DEBUG_LOG("Connection failed: %s", strerror(errno)); + closesocket(p->sock); + return PlatformStatusError; + } + + return PlatformStatusOk; +} + +PlatformStatusCode platform_read(MQTTHandle *handle, Buffer *buffer) { + PlatformData *p = handle->platform; + + while (1) { + int num_bytes = recv(p->sock, &buffer->data[buffer->position], buffer_free_space(buffer), 0); + if (num_bytes == 0) { + /* Socket closed, coordinated shutdown */ + DEBUG_LOG("Socket closed"); + return PlatformStatusError; + } else if (num_bytes < 0) { + if ((errno == EINTR) || (errno == EAGAIN)) { + continue; + } + + /* Some error occured */ + return PlatformStatusError; + } + + buffer->position += num_bytes; + return PlatformStatusOk; + } +} + +PlatformStatusCode platform_write(MQTTHandle *handle, Buffer *buffer) { + PlatformData *p = handle->platform; + + while (!buffer_eof(buffer)) { + int bytes = send(p->sock, buffer->data + buffer->position, buffer_free_space(buffer), 0); + if (bytes <= 0) { + return PlatformStatusError; + } + buffer->position += bytes; + } + + return PlatformStatusOk; +} + +PlatformStatusCode platform_disconnect(MQTTHandle *handle) { + PlatformData *p = handle->platform; + if (p->sock >= 0) { + closesocket(p->sock); + WSACleanup(); + p->sock = -1; + } + + return PlatformStatusOk; +} + +PlatformStatusCode platform_create_timer(MQTTHandle *handle, int interval, int *timer_handle, PlatformTimerCallback callback) { + PlatformData *p = handle->platform; + uint8_t free_timer = 0; + + for (free_timer = 0; free_timer < MAX_TIMERS; free_timer++) { + DEBUG_LOG("Timer %d: %s", free_timer, p->timers[free_timer].callback ? "Occupied" : "Free"); + if (p->timers[free_timer].callback == NULL) { + break; + } + } + if (free_timer == MAX_TASKS) { + return PlatformStatusError; + } + + PlatformTimer *timer = &p->timers[free_timer]; + + timer->callback = callback; + timer->status = interval; + timer->interval = interval; + + if (p->timer_task < 0) { + PlatformStatusCode ret = platform_run_task(handle, &p->timer_task, timer_task); + if (ret != PlatformStatusOk) { + DEBUG_LOG("Could not start timer task"); + return PlatformStatusError; + } + } + + *timer_handle = free_timer; + return PlatformStatusOk; +} + +PlatformStatusCode platform_destroy_timer(MQTTHandle *handle, int timer_handle) { + PlatformData *p = handle->platform; + + if ((timer_handle < 0) || (timer_handle >= MAX_TIMERS)) { + DEBUG_LOG("Invalid timer handle"); + return PlatformStatusError; + } + + p->timers[timer_handle].callback = NULL; + + + // check if there is a timer running + uint8_t free_timer = 0; + + for (free_timer = 0; free_timer < MAX_TIMERS; free_timer++) { + if (p->timers[free_timer].callback != NULL) { + break; + } + } + if ((free_timer == MAX_TIMERS) && (p->timer_task >= 0)) { + // if we get here we have no running timers, so we destroy the timer task + PlatformStatusCode ret = platform_cleanup_task(handle, p->timer_task); + if (ret == PlatformStatusOk) { + p->timer_task = -1; + } else { + DEBUG_LOG("Could not finish timer task"); + return PlatformStatusError; + } + } + + return PlatformStatusOk; +} + + +PlatformStatusCode platform_sleep(int milliseconds) { + HANDLE timer; + LARGE_INTEGER ft; + + // Convert to 100 nanosecond interval, negative value indicates relative time + ft.QuadPart = -(10 * milliseconds * 1000); + + timer = CreateWaitableTimer(NULL, TRUE, NULL); + SetWaitableTimer(timer, &ft, 0, NULL, NULL, 0); + WaitForSingleObject(timer, INFINITE); + CloseHandle(timer); + + return PlatformStatusOk; +} \ No newline at end of file diff --git a/src/mqtt.c b/src/mqtt.c index 495bc74..5cc6956 100644 --- a/src/mqtt.c +++ b/src/mqtt.c @@ -1,8 +1,5 @@ -#include #include #include -#include -#include #include #include #include diff --git a/src/mqtt.h.in b/src/mqtt.h.in index 88c63d5..363078b 100644 --- a/src/mqtt.h.in +++ b/src/mqtt.h.in @@ -50,6 +50,7 @@ typedef enum { } MQTTQosLevel; typedef enum { + MQTT_Error_Internal, /**< Internal error */ MQTT_Error_Host_Not_Found, /**< Host could not be resolved */ MQTT_Error_Connection_Refused, /**< Connection was refused, wrong port? */ MQTT_Error_Broker_Disconnected, /**< Broker went down, perhaps restart? */ diff --git a/src/protocol.c b/src/protocol.c index 3295072..b4cac7f 100644 --- a/src/protocol.c +++ b/src/protocol.c @@ -1,4 +1,3 @@ -#include #include #include diff --git a/tests/connect_publish.c b/tests/connect_publish.c index 3df16d0..b013901 100644 --- a/tests/connect_publish.c +++ b/tests/connect_publish.c @@ -1,31 +1,27 @@ #include #include -#include +#include "platform.h" #include "mqtt.h" int leave = 0; #define LOG(fmt, ...) fprintf(stdout, fmt "\n", ## __VA_ARGS__) -#ifndef _unused -#define _unused __attribute__((unused)) -#endif - -bool err_handler(_unused MQTTHandle *handle, _unused MQTTConfig *config, MQTTErrorCode error) { +bool err_handler(MQTTHandle *handle, MQTTConfig *config, MQTTErrorCode error) { LOG("Error received: %d", error); exit(1); return true; } -void publish_handler(_unused MQTTHandle *handle, char *topic, char *message) { +void publish_handler(MQTTHandle *handle, char *topic, char *message) { LOG("Published %s -> %s", topic, message); leave++; } -void mqtt_connected(MQTTHandle *handle, _unused void *context) { +void mqtt_connected(MQTTHandle *handle, void *context) { LOG("Connected!"); MQTTStatus result; @@ -53,7 +49,7 @@ void mqtt_connected(MQTTHandle *handle, _unused void *context) { leave = true; } -int main(_unused int argc, _unused char **argv) { +int main(int argc, char **argv) { MQTTConfig config = { 0 }; config.client_id = "libmqtt_testsuite_this_is_too_long"; @@ -81,7 +77,7 @@ int main(_unused int argc, _unused char **argv) { int cancel = 0; while (leave < 3) { LOG("Waiting..."); - sleep(1); + platform_sleep(1000); cancel++; if (cancel == 10) { break; @@ -89,7 +85,7 @@ int main(_unused int argc, _unused char **argv) { } LOG("Waiting for ping to happen..."); - sleep(5); + platform_sleep(5000); LOG("Disconnecting..."); MQTTStatus result = mqtt_disconnect(mqtt, NULL, NULL); diff --git a/tests/connect_subscribe.c b/tests/connect_subscribe.c index 354e88d..44c6aac 100644 --- a/tests/connect_subscribe.c +++ b/tests/connect_subscribe.c @@ -1,18 +1,14 @@ #include #include -#include +#include "platform.h" #include "mqtt.h" bool leave = false; #define LOG(fmt, ...) fprintf(stdout, fmt "\n", ## __VA_ARGS__) -#ifndef _unused -#define _unused __attribute__((unused)) -#endif - -bool err_handler(_unused MQTTHandle *handle, _unused MQTTConfig *config, MQTTErrorCode error) { +bool err_handler(MQTTHandle *handle, MQTTConfig *config, MQTTErrorCode error) { LOG("Error received: %d", error); return 1; @@ -32,12 +28,12 @@ void callback(MQTTHandle *handle, char *topic, char *payload) { LOG("Could not unsubscribe test 2"); exit(1); } - sleep(1); + platform_sleep(1000); leave = true; } -void mqtt_connected(MQTTHandle *handle, _unused void *context) { +void mqtt_connected(MQTTHandle *handle, void *context) { LOG("Connected!"); LOG("Trying subscribe on testsuite/mqtt/test..."); @@ -54,7 +50,7 @@ void mqtt_connected(MQTTHandle *handle, _unused void *context) { } } -int main(_unused int argc, _unused char **argv) { +int main(int argc, char **argv) { MQTTConfig config = { 0 }; config.client_id = "libmqtt_testsuite"; @@ -71,7 +67,7 @@ int main(_unused int argc, _unused char **argv) { while (!leave) { LOG("Waiting..."); - sleep(1); + platform_sleep(1000); } LOG("Disconnecting..."); diff --git a/tests/test.h b/tests/test.h index 9bfa45a..2f5a61c 100644 --- a/tests/test.h +++ b/tests/test.h @@ -2,7 +2,6 @@ #include #include #include -#include #include @@ -66,11 +65,7 @@ static TestResult not_implemented(void) { } #endif -#ifndef _unused -#define _unused __attribute__((unused)) -#endif - -int main(_unused int argc, _unused char **argv) { +int main(int argc, char **argv) { uint16_t successes = 0; uint16_t skips = 0; uint16_t failures = 0;