From eed22a6f3c1d153cb74aeb435cbd5501a4fbe257 Mon Sep 17 00:00:00 2001 From: adamdunkels Date: Wed, 22 Sep 2010 22:08:08 +0000 Subject: [PATCH] A number of changes to the collect code: * Added an optional "keep alive" mechanism whereby an idle network is periodically probed by dummy packets to maintain a recent quality metric when there is no traffic. * Bugfix in when new routing metrics should be advertised * Rewrote the ACK logic so that a queuebuf is not allocated for each ack, only for those acks generated by the sink node. * Updated the wrap-around logic for sequence numbers: when a sequence number wraps, it won't go back to 0. Instead, it wraps to 128. This allows us to understand when a node has rebooted: when its seqno is < 128, it has recently rebooted. --- core/net/rime/collect.c | 217 ++++++++++++++++++++++++++++++---------- core/net/rime/collect.h | 9 +- 2 files changed, 170 insertions(+), 56 deletions(-) diff --git a/core/net/rime/collect.c b/core/net/rime/collect.c index 0c4ad348b..ab2a0d846 100644 --- a/core/net/rime/collect.c +++ b/core/net/rime/collect.c @@ -33,7 +33,7 @@ * * This file is part of the Contiki operating system. * - * $Id: collect.c,v 1.52 2010/09/14 06:48:36 adamdunkels Exp $ + * $Id: collect.c,v 1.53 2010/09/22 22:08:08 adamdunkels Exp $ */ /** @@ -129,6 +129,8 @@ struct ack_msg { #define REXMIT_TIME CLOCK_SECOND * 1 #define FORWARD_PACKET_LIFETIME (6 * (REXMIT_TIME) << 3) #define MAX_SENDING_QUEUE 16 +#define KEEPALIVE_REXMITS 4 + MEMB(send_queue_memb, struct packetqueue_item, MAX_SENDING_QUEUE); /* These specifiy the sink's routing metric (0) and the maximum @@ -204,6 +206,8 @@ struct { /* Forward declarations. */ static void send_queued_packet(struct collect_conn *c); static void retransmit_callback(void *ptr); +static void set_keepalive_timer(struct collect_conn *c); + /*---------------------------------------------------------------------------*/ /** * This function computes the current rtmetric by adding the last @@ -233,7 +237,7 @@ rtmetric_compute(struct collect_conn *tc) } else { /* Our rtmetric is the rtmetric of our parent neighbor plus the expected transmissions to reach that neighbor. */ - rtmetric = collect_neighbor_rtmetric(n); + rtmetric = collect_neighbor_rtmetric_link_estimate(n); } return rtmetric; @@ -287,8 +291,8 @@ update_parent(struct collect_conn *tc) #if DRAW_TREE printf("#L %d 0\n", tc->parent.u8[0]); #endif /* DRAW_TREE */ - if(collect_neighbor_rtmetric(best) + SIGNIFICANT_RTMETRIC_CHANGE < - collect_neighbor_rtmetric(current)) { + if(collect_neighbor_rtmetric_link_estimate(best) + SIGNIFICANT_RTMETRIC_CHANGE < + collect_neighbor_rtmetric_link_estimate(current)) { /* We switch parent. */ PRINTF("update_parent: new parent %d.%d (%d) old parent %d.%d (%d)\n", best->addr.u8[0], best->addr.u8[1], @@ -381,7 +385,7 @@ update_rtmetric(struct collect_conn *tc) /* If we now have a significantly better or worse rtmetric than we had before, what we need to make sure that our neighbors find out about this quickly. */ - if(new_rtmetric + SIGNIFICANT_RTMETRIC_CHANGE < old_rtmetric && + if(new_rtmetric < old_rtmetric - SIGNIFICANT_RTMETRIC_CHANGE || new_rtmetric > old_rtmetric + SIGNIFICANT_RTMETRIC_CHANGE) { PRINTF("update_rtmetric: new_rtmetric %d + %d < old_rtmetric %d\n", new_rtmetric, SIGNIFICANT_RTMETRIC_CHANGE, old_rtmetric); @@ -464,7 +468,7 @@ send_queued_packet(struct collect_conn *c) /* Remember the parent that we sent this packet to. */ rimeaddr_copy(&c->current_parent, &c->parent); - + /* This is the first time we transmit this packet, so set transmissions to zero. */ c->transmissions = 0; @@ -502,7 +506,7 @@ send_queued_packet(struct collect_conn *c) PRINTF("listen\n"); announcement_listen(1); ctimer_set(&c->transmit_after_scan_timer, ANNOUNCEMENT_SCAN_TIME, - send_queued_packet, tc); + send_queued_packet, c); #else /* COLLECT_CONF_WITH_LISTEN */ announcement_set_value(&c->announcement, RTMETRIC_MAX); announcement_bump(&c->announcement); @@ -631,13 +635,13 @@ handle_ack(struct collect_conn *tc) tc->transmissions); n = collect_neighbor_list_find(&tc->neighbor_list, packetbuf_addr(PACKETBUF_ADDR_SENDER)); - collect_neighbor_tx(n, tc->transmissions); if(n != NULL) { + collect_neighbor_tx(n, tc->transmissions); collect_neighbor_update_rtmetric(n, rtmetric); update_rtmetric(tc); } - + PRINTF("%d.%d: ACK from %d.%d after %d transmissions, flags %02x, rtmetric %d\n", rimeaddr_node_addr.u8[0], rimeaddr_node_addr.u8[1], tc->current_parent.u8[0], tc->current_parent.u8[1], @@ -651,6 +655,7 @@ handle_ack(struct collect_conn *tc) if(msg->flags & ACK_FLAGS_RTMETRIC_NEEDS_UPDATE) { bump_advertisement(tc); } + set_keepalive_timer(tc); } else { stats.badack++; } @@ -660,48 +665,32 @@ static void send_ack(struct collect_conn *tc, const rimeaddr_t *to, int flags) { struct ack_msg *ack; - struct queuebuf *q; - uint16_t packet_seqno, packet_eseqno; + uint16_t packet_seqno = packetbuf_attr(PACKETBUF_ATTR_PACKET_ID); + uint16_t packet_eseqno = packetbuf_attr(PACKETBUF_ATTR_EPACKET_ID); + + packetbuf_clear(); + packetbuf_set_datalen(sizeof(struct ack_msg)); + ack = packetbuf_dataptr(); + memset(ack, 0, sizeof(struct ack_msg)); + ack->rtmetric = tc->rtmetric; + ack->flags = flags; - packet_seqno = packetbuf_attr(PACKETBUF_ATTR_PACKET_ID); - packet_eseqno = packetbuf_attr(PACKETBUF_ATTR_EPACKET_ID); + packetbuf_set_addr(PACKETBUF_ADDR_RECEIVER, to); + packetbuf_set_attr(PACKETBUF_ATTR_PACKET_TYPE, PACKETBUF_ATTR_PACKET_TYPE_ACK); + packetbuf_set_attr(PACKETBUF_ATTR_RELIABLE, 0); + packetbuf_set_attr(PACKETBUF_ATTR_ERELIABLE, 0); + packetbuf_set_attr(PACKETBUF_ATTR_PACKET_ID, packet_seqno); + packetbuf_set_attr(PACKETBUF_ATTR_MAX_MAC_TRANSMISSIONS, MAX_ACK_MAC_REXMITS); + unicast_send(&tc->unicast_conn, to); - q = queuebuf_new_from_packetbuf(); - if(q != NULL) { + PRINTF("%d.%d: collect: Sending ACK to %d.%d for %d (epacket_id %d)\n", + rimeaddr_node_addr.u8[0],rimeaddr_node_addr.u8[1], + to->u8[0], + to->u8[1], + packet_seqno, packet_eseqno); - packetbuf_clear(); - packetbuf_set_datalen(sizeof(struct ack_msg)); - ack = packetbuf_dataptr(); - memset(ack, 0, sizeof(struct ack_msg)); - ack->rtmetric = tc->rtmetric; - ack->flags = flags; - - packetbuf_set_addr(PACKETBUF_ADDR_RECEIVER, to); - packetbuf_set_attr(PACKETBUF_ATTR_PACKET_TYPE, PACKETBUF_ATTR_PACKET_TYPE_ACK); - packetbuf_set_attr(PACKETBUF_ATTR_RELIABLE, 0); - packetbuf_set_attr(PACKETBUF_ATTR_ERELIABLE, 0); - packetbuf_set_attr(PACKETBUF_ATTR_PACKET_ID, packet_seqno); - packetbuf_set_attr(PACKETBUF_ATTR_MAX_MAC_TRANSMISSIONS, MAX_ACK_MAC_REXMITS); - unicast_send(&tc->unicast_conn, to); - - PRINTF("%d.%d: collect: Sending ACK to %d.%d for %d (epacket_id %d)\n", - rimeaddr_node_addr.u8[0],rimeaddr_node_addr.u8[1], - to->u8[0], - to->u8[1], - packet_seqno, packet_eseqno); - - RIMESTATS_ADD(acktx); - - queuebuf_to_packetbuf(q); - queuebuf_free(q); - stats.acksent++; - } else { - PRINTF("%d.%d: collect: could not send ACK to %d.%d for %d: no queued buffers\n", - rimeaddr_node_addr.u8[0],rimeaddr_node_addr.u8[1], - to->u8[0], to->u8[1], - packet_seqno); - stats.ackdrop++; - } + RIMESTATS_ADD(acktx); + stats.acksent++; } /*---------------------------------------------------------------------------*/ static void @@ -774,9 +763,23 @@ node_packet_received(struct unicast_conn *c, const rimeaddr_t *from) /* If we are the sink, the packet has reached its final destination and we call the receive function. */ if(tc->rtmetric == RTMETRIC_SINK) { + struct queuebuf *q; + + /* We first send the ACK. We copy the data packet to a queuebuf + first. */ + q = queuebuf_new_from_packetbuf(); + if(q != NULL) { + send_ack(tc, &ack_to, 0); + queuebuf_to_packetbuf(q); + queuebuf_free(q); + } else { + PRINTF("%d.%d: collect: could not send ACK to %d.%d for %d: no queued buffers\n", + rimeaddr_node_addr.u8[0],rimeaddr_node_addr.u8[1], + ack_to.u8[0], ack_to.u8[1], + packet_seqno); + stats.ackdrop++; + } - /* We first send the ACK. */ - send_ack(tc, &ack_to, 0); PRINTF("%d.%d: sink received packet %d from %d.%d via %d.%d\n", rimeaddr_node_addr.u8[0], rimeaddr_node_addr.u8[1], @@ -831,7 +834,8 @@ node_packet_received(struct unicast_conn *c, const rimeaddr_t *from) send_ack(tc, &ack_to, ackflags); send_queued_packet(tc); } else { - send_ack(tc, &ack_to, ackflags | ACK_FLAGS_DROPPED | ACK_FLAGS_CONGESTED); + send_ack(tc, &ack_to, + ackflags | ACK_FLAGS_DROPPED | ACK_FLAGS_CONGESTED); PRINTF("%d.%d: packet dropped: no queue buffer available\n", rimeaddr_node_addr.u8[0], rimeaddr_node_addr.u8[1]); stats.qdrop++; @@ -906,8 +910,8 @@ timedout(struct collect_conn *tc) &tc->current_parent), tc->max_rexmits); update_rtmetric(tc); - send_next_packet(tc); + set_keepalive_timer(tc); } /*---------------------------------------------------------------------------*/ static void @@ -938,7 +942,22 @@ adv_received(struct neighbor_discovery_conn *c, const rimeaddr_t *from, if(n == NULL) { collect_neighbor_list_add(&tc->neighbor_list, from, rtmetric); + if(rtmetric == RTMETRIC_MAX) { + bump_advertisement(tc); + } } else { + /* Check if the advertised rtmetric has changed to + RTMETRIC_MAX. This may indicate that the neighbor has lost its + routes or that it has rebooted. In either case, we bump our + advertisement rate to allow our neighbor to receive a new + rtmetric from us. If our neighbor already happens to have an + rtmetric of RTMETRIC_MAX recorded, it may mean that our + neighbor does not hear our advertisements. If this is the case, + we should not bump our advertisement rate. */ + if(rtmetric == RTMETRIC_MAX && + collect_neighbor_rtmetric(n) != RTMETRIC_MAX) { + bump_advertisement(tc); + } collect_neighbor_update_rtmetric(n, rtmetric); PRINTF("%d.%d: updating neighbor %d.%d, etx %d\n", rimeaddr_node_addr.u8[0], rimeaddr_node_addr.u8[1], @@ -963,7 +982,22 @@ received_announcement(struct announcement *a, const rimeaddr_t *from, PRINTF("%d.%d: new neighbor %d.%d, etx %d\n", rimeaddr_node_addr.u8[0], rimeaddr_node_addr.u8[1], from->u8[0], from->u8[1], value); + if(value == RTMETRIC_MAX) { + bump_advertisement(tc); + } } else { + /* Check if the advertised rtmetric has changed to + RTMETRIC_MAX. This may indicate that the neighbor has lost its + routes or that it has rebooted. In either case, we bump our + advertisement rate to allow our neighbor to receive a new + rtmetric from us. If our neighbor already happens to have an + rtmetric of RTMETRIC_MAX recorded, it may mean that our + neighbor does not hear our advertisements. If this is the case, + we should not bump our advertisement rate. */ + if(value == RTMETRIC_MAX && + collect_neighbor_rtmetric(n) != RTMETRIC_MAX) { + bump_advertisement(tc); + } collect_neighbor_update_rtmetric(n, value); PRINTF("%d.%d: updating neighbor %d.%d, etx %d\n", rimeaddr_node_addr.u8[0], rimeaddr_node_addr.u8[1], @@ -1021,6 +1055,64 @@ collect_open(struct collect_conn *tc, uint16_t channels, #endif /* !COLLECT_ANNOUNCEMENTS */ } /*---------------------------------------------------------------------------*/ +static void +send_keepalive(void *ptr) +{ + struct collect_conn *c = ptr; + struct collect_neighbor *n; + + set_keepalive_timer(c); + + /* Send keepalive message only if there are no pending transmissions. */ + if(packetqueue_len(&c->send_queue) == 0) { + packetbuf_clear(); + packetbuf_set_addr(PACKETBUF_ADDR_ESENDER, &rimeaddr_node_addr); + packetbuf_set_attr(PACKETBUF_ATTR_HOPS, 1); + packetbuf_set_attr(PACKETBUF_ATTR_TTL, 1); + packetbuf_set_attr(PACKETBUF_ATTR_MAX_REXMIT, KEEPALIVE_REXMITS); + + PRINTF("%d.%d: saending keepalive packet %d, max_rexmits %d\n", + rimeaddr_node_addr.u8[0], rimeaddr_node_addr.u8[1], + packetbuf_attr(PACKETBUF_ATTR_EPACKET_ID), + packetbuf_attr(PACKETBUF_ATTR_MAX_REXMIT)); + + /* Allocate space for the header. */ + packetbuf_hdralloc(sizeof(struct data_msg_hdr)); + + n = collect_neighbor_list_find(&c->neighbor_list, &c->parent); + if(n != NULL) { + PRINTF("%d.%d: sending keepalive to %d.%d\n", + rimeaddr_node_addr.u8[0], rimeaddr_node_addr.u8[1], + n->addr.u8[0], n->addr.u8[1]); + + if(packetqueue_enqueue_packetbuf(&c->send_queue, + FORWARD_PACKET_LIFETIME, + c)) { + send_queued_packet(c); + } + } + } +} +/*---------------------------------------------------------------------------*/ +static void +set_keepalive_timer(struct collect_conn *c) +{ + if(c->keepalive_period != 0) { + ctimer_set(&c->keepalive_timer, (c->keepalive_period / 2) + + (random_rand() % (c->keepalive_period / 2)), + send_keepalive, c); + } else { + ctimer_stop(&c->keepalive_timer); + } +} +/*---------------------------------------------------------------------------*/ +void +collect_set_keepalive(struct collect_conn *c, clock_time_t period) +{ + c->keepalive_period = period; + set_keepalive_timer(c); +} +/*---------------------------------------------------------------------------*/ void collect_close(struct collect_conn *tc) { @@ -1050,6 +1142,8 @@ collect_set_sink(struct collect_conn *tc, int should_be_sink) announcement_set_value(&tc->announcement, tc->rtmetric); #endif /* COLLECT_ANNOUNCEMENTS */ update_rtmetric(tc); + + bump_advertisement(tc); } /*---------------------------------------------------------------------------*/ int @@ -1057,7 +1151,21 @@ collect_send(struct collect_conn *tc, int rexmits) { struct collect_neighbor *n; - packetbuf_set_attr(PACKETBUF_ATTR_EPACKET_ID, tc->eseqno++); + packetbuf_set_attr(PACKETBUF_ATTR_EPACKET_ID, tc->eseqno); + + /* Increase the sequence number for the packet we send out. We + employ a trick that allows us to see that a node has been + rebooted: if the sequence number wraps to 0, we set it to half of + the sequence number space. This allows us to detect reboots, + since if a sequence number is less than half of the sequence + number space, the data comes from a node that was recently + rebooted. */ + + tc->eseqno = (tc->eseqno + 1) % (1 << COLLECT_PACKET_ID_BITS); + + if(tc->eseqno == 0) { + tc->eseqno = ((int)(1 << COLLECT_PACKET_ID_BITS)) / 2; + } packetbuf_set_addr(PACKETBUF_ADDR_ESENDER, &rimeaddr_node_addr); packetbuf_set_attr(PACKETBUF_ATTR_HOPS, 1); packetbuf_set_attr(PACKETBUF_ATTR_TTL, MAX_HOPLIM); @@ -1086,7 +1194,7 @@ collect_send(struct collect_conn *tc, int rexmits) PRINTF("%d.%d: sending to %d.%d\n", rimeaddr_node_addr.u8[0], rimeaddr_node_addr.u8[1], n->addr.u8[0], n->addr.u8[1]); - + if(packetqueue_enqueue_packetbuf(&tc->send_queue, FORWARD_PACKET_LIFETIME, tc)) { @@ -1135,6 +1243,7 @@ void collect_purge(struct collect_conn *tc) { collect_neighbor_list_purge(&tc->neighbor_list); + rimeaddr_copy(&tc->parent, &rimeaddr_null); update_rtmetric(tc); #if DRAW_TREE printf("#L %d 0\n", tc->parent.u8[0]); diff --git a/core/net/rime/collect.h b/core/net/rime/collect.h index 88a8815a1..5008357ac 100644 --- a/core/net/rime/collect.h +++ b/core/net/rime/collect.h @@ -47,7 +47,7 @@ * * This file is part of the Contiki operating system. * - * $Id: collect.h,v 1.20 2010/09/13 13:28:14 adamdunkels Exp $ + * $Id: collect.h,v 1.21 2010/09/22 22:08:08 adamdunkels Exp $ */ /** @@ -93,11 +93,14 @@ struct collect_conn { struct ctimer transmit_after_scan_timer; #endif /* COLLECT_CONF_ANNOUNCEMENTS */ const struct collect_callbacks *cb; - struct ctimer t; struct ctimer retransmission_timer; LIST_STRUCT(send_queue_list); struct packetqueue send_queue; struct collect_neighbor_list neighbor_list; + + struct ctimer keepalive_timer; + clock_time_t keepalive_period; + rimeaddr_t parent, current_parent; uint16_t rtmetric; uint8_t seqno; @@ -122,6 +125,8 @@ void collect_set_sink(struct collect_conn *c, int should_be_sink); int collect_depth(struct collect_conn *c); +void collect_set_keepalive(struct collect_conn *c, clock_time_t period); + void collect_print_stats(void); #define COLLECT_MAX_DEPTH 255