This commit is contained in:
Ben Meadors
2023-01-28 06:39:14 -06:00
parent de82119415
commit e8908784f9
6 changed files with 357 additions and 512 deletions

View File

@@ -8,8 +8,7 @@
#include "main.h"
#include "mesh-pb-constants.h"
#include "modules/RoutingModule.h"
extern "C"
{
extern "C" {
#include "mesh/compression/unishox2.h"
}
@@ -26,13 +25,13 @@ extern "C"
*
**/
#define MAX_RX_FROMRADIO \
#define MAX_RX_FROMRADIO \
4 // max number of packets destined to our queue, we dispatch packets quickly so it doesn't need to be big
// I think this is right, one packet for each of the three fifos + one packet being currently assembled for TX or RX
// And every TX packet might have a retransmission packet or an ack alive at any moment
#define MAX_PACKETS \
(MAX_RX_TOPHONE + MAX_RX_FROMRADIO + 2 * MAX_TX_QUEUE + \
#define MAX_PACKETS \
(MAX_RX_TOPHONE + MAX_RX_FROMRADIO + 2 * MAX_TX_QUEUE + \
2) // max number of packets which can be in flight (either queued from reception or queued for sending)
// static MemoryPool<MeshPacket> staticPool(MAX_PACKETS);
@@ -65,8 +64,7 @@ Router::Router() : concurrency::OSThread("Router"), fromRadioQueue(MAX_RX_FROMRA
int32_t Router::runOnce()
{
meshtastic_MeshPacket *mp;
while ((mp = fromRadioQueue.dequeuePtr(0)) != NULL)
{
while ((mp = fromRadioQueue.dequeuePtr(0)) != NULL) {
// printPacket("handle fromRadioQ", mp);
perhapsHandleReceived(mp);
}
@@ -81,14 +79,11 @@ int32_t Router::runOnce()
*/
void Router::enqueueReceivedMessage(meshtastic_MeshPacket *p)
{
if (fromRadioQueue.enqueue(p, 0))
{ // NOWAIT - fixme, if queue is full, delete older messages
if (fromRadioQueue.enqueue(p, 0)) { // NOWAIT - fixme, if queue is full, delete older messages
// Nasty hack because our threading is primitive. interfaces shouldn't need to know about routers FIXME
setReceivedMessage();
}
else
{
} else {
printPacket("BUG! fromRadioQueue is full! Discarding!", p);
packetPool.release(p);
}
@@ -103,8 +98,7 @@ PacketId generatePacketId()
uint32_t numPacketId = UINT32_MAX;
if (!didInit)
{
if (!didInit) {
didInit = true;
// pick a random initial sequence number at boot (to prevent repeated reboots always starting at 0)
@@ -163,25 +157,19 @@ meshtastic_QueueStatus Router::getQueueStatus()
ErrorCode Router::sendLocal(meshtastic_MeshPacket *p, RxSource src)
{
// No need to deliver externally if the destination is the local node
if (p->to == nodeDB.getNodeNum())
{
if (p->to == nodeDB.getNodeNum()) {
printPacket("Enqueued local", p);
enqueueReceivedMessage(p);
return ERRNO_OK;
}
else if (!iface)
{
} else if (!iface) {
// We must be sending to remote nodes also, fail if no interface found
abortSendAndNak(meshtastic_Routing_Error_NO_INTERFACE, p);
return ERRNO_NO_INTERFACES;
}
else
{
} else {
// If we are sending a broadcast, we also treat it as if we just received it ourself
// this allows local apps (and PCs) to see broadcasts sourced locally
if (p->to == NODENUM_BROADCAST)
{
if (p->to == NODENUM_BROADCAST) {
handleReceived(p, src);
}
@@ -205,34 +193,27 @@ void printBytes(const char *label, const uint8_t *p, size_t numbytes)
ErrorCode Router::send(meshtastic_MeshPacket *p)
{
// Skip the normal ceremony for repeaters
if (config.device.role == meshtastic_Config_DeviceConfig_Role_REPEATER)
{
if (config.device.role == meshtastic_Config_DeviceConfig_Role_REPEATER) {
assert(iface);
return iface->send(p);
}
if (p->to == nodeDB.getNodeNum())
{
if (p->to == nodeDB.getNodeNum()) {
LOG_ERROR("BUG! send() called with packet destined for local node!\n");
packetPool.release(p);
return meshtastic_Routing_Error_BAD_REQUEST;
} // should have already been handled by sendLocal
// Abort sending if we are violating the duty cycle
if (!config.lora.override_duty_cycle && myRegion->dutyCycle < 100)
{
if (!config.lora.override_duty_cycle && myRegion->dutyCycle < 100) {
float hourlyTxPercent = airTime->utilizationTXPercent();
if (hourlyTxPercent > myRegion->dutyCycle)
{
if (hourlyTxPercent > myRegion->dutyCycle) {
uint8_t silentMinutes = airTime->getSilentMinutes(hourlyTxPercent, myRegion->dutyCycle);
LOG_WARN("Duty cycle limit exceeded. Aborting send for now, you can send again in %d minutes.\n", silentMinutes);
meshtastic_Routing_Error err = meshtastic_Routing_Error_DUTY_CYCLE_LIMIT;
if (getFrom(p) == nodeDB.getNodeNum())
{ // only send NAK to API, not to the mesh
if (getFrom(p) == nodeDB.getNodeNum()) { // only send NAK to API, not to the mesh
abortSendAndNak(err, p);
}
else
{
} else {
packetPool.release(p);
}
return err;
@@ -257,15 +238,13 @@ ErrorCode Router::send(meshtastic_MeshPacket *p)
p->which_payload_variant == meshtastic_MeshPacket_decoded_tag); // I _think_ all packets should have a payload by now
// If the packet is not yet encrypted, do so now
if (p->which_payload_variant == meshtastic_MeshPacket_decoded_tag)
{
if (p->which_payload_variant == meshtastic_MeshPacket_decoded_tag) {
ChannelIndex chIndex = p->channel; // keep as a local because we are about to change it
bool shouldActuallyEncrypt = true;
#if HAS_WIFI || HAS_ETHERNET
if (moduleConfig.mqtt.enabled)
{
if (moduleConfig.mqtt.enabled) {
// check if we should send decrypted packets to mqtt
// truth table:
@@ -278,8 +257,7 @@ ErrorCode Router::send(meshtastic_MeshPacket *p)
* => so we only decrypt mqtt if they have a custom mqtt server AND mqtt_encryption_enabled is FALSE
*/
if (*moduleConfig.mqtt.address && !moduleConfig.mqtt.encryption_enabled)
{
if (*moduleConfig.mqtt.address && !moduleConfig.mqtt.encryption_enabled) {
shouldActuallyEncrypt = false;
}
@@ -292,15 +270,13 @@ ErrorCode Router::send(meshtastic_MeshPacket *p)
#endif
auto encodeResult = perhapsEncode(p);
if (encodeResult != meshtastic_Routing_Error_NONE)
{
if (encodeResult != meshtastic_Routing_Error_NONE) {
abortSendAndNak(encodeResult, p);
return encodeResult; // FIXME - this isn't a valid ErrorCode
}
#if HAS_WIFI || HAS_ETHERNET
if (moduleConfig.mqtt.enabled)
{
if (moduleConfig.mqtt.enabled) {
// the packet is now encrypted.
// check if we should send encrypted packets to mqtt
if (mqtt && shouldActuallyEncrypt)
@@ -339,11 +315,9 @@ bool perhapsDecode(meshtastic_MeshPacket *p)
// assert(p->which_payloadVariant == MeshPacket_encrypted_tag);
// Try to find a channel that works with this hash
for (ChannelIndex chIndex = 0; chIndex < channels.getNumChannels(); chIndex++)
{
for (ChannelIndex chIndex = 0; chIndex < channels.getNumChannels(); chIndex++) {
// Try to use this hash/channel pair
if (channels.decryptForHash(chIndex, p->channel))
{
if (channels.decryptForHash(chIndex, p->channel)) {
// Try to decrypt the packet if we can
size_t rawSize = p->encrypted.size;
assert(rawSize <= sizeof(bytes));
@@ -355,23 +329,17 @@ bool perhapsDecode(meshtastic_MeshPacket *p)
// Take those raw bytes and convert them back into a well structured protobuf we can understand
memset(&p->decoded, 0, sizeof(p->decoded));
if (!pb_decode_from_bytes(bytes, rawSize, &meshtastic_Data_msg, &p->decoded))
{
if (!pb_decode_from_bytes(bytes, rawSize, &meshtastic_Data_msg, &p->decoded)) {
LOG_ERROR("Invalid protobufs in received mesh packet (bad psk?)!\n");
}
else if (p->decoded.portnum == meshtastic_PortNum_UNKNOWN_APP)
{
} else if (p->decoded.portnum == meshtastic_PortNum_UNKNOWN_APP) {
LOG_ERROR("Invalid portnum (bad psk?)!\n");
}
else
{
} else {
// parsing was successful
p->which_payload_variant = meshtastic_MeshPacket_decoded_tag; // change type to decoded
p->channel = chIndex; // change to store the index instead of the hash
// Decompress if needed. jm
if (p->decoded.portnum == meshtastic_PortNum_TEXT_MESSAGE_COMPRESSED_APP)
{
if (p->decoded.portnum == meshtastic_PortNum_TEXT_MESSAGE_COMPRESSED_APP) {
// Decompress the payload
char compressed_in[meshtastic_Constants_DATA_PAYLOAD_LEN] = {};
char decompressed_out[meshtastic_Constants_DATA_PAYLOAD_LEN] = {};
@@ -404,14 +372,12 @@ bool perhapsDecode(meshtastic_MeshPacket *p)
meshtastic_Routing_Error perhapsEncode(meshtastic_MeshPacket *p)
{
// If the packet is not yet encrypted, do so now
if (p->which_payload_variant == meshtastic_MeshPacket_decoded_tag)
{
if (p->which_payload_variant == meshtastic_MeshPacket_decoded_tag) {
size_t numbytes = pb_encode_to_bytes(bytes, sizeof(bytes), &meshtastic_Data_msg, &p->decoded);
// Only allow encryption on the text message app.
// TODO: Allow modules to opt into compression.
if (p->decoded.portnum == meshtastic_PortNum_TEXT_MESSAGE_APP)
{
if (p->decoded.portnum == meshtastic_PortNum_TEXT_MESSAGE_APP) {
char original_payload[meshtastic_Constants_DATA_PAYLOAD_LEN];
memcpy(original_payload, p->decoded.payload.bytes, p->decoded.payload.size);
@@ -426,17 +392,14 @@ meshtastic_Routing_Error perhapsEncode(meshtastic_MeshPacket *p)
LOG_DEBUG("Original message - %s \n", p->decoded.payload.bytes);
// If the compressed length is greater than or equal to the original size, don't use the compressed form
if (compressed_len >= p->decoded.payload.size)
{
if (compressed_len >= p->decoded.payload.size) {
LOG_DEBUG("Not using compressing message.\n");
// Set the uncompressed payload varient anyway. Shouldn't hurt?
// p->decoded.which_payloadVariant = Data_payload_tag;
// Otherwise we use the compressor
}
else
{
} else {
LOG_DEBUG("Using compressed message.\n");
// Copy the compressed data into the meshpacket
@@ -487,8 +450,7 @@ void Router::handleReceived(meshtastic_MeshPacket *p, RxSource src)
// Take those raw bytes and convert them back into a well structured protobuf we can understand
bool decoded = perhapsDecode(p);
if (decoded)
{
if (decoded) {
// parsing was successful, queue for our recipient
if (src == RX_SRC_LOCAL)
printPacket("handleReceived(LOCAL)", p);
@@ -496,9 +458,7 @@ void Router::handleReceived(meshtastic_MeshPacket *p, RxSource src)
printPacket("handleReceived(USER)", p);
else
printPacket("handleReceived(REMOTE)", p);
}
else
{
} else {
printPacket("packet decoding failed (no PSK?)", p);
}
@@ -513,8 +473,7 @@ void Router::perhapsHandleReceived(meshtastic_MeshPacket *p)
if (ignore)
LOG_DEBUG("Ignoring incoming message, 0x%x is in our ignore list\n", p->from);
else if (ignore |= shouldFilterReceived(p))
{
else if (ignore |= shouldFilterReceived(p)) {
LOG_DEBUG("Incoming message was filtered 0x%x\n", p->from);
}