From 1e9221161307dae3e2f0c1d16b2dcead408bd514 Mon Sep 17 00:00:00 2001 From: Jonas Olsson Date: Tue, 17 Feb 2015 13:44:29 +0100 Subject: [PATCH 1/5] Add MQTT 3.1 engine --- apps/mqtt/Makefile.mqtt | 1 + apps/mqtt/mqtt.c | 1484 +++++++++++++++++++++++++++++++++++++++ apps/mqtt/mqtt.h | 509 ++++++++++++++ 3 files changed, 1994 insertions(+) create mode 100644 apps/mqtt/Makefile.mqtt create mode 100644 apps/mqtt/mqtt.c create mode 100644 apps/mqtt/mqtt.h diff --git a/apps/mqtt/Makefile.mqtt b/apps/mqtt/Makefile.mqtt new file mode 100644 index 000000000..06d7bd5ab --- /dev/null +++ b/apps/mqtt/Makefile.mqtt @@ -0,0 +1 @@ +mqtt_src = mqtt.c diff --git a/apps/mqtt/mqtt.c b/apps/mqtt/mqtt.c new file mode 100644 index 000000000..112c584cb --- /dev/null +++ b/apps/mqtt/mqtt.c @@ -0,0 +1,1484 @@ +/* + * Copyright (c) 2015, Texas Instruments Incorporated - http://www.ti.com/ + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions + * are met: + * 1. Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * 2. Redistributions in binary form must reproduce the above copyright + * notice, this list of conditions and the following disclaimer in the + * documentation and/or other materials provided with the distribution. + * 3. Neither the name of the copyright holder nor the names of its + * contributors may be used to endorse or promote products derived + * from this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * ``AS IS'' AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS + * FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE + * COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, + * INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES + * (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR + * SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) + * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, + * STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) + * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED + * OF THE POSSIBILITY OF SUCH DAMAGE. + */ +/*---------------------------------------------------------------------------*/ +/** + * \addtogroup mqtt-engine + * @{ + */ +/** + * \file + * Implementation of the Contiki MQTT engine + * + * \author + * Texas Instruments + */ +/*---------------------------------------------------------------------------*/ +#include "mqtt.h" +#include "contiki.h" +#include "contiki-net.h" +#include "contiki-lib.h" +#include "lib/random.h" +#include "sys/ctimer.h" +#include "sys/etimer.h" +#include "sys/pt.h" +#include "net/rpl/rpl.h" +#include "net/ip/uip.h" +#include "net/ipv6/uip-ds6.h" +#include "dev/leds.h" + +#include "tcp-socket.h" + +#include "lib/assert.h" +#include "lib/list.h" + +#include +#include +#include +/*---------------------------------------------------------------------------*/ +#define MIN(a, b) ((a) < (b) ? (a) : (b)) +/*---------------------------------------------------------------------------*/ +#define DEBUG 0 +#if DEBUG +#define PRINTF(...) PRINTF(__VA_ARGS__) +#else +#define PRINTF(...) +#endif +/*---------------------------------------------------------------------------*/ +typedef enum { + MQTT_FHDR_MSG_TYPE_CONNECT = 0x10, + MQTT_FHDR_MSG_TYPE_CONNACK = 0x20, + MQTT_FHDR_MSG_TYPE_PUBLISH = 0x30, + MQTT_FHDR_MSG_TYPE_PUBACK = 0x40, + MQTT_FHDR_MSG_TYPE_PUBREC = 0x50, + MQTT_FHDR_MSG_TYPE_PUBREL = 0x60, + MQTT_FHDR_MSG_TYPE_PUBCOMP = 0x70, + MQTT_FHDR_MSG_TYPE_SUBSCRIBE = 0x80, + MQTT_FHDR_MSG_TYPE_SUBACK = 0x90, + MQTT_FHDR_MSG_TYPE_UNSUBSCRIBE = 0xA0, + MQTT_FHDR_MSG_TYPE_UNSUBACK = 0xB0, + MQTT_FHDR_MSG_TYPE_PINGREQ = 0xC0, + MQTT_FHDR_MSG_TYPE_PINGRESP = 0xD0, + MQTT_FHDR_MSG_TYPE_DISCONNECT = 0xE0, + + MQTT_FHDR_DUP_FLAG = 0x08, + + MQTT_FHDR_QOS_LEVEL_0 = 0x00, + MQTT_FHDR_QOS_LEVEL_1 = 0x02, + MQTT_FHDR_QOS_LEVEL_2 = 0x04, + + MQTT_FHDR_RETAIN_FLAG = 0x01, +} mqtt_fhdr_fields_t; +/*---------------------------------------------------------------------------*/ +typedef enum { + MQTT_VHDR_USERNAME_FLAG = 0x80, + MQTT_VHDR_PASSWORD_FLAG = 0x40, + + MQTT_VHDR_WILL_RETAIN_FLAG = 0x20, + MQTT_VHDR_WILL_QOS_LEVEL_0 = 0x00, + MQTT_VHDR_WILL_QOS_LEVEL_1 = 0x08, + MQTT_VHDR_WILL_QOS_LEVEL_2 = 0x10, + + MQTT_VHDR_WILL_FLAG = 0x04, + MQTT_VHDR_CLEAN_SESSION_FLAG = 0x02, +} mqtt_vhdr_conn_fields_t; +/*---------------------------------------------------------------------------*/ +typedef enum { + MQTT_VHDR_CONN_ACCEPTED, + MQTT_VHDR_CONN_REJECTED_PROTOCOL, + MQTT_VHDR_CONN_REJECTED_IDENTIFIER, + MQTT_VHDR_CONN_REJECTED_UNAVAILABLE, + MQTT_VHDR_CONN_REJECTED_BAD_USER_PASS, + MQTT_VHDR_CONN_REJECTED_UNAUTHORIZED, +} mqtt_vhdr_connack_fields_t; +/*---------------------------------------------------------------------------*/ +#define MQTT_CONNECT_VHDR_FLAGS_SIZE 12 + +#define MQTT_STRING_LEN_SIZE 2 +#define MQTT_MID_SIZE 2 +#define MQTT_QOS_SIZE 1 +/*---------------------------------------------------------------------------*/ +#define RESPONSE_WAIT_TIMEOUT (CLOCK_SECOND * 10) +/*---------------------------------------------------------------------------*/ +#define INCREMENT_MID(conn) (conn)->mid_counter += 2 +#define MQTT_STRING_LENGTH(s) (((s)->length) == 0 ? 0 : (MQTT_STRING_LEN_SIZE + (s)->length)) +/*---------------------------------------------------------------------------*/ +/* Protothread send macros */ +#define PT_MQTT_WRITE_BYTES(conn, data, len) \ + while(write_bytes(conn, data, len)) { \ + PT_WAIT_UNTIL(pt, (conn)->out_buffer_sent); \ + } + +#define PT_MQTT_WRITE_BYTE(conn, data) \ + while(write_byte(conn, data)) { \ + PT_WAIT_UNTIL(pt, (conn)->out_buffer_sent); \ + } +/*---------------------------------------------------------------------------*/ +/* + * Sends the continue send event and wait for that event. + * + * The reason we cannot use PROCESS_PAUSE() is since we would risk loosing any + * events posted during the sending process. + */ +#define PT_MQTT_WAIT_SEND() \ + do { \ + process_post(PROCESS_CURRENT(), mqtt_continue_send_event, NULL); \ + PROCESS_WAIT_EVENT(); \ + if(ev == mqtt_abort_now_event) { \ + conn->state = MQTT_CONN_STATE_ABORT_IMMEDIATE; \ + PT_EXIT(&conn->out_proto_thread); \ + process_post(PROCESS_CURRENT(), ev, data); \ + } else if(ev >= mqtt_event_min && ev <= mqtt_event_max) { \ + process_post(PROCESS_CURRENT(), ev, data); \ + } \ + } while(0) +/*---------------------------------------------------------------------------*/ +static process_event_t mqtt_do_connect_tcp_event; +static process_event_t mqtt_do_connect_mqtt_event; +static process_event_t mqtt_do_disconnect_mqtt_event; +static process_event_t mqtt_do_subscribe_event; +static process_event_t mqtt_do_unsubscribe_event; +static process_event_t mqtt_do_publish_event; +static process_event_t mqtt_do_pingreq_event; +static process_event_t mqtt_continue_send_event; +static process_event_t mqtt_abort_now_event; +process_event_t mqtt_update_event; + +/* + * Min and Max event numbers we want to acknowledge while we're in the process + * of doing something else. continue_send does not count, therefore must be + * allocated last + */ +static process_event_t mqtt_event_min; +static process_event_t mqtt_event_max; +/*---------------------------------------------------------------------------*/ +/* Prototypes */ +static int +tcp_input(struct tcp_socket *s, void *ptr, const uint8_t *input_data_ptr, + int input_data_len); + +static void tcp_event(struct tcp_socket *s, void *ptr, + tcp_socket_event_t event); + +static void reset_packet(struct mqtt_in_packet *packet); +/*---------------------------------------------------------------------------*/ +LIST(mqtt_conn_list); +/*---------------------------------------------------------------------------*/ +PROCESS(mqtt_process, "MQTT process"); +/*---------------------------------------------------------------------------*/ +static void +call_event(struct mqtt_connection *conn, + mqtt_event_t event, + void *data) +{ + conn->event_callback(conn, event, data); + process_post(conn->app_process, mqtt_update_event, NULL); +} +/*---------------------------------------------------------------------------*/ +static void +reset_defaults(struct mqtt_connection *conn) +{ + conn->mid_counter = 1; + PT_INIT(&conn->out_proto_thread); + conn->waiting_for_pingresp = 0; + + reset_packet(&conn->in_packet); + conn->out_buffer_sent = 0; +} +/*---------------------------------------------------------------------------*/ +static void +abort_connection(struct mqtt_connection *conn) +{ + conn->out_buffer_ptr = conn->out_buffer; + conn->out_queue_full = 0; + + /* Reset outgoing packet */ + memset(&conn->out_packet, 0, sizeof(conn->out_packet)); + + tcp_socket_close(&conn->socket); + tcp_socket_unregister(&conn->socket); + + memset(&conn->socket, 0, sizeof(conn->socket)); + + conn->state = MQTT_CONN_STATE_NOT_CONNECTED; +} +/*---------------------------------------------------------------------------*/ +static void +connect_tcp(struct mqtt_connection *conn) +{ + conn->state = MQTT_CONN_STATE_TCP_CONNECTING; + + reset_defaults(conn); + tcp_socket_register(&(conn->socket), + conn, + conn->in_buffer, + MQTT_TCP_INPUT_BUFF_SIZE, + conn->out_buffer, + MQTT_TCP_OUTPUT_BUFF_SIZE, + tcp_input, + tcp_event); + tcp_socket_connect(&(conn->socket), &(conn->server_ip), conn->server_port); +} +/*---------------------------------------------------------------------------*/ +static void +disconnect_tcp(struct mqtt_connection *conn) +{ + conn->state = MQTT_CONN_STATE_DISCONNECTING; + tcp_socket_close(&(conn->socket)); + tcp_socket_unregister(&conn->socket); + + memset(&conn->socket, 0, sizeof(conn->socket)); +} +/*---------------------------------------------------------------------------*/ +static void +send_out_buffer(struct mqtt_connection *conn) +{ + if(conn->out_buffer_ptr - conn->out_buffer == 0) { + conn->out_buffer_sent = 1; + return; + } + conn->out_buffer_sent = 0; + + DBG("MQTT - (send_out_buffer) Space used in buffer: %i\n", + conn->out_buffer_ptr - conn->out_buffer); + + tcp_socket_send(&conn->socket, conn->out_buffer, + conn->out_buffer_ptr - conn->out_buffer); +} +/*---------------------------------------------------------------------------*/ +static void +string_to_mqtt_string(struct mqtt_string *mqtt_string, char *string) +{ + if(mqtt_string == NULL) { + return; + } + mqtt_string->string = string; + + if(string != NULL) { + mqtt_string->length = strlen(string); + } else { + mqtt_string->length = 0; + } +} +/*---------------------------------------------------------------------------*/ +static int +write_byte(struct mqtt_connection *conn, uint8_t data) +{ + DBG("MQTT - (write_byte) buff_size: %i write: '%02X'\n", + &conn->out_buffer[MQTT_TCP_OUTPUT_BUFF_SIZE] - conn->out_buffer_ptr, + data); + + if(&conn->out_buffer[MQTT_TCP_OUTPUT_BUFF_SIZE] - conn->out_buffer_ptr == 0) { + send_out_buffer(conn); + return 1; + } + + *conn->out_buffer_ptr = data; + conn->out_buffer_ptr++; + return 0; +} +/*---------------------------------------------------------------------------*/ +static int +write_bytes(struct mqtt_connection *conn, uint8_t *data, uint16_t len) +{ + uint16_t write_bytes; + write_bytes = + MIN(&conn->out_buffer[MQTT_TCP_OUTPUT_BUFF_SIZE] - conn->out_buffer_ptr, + len - conn->out_write_pos); + + memcpy(conn->out_buffer_ptr, &data[conn->out_write_pos], write_bytes); + conn->out_write_pos += write_bytes; + conn->out_buffer_ptr += write_bytes; + + DBG("MQTT - (write_bytes) len: %u write_pos: %lu\n", len, + conn->out_write_pos); + + if(len - conn->out_write_pos == 0) { + conn->out_write_pos = 0; + return 0; + } else { + send_out_buffer(conn); + return len - conn->out_write_pos; + } +} +/*---------------------------------------------------------------------------*/ +static void +encode_remaining_length(uint8_t *remaining_length, + uint8_t *remaining_length_bytes, + uint32_t length) +{ + uint8_t digit; + + DBG("MQTT - Encoding length %lu\n", length); + + *remaining_length_bytes = 0; + do { + digit = length % 128; + length = length / 128; + if(length > 0) { + digit = digit | 0x80; + } + + remaining_length[*remaining_length_bytes] = digit; + (*remaining_length_bytes)++; + DBG("MQTT - Encode len digit '%u' length '%lu'\n", digit, length); + } while(length > 0 && *remaining_length_bytes < 5); + DBG("MQTT - remaining_length_bytes %u\n", *remaining_length_bytes); +} +/*---------------------------------------------------------------------------*/ +static void +keep_alive_callback(void *ptr) +{ + struct mqtt_connection *conn = ptr; + + DBG("MQTT - (keep_alive_callback) Called!\n"); + + /* The flag is set when the PINGREQ has been sent */ + if(conn->waiting_for_pingresp) { + PRINTF("MQTT - Disconnect due to no PINGRESP from broker.\n"); + disconnect_tcp(conn); + return; + } + + process_post(&mqtt_process, mqtt_do_pingreq_event, conn); +} +/*---------------------------------------------------------------------------*/ +static void +reset_packet(struct mqtt_in_packet *packet) +{ + memset(packet, 0, sizeof(struct mqtt_in_packet)); + packet->remaining_multiplier = 1; +} +/*---------------------------------------------------------------------------*/ +static +PT_THREAD(connect_pt(struct pt *pt, struct mqtt_connection *conn)) +{ + PT_BEGIN(pt); + + DBG("MQTT - Sending CONNECT message...\n"); + + /* Set up FHDR */ + conn->out_packet.fhdr = MQTT_FHDR_MSG_TYPE_CONNECT; + conn->out_packet.remaining_length = 0; + conn->out_packet.remaining_length += MQTT_CONNECT_VHDR_FLAGS_SIZE; + conn->out_packet.remaining_length += MQTT_STRING_LENGTH(&conn->client_id); + conn->out_packet.remaining_length += MQTT_STRING_LENGTH(&conn->credentials.username); + conn->out_packet.remaining_length += MQTT_STRING_LENGTH(&conn->credentials.password); + conn->out_packet.remaining_length += MQTT_STRING_LENGTH(&conn->will.topic); + conn->out_packet.remaining_length += MQTT_STRING_LENGTH(&conn->will.message); + encode_remaining_length(conn->out_packet.remaining_length_enc, + &conn->out_packet.remaining_length_enc_bytes, + conn->out_packet.remaining_length); + if(conn->out_packet.remaining_length_enc_bytes > 4) { + call_event(conn, MQTT_EVENT_PROTOCOL_ERROR, NULL); + PRINTF("MQTT - Error, remaining length > 4 bytes\n"); + PT_EXIT(pt); + } + + /* Write Fixed Header */ + PT_MQTT_WRITE_BYTE(conn, conn->out_packet.fhdr); + PT_MQTT_WRITE_BYTES(conn, + conn->out_packet.remaining_length_enc, + conn->out_packet.remaining_length_enc_bytes); + PT_MQTT_WRITE_BYTE(conn, 0); + PT_MQTT_WRITE_BYTE(conn, 6); + PT_MQTT_WRITE_BYTES(conn, (uint8_t *)MQTT_PROTOCOL_NAME, 6); + PT_MQTT_WRITE_BYTE(conn, MQTT_PROTOCOL_VERSION); + PT_MQTT_WRITE_BYTE(conn, conn->connect_vhdr_flags); + PT_MQTT_WRITE_BYTE(conn, (conn->keep_alive >> 8)); + PT_MQTT_WRITE_BYTE(conn, (conn->keep_alive & 0x00FF)); + PT_MQTT_WRITE_BYTE(conn, conn->client_id.length << 8); + PT_MQTT_WRITE_BYTE(conn, conn->client_id.length & 0x00FF); + PT_MQTT_WRITE_BYTES(conn, (uint8_t *)conn->client_id.string, + conn->client_id.length); + if(conn->connect_vhdr_flags & MQTT_VHDR_WILL_FLAG) { + PT_MQTT_WRITE_BYTE(conn, conn->will.topic.length << 8); + PT_MQTT_WRITE_BYTE(conn, conn->will.topic.length & 0x00FF); + PT_MQTT_WRITE_BYTES(conn, (uint8_t *)conn->will.topic.string, + conn->will.topic.length); + PT_MQTT_WRITE_BYTE(conn, conn->will.message.length << 8); + PT_MQTT_WRITE_BYTE(conn, conn->will.message.length & 0x00FF); + PT_MQTT_WRITE_BYTES(conn, (uint8_t *)conn->will.message.string, + conn->will.message.length); + DBG("MQTT - Setting will topic to '%s' %u bytes and message to '%s' %u bytes\n", + conn->will.topic.string, + conn->will.topic.length, + conn->will.message.string, + conn->will.message.length); + } + if(conn->connect_vhdr_flags & MQTT_VHDR_USERNAME_FLAG) { + PT_MQTT_WRITE_BYTE(conn, conn->credentials.username.length << 8); + PT_MQTT_WRITE_BYTE(conn, conn->credentials.username.length & 0x00FF); + PT_MQTT_WRITE_BYTES(conn, + (uint8_t *)conn->credentials.username.string, + conn->credentials.username.length); + } + if(conn->connect_vhdr_flags & MQTT_VHDR_PASSWORD_FLAG) { + PT_MQTT_WRITE_BYTE(conn, conn->credentials.password.length << 8); + PT_MQTT_WRITE_BYTE(conn, conn->credentials.password.length & 0x00FF); + PT_MQTT_WRITE_BYTES(conn, + (uint8_t *)conn->credentials.password.string, + conn->credentials.password.length); + } + + /* Send out buffer */ + send_out_buffer(conn); + conn->state = MQTT_CONN_STATE_CONNECTING_TO_BROKER; + + timer_set(&conn->t, RESPONSE_WAIT_TIMEOUT); + + /* Wait for CONNACK */ + reset_packet(&conn->in_packet); + PT_WAIT_UNTIL(pt, conn->out_packet.qos_state == MQTT_QOS_STATE_GOT_ACK || + timer_expired(&conn->t)); + if(timer_expired(&conn->t)) { + DBG("Timeout waiting for CONNACK\n"); + /* We stick to the letter of the spec here: Tear the connection down */ + mqtt_disconnect(conn); + } + reset_packet(&conn->in_packet); + + DBG("MQTT - Done sending CONNECT\n"); + +#if DEBUG_MQTT == 1 + DBG("MQTT - CONNECT message sent: \n"); + uint16_t i; + for(i = 0; i < (conn->out_buffer_ptr - conn->out_buffer); i++) { + DBG("%02X ", conn->out_buffer[i]); + } + DBG("\n"); +#endif + + PT_END(pt); +} +/*---------------------------------------------------------------------------*/ +static +PT_THREAD(disconnect_pt(struct pt *pt, struct mqtt_connection *conn)) +{ + PT_BEGIN(pt); + + PT_MQTT_WRITE_BYTE(conn, MQTT_FHDR_MSG_TYPE_DISCONNECT); + PT_MQTT_WRITE_BYTE(conn, 0); + + send_out_buffer(conn); + + /* + * Wait a couple of seconds for a TCP ACK. We don't really need the ACK, + * we do want the TCP/IP stack to actually send this disconnect before we + * tear down the session. + */ + timer_set(&conn->t, (CLOCK_SECOND * 2)); + PT_WAIT_UNTIL(pt, conn->out_buffer_sent || timer_expired(&conn->t)); + + PT_END(pt); +} +/*---------------------------------------------------------------------------*/ +static +PT_THREAD(subscribe_pt(struct pt *pt, struct mqtt_connection *conn)) +{ + PT_BEGIN(pt); + + DBG("MQTT - Sending subscribe message! topic %s topic_length %i\n", + conn->out_packet.topic, + conn->out_packet.topic_length); + DBG("MQTT - Buffer space is %i \n", + &conn->out_buffer[MQTT_TCP_OUTPUT_BUFF_SIZE] - conn->out_buffer_ptr); + + /* Set up FHDR */ + conn->out_packet.fhdr = MQTT_FHDR_MSG_TYPE_SUBSCRIBE | MQTT_FHDR_QOS_LEVEL_1; + conn->out_packet.remaining_length = MQTT_MID_SIZE + + MQTT_STRING_LEN_SIZE + + conn->out_packet.topic_length + + MQTT_QOS_SIZE; + encode_remaining_length(conn->out_packet.remaining_length_enc, + &conn->out_packet.remaining_length_enc_bytes, + conn->out_packet.remaining_length); + if(conn->out_packet.remaining_length_enc_bytes > 4) { + call_event(conn, MQTT_EVENT_PROTOCOL_ERROR, NULL); + PRINTF("MQTT - Error, remaining length > 4 bytes\n"); + PT_EXIT(pt); + } + + /* Write Fixed Header */ + PT_MQTT_WRITE_BYTE(conn, conn->out_packet.fhdr); + PT_MQTT_WRITE_BYTES(conn, + conn->out_packet.remaining_length_enc, + conn->out_packet.remaining_length_enc_bytes); + /* Write Variable Header */ + PT_MQTT_WRITE_BYTE(conn, (conn->out_packet.mid << 8)); + PT_MQTT_WRITE_BYTE(conn, (conn->out_packet.mid & 0x00FF)); + /* Write Payload */ + PT_MQTT_WRITE_BYTE(conn, (conn->out_packet.topic_length >> 8)); + PT_MQTT_WRITE_BYTE(conn, (conn->out_packet.topic_length & 0x00FF)); + PT_MQTT_WRITE_BYTES(conn, (uint8_t *)conn->out_packet.topic, + conn->out_packet.topic_length); + PT_MQTT_WRITE_BYTE(conn, conn->out_packet.qos); + + /* Send out buffer */ + send_out_buffer(conn); + timer_set(&conn->t, RESPONSE_WAIT_TIMEOUT); + + /* Wait for SUBACK. */ + reset_packet(&conn->in_packet); + PT_WAIT_UNTIL(pt, conn->out_packet.qos_state == MQTT_QOS_STATE_GOT_ACK || + timer_expired(&conn->t)); + + if(timer_expired(&conn->t)) { + DBG("Timeout waiting for SUBACK\n"); + } + reset_packet(&conn->in_packet); + + /* This is clear after the entire transaction is complete */ + conn->out_queue_full = 0; + + DBG("MQTT - Done in send_subscribe!\n"); + + PT_END(pt); +} +/*---------------------------------------------------------------------------*/ +static +PT_THREAD(unsubscribe_pt(struct pt *pt, struct mqtt_connection *conn)) +{ + PT_BEGIN(pt); + + DBG("MQTT - Sending unsubscribe message on topic %s topic_length %i\n", + conn->out_packet.topic, + conn->out_packet.topic_length); + DBG("MQTT - Buffer space is %i \n", + &conn->out_buffer[MQTT_TCP_OUTPUT_BUFF_SIZE] - conn->out_buffer_ptr); + + /* Set up FHDR */ + conn->out_packet.fhdr = MQTT_FHDR_MSG_TYPE_UNSUBSCRIBE | + MQTT_FHDR_QOS_LEVEL_1; + conn->out_packet.remaining_length = MQTT_MID_SIZE + + MQTT_STRING_LEN_SIZE + + conn->out_packet.topic_length; + encode_remaining_length(conn->out_packet.remaining_length_enc, + &conn->out_packet.remaining_length_enc_bytes, + conn->out_packet.remaining_length); + if(conn->out_packet.remaining_length_enc_bytes > 4) { + call_event(conn, MQTT_EVENT_PROTOCOL_ERROR, NULL); + PRINTF("MQTT - Error, remaining length > 4 bytes\n"); + PT_EXIT(pt); + } + + /* Write Fixed Header */ + PT_MQTT_WRITE_BYTE(conn, conn->out_packet.fhdr); + PT_MQTT_WRITE_BYTES(conn, (uint8_t *)conn->out_packet.remaining_length_enc, + conn->out_packet.remaining_length_enc_bytes); + /* Write Variable Header */ + PT_MQTT_WRITE_BYTE(conn, (conn->out_packet.mid << 8)); + PT_MQTT_WRITE_BYTE(conn, (conn->out_packet.mid & 0x00FF)); + /* Write Payload */ + PT_MQTT_WRITE_BYTE(conn, (conn->out_packet.topic_length >> 8)); + PT_MQTT_WRITE_BYTE(conn, (conn->out_packet.topic_length & 0x00FF)); + PT_MQTT_WRITE_BYTES(conn, (uint8_t *)conn->out_packet.topic, + conn->out_packet.topic_length); + + /* Send out buffer */ + send_out_buffer(conn); + timer_set(&conn->t, RESPONSE_WAIT_TIMEOUT); + + /* Wait for UNSUBACK */ + reset_packet(&conn->in_packet); + PT_WAIT_UNTIL(pt, conn->out_packet.qos_state == MQTT_QOS_STATE_GOT_ACK || + timer_expired(&conn->t)); + + if(timer_expired(&conn->t)) { + DBG("Timeout waiting for UNSUBACK\n"); + } + + reset_packet(&conn->in_packet); + + /* This is clear after the entire transaction is complete */ + conn->out_queue_full = 0; + + DBG("MQTT - Done writing subscribe message to out buffer!\n"); + + PT_END(pt); +} +/*---------------------------------------------------------------------------*/ +static +PT_THREAD(publish_pt(struct pt *pt, struct mqtt_connection *conn)) +{ + PT_BEGIN(pt); + + DBG("MQTT - Sending publish message! topic %s topic_length %i\n", + conn->out_packet.topic, + conn->out_packet.topic_length); + DBG("MQTT - Buffer space is %i \n", + &conn->out_buffer[MQTT_TCP_OUTPUT_BUFF_SIZE] - conn->out_buffer_ptr); + + /* Set up FHDR */ + conn->out_packet.fhdr = MQTT_FHDR_MSG_TYPE_PUBLISH | + conn->out_packet.qos << 1; + if(conn->out_packet.retain == MQTT_RETAIN_ON) { + conn->out_packet.fhdr |= MQTT_FHDR_RETAIN_FLAG; + } + conn->out_packet.remaining_length = MQTT_STRING_LEN_SIZE + + conn->out_packet.topic_length + + conn->out_packet.payload_size; + if(conn->out_packet.qos > MQTT_QOS_LEVEL_0) { + conn->out_packet.remaining_length += MQTT_MID_SIZE; + } + encode_remaining_length(conn->out_packet.remaining_length_enc, + &conn->out_packet.remaining_length_enc_bytes, + conn->out_packet.remaining_length); + if(conn->out_packet.remaining_length_enc_bytes > 4) { + call_event(conn, MQTT_EVENT_PROTOCOL_ERROR, NULL); + PRINTF("MQTT - Error, remaining length > 4 bytes\n"); + PT_EXIT(pt); + } + + /* Write Fixed Header */ + PT_MQTT_WRITE_BYTE(conn, conn->out_packet.fhdr); + PT_MQTT_WRITE_BYTES(conn, (uint8_t *)conn->out_packet.remaining_length_enc, + conn->out_packet.remaining_length_enc_bytes); + /* Write Variable Header */ + PT_MQTT_WRITE_BYTE(conn, (conn->out_packet.topic_length >> 8)); + PT_MQTT_WRITE_BYTE(conn, (conn->out_packet.topic_length & 0x00FF)); + PT_MQTT_WRITE_BYTES(conn, (uint8_t *)conn->out_packet.topic, + conn->out_packet.topic_length); + if(conn->out_packet.qos > MQTT_QOS_LEVEL_0) { + PT_MQTT_WRITE_BYTE(conn, (conn->out_packet.mid << 8)); + PT_MQTT_WRITE_BYTE(conn, (conn->out_packet.mid & 0x00FF)); + } + /* Write Payload */ + PT_MQTT_WRITE_BYTES(conn, + conn->out_packet.payload, + conn->out_packet.payload_size); + + send_out_buffer(conn); + timer_set(&conn->t, RESPONSE_WAIT_TIMEOUT); + + /* + * If QoS is zero then wait until the message has been sent, since there is + * no ACK to wait for. + * + * Also notify the app will not be notified via PUBACK or PUBCOMP + */ + if(conn->out_packet.qos == 0) { + process_post(conn->app_process, mqtt_update_event, NULL); + } else if(conn->out_packet.qos == 1) { + /* Wait for PUBACK */ + reset_packet(&conn->in_packet); + PT_WAIT_UNTIL(pt, conn->out_packet.qos_state == MQTT_QOS_STATE_GOT_ACK || + timer_expired(&conn->t)); + if(timer_expired(&conn->t)) { + DBG("Timeout waiting for PUBACK\n"); + } + if(conn->in_packet.mid != conn->out_packet.mid) { + DBG("MQTT - Warning, got PUBACK with none matching MID. Currently there " + "is no support for several concurrent PUBLISH messages.\n"); + } + } else if(conn->out_packet.qos == 2) { + DBG("MQTT - QoS not implemented yet.\n"); + /* Should wait for PUBREC, send PUBREL and then wait for PUBCOMP */ + } + + reset_packet(&conn->in_packet); + + /* This is clear after the entire transaction is complete */ + conn->out_queue_full = 0; + + DBG("MQTT - Publish Enqueued\n"); + + PT_END(pt); +} +/*---------------------------------------------------------------------------*/ +static +PT_THREAD(pingreq_pt(struct pt *pt, struct mqtt_connection *conn)) +{ + PT_BEGIN(pt); + + DBG("MQTT - Sending PINGREQ\n"); + + /* Write Fixed Header */ + PT_MQTT_WRITE_BYTE(conn, MQTT_FHDR_MSG_TYPE_PINGREQ); + PT_MQTT_WRITE_BYTE(conn, 0); + + send_out_buffer(conn); + + /* Start timeout for reply. */ + conn->waiting_for_pingresp = 1; + + /* Wait for PINGRESP or timeout */ + reset_packet(&conn->in_packet); + timer_set(&conn->t, RESPONSE_WAIT_TIMEOUT); + + PT_WAIT_UNTIL(pt, conn->in_packet.packet_received || timer_expired(&conn->t)); + + reset_packet(&conn->in_packet); + + conn->waiting_for_pingresp = 0; + + PT_END(pt); +} +/*---------------------------------------------------------------------------*/ +static void +handle_connack(struct mqtt_connection *conn) +{ + DBG("MQTT - Got CONNACK\n"); + + if(conn->in_packet.payload[1] != 0) { + PRINTF("MQTT - Connection refused with Return Code %i\n", + conn->in_packet.payload[1]); + call_event(conn, + MQTT_EVENT_CONNECTION_REFUSED_ERROR, + &conn->in_packet.payload[1]); + } + + conn->out_packet.qos_state = MQTT_QOS_STATE_GOT_ACK; + + ctimer_set(&conn->keep_alive_timer, conn->keep_alive * CLOCK_SECOND, + keep_alive_callback, conn); + + /* Always reset packet before callback since it might be used directly */ + conn->state = MQTT_CONN_STATE_CONNECTED_TO_BROKER; + call_event(conn, MQTT_EVENT_CONNECTED, NULL); +} +/*---------------------------------------------------------------------------*/ +static void +handle_pingresp(struct mqtt_connection *conn) +{ + DBG("MQTT - Got RINGRESP\n"); +} +/*---------------------------------------------------------------------------*/ +static void +handle_suback(struct mqtt_connection *conn) +{ + struct mqtt_suback_event suback_event; + + DBG("MQTT - Got SUBACK\n"); + + /* Only accept SUBACKS with X topic QoS response, assume 1 */ + if(conn->in_packet.remaining_length > MQTT_MID_SIZE + + MQTT_MAX_TOPICS_PER_SUBSCRIBE * MQTT_QOS_SIZE) { + DBG("MQTT - Error, SUBACK with > 1 topic, not supported.\n"); + } + + conn->out_packet.qos_state = MQTT_QOS_STATE_GOT_ACK; + + suback_event.mid = (conn->in_packet.payload[0] << 8) | + (conn->in_packet.payload[1]); + suback_event.qos_level = conn->in_packet.payload[2]; + conn->in_packet.mid = suback_event.mid; + + if(conn->in_packet.mid != conn->out_packet.mid) { + DBG("MQTT - Warning, got SUBACK with none matching MID. Currently there is" + "no support for several concurrent SUBSCRIBE messages.\n"); + } + + /* Always reset packet before callback since it might be used directly */ + call_event(conn, MQTT_EVENT_SUBACK, &suback_event); +} +/*---------------------------------------------------------------------------*/ +static void +handle_unsuback(struct mqtt_connection *conn) +{ + DBG("MQTT - Got UNSUBACK\n"); + + conn->out_packet.qos_state = MQTT_QOS_STATE_GOT_ACK; + conn->in_packet.mid = (conn->in_packet.payload[0] << 8) | + (conn->in_packet.payload[1]); + + if(conn->in_packet.mid != conn->out_packet.mid) { + DBG("MQTT - Warning, got UNSUBACK with none matching MID. Currently there is" + "no support for several concurrent UNSUBSCRIBE messages.\n"); + } + + call_event(conn, MQTT_EVENT_UNSUBACK, &conn->in_packet.mid); +} +/*---------------------------------------------------------------------------*/ +static void +handle_puback(struct mqtt_connection *conn) +{ + DBG("MQTT - Got PUBACK\n"); + + conn->out_packet.qos_state = MQTT_QOS_STATE_GOT_ACK; + conn->in_packet.mid = (conn->in_packet.payload[0] << 8) | + (conn->in_packet.payload[1]); + + call_event(conn, MQTT_EVENT_PUBACK, &conn->in_packet.mid); +} +/*---------------------------------------------------------------------------*/ +static void +handle_publish(struct mqtt_connection *conn) +{ + DBG("MQTT - Got PUBLISH, called once per manageable chunk of message.\n"); + DBG("MQTT - Handling publish on topic '%s'\n", conn->in_publish_msg.topic); + + DBG("MQTT - This chunk is %i bytes\n", conn->in_packet.payload_pos); + + if(((conn->in_packet.fhdr & 0x09) >> 1) > 0) { + PRINTF("MQTT - Error, got incoming PUBLISH with QoS > 0, not supported atm!\n"); + } + + call_event(conn, MQTT_EVENT_PUBLISH, &conn->in_publish_msg); + + if(conn->in_publish_msg.first_chunk == 1) { + conn->in_publish_msg.first_chunk = 0; + } + + /* If this is the last time handle_publish will be called, reset packet. */ + if(conn->in_publish_msg.payload_left == 0) { + + /* Check for QoS and initiate the reply, do not rely on the data in the + * in_packet being untouched. */ + + DBG("MQTT - (handle_publish) resetting packet.\n"); + reset_packet(&conn->in_packet); + } +} +/*---------------------------------------------------------------------------*/ +static void +parse_publish_vhdr(struct mqtt_connection *conn, + uint32_t *pos, + const uint8_t *input_data_ptr, + int input_data_len) +{ + uint16_t copy_bytes; + + /* Read out topic length */ + if(conn->in_packet.topic_len_received == 0) { + conn->in_packet.topic_len = (input_data_ptr[(*pos)++] << 8); + conn->in_packet.byte_counter++; + if(*pos >= input_data_len) { + return; + } + conn->in_packet.topic_len |= input_data_ptr[(*pos)++]; + conn->in_packet.byte_counter++; + conn->in_packet.topic_len_received = 1; + + DBG("MQTT - Read PUBLISH topic len %i\n", conn->in_packet.topic_len); + /* WARNING: Check here if TOPIC fits in payload area, otherwise error */ + } + + /* Read out topic */ + if(conn->in_packet.topic_len_received == 1 && + conn->in_packet.topic_received == 0) { + copy_bytes = MIN(conn->in_packet.topic_len - conn->in_packet.topic_pos, + input_data_len - *pos); + DBG("MQTT - topic_pos: %i copy_bytes: %i", conn->in_packet.topic_pos, + copy_bytes); + memcpy(&conn->in_publish_msg.topic[conn->in_packet.topic_pos], + &input_data_ptr[*pos], + copy_bytes); + (*pos) += copy_bytes; + conn->in_packet.byte_counter += copy_bytes; + conn->in_packet.topic_pos += copy_bytes; + + if(conn->in_packet.topic_len - conn->in_packet.topic_pos == 0) { + DBG("MQTT - Got topic '%s'", conn->in_publish_msg.topic); + conn->in_packet.topic_received = 1; + conn->in_publish_msg.topic[conn->in_packet.topic_pos] = '\0'; + conn->in_publish_msg.payload_length = + conn->in_packet.remaining_length - conn->in_packet.topic_len - 2; + conn->in_publish_msg.payload_left = conn->in_publish_msg.payload_length; + } + + /* Set this once per incomming publish message */ + conn->in_publish_msg.first_chunk = 1; + } +} +/*---------------------------------------------------------------------------*/ +static int +tcp_input(struct tcp_socket *s, + void *ptr, + const uint8_t *input_data_ptr, + int input_data_len) +{ + struct mqtt_connection *conn = ptr; + uint32_t pos = 0; + uint32_t copy_bytes = 0; + uint8_t byte; + + if(input_data_len == 0) { + return 0; + } + + if(conn->in_packet.packet_received) { + reset_packet(&conn->in_packet); + } + + DBG("tcp_input with %i bytes of data:\n", input_data_len); + + /* Read the fixed header field, if we do not have it */ + if(!conn->in_packet.fhdr) { + conn->in_packet.fhdr = input_data_ptr[pos++]; + conn->in_packet.byte_counter++; + + DBG("MQTT - Read VHDR '%02X'\n", conn->in_packet.fhdr); + + if(pos >= input_data_len) { + return 0; + } + } + + /* Read the Remaining Length field, if we do not have it */ + if(!conn->in_packet.has_remaining_length) { + do { + if(pos >= input_data_len) { + return 0; + } + + byte = input_data_ptr[pos++]; + conn->in_packet.byte_counter++; + conn->in_packet.remaining_length_bytes++; + DBG("MQTT - Read Remaining Length byte\n"); + + if(conn->in_packet.byte_counter > 5) { + call_event(conn, MQTT_EVENT_ERROR, NULL); + DBG("Received more then 4 byte 'remaining lenght'."); + return 0; + } + + conn->in_packet.remaining_length += + (byte & 127) * conn->in_packet.remaining_multiplier; + conn->in_packet.remaining_multiplier *= 128; + } while((byte & 128) != 0); + + DBG("MQTT - Finished reading remaining length byte\n"); + conn->in_packet.has_remaining_length = 1; + } + + /* + * Check for unsupported payload length. Will read all incoming data from the + * server in any case and then reset the packet. + * + * TODO: Decide if we, for example, want to disconnect instead. + */ + if((conn->in_packet.remaining_length > MQTT_INPUT_BUFF_SIZE) && + (conn->in_packet.fhdr & 0xF0) != MQTT_FHDR_MSG_TYPE_PUBLISH) { + + PRINTF("MQTT - Error, unsupported payload size for non-PUBLISH message\n"); + + conn->in_packet.byte_counter += input_data_len; + if(conn->in_packet.byte_counter >= + (MQTT_FHDR_SIZE + conn->in_packet.remaining_length)) { + conn->in_packet.packet_received = 1; + } + return 0; + } + + /* + * Supported payload, reads out both VHDR and Payload of all packets. + * + * Note: There will always be at least one byte left to read when we enter + * this loop. + */ + while(conn->in_packet.byte_counter < + (MQTT_FHDR_SIZE + conn->in_packet.remaining_length)) { + + if((conn->in_packet.fhdr & 0xF0) == MQTT_FHDR_MSG_TYPE_PUBLISH && + conn->in_packet.topic_received == 0) { + parse_publish_vhdr(conn, &pos, input_data_ptr, input_data_len); + } + + /* Read in as much as we can into the packet payload */ + copy_bytes = MIN(input_data_len - pos, + MQTT_INPUT_BUFF_SIZE - conn->in_packet.payload_pos); + DBG("- Copied %lu payload bytes\n", copy_bytes); + memcpy(&conn->in_packet.payload[conn->in_packet.payload_pos], + &input_data_ptr[pos], + copy_bytes); + conn->in_packet.byte_counter += copy_bytes; + conn->in_packet.payload_pos += copy_bytes; + pos += copy_bytes; + + uint8_t i; + DBG("MQTT - Copied bytes: \n"); + for(i = 0; i < copy_bytes; i++) { + DBG("%02X ", conn->in_packet.payload[i]); + } + DBG("\n"); + + /* Full buffer, shall only happen to PUBLISH messages. */ + if(MQTT_INPUT_BUFF_SIZE - conn->in_packet.payload_pos == 0) { + conn->in_publish_msg.payload_chunk = conn->in_packet.payload; + conn->in_publish_msg.payload_chunk_length = MQTT_INPUT_BUFF_SIZE; + conn->in_publish_msg.payload_left -= MQTT_INPUT_BUFF_SIZE; + + handle_publish(conn); + + conn->in_publish_msg.payload_chunk = conn->in_packet.payload; + conn->in_packet.payload_pos = 0; + } + + if(pos >= input_data_len && + (conn->in_packet.byte_counter < (MQTT_FHDR_SIZE + conn->in_packet.remaining_length))) { + return 0; + } + } + + /* Debug information */ + DBG("\n"); + /* Take care of input */ + DBG("MQTT - Finished reading packet!\n"); + /* What to return? */ + DBG("MQTT - total data was %i bytes of data. \n", + (MQTT_FHDR_SIZE + conn->in_packet.remaining_length)); + + /* Handle packet here. */ + switch(conn->in_packet.fhdr & 0xF0) { + case MQTT_FHDR_MSG_TYPE_CONNACK: + handle_connack(conn); + break; + case MQTT_FHDR_MSG_TYPE_PUBLISH: + /* This is the only or the last chunk of publish payload */ + conn->in_publish_msg.payload_chunk = conn->in_packet.payload; + conn->in_publish_msg.payload_chunk_length = conn->in_packet.payload_pos; + conn->in_publish_msg.payload_left = 0; + handle_publish(conn); + break; + case MQTT_FHDR_MSG_TYPE_PUBACK: + handle_puback(conn); + break; + case MQTT_FHDR_MSG_TYPE_SUBACK: + handle_suback(conn); + break; + case MQTT_FHDR_MSG_TYPE_UNSUBACK: + handle_unsuback(conn); + break; + case MQTT_FHDR_MSG_TYPE_PINGRESP: + handle_pingresp(conn); + break; + + /* QoS 2 not implemented yet */ + case MQTT_FHDR_MSG_TYPE_PUBREC: + case MQTT_FHDR_MSG_TYPE_PUBREL: + case MQTT_FHDR_MSG_TYPE_PUBCOMP: + call_event(conn, MQTT_EVENT_NOT_IMPLEMENTED_ERROR, NULL); + PRINTF("MQTT - Got unhandled MQTT Message Type '%i'", + (conn->in_packet.fhdr & 0xF0)); + break; + + default: + /* All server-only message */ + PRINTF("MQTT - Got MQTT Message Type '%i'", (conn->in_packet.fhdr & 0xF0)); + break; + } + + conn->in_packet.packet_received = 1; + + return 0; +} +/*---------------------------------------------------------------------------*/ +/* + * Handles TCP events from Simple TCP + */ +static void +tcp_event(struct tcp_socket *s, void *ptr, tcp_socket_event_t event) +{ + struct mqtt_connection *conn = ptr; + + /* Take care of event */ + switch(event) { + + /* Fall through to manage different disconnect event the same way. */ + case TCP_SOCKET_CLOSED: + case TCP_SOCKET_TIMEDOUT: + case TCP_SOCKET_ABORTED: { + + DBG("MQTT - Disconnected by tcp event %d\n", event); + process_post(&mqtt_process, mqtt_abort_now_event, conn); + conn->state = MQTT_CONN_STATE_NOT_CONNECTED; + ctimer_stop(&conn->keep_alive_timer); + call_event(conn, MQTT_EVENT_DISCONNECTED, &event); + abort_connection(conn); + + /* If connecting retry */ + if(conn->auto_reconnect == 1) { + connect_tcp(conn); + } + break; + } + case TCP_SOCKET_CONNECTED: { + conn->state = MQTT_CONN_STATE_TCP_CONNECTED; + conn->out_buffer_sent = 1; + + process_post(&mqtt_process, mqtt_do_connect_mqtt_event, conn); + break; + } + case TCP_SOCKET_DATA_SENT: { + DBG("MQTT - Got TCP_DATA_SENT\n"); + + if(conn->socket.output_data_len == 0) { + conn->out_buffer_sent = 1; + conn->out_buffer_ptr = conn->out_buffer; + } + + ctimer_restart(&conn->keep_alive_timer); + break; + } + + default: { + DBG("MQTT - TCP Event %d is currently not managed by the tcp event callback\n", + event); + } + } +} +/*---------------------------------------------------------------------------*/ +PROCESS_THREAD(mqtt_process, ev, data) +{ + static struct mqtt_connection *conn; + + PROCESS_BEGIN(); + + while(1) { + PROCESS_WAIT_EVENT(); + + if(ev == mqtt_abort_now_event) { + DBG("MQTT - Abort\n"); + conn = data; + conn->state = MQTT_CONN_STATE_ABORT_IMMEDIATE; + + abort_connection(conn); + } + if(ev == mqtt_do_connect_tcp_event) { + conn = data; + DBG("MQTT - Got mqtt_do_connect_tcp_event!\n"); + connect_tcp(conn); + } + if(ev == mqtt_do_connect_mqtt_event) { + conn = data; + conn->socket.output_data_max_seg = conn->max_segment_size; + DBG("MQTT - Got mqtt_do_connect_mqtt_event!\n"); + + if(conn->out_buffer_sent == 1) { + PT_INIT(&conn->out_proto_thread); + while(connect_pt(&conn->out_proto_thread, conn) < PT_EXITED && + conn->state != MQTT_CONN_STATE_ABORT_IMMEDIATE) { + PT_MQTT_WAIT_SEND(); + } + } + } + if(ev == mqtt_do_disconnect_mqtt_event) { + conn = data; + DBG("MQTT - Got mqtt_do_disconnect_mqtt_event!\n"); + + /* Send MQTT Disconnect if we are connected */ + if(conn->state == MQTT_CONN_STATE_SENDING_MQTT_DISCONNECT) { + if(conn->out_buffer_sent == 1) { + PT_INIT(&conn->out_proto_thread); + while(disconnect_pt(&conn->out_proto_thread, conn) < PT_EXITED && + conn->state != MQTT_CONN_STATE_ABORT_IMMEDIATE) { + PT_MQTT_WAIT_SEND(); + } + abort_connection(conn); + call_event(conn, MQTT_EVENT_DISCONNECTED, &ev); + } else { + process_post(&mqtt_process, mqtt_do_disconnect_mqtt_event, conn); + } + } + } + if(ev == mqtt_do_pingreq_event) { + conn = data; + DBG("MQTT - Got mqtt_do_pingreq_event!\n"); + + if(conn->out_buffer_sent == 1 && + conn->state == MQTT_CONN_STATE_CONNECTED_TO_BROKER) { + PT_INIT(&conn->out_proto_thread); + while(pingreq_pt(&conn->out_proto_thread, conn) < PT_EXITED && + conn->state == MQTT_CONN_STATE_CONNECTED_TO_BROKER) { + PT_MQTT_WAIT_SEND(); + } + } + } + if(ev == mqtt_do_subscribe_event) { + conn = data; + DBG("MQTT - Got mqtt_do_subscribe_mqtt_event!\n"); + + if(conn->out_buffer_sent == 1 && + conn->state == MQTT_CONN_STATE_CONNECTED_TO_BROKER) { + PT_INIT(&conn->out_proto_thread); + while(subscribe_pt(&conn->out_proto_thread, conn) < PT_EXITED && + conn->state == MQTT_CONN_STATE_CONNECTED_TO_BROKER) { + PT_MQTT_WAIT_SEND(); + } + } + } + if(ev == mqtt_do_unsubscribe_event) { + conn = data; + DBG("MQTT - Got mqtt_do_unsubscribe_mqtt_event!\n"); + + if(conn->out_buffer_sent == 1 && + conn->state == MQTT_CONN_STATE_CONNECTED_TO_BROKER) { + PT_INIT(&conn->out_proto_thread); + while(unsubscribe_pt(&conn->out_proto_thread, conn) < PT_EXITED && + conn->state == MQTT_CONN_STATE_CONNECTED_TO_BROKER) { + PT_MQTT_WAIT_SEND(); + } + } + } + if(ev == mqtt_do_publish_event) { + conn = data; + DBG("MQTT - Got mqtt_do_publish_mqtt_event!\n"); + + if(conn->out_buffer_sent == 1 && + conn->state == MQTT_CONN_STATE_CONNECTED_TO_BROKER) { + PT_INIT(&conn->out_proto_thread); + while(publish_pt(&conn->out_proto_thread, conn) < PT_EXITED && + conn->state == MQTT_CONN_STATE_CONNECTED_TO_BROKER) { + PT_MQTT_WAIT_SEND(); + } + } + } + } + PROCESS_END(); +} +/*---------------------------------------------------------------------------*/ +void +mqtt_init(void) +{ + static uint8_t inited = 0; + if(!inited) { + mqtt_do_connect_tcp_event = process_alloc_event(); + mqtt_event_min = mqtt_do_connect_tcp_event; + + mqtt_do_connect_mqtt_event = process_alloc_event(); + mqtt_do_disconnect_mqtt_event = process_alloc_event(); + mqtt_do_subscribe_event = process_alloc_event(); + mqtt_do_unsubscribe_event = process_alloc_event(); + mqtt_do_publish_event = process_alloc_event(); + mqtt_do_pingreq_event = process_alloc_event(); + mqtt_update_event = process_alloc_event(); + mqtt_abort_now_event = process_alloc_event(); + mqtt_event_max = mqtt_abort_now_event; + + mqtt_continue_send_event = process_alloc_event(); + + list_init(mqtt_conn_list); + process_start(&mqtt_process, NULL); + inited = 1; + } +} +/*---------------------------------------------------------------------------*/ +mqtt_status_t +mqtt_register(struct mqtt_connection *conn, struct process *app_process, + char *client_id, mqtt_event_callback_t event_callback, + uint16_t max_segment_size) +{ + if(strlen(client_id) < 1) { + return MQTT_STATUS_INVALID_ARGS_ERROR; + } + + /* Set defaults - Set all to zero to begin with */ + memset(conn, 0, sizeof(struct mqtt_connection)); + string_to_mqtt_string(&conn->client_id, client_id); + conn->event_callback = event_callback; + conn->app_process = app_process; + conn->auto_reconnect = 1; + conn->max_segment_size = max_segment_size; + reset_defaults(conn); + + mqtt_init(); + list_add(mqtt_conn_list, conn); + + DBG("MQTT - Registered successfully\n"); + + return MQTT_STATUS_OK; +} +/*---------------------------------------------------------------------------*/ +/* + * Connect to MQTT broker. + * + * N.B. Non-blocking call. + */ +mqtt_status_t +mqtt_connect(struct mqtt_connection *conn, char *host, uint16_t port, + uint16_t keep_alive) +{ + uip_ip6addr_t ip6addr; + uip_ipaddr_t *ipaddr; + ipaddr = &ip6addr; + + /* Check if we are already trying to connect */ + if(conn->state > MQTT_CONN_STATE_NOT_CONNECTED) { + return MQTT_STATUS_OK; + } + + conn->server_host = host; + conn->keep_alive = keep_alive; + conn->server_port = port; + conn->out_buffer_ptr = conn->out_buffer; + conn->out_packet.qos_state = MQTT_QOS_STATE_NO_ACK; + conn->connect_vhdr_flags |= MQTT_VHDR_CLEAN_SESSION_FLAG; + + /* convert the string IPv6 address to a numeric IPv6 address */ + uiplib_ip6addrconv(host, &ip6addr); + + uip_ipaddr_copy(&(conn->server_ip), ipaddr); + + /* + * Initiate the connection if the IP could be resolved. Otherwise the + * connection will be initiated when the DNS lookup is finished, in the main + * event loop. + */ + process_post(&mqtt_process, mqtt_do_connect_tcp_event, conn); + + return MQTT_STATUS_OK; +} +/*----------------------------------------------------------------------------*/ +void +mqtt_disconnect(struct mqtt_connection *conn) +{ + if(conn->state != MQTT_CONN_STATE_CONNECTED_TO_BROKER) { + return; + } + + conn->state = MQTT_CONN_STATE_SENDING_MQTT_DISCONNECT; + + process_post(&mqtt_process, mqtt_do_disconnect_mqtt_event, conn); +} +/*----------------------------------------------------------------------------*/ +mqtt_status_t +mqtt_subscribe(struct mqtt_connection *conn, uint16_t *mid, char *topic, + mqtt_qos_level_t qos_level) +{ + if(conn->state != MQTT_CONN_STATE_CONNECTED_TO_BROKER) { + return MQTT_STATUS_NOT_CONNECTED_ERROR; + } + + DBG("MQTT - Call to mqtt_subscribe...\n"); + + /* Currently don't have a queue, so only one item at a time */ + if(conn->out_queue_full) { + DBG("MQTT - Not accepted!\n"); + return MQTT_STATUS_OUT_QUEUE_FULL; + } + conn->out_queue_full = 1; + DBG("MQTT - Accepted!\n"); + + conn->out_packet.mid = INCREMENT_MID(conn); + conn->out_packet.topic = topic; + conn->out_packet.topic_length = strlen(topic); + conn->out_packet.qos = qos_level; + conn->out_packet.qos_state = MQTT_QOS_STATE_NO_ACK; + + process_post(&mqtt_process, mqtt_do_subscribe_event, conn); + return MQTT_STATUS_OK; +} +/*----------------------------------------------------------------------------*/ +mqtt_status_t +mqtt_unsubscribe(struct mqtt_connection *conn, uint16_t *mid, char *topic) +{ + if(conn->state != MQTT_CONN_STATE_CONNECTED_TO_BROKER) { + return MQTT_STATUS_NOT_CONNECTED_ERROR; + } + + DBG("MQTT - Call to mqtt_unsubscribe...\n"); + /* Currently don't have a queue, so only one item at a time */ + if(conn->out_queue_full) { + DBG("MQTT - Not accepted!\n"); + return MQTT_STATUS_OUT_QUEUE_FULL; + } + conn->out_queue_full = 1; + DBG("MQTT - Accepted!\n"); + + conn->out_packet.mid = INCREMENT_MID(conn); + conn->out_packet.topic = topic; + conn->out_packet.topic_length = strlen(topic); + conn->out_packet.qos_state = MQTT_QOS_STATE_NO_ACK; + + process_post(&mqtt_process, mqtt_do_unsubscribe_event, conn); + return MQTT_STATUS_OK; +} +/*----------------------------------------------------------------------------*/ +mqtt_status_t +mqtt_publish(struct mqtt_connection *conn, uint16_t *mid, char *topic, + uint8_t *payload, uint32_t payload_size, + mqtt_qos_level_t qos_level, mqtt_retain_t retain) +{ + if(conn->state != MQTT_CONN_STATE_CONNECTED_TO_BROKER) { + return MQTT_STATUS_NOT_CONNECTED_ERROR; + } + + DBG("MQTT - Call to mqtt_publish...\n"); + + /* Currently don't have a queue, so only one item at a time */ + if(conn->out_queue_full) { + DBG("MQTT - Not accepted!\n"); + return MQTT_STATUS_OUT_QUEUE_FULL; + } + conn->out_queue_full = 1; + DBG("MQTT - Accepted!\n"); + + conn->out_packet.mid = INCREMENT_MID(conn); + conn->out_packet.retain = retain; + conn->out_packet.topic = topic; + conn->out_packet.topic_length = strlen(topic); + conn->out_packet.payload = payload; + conn->out_packet.payload_size = payload_size; + conn->out_packet.qos = qos_level; + conn->out_packet.qos_state = MQTT_QOS_STATE_NO_ACK; + + process_post(&mqtt_process, mqtt_do_publish_event, conn); + return MQTT_STATUS_OK; +} +/*----------------------------------------------------------------------------*/ +void +mqtt_set_username_password(struct mqtt_connection *conn, char *username, + char *password) +{ + /* Set strings, NULL string will simply set length to zero */ + string_to_mqtt_string(&conn->credentials.username, username); + string_to_mqtt_string(&conn->credentials.password, password); + + /* Set CONNECT VHDR flags */ + if(username != NULL) { + conn->connect_vhdr_flags |= MQTT_VHDR_USERNAME_FLAG; + } else { + conn->connect_vhdr_flags &= ~MQTT_VHDR_USERNAME_FLAG; + } + if(password != NULL) { + conn->connect_vhdr_flags |= MQTT_VHDR_PASSWORD_FLAG; + } else { + conn->connect_vhdr_flags &= ~MQTT_VHDR_PASSWORD_FLAG; + } +} +/*----------------------------------------------------------------------------*/ +void +mqtt_set_last_will(struct mqtt_connection *conn, char *topic, char *message, + mqtt_qos_level_t qos) +{ + /* Set strings, NULL string will simply set length to zero */ + string_to_mqtt_string(&conn->will.topic, topic); + string_to_mqtt_string(&conn->will.message, message); + + /* Currently not used! */ + conn->will.qos = qos; + + if(topic != NULL) { + conn->connect_vhdr_flags |= MQTT_VHDR_WILL_FLAG | + MQTT_VHDR_WILL_RETAIN_FLAG; + } +} +/*----------------------------------------------------------------------------*/ +/** @} */ diff --git a/apps/mqtt/mqtt.h b/apps/mqtt/mqtt.h new file mode 100644 index 000000000..4b27fa30c --- /dev/null +++ b/apps/mqtt/mqtt.h @@ -0,0 +1,509 @@ +/* + * Copyright (c) 2015, Texas Instruments Incorporated - http://www.ti.com/ + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions + * are met: + * 1. Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * 2. Redistributions in binary form must reproduce the above copyright + * notice, this list of conditions and the following disclaimer in the + * documentation and/or other materials provided with the distribution. + * 3. Neither the name of the copyright holder nor the names of its + * contributors may be used to endorse or promote products derived + * from this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * ``AS IS'' AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS + * FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE + * COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, + * INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES + * (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR + * SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) + * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, + * STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) + * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED + * OF THE POSSIBILITY OF SUCH DAMAGE. + */ +/*---------------------------------------------------------------------------*/ +/** + * \addtogroup apps + * @{ + * + * \defgroup mqtt-engine An implementation of MQTT v3.1 + * @{ + * + * This application is an engine for MQTT v3.1. It supports QoS Levels 0 and 1. + * + * MQTT is a Client Server publish/subscribe messaging transport protocol. + * It is light weight, open, simple, and designed so as to be easy to implement. + * These characteristics make it ideal for use in many situations, including + * constrained environments such as for communication in Machine to Machine + * (M2M) and Internet of Things (IoT) contexts where a small code footprint is + * required and/or network bandwidth is at a premium. + * + * The protocol runs over TCP/IP, more specifically tcp_socket. + * Its features include: + * + * - Use of the publish/subscribe message pattern which provides + * one-to-many message distribution and decoupling of applications. + * - A messaging transport that is agnostic to the content of the payload. + * Three qualities of service for message delivery: + * -- "At most once" (0), where messages are delivered according to the best + * efforts of the operating environment. Message loss can occur. + * This level could be used, for example, with ambient sensor data where it + * does not matter if an individual reading is lost as the next one will be + * published soon after. + * --"At least once" (1), where messages are assured to arrive but duplicates + * can occur. + * -- "Exactly once" (2), where message are assured to arrive exactly once. + * This level could be used, for example, with billing systems where duplicate + * or lost messages could lead to incorrect charges being applied. This QoS + * level is currently not supported in this implementation. + * + * - A small transport overhead and protocol exchanges minimized to reduce + * network traffic. + * - A mechanism, Last Will, to notify interested parties when an abnormal + * disconnection occurs. + * + * The protocol specification and other useful information can be found + * here: http://mqtt.org + * + */ +/** + * \file + * Header file for the Contiki MQTT engine + * + * \author + * Texas Instruments + */ +/*---------------------------------------------------------------------------*/ +#ifndef MQTT_H_ +#define MQTT_H_ +/*---------------------------------------------------------------------------*/ +#include "contiki.h" +#include "contiki-net.h" +#include "contiki-lib.h" +#include "lib/random.h" +#include "sys/ctimer.h" +#include "sys/etimer.h" +#include "net/rpl/rpl.h" +#include "net/ip/uip.h" +#include "net/ipv6/uip-ds6.h" +#include "dev/leds.h" + +#include "tcp-socket.h" +#include "udp-socket.h" + +#include +#include +#include +/*---------------------------------------------------------------------------*/ +/* Protocol constants */ +#define MQTT_CLIENT_ID_MAX_LEN 23 + +/* Size of the underlying TCP buffers */ +#define MQTT_TCP_INPUT_BUFF_SIZE 512 +#define MQTT_TCP_OUTPUT_BUFF_SIZE 512 + +#define MQTT_INPUT_BUFF_SIZE 512 +#define MQTT_MAX_TOPIC_LENGTH 64 +#define MQTT_MAX_TOPICS_PER_SUBSCRIBE 1 + +#define MQTT_FHDR_SIZE 1 +#define MQTT_MAX_REMAINING_LENGTH_BYTES 4 +#define MQTT_PROTOCOL_VERSION 3 +#define MQTT_PROTOCOL_NAME "MQIsdp" +#define MQTT_TOPIC_MAX_LENGTH 128 +/*---------------------------------------------------------------------------*/ +/* + * Debug configuration, this is similar but not exactly like the Debugging + * System discussion at https://github.com/contiki-os/contiki/wiki. + */ +#define DEBUG_MQTT 0 + +#if DEBUG_MQTT == 1 +#define DBG(...) printf(__VA_ARGS__) +#else +#define DBG(...) +#endif /* DEBUG */ +/*---------------------------------------------------------------------------*/ +extern process_event_t mqtt_update_event; + +/* Forward declaration */ +struct mqtt_connection; + +typedef enum { + MQTT_RETAIN_OFF, + MQTT_RETAIN_ON, +} mqtt_retain_t; + +/** + * \brief MQTT engine events + */ +typedef enum { + MQTT_EVENT_CONNECTED, + MQTT_EVENT_DISCONNECTED, + + MQTT_EVENT_SUBACK, + MQTT_EVENT_UNSUBACK, + MQTT_EVENT_PUBLISH, + MQTT_EVENT_PUBACK, + + /* Errors */ + MQTT_EVENT_ERROR = 0x80, + MQTT_EVENT_PROTOCOL_ERROR, + MQTT_EVENT_CONNECTION_REFUSED_ERROR, + MQTT_EVENT_DNS_ERROR, + MQTT_EVENT_NOT_IMPLEMENTED_ERROR, + /* Add more */ +} mqtt_event_t; + +typedef enum { + MQTT_STATUS_OK, + + MQTT_STATUS_OUT_QUEUE_FULL, + + /* Errors */ + MQTT_STATUS_ERROR = 0x80, + MQTT_STATUS_NOT_CONNECTED_ERROR, + MQTT_STATUS_INVALID_ARGS_ERROR, + MQTT_STATUS_DNS_ERROR, +} mqtt_status_t; + +typedef enum { + MQTT_QOS_LEVEL_0, + MQTT_QOS_LEVEL_1, + MQTT_QOS_LEVEL_2, +} mqtt_qos_level_t; + +typedef enum { + MQTT_QOS_STATE_NO_ACK, + MQTT_QOS_STATE_GOT_ACK, + + /* Expand for QoS 2 */ +} mqtt_qos_state_t; +/*---------------------------------------------------------------------------*/ +/* + * This is the state of the connection itself. + * + * N.B. The order is important because of runtime checks on how far the + * connection has proceeded. + */ +typedef enum { + MQTT_CONN_STATE_ERROR, + MQTT_CONN_STATE_DNS_ERROR, + MQTT_CONN_STATE_DISCONNECTING, + + MQTT_CONN_STATE_NOT_CONNECTED, + MQTT_CONN_STATE_DNS_LOOKUP, + MQTT_CONN_STATE_TCP_CONNECTING, + MQTT_CONN_STATE_TCP_CONNECTED, + MQTT_CONN_STATE_CONNECTING_TO_BROKER, + MQTT_CONN_STATE_CONNECTED_TO_BROKER, + MQTT_CONN_STATE_SENDING_MQTT_DISCONNECT, + MQTT_CONN_STATE_ABORT_IMMEDIATE, +} mqtt_conn_state_t; +/*---------------------------------------------------------------------------*/ +struct mqtt_string { + char *string; + uint16_t length; +}; + +/* + * Note that the pairing mid <-> QoS level only applies one-to-one if we only + * allow the subscription of one topic at a time. Otherwise we will have an + * ordered list of QoS levels corresponding to the order of topics. + * + * This could be part of a union of event data structures. + */ +struct mqtt_suback_event { + uint16_t mid; + mqtt_qos_level_t qos_level; +}; + +/* This is the MQTT message that is exposed to the end user. */ +struct mqtt_message { + uint32_t mid; + char topic[MQTT_MAX_TOPIC_LENGTH + 1]; /* +1 for string termination */ + + uint8_t *payload_chunk; + uint16_t payload_chunk_length; + + uint8_t first_chunk; + uint16_t payload_length; + uint16_t payload_left; +}; + +/* This struct represents a packet received from the MQTT server. */ +struct mqtt_in_packet { + /* Used by the list interface, must be first in the struct. */ + struct mqtt_connection *next; + + /* Total bytes read so far. Compared to the remaining length to to decide when + * we've read the payload. */ + uint32_t byte_counter; + uint8_t packet_received; + + uint8_t fhdr; + uint16_t remaining_length; + uint16_t mid; + + /* Helper variables needed to decode the remaining_length */ + uint8_t remaining_multiplier; + uint8_t has_remaining_length; + uint8_t remaining_length_bytes; + + /* Not the same as payload in the MQTT sense, it also contains the variable + * header. + */ + uint8_t payload_pos; + uint8_t payload[MQTT_INPUT_BUFF_SIZE]; + + /* Message specific data */ + uint16_t topic_len; + uint16_t topic_pos; + uint8_t topic_len_received; + uint8_t topic_received; +}; + +/* This struct represents a packet sent to the MQTT server. */ +struct mqtt_out_packet { + uint8_t fhdr; + uint32_t remaining_length; + uint8_t remaining_length_enc[MQTT_MAX_REMAINING_LENGTH_BYTES]; + uint8_t remaining_length_enc_bytes; + uint16_t mid; + char *topic; + uint16_t topic_length; + uint8_t *payload; + uint32_t payload_size; + mqtt_qos_level_t qos; + mqtt_qos_state_t qos_state; + mqtt_retain_t retain; +}; +/*---------------------------------------------------------------------------*/ +/** + * \brief MQTT event callback function + * \param m A pointer to a MQTT connection + * \param event The event number + * \param data A user-defined pointer + * + * The MQTT socket event callback function gets called whenever there is an + * event on a MQTT connection, such as the connection getting connected + * or closed. + */ +typedef void (*mqtt_event_callback_t)(struct mqtt_connection *m, + mqtt_event_t event, + void *data); + +typedef void (*mqtt_topic_callback_t)(struct mqtt_connection *m, + struct mqtt_message *msg); +/*---------------------------------------------------------------------------*/ +struct mqtt_will { + struct mqtt_string topic; + struct mqtt_string message; + mqtt_qos_level_t qos; +}; + +struct mqtt_credentials { + struct mqtt_string username; + struct mqtt_string password; +}; + +struct mqtt_connection { + /* Used by the list interface, must be first in the struct */ + struct mqtt_connection *next; + struct timer t; + + struct mqtt_string client_id; + + uint8_t connect_vhdr_flags; + uint8_t auto_reconnect; + + uint16_t keep_alive; + struct ctimer keep_alive_timer; + uint8_t waiting_for_pingresp; + + struct mqtt_will will; + struct mqtt_credentials credentials; + + mqtt_conn_state_t state; + mqtt_event_callback_t event_callback; + + /* Internal data */ + uint16_t mid_counter; + + /* Used for communication between MQTT API and APP */ + uint8_t out_queue_full; + struct process *app_process; + + /* Outgoing data related */ + uint8_t *out_buffer_ptr; + uint8_t out_buffer[MQTT_TCP_OUTPUT_BUFF_SIZE]; + uint8_t out_buffer_sent; + struct mqtt_out_packet out_packet; + struct pt out_proto_thread; + uint32_t out_write_pos; + uint16_t max_segment_size; + + /* Incoming data related */ + uint8_t in_buffer[MQTT_TCP_INPUT_BUFF_SIZE]; + struct mqtt_in_packet in_packet; + struct mqtt_message in_publish_msg; + + /* TCP related information */ + char *server_host; + uip_ipaddr_t server_ip; + uint16_t server_port; + struct tcp_socket socket; +}; +/* This is the API exposed to the user. */ +/*---------------------------------------------------------------------------*/ +/** + * \brief Initializes the MQTT engine. + * \param conn A pointer to the MQTT connection. + * \param app_process A pointer to the application process handling the MQTT + * connection. + * \param client_id A pointer to the MQTT client ID. + * \param event_callback Callback function responsible for handling the + * callback from MQTT engine. + * \param max_segment_size The TCP segment size to use for this MQTT/TCP + * connection. + * \return MQTT_STATUS_OK or MQTT_STATUS_INVALID_ARGS_ERROR + * + * This function initializes the MQTT engine and shall be called before any + * other MQTT function. + */ +mqtt_status_t mqtt_register(struct mqtt_connection *conn, + struct process *app_process, + char *client_id, + mqtt_event_callback_t event_callback, + uint16_t max_segment_size); +/*---------------------------------------------------------------------------*/ +/** + * \brief Connects to a MQTT broker. + * \param conn A pointer to the MQTT connection. + * \param host IP address of the broker to connect to. + * \param port Port of the broker to connect to, default is MQTT port is 1883. + * \param keep_alive Keep alive timer in seconds. Used by broker to handle + * client disc. Defines the maximum time interval between two messages + * from the client. Shall be min 1.5 x report interval. + * \return MQTT_STATUS_OK or an error status + * + * This function connects to a MQTT broker. + */ +mqtt_status_t mqtt_connect(struct mqtt_connection *conn, + char *host, + uint16_t port, + uint16_t keep_alive); +/*---------------------------------------------------------------------------*/ +/** + * \brief Disconnects from a MQTT broker. + * \param conn A pointer to the MQTT connection. + * + * This function disconnects from a MQTT broker. + */ +void mqtt_disconnect(struct mqtt_connection *conn); +/*---------------------------------------------------------------------------*/ +/** + * \brief Subscribes to a MQTT topic. + * \param conn A pointer to the MQTT connection. + * \param mid A pointer to message ID. + * \param topic A pointer to the topic to subscribe to. + * \param qos_level Quality Of Service level to use. Currently supports 0, 1. + * \return MQTT_STATUS_OK or some error status + * + * This function subscribes to a topic on a MQTT broker. + */ +mqtt_status_t mqtt_subscribe(struct mqtt_connection *conn, + uint16_t *mid, + char *topic, + mqtt_qos_level_t qos_level); +/*---------------------------------------------------------------------------*/ +/** + * \brief Unsubscribes from a MQTT topic. + * \param conn A pointer to the MQTT connection. + * \param mid A pointer to message ID. + * \param topic A pointer to the topic to unsubscribe from. + * \return MQTT_STATUS_OK or some error status + * + * This function unsubscribes from a topic on a MQTT broker. + */ +mqtt_status_t mqtt_unsubscribe(struct mqtt_connection *conn, + uint16_t *mid, + char *topic); +/*---------------------------------------------------------------------------*/ +/** + * \brief Publish to a MQTT topic. + * \param conn A pointer to the MQTT connection. + * \param mid A pointer to message ID. + * \param topic A pointer to the topic to subscribe to. + * \param payload A pointer to the topic payload. + * \param payload_size Payload size. + * \param qos_level Quality Of Service level to use. Currently supports 0, 1. + * \param retain If the RETAIN flag is set to 1, in a PUBLISH Packet sent by a + * Client to a Server, the Server MUST store the Application Message + * and its QoS, so that it can be delivered to future subscribers whose + * subscriptions match its topic name + * \return MQTT_STATUS_OK or some error status + * + * This function publishes to a topic on a MQTT broker. + */ +mqtt_status_t mqtt_publish(struct mqtt_connection *conn, + uint16_t *mid, + char *topic, + uint8_t *payload, + uint32_t payload_size, + mqtt_qos_level_t qos_level, + mqtt_retain_t retain); +/*---------------------------------------------------------------------------*/ +/** + * \brief Set the user name and password for a MQTT client. + * \param conn A pointer to the MQTT connection. + * \param username A pointer to the user name. + * \param password A pointer to the password. + * + * This function sets clients user name and password to use when connecting to + * a MQTT broker. + */ +void mqtt_set_username_password(struct mqtt_connection *conn, + char *username, + char *password); +/*---------------------------------------------------------------------------*/ +/** + * \brief Set the last will topic and message for a MQTT client. + * \param conn A pointer to the MQTT connection. + * \param topic A pointer to the Last Will topic. + * \param message A pointer to the Last Will message (payload). + * \param qos The desired QoS level. + * + * This function sets clients Last Will topic and message (payload). + * If the Will Flag is set to 1 (using the function) this indicates that, + * if the Connect request is accepted, a Will Message MUST be stored on the + * Server and associated with the Network Connection. The Will Message MUST + * be published when the Network Connection is subsequently closed. + * + * This functionality can be used to get notified that a device has + * disconnected from the broker. + * + */ +void mqtt_set_last_will(struct mqtt_connection *conn, + char *topic, + char *message, + mqtt_qos_level_t qos); + +#define mqtt_connected(conn) \ + ((conn)->state == MQTT_CONN_STATE_CONNECTED_TO_BROKER ? 1 : 0) + +#define mqtt_ready(conn) \ + (!(conn)->out_queue_full && mqtt_connected((conn))) +/*---------------------------------------------------------------------------*/ +#endif /* MQTT_H_ */ +/*---------------------------------------------------------------------------*/ +/** + * @} + * @} + */ From 6112ec54d3f3ca401d94a07034103d053b99c85d Mon Sep 17 00:00:00 2001 From: Jonas Olsson Date: Tue, 17 Feb 2015 13:46:46 +0100 Subject: [PATCH 2/5] Add MQTT demo for the cc2538dk platform --- examples/cc2538dk/mqtt-demo/Makefile | 10 + examples/cc2538dk/mqtt-demo/Makefile.target | 1 + examples/cc2538dk/mqtt-demo/README.md | 62 ++ examples/cc2538dk/mqtt-demo/mqtt-demo.c | 735 ++++++++++++++++++++ examples/cc2538dk/mqtt-demo/project-conf.h | 52 ++ 5 files changed, 860 insertions(+) create mode 100644 examples/cc2538dk/mqtt-demo/Makefile create mode 100644 examples/cc2538dk/mqtt-demo/Makefile.target create mode 100644 examples/cc2538dk/mqtt-demo/README.md create mode 100644 examples/cc2538dk/mqtt-demo/mqtt-demo.c create mode 100644 examples/cc2538dk/mqtt-demo/project-conf.h diff --git a/examples/cc2538dk/mqtt-demo/Makefile b/examples/cc2538dk/mqtt-demo/Makefile new file mode 100644 index 000000000..850bdc609 --- /dev/null +++ b/examples/cc2538dk/mqtt-demo/Makefile @@ -0,0 +1,10 @@ +DEFINES+=PROJECT_CONF_H=\"project-conf.h\" + +all: mqtt-demo + +CONTIKI_WITH_IPV6 = 1 + +APPS += mqtt + +CONTIKI=../../.. +include $(CONTIKI)/Makefile.include diff --git a/examples/cc2538dk/mqtt-demo/Makefile.target b/examples/cc2538dk/mqtt-demo/Makefile.target new file mode 100644 index 000000000..777593c88 --- /dev/null +++ b/examples/cc2538dk/mqtt-demo/Makefile.target @@ -0,0 +1 @@ +TARGET = cc2538dk diff --git a/examples/cc2538dk/mqtt-demo/README.md b/examples/cc2538dk/mqtt-demo/README.md new file mode 100644 index 000000000..8b4e3b043 --- /dev/null +++ b/examples/cc2538dk/mqtt-demo/README.md @@ -0,0 +1,62 @@ +MQTT Demo +========= +The MQTT client can be used to: + +* Publish sensor readings to an MQTT broker. +* Subscribe to a topic and receive commands from an MQTT broker + +The demo will give some visual feedback with the green LED: +* Very fast blinking: Searching for a network +* Fast blinking: Connecting to broker +* Slow, long blinking: Sending a publish message + +Publishing +---------- +By default the example will attempt to publish readings to an MQTT broker +running on the IPv6 address specified as `MQTT_DEMO_BROKER_IP_ADDR` in +`project-conf.h`. This functionality was tested successfully with +[mosquitto](http://mosquitto.org/). + +The publish messages include sensor readings but also some other information, +such as device uptime in seconds and a message sequence number. The demo will +publish to topic `iot-2/evt/status/fmt/json`. The device will connect using +client-id `d:quickstart:cc2538:`, where `` gets +constructed from the device's IEEE address. + +Subscribing +----------- +You can also subscribe to topics and receive commands, but this will only +work if you use "Org ID" != 'quickstart'. To achieve this, you will need to +change 'Org ID' (`DEFAULT_ORG_ID`). In this scenario, the device will subscribe +to: + +`iot-2/cmd/+/fmt/json` + +You can then use this to toggle LEDs. To do this, you can for example +use mosquitto client to publish to `iot-2/cmd/leds/fmt/json`. So, to change +the state of an LED, you would do this: + +`mosquitto_pub -h -m "1" -t iot-2/cmd/leds/fmt/json` + +Where `broker IP` should be replaced with the IP address of your mosquitto +broker (the one where you device has subscribed). Replace `-m "1'` with `-m "0"` +to turn the LED back off. + +Bear in mind that, even though the topic suggests that messages are of json +format, they are in fact not. This was done in order to avoid linking a json +parser into the firmware. This comment only applies to parsing incoming +messages, outgoing publish messages use proper json payload. + +IBM Quickstart Service +---------------------- +It is also possible to publish to IBM's quickstart service. To do so, you need +to undefine `MQTT_DEMO_BROKER_IP_ADDR`. + +The device will then try to connect to IBM's quickstart over NAT64, so you will +need a NAT64 gateway in your network to make this work. A guide on how to +setup NAT64 is out of scope here. + +If you want to use IBM's cloud service with a registered device, change +'Org ID' (`DEFAULT_ORG_ID`) and provide the 'Auth Token' (`DEFAULT_AUTH_TOKEN`), +which acts as a 'password', but bear in mind that it gets transported in clear +text. diff --git a/examples/cc2538dk/mqtt-demo/mqtt-demo.c b/examples/cc2538dk/mqtt-demo/mqtt-demo.c new file mode 100644 index 000000000..dde255ee1 --- /dev/null +++ b/examples/cc2538dk/mqtt-demo/mqtt-demo.c @@ -0,0 +1,735 @@ +/* + * Copyright (c) 2014, Texas Instruments Incorporated - http://www.ti.com/ + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions + * are met: + * 1. Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * 2. Redistributions in binary form must reproduce the above copyright + * notice, this list of conditions and the following disclaimer in the + * documentation and/or other materials provided with the distribution. + * 3. Neither the name of the copyright holder nor the names of its + * contributors may be used to endorse or promote products derived + * from this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * ``AS IS'' AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS + * FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE + * COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, + * INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES + * (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR + * SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) + * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, + * STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) + * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED + * OF THE POSSIBILITY OF SUCH DAMAGE. + */ +/*---------------------------------------------------------------------------*/ +/** \addtogroup cc2538-examples + * @{ + * + * \defgroup cc2538-mqtt-demo + * + * Demonstrates MQTT functionality. Works with IBM Quickstart as well as + * mosquitto + * @{ + * + * \file + * An MQTT example for the cc2538dk platform + */ +/*---------------------------------------------------------------------------*/ +#include "contiki-conf.h" +#include "rpl/rpl-private.h" +#include "mqtt.h" +#include "net/rpl/rpl.h" +#include "net/ip/uip.h" +#include "net/ipv6/uip-icmp6.h" +#include "net/ipv6/sicslowpan.h" +#include "sys/etimer.h" +#include "sys/ctimer.h" +#include "lib/sensors.h" +#include "dev/button-sensor.h" +#include "dev/leds.h" +#include "dev/adc-sensor.h" + +#include +/*---------------------------------------------------------------------------*/ +/* + * IBM server: messaging.quickstart.internetofthings.ibmcloud.com + * (184.172.124.189) mapped in an NAT64 (prefix 64:ff9b::/96) IPv6 address + * Note: If not able to connect; lookup the IP address again as it may change. + * + * Alternatively, publish to a local MQTT broker (e.g. mosquitto) running on + * the node that hosts your border router + */ +#ifdef MQTT_DEMO_BROKER_IP_ADDR +static const char *broker_ip = MQTT_DEMO_BROKER_IP_ADDR; +#define DEFAULT_ORG_ID "mqtt-demo" +#else +static const char *broker_ip = "0064:ff9b:0000:0000:0000:0000:b8ac:7cbd"; +#define DEFAULT_ORG_ID "quickstart" +#endif +/*---------------------------------------------------------------------------*/ +/* + * A timeout used when waiting for something to happen (e.g. to connect or to + * disconnect) + */ +#define STATE_MACHINE_PERIODIC (CLOCK_SECOND >> 1) +/*---------------------------------------------------------------------------*/ +/* Provide visible feedback via LEDS during various states */ +/* When connecting to broker */ +#define CONNECTING_LED_DURATION (CLOCK_SECOND >> 2) + +/* Each time we try to publish */ +#define PUBLISH_LED_ON_DURATION (CLOCK_SECOND) +/*---------------------------------------------------------------------------*/ +/* Connections and reconnections */ +#define RETRY_FOREVER 0xFF +#define RECONNECT_INTERVAL (CLOCK_SECOND * 2) + +/* + * Number of times to try reconnecting to the broker. + * Can be a limited number (e.g. 3, 10 etc) or can be set to RETRY_FOREVER + */ +#define RECONNECT_ATTEMPTS RETRY_FOREVER +#define CONNECTION_STABLE_TIME (CLOCK_SECOND * 5) +static struct timer connection_life; +static uint8_t connect_attempt; +/*---------------------------------------------------------------------------*/ +/* Various states */ +static uint8_t state; +#define STATE_INIT 0 +#define STATE_REGISTERED 1 +#define STATE_CONNECTING 2 +#define STATE_CONNECTED 3 +#define STATE_PUBLISHING 4 +#define STATE_DISCONNECTED 5 +#define STATE_NEWCONFIG 6 +#define STATE_CONFIG_ERROR 0xFE +#define STATE_ERROR 0xFF +/*---------------------------------------------------------------------------*/ +#define CONFIG_ORG_ID_LEN 32 +#define CONFIG_TYPE_ID_LEN 32 +#define CONFIG_AUTH_TOKEN_LEN 32 +#define CONFIG_EVENT_TYPE_ID_LEN 32 +#define CONFIG_CMD_TYPE_LEN 8 +#define CONFIG_IP_ADDR_STR_LEN 64 +/*---------------------------------------------------------------------------*/ +#define RSSI_MEASURE_INTERVAL_MAX 86400 /* secs: 1 day */ +#define RSSI_MEASURE_INTERVAL_MIN 5 /* secs */ +#define PUBLISH_INTERVAL_MAX 86400 /* secs: 1 day */ +#define PUBLISH_INTERVAL_MIN 5 /* secs */ +/*---------------------------------------------------------------------------*/ +/* A timeout used when waiting to connect to a network */ +#define NET_CONNECT_PERIODIC (CLOCK_SECOND >> 2) +#define NO_NET_LED_DURATION (NET_CONNECT_PERIODIC >> 1) +/*---------------------------------------------------------------------------*/ +/* Default configuration values */ +#define DEFAULT_TYPE_ID "cc2538" +#define DEFAULT_AUTH_TOKEN "AUTHZ" +#define DEFAULT_EVENT_TYPE_ID "status" +#define DEFAULT_SUBSCRIBE_CMD_TYPE "+" +#define DEFAULT_BROKER_PORT 1883 +#define DEFAULT_PUBLISH_INTERVAL (30 * CLOCK_SECOND) +#define DEFAULT_KEEP_ALIVE_TIMER 60 +#define DEFAULT_RSSI_MEAS_INTERVAL (CLOCK_SECOND * 30) +/*---------------------------------------------------------------------------*/ +/* Take a sensor reading on button press */ +#define PUBLISH_TRIGGER &button_sensor + +/* Payload length of ICMPv6 echo requests used to measure RSSI with def rt */ +#define ECHO_REQ_PAYLOAD_LEN 20 +/*---------------------------------------------------------------------------*/ +PROCESS_NAME(mqtt_demo_process); +AUTOSTART_PROCESSES(&mqtt_demo_process); +/*---------------------------------------------------------------------------*/ +/** + * \brief Data structure declaration for the MQTT client configuration + */ +typedef struct mqtt_client_config { + char org_id[CONFIG_ORG_ID_LEN]; + char type_id[CONFIG_TYPE_ID_LEN]; + char auth_token[CONFIG_AUTH_TOKEN_LEN]; + char event_type_id[CONFIG_EVENT_TYPE_ID_LEN]; + char broker_ip[CONFIG_IP_ADDR_STR_LEN]; + char cmd_type[CONFIG_CMD_TYPE_LEN]; + clock_time_t pub_interval; + int def_rt_ping_interval; + uint16_t broker_port; +} mqtt_client_config_t; +/*---------------------------------------------------------------------------*/ +/* Maximum TCP segment size for outgoing segments of our socket */ +#define MAX_TCP_SEGMENT_SIZE 32 +/*---------------------------------------------------------------------------*/ +#define STATUS_LED LEDS_GREEN +/*---------------------------------------------------------------------------*/ +/* + * Buffers for Client ID and Topic. + * Make sure they are large enough to hold the entire respective string + * + * d:quickstart:status:EUI64 is 32 bytes long + * iot-2/evt/status/fmt/json is 25 bytes + * We also need space for the null termination + */ +#define BUFFER_SIZE 64 +static char client_id[BUFFER_SIZE]; +static char pub_topic[BUFFER_SIZE]; +static char sub_topic[BUFFER_SIZE]; +/*---------------------------------------------------------------------------*/ +/* + * The main MQTT buffers. + * We will need to increase if we start publishing more data. + */ +#define APP_BUFFER_SIZE 512 +static struct mqtt_connection conn; +static char app_buffer[APP_BUFFER_SIZE]; +/*---------------------------------------------------------------------------*/ +#define QUICKSTART "quickstart" +/*---------------------------------------------------------------------------*/ +static struct mqtt_message *msg_ptr = 0; +static struct etimer publish_periodic_timer; +static struct ctimer ct; +static char *buf_ptr; +static uint16_t seq_nr_value = 0; +/*---------------------------------------------------------------------------*/ +/* Parent RSSI functionality */ +static struct uip_icmp6_echo_reply_notification echo_reply_notification; +static struct etimer echo_request_timer; +static int def_rt_rssi = 0; +/*---------------------------------------------------------------------------*/ +static mqtt_client_config_t conf; +/*---------------------------------------------------------------------------*/ +PROCESS(mqtt_demo_process, "MQTT Demo"); +/*---------------------------------------------------------------------------*/ +int +ipaddr_sprintf(char *buf, uint8_t buf_len, const uip_ipaddr_t *addr) +{ + uint16_t a; + uint8_t len = 0; + int i, f; + for(i = 0, f = 0; i < sizeof(uip_ipaddr_t); i += 2) { + a = (addr->u8[i] << 8) + addr->u8[i + 1]; + if(a == 0 && f >= 0) { + if(f++ == 0) { + len += snprintf(&buf[len], buf_len - len, "::"); + } + } else { + if(f > 0) { + f = -1; + } else if(i > 0) { + len += snprintf(&buf[len], buf_len - len, ":"); + } + len += snprintf(&buf[len], buf_len - len, "%x", a); + } + } + + return len; +} +/*---------------------------------------------------------------------------*/ +static void +echo_reply_handler(uip_ipaddr_t *source, uint8_t ttl, uint8_t *data, + uint16_t datalen) +{ + if(uip_ip6addr_cmp(source, uip_ds6_defrt_choose())) { + def_rt_rssi = sicslowpan_get_last_rssi(); + } +} +/*---------------------------------------------------------------------------*/ +static void +publish_led_off(void *d) +{ + leds_off(STATUS_LED); +} +/*---------------------------------------------------------------------------*/ +static void +pub_handler(const char *topic, uint16_t topic_len, const uint8_t *chunk, + uint16_t chunk_len) +{ + DBG("Pub Handler: topic='%s' (len=%u), chunk_len=%u\n", topic, topic_len, + chunk_len); + + /* If we don't like the length, ignore */ + if(topic_len != 23 || chunk_len != 1) { + printf("Incorrect topic or chunk len. Ignored\n"); + return; + } + + /* If the format != json, ignore */ + if(strncmp(&topic[topic_len - 4], "json", 4) != 0) { + printf("Incorrect format\n"); + } + + if(strncmp(&topic[10], "leds", 4) == 0) { + if(chunk[0] == '1') { + leds_on(LEDS_RED); + } else if(chunk[0] == '0') { + leds_off(LEDS_RED); + } + return; + } +} +/*---------------------------------------------------------------------------*/ +static void +mqtt_event(struct mqtt_connection *m, mqtt_event_t event, void *data) +{ + switch(event) { + case MQTT_EVENT_CONNECTED: { + DBG("APP - Application has a MQTT connection\n"); + timer_set(&connection_life, CONNECTION_STABLE_TIME); + state = STATE_CONNECTED; + break; + } + case MQTT_EVENT_DISCONNECTED: { + DBG("APP - MQTT Disconnect. Reason %u\n", *((mqtt_event_t *)data)); + + state = STATE_DISCONNECTED; + process_poll(&mqtt_demo_process); + break; + } + case MQTT_EVENT_PUBLISH: { + msg_ptr = data; + + /* Implement first_flag in publish message? */ + if(msg_ptr->first_chunk) { + msg_ptr->first_chunk = 0; + DBG("APP - Application received a publish on topic '%s'. Payload " + "size is %i bytes. Content:\n\n", + msg_ptr->topic, msg_ptr->payload_length); + } + + pub_handler(msg_ptr->topic, strlen(msg_ptr->topic), msg_ptr->payload_chunk, + msg_ptr->payload_length); + break; + } + case MQTT_EVENT_SUBACK: { + DBG("APP - Application is subscribed to topic successfully\n"); + break; + } + case MQTT_EVENT_UNSUBACK: { + DBG("APP - Application is unsubscribed to topic successfully\n"); + break; + } + case MQTT_EVENT_PUBACK: { + DBG("APP - Publishing complete.\n"); + break; + } + default: + DBG("APP - Application got a unhandled MQTT event: %i\n", event); + break; + } +} +/*---------------------------------------------------------------------------*/ +static int +construct_pub_topic(void) +{ + int len = snprintf(pub_topic, BUFFER_SIZE, "iot-2/evt/%s/fmt/json", + conf.event_type_id); + + /* len < 0: Error. Len >= BUFFER_SIZE: Buffer too small */ + if(len < 0 || len >= BUFFER_SIZE) { + printf("Pub Topic: %d, Buffer %d\n", len, BUFFER_SIZE); + return 0; + } + + return 1; +} +/*---------------------------------------------------------------------------*/ +static int +construct_sub_topic(void) +{ + int len = snprintf(sub_topic, BUFFER_SIZE, "iot-2/cmd/%s/fmt/json", + conf.cmd_type); + + /* len < 0: Error. Len >= BUFFER_SIZE: Buffer too small */ + if(len < 0 || len >= BUFFER_SIZE) { + printf("Sub Topic: %d, Buffer %d\n", len, BUFFER_SIZE); + return 0; + } + + return 1; +} +/*---------------------------------------------------------------------------*/ +static int +construct_client_id(void) +{ + int len = snprintf(client_id, BUFFER_SIZE, "d:%s:%s:%02x%02x%02x%02x%02x%02x", + conf.org_id, conf.type_id, + linkaddr_node_addr.u8[0], linkaddr_node_addr.u8[1], + linkaddr_node_addr.u8[2], linkaddr_node_addr.u8[5], + linkaddr_node_addr.u8[6], linkaddr_node_addr.u8[7]); + + /* len < 0: Error. Len >= BUFFER_SIZE: Buffer too small */ + if(len < 0 || len >= BUFFER_SIZE) { + printf("Client ID: %d, Buffer %d\n", len, BUFFER_SIZE); + return 0; + } + + return 1; +} +/*---------------------------------------------------------------------------*/ +static void +update_config(void) +{ + if(construct_client_id() == 0) { + /* Fatal error. Client ID larger than the buffer */ + state = STATE_CONFIG_ERROR; + return; + } + + if(construct_sub_topic() == 0) { + /* Fatal error. Topic larger than the buffer */ + state = STATE_CONFIG_ERROR; + return; + } + + if(construct_pub_topic() == 0) { + /* Fatal error. Topic larger than the buffer */ + state = STATE_CONFIG_ERROR; + return; + } + + /* Reset the counter */ + seq_nr_value = 0; + + state = STATE_INIT; + + /* + * Schedule next timer event ASAP + * + * If we entered an error state then we won't do anything when it fires. + * + * Since the error at this stage is a config error, we will only exit this + * error state if we get a new config. + */ + etimer_set(&publish_periodic_timer, 0); + + return; +} +/*---------------------------------------------------------------------------*/ +static int +init_config() +{ + /* Populate configuration with default values */ + memset(&conf, 0, sizeof(mqtt_client_config_t)); + + memcpy(conf.org_id, DEFAULT_ORG_ID, strlen(DEFAULT_ORG_ID)); + memcpy(conf.type_id, DEFAULT_TYPE_ID, strlen(DEFAULT_TYPE_ID)); + memcpy(conf.auth_token, DEFAULT_AUTH_TOKEN, strlen(DEFAULT_AUTH_TOKEN)); + memcpy(conf.event_type_id, DEFAULT_EVENT_TYPE_ID, + strlen(DEFAULT_EVENT_TYPE_ID)); + memcpy(conf.broker_ip, broker_ip, strlen(broker_ip)); + memcpy(conf.cmd_type, DEFAULT_SUBSCRIBE_CMD_TYPE, 1); + + conf.broker_port = DEFAULT_BROKER_PORT; + conf.pub_interval = DEFAULT_PUBLISH_INTERVAL; + conf.def_rt_ping_interval = DEFAULT_RSSI_MEAS_INTERVAL; + + return 1; +} +/*---------------------------------------------------------------------------*/ +static void +subscribe(void) +{ + /* Publish MQTT topic in IBM quickstart format */ + mqtt_status_t status; + + status = mqtt_subscribe(&conn, NULL, sub_topic, MQTT_QOS_LEVEL_0); + + DBG("APP - Subscribing!\n"); + if(status == MQTT_STATUS_OUT_QUEUE_FULL) { + DBG("APP - Tried to subscribe but command queue was full!\n"); + } +} +/*---------------------------------------------------------------------------*/ +static void +publish(void) +{ + /* Publish MQTT topic in IBM quickstart format */ + int len; + int remaining = APP_BUFFER_SIZE; + int16_t value; + + seq_nr_value++; + + buf_ptr = app_buffer; + + len = snprintf(buf_ptr, remaining, + "{" + "\"d\":{" + "\"myName\":\"%s\"," + "\"Seq #\":%d," + "\"Uptime (sec)\":%lu", + BOARD_STRING, seq_nr_value, clock_seconds()); + + if(len < 0 || len >= remaining) { + printf("Buffer too short. Have %d, need %d + \\0\n", remaining, len); + return; + } + + remaining -= len; + buf_ptr += len; + + /* Put our Default route's string representation in a buffer */ + char def_rt_str[64]; + memset(def_rt_str, 0, sizeof(def_rt_str)); + ipaddr_sprintf(def_rt_str, sizeof(def_rt_str), uip_ds6_defrt_choose()); + + len = snprintf(buf_ptr, remaining, ",\"Def Route\":\"%s\",\"RSSI (dBm)\":%d", + def_rt_str, def_rt_rssi); + + if(len < 0 || len >= remaining) { + printf("Buffer too short. Have %d, need %d + \\0\n", remaining, len); + return; + } + remaining -= len; + buf_ptr += len; + + value = adc_sensor.value(ADC_SENSOR_TEMP); + len = snprintf(buf_ptr, remaining, ",\"On-Chip Temp (mC)\":%d", + 25000 + ((value >> 4) - 1422) * 10000 / 42); + + if(len < 0 || len >= remaining) { + printf("Buffer too short. Have %d, need %d + \\0\n", remaining, len); + return; + } + remaining -= len; + buf_ptr += len; + + value = adc_sensor.value(ADC_SENSOR_VDD_3); + len = snprintf(buf_ptr, remaining, ",\"VDD3 (mV)\":%d", + value * (3 * 1190) / (2047 << 4)); + + if(len < 0 || len >= remaining) { + printf("Buffer too short. Have %d, need %d + \\0\n", remaining, len); + return; + } + remaining -= len; + buf_ptr += len; + + len = snprintf(buf_ptr, remaining, "}}"); + + if(len < 0 || len >= remaining) { + printf("Buffer too short. Have %d, need %d + \\0\n", remaining, len); + return; + } + + mqtt_publish(&conn, NULL, pub_topic, (uint8_t *)app_buffer, + strlen(app_buffer), MQTT_QOS_LEVEL_0, MQTT_RETAIN_OFF); + + DBG("APP - Publish!\n"); +} +/*---------------------------------------------------------------------------*/ +static void +connect_to_broker(void) +{ + /* Connect to MQTT server */ + mqtt_connect(&conn, conf.broker_ip, conf.broker_port, + conf.pub_interval * 3); + + state = STATE_CONNECTING; +} +/*---------------------------------------------------------------------------*/ +static void +ping_parent(void) +{ + if(uip_ds6_get_global(ADDR_PREFERRED) == NULL) { + return; + } + + uip_icmp6_send(uip_ds6_defrt_choose(), ICMP6_ECHO_REQUEST, 0, + ECHO_REQ_PAYLOAD_LEN); +} +/*---------------------------------------------------------------------------*/ +static void +state_machine(void) +{ + switch(state) { + case STATE_INIT: + /* If we have just been configured register MQTT connection */ + mqtt_register(&conn, &mqtt_demo_process, client_id, mqtt_event, + MAX_TCP_SEGMENT_SIZE); + + /* + * If we are not using the quickstart service (thus we are an IBM + * registered device), we need to provide user name and password + */ + if(strncasecmp(conf.org_id, QUICKSTART, strlen(conf.org_id)) != 0) { + if(strlen(conf.auth_token) == 0) { + printf("User name set, but empty auth token\n"); + state = STATE_ERROR; + break; + } else { + mqtt_set_username_password(&conn, "use-token-auth", + conf.auth_token); + } + } + + /* _register() will set auto_reconnect. We don't want that. */ + conn.auto_reconnect = 0; + connect_attempt = 1; + + state = STATE_REGISTERED; + DBG("Init\n"); + /* Continue */ + case STATE_REGISTERED: + if(uip_ds6_get_global(ADDR_PREFERRED) != NULL) { + /* Registered and with a public IP. Connect */ + DBG("Registered. Connect attempt %u\n", connect_attempt); + ping_parent(); + connect_to_broker(); + } else { + leds_on(STATUS_LED); + ctimer_set(&ct, NO_NET_LED_DURATION, publish_led_off, NULL); + } + etimer_set(&publish_periodic_timer, NET_CONNECT_PERIODIC); + return; + break; + case STATE_CONNECTING: + leds_on(STATUS_LED); + ctimer_set(&ct, CONNECTING_LED_DURATION, publish_led_off, NULL); + /* Not connected yet. Wait */ + DBG("Connecting (%u)\n", connect_attempt); + break; + case STATE_CONNECTED: + /* Don't subscribe unless we are a registered device */ + if(strncasecmp(conf.org_id, QUICKSTART, strlen(conf.org_id)) == 0) { + DBG("Using 'quickstart': Skipping subscribe\n"); + state = STATE_PUBLISHING; + } + /* Continue */ + case STATE_PUBLISHING: + /* If the timer expired, the connection is stable. */ + if(timer_expired(&connection_life)) { + /* + * Intentionally using 0 here instead of 1: We want RECONNECT_ATTEMPTS + * attempts if we disconnect after a successful connect + */ + connect_attempt = 0; + } + + if(mqtt_ready(&conn) && conn.out_buffer_sent) { + /* Connected. Publish */ + if(state == STATE_CONNECTED) { + subscribe(); + state = STATE_PUBLISHING; + } else { + leds_on(STATUS_LED); + ctimer_set(&ct, PUBLISH_LED_ON_DURATION, publish_led_off, NULL); + publish(); + } + etimer_set(&publish_periodic_timer, conf.pub_interval); + + DBG("Publishing\n"); + /* Return here so we don't end up rescheduling the timer */ + return; + } else { + /* + * Our publish timer fired, but some MQTT packet is already in flight + * (either not sent at all, or sent but not fully ACKd). + * + * This can mean that we have lost connectivity to our broker or that + * simply there is some network delay. In both cases, we refuse to + * trigger a new message and we wait for TCP to either ACK the entire + * packet after retries, or to timeout and notify us. + */ + DBG("Publishing... (MQTT state=%d, q=%u)\n", conn.state, + conn.out_queue_full); + } + break; + case STATE_DISCONNECTED: + DBG("Disconnected\n"); + if(connect_attempt < RECONNECT_ATTEMPTS || + RECONNECT_ATTEMPTS == RETRY_FOREVER) { + /* Disconnect and backoff */ + clock_time_t interval; + mqtt_disconnect(&conn); + connect_attempt++; + + interval = connect_attempt < 3 ? RECONNECT_INTERVAL << connect_attempt : + RECONNECT_INTERVAL << 3; + + DBG("Disconnected. Attempt %u in %lu ticks\n", connect_attempt, interval); + + etimer_set(&publish_periodic_timer, interval); + + state = STATE_REGISTERED; + return; + } else { + /* Max reconnect attempts reached. Enter error state */ + state = STATE_ERROR; + DBG("Aborting connection after %u attempts\n", connect_attempt - 1); + } + break; + case STATE_CONFIG_ERROR: + /* Idle away. The only way out is a new config */ + printf("Bad configuration.\n"); + return; + case STATE_ERROR: + default: + leds_on(STATUS_LED); + /* + * 'default' should never happen. + * + * If we enter here it's because of some error. Stop timers. The only thing + * that can bring us out is a new config event + */ + printf("Default case: State=0x%02x\n", state); + return; + } + + /* If we didn't return so far, reschedule ourselves */ + etimer_set(&publish_periodic_timer, STATE_MACHINE_PERIODIC); +} +/*---------------------------------------------------------------------------*/ +PROCESS_THREAD(mqtt_demo_process, ev, data) +{ + + PROCESS_BEGIN(); + + printf("MQTT Demo Process\n"); + + if(init_config() != 1) { + PROCESS_EXIT(); + } + + update_config(); + + def_rt_rssi = 0x8000000; + uip_icmp6_echo_reply_callback_add(&echo_reply_notification, + echo_reply_handler); + etimer_set(&echo_request_timer, conf.def_rt_ping_interval); + + /* Main loop */ + while(1) { + + PROCESS_YIELD(); + + if(ev == sensors_event && data == PUBLISH_TRIGGER) { + if(state == STATE_ERROR) { + connect_attempt = 1; + state = STATE_REGISTERED; + } + } + + if((ev == PROCESS_EVENT_TIMER && data == &publish_periodic_timer) || + ev == PROCESS_EVENT_POLL || + (ev == sensors_event && data == PUBLISH_TRIGGER)) { + state_machine(); + } + + if(ev == PROCESS_EVENT_TIMER && data == &echo_request_timer) { + ping_parent(); + etimer_set(&echo_request_timer, conf.def_rt_ping_interval); + } + } + + PROCESS_END(); +} +/*---------------------------------------------------------------------------*/ +/** + * @} + * @} + */ diff --git a/examples/cc2538dk/mqtt-demo/project-conf.h b/examples/cc2538dk/mqtt-demo/project-conf.h new file mode 100644 index 000000000..0aaef655d --- /dev/null +++ b/examples/cc2538dk/mqtt-demo/project-conf.h @@ -0,0 +1,52 @@ +/* + * Copyright (c) 2012, Texas Instruments Incorporated - http://www.ti.com/ + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions + * are met: + * 1. Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * 2. Redistributions in binary form must reproduce the above copyright + * notice, this list of conditions and the following disclaimer in the + * documentation and/or other materials provided with the distribution. + * + * 3. Neither the name of the copyright holder nor the names of its + * contributors may be used to endorse or promote products derived + * from this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * ``AS IS'' AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS + * FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE + * COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, + * INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES + * (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR + * SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) + * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, + * STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) + * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED + * OF THE POSSIBILITY OF SUCH DAMAGE. + */ +/*---------------------------------------------------------------------------*/ +/** + * \addtogroup cc2538-mqtt-demo + * @{ + * + * \file + * Project specific configuration defines for the MQTT demo + */ +/*---------------------------------------------------------------------------*/ +#ifndef PROJECT_CONF_H_ +#define PROJECT_CONF_H_ +/*---------------------------------------------------------------------------*/ +/* User configuration */ +#define MQTT_DEMO_STATUS_LED LEDS_GREEN +#define MQTT_DEMO_PUBLISH_TRIGGER &button_right_sensor + +/* If undefined, the demo will attempt to connect to IBM's quickstart */ +#define MQTT_DEMO_BROKER_IP_ADDR "aaaa::1" +/*---------------------------------------------------------------------------*/ +#endif /* PROJECT_CONF_H_ */ +/*---------------------------------------------------------------------------*/ +/** @} */ From 303f4a41fcbca88efd5bba195d6eb97fccb92c97 Mon Sep 17 00:00:00 2001 From: Jonas Olsson Date: Tue, 17 Feb 2015 13:49:09 +0100 Subject: [PATCH 3/5] Add Travis test for the MQTT demo --- regression-tests/15-compile-arm-apcs-ports/Makefile | 1 + 1 file changed, 1 insertion(+) diff --git a/regression-tests/15-compile-arm-apcs-ports/Makefile b/regression-tests/15-compile-arm-apcs-ports/Makefile index 572489d8a..394c11414 100644 --- a/regression-tests/15-compile-arm-apcs-ports/Makefile +++ b/regression-tests/15-compile-arm-apcs-ports/Makefile @@ -14,6 +14,7 @@ webserver-ipv6/cc2538dk \ cc2538dk/cc2538dk \ cc2538dk/udp-ipv6-echo-server/cc2538dk \ cc2538dk/sniffer/cc2538dk \ +cc2538dk/mqtt-demo/cc2538dk \ ipv6/multicast/econotag \ ipv6/multicast/cc2538dk \ From 4fb2bd410fee68d97e9c35f531f11aff947b59a6 Mon Sep 17 00:00:00 2001 From: Jonas Olsson Date: Tue, 17 Feb 2015 16:07:37 +0100 Subject: [PATCH 4/5] Add title to doxygen group --- examples/cc2538dk/mqtt-demo/mqtt-demo.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/examples/cc2538dk/mqtt-demo/mqtt-demo.c b/examples/cc2538dk/mqtt-demo/mqtt-demo.c index dde255ee1..78a465b70 100644 --- a/examples/cc2538dk/mqtt-demo/mqtt-demo.c +++ b/examples/cc2538dk/mqtt-demo/mqtt-demo.c @@ -31,7 +31,7 @@ /** \addtogroup cc2538-examples * @{ * - * \defgroup cc2538-mqtt-demo + * \defgroup cc2538-mqtt-demo CC2538DK MQTT Demo Project * * Demonstrates MQTT functionality. Works with IBM Quickstart as well as * mosquitto From 2bb7fcc0cb72451d98551fb71997a421486b074f Mon Sep 17 00:00:00 2001 From: Jonas Olsson Date: Tue, 17 Feb 2015 16:11:45 +0100 Subject: [PATCH 5/5] Fixed doxygen formatting --- examples/cc2538dk/mqtt-demo/mqtt-demo.c | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/examples/cc2538dk/mqtt-demo/mqtt-demo.c b/examples/cc2538dk/mqtt-demo/mqtt-demo.c index 78a465b70..398feec68 100644 --- a/examples/cc2538dk/mqtt-demo/mqtt-demo.c +++ b/examples/cc2538dk/mqtt-demo/mqtt-demo.c @@ -31,14 +31,14 @@ /** \addtogroup cc2538-examples * @{ * - * \defgroup cc2538-mqtt-demo CC2538DK MQTT Demo Project + * \defgroup cc2538-mqtt-demo CC2538DK MQTT Demo Project * - * Demonstrates MQTT functionality. Works with IBM Quickstart as well as - * mosquitto + * Demonstrates MQTT functionality. Works with IBM Quickstart as well as + * mosquitto. * @{ * * \file - * An MQTT example for the cc2538dk platform + * An MQTT example for the cc2538dk platform */ /*---------------------------------------------------------------------------*/ #include "contiki-conf.h"