/* * Copyright (c) 2014, Texas Instruments Incorporated - http://www.ti.com/ * Copyright (c) 2017, George Oikonomou - http://www.spd.gr * All rights reserved. * * Redistribution and use in source and binary forms, with or without * modification, are permitted provided that the following conditions * are met: * 1. Redistributions of source code must retain the above copyright * notice, this list of conditions and the following disclaimer. * 2. Redistributions in binary form must reproduce the above copyright * notice, this list of conditions and the following disclaimer in the * documentation and/or other materials provided with the distribution. * 3. Neither the name of the copyright holder nor the names of its * contributors may be used to endorse or promote products derived * from this software without specific prior written permission. * * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS * ``AS IS'' AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS * FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE * COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, * INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES * (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR * SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, * STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED * OF THE POSSIBILITY OF SUCH DAMAGE. */ /*---------------------------------------------------------------------------*/ #include "contiki.h" #include "net/routing/routing.h" #include "mqtt.h" #include "net/ipv6/uip.h" #include "net/ipv6/uip-icmp6.h" #include "net/ipv6/sicslowpan.h" #include "sys/etimer.h" #include "sys/ctimer.h" #include "lib/sensors.h" #include "dev/button-hal.h" #include "dev/leds.h" #include "os/sys/log.h" #include "mqtt-client.h" #include #include /*---------------------------------------------------------------------------*/ #define LOG_MODULE "mqtt-client" #ifdef MQTT_CLIENT_CONF_LOG_LEVEL #define LOG_LEVEL MQTT_CLIENT_CONF_LOG_LEVEL #else #define LOG_LEVEL LOG_LEVEL_NONE #endif /*---------------------------------------------------------------------------*/ /* Controls whether the example will work in IBM Watson IoT platform mode */ #ifdef MQTT_CLIENT_CONF_WITH_IBM_WATSON #define MQTT_CLIENT_WITH_IBM_WATSON MQTT_CLIENT_CONF_WITH_IBM_WATSON #else #define MQTT_CLIENT_WITH_IBM_WATSON 0 #endif /*---------------------------------------------------------------------------*/ /* MQTT broker address. Ignored in Watson mode */ #ifdef MQTT_CLIENT_CONF_BROKER_IP_ADDR #define MQTT_CLIENT_BROKER_IP_ADDR MQTT_CLIENT_CONF_BROKER_IP_ADDR #else #define MQTT_CLIENT_BROKER_IP_ADDR "fd00::1" #endif /*---------------------------------------------------------------------------*/ /* * MQTT Org ID. * * If it equals "quickstart", the client will connect without authentication. * In all other cases, the client will connect with authentication mode. * * In Watson mode, the username will be "use-token-auth". In non-Watson mode * the username will be MQTT_CLIENT_USERNAME. * * In all cases, the password will be MQTT_CLIENT_AUTH_TOKEN. */ #ifdef MQTT_CLIENT_CONF_ORG_ID #define MQTT_CLIENT_ORG_ID MQTT_CLIENT_CONF_ORG_ID #else #define MQTT_CLIENT_ORG_ID "quickstart" #endif /*---------------------------------------------------------------------------*/ /* MQTT token */ #ifdef MQTT_CLIENT_CONF_AUTH_TOKEN #define MQTT_CLIENT_AUTH_TOKEN MQTT_CLIENT_CONF_AUTH_TOKEN #else #define MQTT_CLIENT_AUTH_TOKEN "AUTHTOKEN" #endif /*---------------------------------------------------------------------------*/ #if MQTT_CLIENT_WITH_IBM_WATSON /* With IBM Watson support */ static const char *broker_ip = "0064:ff9b:0000:0000:0000:0000:b8ac:7cbd"; #define MQTT_CLIENT_USERNAME "use-token-auth" #else /* MQTT_CLIENT_WITH_IBM_WATSON */ /* Without IBM Watson support. To be used with other brokers, e.g. Mosquitto */ static const char *broker_ip = MQTT_CLIENT_BROKER_IP_ADDR; #ifdef MQTT_CLIENT_CONF_USERNAME #define MQTT_CLIENT_USERNAME MQTT_CLIENT_CONF_USERNAME #else #define MQTT_CLIENT_USERNAME "use-token-auth" #endif #endif /* MQTT_CLIENT_WITH_IBM_WATSON */ /*---------------------------------------------------------------------------*/ #ifdef MQTT_CLIENT_CONF_STATUS_LED #define MQTT_CLIENT_STATUS_LED MQTT_CLIENT_CONF_STATUS_LED #else #define MQTT_CLIENT_STATUS_LED LEDS_GREEN #endif /*---------------------------------------------------------------------------*/ #ifdef MQTT_CLIENT_CONF_WITH_EXTENSIONS #define MQTT_CLIENT_WITH_EXTENSIONS MQTT_CLIENT_CONF_WITH_EXTENSIONS #else #define MQTT_CLIENT_WITH_EXTENSIONS 0 #endif /*---------------------------------------------------------------------------*/ /* * A timeout used when waiting for something to happen (e.g. to connect or to * disconnect) */ #define STATE_MACHINE_PERIODIC (CLOCK_SECOND >> 1) /*---------------------------------------------------------------------------*/ /* Provide visible feedback via LEDS during various states */ /* When connecting to broker */ #define CONNECTING_LED_DURATION (CLOCK_SECOND >> 2) /* Each time we try to publish */ #define PUBLISH_LED_ON_DURATION (CLOCK_SECOND) /*---------------------------------------------------------------------------*/ /* Connections and reconnections */ #define RETRY_FOREVER 0xFF #define RECONNECT_INTERVAL (CLOCK_SECOND * 2) /* * Number of times to try reconnecting to the broker. * Can be a limited number (e.g. 3, 10 etc) or can be set to RETRY_FOREVER */ #define RECONNECT_ATTEMPTS RETRY_FOREVER #define CONNECTION_STABLE_TIME (CLOCK_SECOND * 5) static struct timer connection_life; static uint8_t connect_attempt; /*---------------------------------------------------------------------------*/ /* Various states */ static uint8_t state; #define STATE_INIT 0 #define STATE_REGISTERED 1 #define STATE_CONNECTING 2 #define STATE_CONNECTED 3 #define STATE_PUBLISHING 4 #define STATE_DISCONNECTED 5 #define STATE_NEWCONFIG 6 #define STATE_CONFIG_ERROR 0xFE #define STATE_ERROR 0xFF /*---------------------------------------------------------------------------*/ #define CONFIG_ORG_ID_LEN 32 #define CONFIG_TYPE_ID_LEN 32 #define CONFIG_AUTH_TOKEN_LEN 32 #define CONFIG_EVENT_TYPE_ID_LEN 32 #define CONFIG_CMD_TYPE_LEN 8 #define CONFIG_IP_ADDR_STR_LEN 64 /*---------------------------------------------------------------------------*/ /* A timeout used when waiting to connect to a network */ #define NET_CONNECT_PERIODIC (CLOCK_SECOND >> 2) #define NO_NET_LED_DURATION (NET_CONNECT_PERIODIC >> 1) /*---------------------------------------------------------------------------*/ /* Default configuration values */ #define DEFAULT_TYPE_ID "mqtt-client" #define DEFAULT_EVENT_TYPE_ID "status" #define DEFAULT_SUBSCRIBE_CMD_TYPE "+" #define DEFAULT_BROKER_PORT 1883 #define DEFAULT_PUBLISH_INTERVAL (30 * CLOCK_SECOND) #define DEFAULT_KEEP_ALIVE_TIMER 60 #define DEFAULT_RSSI_MEAS_INTERVAL (CLOCK_SECOND * 30) /*---------------------------------------------------------------------------*/ #define MQTT_CLIENT_SENSOR_NONE (void *)0xFFFFFFFF /*---------------------------------------------------------------------------*/ /* Payload length of ICMPv6 echo requests used to measure RSSI with def rt */ #define ECHO_REQ_PAYLOAD_LEN 20 /*---------------------------------------------------------------------------*/ PROCESS_NAME(mqtt_client_process); AUTOSTART_PROCESSES(&mqtt_client_process); /*---------------------------------------------------------------------------*/ /** * \brief Data structure declaration for the MQTT client configuration */ typedef struct mqtt_client_config { char org_id[CONFIG_ORG_ID_LEN]; char type_id[CONFIG_TYPE_ID_LEN]; char auth_token[CONFIG_AUTH_TOKEN_LEN]; char event_type_id[CONFIG_EVENT_TYPE_ID_LEN]; char broker_ip[CONFIG_IP_ADDR_STR_LEN]; char cmd_type[CONFIG_CMD_TYPE_LEN]; clock_time_t pub_interval; int def_rt_ping_interval; uint16_t broker_port; } mqtt_client_config_t; /*---------------------------------------------------------------------------*/ /* Maximum TCP segment size for outgoing segments of our socket */ #define MAX_TCP_SEGMENT_SIZE 32 /*---------------------------------------------------------------------------*/ /* * Buffers for Client ID and Topic. * Make sure they are large enough to hold the entire respective string * * d:quickstart:status:EUI64 is 32 bytes long * iot-2/evt/status/fmt/json is 25 bytes * We also need space for the null termination */ #define BUFFER_SIZE 64 static char client_id[BUFFER_SIZE]; static char pub_topic[BUFFER_SIZE]; static char sub_topic[BUFFER_SIZE]; /*---------------------------------------------------------------------------*/ /* * The main MQTT buffers. * We will need to increase if we start publishing more data. */ #define APP_BUFFER_SIZE 512 static struct mqtt_connection conn; static char app_buffer[APP_BUFFER_SIZE]; /*---------------------------------------------------------------------------*/ #define QUICKSTART "quickstart" /*---------------------------------------------------------------------------*/ static struct mqtt_message *msg_ptr = 0; static struct etimer publish_periodic_timer; static struct ctimer ct; static char *buf_ptr; static uint16_t seq_nr_value = 0; /*---------------------------------------------------------------------------*/ /* Parent RSSI functionality */ static struct uip_icmp6_echo_reply_notification echo_reply_notification; static struct etimer echo_request_timer; static int def_rt_rssi = 0; /*---------------------------------------------------------------------------*/ static mqtt_client_config_t conf; /*---------------------------------------------------------------------------*/ #if MQTT_CLIENT_WITH_EXTENSIONS extern const mqtt_client_extension_t *mqtt_client_extensions[]; extern const uint8_t mqtt_client_extension_count; #else static const mqtt_client_extension_t *mqtt_client_extensions[] = { NULL }; static const uint8_t mqtt_client_extension_count = 0; #endif /*---------------------------------------------------------------------------*/ PROCESS(mqtt_client_process, "MQTT Client"); /*---------------------------------------------------------------------------*/ static int ipaddr_sprintf(char *buf, uint8_t buf_len, const uip_ipaddr_t *addr) { uint16_t a; uint8_t len = 0; int i, f; for(i = 0, f = 0; i < sizeof(uip_ipaddr_t); i += 2) { a = (addr->u8[i] << 8) + addr->u8[i + 1]; if(a == 0 && f >= 0) { if(f++ == 0) { len += snprintf(&buf[len], buf_len - len, "::"); } } else { if(f > 0) { f = -1; } else if(i > 0) { len += snprintf(&buf[len], buf_len - len, ":"); } len += snprintf(&buf[len], buf_len - len, "%x", a); } } return len; } /*---------------------------------------------------------------------------*/ static void echo_reply_handler(uip_ipaddr_t *source, uint8_t ttl, uint8_t *data, uint16_t datalen) { if(uip_ip6addr_cmp(source, uip_ds6_defrt_choose())) { def_rt_rssi = sicslowpan_get_last_rssi(); } } /*---------------------------------------------------------------------------*/ static void publish_led_off(void *d) { leds_off(MQTT_CLIENT_STATUS_LED); } /*---------------------------------------------------------------------------*/ static void pub_handler(const char *topic, uint16_t topic_len, const uint8_t *chunk, uint16_t chunk_len) { LOG_DBG("Pub Handler: topic='%s' (len=%u), chunk_len=%u\n", topic, topic_len, chunk_len); /* If we don't like the length, ignore */ if(topic_len != 23 || chunk_len != 1) { LOG_ERR("Incorrect topic or chunk len. Ignored\n"); return; } /* If the format != json, ignore */ if(strncmp(&topic[topic_len - 4], "json", 4) != 0) { LOG_ERR("Incorrect format\n"); } if(strncmp(&topic[10], "leds", 4) == 0) { LOG_DBG("Received MQTT SUB\n"); if(chunk[0] == '1') { leds_on(LEDS_RED); } else if(chunk[0] == '0') { leds_off(LEDS_RED); } return; } } /*---------------------------------------------------------------------------*/ static void mqtt_event(struct mqtt_connection *m, mqtt_event_t event, void *data) { switch(event) { case MQTT_EVENT_CONNECTED: { LOG_DBG("Application has a MQTT connection\n"); timer_set(&connection_life, CONNECTION_STABLE_TIME); state = STATE_CONNECTED; break; } case MQTT_EVENT_DISCONNECTED: { LOG_DBG("MQTT Disconnect. Reason %u\n", *((mqtt_event_t *)data)); state = STATE_DISCONNECTED; process_poll(&mqtt_client_process); break; } case MQTT_EVENT_PUBLISH: { msg_ptr = data; /* Implement first_flag in publish message? */ if(msg_ptr->first_chunk) { msg_ptr->first_chunk = 0; LOG_DBG("Application received publish for topic '%s'. Payload " "size is %i bytes.\n", msg_ptr->topic, msg_ptr->payload_length); } pub_handler(msg_ptr->topic, strlen(msg_ptr->topic), msg_ptr->payload_chunk, msg_ptr->payload_length); break; } case MQTT_EVENT_SUBACK: { LOG_DBG("Application is subscribed to topic successfully\n"); break; } case MQTT_EVENT_UNSUBACK: { LOG_DBG("Application is unsubscribed to topic successfully\n"); break; } case MQTT_EVENT_PUBACK: { LOG_DBG("Publishing complete.\n"); break; } default: LOG_DBG("Application got a unhandled MQTT event: %i\n", event); break; } } /*---------------------------------------------------------------------------*/ static int construct_pub_topic(void) { int len = snprintf(pub_topic, BUFFER_SIZE, "iot-2/evt/%s/fmt/json", conf.event_type_id); /* len < 0: Error. Len >= BUFFER_SIZE: Buffer too small */ if(len < 0 || len >= BUFFER_SIZE) { LOG_INFO("Pub Topic: %d, Buffer %d\n", len, BUFFER_SIZE); return 0; } return 1; } /*---------------------------------------------------------------------------*/ static int construct_sub_topic(void) { int len = snprintf(sub_topic, BUFFER_SIZE, "iot-2/cmd/%s/fmt/json", conf.cmd_type); /* len < 0: Error. Len >= BUFFER_SIZE: Buffer too small */ if(len < 0 || len >= BUFFER_SIZE) { LOG_INFO("Sub Topic: %d, Buffer %d\n", len, BUFFER_SIZE); return 0; } return 1; } /*---------------------------------------------------------------------------*/ static int construct_client_id(void) { int len = snprintf(client_id, BUFFER_SIZE, "d:%s:%s:%02x%02x%02x%02x%02x%02x", conf.org_id, conf.type_id, linkaddr_node_addr.u8[0], linkaddr_node_addr.u8[1], linkaddr_node_addr.u8[2], linkaddr_node_addr.u8[5], linkaddr_node_addr.u8[6], linkaddr_node_addr.u8[7]); /* len < 0: Error. Len >= BUFFER_SIZE: Buffer too small */ if(len < 0 || len >= BUFFER_SIZE) { LOG_ERR("Client ID: %d, Buffer %d\n", len, BUFFER_SIZE); return 0; } return 1; } /*---------------------------------------------------------------------------*/ static void update_config(void) { if(construct_client_id() == 0) { /* Fatal error. Client ID larger than the buffer */ state = STATE_CONFIG_ERROR; return; } if(construct_sub_topic() == 0) { /* Fatal error. Topic larger than the buffer */ state = STATE_CONFIG_ERROR; return; } if(construct_pub_topic() == 0) { /* Fatal error. Topic larger than the buffer */ state = STATE_CONFIG_ERROR; return; } /* Reset the counter */ seq_nr_value = 0; state = STATE_INIT; /* * Schedule next timer event ASAP * * If we entered an error state then we won't do anything when it fires. * * Since the error at this stage is a config error, we will only exit this * error state if we get a new config. */ etimer_set(&publish_periodic_timer, 0); return; } /*---------------------------------------------------------------------------*/ static int init_config() { /* Populate configuration with default values */ memset(&conf, 0, sizeof(mqtt_client_config_t)); memcpy(conf.org_id, MQTT_CLIENT_ORG_ID, strlen(MQTT_CLIENT_ORG_ID)); memcpy(conf.type_id, DEFAULT_TYPE_ID, strlen(DEFAULT_TYPE_ID)); memcpy(conf.auth_token, MQTT_CLIENT_AUTH_TOKEN, strlen(MQTT_CLIENT_AUTH_TOKEN)); memcpy(conf.event_type_id, DEFAULT_EVENT_TYPE_ID, strlen(DEFAULT_EVENT_TYPE_ID)); memcpy(conf.broker_ip, broker_ip, strlen(broker_ip)); memcpy(conf.cmd_type, DEFAULT_SUBSCRIBE_CMD_TYPE, 1); conf.broker_port = DEFAULT_BROKER_PORT; conf.pub_interval = DEFAULT_PUBLISH_INTERVAL; conf.def_rt_ping_interval = DEFAULT_RSSI_MEAS_INTERVAL; return 1; } /*---------------------------------------------------------------------------*/ static void subscribe(void) { /* Publish MQTT topic in IBM quickstart format */ mqtt_status_t status; status = mqtt_subscribe(&conn, NULL, sub_topic, MQTT_QOS_LEVEL_0); LOG_DBG("Subscribing!\n"); if(status == MQTT_STATUS_OUT_QUEUE_FULL) { LOG_ERR("Tried to subscribe but command queue was full!\n"); } } /*---------------------------------------------------------------------------*/ static void publish(void) { /* Publish MQTT topic in IBM quickstart format */ int len; int remaining = APP_BUFFER_SIZE; int i; char def_rt_str[64]; seq_nr_value++; buf_ptr = app_buffer; len = snprintf(buf_ptr, remaining, "{" "\"d\":{" "\"Platform\":\""CONTIKI_TARGET_STRING"\"," #ifdef CONTIKI_BOARD_STRING "\"Board\":\""CONTIKI_BOARD_STRING"\"," #endif "\"Seq #\":%d," "\"Uptime (sec)\":%lu", seq_nr_value, clock_seconds()); if(len < 0 || len >= remaining) { LOG_ERR("Buffer too short. Have %d, need %d + \\0\n", remaining, len); return; } remaining -= len; buf_ptr += len; /* Put our Default route's string representation in a buffer */ memset(def_rt_str, 0, sizeof(def_rt_str)); ipaddr_sprintf(def_rt_str, sizeof(def_rt_str), uip_ds6_defrt_choose()); len = snprintf(buf_ptr, remaining, ",\"Def Route\":\"%s\",\"RSSI (dBm)\":%d", def_rt_str, def_rt_rssi); if(len < 0 || len >= remaining) { LOG_ERR("Buffer too short. Have %d, need %d + \\0\n", remaining, len); return; } remaining -= len; buf_ptr += len; for(i = 0; i < mqtt_client_extension_count; i++) { len = snprintf(buf_ptr, remaining, ",%s", mqtt_client_extensions[i]->value()); if(len < 0 || len >= remaining) { LOG_ERR("Buffer too short. Have %d, need %d + \\0\n", remaining, len); return; } remaining -= len; buf_ptr += len; } len = snprintf(buf_ptr, remaining, "}}"); if(len < 0 || len >= remaining) { LOG_ERR("Buffer too short. Have %d, need %d + \\0\n", remaining, len); return; } mqtt_publish(&conn, NULL, pub_topic, (uint8_t *)app_buffer, strlen(app_buffer), MQTT_QOS_LEVEL_0, MQTT_RETAIN_OFF); LOG_DBG("Publish!\n"); } /*---------------------------------------------------------------------------*/ static void connect_to_broker(void) { /* Connect to MQTT server */ mqtt_connect(&conn, conf.broker_ip, conf.broker_port, (conf.pub_interval * 3) / CLOCK_SECOND); state = STATE_CONNECTING; } /*---------------------------------------------------------------------------*/ static void ping_parent(void) { if(uip_ds6_get_global(ADDR_PREFERRED) == NULL) { return; } uip_icmp6_send(uip_ds6_defrt_choose(), ICMP6_ECHO_REQUEST, 0, ECHO_REQ_PAYLOAD_LEN); } /*---------------------------------------------------------------------------*/ static void state_machine(void) { switch(state) { case STATE_INIT: /* If we have just been configured register MQTT connection */ mqtt_register(&conn, &mqtt_client_process, client_id, mqtt_event, MAX_TCP_SEGMENT_SIZE); /* * If we are not using the quickstart service (thus we are an IBM * registered device), we need to provide user name and password */ if(strncasecmp(conf.org_id, QUICKSTART, strlen(conf.org_id)) != 0) { if(strlen(conf.auth_token) == 0) { LOG_ERR("User name set, but empty auth token\n"); state = STATE_ERROR; break; } else { mqtt_set_username_password(&conn, MQTT_CLIENT_USERNAME, conf.auth_token); } } /* _register() will set auto_reconnect. We don't want that. */ conn.auto_reconnect = 0; connect_attempt = 1; state = STATE_REGISTERED; LOG_DBG("Init\n"); /* Continue */ case STATE_REGISTERED: if(uip_ds6_get_global(ADDR_PREFERRED) != NULL) { /* Registered and with a public IP. Connect */ LOG_DBG("Registered. Connect attempt %u\n", connect_attempt); ping_parent(); connect_to_broker(); } else { leds_on(MQTT_CLIENT_STATUS_LED); ctimer_set(&ct, NO_NET_LED_DURATION, publish_led_off, NULL); } etimer_set(&publish_periodic_timer, NET_CONNECT_PERIODIC); return; break; case STATE_CONNECTING: leds_on(MQTT_CLIENT_STATUS_LED); ctimer_set(&ct, CONNECTING_LED_DURATION, publish_led_off, NULL); /* Not connected yet. Wait */ LOG_DBG("Connecting (%u)\n", connect_attempt); break; case STATE_CONNECTED: /* Don't subscribe unless we are a registered device */ if(strncasecmp(conf.org_id, QUICKSTART, strlen(conf.org_id)) == 0) { LOG_DBG("Using 'quickstart': Skipping subscribe\n"); state = STATE_PUBLISHING; } /* Continue */ case STATE_PUBLISHING: /* If the timer expired, the connection is stable. */ if(timer_expired(&connection_life)) { /* * Intentionally using 0 here instead of 1: We want RECONNECT_ATTEMPTS * attempts if we disconnect after a successful connect */ connect_attempt = 0; } if(mqtt_ready(&conn) && conn.out_buffer_sent) { /* Connected. Publish */ if(state == STATE_CONNECTED) { subscribe(); state = STATE_PUBLISHING; } else { leds_on(MQTT_CLIENT_STATUS_LED); ctimer_set(&ct, PUBLISH_LED_ON_DURATION, publish_led_off, NULL); LOG_DBG("Publishing\n"); publish(); } etimer_set(&publish_periodic_timer, conf.pub_interval); /* Return here so we don't end up rescheduling the timer */ return; } else { /* * Our publish timer fired, but some MQTT packet is already in flight * (either not sent at all, or sent but not fully ACKd). * * This can mean that we have lost connectivity to our broker or that * simply there is some network delay. In both cases, we refuse to * trigger a new message and we wait for TCP to either ACK the entire * packet after retries, or to timeout and notify us. */ LOG_DBG("Publishing... (MQTT state=%d, q=%u)\n", conn.state, conn.out_queue_full); } break; case STATE_DISCONNECTED: LOG_DBG("Disconnected\n"); if(connect_attempt < RECONNECT_ATTEMPTS || RECONNECT_ATTEMPTS == RETRY_FOREVER) { /* Disconnect and backoff */ clock_time_t interval; mqtt_disconnect(&conn); connect_attempt++; interval = connect_attempt < 3 ? RECONNECT_INTERVAL << connect_attempt : RECONNECT_INTERVAL << 3; LOG_DBG("Disconnected. Attempt %u in %lu ticks\n", connect_attempt, interval); etimer_set(&publish_periodic_timer, interval); state = STATE_REGISTERED; return; } else { /* Max reconnect attempts reached. Enter error state */ state = STATE_ERROR; LOG_DBG("Aborting connection after %u attempts\n", connect_attempt - 1); } break; case STATE_CONFIG_ERROR: /* Idle away. The only way out is a new config */ LOG_ERR("Bad configuration.\n"); return; case STATE_ERROR: default: leds_on(MQTT_CLIENT_STATUS_LED); /* * 'default' should never happen. * * If we enter here it's because of some error. Stop timers. The only thing * that can bring us out is a new config event */ LOG_ERR("Default case: State=0x%02x\n", state); return; } /* If we didn't return so far, reschedule ourselves */ etimer_set(&publish_periodic_timer, STATE_MACHINE_PERIODIC); } /*---------------------------------------------------------------------------*/ static void init_extensions(void) { int i; for(i = 0; i < mqtt_client_extension_count; i++) { if(mqtt_client_extensions[i]->init) { mqtt_client_extensions[i]->init(); } } } /*---------------------------------------------------------------------------*/ PROCESS_THREAD(mqtt_client_process, ev, data) { PROCESS_BEGIN(); printf("MQTT Client Process\n"); if(init_config() != 1) { PROCESS_EXIT(); } init_extensions(); update_config(); def_rt_rssi = 0x8000000; uip_icmp6_echo_reply_callback_add(&echo_reply_notification, echo_reply_handler); etimer_set(&echo_request_timer, conf.def_rt_ping_interval); /* Main loop */ while(1) { PROCESS_YIELD(); if(ev == button_hal_release_event && ((button_hal_button_t *)data)->unique_id == BUTTON_HAL_ID_BUTTON_ZERO) { if(state == STATE_ERROR) { connect_attempt = 1; state = STATE_REGISTERED; } } if((ev == PROCESS_EVENT_TIMER && data == &publish_periodic_timer) || ev == PROCESS_EVENT_POLL || (ev == button_hal_release_event && ((button_hal_button_t *)data)->unique_id == BUTTON_HAL_ID_BUTTON_ZERO)) { state_machine(); } if(ev == PROCESS_EVENT_TIMER && data == &echo_request_timer) { ping_parent(); etimer_set(&echo_request_timer, conf.def_rt_ping_interval); } } PROCESS_END(); } /*---------------------------------------------------------------------------*/