From 7165a3866f0ba3e8b00523a1117f927ddf6de338 Mon Sep 17 00:00:00 2001 From: adamdunkels Date: Sun, 28 Feb 2010 09:18:01 +0000 Subject: [PATCH] Significant rework of the Contiki data collection protocol: * the new version makes use of MAC-layer feedback so that bad paths can be identified quicker and then avoided. * the new code uses transport layer ACKs that contain feedback from the collect protocol: when a packet cannot be forwarded due to lack of resources, the ACK contains a flag that indicates that the packet could not be forwarded. ACKs also contain the routing metric of the sender, which improves agility in face of rapid path changes. * loop detection and management has been improved: with higher path metric agility, the system is more prone to short-lived routing loops. Instead of dropping looping packets, the new version adjusts the routing metric for the routes that exhibit loops so that the risk for future loops is reduced. * make use of packet attributes to inform the MAC layer of how many times packets should be retransmitted. --- core/net/rime/collect.c | 544 +++++++++++++++++++++++++++------------- core/net/rime/collect.h | 28 ++- 2 files changed, 386 insertions(+), 186 deletions(-) diff --git a/core/net/rime/collect.c b/core/net/rime/collect.c index bd5454ebe..5b13815c3 100644 --- a/core/net/rime/collect.c +++ b/core/net/rime/collect.c @@ -1,6 +1,3 @@ -/* XXX: send explicit congestion notification if already forwarding - packet. */ - /** * \addtogroup rimecollect * @{ @@ -36,7 +33,7 @@ * * This file is part of the Contiki operating system. * - * $Id: collect.c,v 1.33 2010/02/23 18:35:23 adamdunkels Exp $ + * $Id: collect.c,v 1.34 2010/02/28 09:18:01 adamdunkels Exp $ */ /** @@ -56,9 +53,7 @@ #include "dev/radio-sensor.h" -#if CONTIKI_TARGET_NETSIM -#include "ether.h" -#endif +#include "lib/random.h" #include #include @@ -78,12 +73,21 @@ struct recent_packet { uint8_t seqno; }; +struct ack_msg { + uint8_t flags, dummy; + uint16_t rtmetric; +}; + +#define ACK_FLAGS_CONGESTED 0x80 +#define ACK_FLAGS_DROPPED 0x40 +#define ACK_FLAGS_LIFETIME_EXCEEDED 0x20 + static struct recent_packet recent_packets[NUM_RECENT_PACKETS]; static uint8_t recent_packet_ptr; #define FORWARD_PACKET_LIFETIME (CLOCK_SECOND * 16) -#define MAX_FORWARDING_QUEUE 6 -PACKETQUEUE(forwarding_queue, MAX_FORWARDING_QUEUE); +#define MAX_SENDING_QUEUE 6 +PACKETQUEUE(sending_queue, MAX_SENDING_QUEUE); #define SINK 0 #define RTMETRIC_MAX COLLECT_MAX_DEPTH @@ -104,64 +108,16 @@ PACKETQUEUE(forwarding_queue, MAX_FORWARDING_QUEUE); #define PRINTF(...) #endif -/*---------------------------------------------------------------------------*/ -static void -send_queued_packet(void) -{ - struct queuebuf *q; - struct neighbor *n; - struct packetqueue_item *i; - struct collect_conn *c; - i = packetqueue_first(&forwarding_queue); - if(i == NULL) { - PRINTF("%d.%d: nothing on queue\n", - rimeaddr_node_addr.u8[0], rimeaddr_node_addr.u8[1]); - /* No packet on the queue, so there is nothing for us to send. */ - return; - } - c = packetqueue_ptr(i); - if(c == NULL) { - /* c should not be NULL, but we check it just to be sure. */ - PRINTF("%d.%d: queue, c == NULL!\n", - rimeaddr_node_addr.u8[0], rimeaddr_node_addr.u8[1]); - return; - } - - if(c->forwarding) { - /* If we are currently forwarding a packet, we wait until the - packet is forwarded and try again then. */ - PRINTF("%d.%d: queue, c is forwarding\n", - rimeaddr_node_addr.u8[0], rimeaddr_node_addr.u8[1]); - return; - } +#define REXMIT_TIME CLOCK_SECOND * 2 + - q = packetqueue_queuebuf(i); - if(q != NULL) { - PRINTF("%d.%d: queue, q is on queue\n", - rimeaddr_node_addr.u8[0], rimeaddr_node_addr.u8[1]); - queuebuf_to_packetbuf(q); - - n = neighbor_best(); - - /* Don't send to the neighbor if it is the same neighbor that sent - us the packet. */ - if(n != NULL && !rimeaddr_cmp(&n->addr, packetbuf_addr(PACKETBUF_ADDR_SENDER))) { #if CONTIKI_TARGET_NETSIM - ether_set_line(n->addr.u8[0], n->addr.u8[1]); +#include "ether.h" #endif /* CONTIKI_TARGET_NETSIM */ - PRINTF("%d.%d: sending packet to %d.%d\n", - rimeaddr_node_addr.u8[0], rimeaddr_node_addr.u8[1], - n->addr.u8[0], n->addr.u8[1]); - - c->forwarding = 1; - runicast_send(&c->runicast_conn, &n->addr, packetbuf_attr(PACKETBUF_ATTR_MAX_REXMIT)); - } else { - PRINTF("%d.%d: did not find any neighbor to forward to\n", - rimeaddr_node_addr.u8[0], rimeaddr_node_addr.u8[1]); - } - } -} + +static void send_queued_packet(void); +static void retransmit_callback(void *ptr); /*---------------------------------------------------------------------------*/ static void update_rtmetric(struct collect_conn *tc) @@ -191,9 +147,8 @@ update_rtmetric(struct collect_conn *tc) neighbor_discovery_set_val(&tc->neighbor_discovery_conn, tc->rtmetric); #endif /* COLLECT_ANNOUNCEMENTS */ } else { - /* We set our rtmetric to the rtmetric of our best neighbor plus - the expected transmissions to reach that neighbor. */ + the expected transmissions to reach that neighbor. */ if(n->rtmetric + neighbor_etx(n) != tc->rtmetric) { uint16_t old_rtmetric = tc->rtmetric; @@ -201,8 +156,6 @@ update_rtmetric(struct collect_conn *tc) #if ! COLLECT_ANNOUNCEMENTS - /* neighbor_discovery_set_val(&tc->neighbor_discovery_conn, tc->rtmetric);*/ - /* If we get a significantly better rtmetric than we had before, we call neighbor_discovery_start to start a new period. */ @@ -234,7 +187,7 @@ update_rtmetric(struct collect_conn *tc) if(tc->rtmetric == RTMETRIC_MAX) { strcpy(buf, " "); } else { - sprintf(buf, "%.1f", (float)tc->rtmetric / NEIGHBOR_ETX_SCALE); + sPRINTF(buf, "%.1f", (float)tc->rtmetric / NEIGHBOR_ETX_SCALE); } ether_set_text(buf); } @@ -242,134 +195,367 @@ update_rtmetric(struct collect_conn *tc) } /*---------------------------------------------------------------------------*/ static void -node_packet_received(struct runicast_conn *c, const rimeaddr_t *from, - uint8_t seqno) +send_queued_packet(void) +{ + struct queuebuf *q; + struct neighbor *n; + struct packetqueue_item *i; + struct collect_conn *c; + + PRINTF("%d.%d: send_queued_packet queue len %d\n", + rimeaddr_node_addr.u8[0], rimeaddr_node_addr.u8[1], + packetqueue_len(&sending_queue)); + + i = packetqueue_first(&sending_queue); + if(i == NULL) { + PRINTF("%d.%d: nothing on queue\n", + rimeaddr_node_addr.u8[0], rimeaddr_node_addr.u8[1]); + /* No packet on the queue, so there is nothing for us to send. */ + return; + } + c = packetqueue_ptr(i); + if(c == NULL) { + /* c should not be NULL, but we check it just to be sure. */ + PRINTF("%d.%d: queue, c == NULL!\n", + rimeaddr_node_addr.u8[0], rimeaddr_node_addr.u8[1]); + return; + } + + if(c->sending) { + /* If we are currently sending a packet, we wait until the + packet is forwarded and try again then. */ + PRINTF("%d.%d: queue, c is sending\n", + rimeaddr_node_addr.u8[0], rimeaddr_node_addr.u8[1]); + return; + } + + q = packetqueue_queuebuf(i); + if(q != NULL) { + PRINTF("%d.%d: queue, q is on queue\n", + rimeaddr_node_addr.u8[0], rimeaddr_node_addr.u8[1]); + queuebuf_to_packetbuf(q); + + n = neighbor_best(); + + while(n != NULL && rimeaddr_cmp(&n->addr, packetbuf_addr(PACKETBUF_ADDR_SENDER))) { + PRINTF("%d.%d: avoiding fowarding loop to %d.%d\n", + rimeaddr_node_addr.u8[0], rimeaddr_node_addr.u8[1], + n->addr.u8[0], n->addr.u8[1]); + neighbor_remove(&n->addr); + update_rtmetric(c); + n = neighbor_best();; + } + + /* Don't send to the neighbor if it is the same neighbor that sent + us the packet. */ + if(n != NULL) { + clock_time_t time; + uint8_t rexmit_time_scaling; +#if CONTIKI_TARGET_NETSIM + ether_set_line(n->addr.u8[0], n->addr.u8[1]); +#endif /* CONTIKI_TARGET_NETSIM */ + PRINTF("%d.%d: sending packet to %d.%d\n", + rimeaddr_node_addr.u8[0], rimeaddr_node_addr.u8[1], + n->addr.u8[0], n->addr.u8[1]); + + rimeaddr_copy(&c->current_receiver, &n->addr); + c->sending = 1; + c->transmissions = 0; + c->max_rexmits = 8;//packetbuf_attr(PACKETBUF_ATTR_EMAX_REXMIT); + PRINTF("max_rexmits %d\n", c->max_rexmits); + packetbuf_set_attr(PACKETBUF_ATTR_RELIABLE, 1); + packetbuf_set_attr(PACKETBUF_ATTR_MAX_MAC_REXMIT, 2); + packetbuf_set_attr(PACKETBUF_ATTR_PACKET_ID, c->seqno); + unicast_send(&c->unicast_conn, &n->addr); + rexmit_time_scaling = c->transmissions; + if(rexmit_time_scaling > 3) { + rexmit_time_scaling = 3; + } + time = REXMIT_TIME << rexmit_time_scaling; + time = time / 2 + random_rand() % (time / 2); + PRINTF("retransmission time %lu\n", time); + ctimer_set(&c->retransmission_timer, time, + retransmit_callback, c); + } else { + } + } +} +/*---------------------------------------------------------------------------*/ +static void +send_next_packet(struct collect_conn *tc) +{ + /* Cancel retransmission timer. */ + ctimer_stop(&tc->retransmission_timer); + + /* Remove the first packet on the queue, the packet that was just sent. */ + packetqueue_dequeue(&sending_queue); + tc->seqno = (tc->seqno + 1) % (1 << COLLECT_PACKET_ID_BITS); + tc->sending = 0; + tc->transmissions = 0; + + /* Send the next packet in the queue, if any. */ + send_queued_packet(); +} +/*---------------------------------------------------------------------------*/ +static void +handle_ack(struct collect_conn *tc) +{ + struct ack_msg *msg; + uint16_t rtmetric; + struct neighbor *n; + + if(rimeaddr_cmp(packetbuf_addr(PACKETBUF_ADDR_SENDER), + &tc->current_receiver) && + packetbuf_attr(PACKETBUF_ATTR_PACKET_ID) == tc->seqno) { + + + msg = packetbuf_dataptr(); + memcpy(&rtmetric, &msg->rtmetric, sizeof(uint16_t)); + n = neighbor_find(packetbuf_addr(PACKETBUF_ADDR_SENDER)); + if(n != NULL) { + neighbor_update(n, rtmetric); + update_rtmetric(tc); + } + + PRINTF("%d.%d: ACK from %d.%d after %d transmissions, flags %02x\n", + rimeaddr_node_addr.u8[0], rimeaddr_node_addr.u8[1], + tc->current_receiver.u8[0], tc->current_receiver.u8[1], + tc->transmissions, + msg->flags); + + if(!(msg->flags & ACK_FLAGS_DROPPED)) { + send_next_packet(tc); + } + } +} +/*---------------------------------------------------------------------------*/ +static void +send_ack(struct collect_conn *tc, const rimeaddr_t *to, int congestion, int dropped, int ttl) +{ + struct ack_msg *ack; + struct queuebuf *q; + uint16_t packet_seqno; + + + PRINTF("send_ack\n"); + + packet_seqno = packetbuf_attr(PACKETBUF_ATTR_PACKET_ID); + + q = queuebuf_new_from_packetbuf(); + if(q != NULL) { + + packetbuf_clear(); + packetbuf_set_datalen(sizeof(struct ack_msg)); + ack = packetbuf_dataptr(); + memset(ack, 0, sizeof(struct ack_msg)); + ack->rtmetric = tc->rtmetric; + ack->flags = (congestion? ACK_FLAGS_CONGESTED: 0) | + (dropped? ACK_FLAGS_DROPPED: 0) | + (ttl? ACK_FLAGS_LIFETIME_EXCEEDED: 0); + /* XXX: send explicit congestion notification in ACK queue full; add rtmetric to ACK. */ + packetbuf_set_addr(PACKETBUF_ADDR_RECEIVER, to); + packetbuf_set_attr(PACKETBUF_ATTR_PACKET_TYPE, PACKETBUF_ATTR_PACKET_TYPE_ACK); + packetbuf_set_attr(PACKETBUF_ATTR_RELIABLE, 0); + packetbuf_set_attr(PACKETBUF_ATTR_ERELIABLE, 0); + packetbuf_set_attr(PACKETBUF_ATTR_PACKET_ID, packet_seqno); + packetbuf_set_attr(PACKETBUF_ATTR_MAX_REXMIT, 2); + unicast_send(&tc->unicast_conn, to); + + PRINTF("%d.%d: collect: Sending ACK to %d.%d for %d\n", + rimeaddr_node_addr.u8[0],rimeaddr_node_addr.u8[1], + to->u8[0], + to->u8[1], + packet_seqno); + + RIMESTATS_ADD(acktx); + + queuebuf_to_packetbuf(q); + queuebuf_free(q); + } else { + PRINTF("%d.%d: collect: could not send ACK to %d.%d for %d: no queued buffers\n", + rimeaddr_node_addr.u8[0],rimeaddr_node_addr.u8[1], + to->u8[0], to->u8[1], + packet_seqno); + } +} +/*---------------------------------------------------------------------------*/ +static void +node_packet_received(struct unicast_conn *c, const rimeaddr_t *from) { struct collect_conn *tc = (struct collect_conn *) - ((char *)c - offsetof(struct collect_conn, runicast_conn)); + ((char *)c - offsetof(struct collect_conn, unicast_conn)); int i; struct neighbor *n; - /* To protect against forwarding duplicate packets, we keep a list + /* To protect against sending duplicate packets, we keep a list of recently forwarded packet seqnos. If the seqno of the current packet exists in the list, we drop the packet and increase the ETX of the neighbor we sent it to in the first place. */ + if(packetbuf_attr(PACKETBUF_ATTR_PACKET_TYPE) == + PACKETBUF_ATTR_PACKET_TYPE_DATA) { + rimeaddr_t ack_to; + uint8_t packet_seqno; - for(i = 0; i < NUM_RECENT_PACKETS; i++) { - if(recent_packets[i].seqno == packetbuf_attr(PACKETBUF_ATTR_EPACKET_ID) && - rimeaddr_cmp(&recent_packets[i].originator, - packetbuf_addr(PACKETBUF_ADDR_ESENDER))) { - PRINTF("%d.%d: dropping duplicate packet from %d.%d with seqno %d\n", - rimeaddr_node_addr.u8[0], rimeaddr_node_addr.u8[1], - recent_packets[i].originator.u8[0], recent_packets[i].originator.u8[1], - packetbuf_attr(PACKETBUF_ATTR_EPACKET_ID)); - n = neighbor_find(&recent_packets[i].sent_to); - if(n != NULL) { - neighbor_update_etx(n, neighbor_etx(n) + NEIGHBOR_ETX_SCALE * 4); - } - /* Drop the packet. */ + rimeaddr_copy(&ack_to, packetbuf_addr(PACKETBUF_ADDR_SENDER)); + packet_seqno = packetbuf_attr(PACKETBUF_ATTR_PACKET_ID); + + if(rimeaddr_cmp(&tc->last_received_addr, packetbuf_addr(PACKETBUF_ADDR_SENDER)) && + tc->last_received_seqno == packetbuf_attr(PACKETBUF_ATTR_PACKET_ID)) { + /* This is a duplicate of the packet we last received, so we just send an ACK. */ + send_ack(tc, &ack_to, 0, 0, 0); return; } - } - recent_packets[recent_packet_ptr].seqno = packetbuf_attr(PACKETBUF_ATTR_EPACKET_ID); - rimeaddr_copy(&recent_packets[recent_packet_ptr].originator, - packetbuf_addr(PACKETBUF_ADDR_ESENDER)); - n = neighbor_best(); - rimeaddr_copy(&recent_packets[recent_packet_ptr].sent_to, - &n->addr); + rimeaddr_copy(&tc->last_received_addr, packetbuf_addr(PACKETBUF_ADDR_SENDER)); + tc->last_received_seqno = packetbuf_attr(PACKETBUF_ATTR_PACKET_ID); - recent_packet_ptr = (recent_packet_ptr + 1) % NUM_RECENT_PACKETS; - - if(tc->rtmetric == SINK) { - - /* If we are the sink, we call the receive function. */ - - PRINTF("%d.%d: sink received packet from %d.%d via %d.%d\n", - rimeaddr_node_addr.u8[0], rimeaddr_node_addr.u8[1], - packetbuf_addr(PACKETBUF_ADDR_ESENDER)->u8[0], - packetbuf_addr(PACKETBUF_ADDR_ESENDER)->u8[1], - from->u8[0], from->u8[1]); - - if(tc->cb->recv != NULL) { - tc->cb->recv(packetbuf_addr(PACKETBUF_ADDR_ESENDER), - packetbuf_attr(PACKETBUF_ATTR_EPACKET_ID), - packetbuf_attr(PACKETBUF_ATTR_HOPS)); + for(i = 0; i < NUM_RECENT_PACKETS; i++) { + if(recent_packets[i].seqno == packetbuf_attr(PACKETBUF_ATTR_EPACKET_ID) && + rimeaddr_cmp(&recent_packets[i].originator, + packetbuf_addr(PACKETBUF_ADDR_ESENDER))) { + PRINTF("%d.%d: found duplicate packet from %d.%d with seqno %d, via %d.%d\n", + rimeaddr_node_addr.u8[0], rimeaddr_node_addr.u8[1], + recent_packets[i].originator.u8[0], recent_packets[i].originator.u8[1], + packetbuf_attr(PACKETBUF_ATTR_EPACKET_ID), + packetbuf_addr(PACKETBUF_ADDR_SENDER)->u8[0], + packetbuf_addr(PACKETBUF_ADDR_SENDER)->u8[1]); + n = neighbor_find(&recent_packets[i].sent_to); + if(n != NULL) { + neighbor_update_etx(n, neighbor_etx(n) / NEIGHBOR_ETX_SCALE + 4); + update_rtmetric(tc); + } + break; + } } - return; - } else if(packetbuf_attr(PACKETBUF_ATTR_TTL) > 1 && - tc->rtmetric != RTMETRIC_MAX) { + recent_packets[recent_packet_ptr].seqno = packetbuf_attr(PACKETBUF_ATTR_EPACKET_ID); + rimeaddr_copy(&recent_packets[recent_packet_ptr].originator, + packetbuf_addr(PACKETBUF_ADDR_ESENDER)); + /* n = neighbor_best();*/ - /* If we are not the sink, we forward the packet to the best - neighbor. */ - packetbuf_set_attr(PACKETBUF_ATTR_HOPS, packetbuf_attr(PACKETBUF_ATTR_HOPS) + 1); - packetbuf_set_attr(PACKETBUF_ATTR_TTL, packetbuf_attr(PACKETBUF_ATTR_TTL) - 1); - - - PRINTF("%d.%d: packet received from %d.%d via %d.%d, forwarding %d\n", - rimeaddr_node_addr.u8[0], rimeaddr_node_addr.u8[1], - packetbuf_addr(PACKETBUF_ADDR_ESENDER)->u8[0], - packetbuf_addr(PACKETBUF_ADDR_ESENDER)->u8[1], - from->u8[0], from->u8[1], tc->forwarding); - - if(packetqueue_enqueue_packetbuf(&forwarding_queue, FORWARD_PACKET_LIFETIME, - tc)) { - send_queued_packet(); + if(tc->rtmetric != SINK) { + n = neighbor_best(); + rimeaddr_copy(&recent_packets[recent_packet_ptr].sent_to, + &n->addr); } else { - PRINTF("%d.%d: packet dropped: no queue buffer available\n", - rimeaddr_node_addr.u8[0], rimeaddr_node_addr.u8[1]); + rimeaddr_copy(&recent_packets[recent_packet_ptr].sent_to, + &rimeaddr_null); } - } else if(packetbuf_attr(PACKETBUF_ATTR_TTL) <= 1) { + + recent_packet_ptr = (recent_packet_ptr + 1) % NUM_RECENT_PACKETS; + + if(tc->rtmetric == SINK) { + + /* If we are the sink, we call the receive function. */ + + send_ack(tc, &ack_to, 0, 0, 0); + + PRINTF("%d.%d: sink received packet %d from %d.%d via %d.%d\n", + rimeaddr_node_addr.u8[0], rimeaddr_node_addr.u8[1], + packetbuf_attr(PACKETBUF_ATTR_EPACKET_ID), + packetbuf_addr(PACKETBUF_ADDR_ESENDER)->u8[0], + packetbuf_addr(PACKETBUF_ADDR_ESENDER)->u8[1], + from->u8[0], from->u8[1]); + + if(tc->cb->recv != NULL) { + tc->cb->recv(packetbuf_addr(PACKETBUF_ADDR_ESENDER), + packetbuf_attr(PACKETBUF_ATTR_EPACKET_ID), + packetbuf_attr(PACKETBUF_ATTR_HOPS)); + } + return; + } else if(packetbuf_attr(PACKETBUF_ATTR_TTL) > 1 && + tc->rtmetric != RTMETRIC_MAX) { + + /* If we are not the sink, we forward the packet to the best + neighbor. */ + packetbuf_set_attr(PACKETBUF_ATTR_HOPS, packetbuf_attr(PACKETBUF_ATTR_HOPS) + 1); + packetbuf_set_attr(PACKETBUF_ATTR_TTL, packetbuf_attr(PACKETBUF_ATTR_TTL) - 1); + + + PRINTF("%d.%d: packet received from %d.%d via %d.%d, sending %d\n", + rimeaddr_node_addr.u8[0], rimeaddr_node_addr.u8[1], + packetbuf_addr(PACKETBUF_ADDR_ESENDER)->u8[0], + packetbuf_addr(PACKETBUF_ADDR_ESENDER)->u8[1], + from->u8[0], from->u8[1], tc->sending); + + if(packetqueue_enqueue_packetbuf(&sending_queue, FORWARD_PACKET_LIFETIME, + tc)) { + send_ack(tc, &ack_to, 0, 0, 0); + send_queued_packet(); + } else { + send_ack(tc, &ack_to, 0, 1, 0); + PRINTF("%d.%d: packet dropped: no queue buffer available\n", + rimeaddr_node_addr.u8[0], rimeaddr_node_addr.u8[1]); + } + } else if(packetbuf_attr(PACKETBUF_ATTR_TTL) <= 1) { PRINTF("%d.%d: packet dropped: ttl %d\n", rimeaddr_node_addr.u8[0], rimeaddr_node_addr.u8[1], packetbuf_attr(PACKETBUF_ATTR_TTL)); + send_ack(tc, &ack_to, 0, 1, 1); + } + } else if(packetbuf_attr(PACKETBUF_ATTR_PACKET_TYPE) == + PACKETBUF_ATTR_PACKET_TYPE_ACK) { + PRINTF("Collect: incoming ack %d from %d.%d (%d.%d) seqno %d (%d)\n", + packetbuf_attr(PACKETBUF_ATTR_PACKET_TYPE), + packetbuf_addr(PACKETBUF_ADDR_SENDER)->u8[0], + packetbuf_addr(PACKETBUF_ADDR_SENDER)->u8[1], + tc->current_receiver.u8[0], + tc->current_receiver.u8[1], + packetbuf_attr(PACKETBUF_ATTR_PACKET_ID), + tc->seqno); + handle_ack(tc); } - return; } /*---------------------------------------------------------------------------*/ static void -node_packet_sent(struct runicast_conn *c, const rimeaddr_t *to, - uint8_t transmissions) +node_packet_sent(struct unicast_conn *c, int status, int transmissions) { struct collect_conn *tc = (struct collect_conn *) - ((char *)c - offsetof(struct collect_conn, runicast_conn)); + ((char *)c - offsetof(struct collect_conn, unicast_conn)); - PRINTF("%d.%d: sent to %d.%d after %d retransmissions\n", - rimeaddr_node_addr.u8[0], rimeaddr_node_addr.u8[1], - to->u8[0], to->u8[1], - transmissions); + /* For data packets, we record the number of transmissions */ + if(packetbuf_attr(PACKETBUF_ATTR_PACKET_TYPE) == + PACKETBUF_ATTR_PACKET_TYPE_DATA) { + PRINTF("%d.%d: sent to %d.%d after %d transmissions\n", + rimeaddr_node_addr.u8[0], rimeaddr_node_addr.u8[1], + tc->current_receiver.u8[0], tc->current_receiver.u8[1], + transmissions); + + /* neighbor_update_etx(neighbor_find(to), transmissions); + update_rtmetric(tc);*/ + tc->transmissions += transmissions; - - tc->forwarding = 0; - neighbor_update_etx(neighbor_find(to), transmissions); - update_rtmetric(tc); - - /* Remove the first packet on the queue, the packet that was just sent. */ - packetqueue_dequeue(&forwarding_queue); - - /* Send the next packet in the queue, if any. */ - send_queued_packet(); + /* Update ETX with the number of transmissions. */ + PRINTF("Updating ETX with %d transmissions\n", tc->transmissions); + neighbor_update_etx(neighbor_find(&tc->current_receiver), tc->transmissions); + update_rtmetric(tc); + } } /*---------------------------------------------------------------------------*/ static void -node_packet_timedout(struct runicast_conn *c, const rimeaddr_t *to, - uint8_t transmissions) +timedout(struct collect_conn *tc) { - struct collect_conn *tc = (struct collect_conn *) - ((char *)c - offsetof(struct collect_conn, runicast_conn)); - PRINTF("%d.%d: timedout after %d retransmissions: packet dropped\n", - rimeaddr_node_addr.u8[0], rimeaddr_node_addr.u8[1], transmissions); + rimeaddr_node_addr.u8[0], rimeaddr_node_addr.u8[1], tc->transmissions); - tc->forwarding = 0; - neighbor_timedout_etx(neighbor_find(to), transmissions); + tc->sending = 0; + neighbor_timedout_etx(neighbor_find(&tc->current_receiver), tc->transmissions); update_rtmetric(tc); - /* Remove the first packet on the queue, the packet that just timed out. */ - packetqueue_dequeue(&forwarding_queue); + send_next_packet(tc); +} +/*---------------------------------------------------------------------------*/ +static void +retransmit_callback(void *ptr) +{ + struct collect_conn *c = ptr; - /* Send the next packet in the queue, if any. */ - send_queued_packet(); + PRINTF("retransmit\n"); + if(c->transmissions >= c->max_rexmits) { + timedout(c); + } else { + c->sending = 0; + send_queued_packet(); + } } /*---------------------------------------------------------------------------*/ #if !COLLECT_ANNOUNCEMENTS @@ -421,9 +607,8 @@ received_announcement(struct announcement *a, const rimeaddr_t *from, } #endif /* !COLLECT_ANNOUNCEMENTS */ /*---------------------------------------------------------------------------*/ -static const struct runicast_callbacks runicast_callbacks = {node_packet_received, - node_packet_sent, - node_packet_timedout}; +static const struct unicast_callbacks unicast_callbacks = {node_packet_received, + node_packet_sent}; #if !COLLECT_ANNOUNCEMENTS static const struct neighbor_discovery_callbacks neighbor_discovery_callbacks = { adv_received, NULL}; @@ -439,19 +624,19 @@ collect_open(struct collect_conn *tc, uint16_t channels, CLOCK_SECOND * 32, CLOCK_SECOND * 600, &neighbor_discovery_callbacks); -#endif /* !COLLECT_ANNOUNCEMENTS */ - runicast_open(&tc->runicast_conn, channels + 1, &runicast_callbacks); - channel_set_attributes(channels + 1, attributes); - tc->rtmetric = RTMETRIC_MAX; - tc->cb = cb; -#if COLLECT_ANNOUNCEMENTS + neighbor_discovery_start(&tc->neighbor_discovery_conn, tc->rtmetric); +#else /* !COLLECT_ANNOUNCEMENTS */ announcement_register(&tc->announcement, channels, tc->rtmetric, received_announcement); announcement_listen(2); -#else - neighbor_discovery_start(&tc->neighbor_discovery_conn, tc->rtmetric); -#endif /* COLLECT_ANNOUNCEMENTS */ +#endif /* !COLLECT_ANNOUNCEMENTS */ + + unicast_open(&tc->unicast_conn, channels + 1, &unicast_callbacks); + channel_set_attributes(channels + 1, attributes); + tc->rtmetric = RTMETRIC_MAX; + tc->cb = cb; neighbor_init(); + packetqueue_init(&sending_queue); } /*---------------------------------------------------------------------------*/ void @@ -462,7 +647,7 @@ collect_close(struct collect_conn *tc) #else neighbor_discovery_close(&tc->neighbor_discovery_conn); #endif /* COLLECT_ANNOUNCEMENTS */ - runicast_close(&tc->runicast_conn); + unicast_close(&tc->unicast_conn); } /*---------------------------------------------------------------------------*/ void @@ -487,11 +672,18 @@ collect_send(struct collect_conn *tc, int rexmits) { struct neighbor *n; - packetbuf_set_attr(PACKETBUF_ATTR_EPACKET_ID, tc->seqno++); + packetbuf_set_attr(PACKETBUF_ATTR_EPACKET_ID, tc->eseqno++); packetbuf_set_addr(PACKETBUF_ADDR_ESENDER, &rimeaddr_node_addr); packetbuf_set_attr(PACKETBUF_ATTR_HOPS, 1); packetbuf_set_attr(PACKETBUF_ATTR_TTL, MAX_HOPLIM); packetbuf_set_attr(PACKETBUF_ATTR_MAX_REXMIT, rexmits); + + PRINTF("%d.%d: originating packet %d\n", + rimeaddr_node_addr.u8[0], rimeaddr_node_addr.u8[1], + packetbuf_attr(PACKETBUF_ATTR_EPACKET_ID)); + + + PRINTF("rexmit %d\n", rexmits); if(tc->rtmetric == SINK) { packetbuf_set_attr(PACKETBUF_ATTR_HOPS, 0); @@ -502,6 +694,7 @@ collect_send(struct collect_conn *tc, int rexmits) } return 1; } else { + update_rtmetric(tc); n = neighbor_best(); if(n != NULL) { #if CONTIKI_TARGET_NETSIM @@ -510,7 +703,7 @@ collect_send(struct collect_conn *tc, int rexmits) PRINTF("%d.%d: sending to %d.%d\n", rimeaddr_node_addr.u8[0], rimeaddr_node_addr.u8[1], n->addr.u8[0], n->addr.u8[1]); - if(packetqueue_enqueue_packetbuf(&forwarding_queue, FORWARD_PACKET_LIFETIME, + if(packetqueue_enqueue_packetbuf(&sending_queue, FORWARD_PACKET_LIFETIME, tc)) { send_queued_packet(); return 1; @@ -519,13 +712,12 @@ collect_send(struct collect_conn *tc, int rexmits) rimeaddr_node_addr.u8[0], rimeaddr_node_addr.u8[1]); } } else { - /* printf("Didn't find any neighbor\n");*/ PRINTF("%d.%d: did not find any neighbor to send to\n", rimeaddr_node_addr.u8[0], rimeaddr_node_addr.u8[1]); #if COLLECT_ANNOUNCEMENTS announcement_listen(1); #endif /* COLLECT_ANNOUNCEMENTS */ - if(packetqueue_enqueue_packetbuf(&forwarding_queue, FORWARD_PACKET_LIFETIME, + if(packetqueue_enqueue_packetbuf(&sending_queue, FORWARD_PACKET_LIFETIME, tc)) { return 1; } else { diff --git a/core/net/rime/collect.h b/core/net/rime/collect.h index 47824106d..a6af128be 100644 --- a/core/net/rime/collect.h +++ b/core/net/rime/collect.h @@ -47,7 +47,7 @@ * * This file is part of the Contiki operating system. * - * $Id: collect.h,v 1.12 2010/02/23 18:35:23 adamdunkels Exp $ + * $Id: collect.h,v 1.13 2010/02/28 09:18:01 adamdunkels Exp $ */ /** @@ -64,12 +64,16 @@ #include "net/rime/runicast.h" #include "net/rime/neighbor-discovery.h" -#define COLLECT_ATTRIBUTES { PACKETBUF_ADDR_ESENDER, PACKETBUF_ADDRSIZE }, \ - { PACKETBUF_ATTR_EPACKET_ID, PACKETBUF_ATTR_BIT * 4 }, \ - { PACKETBUF_ATTR_TTL, PACKETBUF_ATTR_BIT * 4 }, \ - { PACKETBUF_ATTR_HOPS, PACKETBUF_ATTR_BIT * 4 }, \ - { PACKETBUF_ATTR_MAX_REXMIT, PACKETBUF_ATTR_BIT * 3 }, \ - RUNICAST_ATTRIBUTES +#define COLLECT_PACKET_ID_BITS 4 + +#define COLLECT_ATTRIBUTES { PACKETBUF_ADDR_ESENDER, PACKETBUF_ADDRSIZE }, \ + { PACKETBUF_ATTR_EPACKET_ID, PACKETBUF_ATTR_BIT * 4 }, \ + { PACKETBUF_ATTR_PACKET_ID, PACKETBUF_ATTR_BIT * COLLECT_PACKET_ID_BITS }, \ + { PACKETBUF_ATTR_TTL, PACKETBUF_ATTR_BIT * 4 }, \ + { PACKETBUF_ATTR_HOPS, PACKETBUF_ATTR_BIT * 4 }, \ + { PACKETBUF_ATTR_MAX_REXMIT, PACKETBUF_ATTR_BIT * 3 }, \ + { PACKETBUF_ATTR_PACKET_TYPE, PACKETBUF_ATTR_BIT }, \ + UNICAST_ATTRIBUTES struct collect_callbacks { void (* recv)(const rimeaddr_t *originator, uint8_t seqno, @@ -77,7 +81,7 @@ struct collect_callbacks { }; struct collect_conn { - struct runicast_conn runicast_conn; + struct unicast_conn unicast_conn; #if ! COLLECT_CONF_ANNOUNCEMENTS struct neighbor_discovery_conn neighbor_discovery_conn; #else /* ! COLLECT_CONF_ANNOUNCEMENTS */ @@ -86,8 +90,12 @@ struct collect_conn { const struct collect_callbacks *cb; struct ctimer t; uint16_t rtmetric; - uint8_t forwarding; - uint8_t seqno; + uint8_t sending, transmissions, max_rexmits; + uint8_t seqno, last_received_seqno; + rimeaddr_t last_received_addr; + uint8_t eseqno; + struct ctimer retransmission_timer; + rimeaddr_t current_receiver; }; void collect_open(struct collect_conn *c, uint16_t channels,