mesh flooding seems to work pretty well!

This commit is contained in:
geeksville
2020-04-17 12:41:01 -07:00
parent ea24394110
commit 65406eaa08
12 changed files with 155 additions and 83 deletions

View File

@@ -47,7 +47,9 @@ ErrorCode CustomRF95::send(MeshPacket *p)
// we almost certainly guarantee no one outside will like the packet we are sending.
if (_mode == RHModeIdle || (_mode == RHModeRx && !isReceiving())) {
// if the radio is idle, we can send right away
DEBUG_MSG("immedate send on mesh (txGood=%d,rxGood=%d,rxBad=%d)\n", txGood(), rxGood(), rxBad());
DEBUG_MSG("immediate send on mesh fr=0x%x,to=0x%x,id=%d\n (txGood=%d,rxGood=%d,rxBad=%d)\n", p->from, p->to, p->id,
txGood(), rxGood(), rxBad());
startSend(p);
return ERRNO_OK;
} else {
@@ -159,6 +161,8 @@ void CustomRF95::startSend(MeshPacket *txp)
// DEBUG_MSG("sending queued packet on mesh (txGood=%d,rxGood=%d,rxBad=%d)\n", rf95.txGood(), rf95.rxGood(), rf95.rxBad());
assert(txp->has_payload);
lastTxStart = millis();
size_t numbytes = pb_encode_to_bytes(radiobuf, sizeof(radiobuf), SubPacket_fields, &txp->payload);
sendingPacket = txp;
@@ -178,4 +182,25 @@ void CustomRF95::startSend(MeshPacket *txp)
assert(res);
}
#define TX_WATCHDOG_TIMEOUT 30 * 1000
#include "error.h"
void CustomRF95::loop()
{
// It should never take us more than 30 secs to send a packet, if it does, we have a bug, FIXME, move most of this
// into CustomRF95
uint32_t now = millis();
if (lastTxStart != 0 && (now - lastTxStart) > TX_WATCHDOG_TIMEOUT && mode() == RHGenericDriver::RHModeTx) {
DEBUG_MSG("ERROR! Bug! Tx packet took too long to send, forcing radio into rx mode\n");
setModeRx();
if (sendingPacket) { // There was probably a packet we were trying to send, free it
packetPool.release(sendingPacket);
sendingPacket = NULL;
}
recordCriticalError(ErrTxWatchdog);
lastTxStart = 0; // Stop checking for now, because we just warned the developer
}
}
#endif

View File

@@ -15,6 +15,8 @@ class CustomRF95 : public RH_RF95, public RadioInterface
PointerQueue<MeshPacket> txQueue;
uint32_t lastTxStart = 0L;
public:
/** pool is the pool we will alloc our rx packets from
* rxDest is where we will send any rx packets, it becomes receivers responsibility to return packet to the pool
@@ -38,6 +40,8 @@ class CustomRF95 : public RH_RF95, public RadioInterface
bool init();
void loop(); // Idle processing
protected:
// After doing standard behavior, check to see if a new packet arrived or one was sent and start a new send or receive as
// necessary

View File

@@ -36,13 +36,17 @@ void FloodingRouter::handleReceived(MeshPacket *p)
DEBUG_MSG("Ignoring incoming floodmsg, because we've already seen it\n");
packetPool.release(p);
} else {
if (p->to == NODENUM_BROADCAST && p->id != 0) {
DEBUG_MSG("Rebroadcasting received floodmsg to neighbors\n");
// FIXME, wait a random delay
if (p->to == NODENUM_BROADCAST) {
if (p->id != 0) {
DEBUG_MSG("Rebroadcasting received floodmsg to neighbors\n");
// FIXME, wait a random delay
MeshPacket *tosend = packetPool.allocCopy(*p);
// Note: we are careful to resend using the original senders node id
Router::send(tosend); // We are careful not to call our hooked version of send()
MeshPacket *tosend = packetPool.allocCopy(*p);
// Note: we are careful to resend using the original senders node id
Router::send(tosend); // We are careful not to call our hooked version of send()
} else {
DEBUG_MSG("Ignoring a simple (0 hop) broadcast\n");
}
}
// handle the packet as normal
@@ -58,18 +62,22 @@ bool FloodingRouter::wasSeenRecently(const MeshPacket *p)
if (p->to != NODENUM_BROADCAST)
return false; // Not a broadcast, so we don't care
if (p->id == 0)
if (p->id == 0) {
DEBUG_MSG("Ignoring message with zero id\n");
return false; // Not a floodable message ID, so we don't care
}
uint32_t now = millis();
for (int i = 0; i < recentBroadcasts.size();) {
BroadcastRecord &r = recentBroadcasts[i];
if ((now - r.rxTimeMsec) >= FLOOD_EXPIRE_TIME) {
DEBUG_MSG("Deleting old recentBroadcast %d\n", i);
DEBUG_MSG("Deleting old broadcast record %d\n", i);
recentBroadcasts.erase(recentBroadcasts.begin() + i); // delete old record
} else {
if (r.id == p->id && r.sender == p->from) {
DEBUG_MSG("Found existing broadcast record for fr=0x%x,to=0x%x,id=%d\n", p->from, p->to, p->id);
// Update the time on this record to now
r.rxTimeMsec = now;
return true;
@@ -85,6 +93,7 @@ bool FloodingRouter::wasSeenRecently(const MeshPacket *p)
r.sender = p->from;
r.rxTimeMsec = now;
recentBroadcasts.push_back(r);
DEBUG_MSG("Adding broadcast record for fr=0x%x,to=0x%x,id=%d\n", p->from, p->to, p->id);
return false;
}

72
src/rf95/FloodingRouter.h Normal file
View File

@@ -0,0 +1,72 @@
#pragma once
#include "Router.h"
#include <vector>
/**
* A record of a recent message broadcast
*/
struct BroadcastRecord {
NodeNum sender;
PacketId id;
uint32_t rxTimeMsec; // Unix time in msecs - the time we received it
};
/**
* This is a mixin that extends Router with the ability to do Naive Flooding (in the standard mesh protocol sense)
*
* Rules for broadcasting (listing here for now, will move elsewhere eventually):
If to==BROADCAST and id==0, this is a simple broadcast (0 hops). It will be
sent only by the current node and other nodes will not attempt to rebroadcast
it.
If to==BROADCAST and id!=0, this is a "naive flooding" broadcast. The initial
node will send it on all local interfaces.
When other nodes receive this message, they will
first check if their recentBroadcasts table contains the (from, id) pair that
indicates this message. If so, we've already seen it - so we discard it. If
not, we add it to the table and then resend this message on all interfaces.
When resending we are careful to use the "from" ID of the original sender. Not
our own ID. When resending we pick a random delay between 0 and 10 seconds to
decrease the chance of collisions with transmitters we can not even hear.
Any entries in recentBroadcasts that are older than X seconds (longer than the
max time a flood can take) will be discarded.
*/
class FloodingRouter : public Router
{
private:
std::vector<BroadcastRecord> recentBroadcasts;
public:
/**
* Constructor
*
*/
FloodingRouter();
/**
* Send a packet on a suitable interface. This routine will
* later free() the packet to pool. This routine is not allowed to stall.
* If the txmit queue is full it might return an error
*/
virtual ErrorCode send(MeshPacket *p);
protected:
/**
* Called from loop()
* Handle any packet that is received by an interface on this node.
* Note: some packets may merely being passed through this node and will be forwarded elsewhere.
*
* Note: this method will free the provided packet
*/
virtual void handleReceived(MeshPacket *p);
private:
/**
* Update recentBroadcasts and return true if we have already seen this packet
*/
bool wasSeenRecently(const MeshPacket *p);
};

View File

@@ -37,6 +37,8 @@ class RadioInterface
*/
void setReceiver(PointerQueue<MeshPacket> *_rxDest) { rxDest = _rxDest; }
virtual void loop() {} // Idle processing
/**
* Send a packet (possibly by enquing in a private fifo). This routine will
* later free() the packet to pool. This routine is not allowed to stall.

View File

@@ -35,6 +35,9 @@ Router::Router() : fromRadioQueue(MAX_RX_FROMRADIO) {}
*/
void Router::loop()
{
if (iface)
iface->loop();
MeshPacket *mp;
while ((mp = fromRadioQueue.dequeuePtr(0)) != NULL) {
handleReceived(mp);
@@ -49,6 +52,7 @@ void Router::loop()
ErrorCode Router::send(MeshPacket *p)
{
assert(iface);
DEBUG_MSG("Sending packet via interface fr=0x%x,to=0x%x,id=%d\n", p->from, p->to, p->id);
return iface->send(p);
}
@@ -64,7 +68,7 @@ void Router::handleReceived(MeshPacket *p)
// Also, we should set the time from the ISR and it should have msec level resolution
p->rx_time = gps.getValidTime(); // store the arrival timestamp for the phone
DEBUG_MSG("Notifying observers of received packet\n");
DEBUG_MSG("Notifying observers of received packet fr=0x%x,to=0x%x,id=%d\n", p->from, p->to, p->id);
notifyPacketReceived.notifyObservers(p);
packetPool.release(p);
}