/* * Copyright (c) 2015, Texas Instruments Incorporated - http://www.ti.com/ * All rights reserved. * * Redistribution and use in source and binary forms, with or without * modification, are permitted provided that the following conditions * are met: * 1. Redistributions of source code must retain the above copyright * notice, this list of conditions and the following disclaimer. * 2. Redistributions in binary form must reproduce the above copyright * notice, this list of conditions and the following disclaimer in the * documentation and/or other materials provided with the distribution. * 3. Neither the name of the copyright holder nor the names of its * contributors may be used to endorse or promote products derived * from this software without specific prior written permission. * * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS * ``AS IS'' AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS * FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE * COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, * INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES * (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR * SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, * STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED * OF THE POSSIBILITY OF SUCH DAMAGE. */ /*---------------------------------------------------------------------------*/ /** * \addtogroup mqtt-engine * @{ */ /** * \file * Implementation of the Contiki MQTT engine * * \author * Texas Instruments */ /*---------------------------------------------------------------------------*/ #include "mqtt.h" #include "contiki.h" #include "contiki-net.h" #include "contiki-lib.h" #include "lib/random.h" #include "sys/ctimer.h" #include "sys/etimer.h" #include "sys/pt.h" #include "net/ipv6/uip.h" #include "net/ipv6/uip-ds6.h" #include "dev/leds.h" #include "tcp-socket.h" #include "lib/assert.h" #include "lib/list.h" #include "sys/cc.h" #include #include #include /*---------------------------------------------------------------------------*/ #define DEBUG 0 #if DEBUG #define PRINTF(...) printf(__VA_ARGS__) #else #define PRINTF(...) #endif /*---------------------------------------------------------------------------*/ typedef enum { MQTT_FHDR_MSG_TYPE_CONNECT = 0x10, MQTT_FHDR_MSG_TYPE_CONNACK = 0x20, MQTT_FHDR_MSG_TYPE_PUBLISH = 0x30, MQTT_FHDR_MSG_TYPE_PUBACK = 0x40, MQTT_FHDR_MSG_TYPE_PUBREC = 0x50, MQTT_FHDR_MSG_TYPE_PUBREL = 0x60, MQTT_FHDR_MSG_TYPE_PUBCOMP = 0x70, MQTT_FHDR_MSG_TYPE_SUBSCRIBE = 0x80, MQTT_FHDR_MSG_TYPE_SUBACK = 0x90, MQTT_FHDR_MSG_TYPE_UNSUBSCRIBE = 0xA0, MQTT_FHDR_MSG_TYPE_UNSUBACK = 0xB0, MQTT_FHDR_MSG_TYPE_PINGREQ = 0xC0, MQTT_FHDR_MSG_TYPE_PINGRESP = 0xD0, MQTT_FHDR_MSG_TYPE_DISCONNECT = 0xE0, MQTT_FHDR_DUP_FLAG = 0x08, MQTT_FHDR_QOS_LEVEL_0 = 0x00, MQTT_FHDR_QOS_LEVEL_1 = 0x02, MQTT_FHDR_QOS_LEVEL_2 = 0x04, MQTT_FHDR_RETAIN_FLAG = 0x01, } mqtt_fhdr_fields_t; /*---------------------------------------------------------------------------*/ typedef enum { MQTT_VHDR_USERNAME_FLAG = 0x80, MQTT_VHDR_PASSWORD_FLAG = 0x40, MQTT_VHDR_WILL_RETAIN_FLAG = 0x20, MQTT_VHDR_WILL_QOS_LEVEL_0 = 0x00, MQTT_VHDR_WILL_QOS_LEVEL_1 = 0x08, MQTT_VHDR_WILL_QOS_LEVEL_2 = 0x10, MQTT_VHDR_WILL_FLAG = 0x04, MQTT_VHDR_CLEAN_SESSION_FLAG = 0x02, } mqtt_vhdr_conn_fields_t; /*---------------------------------------------------------------------------*/ typedef enum { MQTT_VHDR_CONN_ACCEPTED, MQTT_VHDR_CONN_REJECTED_PROTOCOL, MQTT_VHDR_CONN_REJECTED_IDENTIFIER, MQTT_VHDR_CONN_REJECTED_UNAVAILABLE, MQTT_VHDR_CONN_REJECTED_BAD_USER_PASS, MQTT_VHDR_CONN_REJECTED_UNAUTHORIZED, } mqtt_vhdr_connack_fields_t; /*---------------------------------------------------------------------------*/ #define MQTT_CONNECT_VHDR_FLAGS_SIZE 12 #define MQTT_STRING_LEN_SIZE 2 #define MQTT_MID_SIZE 2 #define MQTT_QOS_SIZE 1 /*---------------------------------------------------------------------------*/ #define RESPONSE_WAIT_TIMEOUT (CLOCK_SECOND * 10) /*---------------------------------------------------------------------------*/ #define INCREMENT_MID(conn) (conn)->mid_counter += 2 #define MQTT_STRING_LENGTH(s) (((s)->length) == 0 ? 0 : (MQTT_STRING_LEN_SIZE + (s)->length)) /*---------------------------------------------------------------------------*/ /* Protothread send macros */ #define PT_MQTT_WRITE_BYTES(conn, data, len) \ conn->out_write_pos = 0; \ while(write_bytes(conn, data, len)) { \ PT_WAIT_UNTIL(pt, (conn)->out_buffer_sent); \ } #define PT_MQTT_WRITE_BYTE(conn, data) \ while(write_byte(conn, data)) { \ PT_WAIT_UNTIL(pt, (conn)->out_buffer_sent); \ } /*---------------------------------------------------------------------------*/ /* * Sends the continue send event and wait for that event. * * The reason we cannot use PROCESS_PAUSE() is since we would risk loosing any * events posted during the sending process. */ #define PT_MQTT_WAIT_SEND() \ do { \ if (PROCESS_ERR_OK == \ process_post(PROCESS_CURRENT(), mqtt_continue_send_event, NULL)) { \ do { \ PROCESS_WAIT_EVENT(); \ if(ev == mqtt_abort_now_event) { \ conn->state = MQTT_CONN_STATE_ABORT_IMMEDIATE; \ PT_INIT(&conn->out_proto_thread); \ process_post(PROCESS_CURRENT(), ev, data); \ } else if(ev >= mqtt_event_min && ev <= mqtt_event_max) { \ process_post(PROCESS_CURRENT(), ev, data); \ } \ } while (ev != mqtt_continue_send_event); \ } \ } while(0) /*---------------------------------------------------------------------------*/ static process_event_t mqtt_do_connect_tcp_event; static process_event_t mqtt_do_connect_mqtt_event; static process_event_t mqtt_do_disconnect_mqtt_event; static process_event_t mqtt_do_subscribe_event; static process_event_t mqtt_do_unsubscribe_event; static process_event_t mqtt_do_publish_event; static process_event_t mqtt_do_pingreq_event; static process_event_t mqtt_continue_send_event; static process_event_t mqtt_abort_now_event; process_event_t mqtt_update_event; /* * Min and Max event numbers we want to acknowledge while we're in the process * of doing something else. continue_send does not count, therefore must be * allocated last */ static process_event_t mqtt_event_min; static process_event_t mqtt_event_max; /*---------------------------------------------------------------------------*/ /* Prototypes */ static int tcp_input(struct tcp_socket *s, void *ptr, const uint8_t *input_data_ptr, int input_data_len); static void tcp_event(struct tcp_socket *s, void *ptr, tcp_socket_event_t event); static void reset_packet(struct mqtt_in_packet *packet); /*---------------------------------------------------------------------------*/ LIST(mqtt_conn_list); /*---------------------------------------------------------------------------*/ PROCESS(mqtt_process, "MQTT process"); /*---------------------------------------------------------------------------*/ static void call_event(struct mqtt_connection *conn, mqtt_event_t event, void *data) { conn->event_callback(conn, event, data); process_post(conn->app_process, mqtt_update_event, NULL); } /*---------------------------------------------------------------------------*/ static void reset_defaults(struct mqtt_connection *conn) { conn->mid_counter = 1; PT_INIT(&conn->out_proto_thread); conn->waiting_for_pingresp = 0; reset_packet(&conn->in_packet); conn->out_buffer_sent = 0; } /*---------------------------------------------------------------------------*/ static void abort_connection(struct mqtt_connection *conn) { conn->out_buffer_ptr = conn->out_buffer; conn->out_queue_full = 0; /* Reset outgoing packet */ memset(&conn->out_packet, 0, sizeof(conn->out_packet)); tcp_socket_close(&conn->socket); tcp_socket_unregister(&conn->socket); memset(&conn->socket, 0, sizeof(conn->socket)); conn->state = MQTT_CONN_STATE_NOT_CONNECTED; } /*---------------------------------------------------------------------------*/ static void connect_tcp(struct mqtt_connection *conn) { conn->state = MQTT_CONN_STATE_TCP_CONNECTING; reset_defaults(conn); tcp_socket_register(&(conn->socket), conn, conn->in_buffer, MQTT_TCP_INPUT_BUFF_SIZE, conn->out_buffer, MQTT_TCP_OUTPUT_BUFF_SIZE, tcp_input, tcp_event); tcp_socket_connect(&(conn->socket), &(conn->server_ip), conn->server_port); } /*---------------------------------------------------------------------------*/ static void disconnect_tcp(struct mqtt_connection *conn) { conn->state = MQTT_CONN_STATE_DISCONNECTING; tcp_socket_close(&(conn->socket)); tcp_socket_unregister(&conn->socket); memset(&conn->socket, 0, sizeof(conn->socket)); } /*---------------------------------------------------------------------------*/ static void send_out_buffer(struct mqtt_connection *conn) { if(conn->out_buffer_ptr - conn->out_buffer == 0) { conn->out_buffer_sent = 1; return; } conn->out_buffer_sent = 0; DBG("MQTT - (send_out_buffer) Space used in buffer: %i\n", conn->out_buffer_ptr - conn->out_buffer); tcp_socket_send(&conn->socket, conn->out_buffer, conn->out_buffer_ptr - conn->out_buffer); } /*---------------------------------------------------------------------------*/ static void string_to_mqtt_string(struct mqtt_string *mqtt_string, char *string) { if(mqtt_string == NULL) { return; } mqtt_string->string = string; if(string != NULL) { mqtt_string->length = strlen(string); } else { mqtt_string->length = 0; } } /*---------------------------------------------------------------------------*/ static int write_byte(struct mqtt_connection *conn, uint8_t data) { DBG("MQTT - (write_byte) buff_size: %i write: '%02X'\n", &conn->out_buffer[MQTT_TCP_OUTPUT_BUFF_SIZE] - conn->out_buffer_ptr, data); if(&conn->out_buffer[MQTT_TCP_OUTPUT_BUFF_SIZE] - conn->out_buffer_ptr == 0) { send_out_buffer(conn); return 1; } *conn->out_buffer_ptr = data; conn->out_buffer_ptr++; return 0; } /*---------------------------------------------------------------------------*/ static int write_bytes(struct mqtt_connection *conn, uint8_t *data, uint16_t len) { uint16_t write_bytes; write_bytes = MIN(&conn->out_buffer[MQTT_TCP_OUTPUT_BUFF_SIZE] - conn->out_buffer_ptr, len - conn->out_write_pos); memcpy(conn->out_buffer_ptr, &data[conn->out_write_pos], write_bytes); conn->out_write_pos += write_bytes; conn->out_buffer_ptr += write_bytes; DBG("MQTT - (write_bytes) len: %u write_pos: %lu\n", len, conn->out_write_pos); if(len - conn->out_write_pos == 0) { conn->out_write_pos = 0; return 0; } else { send_out_buffer(conn); return len - conn->out_write_pos; } } /*---------------------------------------------------------------------------*/ static void encode_remaining_length(uint8_t *remaining_length, uint8_t *remaining_length_bytes, uint32_t length) { uint8_t digit; DBG("MQTT - Encoding length %lu\n", length); *remaining_length_bytes = 0; do { digit = length % 128; length = length / 128; if(length > 0) { digit = digit | 0x80; } remaining_length[*remaining_length_bytes] = digit; (*remaining_length_bytes)++; DBG("MQTT - Encode len digit '%u' length '%lu'\n", digit, length); } while(length > 0 && *remaining_length_bytes < 5); DBG("MQTT - remaining_length_bytes %u\n", *remaining_length_bytes); } /*---------------------------------------------------------------------------*/ static void keep_alive_callback(void *ptr) { struct mqtt_connection *conn = ptr; DBG("MQTT - (keep_alive_callback) Called!\n"); /* The flag is set when the PINGREQ has been sent */ if(conn->waiting_for_pingresp) { PRINTF("MQTT - Disconnect due to no PINGRESP from broker.\n"); disconnect_tcp(conn); return; } process_post(&mqtt_process, mqtt_do_pingreq_event, conn); } /*---------------------------------------------------------------------------*/ static void reset_packet(struct mqtt_in_packet *packet) { memset(packet, 0, sizeof(struct mqtt_in_packet)); packet->remaining_multiplier = 1; } /*---------------------------------------------------------------------------*/ static PT_THREAD(connect_pt(struct pt *pt, struct mqtt_connection *conn)) { PT_BEGIN(pt); DBG("MQTT - Sending CONNECT message...\n"); /* Set up FHDR */ conn->out_packet.fhdr = MQTT_FHDR_MSG_TYPE_CONNECT; conn->out_packet.remaining_length = 0; conn->out_packet.remaining_length += MQTT_CONNECT_VHDR_FLAGS_SIZE; conn->out_packet.remaining_length += MQTT_STRING_LENGTH(&conn->client_id); conn->out_packet.remaining_length += MQTT_STRING_LENGTH(&conn->credentials.username); conn->out_packet.remaining_length += MQTT_STRING_LENGTH(&conn->credentials.password); conn->out_packet.remaining_length += MQTT_STRING_LENGTH(&conn->will.topic); conn->out_packet.remaining_length += MQTT_STRING_LENGTH(&conn->will.message); encode_remaining_length(conn->out_packet.remaining_length_enc, &conn->out_packet.remaining_length_enc_bytes, conn->out_packet.remaining_length); if(conn->out_packet.remaining_length_enc_bytes > 4) { call_event(conn, MQTT_EVENT_PROTOCOL_ERROR, NULL); PRINTF("MQTT - Error, remaining length > 4 bytes\n"); PT_EXIT(pt); } /* Write Fixed Header */ PT_MQTT_WRITE_BYTE(conn, conn->out_packet.fhdr); PT_MQTT_WRITE_BYTES(conn, conn->out_packet.remaining_length_enc, conn->out_packet.remaining_length_enc_bytes); PT_MQTT_WRITE_BYTE(conn, 0); PT_MQTT_WRITE_BYTE(conn, 6); PT_MQTT_WRITE_BYTES(conn, (uint8_t *)MQTT_PROTOCOL_NAME, 6); PT_MQTT_WRITE_BYTE(conn, MQTT_PROTOCOL_VERSION); PT_MQTT_WRITE_BYTE(conn, conn->connect_vhdr_flags); PT_MQTT_WRITE_BYTE(conn, (conn->keep_alive >> 8)); PT_MQTT_WRITE_BYTE(conn, (conn->keep_alive & 0x00FF)); PT_MQTT_WRITE_BYTE(conn, conn->client_id.length >> 8); PT_MQTT_WRITE_BYTE(conn, conn->client_id.length & 0x00FF); PT_MQTT_WRITE_BYTES(conn, (uint8_t *)conn->client_id.string, conn->client_id.length); if(conn->connect_vhdr_flags & MQTT_VHDR_WILL_FLAG) { PT_MQTT_WRITE_BYTE(conn, conn->will.topic.length >> 8); PT_MQTT_WRITE_BYTE(conn, conn->will.topic.length & 0x00FF); PT_MQTT_WRITE_BYTES(conn, (uint8_t *)conn->will.topic.string, conn->will.topic.length); PT_MQTT_WRITE_BYTE(conn, conn->will.message.length >> 8); PT_MQTT_WRITE_BYTE(conn, conn->will.message.length & 0x00FF); PT_MQTT_WRITE_BYTES(conn, (uint8_t *)conn->will.message.string, conn->will.message.length); DBG("MQTT - Setting will topic to '%s' %u bytes and message to '%s' %u bytes\n", conn->will.topic.string, conn->will.topic.length, conn->will.message.string, conn->will.message.length); } if(conn->connect_vhdr_flags & MQTT_VHDR_USERNAME_FLAG) { PT_MQTT_WRITE_BYTE(conn, conn->credentials.username.length >> 8); PT_MQTT_WRITE_BYTE(conn, conn->credentials.username.length & 0x00FF); PT_MQTT_WRITE_BYTES(conn, (uint8_t *)conn->credentials.username.string, conn->credentials.username.length); } if(conn->connect_vhdr_flags & MQTT_VHDR_PASSWORD_FLAG) { PT_MQTT_WRITE_BYTE(conn, conn->credentials.password.length >> 8); PT_MQTT_WRITE_BYTE(conn, conn->credentials.password.length & 0x00FF); PT_MQTT_WRITE_BYTES(conn, (uint8_t *)conn->credentials.password.string, conn->credentials.password.length); } /* Send out buffer */ send_out_buffer(conn); conn->state = MQTT_CONN_STATE_CONNECTING_TO_BROKER; timer_set(&conn->t, RESPONSE_WAIT_TIMEOUT); /* Wait for CONNACK */ reset_packet(&conn->in_packet); PT_WAIT_UNTIL(pt, conn->out_packet.qos_state == MQTT_QOS_STATE_GOT_ACK || timer_expired(&conn->t)); if(timer_expired(&conn->t)) { DBG("Timeout waiting for CONNACK\n"); /* We stick to the letter of the spec here: Tear the connection down */ mqtt_disconnect(conn); } reset_packet(&conn->in_packet); DBG("MQTT - Done sending CONNECT\n"); #if DEBUG_MQTT == 1 DBG("MQTT - CONNECT message sent: \n"); uint16_t i; for(i = 0; i < (conn->out_buffer_ptr - conn->out_buffer); i++) { DBG("%02X ", conn->out_buffer[i]); } DBG("\n"); #endif PT_END(pt); } /*---------------------------------------------------------------------------*/ static PT_THREAD(disconnect_pt(struct pt *pt, struct mqtt_connection *conn)) { PT_BEGIN(pt); PT_MQTT_WRITE_BYTE(conn, MQTT_FHDR_MSG_TYPE_DISCONNECT); PT_MQTT_WRITE_BYTE(conn, 0); send_out_buffer(conn); /* * Wait a couple of seconds for a TCP ACK. We don't really need the ACK, * we do want the TCP/IP stack to actually send this disconnect before we * tear down the session. */ timer_set(&conn->t, (CLOCK_SECOND * 2)); PT_WAIT_UNTIL(pt, conn->out_buffer_sent || timer_expired(&conn->t)); PT_END(pt); } /*---------------------------------------------------------------------------*/ static PT_THREAD(subscribe_pt(struct pt *pt, struct mqtt_connection *conn)) { PT_BEGIN(pt); DBG("MQTT - Sending subscribe message! topic %s topic_length %i\n", conn->out_packet.topic, conn->out_packet.topic_length); DBG("MQTT - Buffer space is %i \n", &conn->out_buffer[MQTT_TCP_OUTPUT_BUFF_SIZE] - conn->out_buffer_ptr); /* Set up FHDR */ conn->out_packet.fhdr = MQTT_FHDR_MSG_TYPE_SUBSCRIBE | MQTT_FHDR_QOS_LEVEL_1; conn->out_packet.remaining_length = MQTT_MID_SIZE + MQTT_STRING_LEN_SIZE + conn->out_packet.topic_length + MQTT_QOS_SIZE; encode_remaining_length(conn->out_packet.remaining_length_enc, &conn->out_packet.remaining_length_enc_bytes, conn->out_packet.remaining_length); if(conn->out_packet.remaining_length_enc_bytes > 4) { call_event(conn, MQTT_EVENT_PROTOCOL_ERROR, NULL); PRINTF("MQTT - Error, remaining length > 4 bytes\n"); PT_EXIT(pt); } /* Write Fixed Header */ PT_MQTT_WRITE_BYTE(conn, conn->out_packet.fhdr); PT_MQTT_WRITE_BYTES(conn, conn->out_packet.remaining_length_enc, conn->out_packet.remaining_length_enc_bytes); /* Write Variable Header */ PT_MQTT_WRITE_BYTE(conn, (conn->out_packet.mid >> 8)); PT_MQTT_WRITE_BYTE(conn, (conn->out_packet.mid & 0x00FF)); /* Write Payload */ PT_MQTT_WRITE_BYTE(conn, (conn->out_packet.topic_length >> 8)); PT_MQTT_WRITE_BYTE(conn, (conn->out_packet.topic_length & 0x00FF)); PT_MQTT_WRITE_BYTES(conn, (uint8_t *)conn->out_packet.topic, conn->out_packet.topic_length); PT_MQTT_WRITE_BYTE(conn, conn->out_packet.qos); /* Send out buffer */ send_out_buffer(conn); timer_set(&conn->t, RESPONSE_WAIT_TIMEOUT); /* Wait for SUBACK. */ reset_packet(&conn->in_packet); PT_WAIT_UNTIL(pt, conn->out_packet.qos_state == MQTT_QOS_STATE_GOT_ACK || timer_expired(&conn->t)); if(timer_expired(&conn->t)) { DBG("Timeout waiting for SUBACK\n"); } reset_packet(&conn->in_packet); /* This is clear after the entire transaction is complete */ conn->out_queue_full = 0; DBG("MQTT - Done in send_subscribe!\n"); PT_END(pt); } /*---------------------------------------------------------------------------*/ static PT_THREAD(unsubscribe_pt(struct pt *pt, struct mqtt_connection *conn)) { PT_BEGIN(pt); DBG("MQTT - Sending unsubscribe message on topic %s topic_length %i\n", conn->out_packet.topic, conn->out_packet.topic_length); DBG("MQTT - Buffer space is %i \n", &conn->out_buffer[MQTT_TCP_OUTPUT_BUFF_SIZE] - conn->out_buffer_ptr); /* Set up FHDR */ conn->out_packet.fhdr = MQTT_FHDR_MSG_TYPE_UNSUBSCRIBE | MQTT_FHDR_QOS_LEVEL_1; conn->out_packet.remaining_length = MQTT_MID_SIZE + MQTT_STRING_LEN_SIZE + conn->out_packet.topic_length; encode_remaining_length(conn->out_packet.remaining_length_enc, &conn->out_packet.remaining_length_enc_bytes, conn->out_packet.remaining_length); if(conn->out_packet.remaining_length_enc_bytes > 4) { call_event(conn, MQTT_EVENT_PROTOCOL_ERROR, NULL); PRINTF("MQTT - Error, remaining length > 4 bytes\n"); PT_EXIT(pt); } /* Write Fixed Header */ PT_MQTT_WRITE_BYTE(conn, conn->out_packet.fhdr); PT_MQTT_WRITE_BYTES(conn, (uint8_t *)conn->out_packet.remaining_length_enc, conn->out_packet.remaining_length_enc_bytes); /* Write Variable Header */ PT_MQTT_WRITE_BYTE(conn, (conn->out_packet.mid >> 8)); PT_MQTT_WRITE_BYTE(conn, (conn->out_packet.mid & 0x00FF)); /* Write Payload */ PT_MQTT_WRITE_BYTE(conn, (conn->out_packet.topic_length >> 8)); PT_MQTT_WRITE_BYTE(conn, (conn->out_packet.topic_length & 0x00FF)); PT_MQTT_WRITE_BYTES(conn, (uint8_t *)conn->out_packet.topic, conn->out_packet.topic_length); /* Send out buffer */ send_out_buffer(conn); timer_set(&conn->t, RESPONSE_WAIT_TIMEOUT); /* Wait for UNSUBACK */ reset_packet(&conn->in_packet); PT_WAIT_UNTIL(pt, conn->out_packet.qos_state == MQTT_QOS_STATE_GOT_ACK || timer_expired(&conn->t)); if(timer_expired(&conn->t)) { DBG("Timeout waiting for UNSUBACK\n"); } reset_packet(&conn->in_packet); /* This is clear after the entire transaction is complete */ conn->out_queue_full = 0; DBG("MQTT - Done writing subscribe message to out buffer!\n"); PT_END(pt); } /*---------------------------------------------------------------------------*/ static PT_THREAD(publish_pt(struct pt *pt, struct mqtt_connection *conn)) { PT_BEGIN(pt); DBG("MQTT - Sending publish message! topic %s topic_length %i\n", conn->out_packet.topic, conn->out_packet.topic_length); DBG("MQTT - Buffer space is %i \n", &conn->out_buffer[MQTT_TCP_OUTPUT_BUFF_SIZE] - conn->out_buffer_ptr); /* Set up FHDR */ conn->out_packet.fhdr = MQTT_FHDR_MSG_TYPE_PUBLISH | conn->out_packet.qos << 1; if(conn->out_packet.retain == MQTT_RETAIN_ON) { conn->out_packet.fhdr |= MQTT_FHDR_RETAIN_FLAG; } conn->out_packet.remaining_length = MQTT_STRING_LEN_SIZE + conn->out_packet.topic_length + conn->out_packet.payload_size; if(conn->out_packet.qos > MQTT_QOS_LEVEL_0) { conn->out_packet.remaining_length += MQTT_MID_SIZE; } encode_remaining_length(conn->out_packet.remaining_length_enc, &conn->out_packet.remaining_length_enc_bytes, conn->out_packet.remaining_length); if(conn->out_packet.remaining_length_enc_bytes > 4) { call_event(conn, MQTT_EVENT_PROTOCOL_ERROR, NULL); PRINTF("MQTT - Error, remaining length > 4 bytes\n"); PT_EXIT(pt); } /* Write Fixed Header */ PT_MQTT_WRITE_BYTE(conn, conn->out_packet.fhdr); PT_MQTT_WRITE_BYTES(conn, (uint8_t *)conn->out_packet.remaining_length_enc, conn->out_packet.remaining_length_enc_bytes); /* Write Variable Header */ PT_MQTT_WRITE_BYTE(conn, (conn->out_packet.topic_length >> 8)); PT_MQTT_WRITE_BYTE(conn, (conn->out_packet.topic_length & 0x00FF)); PT_MQTT_WRITE_BYTES(conn, (uint8_t *)conn->out_packet.topic, conn->out_packet.topic_length); if(conn->out_packet.qos > MQTT_QOS_LEVEL_0) { PT_MQTT_WRITE_BYTE(conn, (conn->out_packet.mid >> 8)); PT_MQTT_WRITE_BYTE(conn, (conn->out_packet.mid & 0x00FF)); } /* Write Payload */ PT_MQTT_WRITE_BYTES(conn, conn->out_packet.payload, conn->out_packet.payload_size); send_out_buffer(conn); timer_set(&conn->t, RESPONSE_WAIT_TIMEOUT); /* * If QoS is zero then wait until the message has been sent, since there is * no ACK to wait for. * * Also notify the app will not be notified via PUBACK or PUBCOMP */ if(conn->out_packet.qos == 0) { process_post(conn->app_process, mqtt_update_event, NULL); } else if(conn->out_packet.qos == 1) { /* Wait for PUBACK */ reset_packet(&conn->in_packet); PT_WAIT_UNTIL(pt, conn->out_packet.qos_state == MQTT_QOS_STATE_GOT_ACK || timer_expired(&conn->t)); if(timer_expired(&conn->t)) { DBG("Timeout waiting for PUBACK\n"); } if(conn->in_packet.mid != conn->out_packet.mid) { DBG("MQTT - Warning, got PUBACK with none matching MID. Currently there " "is no support for several concurrent PUBLISH messages.\n"); } } else if(conn->out_packet.qos == 2) { DBG("MQTT - QoS not implemented yet.\n"); /* Should wait for PUBREC, send PUBREL and then wait for PUBCOMP */ } reset_packet(&conn->in_packet); /* This is clear after the entire transaction is complete */ conn->out_queue_full = 0; DBG("MQTT - Publish Enqueued\n"); PT_END(pt); } /*---------------------------------------------------------------------------*/ static PT_THREAD(pingreq_pt(struct pt *pt, struct mqtt_connection *conn)) { PT_BEGIN(pt); DBG("MQTT - Sending PINGREQ\n"); /* Write Fixed Header */ PT_MQTT_WRITE_BYTE(conn, MQTT_FHDR_MSG_TYPE_PINGREQ); PT_MQTT_WRITE_BYTE(conn, 0); send_out_buffer(conn); /* Start timeout for reply. */ conn->waiting_for_pingresp = 1; /* Wait for PINGRESP or timeout */ reset_packet(&conn->in_packet); timer_set(&conn->t, RESPONSE_WAIT_TIMEOUT); PT_WAIT_UNTIL(pt, conn->in_packet.packet_received || timer_expired(&conn->t)); reset_packet(&conn->in_packet); conn->waiting_for_pingresp = 0; PT_END(pt); } /*---------------------------------------------------------------------------*/ static void handle_connack(struct mqtt_connection *conn) { DBG("MQTT - Got CONNACK\n"); if(conn->in_packet.payload[1] != 0) { PRINTF("MQTT - Connection refused with Return Code %i\n", conn->in_packet.payload[1]); call_event(conn, MQTT_EVENT_CONNECTION_REFUSED_ERROR, &conn->in_packet.payload[1]); abort_connection(conn); return; } conn->out_packet.qos_state = MQTT_QOS_STATE_GOT_ACK; ctimer_set(&conn->keep_alive_timer, conn->keep_alive * CLOCK_SECOND, keep_alive_callback, conn); /* Always reset packet before callback since it might be used directly */ conn->state = MQTT_CONN_STATE_CONNECTED_TO_BROKER; call_event(conn, MQTT_EVENT_CONNECTED, NULL); } /*---------------------------------------------------------------------------*/ static void handle_pingresp(struct mqtt_connection *conn) { DBG("MQTT - Got PINGRESP\n"); } /*---------------------------------------------------------------------------*/ static void handle_suback(struct mqtt_connection *conn) { struct mqtt_suback_event suback_event; DBG("MQTT - Got SUBACK\n"); /* Only accept SUBACKS with X topic QoS response, assume 1 */ if(conn->in_packet.remaining_length > MQTT_MID_SIZE + MQTT_MAX_TOPICS_PER_SUBSCRIBE * MQTT_QOS_SIZE) { DBG("MQTT - Error, SUBACK with > 1 topic, not supported.\n"); } conn->out_packet.qos_state = MQTT_QOS_STATE_GOT_ACK; suback_event.mid = (conn->in_packet.payload[0] << 8) | (conn->in_packet.payload[1]); suback_event.qos_level = conn->in_packet.payload[2]; conn->in_packet.mid = suback_event.mid; if(conn->in_packet.mid != conn->out_packet.mid) { DBG("MQTT - Warning, got SUBACK with none matching MID. Currently there is" "no support for several concurrent SUBSCRIBE messages.\n"); } /* Always reset packet before callback since it might be used directly */ call_event(conn, MQTT_EVENT_SUBACK, &suback_event); } /*---------------------------------------------------------------------------*/ static void handle_unsuback(struct mqtt_connection *conn) { DBG("MQTT - Got UNSUBACK\n"); conn->out_packet.qos_state = MQTT_QOS_STATE_GOT_ACK; conn->in_packet.mid = (conn->in_packet.payload[0] << 8) | (conn->in_packet.payload[1]); if(conn->in_packet.mid != conn->out_packet.mid) { DBG("MQTT - Warning, got UNSUBACK with none matching MID. Currently there is" "no support for several concurrent UNSUBSCRIBE messages.\n"); } call_event(conn, MQTT_EVENT_UNSUBACK, &conn->in_packet.mid); } /*---------------------------------------------------------------------------*/ static void handle_puback(struct mqtt_connection *conn) { DBG("MQTT - Got PUBACK\n"); conn->out_packet.qos_state = MQTT_QOS_STATE_GOT_ACK; conn->in_packet.mid = (conn->in_packet.payload[0] << 8) | (conn->in_packet.payload[1]); call_event(conn, MQTT_EVENT_PUBACK, &conn->in_packet.mid); } /*---------------------------------------------------------------------------*/ static void handle_publish(struct mqtt_connection *conn) { DBG("MQTT - Got PUBLISH, called once per manageable chunk of message.\n"); DBG("MQTT - Handling publish on topic '%s'\n", conn->in_publish_msg.topic); DBG("MQTT - This chunk is %i bytes\n", conn->in_packet.payload_pos); if(((conn->in_packet.fhdr & 0x09) >> 1) > 0) { PRINTF("MQTT - Error, got incoming PUBLISH with QoS > 0, not supported atm!\n"); } call_event(conn, MQTT_EVENT_PUBLISH, &conn->in_publish_msg); if(conn->in_publish_msg.first_chunk == 1) { conn->in_publish_msg.first_chunk = 0; } /* If this is the last time handle_publish will be called, reset packet. */ if(conn->in_publish_msg.payload_left == 0) { /* Check for QoS and initiate the reply, do not rely on the data in the * in_packet being untouched. */ DBG("MQTT - (handle_publish) resetting packet.\n"); reset_packet(&conn->in_packet); } } /*---------------------------------------------------------------------------*/ static void parse_publish_vhdr(struct mqtt_connection *conn, uint32_t *pos, const uint8_t *input_data_ptr, int input_data_len) { uint16_t copy_bytes; /* Read out topic length */ if(conn->in_packet.topic_len_received == 0) { conn->in_packet.topic_pos = 0; conn->in_packet.topic_len = (input_data_ptr[(*pos)++] << 8); conn->in_packet.byte_counter++; if(*pos >= input_data_len) { return; } conn->in_packet.topic_len |= input_data_ptr[(*pos)++]; conn->in_packet.byte_counter++; conn->in_packet.topic_len_received = 1; /* Abort if topic is longer than our topic buffer */ if(conn->in_packet.topic_len > MQTT_MAX_TOPIC_LENGTH) { DBG("MQTT - topic too long %u/%u\n", conn->in_packet.topic_len, MQTT_MAX_TOPIC_LENGTH); return; } DBG("MQTT - Read PUBLISH topic len %i\n", conn->in_packet.topic_len); /* WARNING: Check here if TOPIC fits in payload area, otherwise error */ } /* Read out topic */ if(conn->in_packet.topic_len_received == 1 && conn->in_packet.topic_received == 0) { copy_bytes = MIN(conn->in_packet.topic_len - conn->in_packet.topic_pos, input_data_len - *pos); DBG("MQTT - topic_pos: %i copy_bytes: %i", conn->in_packet.topic_pos, copy_bytes); memcpy(&conn->in_publish_msg.topic[conn->in_packet.topic_pos], &input_data_ptr[*pos], copy_bytes); (*pos) += copy_bytes; conn->in_packet.byte_counter += copy_bytes; conn->in_packet.topic_pos += copy_bytes; if(conn->in_packet.topic_len - conn->in_packet.topic_pos == 0) { DBG("MQTT - Got topic '%s'", conn->in_publish_msg.topic); conn->in_packet.topic_received = 1; conn->in_publish_msg.topic[conn->in_packet.topic_pos] = '\0'; conn->in_publish_msg.payload_length = conn->in_packet.remaining_length - conn->in_packet.topic_len - 2; conn->in_publish_msg.payload_left = conn->in_publish_msg.payload_length; } /* Set this once per incomming publish message */ conn->in_publish_msg.first_chunk = 1; } } /*---------------------------------------------------------------------------*/ static int tcp_input(struct tcp_socket *s, void *ptr, const uint8_t *input_data_ptr, int input_data_len) { struct mqtt_connection *conn = ptr; uint32_t pos = 0; uint32_t copy_bytes = 0; uint8_t byte; if(input_data_len == 0) { return 0; } if(conn->in_packet.packet_received) { reset_packet(&conn->in_packet); } DBG("tcp_input with %i bytes of data:\n", input_data_len); /* Read the fixed header field, if we do not have it */ if(!conn->in_packet.fhdr) { conn->in_packet.fhdr = input_data_ptr[pos++]; conn->in_packet.byte_counter++; DBG("MQTT - Read VHDR '%02X'\n", conn->in_packet.fhdr); if(pos >= input_data_len) { return 0; } } /* Read the Remaining Length field, if we do not have it */ if(!conn->in_packet.has_remaining_length) { do { if(pos >= input_data_len) { return 0; } byte = input_data_ptr[pos++]; conn->in_packet.byte_counter++; conn->in_packet.remaining_length_bytes++; DBG("MQTT - Read Remaining Length byte\n"); if(conn->in_packet.byte_counter > 5) { call_event(conn, MQTT_EVENT_ERROR, NULL); DBG("Received more then 4 byte 'remaining lenght'."); return 0; } conn->in_packet.remaining_length += (byte & 127) * conn->in_packet.remaining_multiplier; conn->in_packet.remaining_multiplier *= 128; } while((byte & 128) != 0); DBG("MQTT - Finished reading remaining length byte\n"); conn->in_packet.has_remaining_length = 1; } /* * Check for unsupported payload length. Will read all incoming data from the * server in any case and then reset the packet. * * TODO: Decide if we, for example, want to disconnect instead. */ if((conn->in_packet.remaining_length > MQTT_INPUT_BUFF_SIZE) && (conn->in_packet.fhdr & 0xF0) != MQTT_FHDR_MSG_TYPE_PUBLISH) { PRINTF("MQTT - Error, unsupported payload size for non-PUBLISH message\n"); conn->in_packet.byte_counter += input_data_len; if(conn->in_packet.byte_counter >= (MQTT_FHDR_SIZE + conn->in_packet.remaining_length)) { conn->in_packet.packet_received = 1; } return 0; } /* * Supported payload, reads out both VHDR and Payload of all packets. * * Note: There will always be at least one byte left to read when we enter * this loop. */ while(conn->in_packet.byte_counter < (MQTT_FHDR_SIZE + conn->in_packet.remaining_length)) { if((conn->in_packet.fhdr & 0xF0) == MQTT_FHDR_MSG_TYPE_PUBLISH && conn->in_packet.topic_received == 0) { parse_publish_vhdr(conn, &pos, input_data_ptr, input_data_len); } /* Read in as much as we can into the packet payload */ copy_bytes = MIN(input_data_len - pos, MQTT_INPUT_BUFF_SIZE - conn->in_packet.payload_pos); DBG("- Copied %lu payload bytes\n", copy_bytes); memcpy(&conn->in_packet.payload[conn->in_packet.payload_pos], &input_data_ptr[pos], copy_bytes); conn->in_packet.byte_counter += copy_bytes; conn->in_packet.payload_pos += copy_bytes; pos += copy_bytes; uint8_t i; DBG("MQTT - Copied bytes: \n"); for(i = 0; i < copy_bytes; i++) { DBG("%02X ", conn->in_packet.payload[i]); } DBG("\n"); /* Full buffer, shall only happen to PUBLISH messages. */ if(MQTT_INPUT_BUFF_SIZE - conn->in_packet.payload_pos == 0) { conn->in_publish_msg.payload_chunk = conn->in_packet.payload; conn->in_publish_msg.payload_chunk_length = MQTT_INPUT_BUFF_SIZE; conn->in_publish_msg.payload_left -= MQTT_INPUT_BUFF_SIZE; handle_publish(conn); conn->in_publish_msg.payload_chunk = conn->in_packet.payload; conn->in_packet.payload_pos = 0; } if(pos >= input_data_len && (conn->in_packet.byte_counter < (MQTT_FHDR_SIZE + conn->in_packet.remaining_length))) { return 0; } } /* Debug information */ DBG("\n"); /* Take care of input */ DBG("MQTT - Finished reading packet!\n"); /* What to return? */ DBG("MQTT - total data was %i bytes of data. \n", (MQTT_FHDR_SIZE + conn->in_packet.remaining_length)); /* Handle packet here. */ switch(conn->in_packet.fhdr & 0xF0) { case MQTT_FHDR_MSG_TYPE_CONNACK: handle_connack(conn); break; case MQTT_FHDR_MSG_TYPE_PUBLISH: /* This is the only or the last chunk of publish payload */ conn->in_publish_msg.payload_chunk = conn->in_packet.payload; conn->in_publish_msg.payload_chunk_length = conn->in_packet.payload_pos; conn->in_publish_msg.payload_left = 0; handle_publish(conn); break; case MQTT_FHDR_MSG_TYPE_PUBACK: handle_puback(conn); break; case MQTT_FHDR_MSG_TYPE_SUBACK: handle_suback(conn); break; case MQTT_FHDR_MSG_TYPE_UNSUBACK: handle_unsuback(conn); break; case MQTT_FHDR_MSG_TYPE_PINGRESP: handle_pingresp(conn); break; /* QoS 2 not implemented yet */ case MQTT_FHDR_MSG_TYPE_PUBREC: case MQTT_FHDR_MSG_TYPE_PUBREL: case MQTT_FHDR_MSG_TYPE_PUBCOMP: call_event(conn, MQTT_EVENT_NOT_IMPLEMENTED_ERROR, NULL); PRINTF("MQTT - Got unhandled MQTT Message Type '%i'", (conn->in_packet.fhdr & 0xF0)); break; default: /* All server-only message */ PRINTF("MQTT - Got MQTT Message Type '%i'", (conn->in_packet.fhdr & 0xF0)); break; } conn->in_packet.packet_received = 1; return 0; } /*---------------------------------------------------------------------------*/ /* * Handles TCP events from Simple TCP */ static void tcp_event(struct tcp_socket *s, void *ptr, tcp_socket_event_t event) { struct mqtt_connection *conn = ptr; /* Take care of event */ switch(event) { /* Fall through to manage different disconnect event the same way. */ case TCP_SOCKET_CLOSED: case TCP_SOCKET_TIMEDOUT: case TCP_SOCKET_ABORTED: { DBG("MQTT - Disconnected by tcp event %d\n", event); process_post(&mqtt_process, mqtt_abort_now_event, conn); conn->state = MQTT_CONN_STATE_NOT_CONNECTED; ctimer_stop(&conn->keep_alive_timer); call_event(conn, MQTT_EVENT_DISCONNECTED, &event); abort_connection(conn); /* If connecting retry */ if(conn->auto_reconnect == 1) { connect_tcp(conn); } break; } case TCP_SOCKET_CONNECTED: { conn->state = MQTT_CONN_STATE_TCP_CONNECTED; conn->out_buffer_sent = 1; process_post(&mqtt_process, mqtt_do_connect_mqtt_event, conn); break; } case TCP_SOCKET_DATA_SENT: { DBG("MQTT - Got TCP_DATA_SENT\n"); if(conn->socket.output_data_len == 0) { conn->out_buffer_sent = 1; conn->out_buffer_ptr = conn->out_buffer; } ctimer_restart(&conn->keep_alive_timer); break; } default: { DBG("MQTT - TCP Event %d is currently not managed by the tcp event callback\n", event); } } } /*---------------------------------------------------------------------------*/ PROCESS_THREAD(mqtt_process, ev, data) { static struct mqtt_connection *conn; PROCESS_BEGIN(); while(1) { PROCESS_WAIT_EVENT(); if(ev == mqtt_abort_now_event) { DBG("MQTT - Abort\n"); conn = data; conn->state = MQTT_CONN_STATE_ABORT_IMMEDIATE; abort_connection(conn); } if(ev == mqtt_do_connect_tcp_event) { conn = data; DBG("MQTT - Got mqtt_do_connect_tcp_event!\n"); connect_tcp(conn); } if(ev == mqtt_do_connect_mqtt_event) { conn = data; conn->socket.output_data_max_seg = conn->max_segment_size; DBG("MQTT - Got mqtt_do_connect_mqtt_event!\n"); if(conn->out_buffer_sent == 1) { PT_INIT(&conn->out_proto_thread); while(connect_pt(&conn->out_proto_thread, conn) < PT_EXITED && conn->state != MQTT_CONN_STATE_ABORT_IMMEDIATE) { PT_MQTT_WAIT_SEND(); } } } if(ev == mqtt_do_disconnect_mqtt_event) { conn = data; DBG("MQTT - Got mqtt_do_disconnect_mqtt_event!\n"); /* Send MQTT Disconnect if we are connected */ if(conn->state == MQTT_CONN_STATE_SENDING_MQTT_DISCONNECT) { if(conn->out_buffer_sent == 1) { PT_INIT(&conn->out_proto_thread); while(conn->state != MQTT_CONN_STATE_ABORT_IMMEDIATE && disconnect_pt(&conn->out_proto_thread, conn) < PT_EXITED) { PT_MQTT_WAIT_SEND(); } abort_connection(conn); call_event(conn, MQTT_EVENT_DISCONNECTED, &ev); } else { process_post(&mqtt_process, mqtt_do_disconnect_mqtt_event, conn); } } } if(ev == mqtt_do_pingreq_event) { conn = data; DBG("MQTT - Got mqtt_do_pingreq_event!\n"); if(conn->out_buffer_sent == 1 && conn->state == MQTT_CONN_STATE_CONNECTED_TO_BROKER) { PT_INIT(&conn->out_proto_thread); while(conn->state == MQTT_CONN_STATE_CONNECTED_TO_BROKER && pingreq_pt(&conn->out_proto_thread, conn) < PT_EXITED) { PT_MQTT_WAIT_SEND(); } } } if(ev == mqtt_do_subscribe_event) { conn = data; DBG("MQTT - Got mqtt_do_subscribe_mqtt_event!\n"); if(conn->out_buffer_sent == 1 && conn->state == MQTT_CONN_STATE_CONNECTED_TO_BROKER) { PT_INIT(&conn->out_proto_thread); while(conn->state == MQTT_CONN_STATE_CONNECTED_TO_BROKER && subscribe_pt(&conn->out_proto_thread, conn) < PT_EXITED) { PT_MQTT_WAIT_SEND(); } } } if(ev == mqtt_do_unsubscribe_event) { conn = data; DBG("MQTT - Got mqtt_do_unsubscribe_mqtt_event!\n"); if(conn->out_buffer_sent == 1 && conn->state == MQTT_CONN_STATE_CONNECTED_TO_BROKER) { PT_INIT(&conn->out_proto_thread); while(conn->state == MQTT_CONN_STATE_CONNECTED_TO_BROKER && unsubscribe_pt(&conn->out_proto_thread, conn) < PT_EXITED) { PT_MQTT_WAIT_SEND(); } } } if(ev == mqtt_do_publish_event) { conn = data; DBG("MQTT - Got mqtt_do_publish_mqtt_event!\n"); if(conn->out_buffer_sent == 1 && conn->state == MQTT_CONN_STATE_CONNECTED_TO_BROKER) { PT_INIT(&conn->out_proto_thread); while(conn->state == MQTT_CONN_STATE_CONNECTED_TO_BROKER && publish_pt(&conn->out_proto_thread, conn) < PT_EXITED) { PT_MQTT_WAIT_SEND(); } } } } PROCESS_END(); } /*---------------------------------------------------------------------------*/ void mqtt_init(void) { static uint8_t inited = 0; if(!inited) { mqtt_do_connect_tcp_event = process_alloc_event(); mqtt_event_min = mqtt_do_connect_tcp_event; mqtt_do_connect_mqtt_event = process_alloc_event(); mqtt_do_disconnect_mqtt_event = process_alloc_event(); mqtt_do_subscribe_event = process_alloc_event(); mqtt_do_unsubscribe_event = process_alloc_event(); mqtt_do_publish_event = process_alloc_event(); mqtt_do_pingreq_event = process_alloc_event(); mqtt_update_event = process_alloc_event(); mqtt_abort_now_event = process_alloc_event(); mqtt_event_max = mqtt_abort_now_event; mqtt_continue_send_event = process_alloc_event(); list_init(mqtt_conn_list); process_start(&mqtt_process, NULL); inited = 1; } } /*---------------------------------------------------------------------------*/ mqtt_status_t mqtt_register(struct mqtt_connection *conn, struct process *app_process, char *client_id, mqtt_event_callback_t event_callback, uint16_t max_segment_size) { if(strlen(client_id) < 1) { return MQTT_STATUS_INVALID_ARGS_ERROR; } /* Set defaults - Set all to zero to begin with */ memset(conn, 0, sizeof(struct mqtt_connection)); string_to_mqtt_string(&conn->client_id, client_id); conn->event_callback = event_callback; conn->app_process = app_process; conn->auto_reconnect = 1; conn->max_segment_size = max_segment_size; reset_defaults(conn); mqtt_init(); list_add(mqtt_conn_list, conn); DBG("MQTT - Registered successfully\n"); return MQTT_STATUS_OK; } /*---------------------------------------------------------------------------*/ /* * Connect to MQTT broker. * * N.B. Non-blocking call. */ mqtt_status_t mqtt_connect(struct mqtt_connection *conn, char *host, uint16_t port, uint16_t keep_alive) { uip_ip6addr_t ip6addr; uip_ipaddr_t *ipaddr; ipaddr = &ip6addr; /* Check if we are already trying to connect */ if(conn->state > MQTT_CONN_STATE_NOT_CONNECTED) { return MQTT_STATUS_OK; } conn->server_host = host; conn->keep_alive = keep_alive; conn->server_port = port; conn->out_buffer_ptr = conn->out_buffer; conn->out_packet.qos_state = MQTT_QOS_STATE_NO_ACK; conn->connect_vhdr_flags |= MQTT_VHDR_CLEAN_SESSION_FLAG; /* convert the string IPv6 address to a numeric IPv6 address */ if(uiplib_ip6addrconv(host, &ip6addr) == 0) { return MQTT_STATUS_ERROR; } uip_ipaddr_copy(&(conn->server_ip), ipaddr); /* * Initiate the connection if the IP could be resolved. Otherwise the * connection will be initiated when the DNS lookup is finished, in the main * event loop. */ process_post(&mqtt_process, mqtt_do_connect_tcp_event, conn); return MQTT_STATUS_OK; } /*----------------------------------------------------------------------------*/ void mqtt_disconnect(struct mqtt_connection *conn) { if(conn->state != MQTT_CONN_STATE_CONNECTED_TO_BROKER) { return; } conn->state = MQTT_CONN_STATE_SENDING_MQTT_DISCONNECT; process_post(&mqtt_process, mqtt_do_disconnect_mqtt_event, conn); } /*----------------------------------------------------------------------------*/ mqtt_status_t mqtt_subscribe(struct mqtt_connection *conn, uint16_t *mid, char *topic, mqtt_qos_level_t qos_level) { if(conn->state != MQTT_CONN_STATE_CONNECTED_TO_BROKER) { return MQTT_STATUS_NOT_CONNECTED_ERROR; } DBG("MQTT - Call to mqtt_subscribe...\n"); /* Currently don't have a queue, so only one item at a time */ if(conn->out_queue_full) { DBG("MQTT - Not accepted!\n"); return MQTT_STATUS_OUT_QUEUE_FULL; } conn->out_queue_full = 1; DBG("MQTT - Accepted!\n"); conn->out_packet.mid = INCREMENT_MID(conn); conn->out_packet.topic = topic; conn->out_packet.topic_length = strlen(topic); conn->out_packet.qos = qos_level; conn->out_packet.qos_state = MQTT_QOS_STATE_NO_ACK; process_post(&mqtt_process, mqtt_do_subscribe_event, conn); return MQTT_STATUS_OK; } /*----------------------------------------------------------------------------*/ mqtt_status_t mqtt_unsubscribe(struct mqtt_connection *conn, uint16_t *mid, char *topic) { if(conn->state != MQTT_CONN_STATE_CONNECTED_TO_BROKER) { return MQTT_STATUS_NOT_CONNECTED_ERROR; } DBG("MQTT - Call to mqtt_unsubscribe...\n"); /* Currently don't have a queue, so only one item at a time */ if(conn->out_queue_full) { DBG("MQTT - Not accepted!\n"); return MQTT_STATUS_OUT_QUEUE_FULL; } conn->out_queue_full = 1; DBG("MQTT - Accepted!\n"); conn->out_packet.mid = INCREMENT_MID(conn); conn->out_packet.topic = topic; conn->out_packet.topic_length = strlen(topic); conn->out_packet.qos_state = MQTT_QOS_STATE_NO_ACK; process_post(&mqtt_process, mqtt_do_unsubscribe_event, conn); return MQTT_STATUS_OK; } /*----------------------------------------------------------------------------*/ mqtt_status_t mqtt_publish(struct mqtt_connection *conn, uint16_t *mid, char *topic, uint8_t *payload, uint32_t payload_size, mqtt_qos_level_t qos_level, mqtt_retain_t retain) { if(conn->state != MQTT_CONN_STATE_CONNECTED_TO_BROKER) { return MQTT_STATUS_NOT_CONNECTED_ERROR; } DBG("MQTT - Call to mqtt_publish...\n"); /* Currently don't have a queue, so only one item at a time */ if(conn->out_queue_full) { DBG("MQTT - Not accepted!\n"); return MQTT_STATUS_OUT_QUEUE_FULL; } conn->out_queue_full = 1; DBG("MQTT - Accepted!\n"); conn->out_packet.mid = INCREMENT_MID(conn); conn->out_packet.retain = retain; conn->out_packet.topic = topic; conn->out_packet.topic_length = strlen(topic); conn->out_packet.payload = payload; conn->out_packet.payload_size = payload_size; conn->out_packet.qos = qos_level; conn->out_packet.qos_state = MQTT_QOS_STATE_NO_ACK; process_post(&mqtt_process, mqtt_do_publish_event, conn); return MQTT_STATUS_OK; } /*----------------------------------------------------------------------------*/ void mqtt_set_username_password(struct mqtt_connection *conn, char *username, char *password) { /* Set strings, NULL string will simply set length to zero */ string_to_mqtt_string(&conn->credentials.username, username); string_to_mqtt_string(&conn->credentials.password, password); /* Set CONNECT VHDR flags */ if(username != NULL) { conn->connect_vhdr_flags |= MQTT_VHDR_USERNAME_FLAG; } else { conn->connect_vhdr_flags &= ~MQTT_VHDR_USERNAME_FLAG; } if(password != NULL) { conn->connect_vhdr_flags |= MQTT_VHDR_PASSWORD_FLAG; } else { conn->connect_vhdr_flags &= ~MQTT_VHDR_PASSWORD_FLAG; } } /*----------------------------------------------------------------------------*/ void mqtt_set_last_will(struct mqtt_connection *conn, char *topic, char *message, mqtt_qos_level_t qos) { /* Set strings, NULL string will simply set length to zero */ string_to_mqtt_string(&conn->will.topic, topic); string_to_mqtt_string(&conn->will.message, message); /* Currently not used! */ conn->will.qos = qos; if(topic != NULL) { conn->connect_vhdr_flags |= MQTT_VHDR_WILL_FLAG | MQTT_VHDR_WILL_RETAIN_FLAG; } } /*----------------------------------------------------------------------------*/ /** @} */