Several changes to fix bugs and harden mqtt code.
1. The PT_MQTT_WAIT_SEND() macro has several issues: - It does not check the return value from process_post(), which sometimes returns an error code. See next issue. - Each time the macro is called, is posts an event to itself. The idea seems to be that the event should be absorbed by the macro itself, so when the macro terminates there is NOT a net growth of the event queue. This does not work. The reason is that the PROCESS_WAIT_EVENT() sometimes absorbs a broadcast event instead of its own event, and then the number of events in the event queue increases. This leads to event explosions and overflow in the event queue. - The macro cannot call PT_EXIT(). This will expand to a return statement, causing a return from the function calling the macro (mqtt_process), rather then exiting the protothread (which was probably the intention). Protothreads have lexical scope... Fixes: 1) Check return value from process_post() 2) Loop until the event posted to itself is absorbed (rather than just absorbing the next event) 3) Replace PT_EXIT() with PT_INIT() (doesn't really make a difference, could probably just be removed). 2. Change order of the checks in the protothread-calling loops in mqtt_process(). Reason: When a protothread has been cleared by PT_MQTT_WAIT_SEND(), it will not return a value, so checking against PT_EXITED does not make sense. 3. PT_MQTT_WRITE_BYTES() should initialize conn->out_write_pos to 0. When PT_MQTT_WRITE_BYTES() does not finish (due to TCP disconnect for instance), it may leave conn->out_write_pos with a non-zero value. Next time PT_MQTT_WRITE_BYTES() is called, it will take data from the wrong place. 4. Put MQTT_CONN_STATE_ABORT_IMMEDIATE before MQTT_CONN_STATE_NOT_CONNECTED in the enum list, so that the check if(conn->state > MQTT_CONN_STATE_NOT_CONNECTED) in mqtt_connect() fails when in state MQTT_CONN_STATE_ABORT_IMMEDIATE. Otherwise, it will deadlock and not reattempt connections while in this state.
This commit is contained in:
parent
cbde8855cf
commit
b2cfb727ed
@ -130,6 +130,7 @@ typedef enum {
|
||||
/*---------------------------------------------------------------------------*/
|
||||
/* Protothread send macros */
|
||||
#define PT_MQTT_WRITE_BYTES(conn, data, len) \
|
||||
conn->out_write_pos = 0; \
|
||||
while(write_bytes(conn, data, len)) { \
|
||||
PT_WAIT_UNTIL(pt, (conn)->out_buffer_sent); \
|
||||
}
|
||||
@ -147,15 +148,19 @@ typedef enum {
|
||||
*/
|
||||
#define PT_MQTT_WAIT_SEND() \
|
||||
do { \
|
||||
process_post(PROCESS_CURRENT(), mqtt_continue_send_event, NULL); \
|
||||
PROCESS_WAIT_EVENT(); \
|
||||
if(ev == mqtt_abort_now_event) { \
|
||||
conn->state = MQTT_CONN_STATE_ABORT_IMMEDIATE; \
|
||||
PT_EXIT(&conn->out_proto_thread); \
|
||||
process_post(PROCESS_CURRENT(), ev, data); \
|
||||
} else if(ev >= mqtt_event_min && ev <= mqtt_event_max) { \
|
||||
process_post(PROCESS_CURRENT(), ev, data); \
|
||||
} \
|
||||
if (PROCESS_ERR_OK == \
|
||||
process_post(PROCESS_CURRENT(), mqtt_continue_send_event, NULL)) { \
|
||||
do { \
|
||||
PROCESS_WAIT_EVENT(); \
|
||||
if(ev == mqtt_abort_now_event) { \
|
||||
conn->state = MQTT_CONN_STATE_ABORT_IMMEDIATE; \
|
||||
PT_INIT(&conn->out_proto_thread); \
|
||||
process_post(PROCESS_CURRENT(), ev, data); \
|
||||
} else if(ev >= mqtt_event_min && ev <= mqtt_event_max) { \
|
||||
process_post(PROCESS_CURRENT(), ev, data); \
|
||||
} \
|
||||
} while (ev != mqtt_continue_send_event); \
|
||||
} \
|
||||
} while(0)
|
||||
/*---------------------------------------------------------------------------*/
|
||||
static process_event_t mqtt_do_connect_tcp_event;
|
||||
@ -1188,8 +1193,8 @@ PROCESS_THREAD(mqtt_process, ev, data)
|
||||
if(conn->state == MQTT_CONN_STATE_SENDING_MQTT_DISCONNECT) {
|
||||
if(conn->out_buffer_sent == 1) {
|
||||
PT_INIT(&conn->out_proto_thread);
|
||||
while(disconnect_pt(&conn->out_proto_thread, conn) < PT_EXITED &&
|
||||
conn->state != MQTT_CONN_STATE_ABORT_IMMEDIATE) {
|
||||
while(conn->state != MQTT_CONN_STATE_ABORT_IMMEDIATE &&
|
||||
disconnect_pt(&conn->out_proto_thread, conn) < PT_EXITED) {
|
||||
PT_MQTT_WAIT_SEND();
|
||||
}
|
||||
abort_connection(conn);
|
||||
@ -1206,8 +1211,8 @@ PROCESS_THREAD(mqtt_process, ev, data)
|
||||
if(conn->out_buffer_sent == 1 &&
|
||||
conn->state == MQTT_CONN_STATE_CONNECTED_TO_BROKER) {
|
||||
PT_INIT(&conn->out_proto_thread);
|
||||
while(pingreq_pt(&conn->out_proto_thread, conn) < PT_EXITED &&
|
||||
conn->state == MQTT_CONN_STATE_CONNECTED_TO_BROKER) {
|
||||
while(conn->state == MQTT_CONN_STATE_CONNECTED_TO_BROKER &&
|
||||
pingreq_pt(&conn->out_proto_thread, conn) < PT_EXITED) {
|
||||
PT_MQTT_WAIT_SEND();
|
||||
}
|
||||
}
|
||||
@ -1219,8 +1224,8 @@ PROCESS_THREAD(mqtt_process, ev, data)
|
||||
if(conn->out_buffer_sent == 1 &&
|
||||
conn->state == MQTT_CONN_STATE_CONNECTED_TO_BROKER) {
|
||||
PT_INIT(&conn->out_proto_thread);
|
||||
while(subscribe_pt(&conn->out_proto_thread, conn) < PT_EXITED &&
|
||||
conn->state == MQTT_CONN_STATE_CONNECTED_TO_BROKER) {
|
||||
while(conn->state == MQTT_CONN_STATE_CONNECTED_TO_BROKER &&
|
||||
subscribe_pt(&conn->out_proto_thread, conn) < PT_EXITED) {
|
||||
PT_MQTT_WAIT_SEND();
|
||||
}
|
||||
}
|
||||
@ -1232,8 +1237,8 @@ PROCESS_THREAD(mqtt_process, ev, data)
|
||||
if(conn->out_buffer_sent == 1 &&
|
||||
conn->state == MQTT_CONN_STATE_CONNECTED_TO_BROKER) {
|
||||
PT_INIT(&conn->out_proto_thread);
|
||||
while(unsubscribe_pt(&conn->out_proto_thread, conn) < PT_EXITED &&
|
||||
conn->state == MQTT_CONN_STATE_CONNECTED_TO_BROKER) {
|
||||
while(conn->state == MQTT_CONN_STATE_CONNECTED_TO_BROKER &&
|
||||
unsubscribe_pt(&conn->out_proto_thread, conn) < PT_EXITED) {
|
||||
PT_MQTT_WAIT_SEND();
|
||||
}
|
||||
}
|
||||
@ -1245,8 +1250,8 @@ PROCESS_THREAD(mqtt_process, ev, data)
|
||||
if(conn->out_buffer_sent == 1 &&
|
||||
conn->state == MQTT_CONN_STATE_CONNECTED_TO_BROKER) {
|
||||
PT_INIT(&conn->out_proto_thread);
|
||||
while(publish_pt(&conn->out_proto_thread, conn) < PT_EXITED &&
|
||||
conn->state == MQTT_CONN_STATE_CONNECTED_TO_BROKER) {
|
||||
while(conn->state == MQTT_CONN_STATE_CONNECTED_TO_BROKER &&
|
||||
publish_pt(&conn->out_proto_thread, conn) < PT_EXITED) {
|
||||
PT_MQTT_WAIT_SEND();
|
||||
}
|
||||
}
|
||||
|
@ -196,6 +196,7 @@ typedef enum {
|
||||
MQTT_CONN_STATE_ERROR,
|
||||
MQTT_CONN_STATE_DNS_ERROR,
|
||||
MQTT_CONN_STATE_DISCONNECTING,
|
||||
MQTT_CONN_STATE_ABORT_IMMEDIATE,
|
||||
|
||||
MQTT_CONN_STATE_NOT_CONNECTED,
|
||||
MQTT_CONN_STATE_DNS_LOOKUP,
|
||||
@ -204,7 +205,6 @@ typedef enum {
|
||||
MQTT_CONN_STATE_CONNECTING_TO_BROKER,
|
||||
MQTT_CONN_STATE_CONNECTED_TO_BROKER,
|
||||
MQTT_CONN_STATE_SENDING_MQTT_DISCONNECT,
|
||||
MQTT_CONN_STATE_ABORT_IMMEDIATE,
|
||||
} mqtt_conn_state_t;
|
||||
/*---------------------------------------------------------------------------*/
|
||||
struct mqtt_string {
|
||||
|
Loading…
Reference in New Issue
Block a user