From d508de956856fd591a1033381431be59faabf8eb Mon Sep 17 00:00:00 2001 From: Jonathan Bennett Date: Sat, 20 Dec 2025 13:05:13 -0600 Subject: [PATCH] Add Store and Forward++ module --- src/modules/Modules.cpp | 2 + src/modules/Native/StoreForwardPlusPlus.cpp | 1042 +++++++++++++++++++ src/modules/Native/StoreForwardPlusPlus.h | 212 ++++ src/platform/portduino/PortduinoGlue.cpp | 4 + src/platform/portduino/PortduinoGlue.h | 10 + variants/native/portduino/platformio.ini | 1 + 6 files changed, 1271 insertions(+) create mode 100644 src/modules/Native/StoreForwardPlusPlus.cpp create mode 100644 src/modules/Native/StoreForwardPlusPlus.h diff --git a/src/modules/Modules.cpp b/src/modules/Modules.cpp index 63392f7e4..0b84e5ebd 100644 --- a/src/modules/Modules.cpp +++ b/src/modules/Modules.cpp @@ -61,6 +61,7 @@ #if ARCH_PORTDUINO #include "input/LinuxInputImpl.h" #include "input/SeesawRotary.h" +#include "modules/Native/StoreForwardPlusPlus.h" #include "modules/Telemetry/HostMetrics.h" #if !MESHTASTIC_EXCLUDE_STOREFORWARD #include "modules/StoreForwardModule.h" @@ -243,6 +244,7 @@ void setupModules() #endif #if ARCH_PORTDUINO new HostMetricsModule(); + new StoreForwardPlusPlusModule(); #endif #if HAS_TELEMETRY new DeviceTelemetryModule(); diff --git a/src/modules/Native/StoreForwardPlusPlus.cpp b/src/modules/Native/StoreForwardPlusPlus.cpp new file mode 100644 index 000000000..03a0bf5e7 --- /dev/null +++ b/src/modules/Native/StoreForwardPlusPlus.cpp @@ -0,0 +1,1042 @@ +// I've done a lot of this in SQLite for now, but honestly it needs to happen in memory, and get saved to sqlite during downtime + +// TODO: Message asking for the message x spots from the tip of the chain. +// This would allow individual nodes to apt in, grab the latest fe messages, and not need to sync the entire chain + +// TODO: custom hops. 1 maybe 0. Configurable? + +// TODO: non-stratum0 nodes need to be pointed at their upstream source? Maybe + +// TODO: There will come a point where the chain is too big to sync + +// TODO: evict messages from scratch after a timeout + +// things may get weird if there are multiple stratum-0 nodes on a single mesh. Come up with mitigations + +#include "StoreForwardPlusPlus.h" +#include "MeshService.h" +#include "RTC.h" +#include "SHA256.h" +#include "meshUtils.h" +#include "modules/RoutingModule.h" + +StoreForwardPlusPlusModule::StoreForwardPlusPlusModule() + : ProtobufModule("StoreForwardpp", meshtastic_PortNum_STORE_FORWARD_PLUSPLUS_APP, &meshtastic_StoreForwardPlusPlus_msg), + concurrency::OSThread("StoreForwardpp") +{ + LOG_WARN("StoreForwardPlusPlusModule init"); + + if (portduino_config.sfpp_stratum0) + LOG_WARN("SF++ stratum0"); + + int res = sqlite3_open("test.db", &ppDb); + LOG_WARN("Result1 %u", res); + + char *err = nullptr; + + res = sqlite3_exec(ppDb, " \ + CREATE TABLE channel_messages( \ + destination INT NOT NULL, \ + sender INT NOT NULL, \ + packet_id INT NOT NULL, \ + rx_time INT NOT NULL, \ + root_hash BLOB NOT NULL, \ + encrypted_bytes BLOB NOT NULL, \ + message_hash BLOB NOT NULL, \ + commit_hash BLOB NOT NULL, \ + payload TEXT, \ + counter INT DEFAULT 0, \ + PRIMARY KEY (message_hash) \ + );", + NULL, NULL, &err); + LOG_WARN("Result2 %u", res); + if (err != nullptr) + LOG_ERROR("%s", err); + sqlite3_free(err); + + res = sqlite3_exec(ppDb, " \ + CREATE TABLE local_messages( \ + destination INT NOT NULL, \ + sender INT NOT NULL, \ + packet_id INT NOT NULL, \ + rx_time INT NOT NULL, \ + channel_hash INT NOT NULL, \ + encrypted_bytes BLOB NOT NULL, \ + message_hash BLOB NOT NULL, \ + payload TEXT, \ + PRIMARY KEY (message_hash) \ + );", + NULL, NULL, &err); + LOG_WARN("Result2 %u", res); + if (err != nullptr) + LOG_ERROR("%s", err); + sqlite3_free(err); + + // create table DMs + res = sqlite3_exec(ppDb, " \ + CREATE TABLE direct_messages( \ + destination INT NOT NULL, \ + sender INT NOT NULL, \ + packet_id INT NOT NULL, \ + rx_time INT NOT NULL, \ + channel_hash INT NOT NULL, \ + commit_hash BLOB NOT NULL, \ + encrypted_bytes BLOB NOT NULL, \ + message_hash BLOB NOT NULL, \ + payload TEXT, \ + PRIMARY KEY (message_hash) \ + );", + NULL, NULL, &err); + LOG_WARN("Result2 %u", res); + if (err != nullptr) + LOG_ERROR("%s", err); + sqlite3_free(err); + + // mappings table -- connects the root hashes to channel hashes and DM identifiers + res = sqlite3_exec(ppDb, " \ + CREATE TABLE mappings( \ + chain_type INT NOT NULL, \ + identifier INT NOT NULL, \ + root_hash BLOB NOT NULL, \ + count INT DEFAULT 0, \ + PRIMARY KEY (identifier) \ + );", + NULL, NULL, &err); + LOG_WARN("Result2 %u", res); + if (err != nullptr) + LOG_ERROR("%s", err); + sqlite3_free(err); + + // store schema version somewhere + + // prepared statements *should* make this faster. + sqlite3_prepare_v2(ppDb, "INSERT INTO channel_messages (destination, sender, packet_id, root_hash, \ + encrypted_bytes, message_hash, rx_time, commit_hash, payload, counter) VALUES(?, ?, ?, ?, ?, ?, ?, ?, ?, ?);", + -1, &chain_insert_stmt, NULL); + + sqlite3_prepare_v2(ppDb, "INSERT INTO local_messages (destination, sender, packet_id, channel_hash, \ + encrypted_bytes, message_hash, rx_time, payload) VALUES(?, ?, ?, ?, ?, ?, ?, ?);", + -1, &scratch_insert_stmt, NULL); + + sqlite3_prepare_v2(ppDb, "select destination, sender, packet_id, encrypted_bytes, message_hash, rx_time, channel_hash \ + from local_messages where channel_hash=? order by rx_time asc LIMIT 1;", // earliest first + -1, &fromScratchStmt, NULL); + + sqlite3_prepare_v2(ppDb, + "select destination, sender, packet_id, encrypted_bytes, message_hash, rx_time, channel_hash, payload \ + from local_messages where substr(message_hash,1,?)=? order by rx_time asc LIMIT 1;", // earliest first + -1, &fromScratchByHashStmt, NULL); + + sqlite3_prepare_v2(ppDb, "SELECT COUNT(*) from channel_messages where substr(message_hash,1,?)=?", -1, &checkDup, NULL); + + sqlite3_prepare_v2(ppDb, "SELECT COUNT(*) from local_messages where substr(message_hash,1,?)=?", -1, &checkScratch, NULL); + + sqlite3_prepare_v2(ppDb, "DELETE from local_messages where substr(message_hash,1,?)=?", -1, &removeScratch, NULL); + + sqlite3_prepare_v2(ppDb, "UPDATE channel_messages SET payload=? WHERE substr(message_hash,1,?)=?", -1, &updatePayloadStmt, + NULL); + + sqlite3_prepare_v2(ppDb, "select commit_hash from channel_messages where substr(root_hash,1,?)=? order by rowid ASC;", -1, + &getNextHashStmt, NULL); + + sqlite3_prepare_v2(ppDb, + "select commit_hash, message_hash, rx_time from channel_messages where substr(root_hash,1,?)=? order by " + "rowid desc LIMIT 1;", + -1, &getChainEndStmt, NULL); + + sqlite3_prepare_v2( + ppDb, + "select destination, sender, packet_id, encrypted_bytes, message_hash, rx_time, commit_hash, root_hash, counter, payload \ + from channel_messages where substr(commit_hash,1,?)=?;", + -1, &getLinkStmt, NULL); + + sqlite3_prepare_v2(ppDb, "select identifier from mappings where substr(root_hash,1,?)=?;", -1, &getHashFromRootStmt, NULL); + + sqlite3_prepare_v2(ppDb, "INSERT INTO mappings (chain_type, identifier, root_hash) VALUES(?, ?, ?);", -1, + &addRootToMappingsStmt, NULL); + + sqlite3_prepare_v2(ppDb, "select root_hash from mappings where identifier=?;", -1, &getRootFromChannelHashStmt, NULL); + + sqlite3_prepare_v2(ppDb, "select root_hash from mappings where substr(root_hash,1,?)=?;", -1, &getFullRootHashStmt, NULL); + + sqlite3_prepare_v2(ppDb, "UPDATE mappings SET count=? WHERE substr(root_hash,1,?)=?;", -1, &setChainCountStmt, NULL); + + sqlite3_prepare_v2(ppDb, "SELECT count FROM mappings WHERE substr(root_hash,1,?)=?;", -1, &getChainCountStmt, NULL); + + encryptedOk = true; + + // wait about 15 seconds after boot for the first runOnce() + // TODO: When not doing active development, adjust this to a longer time + this->setInterval(15 * 1000); +} + +int32_t StoreForwardPlusPlusModule::runOnce() +{ + LOG_WARN("StoreForward++ runONce"); + pendingRun = false; + if (getRTCQuality() < RTCQualityNTP) { + LOG_WARN("StoreForward++ deferred due to time quality %u", getRTCQuality()); + return 5 * 60 * 1000; + } + uint8_t root_hash_bytes[32] = {0}; + ChannelHash hash = channels.getHash(0); + getOrAddRootFromChannelHash(hash, root_hash_bytes); + + // get tip of chain for this channel + link_object chain_end = getChainEndLinkObject(root_hash_bytes, 32); + + if (chain_end.rx_time == 0) { + LOG_WARN("Store and Forward++ database lookup returned null"); + return 60 * 60 * 1000; + } + + // broadcast the tip of the chain + canonAnnounce(chain_end.message_hash, chain_end.commit_hash, root_hash_bytes, chain_end.rx_time); + + // eventually timeout things on the scratch queue + return 60 * 60 * 1000; +} + +ProcessMessage StoreForwardPlusPlusModule::handleReceived(const meshtastic_MeshPacket &mp) +{ + // To avoid terrible time problems, require NTP or GPS time + if (getRTCQuality() < RTCQualityNTP) { + return ProcessMessage::CONTINUE; + } + + // For the moment, this is strictly LoRa + if (mp.transport_mechanism != meshtastic_MeshPacket_TransportMechanism_TRANSPORT_LORA) { + return ProcessMessage::CONTINUE; // Let others look at this message also if they want + } + + // will eventually host DMs and other undecodable messages + if (mp.which_payload_variant != meshtastic_MeshPacket_decoded_tag) { + return ProcessMessage::CONTINUE; // Let others look at this message also if they want + } + LOG_WARN("in handleReceived"); + if (mp.decoded.portnum == meshtastic_PortNum_TEXT_MESSAGE_APP && mp.to == NODENUM_BROADCAST) { + link_object lo = ingestTextPacket(mp, router->p_encrypted); + + if (isInDB(lo.message_hash, lo.message_hash_len)) { + LOG_WARN("found message in db"); + // We may have this message already, but we may not have the payload + // if we do, we can update the payload in the database + if (lo.payload != "") + updatePayload(lo.message_hash, lo.message_hash_len, lo.payload); + return ProcessMessage::CONTINUE; + } + + if (!portduino_config.sfpp_stratum0) { + if (!isInDB(lo.message_hash, lo.message_hash_len)) { + addToScratch(lo); + LOG_WARN("added message to scratch"); + // send link to upstream? + } + return ProcessMessage::CONTINUE; + } + addToChain(lo); + + if (!pendingRun) { + setIntervalFromNow(60 * 1000); // run again in 60 seconds to announce the new tip of chain + pendingRun = true; + } + // canonAnnounce(lo.message_hash, lo.commit_hash, lo.root_hash, lo.rx_time); + return ProcessMessage::CONTINUE; // Let others look at this message also if they want + } else if (mp.decoded.portnum == meshtastic_PortNum_STORE_FORWARD_PLUSPLUS_APP) { + LOG_WARN("Got a STORE_FORWARD++ packet"); + meshtastic_StoreForwardPlusPlus scratch; + pb_decode_from_bytes(mp.decoded.payload.bytes, mp.decoded.payload.size, meshtastic_StoreForwardPlusPlus_fields, &scratch); + handleReceivedProtobuf(mp, &scratch); + return ProcessMessage::CONTINUE; + } + return ProcessMessage::CONTINUE; +} + +bool StoreForwardPlusPlusModule::handleReceivedProtobuf(const meshtastic_MeshPacket &mp, meshtastic_StoreForwardPlusPlus *t) +{ + LOG_WARN("in handleReceivedProtobuf"); + LOG_WARN("Sfp++ node %u sent us sf++ packet", mp.from); + printBytes("commit_hash ", t->commit_hash.bytes, t->commit_hash.size); + printBytes("root_hash ", t->root_hash.bytes, t->root_hash.size); + + if (t->sfpp_message_type == meshtastic_StoreForwardPlusPlus_SFPP_message_type_CANON_ANNOUNCE) { + + // TODO: Regardless of where we are in the chain, if we have a newer message, send it back. + if (portduino_config.sfpp_stratum0) { + LOG_WARN("Received a CANON_ANNOUNCE while stratum 0"); + uint8_t next_commit_hash[32] = {0}; + if (getNextHash(t->root_hash.bytes, t->root_hash.size, t->commit_hash.bytes, t->commit_hash.size, next_commit_hash)) { + printBytes("next chain hash: ", next_commit_hash, 32); + if (airTime->isTxAllowedChannelUtil(true)) { + broadcastLink(next_commit_hash, 32); + } + } + } else { + uint8_t tmp_root_hash_bytes[32] = {0}; + + LOG_WARN("Received a CANON_ANNOUNCE"); + if (getRootFromChannelHash(router->p_encrypted->channel, tmp_root_hash_bytes)) { + // we found the hash, check if it's the right one + // TODO handle oddball sizes here + if (memcmp(tmp_root_hash_bytes, t->root_hash.bytes, t->root_hash.size) != 0) { + LOG_WARN("Found root hash, and it doesn't match!"); + return true; + } + } else { + // TODO: size check + addRootToMappings(router->p_encrypted->channel, t->root_hash.bytes); + LOG_WARN("Adding root hash to mappings"); + } + + // get tip of chain for this channel + link_object chain_end = getChainEndLinkObject(t->root_hash.bytes, t->root_hash.size); + + // get chain tip + if (chain_end.rx_time != 0) { + // TODO: size check + if (memcmp(chain_end.commit_hash, t->commit_hash.bytes, t->commit_hash.size) == 0) { + LOG_WARN("End of chain matches!"); + sendFromScratch(router->p_encrypted->channel); + } else { + LOG_INFO("End of chain does not match!"); + + // We just got an end of chain announce, checking if we have seen this message and have it in scratch. + if (isInScratch(t->message_hash.bytes, t->message_hash.size)) { + link_object scratch_object = getFromScratch(t->message_hash.bytes, t->message_hash.size); + // if this matches, we don't need to request the message + // we know exactly what it is + if (t->message_hash.size >= 8 && + checkCommitHash(scratch_object, t->commit_hash.bytes, t->message_hash.size)) { + + addToChain(scratch_object); + removeFromScratch(scratch_object.message_hash, scratch_object.message_hash_len); + // short circuit and return + // falls through to a request for the message + return true; + } + } + if (airTime->isTxAllowedChannelUtil(true)) { + requestNextMessage(t->root_hash.bytes, t->root_hash.size, chain_end.commit_hash, 32); + } + } + } else { // if chainEnd() + LOG_WARN("No Messages on this chain, request!"); + if (airTime->isTxAllowedChannelUtil(true)) { + requestNextMessage(t->root_hash.bytes, t->root_hash.size, t->root_hash.bytes, t->root_hash.size); + } + } + } + } else if (t->sfpp_message_type == meshtastic_StoreForwardPlusPlus_SFPP_message_type_LINK_REQUEST) { + uint8_t next_commit_hash[32] = {0}; + + LOG_WARN("Received link request"); + if (getNextHash(t->root_hash.bytes, t->root_hash.size, t->commit_hash.bytes, t->commit_hash.size, next_commit_hash)) { + printBytes("next chain hash: ", next_commit_hash, 32); + + broadcastLink(next_commit_hash, 32); + } + + // if root and chain hashes are the same, grab the first message on the chain + // if different, get the message directly after. + // check if the root + + } else if (t->sfpp_message_type == meshtastic_StoreForwardPlusPlus_SFPP_message_type_LINK_PROVIDE) { + LOG_WARN("Link Provide received!"); + + link_object incoming_link = ingestLinkMessage(t); + if (incoming_link.root_hash_len == 0) { + LOG_WARN("Hash bytes not found for incoming link"); + return true; + } + + if (!incoming_link.validObject) { + LOG_WARN("commit byte mismatch"); + return true; + } + + if (portduino_config.sfpp_stratum0) { + if (isInDB(incoming_link.message_hash, incoming_link.message_hash_len)) { + LOG_WARN("Received link already in chain"); + // TODO: respond with next link? + return true; + } + + // calculate the commit_hash + addToChain(incoming_link); + if (!pendingRun) { + setIntervalFromNow(60 * 1000); // run again in 60 seconds to announce the new tip of chain + pendingRun = true; + } + // timebox to no more than an hour old + if (incoming_link.rx_time > getValidTime(RTCQuality::RTCQualityNTP, true) - rebroadcastTimeout) { + // if this packet is new to us, we rebroadcast it + rebroadcastLinkObject(incoming_link); + } + + } else { + if (incoming_link.commit_hash_len == 32) { + addToChain(incoming_link); + if (isInScratch(incoming_link.message_hash, incoming_link.message_hash_len)) { + link_object scratch_object = getFromScratch(incoming_link.message_hash, incoming_link.message_hash_len); + if (scratch_object.payload != "") { + updatePayload(incoming_link.message_hash, incoming_link.message_hash_len, scratch_object.payload); + } + removeFromScratch(incoming_link.message_hash, incoming_link.message_hash_len); + } else { + // if this packet is new to us, we rebroadcast it, but only up to an hour old + if (incoming_link.rx_time > getValidTime(RTCQuality::RTCQualityNTP, true) - rebroadcastTimeout) { + LOG_WARN("Attempting to Rebroadcast message"); + rebroadcastLinkObject(incoming_link); + } + } + requestNextMessage(t->root_hash.bytes, t->root_hash.size, t->commit_hash.bytes, t->commit_hash.size); + } else { + // todo: handle this + // add to scratch? + } + } + } + + return true; +} + +bool StoreForwardPlusPlusModule::getRootFromChannelHash(ChannelHash _ch_hash, uint8_t *_root_hash) +{ + bool found = false; + sqlite3_bind_int(getRootFromChannelHashStmt, 1, _ch_hash); + sqlite3_step(getRootFromChannelHashStmt); + uint8_t *tmp_root_hash = (uint8_t *)sqlite3_column_blob(getRootFromChannelHashStmt, 0); + if (tmp_root_hash) { + LOG_WARN("Found root hash!"); + memcpy(_root_hash, tmp_root_hash, 32); + found = true; + } + sqlite3_reset(getRootFromChannelHashStmt); + return found; +} + +ChannelHash StoreForwardPlusPlusModule::getChannelHashFromRoot(uint8_t *_root_hash, size_t _root_hash_len) +{ + // TODO move and substr() + sqlite3_bind_int(getHashFromRootStmt, 1, _root_hash_len); + sqlite3_bind_blob(getHashFromRootStmt, 2, _root_hash, _root_hash_len, NULL); + sqlite3_step(getHashFromRootStmt); + ChannelHash tmp_hash = (ChannelHash)sqlite3_column_int(getHashFromRootStmt, 0); + sqlite3_reset(getHashFromRootStmt); + return tmp_hash; +} + +// return code indicates newly created chain +bool StoreForwardPlusPlusModule::getOrAddRootFromChannelHash(ChannelHash _ch_hash, uint8_t *_root_hash) +{ + LOG_WARN("getOrAddRootFromChannelHash()"); + bool isNew = !getRootFromChannelHash(_ch_hash, _root_hash); + + if (isNew) { + if (portduino_config.sfpp_stratum0) { + LOG_WARN("Generating Root hash!"); + // generate root hash + SHA256 root_hash; + root_hash.update(&_ch_hash, sizeof(_ch_hash)); + NodeNum ourNode = nodeDB->getNodeNum(); + root_hash.update(&ourNode, sizeof(ourNode)); + uint32_t rtc_sec = getValidTime(RTCQuality::RTCQualityDevice, true); + root_hash.update(&rtc_sec, sizeof(rtc_sec)); + root_hash.finalize(_root_hash, 32); + addRootToMappings(_ch_hash, _root_hash); + } + } + return isNew; +} + +void StoreForwardPlusPlusModule::addRootToMappings(ChannelHash _ch_hash, uint8_t *_root_hash) +{ + LOG_WARN("addRootToMappings()"); + printBytes("_root_hash", _root_hash, 32); + + // write to the table + int type = chain_types::channel_chain; + // note, must be an int variable + + sqlite3_bind_int(addRootToMappingsStmt, 1, type); + sqlite3_bind_int(addRootToMappingsStmt, 2, _ch_hash); + sqlite3_bind_blob(addRootToMappingsStmt, 3, _root_hash, 32, NULL); + auto rc = sqlite3_step(addRootToMappingsStmt); + LOG_WARN("result %u, %s", rc, sqlite3_errmsg(ppDb)); + sqlite3_reset(addRootToMappingsStmt); +} + +StoreForwardPlusPlusModule::link_object StoreForwardPlusPlusModule::getChainEndLinkObject(uint8_t *_root_hash, + size_t _root_hash_len) +{ + LOG_WARN("getChainEndLinkObject"); + link_object lo; + + int rc; + sqlite3_bind_int(getChainEndStmt, 1, _root_hash_len); + sqlite3_bind_blob(getChainEndStmt, 2, _root_hash, _root_hash_len, NULL); + sqlite3_step(getChainEndStmt); + uint8_t *last_message_commit_hash = (uint8_t *)sqlite3_column_blob(getChainEndStmt, 0); + uint8_t *last_message_hash = (uint8_t *)sqlite3_column_blob(getChainEndStmt, 1); + uint32_t _rx_time = sqlite3_column_int(getChainEndStmt, 2); + if (last_message_commit_hash != nullptr) { + lo = getLink(last_message_commit_hash, 32); + } + + sqlite3_reset(getChainEndStmt); + return lo; +} + +// TODO: make DM? +void StoreForwardPlusPlusModule::requestNextMessage(uint8_t *_root_hash, size_t _root_hash_len, uint8_t *_commit_hash, + size_t _commit_hash_len) +{ + + meshtastic_StoreForwardPlusPlus storeforward = meshtastic_StoreForwardPlusPlus_init_zero; + storeforward.sfpp_message_type = meshtastic_StoreForwardPlusPlus_SFPP_message_type_LINK_REQUEST; + // set root hash + + // set chain hash + storeforward.commit_hash.size = _commit_hash_len; + memcpy(storeforward.commit_hash.bytes, _commit_hash, _commit_hash_len); + + // set root hash + storeforward.root_hash.size = _root_hash_len; + memcpy(storeforward.root_hash.bytes, _root_hash, _root_hash_len); + + // storeforward. + meshtastic_MeshPacket *p = allocDataProtobuf(storeforward); + p->to = NODENUM_BROADCAST; + p->decoded.want_response = false; + p->priority = meshtastic_MeshPacket_Priority_BACKGROUND; + p->channel = 0; + LOG_INFO("Send packet to mesh"); + service->sendToMesh(p, RX_SRC_LOCAL, true); +} + +bool StoreForwardPlusPlusModule::getNextHash(uint8_t *_root_hash, size_t _root_hash_len, uint8_t *_commit_hash, + size_t _commit_hash_len, uint8_t *next_commit_hash) +{ + LOG_WARN("getNextHash"); + + ChannelHash _channel_hash = getChannelHashFromRoot(_root_hash, _root_hash_len); + LOG_WARN("_channel_hash %u", _channel_hash); + + int rc; + sqlite3_bind_int(getNextHashStmt, 1, _root_hash_len); + sqlite3_bind_blob(getNextHashStmt, 2, _root_hash, _root_hash_len, NULL); + bool next_hash = false; + + // asking for the first entry on the chain + if (memcmp(_root_hash, _commit_hash, _commit_hash_len) == 0) { + rc = sqlite3_step(getNextHashStmt); + if (rc != SQLITE_OK) { + LOG_WARN("here2 %u, %s", rc, sqlite3_errmsg(ppDb)); + } + uint8_t *tmp_commit_hash = (uint8_t *)sqlite3_column_blob(getNextHashStmt, 0); + if (tmp_commit_hash == nullptr) { + LOG_WARN("No next hash found"); + sqlite3_reset(getNextHashStmt); + return false; + } + printBytes("commit_hash", tmp_commit_hash, 32); + memcpy(next_commit_hash, tmp_commit_hash, 32); + next_hash = true; + } else { + bool found_hash = false; + + LOG_WARN("Looking for next hashes"); + uint8_t *tmp_commit_hash; + while (sqlite3_step(getNextHashStmt) != SQLITE_DONE) { + tmp_commit_hash = (uint8_t *)sqlite3_column_blob(getNextHashStmt, 0); + + if (found_hash) { + LOG_WARN("Found hash"); + memcpy(next_commit_hash, tmp_commit_hash, 32); + next_hash = true; + break; + } + if (memcmp(tmp_commit_hash, _commit_hash, _commit_hash_len) == 0) + found_hash = true; + } + } + + sqlite3_reset(getNextHashStmt); + return next_hash; +} + +void StoreForwardPlusPlusModule::broadcastLink(uint8_t *_commit_hash, size_t _commit_hash_len) +{ + int rc; + sqlite3_bind_int(getLinkStmt, 1, _commit_hash_len); + sqlite3_bind_blob(getLinkStmt, 2, _commit_hash, _commit_hash_len, NULL); + LOG_WARN("%d", sqlite3_step(getLinkStmt)); + + meshtastic_StoreForwardPlusPlus storeforward = meshtastic_StoreForwardPlusPlus_init_zero; + storeforward.sfpp_message_type = meshtastic_StoreForwardPlusPlus_SFPP_message_type_LINK_PROVIDE; + + storeforward.encapsulated_to = sqlite3_column_int(getLinkStmt, 0); + storeforward.encapsulated_from = sqlite3_column_int(getLinkStmt, 1); + storeforward.encapsulated_id = sqlite3_column_int(getLinkStmt, 2); + + uint8_t *_payload = (uint8_t *)sqlite3_column_blob(getLinkStmt, 3); + storeforward.message.size = sqlite3_column_bytes(getLinkStmt, 3); + memcpy(storeforward.message.bytes, _payload, storeforward.message.size); + + uint8_t *_message_hash = (uint8_t *)sqlite3_column_blob(getLinkStmt, 4); + // storeforward.message_hash.size = 8; + // memcpy(storeforward.message_hash.bytes, _message_hash, storeforward.message_hash.size); + + storeforward.encapsulated_rxtime = sqlite3_column_int(getLinkStmt, 5); + + uint8_t *_tmp_commit_hash = (uint8_t *)sqlite3_column_blob(getLinkStmt, 6); + storeforward.commit_hash.size = 8; + memcpy(storeforward.commit_hash.bytes, _tmp_commit_hash, storeforward.commit_hash.size); + + uint8_t *_root_hash = (uint8_t *)sqlite3_column_blob(getLinkStmt, 7); + storeforward.root_hash.size = 8; + memcpy(storeforward.root_hash.bytes, _root_hash, storeforward.root_hash.size); + + sqlite3_reset(getLinkStmt); + + meshtastic_MeshPacket *p = allocDataProtobuf(storeforward); + p->to = NODENUM_BROADCAST; + p->decoded.want_response = false; + p->priority = meshtastic_MeshPacket_Priority_BACKGROUND; + p->channel = 0; + LOG_INFO("Send link to mesh"); + service->sendToMesh(p, RX_SRC_LOCAL, true); +} + +// +StoreForwardPlusPlusModule::link_object StoreForwardPlusPlusModule::getLink(uint8_t *_commit_hash, size_t _commit_hash_len) +{ + link_object lo; + int rc; + sqlite3_bind_int(getLinkStmt, 1, _commit_hash_len); + sqlite3_bind_blob(getLinkStmt, 2, _commit_hash, _commit_hash_len, NULL); + LOG_WARN("%d", sqlite3_step(getLinkStmt)); + + lo.to = sqlite3_column_int(getLinkStmt, 0); + lo.from = sqlite3_column_int(getLinkStmt, 1); + lo.id = sqlite3_column_int(getLinkStmt, 2); + + uint8_t *_payload = (uint8_t *)sqlite3_column_blob(getLinkStmt, 3); + lo.encrypted_len = sqlite3_column_bytes(getLinkStmt, 3); + memcpy(lo.encrypted_bytes, _payload, lo.encrypted_len); + + uint8_t *_message_hash = (uint8_t *)sqlite3_column_blob(getLinkStmt, 4); + lo.message_hash_len = 32; + memcpy(lo.message_hash, _message_hash, lo.message_hash_len); + + lo.rx_time = sqlite3_column_int(getLinkStmt, 5); + + uint8_t *_tmp_commit_hash = (uint8_t *)sqlite3_column_blob(getLinkStmt, 6); + lo.commit_hash_len = 32; + memcpy(lo.commit_hash, _tmp_commit_hash, lo.commit_hash_len); + + uint8_t *_root_hash = (uint8_t *)sqlite3_column_blob(getLinkStmt, 7); + lo.root_hash_len = 32; + memcpy(lo.root_hash, _root_hash, lo.root_hash_len); + + lo.counter = sqlite3_column_int(getLinkStmt, 8); + + lo.payload = std::string((char *)sqlite3_column_text(getLinkStmt, 9)); + + lo.channel_hash = getChannelHashFromRoot(lo.root_hash, lo.root_hash_len); + + sqlite3_reset(getLinkStmt); + + return lo; +} + +bool StoreForwardPlusPlusModule::sendFromScratch(uint8_t _channel_hash) +{ + LOG_WARN("sendFromScratch"); + // "select destination, sender, packet_id, channel_hash, encrypted_bytes, message_hash, rx_time \ + // from local_messages order by rx_time desc LIMIT 1;" + sqlite3_bind_int(fromScratchStmt, 1, _channel_hash); + if (sqlite3_step(fromScratchStmt) == SQLITE_DONE) { + LOG_WARN("No messages in scratch to forward"); + return false; + } + uint8_t _root_hash[32] = {0}; + if (!getRootFromChannelHash(_channel_hash, _root_hash)) { + LOG_ERROR("Error getting root hash"); + return false; + } + + meshtastic_StoreForwardPlusPlus storeforward = meshtastic_StoreForwardPlusPlus_init_zero; + storeforward.sfpp_message_type = meshtastic_StoreForwardPlusPlus_SFPP_message_type_LINK_PROVIDE; + + storeforward.encapsulated_to = sqlite3_column_int(fromScratchStmt, 0); + storeforward.encapsulated_from = sqlite3_column_int(fromScratchStmt, 1); + storeforward.encapsulated_id = sqlite3_column_int(fromScratchStmt, 2); + + uint8_t *_encrypted = (uint8_t *)sqlite3_column_blob(fromScratchStmt, 3); + storeforward.message.size = sqlite3_column_bytes(fromScratchStmt, 3); + memcpy(storeforward.message.bytes, _encrypted, storeforward.message.size); + + uint8_t *_message_hash = (uint8_t *)sqlite3_column_blob(fromScratchStmt, 4); + storeforward.message_hash.size = 32; + memcpy(storeforward.message_hash.bytes, _message_hash, storeforward.message_hash.size); + + storeforward.encapsulated_rxtime = sqlite3_column_int(fromScratchStmt, 5); + + storeforward.root_hash.size = 32; + memcpy(storeforward.root_hash.bytes, _root_hash, 32); + + sqlite3_reset(fromScratchStmt); + + meshtastic_MeshPacket *p = allocDataProtobuf(storeforward); + p->to = NODENUM_BROADCAST; + p->decoded.want_response = false; + p->priority = meshtastic_MeshPacket_Priority_BACKGROUND; + p->channel = 0; + LOG_INFO("Send link to mesh"); + service->sendToMesh(p, RX_SRC_LOCAL, true); + return true; +} + +bool StoreForwardPlusPlusModule::addToChain(link_object &lo) +{ + LOG_WARN("Add to chain"); + link_object chain_end = getChainEndLinkObject(lo.root_hash, lo.root_hash_len); + + // we may need to calculate the full commit hash at this point + if (lo.commit_hash_len != 32) { + SHA256 commit_hash; + + commit_hash.reset(); + + if (chain_end.commit_hash_len == 32) { + printBytes("last message: 0x", chain_end.commit_hash, 32); + commit_hash.update(chain_end.commit_hash, 32); + } else { + printBytes("new chain root: 0x", lo.root_hash, 32); + commit_hash.update(lo.root_hash, 32); + } + + commit_hash.update(lo.message_hash, 32); + // message_hash.update(&mp.rx_time, sizeof(mp.rx_time)); + commit_hash.finalize(lo.commit_hash, 32); + } + lo.counter = chain_end.counter + 1; + // push a message into the local chain DB + // destination + sqlite3_bind_int(chain_insert_stmt, 1, lo.to); + // sender + sqlite3_bind_int(chain_insert_stmt, 2, lo.from); + // packet_id + sqlite3_bind_int(chain_insert_stmt, 3, lo.id); + // root_hash + sqlite3_bind_blob(chain_insert_stmt, 4, lo.root_hash, 32, NULL); + // encrypted_bytes + sqlite3_bind_blob(chain_insert_stmt, 5, lo.encrypted_bytes, lo.encrypted_len, NULL); + // message_hash + sqlite3_bind_blob(chain_insert_stmt, 6, lo.message_hash, 32, NULL); + // rx_time + sqlite3_bind_int(chain_insert_stmt, 7, lo.rx_time); + // commit_hash + sqlite3_bind_blob(chain_insert_stmt, 8, lo.commit_hash, 32, NULL); + // payload + sqlite3_bind_text(chain_insert_stmt, 9, lo.payload.c_str(), lo.payload.length(), NULL); + + sqlite3_bind_int(chain_insert_stmt, 10, lo.counter); + sqlite3_step(chain_insert_stmt); + sqlite3_reset(chain_insert_stmt); + setChainCount(lo.root_hash, 32, lo.counter); + return true; +} + +bool StoreForwardPlusPlusModule::addToScratch(link_object &lo) +{ + // push a message into the local chain DB + // destination + sqlite3_bind_int(scratch_insert_stmt, 1, lo.to); + // sender + sqlite3_bind_int(scratch_insert_stmt, 2, lo.from); + // packet_id + sqlite3_bind_int(scratch_insert_stmt, 3, lo.id); + // root_hash + sqlite3_bind_blob(scratch_insert_stmt, 4, lo.root_hash, 32, NULL); + // encrypted_bytes + sqlite3_bind_blob(scratch_insert_stmt, 5, lo.encrypted_bytes, lo.encrypted_len, NULL); + // message_hash + sqlite3_bind_blob(scratch_insert_stmt, 6, lo.message_hash, 32, NULL); + // rx_time + sqlite3_bind_int(scratch_insert_stmt, 7, lo.rx_time); + // payload + sqlite3_bind_text(scratch_insert_stmt, 8, lo.payload.c_str(), lo.payload.length(), NULL); + const char *_error_mesg = sqlite3_errmsg(ppDb); + + LOG_WARN("step %u, %s", sqlite3_step(scratch_insert_stmt), _error_mesg); + sqlite3_reset(scratch_insert_stmt); + return true; +} + +void StoreForwardPlusPlusModule::canonAnnounce(uint8_t *_message_hash, uint8_t *_commit_hash, uint8_t *_root_hash, + uint32_t _rx_time) +{ + meshtastic_StoreForwardPlusPlus storeforward = meshtastic_StoreForwardPlusPlus_init_zero; + storeforward.sfpp_message_type = meshtastic_StoreForwardPlusPlus_SFPP_message_type_CANON_ANNOUNCE; + // set root hash + + // set message hash + storeforward.message_hash.size = 32; + memcpy(storeforward.message_hash.bytes, _message_hash, 32); + + // set chain hash + storeforward.commit_hash.size = 32; + memcpy(storeforward.commit_hash.bytes, _commit_hash, 32); + + // set root hash + storeforward.root_hash.size = 32; + memcpy(storeforward.root_hash.bytes, _root_hash, 32); + + storeforward.encapsulated_rxtime = _rx_time; + // storeforward. + meshtastic_MeshPacket *p = allocDataProtobuf(storeforward); + p->to = NODENUM_BROADCAST; + p->decoded.want_response = false; + p->priority = meshtastic_MeshPacket_Priority_BACKGROUND; + p->channel = 0; + LOG_INFO("Send packet to mesh"); + service->sendToMesh(p, RX_SRC_LOCAL, true); +} + +bool StoreForwardPlusPlusModule::isInDB(uint8_t *message_hash_bytes, size_t message_hash_len) +{ + sqlite3_bind_int(checkDup, 1, message_hash_len); + sqlite3_bind_blob(checkDup, 2, message_hash_bytes, message_hash_len, NULL); + sqlite3_step(checkDup); + int numberFound = sqlite3_column_int(checkDup, 0); + sqlite3_reset(checkDup); + if (numberFound > 0) + return true; + return false; +} + +bool StoreForwardPlusPlusModule::isInScratch(uint8_t *message_hash_bytes, size_t message_hash_len) +{ + LOG_WARN("isInScratch"); + sqlite3_bind_int(checkScratch, 1, message_hash_len); + sqlite3_bind_blob(checkScratch, 2, message_hash_bytes, message_hash_len, NULL); + sqlite3_step(checkScratch); + int numberFound = sqlite3_column_int(checkScratch, 0); + sqlite3_reset(checkScratch); + if (numberFound > 0) + return true; + return false; +} + +void StoreForwardPlusPlusModule::removeFromScratch(uint8_t *message_hash_bytes, size_t message_hash_len) +{ + LOG_WARN("removeFromScratch"); + sqlite3_bind_int(removeScratch, 1, message_hash_len); + sqlite3_bind_blob(removeScratch, 2, message_hash_bytes, message_hash_len, NULL); + sqlite3_step(removeScratch); + int numberFound = sqlite3_column_int(removeScratch, 0); + sqlite3_reset(removeScratch); +} + +void StoreForwardPlusPlusModule::updatePayload(uint8_t *message_hash_bytes, size_t message_hash_len, std::string payload) +{ + LOG_WARN("updatePayload"); + sqlite3_bind_text(updatePayloadStmt, 1, payload.c_str(), payload.length(), NULL); + sqlite3_bind_int(updatePayloadStmt, 2, message_hash_len); + sqlite3_bind_blob(updatePayloadStmt, 3, message_hash_bytes, message_hash_len, NULL); + auto res = sqlite3_step(updatePayloadStmt); + const char *_error_mesg = sqlite3_errmsg(ppDb); + LOG_WARN("step %u, %s", res, _error_mesg); + sqlite3_reset(updatePayloadStmt); +} + +StoreForwardPlusPlusModule::link_object StoreForwardPlusPlusModule::getFromScratch(uint8_t *message_hash_bytes, size_t hash_len) +{ + + // vscode wrote this + LOG_WARN("getFromScratch"); + link_object lo; + + sqlite3_bind_int(fromScratchByHashStmt, 1, hash_len); + sqlite3_bind_blob(fromScratchByHashStmt, 2, message_hash_bytes, hash_len, NULL); + auto res = sqlite3_step(fromScratchByHashStmt); + const char *_error_mesg = sqlite3_errmsg(ppDb); + LOG_WARN("step %u, %s", res, _error_mesg); + lo.to = sqlite3_column_int(fromScratchByHashStmt, 0); + lo.from = sqlite3_column_int(fromScratchByHashStmt, 1); + lo.id = sqlite3_column_int(fromScratchByHashStmt, 2); + + uint8_t *encrypted_bytes = (uint8_t *)sqlite3_column_blob(fromScratchByHashStmt, 3); + lo.encrypted_len = sqlite3_column_bytes(fromScratchByHashStmt, 3); + memcpy(lo.encrypted_bytes, encrypted_bytes, lo.encrypted_len); + uint8_t *message_hash = (uint8_t *)sqlite3_column_blob(fromScratchByHashStmt, 4); + memcpy(lo.message_hash, message_hash, 32); + lo.rx_time = sqlite3_column_int(fromScratchByHashStmt, 5); + lo.channel_hash - sqlite3_column_int(fromScratchByHashStmt, 6); + lo.payload = + std::string((char *)sqlite3_column_text(fromScratchByHashStmt, 7), sqlite3_column_bytes(fromScratchByHashStmt, 7)); + sqlite3_reset(fromScratchByHashStmt); + return lo; +} + +// should not need size considerations +StoreForwardPlusPlusModule::link_object +StoreForwardPlusPlusModule::ingestTextPacket(const meshtastic_MeshPacket &mp, const meshtastic_MeshPacket *encrypted_meshpacket) +{ + link_object lo; + SHA256 message_hash; + lo.to = mp.to; + lo.from = mp.from; + lo.id = mp.id; + lo.rx_time = mp.rx_time; + lo.channel_hash = encrypted_meshpacket->channel; + memcpy(lo.encrypted_bytes, encrypted_meshpacket->encrypted.bytes, encrypted_meshpacket->encrypted.size); + lo.encrypted_len = encrypted_meshpacket->encrypted.size; + lo.payload = std::string((char *)mp.decoded.payload.bytes, mp.decoded.payload.size); + + message_hash.reset(); + message_hash.update(encrypted_meshpacket->encrypted.bytes, encrypted_meshpacket->encrypted.size); + message_hash.update(&mp.to, sizeof(mp.to)); + message_hash.update(&mp.from, sizeof(mp.from)); + message_hash.update(&mp.id, sizeof(mp.id)); + message_hash.finalize(lo.message_hash, 32); + lo.message_hash_len = 32; + + getOrAddRootFromChannelHash(encrypted_meshpacket->channel, lo.root_hash); + return lo; +} + +StoreForwardPlusPlusModule::link_object StoreForwardPlusPlusModule::ingestLinkMessage(meshtastic_StoreForwardPlusPlus *t) +{ + link_object lo; + + lo.to = t->encapsulated_to; + lo.from = t->encapsulated_from; + lo.id = t->encapsulated_id; + lo.rx_time = t->encapsulated_rxtime; + + // What if we don't have this root hash? Should drop this packet before this point. + lo.channel_hash = getChannelHashFromRoot(t->root_hash.bytes, t->root_hash.size); + SHA256 message_hash; + + memcpy(lo.encrypted_bytes, t->message.bytes, t->message.size); + lo.encrypted_len = t->message.size; + + message_hash.reset(); + message_hash.update(lo.encrypted_bytes, lo.encrypted_len); + message_hash.update(&lo.to, sizeof(lo.to)); + message_hash.update(&lo.from, sizeof(lo.from)); + message_hash.update(&lo.id, sizeof(lo.id)); + message_hash.finalize(lo.message_hash, 32); + lo.message_hash_len = 32; + + // look up full root hash and copy over the partial if it matches + if (lookUpFullRootHash(t->root_hash.bytes, t->root_hash.size, lo.root_hash)) { + printBytes("Found full root hash: 0x", lo.root_hash, 32); + lo.root_hash_len = 32; + } else { + LOG_WARN("root hash does not match %d bytes", t->root_hash.size); + printBytes("Using partial root hash: 0x", t->root_hash.bytes, t->root_hash.size); + lo.root_hash_len = 0; + } + + if (t->commit_hash.size > 0) { + // calculate the full commit hash and replace the partial if it matches + if (checkCommitHash(lo, t->commit_hash.bytes, t->commit_hash.size)) { + printBytes("commit hash matches: 0x", t->commit_hash.bytes, t->commit_hash.size); + } else { + LOG_WARN("commit hash does not match"); + lo.commit_hash_len = 0; + lo.validObject = false; + } + } + + // we don't ever get the payload here, so it's always an empty string + lo.payload = ""; + + return lo; +} + +void StoreForwardPlusPlusModule::rebroadcastLinkObject(StoreForwardPlusPlusModule::link_object &lo) +{ + LOG_WARN("Attempting to Rebroadcast1"); + meshtastic_MeshPacket *p = router->allocForSending(); + p->to = lo.to; + p->from = lo.from; + p->id = lo.id; + p->channel = lo.channel_hash; + p->which_payload_variant = meshtastic_MeshPacket_encrypted_tag; + p->encrypted.size = lo.encrypted_len; + memcpy(p->encrypted.bytes, lo.encrypted_bytes, lo.encrypted_len); + p->transport_mechanism = meshtastic_MeshPacket_TransportMechanism_TRANSPORT_LORA; // only a tiny white lie + service->sendToMesh(p, RX_SRC_RADIO, true); // Send to mesh, cc to phone +} + +bool StoreForwardPlusPlusModule::checkCommitHash(StoreForwardPlusPlusModule::link_object &lo, uint8_t *commit_hash_bytes, + size_t hash_len) +{ + SHA256 commit_hash; + + link_object chain_end = getChainEndLinkObject(lo.root_hash, lo.root_hash_len); + + commit_hash.reset(); + + if (chain_end.commit_hash_len == 32) { + printBytes("last message: 0x", chain_end.commit_hash, 32); + commit_hash.update(chain_end.commit_hash, 32); + } else { + if (lo.root_hash_len != 32) { + LOG_WARN("Short root hash in link object, cannot create new chain"); + return false; + } + printBytes("new chain root: 0x", lo.root_hash, 32); + commit_hash.update(lo.root_hash, 32); + } + + commit_hash.update(lo.message_hash, 32); + commit_hash.finalize(lo.commit_hash, 32); + lo.commit_hash_len = 32; + + if (hash_len == 0 || memcmp(commit_hash_bytes, lo.commit_hash, hash_len) == 0) { + return true; + } + return false; +} + +bool StoreForwardPlusPlusModule::lookUpFullRootHash(uint8_t *partial_root_hash, size_t partial_root_hash_len, + uint8_t *full_root_hash) +{ + LOG_WARN("lookUpFullRootHash"); + printBytes("partial_root_hash", partial_root_hash, partial_root_hash_len); + sqlite3_bind_int(getFullRootHashStmt, 1, partial_root_hash_len); + sqlite3_bind_blob(getFullRootHashStmt, 2, partial_root_hash, partial_root_hash_len, NULL); + sqlite3_step(getFullRootHashStmt); + uint8_t *tmp_root_hash = (uint8_t *)sqlite3_column_blob(getFullRootHashStmt, 0); + if (tmp_root_hash) { + LOG_WARN("Found full root hash!"); + memcpy(full_root_hash, tmp_root_hash, 32); + sqlite3_reset(getFullRootHashStmt); + return true; + } + sqlite3_reset(getFullRootHashStmt); + return false; +} + +void StoreForwardPlusPlusModule::setChainCount(uint8_t *root_hash, size_t root_hash_len, uint32_t count) +{ + sqlite3_bind_blob(setChainCountStmt, 1, root_hash, root_hash_len, NULL); + sqlite3_bind_int(setChainCountStmt, 2, count); + sqlite3_step(setChainCountStmt); + sqlite3_reset(setChainCountStmt); +} + +uint32_t StoreForwardPlusPlusModule::getChainCount(uint8_t *root_hash, size_t root_hash_len) +{ + sqlite3_bind_blob(getChainCountStmt, 1, root_hash, root_hash_len, NULL); + sqlite3_step(getChainCountStmt); + uint32_t count = sqlite3_column_int(getChainCountStmt, 0); + sqlite3_reset(getChainCountStmt); + return count; +} \ No newline at end of file diff --git a/src/modules/Native/StoreForwardPlusPlus.h b/src/modules/Native/StoreForwardPlusPlus.h new file mode 100644 index 000000000..bf2127874 --- /dev/null +++ b/src/modules/Native/StoreForwardPlusPlus.h @@ -0,0 +1,212 @@ +#pragma once +#include "Channels.h" +#include "ProtobufModule.h" +#include "Router.h" +#include "SinglePortModule.h" +#include "sqlite3.h" + +/** + * Store and forward ++ module + * There's an obvious need for a store-and-forward mechanism in Meshtastic. + * This module takes heavy inspiration from Git, building a chain of messages that can be synced between nodes. + * Each message is hashed, and the chain is built by hashing the previous commit hash and the current message hash. + * Nodes can request missing messages by requesting the next message after a given commit hash. + * + * The current focus is text messages, limited to the primary channel. + * + * Each chain is identified by a root hash, which is derived from the channelHash, the local nodenum, and the timestamp when + * created. + * + * Each message is also given a message hash, derived from the encrypted payload, the to, from, id. + * Notably not the timestamp, as we want these to match across nodes, even if the timestamps differ. + * + * The authoritative node for the chain will generate a commit hash for each message when adding it to the chain. + * The first message's commit hash is derived from the root hash and the message hash. + * Subsequent messages' commit hashes are derived from the previous commit hash and the current message hash. + * This allows a node to see only the last commit hash, and confirm it hasn't missed any messages. + * + * Nodes can request the next message in the chain by sending a LINK_REQUEST message with the root hash and the last known commit + * hash. Any node that has the next message can respond with a LINK_PROVIDE message containing the next message. + * + * When a satellite node sees a new text message, it stores it in a scratch database. + * These messages are periodically offered to the authoritative node for inclusion in the chain. + * + * The LINK_PROVIDE message does double-duty, sending both on-chain and off-chain messages. + * The differentiator is whether the commit hash is set or left empty. + * + * When a satellite node receives a canonical link message, it checks if it has the message in scratch. + * And evicts it when adding it to the canonical chain. + * + * This approach allows a node to know whether it has seen a given message before, or if it is new coming via SFPP. + * If new, and the timestamp is within the rebroadcast timeout, it will process that message as if it were just received from the + * mesh, allowing it to be decrypted, shown to the user, and rebroadcast. + */ +class StoreForwardPlusPlusModule : public ProtobufModule, private concurrency::OSThread +{ + struct link_object { + uint32_t to; + uint32_t from; + uint32_t id; + uint32_t rx_time = 0; + ChannelHash channel_hash; + uint8_t encrypted_bytes[256] = {0}; + size_t encrypted_len; + uint8_t message_hash[32] = {0}; + size_t message_hash_len = 0; + uint8_t root_hash[32] = {0}; + size_t root_hash_len = 0; + uint8_t commit_hash[32] = {0}; + size_t commit_hash_len = 0; + uint32_t counter = 0; + std::string payload; + bool validObject = true; // set this false when a chain calulation fails, etc. + }; + + public: + /** Constructor + * + */ + StoreForwardPlusPlusModule(); + + /* + -Override the wantPacket method. + */ + virtual bool wantPacket(const meshtastic_MeshPacket *p) override + { + switch (p->decoded.portnum) { + case meshtastic_PortNum_TEXT_MESSAGE_APP: + case meshtastic_PortNum_STORE_FORWARD_PLUSPLUS_APP: + return true; + default: + return false; + } + } + + protected: + /** Called to handle a particular incoming message + @return ProcessMessage::STOP if you've guaranteed you've handled this message and no other handlers should be considered for + it + */ + virtual ProcessMessage handleReceived(const meshtastic_MeshPacket &mp) override; + virtual bool handleReceivedProtobuf(const meshtastic_MeshPacket &mp, meshtastic_StoreForwardPlusPlus *t) override; + + virtual int32_t runOnce() override; + + private: + sqlite3 *ppDb; + sqlite3_stmt *chain_insert_stmt; + sqlite3_stmt *scratch_insert_stmt; + sqlite3_stmt *checkDup; + sqlite3_stmt *checkScratch; + sqlite3_stmt *removeScratch; + sqlite3_stmt *updatePayloadStmt; + sqlite3_stmt *getPayloadFromScratchStmt; + sqlite3_stmt *fromScratchStmt; + sqlite3_stmt *fromScratchByHashStmt; + sqlite3_stmt *getNextHashStmt; + sqlite3_stmt *getChainEndStmt; + sqlite3_stmt *getLinkStmt; + sqlite3_stmt *getHashFromRootStmt; + sqlite3_stmt *addRootToMappingsStmt; + sqlite3_stmt *getRootFromChannelHashStmt; + sqlite3_stmt *getFullRootHashStmt; + sqlite3_stmt *setChainCountStmt; + sqlite3_stmt *getChainCountStmt; + + // For a given Meshtastic ChannelHash, fills the root_hash buffer with a 32-byte root hash + // returns true if the root hash was found + bool getRootFromChannelHash(ChannelHash, uint8_t *); + + // For a given root hash, returns the ChannelHash + // can handle partial root hashes + ChannelHash getChannelHashFromRoot(uint8_t *_root_hash, size_t); + + // given a root hash and commit hash, returns the next commit hash in the chain + // can handle partial root and commit hashes, always fills the buffer with 32 bytes + // returns true if a next hash was found + bool getNextHash(uint8_t *, size_t, uint8_t *, size_t, uint8_t *); + + // For a given Meshtastic ChannelHash, fills the root_hash buffer with a 32-byte root hash + // but this function will add the root hash if it is not already present + // returns true if the hash is new + bool getOrAddRootFromChannelHash(ChannelHash, uint8_t *); + + // adds the ChannelHash and root_hash to the mappings table + void addRootToMappings(ChannelHash, uint8_t *); + + // gets the tip of the chain for the given root hash + link_object getChainEndLinkObject(uint8_t *, size_t); + + // requests the next message in the chain from the mesh network + // Sends a LINK_REQUEST message + void requestNextMessage(uint8_t *, size_t, uint8_t *, size_t); + + // sends a LINK_PROVIDE message broadcasting the given link object + void broadcastLink(uint8_t *, size_t); + + // sends a LINK_PROVIDE message broadcasting the given link object from scratch message store + bool sendFromScratch(uint8_t); + + // Adds the given link object to the canonical chain database + bool addToChain(link_object &); + + // Adds an incoming text message to the scratch database + bool addToScratch(link_object &); + + // sends a CANON_ANNOUNCE message, specifying the given root and commit hashes + void canonAnnounce(uint8_t *, uint8_t *, uint8_t *, uint32_t); + + // checks if the message hash is present in the canonical chain database + bool isInDB(uint8_t *, size_t); + + // checks if the message hash is present in the scratch database + bool isInScratch(uint8_t *, size_t); + + // retrieves a link object from the scratch database + link_object getFromScratch(uint8_t *, size_t); + + // removes a link object from the scratch database + void removeFromScratch(uint8_t *, size_t); + + // fills the payload section with the decrypted data for the given message hash + // probably not needed for production, but useful for testing + void updatePayload(uint8_t *, size_t, std::string); + + // Takes the decrypted MeshPacket and the encrypted packet copy, and builds a link_object + // Generates a message hash, but does not set the commit hash + link_object ingestTextPacket(const meshtastic_MeshPacket &, const meshtastic_MeshPacket *); + + // ingests a LINK_PROVIDE message and builds a link_object + // confirms the root hash and commit hash + link_object ingestLinkMessage(meshtastic_StoreForwardPlusPlus *); + + // retrieves a link object from the canonical chain database given a message hash + link_object getLink(uint8_t *, size_t); + + // puts the encrypted payload back into the queue as if it were just received + void rebroadcastLinkObject(link_object &); + + // Check if an incoming link object's commit hash matches the calculated commit hash + bool checkCommitHash(link_object &lo, uint8_t *commit_hash_bytes, size_t hash_len); + + // given a partial root hash, looks up the full 32-byte root hash + // returns true if found + bool lookUpFullRootHash(uint8_t *partial_root_hash, size_t partial_root_hash_len, uint8_t *full_root_hash); + + // update the mappings table to set the chain count for the given root hash + void setChainCount(uint8_t *, size_t, uint32_t); + + // query the mappings table for the chain count for the given root hash + uint32_t getChainCount(uint8_t *, size_t); + + // Track if we have a scheduled runOnce pending + // useful to not accudentally delay a scheduled runOnce + bool pendingRun = false; + + // Once we have multiple chain types, we can extend this + enum chain_types { + channel_chain = 0, + }; + + uint32_t rebroadcastTimeout = 3600; // Messages older than this (in seconds) will not be rebroadcast +}; diff --git a/src/platform/portduino/PortduinoGlue.cpp b/src/platform/portduino/PortduinoGlue.cpp index 1b601f9b4..f2ba7baf6 100644 --- a/src/platform/portduino/PortduinoGlue.cpp +++ b/src/platform/portduino/PortduinoGlue.cpp @@ -786,6 +786,10 @@ bool loadConfig(const char *configPath) } } + if (yamlConfig["StoreAndForward"]) { + portduino_config.sfpp_stratum0 = (yamlConfig["StoreAndForward"]["Stratum0"]).as(false); + } + if (yamlConfig["General"]) { portduino_config.MaxNodes = (yamlConfig["General"]["MaxNodes"]).as(200); portduino_config.maxtophone = (yamlConfig["General"]["MaxMessageQueue"]).as(100); diff --git a/src/platform/portduino/PortduinoGlue.h b/src/platform/portduino/PortduinoGlue.h index 9335be90a..f26ee4c02 100644 --- a/src/platform/portduino/PortduinoGlue.h +++ b/src/platform/portduino/PortduinoGlue.h @@ -169,6 +169,9 @@ extern struct portduino_config_struct { int configDisplayMode = 0; bool has_configDisplayMode = false; + // Store and Forward++ + bool sfpp_stratum0 = false; + // General std::string mac_address = ""; bool mac_address_explicit = false; @@ -488,6 +491,13 @@ extern struct portduino_config_struct { out << YAML::EndMap; // Config } + // StoreAndForward + if (sfpp_stratum0) { + out << YAML::Key << "StoreAndForward" << YAML::Value << YAML::BeginMap; + out << YAML::Key << "Stratum0" << YAML::Value << true; + out << YAML::EndMap; // StoreAndForward + } + // General out << YAML::Key << "General" << YAML::Value << YAML::BeginMap; if (config_directory != "") diff --git a/variants/native/portduino/platformio.ini b/variants/native/portduino/platformio.ini index 045e3edea..4aef42544 100644 --- a/variants/native/portduino/platformio.ini +++ b/variants/native/portduino/platformio.ini @@ -46,6 +46,7 @@ build_flags = ${native_base.build_flags} -Os -lX11 -linput -lxkbcommon -ffunctio !pkg-config --libs openssl --silence-errors || : !pkg-config --cflags --libs sdl2 --silence-errors || : !pkg-config --cflags --libs libbsd-overlay --silence-errors || : + !pkg-config --cflags --libs sqlite3 --silence-errors || : build_src_filter = ${native_base.build_src_filter}