WIP: S&F Progress

This commit is contained in:
Thomas Göttgens
2022-12-21 12:57:42 +01:00
parent 34c73da886
commit ae2ca1d89c
5 changed files with 255 additions and 198 deletions

View File

@@ -22,41 +22,31 @@ int32_t StoreForwardModule::runOnce()
if (this->busy) {
// Only send packets if the channel is less than 25% utilized.
if (airTime->channelUtilizationPercent() < polite_channel_util_percent) {
// DEBUG_MSG("--- --- --- In busy loop 1 %d\n", this->packetHistoryTXQueue_index);
storeForwardModule->sendPayload(this->busyTo, this->packetHistoryTXQueue_index);
if (this->packetHistoryTXQueue_index == packetHistoryTXQueue_size) {
strcpy(this->routerMessage, "** S&F - Done");
storeForwardModule->sendMessage(this->busyTo, this->routerMessage);
// DEBUG_MSG("--- --- --- In busy loop - Done \n");
// Tell the client we're done sending
StoreAndForward sf = StoreAndForward_init_zero;
sf.rr = StoreAndForward_RequestResponse_ROUTER_PING;
storeForwardModule->sendMessage(this->busyTo, sf);
DEBUG_MSG("*** S&F - Done. (ROUTER_PING)\n");
this->packetHistoryTXQueue_index = 0;
this->busy = false;
} else {
this->packetHistoryTXQueue_index++;
}
} else {
DEBUG_MSG("Channel utilization is too high. Retrying later.\n");
DEBUG_MSG("*** Channel utilization is too high. Retrying later.\n");
}
DEBUG_MSG("SF bitrate = %f bytes / sec\n", myNodeInfo.bitrate);
DEBUG_MSG("*** SF bitrate = %f bytes / sec\n", myNodeInfo.bitrate);
} else if (millis() - lastHeartbeat > 300000) {
} else if ((millis() - lastHeartbeat > (heartbeatInterval * 1000)) && (airTime->channelUtilizationPercent() < polite_channel_util_percent)) {
lastHeartbeat = millis();
DEBUG_MSG("Sending heartbeat\n");
DEBUG_MSG("*** Sending heartbeat\n");
StoreAndForward sf;
sf.rr = StoreAndForward_RequestResponse_ROUTER_HEARTBEAT;
sf.has_heartbeat = true;
sf.heartbeat.period = 300;
sf.heartbeat.secondary = 0; // TODO we always have one primary router for now
MeshPacket *p = allocDataProtobuf(sf);
p->to = NODENUM_BROADCAST;
p->decoded.want_response = false;
p->priority = MeshPacket_Priority_MIN;
service.sendToMesh(p);
sf.variant.heartbeat.period = 300;
sf.variant.heartbeat.secondary = 0; // TODO we always have one primary router for now
storeForwardModule->sendMessage(NODENUM_BROADCAST, sf);
}
return (this->packetTimeMax);
}
@@ -74,7 +64,7 @@ void StoreForwardModule::populatePSRAM()
https://learn.upesy.com/en/programmation/psram.html#psram-tab
*/
DEBUG_MSG("Before PSRAM initilization: heap %d/%d PSRAM %d/%d\n", ESP.getFreeHeap(), ESP.getHeapSize(), ESP.getFreePsram(), ESP.getPsramSize());
DEBUG_MSG("*** Before PSRAM initilization: heap %d/%d PSRAM %d/%d\n", ESP.getFreeHeap(), ESP.getHeapSize(), ESP.getFreePsram(), ESP.getPsramSize());
this->packetHistoryTXQueue =
static_cast<PacketHistoryStruct *>(ps_calloc(this->historyReturnMax, sizeof(PacketHistoryStruct)));
@@ -83,46 +73,35 @@ void StoreForwardModule::populatePSRAM()
Note: This needs to be done after every thing that would use PSRAM
*/
uint32_t numberOfPackets = (this->records ? this->records : (((ESP.getFreePsram() / 3) * 2) / sizeof(PacketHistoryStruct)));
this->records = numberOfPackets;
this->packetHistory = static_cast<PacketHistoryStruct *>(ps_calloc(numberOfPackets, sizeof(PacketHistoryStruct)));
DEBUG_MSG("After PSRAM initilization: heap %d/%d PSRAM %d/%d\n", ESP.getFreeHeap(), ESP.getHeapSize(), ESP.getFreePsram(), ESP.getPsramSize());
DEBUG_MSG("numberOfPackets for packetHistory - %u\n", numberOfPackets);
DEBUG_MSG("*** After PSRAM initilization: heap %d/%d PSRAM %d/%d\n", ESP.getFreeHeap(), ESP.getHeapSize(), ESP.getFreePsram(), ESP.getPsramSize());
DEBUG_MSG("*** numberOfPackets for packetHistory - %u\n", numberOfPackets);
}
void StoreForwardModule::historyReport()
{
DEBUG_MSG("Message history contains %u records\n", this->packetHistoryCurrent);
}
/*
*
*/
void StoreForwardModule::historySend(uint32_t msAgo, uint32_t to)
{
// uint32_t packetsSent = 0;
uint32_t queueSize = storeForwardModule->historyQueueCreate(msAgo, to);
if (queueSize) {
snprintf(this->routerMessage, 80, "** S&F - Sending %u message(s)", queueSize);
storeForwardModule->sendMessage(to, this->routerMessage);
DEBUG_MSG ("*** S&F - Sending %u message(s)\n", queueSize);
this->busy = true; // runOnce() will pickup the next steps once busy = true.
this->busyTo = to;
} else {
strcpy(this->routerMessage, "** S&F - No history to send");
storeForwardModule->sendMessage(to, this->routerMessage);
DEBUG_MSG ("*** S&F - No history to send\n");
}
StoreAndForward sf = StoreAndForward_init_zero;
sf.rr = StoreAndForward_RequestResponse_ROUTER_HISTORY;
sf.which_variant = StoreAndForward_history_tag;
sf.variant.history.history_messages = queueSize;
storeForwardModule->sendMessage(to, sf);
}
uint32_t StoreForwardModule::historyQueueCreate(uint32_t msAgo, uint32_t to)
{
// uint32_t packetHistoryTXQueueIndex = 0;
this->packetHistoryTXQueue_size = 0;
for (int i = 0; i < this->packetHistoryCurrent; i++) {
@@ -133,7 +112,7 @@ uint32_t StoreForwardModule::historyQueueCreate(uint32_t msAgo, uint32_t to)
DEBUG_MSG("SF historyQueueCreate - math %d\n", (millis() - msAgo));
*/
if (this->packetHistory[i].time && (this->packetHistory[i].time < (millis() - msAgo))) {
DEBUG_MSG("SF historyQueueCreate - Time matches - ok\n");
DEBUG_MSG("*** SF historyQueueCreate - Time matches - ok\n");
/*
Copy the messages that were received by the router in the last msAgo
to the packetHistoryTXQueue structure.
@@ -144,7 +123,6 @@ uint32_t StoreForwardModule::historyQueueCreate(uint32_t msAgo, uint32_t to)
if ((this->packetHistory[i].to & NODENUM_BROADCAST) == NODENUM_BROADCAST ||
((this->packetHistory[i].to & NODENUM_BROADCAST) == to)) {
this->packetHistoryTXQueue[this->packetHistoryTXQueue_size].time = this->packetHistory[i].time;
this->packetHistoryTXQueue[this->packetHistoryTXQueue_size].time = this->packetHistory[i].time;
this->packetHistoryTXQueue[this->packetHistoryTXQueue_size].to = this->packetHistory[i].to;
this->packetHistoryTXQueue[this->packetHistoryTXQueue_size].from = this->packetHistory[i].from;
this->packetHistoryTXQueue[this->packetHistoryTXQueue_size].channel = this->packetHistory[i].channel;
@@ -153,9 +131,8 @@ uint32_t StoreForwardModule::historyQueueCreate(uint32_t msAgo, uint32_t to)
Constants_DATA_PAYLOAD_LEN);
this->packetHistoryTXQueue_size++;
DEBUG_MSG("PacketHistoryStruct time=%d\n", this->packetHistory[i].time);
DEBUG_MSG("PacketHistoryStruct msg=%.*s\n", this->packetHistory[i].payload);
// DEBUG_MSG("PacketHistoryStruct msg=%.*s\n", this->packetHistoryTXQueue[packetHistoryTXQueueIndex].payload);
DEBUG_MSG("*** PacketHistoryStruct time=%d\n", this->packetHistory[i].time);
DEBUG_MSG("*** PacketHistoryStruct msg=%s\n", this->packetHistory[i].payload);
}
}
}
@@ -174,6 +151,7 @@ void StoreForwardModule::historyAdd(const MeshPacket &mp)
memcpy(this->packetHistory[this->packetHistoryCurrent].payload, p.payload.bytes, Constants_DATA_PAYLOAD_LEN);
this->packetHistoryCurrent++;
this->packetHistoryMax++;
}
MeshPacket *StoreForwardModule::allocReply()
@@ -184,7 +162,7 @@ MeshPacket *StoreForwardModule::allocReply()
void StoreForwardModule::sendPayload(NodeNum dest, uint32_t packetHistory_index)
{
DEBUG_MSG("Sending S&F Payload\n");
DEBUG_MSG("*** Sending S&F Payload\n");
MeshPacket *p = allocReply();
p->to = dest;
@@ -203,12 +181,14 @@ void StoreForwardModule::sendPayload(NodeNum dest, uint32_t packetHistory_index)
service.sendToMesh(p);
}
void StoreForwardModule::sendMessage(NodeNum dest, char *str)
void StoreForwardModule::sendMessage(NodeNum dest, StoreAndForward payload)
{
MeshPacket *p = allocReply();
MeshPacket *p = allocDataProtobuf(payload);
p->to = dest;
p->priority = MeshPacket_Priority_MIN;
// FIXME - Determine if the delayed packet is broadcast or delayed. For now, assume
// everything is broadcast.
p->delayed = MeshPacket_Delayed_DELAYED_BROADCAST;
@@ -216,60 +196,59 @@ void StoreForwardModule::sendMessage(NodeNum dest, char *str)
// Let's assume that if the router received the S&F request that the client is in range.
// TODO: Make this configurable.
p->want_ack = false;
p->decoded.payload.size = strlen(str); // You must specify how many bytes are in the reply
memcpy(p->decoded.payload.bytes, str, strlen(str));
p->decoded.want_response = false;
service.sendToMesh(p);
// HardwareMessage_init_default
}
void StoreForwardModule::statsSend(uint32_t to)
{
StoreAndForward sf;
sf.which_variant = StoreAndForward_stats_tag;
sf.variant.stats.messages_total = this->packetHistoryMax;
sf.variant.stats.messages_saved = this->packetHistoryCurrent;
sf.variant.stats.messages_max = this->records;
sf.variant.stats.up_time = millis() / 1000;
sf.variant.stats.requests = this->requests;
sf.variant.stats.requests_history = this->requests_history;
sf.variant.stats.heartbeat = this->heartbeat;
sf.variant.stats.return_max = this->historyReturnMax;
sf.variant.stats.return_window = this->historyReturnWindow;
DEBUG_MSG("*** Sending S&F Stats\n");
storeForwardModule->sendMessage(to, sf);
}
ProcessMessage StoreForwardModule::handleReceived(const MeshPacket &mp)
{
#ifdef ARCH_ESP32
if (moduleConfig.store_forward.enabled) {
DEBUG_MSG("--- S&F Received something\n");
// The router node should not be sending messages as a client. Unless he is a ROUTER_CLIENT
if ((getFrom(&mp) != nodeDB.getNodeNum()) || (config.device.role == Config_DeviceConfig_Role_ROUTER_CLIENT)) {
if (mp.decoded.portnum == PortNum_TEXT_MESSAGE_APP) {
DEBUG_MSG("Packet came from - PortNum_TEXT_MESSAGE_APP\n");
auto &p = mp.decoded;
if ((p.payload.bytes[0] == 'S') && (p.payload.bytes[1] == 'F') && (p.payload.bytes[2] == 0x00)) {
DEBUG_MSG("--- --- --- Request to send\n");
// Send the last 60 minutes of messages.
if (this->busy) {
strcpy(this->routerMessage, "** S&F - Busy. Try again shortly.");
storeForwardModule->sendMessage(getFrom(&mp), this->routerMessage);
} else {
storeForwardModule->historySend(1000 * 60, getFrom(&mp));
}
} else if ((p.payload.bytes[0] == 'S') && (p.payload.bytes[1] == 'F') && (p.payload.bytes[2] == 'm') &&
(p.payload.bytes[3] == 0x00)) {
strlcpy(this->routerMessage,
"01234567890123456789012345678901234567890123456789012345678901234567890123456789"
"01234567890123456789012345678901234567890123456789012345678901234567890123456789"
"01234567890123456789012345678901234567890123456789012345678901234567890123456",
sizeof(this->routerMessage));
storeForwardModule->sendMessage(getFrom(&mp), this->routerMessage);
} else {
storeForwardModule->historyAdd(mp);
}
storeForwardModule->historyAdd(mp);
DEBUG_MSG("*** S&F stored. Message history contains %u records now.\n", this->packetHistoryCurrent);
} else if (mp.decoded.portnum == PortNum_STORE_FORWARD_APP) {
DEBUG_MSG("Packet came from an PortNum_STORE_FORWARD_APP port %u\n", mp.decoded.portnum);
} else {
DEBUG_MSG("Packet came from an unknown port %u\n", mp.decoded.portnum);
}
auto &p = mp.decoded;
StoreAndForward scratch;
StoreAndForward *decoded = NULL;
if (mp.which_payload_variant == MeshPacket_decoded_tag) {
if (pb_decode_from_bytes(p.payload.bytes, p.payload.size, &StoreAndForward_msg, &scratch)) {
decoded = &scratch;
} else {
DEBUG_MSG("Error decoding protobuf module!\n");
// if we can't decode it, nobody can process it!
return ProcessMessage::STOP;
}
return handleReceivedProtobuf(mp, decoded) ? ProcessMessage::STOP : ProcessMessage::CONTINUE;
}
} // all others are irrelevant
}
}
@@ -285,95 +264,115 @@ bool StoreForwardModule::handleReceivedProtobuf(const MeshPacket &mp, StoreAndFo
return false;
}
if (mp.decoded.portnum != PortNum_STORE_FORWARD_APP) {
DEBUG_MSG("Packet came from port %u\n", mp.decoded.portnum);
return false;
} else {
DEBUG_MSG("Packet came from PortNum_STORE_FORWARD_APP port %u\n", mp.decoded.portnum);
requests++;
switch (p->rr) {
case StoreAndForward_RequestResponse_CLIENT_ERROR:
case StoreAndForward_RequestResponse_CLIENT_ABORT:
if(is_server) {
// Do nothing
DEBUG_MSG("StoreAndForward_RequestResponse_CLIENT_ERROR\n");
// stop sending stuff, the client wants to abort or has another error
if ((this->busy) && (this->busyTo == getFrom(&mp))) {
DEBUG_MSG("*** Client in ERROR or ABORT requested\n");
this->packetHistoryTXQueue_index = 0;
this->busy = false;
}
}
break;
case StoreAndForward_RequestResponse_CLIENT_HISTORY:
if(is_server) {
DEBUG_MSG("StoreAndForward_RequestResponse_CLIENT_HISTORY\n");
requests_history++;
DEBUG_MSG("*** Client Request to send HISTORY\n");
// Send the last 60 minutes of messages.
if (this->busy) {
strcpy(this->routerMessage, "** S&F - Busy. Try again shortly.");
storeForwardModule->sendMessage(getFrom(&mp), this->routerMessage);
StoreAndForward sf = StoreAndForward_init_zero;
sf.rr = StoreAndForward_RequestResponse_ROUTER_BUSY;
storeForwardModule->sendMessage(getFrom(&mp), sf);
DEBUG_MSG("*** S&F - Busy. Try again shortly.\n");
} else {
storeForwardModule->historySend(1000 * 60, getFrom(&mp));
if ((p->which_variant == StoreAndForward_history_tag) && (p->variant.history.window > 0)){
storeForwardModule->historySend(p->variant.history.window * 1000 * 60, getFrom(&mp)); // window is in minutes
} else {
storeForwardModule->historySend(60 * 1000 * 60, getFrom(&mp)); // defaults to 60 minutes
}
}
}
break;
case StoreAndForward_RequestResponse_CLIENT_PING:
if(is_server) {
// Do nothing
DEBUG_MSG("StoreAndForward_RequestResponse_CLIENT_PING\n");
DEBUG_MSG("*** StoreAndForward_RequestResponse_CLIENT_PING\n");
// respond with a ROUTER PONG
StoreAndForward sf = StoreAndForward_init_zero;
sf.rr = StoreAndForward_RequestResponse_ROUTER_PONG;
storeForwardModule->sendMessage(getFrom(&mp), sf);
}
break;
case StoreAndForward_RequestResponse_CLIENT_PONG:
if(is_server) {
// Do nothing
DEBUG_MSG("StoreAndForward_RequestResponse_CLIENT_PONG\n");
DEBUG_MSG("*** StoreAndForward_RequestResponse_CLIENT_PONG\n");
// The Client is alive, update NodeDB
nodeDB.updateFrom(mp);
}
break;
case StoreAndForward_RequestResponse_CLIENT_STATS:
if(is_server) {
// Do nothing
DEBUG_MSG("StoreAndForward_RequestResponse_CLIENT_STATS\n");
DEBUG_MSG("*** Client Request to send STATS\n");
if (this->busy) {
StoreAndForward sf = StoreAndForward_init_zero;
sf.rr = StoreAndForward_RequestResponse_ROUTER_BUSY;
storeForwardModule->sendMessage(getFrom(&mp), sf);
DEBUG_MSG("*** S&F - Busy. Try again shortly.\n");
} else {
storeForwardModule->statsSend(getFrom(&mp));
}
}
break;
case StoreAndForward_RequestResponse_ROUTER_BUSY:
if(is_client) {
// Do nothing
DEBUG_MSG("StoreAndForward_RequestResponse_ROUTER_BUSY\n");
DEBUG_MSG("*** StoreAndForward_RequestResponse_ROUTER_BUSY\n");
// retry in messages_saved * packetTimeMax ms
retry_delay = millis() + packetHistoryCurrent * packetTimeMax;
}
break;
case StoreAndForward_RequestResponse_ROUTER_ERROR:
if(is_client) {
// Do nothing
DEBUG_MSG("StoreAndForward_RequestResponse_ROUTER_ERROR\n");
DEBUG_MSG("*** StoreAndForward_RequestResponse_ROUTER_ERROR\n");
// retry in messages_saved * packetTimeMax * 2 ms
retry_delay = millis() + packetHistoryCurrent * packetTimeMax * 2;
}
break;
case StoreAndForward_RequestResponse_ROUTER_PONG:
// A router responded, this is equal to receiving a heartbeat
case StoreAndForward_RequestResponse_ROUTER_HEARTBEAT:
if(is_client) {
// Do nothing
DEBUG_MSG("StoreAndForward_RequestResponse_ROUTER_HEARTBEAT\n");
// register heartbeat and interval
if (p->which_variant == StoreAndForward_heartbeat_tag) {
heartbeatInterval = p->variant.heartbeat.period;
}
lastHeartbeat = millis();
DEBUG_MSG("*** StoreAndForward Heartbeat received\n");
}
break;
case StoreAndForward_RequestResponse_ROUTER_PING:
if(is_client) {
// Do nothing
DEBUG_MSG("StoreAndForward_RequestResponse_ROUTER_PING\n");
}
break;
case StoreAndForward_RequestResponse_ROUTER_PONG:
if(is_client) {
// Do nothing
DEBUG_MSG("StoreAndForward_RequestResponse_ROUTER_PONG\n");
DEBUG_MSG("*** StoreAndForward_RequestResponse_ROUTER_PING\n");
// respond with a CLIENT PONG
StoreAndForward sf = StoreAndForward_init_zero;
sf.rr = StoreAndForward_RequestResponse_CLIENT_PONG;
storeForwardModule->sendMessage(getFrom(&mp), sf);
}
break;
default:
assert(0); // unexpected state - FIXME, make an error code and reboot
}
}
return true; // There's no need for others to look at this message.
}
@@ -398,7 +397,7 @@ StoreForwardModule::StoreForwardModule()
// Router
if ((config.device.role == Config_DeviceConfig_Role_ROUTER) || (config.device.role == Config_DeviceConfig_Role_ROUTER_CLIENT)) {
DEBUG_MSG("Initializing Store & Forward Module in Router mode\n");
DEBUG_MSG("*** Initializing Store & Forward Module in Router mode\n");
if (ESP.getPsramSize() > 0) {
if (ESP.getFreePsram() >= 1024 * 1024) {
@@ -424,19 +423,19 @@ StoreForwardModule::StoreForwardModule()
this->populatePSRAM();
is_server = true;
} else {
DEBUG_MSG("Device has less than 1M of PSRAM free.\n");
DEBUG_MSG("Store & Forward Module - disabling server.\n");
DEBUG_MSG("*** Device has less than 1M of PSRAM free.\n");
DEBUG_MSG("*** Store & Forward Module - disabling server.\n");
}
} else {
DEBUG_MSG("Device doesn't have PSRAM.\n");
DEBUG_MSG("Store & Forward Module - disabling server.\n");
DEBUG_MSG("*** Device doesn't have PSRAM.\n");
DEBUG_MSG("*** Store & Forward Module - disabling server.\n");
}
// Client
}
if ((config.device.role == Config_DeviceConfig_Role_CLIENT) || (config.device.role == Config_DeviceConfig_Role_ROUTER_CLIENT)) {
is_client = true;
DEBUG_MSG("Initializing Store & Forward Module in Client mode\n");
DEBUG_MSG("*** Initializing Store & Forward Module in Client mode\n");
}
}
#endif

View File

@@ -24,10 +24,9 @@ class StoreForwardModule : private concurrency::OSThread, public ProtobufModule<
uint32_t busyTo = 0;
char routerMessage[Constants_DATA_PAYLOAD_LEN] = {0};
uint32_t receivedRecord[50][2] = {{0}};
PacketHistoryStruct *packetHistory = 0;
uint32_t packetHistoryCurrent = 0;
uint32_t packetHistoryMax = 0;
PacketHistoryStruct *packetHistoryTXQueue = 0;
uint32_t packetHistoryTXQueue_size = 0;
@@ -35,20 +34,21 @@ class StoreForwardModule : private concurrency::OSThread, public ProtobufModule<
uint32_t packetTimeMax = 5000;
unsigned long lastHeartbeat = 0;
bool is_client = false;
bool is_server = false;
public:
StoreForwardModule();
unsigned long lastHeartbeat = 0;
uint32_t heartbeatInterval = 300;
/**
Update our local reference of when we last saw that node.
@return 0 if we have never seen that node before otherwise return the last time we saw the node.
*/
void historyAdd(const MeshPacket &mp);
void historyReport();
void statsSend(uint32_t to);
void historySend(uint32_t msAgo, uint32_t to);
uint32_t historyQueueCreate(uint32_t msAgo, uint32_t to);
@@ -57,12 +57,21 @@ class StoreForwardModule : private concurrency::OSThread, public ProtobufModule<
* Send our payload into the mesh
*/
void sendPayload(NodeNum dest = NODENUM_BROADCAST, uint32_t packetHistory_index = 0);
void sendMessage(NodeNum dest, char *str);
void sendMessage(NodeNum dest, StoreAndForward payload);
virtual MeshPacket *allocReply() override;
/*
Override the wantPortnum method.
*/
virtual bool wantPortnum(PortNum p) { return true; };
-Override the wantPacket method.
*/
virtual bool wantPacket(const MeshPacket *p) override
{
switch(p->decoded.portnum) {
case PortNum_TEXT_MESSAGE_APP:
case PortNum_STORE_FORWARD_APP:
return true;
default:
return false;
}
}
private:
void populatePSRAM();
@@ -73,6 +82,12 @@ class StoreForwardModule : private concurrency::OSThread, public ProtobufModule<
uint32_t records = 0; // Calculated
bool heartbeat = false; // No heartbeat.
// stats
uint32_t requests = 0;
uint32_t requests_history = 0;
uint32_t retry_delay = 0;
protected:
virtual int32_t runOnce() override;