parent
315e45803f
commit
31840ad65d
3 changed files with 36 additions and 168 deletions
181
src/packet.c
181
src/packet.c
|
@ -20,18 +20,6 @@ MQTTPacket *allocate_MQTTPacket(MQTTControlPacketType type) {
|
|||
case PacketTypePublish:
|
||||
packet->payload = calloc(1, sizeof(PublishPayload));
|
||||
break;
|
||||
case PacketTypePubAck:
|
||||
packet->payload = calloc(1, sizeof(PubAckPayload));
|
||||
break;
|
||||
case PacketTypePubRec:
|
||||
packet->payload = calloc(1, sizeof(PubRecPayload));
|
||||
break;
|
||||
case PacketTypePubRel:
|
||||
packet->payload = calloc(1, sizeof(PubRelPayload));
|
||||
break;
|
||||
case PacketTypePubComp:
|
||||
packet->payload = calloc(1, sizeof(PubCompPayload));
|
||||
break;
|
||||
case PacketTypeSubscribe:
|
||||
packet->payload = calloc(1, sizeof(SubscribePayload));
|
||||
break;
|
||||
|
@ -41,8 +29,12 @@ MQTTPacket *allocate_MQTTPacket(MQTTControlPacketType type) {
|
|||
case PacketTypeUnsubscribe:
|
||||
packet->payload = calloc(1, sizeof(UnsubscribePayload));
|
||||
break;
|
||||
case PacketTypePubAck:
|
||||
case PacketTypePubRec:
|
||||
case PacketTypePubRel:
|
||||
case PacketTypePubComp:
|
||||
case PacketTypeUnsubAck:
|
||||
packet->payload = calloc(1, sizeof(UnsubAckPayload));
|
||||
packet->payload = calloc(1, sizeof(PacketIDPayload));
|
||||
break;
|
||||
case PacketTypePingReq:
|
||||
case PacketTypePingResp:
|
||||
|
@ -227,7 +219,7 @@ bool decode_publish(Buffer *buffer, PublishPayload *payload, size_t sz) {
|
|||
return true;
|
||||
}
|
||||
|
||||
bool decode_puback(Buffer *buffer, PubAckPayload *payload) {
|
||||
bool decode_packet_id(Buffer *buffer, PacketIDPayload *payload) {
|
||||
payload->packet_id =
|
||||
(buffer->data[buffer->position] << 8)
|
||||
+ buffer->data[buffer->position + 1];
|
||||
|
@ -235,33 +227,6 @@ bool decode_puback(Buffer *buffer, PubAckPayload *payload) {
|
|||
return true;
|
||||
}
|
||||
|
||||
bool decode_pubrec(Buffer *buffer, PubRecPayload *payload) {
|
||||
payload->packet_id =
|
||||
(buffer->data[buffer->position] << 8)
|
||||
+ buffer->data[buffer->position + 1];
|
||||
buffer->position += 2;
|
||||
|
||||
return true;
|
||||
}
|
||||
|
||||
bool decode_pubrel(Buffer *buffer, PubRelPayload *payload) {
|
||||
payload->packet_id =
|
||||
(buffer->data[buffer->position] << 8)
|
||||
+ buffer->data[buffer->position + 1];
|
||||
buffer->position += 2;
|
||||
|
||||
return true;
|
||||
}
|
||||
|
||||
bool decode_pubcomp(Buffer *buffer, PubCompPayload *payload) {
|
||||
payload->packet_id =
|
||||
(buffer->data[buffer->position] << 8)
|
||||
+ buffer->data[buffer->position + 1];
|
||||
buffer->position += 2;
|
||||
|
||||
return true;
|
||||
}
|
||||
|
||||
bool decode_subscribe(Buffer *buffer, SubscribePayload *payload) {
|
||||
payload->packet_id =
|
||||
(buffer->data[buffer->position] << 8)
|
||||
|
@ -296,15 +261,6 @@ bool decode_unsubscribe(Buffer *buffer, UnsubscribePayload *payload) {
|
|||
return true;
|
||||
}
|
||||
|
||||
bool decode_unsuback(Buffer *buffer, UnsubAckPayload *payload) {
|
||||
payload->packet_id =
|
||||
(buffer->data[buffer->position] << 8)
|
||||
+ buffer->data[buffer->position + 1];
|
||||
buffer->position += 2;
|
||||
|
||||
return true;
|
||||
}
|
||||
|
||||
|
||||
MQTTPacket *mqtt_packet_decode(Buffer *buffer) {
|
||||
// validate that the buffer is big enough
|
||||
|
@ -328,18 +284,6 @@ MQTTPacket *mqtt_packet_decode(Buffer *buffer) {
|
|||
case PacketTypePublish:
|
||||
valid = decode_publish(buffer, result->payload, packet_size);
|
||||
break;
|
||||
case PacketTypePubAck:
|
||||
valid = decode_puback(buffer, result->payload);
|
||||
break;
|
||||
case PacketTypePubRec:
|
||||
valid = decode_pubrec(buffer, result->payload);
|
||||
break;
|
||||
case PacketTypePubRel:
|
||||
valid = decode_pubrel(buffer, result->payload);
|
||||
break;
|
||||
case PacketTypePubComp:
|
||||
valid = decode_pubcomp(buffer, result->payload);
|
||||
break;
|
||||
case PacketTypeSubscribe:
|
||||
valid = decode_subscribe(buffer, result->payload);
|
||||
break;
|
||||
|
@ -349,8 +293,12 @@ MQTTPacket *mqtt_packet_decode(Buffer *buffer) {
|
|||
case PacketTypeUnsubscribe:
|
||||
valid = decode_unsubscribe(buffer, result->payload);
|
||||
break;
|
||||
case PacketTypePubAck:
|
||||
case PacketTypePubRec:
|
||||
case PacketTypePubRel:
|
||||
case PacketTypePubComp:
|
||||
case PacketTypeUnsubAck:
|
||||
valid = decode_unsuback(buffer, result->payload);
|
||||
valid = decode_packet_id(buffer, result->payload);
|
||||
break;
|
||||
case PacketTypePingReq:
|
||||
case PacketTypePingResp:
|
||||
|
@ -497,49 +445,10 @@ Buffer *encode_publish(PublishPayload *payload) {
|
|||
return buffer;
|
||||
}
|
||||
|
||||
Buffer *encode_puback(PubAckPayload *payload) {
|
||||
Buffer *encode_packet_id(PacketIDPayload *payload, MQTTControlPacketType type) {
|
||||
size_t sz = 2; // packet id
|
||||
|
||||
Buffer *buffer = make_buffer_for_header(sz, PacketTypePubAck);
|
||||
|
||||
// Variable header
|
||||
buffer->data[buffer->position++] = (payload->packet_id & 0xff00) >> 8;
|
||||
buffer->data[buffer->position++] = (payload->packet_id & 0xff);
|
||||
|
||||
assert(buffer_eof(buffer));
|
||||
return buffer;
|
||||
}
|
||||
|
||||
Buffer *encode_pubrec(PubRecPayload *payload) {
|
||||
size_t sz = 2; // packet id
|
||||
|
||||
Buffer *buffer = make_buffer_for_header(sz, PacketTypePubRec);
|
||||
|
||||
// Variable header
|
||||
buffer->data[buffer->position++] = (payload->packet_id & 0xff00) >> 8;
|
||||
buffer->data[buffer->position++] = (payload->packet_id & 0xff);
|
||||
|
||||
assert(buffer_eof(buffer));
|
||||
return buffer;
|
||||
}
|
||||
|
||||
Buffer *encode_pubrel(PubRelPayload *payload) {
|
||||
size_t sz = 2; // packet id
|
||||
|
||||
Buffer *buffer = make_buffer_for_header(sz, PacketTypePubRel);
|
||||
|
||||
// Variable header
|
||||
buffer->data[buffer->position++] = (payload->packet_id & 0xff00) >> 8;
|
||||
buffer->data[buffer->position++] = (payload->packet_id & 0xff);
|
||||
|
||||
assert(buffer_eof(buffer));
|
||||
return buffer;
|
||||
}
|
||||
|
||||
Buffer *encode_pubcomp(PubCompPayload *payload) {
|
||||
size_t sz = 2; // packet id
|
||||
|
||||
Buffer *buffer = make_buffer_for_header(sz, PacketTypePubComp);
|
||||
Buffer *buffer = make_buffer_for_header(sz, type);
|
||||
|
||||
// Variable header
|
||||
buffer->data[buffer->position++] = (payload->packet_id & 0xff00) >> 8;
|
||||
|
@ -602,36 +511,12 @@ Buffer *encode_unsubscribe(UnsubscribePayload *payload) {
|
|||
return buffer;
|
||||
}
|
||||
|
||||
Buffer *encode_unsuback(UnsubAckPayload *payload) {
|
||||
size_t sz = 2; // packet id
|
||||
|
||||
Buffer *buffer = make_buffer_for_header(sz, PacketTypeUnsubAck);
|
||||
|
||||
// Variable header
|
||||
buffer->data[buffer->position++] = (payload->packet_id & 0xff00) >> 8;
|
||||
buffer->data[buffer->position++] = (payload->packet_id & 0xff);
|
||||
|
||||
Buffer *encode_no_payload(MQTTControlPacketType type) {
|
||||
Buffer *buffer = make_buffer_for_header(0, type);
|
||||
assert(buffer_eof(buffer));
|
||||
return buffer;
|
||||
}
|
||||
|
||||
Buffer *encode_pingreq() {
|
||||
Buffer *buffer = make_buffer_for_header(0, PacketTypePingReq);
|
||||
assert(buffer_eof(buffer));
|
||||
return buffer;
|
||||
}
|
||||
|
||||
Buffer *encode_pingresp() {
|
||||
Buffer *buffer = make_buffer_for_header(0, PacketTypePingResp);
|
||||
assert(buffer_eof(buffer));
|
||||
return buffer;
|
||||
}
|
||||
|
||||
Buffer *encode_disconnect() {
|
||||
Buffer *buffer = make_buffer_for_header(0, PacketTypeDisconnect);
|
||||
assert(buffer_eof(buffer));
|
||||
return buffer;
|
||||
}
|
||||
|
||||
Buffer *mqtt_packet_encode(MQTTPacket *packet) {
|
||||
switch (packet->packet_type) {
|
||||
|
@ -641,28 +526,22 @@ Buffer *mqtt_packet_encode(MQTTPacket *packet) {
|
|||
return encode_connack((ConnAckPayload *)packet->payload);
|
||||
case PacketTypePublish:
|
||||
return encode_publish((PublishPayload *)packet->payload);
|
||||
case PacketTypePubAck:
|
||||
return encode_puback((PubAckPayload *)packet->payload);
|
||||
case PacketTypePubRec:
|
||||
return encode_pubrec((PubRecPayload *)packet->payload);
|
||||
case PacketTypePubRel:
|
||||
return encode_pubrel((PubRelPayload *)packet->payload);
|
||||
case PacketTypePubComp:
|
||||
return encode_pubcomp((PubCompPayload *)packet->payload);
|
||||
case PacketTypeSubscribe:
|
||||
return encode_subscribe((SubscribePayload *)packet->payload);
|
||||
case PacketTypeSubAck:
|
||||
return encode_suback((SubAckPayload *)packet->payload);
|
||||
case PacketTypeUnsubscribe:
|
||||
return encode_unsubscribe((UnsubscribePayload *)packet->payload);
|
||||
case PacketTypePubAck:
|
||||
case PacketTypePubRec:
|
||||
case PacketTypePubRel:
|
||||
case PacketTypePubComp:
|
||||
case PacketTypeUnsubAck:
|
||||
return encode_unsuback((UnsubAckPayload *)packet->payload);
|
||||
return encode_packet_id((PacketIDPayload *)packet->payload, packet->packet_type);
|
||||
case PacketTypePingReq:
|
||||
return encode_pingreq();
|
||||
case PacketTypePingResp:
|
||||
return encode_pingresp();
|
||||
case PacketTypeDisconnect:
|
||||
return encode_disconnect();
|
||||
return encode_no_payload(packet->packet_type);
|
||||
}
|
||||
|
||||
return NULL;
|
||||
|
@ -676,22 +555,22 @@ uint16_t get_packet_id(MQTTPacket *packet) {
|
|||
switch(packet->packet_type) {
|
||||
case PacketTypePublish:
|
||||
return ((PublishPayload *)packet->payload)->packet_id;
|
||||
case PacketTypePubAck:
|
||||
return ((PubAckPayload *)packet->payload)->packet_id;
|
||||
case PacketTypePubRec:
|
||||
return ((PubRecPayload *)packet->payload)->packet_id;
|
||||
case PacketTypePubRel:
|
||||
return ((PubRelPayload *)packet->payload)->packet_id;
|
||||
case PacketTypePubComp:
|
||||
return ((PubCompPayload *)packet->payload)->packet_id;
|
||||
case PacketTypeSubscribe:
|
||||
return ((SubscribePayload *)packet->payload)->packet_id;
|
||||
case PacketTypeSubAck:
|
||||
return ((SubAckPayload *)packet->payload)->packet_id;
|
||||
case PacketTypeUnsubscribe:
|
||||
return ((UnsubscribePayload *)packet->payload)->packet_id;
|
||||
|
||||
// the following ones are identical
|
||||
case PacketTypePubAck:
|
||||
case PacketTypePubRec:
|
||||
case PacketTypePubRel:
|
||||
case PacketTypePubComp:
|
||||
case PacketTypeUnsubAck:
|
||||
return ((UnsubAckPayload *)packet->payload)->packet_id;
|
||||
return ((PacketIDPayload *)packet->payload)->packet_id;
|
||||
|
||||
// not in list -> no packet_id, revert to invalid 0
|
||||
default:
|
||||
return 0; // no packet id in payload
|
||||
}
|
||||
|
|
21
src/packet.h
21
src/packet.h
|
@ -68,19 +68,12 @@ typedef struct {
|
|||
|
||||
typedef struct {
|
||||
uint16_t packet_id;
|
||||
} PubAckPayload;
|
||||
} PacketIDPayload;
|
||||
|
||||
typedef struct {
|
||||
uint16_t packet_id;
|
||||
} PubRecPayload;
|
||||
|
||||
typedef struct {
|
||||
uint16_t packet_id;
|
||||
} PubRelPayload;
|
||||
|
||||
typedef struct {
|
||||
uint16_t packet_id;
|
||||
} PubCompPayload;
|
||||
#define PubAckPayload PacketIDPayload
|
||||
#define PubRecPayload PacketIDPayload
|
||||
#define PubRelPayload PacketIDPayload
|
||||
#define PubCompPayload PacketIDPayload
|
||||
|
||||
typedef struct {
|
||||
uint16_t packet_id;
|
||||
|
@ -105,9 +98,7 @@ typedef struct {
|
|||
char *topic;
|
||||
} UnsubscribePayload;
|
||||
|
||||
typedef struct {
|
||||
uint16_t packet_id;
|
||||
} UnsubAckPayload;
|
||||
#define UnsubAckPayload PacketIDPayload
|
||||
|
||||
typedef struct {
|
||||
MQTTControlPacketType packet_type;
|
||||
|
|
|
@ -56,7 +56,6 @@ void remove_from_queue(MQTTHandle *handle, MQTTCallbackQueueItem *remove) {
|
|||
|
||||
bool dispatch_packet(MQTTHandle *handle, MQTTPacket *packet) {
|
||||
MQTTCallbackQueueItem *item = handle->queue.pending;
|
||||
MQTTCallbackQueueItem *prev_item = NULL;
|
||||
uint16_t packet_id = get_packet_id(packet);
|
||||
|
||||
while (item != NULL) {
|
||||
|
@ -68,7 +67,6 @@ bool dispatch_packet(MQTTHandle *handle, MQTTPacket *packet) {
|
|||
remove_from_queue(handle, item);
|
||||
return true;
|
||||
}
|
||||
prev_item = item;
|
||||
item = item->next;
|
||||
}
|
||||
|
||||
|
|
Loading…
Reference in a new issue