diff --git a/src/mesh/generated/meshtastic/mesh.pb.cpp b/src/mesh/generated/meshtastic/mesh.pb.cpp index 9966e52f8..d8eee1203 100644 --- a/src/mesh/generated/meshtastic/mesh.pb.cpp +++ b/src/mesh/generated/meshtastic/mesh.pb.cpp @@ -24,6 +24,9 @@ PB_BIND(meshtastic_Data, meshtastic_Data, 2) PB_BIND(meshtastic_KeyVerification, meshtastic_KeyVerification, AUTO) +PB_BIND(meshtastic_StoreForwardPlusPlus, meshtastic_StoreForwardPlusPlus, 2) + + PB_BIND(meshtastic_Waypoint, meshtastic_Waypoint, AUTO) @@ -121,6 +124,8 @@ PB_BIND(meshtastic_ChunkedPayloadResponse, meshtastic_ChunkedPayloadResponse, AU + + diff --git a/src/mesh/generated/meshtastic/mesh.pb.h b/src/mesh/generated/meshtastic/mesh.pb.h index 0c48a7891..e3e46487a 100644 --- a/src/mesh/generated/meshtastic/mesh.pb.h +++ b/src/mesh/generated/meshtastic/mesh.pb.h @@ -478,6 +478,17 @@ typedef enum _meshtastic_Routing_Error { meshtastic_Routing_Error_RATE_LIMIT_EXCEEDED = 38 } meshtastic_Routing_Error; +/* enum message type? */ +typedef enum _meshtastic_StoreForwardPlusPlus_SFPP_message_type { + /* message hash without chain hash means that no, it is not on the chain */ + meshtastic_StoreForwardPlusPlus_SFPP_message_type_CANON_ANNOUNCE = 0, + meshtastic_StoreForwardPlusPlus_SFPP_message_type_CHAIN_QUERY = 1, + meshtastic_StoreForwardPlusPlus_SFPP_message_type_LINK_REQUEST = 3, + meshtastic_StoreForwardPlusPlus_SFPP_message_type_LINK_PROVIDE = 4, + meshtastic_StoreForwardPlusPlus_SFPP_message_type_LINK_PROVIDE_FIRSTHALF = 5, + meshtastic_StoreForwardPlusPlus_SFPP_message_type_LINK_PROVIDE_SECONDHALF = 6 +} meshtastic_StoreForwardPlusPlus_SFPP_message_type; + /* The priority of this message for sending. Higher priorities are sent first (when managing the transmit queue). This field is never sent over the air, it is only used internally inside of a local device node. @@ -782,6 +793,24 @@ typedef struct _meshtastic_KeyVerification { meshtastic_KeyVerification_hash2_t hash2; } meshtastic_KeyVerification; +typedef PB_BYTES_ARRAY_T(32) meshtastic_StoreForwardPlusPlus_message_hash_t; +typedef PB_BYTES_ARRAY_T(32) meshtastic_StoreForwardPlusPlus_commit_hash_t; +typedef PB_BYTES_ARRAY_T(32) meshtastic_StoreForwardPlusPlus_root_hash_t; +typedef PB_BYTES_ARRAY_T(240) meshtastic_StoreForwardPlusPlus_message_t; +/* The actual over-the-mesh message doing store and forward++ */ +typedef struct _meshtastic_StoreForwardPlusPlus { /* */ + meshtastic_StoreForwardPlusPlus_SFPP_message_type sfpp_message_type; + meshtastic_StoreForwardPlusPlus_message_hash_t message_hash; + meshtastic_StoreForwardPlusPlus_commit_hash_t commit_hash; + meshtastic_StoreForwardPlusPlus_root_hash_t root_hash; + /* encapsulated message to share (may be split in half) */ + meshtastic_StoreForwardPlusPlus_message_t message; + uint32_t encapsulated_id; + uint32_t encapsulated_to; + uint32_t encapsulated_from; + uint32_t encapsulated_rxtime; +} meshtastic_StoreForwardPlusPlus; + /* Waypoint message, used to share arbitrary locations across the mesh */ typedef struct _meshtastic_Waypoint { /* Id of the waypoint */ @@ -1310,6 +1339,10 @@ extern "C" { #define _meshtastic_Routing_Error_MAX meshtastic_Routing_Error_RATE_LIMIT_EXCEEDED #define _meshtastic_Routing_Error_ARRAYSIZE ((meshtastic_Routing_Error)(meshtastic_Routing_Error_RATE_LIMIT_EXCEEDED+1)) +#define _meshtastic_StoreForwardPlusPlus_SFPP_message_type_MIN meshtastic_StoreForwardPlusPlus_SFPP_message_type_CANON_ANNOUNCE +#define _meshtastic_StoreForwardPlusPlus_SFPP_message_type_MAX meshtastic_StoreForwardPlusPlus_SFPP_message_type_LINK_PROVIDE_SECONDHALF +#define _meshtastic_StoreForwardPlusPlus_SFPP_message_type_ARRAYSIZE ((meshtastic_StoreForwardPlusPlus_SFPP_message_type)(meshtastic_StoreForwardPlusPlus_SFPP_message_type_LINK_PROVIDE_SECONDHALF+1)) + #define _meshtastic_MeshPacket_Priority_MIN meshtastic_MeshPacket_Priority_UNSET #define _meshtastic_MeshPacket_Priority_MAX meshtastic_MeshPacket_Priority_MAX #define _meshtastic_MeshPacket_Priority_ARRAYSIZE ((meshtastic_MeshPacket_Priority)(meshtastic_MeshPacket_Priority_MAX+1)) @@ -1338,6 +1371,8 @@ extern "C" { #define meshtastic_Data_portnum_ENUMTYPE meshtastic_PortNum +#define meshtastic_StoreForwardPlusPlus_sfpp_message_type_ENUMTYPE meshtastic_StoreForwardPlusPlus_SFPP_message_type + #define meshtastic_MeshPacket_priority_ENUMTYPE meshtastic_MeshPacket_Priority @@ -1380,6 +1415,7 @@ extern "C" { #define meshtastic_Routing_init_default {0, {meshtastic_RouteDiscovery_init_default}} #define meshtastic_Data_init_default {_meshtastic_PortNum_MIN, {0, {0}}, 0, 0, 0, 0, 0, 0, false, 0} #define meshtastic_KeyVerification_init_default {0, {0, {0}}, {0, {0}}} +#define meshtastic_StoreForwardPlusPlus_init_default {_meshtastic_StoreForwardPlusPlus_SFPP_message_type_MIN, {0, {0}}, {0, {0}}, {0, {0}}, {0, {0}}, 0, 0, 0, 0} #define meshtastic_Waypoint_init_default {0, false, 0, false, 0, 0, 0, "", "", 0} #define meshtastic_MqttClientProxyMessage_init_default {"", 0, {{0, {0}}}, 0} #define meshtastic_MeshPacket_init_default {0, 0, 0, 0, {meshtastic_Data_init_default}, 0, 0, 0, 0, 0, _meshtastic_MeshPacket_Priority_MIN, 0, _meshtastic_MeshPacket_Delayed_MIN, 0, 0, {0, {0}}, 0, 0, 0, 0, _meshtastic_MeshPacket_TransportMechanism_MIN} @@ -1411,6 +1447,7 @@ extern "C" { #define meshtastic_Routing_init_zero {0, {meshtastic_RouteDiscovery_init_zero}} #define meshtastic_Data_init_zero {_meshtastic_PortNum_MIN, {0, {0}}, 0, 0, 0, 0, 0, 0, false, 0} #define meshtastic_KeyVerification_init_zero {0, {0, {0}}, {0, {0}}} +#define meshtastic_StoreForwardPlusPlus_init_zero {_meshtastic_StoreForwardPlusPlus_SFPP_message_type_MIN, {0, {0}}, {0, {0}}, {0, {0}}, {0, {0}}, 0, 0, 0, 0} #define meshtastic_Waypoint_init_zero {0, false, 0, false, 0, 0, 0, "", "", 0} #define meshtastic_MqttClientProxyMessage_init_zero {"", 0, {{0, {0}}}, 0} #define meshtastic_MeshPacket_init_zero {0, 0, 0, 0, {meshtastic_Data_init_zero}, 0, 0, 0, 0, 0, _meshtastic_MeshPacket_Priority_MIN, 0, _meshtastic_MeshPacket_Delayed_MIN, 0, 0, {0, {0}}, 0, 0, 0, 0, _meshtastic_MeshPacket_TransportMechanism_MIN} @@ -1489,6 +1526,15 @@ extern "C" { #define meshtastic_KeyVerification_nonce_tag 1 #define meshtastic_KeyVerification_hash1_tag 2 #define meshtastic_KeyVerification_hash2_tag 3 +#define meshtastic_StoreForwardPlusPlus_sfpp_message_type_tag 1 +#define meshtastic_StoreForwardPlusPlus_message_hash_tag 2 +#define meshtastic_StoreForwardPlusPlus_commit_hash_tag 3 +#define meshtastic_StoreForwardPlusPlus_root_hash_tag 4 +#define meshtastic_StoreForwardPlusPlus_message_tag 5 +#define meshtastic_StoreForwardPlusPlus_encapsulated_id_tag 6 +#define meshtastic_StoreForwardPlusPlus_encapsulated_to_tag 7 +#define meshtastic_StoreForwardPlusPlus_encapsulated_from_tag 8 +#define meshtastic_StoreForwardPlusPlus_encapsulated_rxtime_tag 9 #define meshtastic_Waypoint_id_tag 1 #define meshtastic_Waypoint_latitude_i_tag 2 #define meshtastic_Waypoint_longitude_i_tag 3 @@ -1705,6 +1751,19 @@ X(a, STATIC, SINGULAR, BYTES, hash2, 3) #define meshtastic_KeyVerification_CALLBACK NULL #define meshtastic_KeyVerification_DEFAULT NULL +#define meshtastic_StoreForwardPlusPlus_FIELDLIST(X, a) \ +X(a, STATIC, SINGULAR, UENUM, sfpp_message_type, 1) \ +X(a, STATIC, SINGULAR, BYTES, message_hash, 2) \ +X(a, STATIC, SINGULAR, BYTES, commit_hash, 3) \ +X(a, STATIC, SINGULAR, BYTES, root_hash, 4) \ +X(a, STATIC, SINGULAR, BYTES, message, 5) \ +X(a, STATIC, SINGULAR, UINT32, encapsulated_id, 6) \ +X(a, STATIC, SINGULAR, UINT32, encapsulated_to, 7) \ +X(a, STATIC, SINGULAR, UINT32, encapsulated_from, 8) \ +X(a, STATIC, SINGULAR, UINT32, encapsulated_rxtime, 9) +#define meshtastic_StoreForwardPlusPlus_CALLBACK NULL +#define meshtastic_StoreForwardPlusPlus_DEFAULT NULL + #define meshtastic_Waypoint_FIELDLIST(X, a) \ X(a, STATIC, SINGULAR, UINT32, id, 1) \ X(a, STATIC, OPTIONAL, SFIXED32, latitude_i, 2) \ @@ -1980,6 +2039,7 @@ extern const pb_msgdesc_t meshtastic_RouteDiscovery_msg; extern const pb_msgdesc_t meshtastic_Routing_msg; extern const pb_msgdesc_t meshtastic_Data_msg; extern const pb_msgdesc_t meshtastic_KeyVerification_msg; +extern const pb_msgdesc_t meshtastic_StoreForwardPlusPlus_msg; extern const pb_msgdesc_t meshtastic_Waypoint_msg; extern const pb_msgdesc_t meshtastic_MqttClientProxyMessage_msg; extern const pb_msgdesc_t meshtastic_MeshPacket_msg; @@ -2013,6 +2073,7 @@ extern const pb_msgdesc_t meshtastic_ChunkedPayloadResponse_msg; #define meshtastic_Routing_fields &meshtastic_Routing_msg #define meshtastic_Data_fields &meshtastic_Data_msg #define meshtastic_KeyVerification_fields &meshtastic_KeyVerification_msg +#define meshtastic_StoreForwardPlusPlus_fields &meshtastic_StoreForwardPlusPlus_msg #define meshtastic_Waypoint_fields &meshtastic_Waypoint_msg #define meshtastic_MqttClientProxyMessage_fields &meshtastic_MqttClientProxyMessage_msg #define meshtastic_MeshPacket_fields &meshtastic_MeshPacket_msg @@ -2069,6 +2130,7 @@ extern const pb_msgdesc_t meshtastic_ChunkedPayloadResponse_msg; #define meshtastic_QueueStatus_size 23 #define meshtastic_RouteDiscovery_size 256 #define meshtastic_Routing_size 259 +#define meshtastic_StoreForwardPlusPlus_size 371 #define meshtastic_ToRadio_size 504 #define meshtastic_User_size 115 #define meshtastic_Waypoint_size 165 diff --git a/src/mesh/generated/meshtastic/portnums.pb.h b/src/mesh/generated/meshtastic/portnums.pb.h index 67adc60cc..0156c697b 100644 --- a/src/mesh/generated/meshtastic/portnums.pb.h +++ b/src/mesh/generated/meshtastic/portnums.pb.h @@ -86,6 +86,9 @@ typedef enum _meshtastic_PortNum { /* Paxcounter lib included in the firmware ENCODING: protobuf */ meshtastic_PortNum_PAXCOUNTER_APP = 34, + /* Store and Forward++ module included in the firmware + ENCODING: protobuf */ + meshtastic_PortNum_STORE_FORWARD_PLUSPLUS_APP = 35, /* Provides a hardware serial interface to send and receive from the Meshtastic network. Connect to the RX/TX pins of a device with 38400 8N1. Packets received from the Meshtastic network is forwarded to the RX pin while sending a packet to TX will go out to the Mesh network. diff --git a/src/modules/Modules.cpp b/src/modules/Modules.cpp index 63392f7e4..0b84e5ebd 100644 --- a/src/modules/Modules.cpp +++ b/src/modules/Modules.cpp @@ -61,6 +61,7 @@ #if ARCH_PORTDUINO #include "input/LinuxInputImpl.h" #include "input/SeesawRotary.h" +#include "modules/Native/StoreForwardPlusPlus.h" #include "modules/Telemetry/HostMetrics.h" #if !MESHTASTIC_EXCLUDE_STOREFORWARD #include "modules/StoreForwardModule.h" @@ -243,6 +244,7 @@ void setupModules() #endif #if ARCH_PORTDUINO new HostMetricsModule(); + new StoreForwardPlusPlusModule(); #endif #if HAS_TELEMETRY new DeviceTelemetryModule(); diff --git a/src/modules/Native/StoreForwardPlusPlus.cpp b/src/modules/Native/StoreForwardPlusPlus.cpp new file mode 100644 index 000000000..03a0bf5e7 --- /dev/null +++ b/src/modules/Native/StoreForwardPlusPlus.cpp @@ -0,0 +1,1042 @@ +// I've done a lot of this in SQLite for now, but honestly it needs to happen in memory, and get saved to sqlite during downtime + +// TODO: Message asking for the message x spots from the tip of the chain. +// This would allow individual nodes to apt in, grab the latest fe messages, and not need to sync the entire chain + +// TODO: custom hops. 1 maybe 0. Configurable? + +// TODO: non-stratum0 nodes need to be pointed at their upstream source? Maybe + +// TODO: There will come a point where the chain is too big to sync + +// TODO: evict messages from scratch after a timeout + +// things may get weird if there are multiple stratum-0 nodes on a single mesh. Come up with mitigations + +#include "StoreForwardPlusPlus.h" +#include "MeshService.h" +#include "RTC.h" +#include "SHA256.h" +#include "meshUtils.h" +#include "modules/RoutingModule.h" + +StoreForwardPlusPlusModule::StoreForwardPlusPlusModule() + : ProtobufModule("StoreForwardpp", meshtastic_PortNum_STORE_FORWARD_PLUSPLUS_APP, &meshtastic_StoreForwardPlusPlus_msg), + concurrency::OSThread("StoreForwardpp") +{ + LOG_WARN("StoreForwardPlusPlusModule init"); + + if (portduino_config.sfpp_stratum0) + LOG_WARN("SF++ stratum0"); + + int res = sqlite3_open("test.db", &ppDb); + LOG_WARN("Result1 %u", res); + + char *err = nullptr; + + res = sqlite3_exec(ppDb, " \ + CREATE TABLE channel_messages( \ + destination INT NOT NULL, \ + sender INT NOT NULL, \ + packet_id INT NOT NULL, \ + rx_time INT NOT NULL, \ + root_hash BLOB NOT NULL, \ + encrypted_bytes BLOB NOT NULL, \ + message_hash BLOB NOT NULL, \ + commit_hash BLOB NOT NULL, \ + payload TEXT, \ + counter INT DEFAULT 0, \ + PRIMARY KEY (message_hash) \ + );", + NULL, NULL, &err); + LOG_WARN("Result2 %u", res); + if (err != nullptr) + LOG_ERROR("%s", err); + sqlite3_free(err); + + res = sqlite3_exec(ppDb, " \ + CREATE TABLE local_messages( \ + destination INT NOT NULL, \ + sender INT NOT NULL, \ + packet_id INT NOT NULL, \ + rx_time INT NOT NULL, \ + channel_hash INT NOT NULL, \ + encrypted_bytes BLOB NOT NULL, \ + message_hash BLOB NOT NULL, \ + payload TEXT, \ + PRIMARY KEY (message_hash) \ + );", + NULL, NULL, &err); + LOG_WARN("Result2 %u", res); + if (err != nullptr) + LOG_ERROR("%s", err); + sqlite3_free(err); + + // create table DMs + res = sqlite3_exec(ppDb, " \ + CREATE TABLE direct_messages( \ + destination INT NOT NULL, \ + sender INT NOT NULL, \ + packet_id INT NOT NULL, \ + rx_time INT NOT NULL, \ + channel_hash INT NOT NULL, \ + commit_hash BLOB NOT NULL, \ + encrypted_bytes BLOB NOT NULL, \ + message_hash BLOB NOT NULL, \ + payload TEXT, \ + PRIMARY KEY (message_hash) \ + );", + NULL, NULL, &err); + LOG_WARN("Result2 %u", res); + if (err != nullptr) + LOG_ERROR("%s", err); + sqlite3_free(err); + + // mappings table -- connects the root hashes to channel hashes and DM identifiers + res = sqlite3_exec(ppDb, " \ + CREATE TABLE mappings( \ + chain_type INT NOT NULL, \ + identifier INT NOT NULL, \ + root_hash BLOB NOT NULL, \ + count INT DEFAULT 0, \ + PRIMARY KEY (identifier) \ + );", + NULL, NULL, &err); + LOG_WARN("Result2 %u", res); + if (err != nullptr) + LOG_ERROR("%s", err); + sqlite3_free(err); + + // store schema version somewhere + + // prepared statements *should* make this faster. + sqlite3_prepare_v2(ppDb, "INSERT INTO channel_messages (destination, sender, packet_id, root_hash, \ + encrypted_bytes, message_hash, rx_time, commit_hash, payload, counter) VALUES(?, ?, ?, ?, ?, ?, ?, ?, ?, ?);", + -1, &chain_insert_stmt, NULL); + + sqlite3_prepare_v2(ppDb, "INSERT INTO local_messages (destination, sender, packet_id, channel_hash, \ + encrypted_bytes, message_hash, rx_time, payload) VALUES(?, ?, ?, ?, ?, ?, ?, ?);", + -1, &scratch_insert_stmt, NULL); + + sqlite3_prepare_v2(ppDb, "select destination, sender, packet_id, encrypted_bytes, message_hash, rx_time, channel_hash \ + from local_messages where channel_hash=? order by rx_time asc LIMIT 1;", // earliest first + -1, &fromScratchStmt, NULL); + + sqlite3_prepare_v2(ppDb, + "select destination, sender, packet_id, encrypted_bytes, message_hash, rx_time, channel_hash, payload \ + from local_messages where substr(message_hash,1,?)=? order by rx_time asc LIMIT 1;", // earliest first + -1, &fromScratchByHashStmt, NULL); + + sqlite3_prepare_v2(ppDb, "SELECT COUNT(*) from channel_messages where substr(message_hash,1,?)=?", -1, &checkDup, NULL); + + sqlite3_prepare_v2(ppDb, "SELECT COUNT(*) from local_messages where substr(message_hash,1,?)=?", -1, &checkScratch, NULL); + + sqlite3_prepare_v2(ppDb, "DELETE from local_messages where substr(message_hash,1,?)=?", -1, &removeScratch, NULL); + + sqlite3_prepare_v2(ppDb, "UPDATE channel_messages SET payload=? WHERE substr(message_hash,1,?)=?", -1, &updatePayloadStmt, + NULL); + + sqlite3_prepare_v2(ppDb, "select commit_hash from channel_messages where substr(root_hash,1,?)=? order by rowid ASC;", -1, + &getNextHashStmt, NULL); + + sqlite3_prepare_v2(ppDb, + "select commit_hash, message_hash, rx_time from channel_messages where substr(root_hash,1,?)=? order by " + "rowid desc LIMIT 1;", + -1, &getChainEndStmt, NULL); + + sqlite3_prepare_v2( + ppDb, + "select destination, sender, packet_id, encrypted_bytes, message_hash, rx_time, commit_hash, root_hash, counter, payload \ + from channel_messages where substr(commit_hash,1,?)=?;", + -1, &getLinkStmt, NULL); + + sqlite3_prepare_v2(ppDb, "select identifier from mappings where substr(root_hash,1,?)=?;", -1, &getHashFromRootStmt, NULL); + + sqlite3_prepare_v2(ppDb, "INSERT INTO mappings (chain_type, identifier, root_hash) VALUES(?, ?, ?);", -1, + &addRootToMappingsStmt, NULL); + + sqlite3_prepare_v2(ppDb, "select root_hash from mappings where identifier=?;", -1, &getRootFromChannelHashStmt, NULL); + + sqlite3_prepare_v2(ppDb, "select root_hash from mappings where substr(root_hash,1,?)=?;", -1, &getFullRootHashStmt, NULL); + + sqlite3_prepare_v2(ppDb, "UPDATE mappings SET count=? WHERE substr(root_hash,1,?)=?;", -1, &setChainCountStmt, NULL); + + sqlite3_prepare_v2(ppDb, "SELECT count FROM mappings WHERE substr(root_hash,1,?)=?;", -1, &getChainCountStmt, NULL); + + encryptedOk = true; + + // wait about 15 seconds after boot for the first runOnce() + // TODO: When not doing active development, adjust this to a longer time + this->setInterval(15 * 1000); +} + +int32_t StoreForwardPlusPlusModule::runOnce() +{ + LOG_WARN("StoreForward++ runONce"); + pendingRun = false; + if (getRTCQuality() < RTCQualityNTP) { + LOG_WARN("StoreForward++ deferred due to time quality %u", getRTCQuality()); + return 5 * 60 * 1000; + } + uint8_t root_hash_bytes[32] = {0}; + ChannelHash hash = channels.getHash(0); + getOrAddRootFromChannelHash(hash, root_hash_bytes); + + // get tip of chain for this channel + link_object chain_end = getChainEndLinkObject(root_hash_bytes, 32); + + if (chain_end.rx_time == 0) { + LOG_WARN("Store and Forward++ database lookup returned null"); + return 60 * 60 * 1000; + } + + // broadcast the tip of the chain + canonAnnounce(chain_end.message_hash, chain_end.commit_hash, root_hash_bytes, chain_end.rx_time); + + // eventually timeout things on the scratch queue + return 60 * 60 * 1000; +} + +ProcessMessage StoreForwardPlusPlusModule::handleReceived(const meshtastic_MeshPacket &mp) +{ + // To avoid terrible time problems, require NTP or GPS time + if (getRTCQuality() < RTCQualityNTP) { + return ProcessMessage::CONTINUE; + } + + // For the moment, this is strictly LoRa + if (mp.transport_mechanism != meshtastic_MeshPacket_TransportMechanism_TRANSPORT_LORA) { + return ProcessMessage::CONTINUE; // Let others look at this message also if they want + } + + // will eventually host DMs and other undecodable messages + if (mp.which_payload_variant != meshtastic_MeshPacket_decoded_tag) { + return ProcessMessage::CONTINUE; // Let others look at this message also if they want + } + LOG_WARN("in handleReceived"); + if (mp.decoded.portnum == meshtastic_PortNum_TEXT_MESSAGE_APP && mp.to == NODENUM_BROADCAST) { + link_object lo = ingestTextPacket(mp, router->p_encrypted); + + if (isInDB(lo.message_hash, lo.message_hash_len)) { + LOG_WARN("found message in db"); + // We may have this message already, but we may not have the payload + // if we do, we can update the payload in the database + if (lo.payload != "") + updatePayload(lo.message_hash, lo.message_hash_len, lo.payload); + return ProcessMessage::CONTINUE; + } + + if (!portduino_config.sfpp_stratum0) { + if (!isInDB(lo.message_hash, lo.message_hash_len)) { + addToScratch(lo); + LOG_WARN("added message to scratch"); + // send link to upstream? + } + return ProcessMessage::CONTINUE; + } + addToChain(lo); + + if (!pendingRun) { + setIntervalFromNow(60 * 1000); // run again in 60 seconds to announce the new tip of chain + pendingRun = true; + } + // canonAnnounce(lo.message_hash, lo.commit_hash, lo.root_hash, lo.rx_time); + return ProcessMessage::CONTINUE; // Let others look at this message also if they want + } else if (mp.decoded.portnum == meshtastic_PortNum_STORE_FORWARD_PLUSPLUS_APP) { + LOG_WARN("Got a STORE_FORWARD++ packet"); + meshtastic_StoreForwardPlusPlus scratch; + pb_decode_from_bytes(mp.decoded.payload.bytes, mp.decoded.payload.size, meshtastic_StoreForwardPlusPlus_fields, &scratch); + handleReceivedProtobuf(mp, &scratch); + return ProcessMessage::CONTINUE; + } + return ProcessMessage::CONTINUE; +} + +bool StoreForwardPlusPlusModule::handleReceivedProtobuf(const meshtastic_MeshPacket &mp, meshtastic_StoreForwardPlusPlus *t) +{ + LOG_WARN("in handleReceivedProtobuf"); + LOG_WARN("Sfp++ node %u sent us sf++ packet", mp.from); + printBytes("commit_hash ", t->commit_hash.bytes, t->commit_hash.size); + printBytes("root_hash ", t->root_hash.bytes, t->root_hash.size); + + if (t->sfpp_message_type == meshtastic_StoreForwardPlusPlus_SFPP_message_type_CANON_ANNOUNCE) { + + // TODO: Regardless of where we are in the chain, if we have a newer message, send it back. + if (portduino_config.sfpp_stratum0) { + LOG_WARN("Received a CANON_ANNOUNCE while stratum 0"); + uint8_t next_commit_hash[32] = {0}; + if (getNextHash(t->root_hash.bytes, t->root_hash.size, t->commit_hash.bytes, t->commit_hash.size, next_commit_hash)) { + printBytes("next chain hash: ", next_commit_hash, 32); + if (airTime->isTxAllowedChannelUtil(true)) { + broadcastLink(next_commit_hash, 32); + } + } + } else { + uint8_t tmp_root_hash_bytes[32] = {0}; + + LOG_WARN("Received a CANON_ANNOUNCE"); + if (getRootFromChannelHash(router->p_encrypted->channel, tmp_root_hash_bytes)) { + // we found the hash, check if it's the right one + // TODO handle oddball sizes here + if (memcmp(tmp_root_hash_bytes, t->root_hash.bytes, t->root_hash.size) != 0) { + LOG_WARN("Found root hash, and it doesn't match!"); + return true; + } + } else { + // TODO: size check + addRootToMappings(router->p_encrypted->channel, t->root_hash.bytes); + LOG_WARN("Adding root hash to mappings"); + } + + // get tip of chain for this channel + link_object chain_end = getChainEndLinkObject(t->root_hash.bytes, t->root_hash.size); + + // get chain tip + if (chain_end.rx_time != 0) { + // TODO: size check + if (memcmp(chain_end.commit_hash, t->commit_hash.bytes, t->commit_hash.size) == 0) { + LOG_WARN("End of chain matches!"); + sendFromScratch(router->p_encrypted->channel); + } else { + LOG_INFO("End of chain does not match!"); + + // We just got an end of chain announce, checking if we have seen this message and have it in scratch. + if (isInScratch(t->message_hash.bytes, t->message_hash.size)) { + link_object scratch_object = getFromScratch(t->message_hash.bytes, t->message_hash.size); + // if this matches, we don't need to request the message + // we know exactly what it is + if (t->message_hash.size >= 8 && + checkCommitHash(scratch_object, t->commit_hash.bytes, t->message_hash.size)) { + + addToChain(scratch_object); + removeFromScratch(scratch_object.message_hash, scratch_object.message_hash_len); + // short circuit and return + // falls through to a request for the message + return true; + } + } + if (airTime->isTxAllowedChannelUtil(true)) { + requestNextMessage(t->root_hash.bytes, t->root_hash.size, chain_end.commit_hash, 32); + } + } + } else { // if chainEnd() + LOG_WARN("No Messages on this chain, request!"); + if (airTime->isTxAllowedChannelUtil(true)) { + requestNextMessage(t->root_hash.bytes, t->root_hash.size, t->root_hash.bytes, t->root_hash.size); + } + } + } + } else if (t->sfpp_message_type == meshtastic_StoreForwardPlusPlus_SFPP_message_type_LINK_REQUEST) { + uint8_t next_commit_hash[32] = {0}; + + LOG_WARN("Received link request"); + if (getNextHash(t->root_hash.bytes, t->root_hash.size, t->commit_hash.bytes, t->commit_hash.size, next_commit_hash)) { + printBytes("next chain hash: ", next_commit_hash, 32); + + broadcastLink(next_commit_hash, 32); + } + + // if root and chain hashes are the same, grab the first message on the chain + // if different, get the message directly after. + // check if the root + + } else if (t->sfpp_message_type == meshtastic_StoreForwardPlusPlus_SFPP_message_type_LINK_PROVIDE) { + LOG_WARN("Link Provide received!"); + + link_object incoming_link = ingestLinkMessage(t); + if (incoming_link.root_hash_len == 0) { + LOG_WARN("Hash bytes not found for incoming link"); + return true; + } + + if (!incoming_link.validObject) { + LOG_WARN("commit byte mismatch"); + return true; + } + + if (portduino_config.sfpp_stratum0) { + if (isInDB(incoming_link.message_hash, incoming_link.message_hash_len)) { + LOG_WARN("Received link already in chain"); + // TODO: respond with next link? + return true; + } + + // calculate the commit_hash + addToChain(incoming_link); + if (!pendingRun) { + setIntervalFromNow(60 * 1000); // run again in 60 seconds to announce the new tip of chain + pendingRun = true; + } + // timebox to no more than an hour old + if (incoming_link.rx_time > getValidTime(RTCQuality::RTCQualityNTP, true) - rebroadcastTimeout) { + // if this packet is new to us, we rebroadcast it + rebroadcastLinkObject(incoming_link); + } + + } else { + if (incoming_link.commit_hash_len == 32) { + addToChain(incoming_link); + if (isInScratch(incoming_link.message_hash, incoming_link.message_hash_len)) { + link_object scratch_object = getFromScratch(incoming_link.message_hash, incoming_link.message_hash_len); + if (scratch_object.payload != "") { + updatePayload(incoming_link.message_hash, incoming_link.message_hash_len, scratch_object.payload); + } + removeFromScratch(incoming_link.message_hash, incoming_link.message_hash_len); + } else { + // if this packet is new to us, we rebroadcast it, but only up to an hour old + if (incoming_link.rx_time > getValidTime(RTCQuality::RTCQualityNTP, true) - rebroadcastTimeout) { + LOG_WARN("Attempting to Rebroadcast message"); + rebroadcastLinkObject(incoming_link); + } + } + requestNextMessage(t->root_hash.bytes, t->root_hash.size, t->commit_hash.bytes, t->commit_hash.size); + } else { + // todo: handle this + // add to scratch? + } + } + } + + return true; +} + +bool StoreForwardPlusPlusModule::getRootFromChannelHash(ChannelHash _ch_hash, uint8_t *_root_hash) +{ + bool found = false; + sqlite3_bind_int(getRootFromChannelHashStmt, 1, _ch_hash); + sqlite3_step(getRootFromChannelHashStmt); + uint8_t *tmp_root_hash = (uint8_t *)sqlite3_column_blob(getRootFromChannelHashStmt, 0); + if (tmp_root_hash) { + LOG_WARN("Found root hash!"); + memcpy(_root_hash, tmp_root_hash, 32); + found = true; + } + sqlite3_reset(getRootFromChannelHashStmt); + return found; +} + +ChannelHash StoreForwardPlusPlusModule::getChannelHashFromRoot(uint8_t *_root_hash, size_t _root_hash_len) +{ + // TODO move and substr() + sqlite3_bind_int(getHashFromRootStmt, 1, _root_hash_len); + sqlite3_bind_blob(getHashFromRootStmt, 2, _root_hash, _root_hash_len, NULL); + sqlite3_step(getHashFromRootStmt); + ChannelHash tmp_hash = (ChannelHash)sqlite3_column_int(getHashFromRootStmt, 0); + sqlite3_reset(getHashFromRootStmt); + return tmp_hash; +} + +// return code indicates newly created chain +bool StoreForwardPlusPlusModule::getOrAddRootFromChannelHash(ChannelHash _ch_hash, uint8_t *_root_hash) +{ + LOG_WARN("getOrAddRootFromChannelHash()"); + bool isNew = !getRootFromChannelHash(_ch_hash, _root_hash); + + if (isNew) { + if (portduino_config.sfpp_stratum0) { + LOG_WARN("Generating Root hash!"); + // generate root hash + SHA256 root_hash; + root_hash.update(&_ch_hash, sizeof(_ch_hash)); + NodeNum ourNode = nodeDB->getNodeNum(); + root_hash.update(&ourNode, sizeof(ourNode)); + uint32_t rtc_sec = getValidTime(RTCQuality::RTCQualityDevice, true); + root_hash.update(&rtc_sec, sizeof(rtc_sec)); + root_hash.finalize(_root_hash, 32); + addRootToMappings(_ch_hash, _root_hash); + } + } + return isNew; +} + +void StoreForwardPlusPlusModule::addRootToMappings(ChannelHash _ch_hash, uint8_t *_root_hash) +{ + LOG_WARN("addRootToMappings()"); + printBytes("_root_hash", _root_hash, 32); + + // write to the table + int type = chain_types::channel_chain; + // note, must be an int variable + + sqlite3_bind_int(addRootToMappingsStmt, 1, type); + sqlite3_bind_int(addRootToMappingsStmt, 2, _ch_hash); + sqlite3_bind_blob(addRootToMappingsStmt, 3, _root_hash, 32, NULL); + auto rc = sqlite3_step(addRootToMappingsStmt); + LOG_WARN("result %u, %s", rc, sqlite3_errmsg(ppDb)); + sqlite3_reset(addRootToMappingsStmt); +} + +StoreForwardPlusPlusModule::link_object StoreForwardPlusPlusModule::getChainEndLinkObject(uint8_t *_root_hash, + size_t _root_hash_len) +{ + LOG_WARN("getChainEndLinkObject"); + link_object lo; + + int rc; + sqlite3_bind_int(getChainEndStmt, 1, _root_hash_len); + sqlite3_bind_blob(getChainEndStmt, 2, _root_hash, _root_hash_len, NULL); + sqlite3_step(getChainEndStmt); + uint8_t *last_message_commit_hash = (uint8_t *)sqlite3_column_blob(getChainEndStmt, 0); + uint8_t *last_message_hash = (uint8_t *)sqlite3_column_blob(getChainEndStmt, 1); + uint32_t _rx_time = sqlite3_column_int(getChainEndStmt, 2); + if (last_message_commit_hash != nullptr) { + lo = getLink(last_message_commit_hash, 32); + } + + sqlite3_reset(getChainEndStmt); + return lo; +} + +// TODO: make DM? +void StoreForwardPlusPlusModule::requestNextMessage(uint8_t *_root_hash, size_t _root_hash_len, uint8_t *_commit_hash, + size_t _commit_hash_len) +{ + + meshtastic_StoreForwardPlusPlus storeforward = meshtastic_StoreForwardPlusPlus_init_zero; + storeforward.sfpp_message_type = meshtastic_StoreForwardPlusPlus_SFPP_message_type_LINK_REQUEST; + // set root hash + + // set chain hash + storeforward.commit_hash.size = _commit_hash_len; + memcpy(storeforward.commit_hash.bytes, _commit_hash, _commit_hash_len); + + // set root hash + storeforward.root_hash.size = _root_hash_len; + memcpy(storeforward.root_hash.bytes, _root_hash, _root_hash_len); + + // storeforward. + meshtastic_MeshPacket *p = allocDataProtobuf(storeforward); + p->to = NODENUM_BROADCAST; + p->decoded.want_response = false; + p->priority = meshtastic_MeshPacket_Priority_BACKGROUND; + p->channel = 0; + LOG_INFO("Send packet to mesh"); + service->sendToMesh(p, RX_SRC_LOCAL, true); +} + +bool StoreForwardPlusPlusModule::getNextHash(uint8_t *_root_hash, size_t _root_hash_len, uint8_t *_commit_hash, + size_t _commit_hash_len, uint8_t *next_commit_hash) +{ + LOG_WARN("getNextHash"); + + ChannelHash _channel_hash = getChannelHashFromRoot(_root_hash, _root_hash_len); + LOG_WARN("_channel_hash %u", _channel_hash); + + int rc; + sqlite3_bind_int(getNextHashStmt, 1, _root_hash_len); + sqlite3_bind_blob(getNextHashStmt, 2, _root_hash, _root_hash_len, NULL); + bool next_hash = false; + + // asking for the first entry on the chain + if (memcmp(_root_hash, _commit_hash, _commit_hash_len) == 0) { + rc = sqlite3_step(getNextHashStmt); + if (rc != SQLITE_OK) { + LOG_WARN("here2 %u, %s", rc, sqlite3_errmsg(ppDb)); + } + uint8_t *tmp_commit_hash = (uint8_t *)sqlite3_column_blob(getNextHashStmt, 0); + if (tmp_commit_hash == nullptr) { + LOG_WARN("No next hash found"); + sqlite3_reset(getNextHashStmt); + return false; + } + printBytes("commit_hash", tmp_commit_hash, 32); + memcpy(next_commit_hash, tmp_commit_hash, 32); + next_hash = true; + } else { + bool found_hash = false; + + LOG_WARN("Looking for next hashes"); + uint8_t *tmp_commit_hash; + while (sqlite3_step(getNextHashStmt) != SQLITE_DONE) { + tmp_commit_hash = (uint8_t *)sqlite3_column_blob(getNextHashStmt, 0); + + if (found_hash) { + LOG_WARN("Found hash"); + memcpy(next_commit_hash, tmp_commit_hash, 32); + next_hash = true; + break; + } + if (memcmp(tmp_commit_hash, _commit_hash, _commit_hash_len) == 0) + found_hash = true; + } + } + + sqlite3_reset(getNextHashStmt); + return next_hash; +} + +void StoreForwardPlusPlusModule::broadcastLink(uint8_t *_commit_hash, size_t _commit_hash_len) +{ + int rc; + sqlite3_bind_int(getLinkStmt, 1, _commit_hash_len); + sqlite3_bind_blob(getLinkStmt, 2, _commit_hash, _commit_hash_len, NULL); + LOG_WARN("%d", sqlite3_step(getLinkStmt)); + + meshtastic_StoreForwardPlusPlus storeforward = meshtastic_StoreForwardPlusPlus_init_zero; + storeforward.sfpp_message_type = meshtastic_StoreForwardPlusPlus_SFPP_message_type_LINK_PROVIDE; + + storeforward.encapsulated_to = sqlite3_column_int(getLinkStmt, 0); + storeforward.encapsulated_from = sqlite3_column_int(getLinkStmt, 1); + storeforward.encapsulated_id = sqlite3_column_int(getLinkStmt, 2); + + uint8_t *_payload = (uint8_t *)sqlite3_column_blob(getLinkStmt, 3); + storeforward.message.size = sqlite3_column_bytes(getLinkStmt, 3); + memcpy(storeforward.message.bytes, _payload, storeforward.message.size); + + uint8_t *_message_hash = (uint8_t *)sqlite3_column_blob(getLinkStmt, 4); + // storeforward.message_hash.size = 8; + // memcpy(storeforward.message_hash.bytes, _message_hash, storeforward.message_hash.size); + + storeforward.encapsulated_rxtime = sqlite3_column_int(getLinkStmt, 5); + + uint8_t *_tmp_commit_hash = (uint8_t *)sqlite3_column_blob(getLinkStmt, 6); + storeforward.commit_hash.size = 8; + memcpy(storeforward.commit_hash.bytes, _tmp_commit_hash, storeforward.commit_hash.size); + + uint8_t *_root_hash = (uint8_t *)sqlite3_column_blob(getLinkStmt, 7); + storeforward.root_hash.size = 8; + memcpy(storeforward.root_hash.bytes, _root_hash, storeforward.root_hash.size); + + sqlite3_reset(getLinkStmt); + + meshtastic_MeshPacket *p = allocDataProtobuf(storeforward); + p->to = NODENUM_BROADCAST; + p->decoded.want_response = false; + p->priority = meshtastic_MeshPacket_Priority_BACKGROUND; + p->channel = 0; + LOG_INFO("Send link to mesh"); + service->sendToMesh(p, RX_SRC_LOCAL, true); +} + +// +StoreForwardPlusPlusModule::link_object StoreForwardPlusPlusModule::getLink(uint8_t *_commit_hash, size_t _commit_hash_len) +{ + link_object lo; + int rc; + sqlite3_bind_int(getLinkStmt, 1, _commit_hash_len); + sqlite3_bind_blob(getLinkStmt, 2, _commit_hash, _commit_hash_len, NULL); + LOG_WARN("%d", sqlite3_step(getLinkStmt)); + + lo.to = sqlite3_column_int(getLinkStmt, 0); + lo.from = sqlite3_column_int(getLinkStmt, 1); + lo.id = sqlite3_column_int(getLinkStmt, 2); + + uint8_t *_payload = (uint8_t *)sqlite3_column_blob(getLinkStmt, 3); + lo.encrypted_len = sqlite3_column_bytes(getLinkStmt, 3); + memcpy(lo.encrypted_bytes, _payload, lo.encrypted_len); + + uint8_t *_message_hash = (uint8_t *)sqlite3_column_blob(getLinkStmt, 4); + lo.message_hash_len = 32; + memcpy(lo.message_hash, _message_hash, lo.message_hash_len); + + lo.rx_time = sqlite3_column_int(getLinkStmt, 5); + + uint8_t *_tmp_commit_hash = (uint8_t *)sqlite3_column_blob(getLinkStmt, 6); + lo.commit_hash_len = 32; + memcpy(lo.commit_hash, _tmp_commit_hash, lo.commit_hash_len); + + uint8_t *_root_hash = (uint8_t *)sqlite3_column_blob(getLinkStmt, 7); + lo.root_hash_len = 32; + memcpy(lo.root_hash, _root_hash, lo.root_hash_len); + + lo.counter = sqlite3_column_int(getLinkStmt, 8); + + lo.payload = std::string((char *)sqlite3_column_text(getLinkStmt, 9)); + + lo.channel_hash = getChannelHashFromRoot(lo.root_hash, lo.root_hash_len); + + sqlite3_reset(getLinkStmt); + + return lo; +} + +bool StoreForwardPlusPlusModule::sendFromScratch(uint8_t _channel_hash) +{ + LOG_WARN("sendFromScratch"); + // "select destination, sender, packet_id, channel_hash, encrypted_bytes, message_hash, rx_time \ + // from local_messages order by rx_time desc LIMIT 1;" + sqlite3_bind_int(fromScratchStmt, 1, _channel_hash); + if (sqlite3_step(fromScratchStmt) == SQLITE_DONE) { + LOG_WARN("No messages in scratch to forward"); + return false; + } + uint8_t _root_hash[32] = {0}; + if (!getRootFromChannelHash(_channel_hash, _root_hash)) { + LOG_ERROR("Error getting root hash"); + return false; + } + + meshtastic_StoreForwardPlusPlus storeforward = meshtastic_StoreForwardPlusPlus_init_zero; + storeforward.sfpp_message_type = meshtastic_StoreForwardPlusPlus_SFPP_message_type_LINK_PROVIDE; + + storeforward.encapsulated_to = sqlite3_column_int(fromScratchStmt, 0); + storeforward.encapsulated_from = sqlite3_column_int(fromScratchStmt, 1); + storeforward.encapsulated_id = sqlite3_column_int(fromScratchStmt, 2); + + uint8_t *_encrypted = (uint8_t *)sqlite3_column_blob(fromScratchStmt, 3); + storeforward.message.size = sqlite3_column_bytes(fromScratchStmt, 3); + memcpy(storeforward.message.bytes, _encrypted, storeforward.message.size); + + uint8_t *_message_hash = (uint8_t *)sqlite3_column_blob(fromScratchStmt, 4); + storeforward.message_hash.size = 32; + memcpy(storeforward.message_hash.bytes, _message_hash, storeforward.message_hash.size); + + storeforward.encapsulated_rxtime = sqlite3_column_int(fromScratchStmt, 5); + + storeforward.root_hash.size = 32; + memcpy(storeforward.root_hash.bytes, _root_hash, 32); + + sqlite3_reset(fromScratchStmt); + + meshtastic_MeshPacket *p = allocDataProtobuf(storeforward); + p->to = NODENUM_BROADCAST; + p->decoded.want_response = false; + p->priority = meshtastic_MeshPacket_Priority_BACKGROUND; + p->channel = 0; + LOG_INFO("Send link to mesh"); + service->sendToMesh(p, RX_SRC_LOCAL, true); + return true; +} + +bool StoreForwardPlusPlusModule::addToChain(link_object &lo) +{ + LOG_WARN("Add to chain"); + link_object chain_end = getChainEndLinkObject(lo.root_hash, lo.root_hash_len); + + // we may need to calculate the full commit hash at this point + if (lo.commit_hash_len != 32) { + SHA256 commit_hash; + + commit_hash.reset(); + + if (chain_end.commit_hash_len == 32) { + printBytes("last message: 0x", chain_end.commit_hash, 32); + commit_hash.update(chain_end.commit_hash, 32); + } else { + printBytes("new chain root: 0x", lo.root_hash, 32); + commit_hash.update(lo.root_hash, 32); + } + + commit_hash.update(lo.message_hash, 32); + // message_hash.update(&mp.rx_time, sizeof(mp.rx_time)); + commit_hash.finalize(lo.commit_hash, 32); + } + lo.counter = chain_end.counter + 1; + // push a message into the local chain DB + // destination + sqlite3_bind_int(chain_insert_stmt, 1, lo.to); + // sender + sqlite3_bind_int(chain_insert_stmt, 2, lo.from); + // packet_id + sqlite3_bind_int(chain_insert_stmt, 3, lo.id); + // root_hash + sqlite3_bind_blob(chain_insert_stmt, 4, lo.root_hash, 32, NULL); + // encrypted_bytes + sqlite3_bind_blob(chain_insert_stmt, 5, lo.encrypted_bytes, lo.encrypted_len, NULL); + // message_hash + sqlite3_bind_blob(chain_insert_stmt, 6, lo.message_hash, 32, NULL); + // rx_time + sqlite3_bind_int(chain_insert_stmt, 7, lo.rx_time); + // commit_hash + sqlite3_bind_blob(chain_insert_stmt, 8, lo.commit_hash, 32, NULL); + // payload + sqlite3_bind_text(chain_insert_stmt, 9, lo.payload.c_str(), lo.payload.length(), NULL); + + sqlite3_bind_int(chain_insert_stmt, 10, lo.counter); + sqlite3_step(chain_insert_stmt); + sqlite3_reset(chain_insert_stmt); + setChainCount(lo.root_hash, 32, lo.counter); + return true; +} + +bool StoreForwardPlusPlusModule::addToScratch(link_object &lo) +{ + // push a message into the local chain DB + // destination + sqlite3_bind_int(scratch_insert_stmt, 1, lo.to); + // sender + sqlite3_bind_int(scratch_insert_stmt, 2, lo.from); + // packet_id + sqlite3_bind_int(scratch_insert_stmt, 3, lo.id); + // root_hash + sqlite3_bind_blob(scratch_insert_stmt, 4, lo.root_hash, 32, NULL); + // encrypted_bytes + sqlite3_bind_blob(scratch_insert_stmt, 5, lo.encrypted_bytes, lo.encrypted_len, NULL); + // message_hash + sqlite3_bind_blob(scratch_insert_stmt, 6, lo.message_hash, 32, NULL); + // rx_time + sqlite3_bind_int(scratch_insert_stmt, 7, lo.rx_time); + // payload + sqlite3_bind_text(scratch_insert_stmt, 8, lo.payload.c_str(), lo.payload.length(), NULL); + const char *_error_mesg = sqlite3_errmsg(ppDb); + + LOG_WARN("step %u, %s", sqlite3_step(scratch_insert_stmt), _error_mesg); + sqlite3_reset(scratch_insert_stmt); + return true; +} + +void StoreForwardPlusPlusModule::canonAnnounce(uint8_t *_message_hash, uint8_t *_commit_hash, uint8_t *_root_hash, + uint32_t _rx_time) +{ + meshtastic_StoreForwardPlusPlus storeforward = meshtastic_StoreForwardPlusPlus_init_zero; + storeforward.sfpp_message_type = meshtastic_StoreForwardPlusPlus_SFPP_message_type_CANON_ANNOUNCE; + // set root hash + + // set message hash + storeforward.message_hash.size = 32; + memcpy(storeforward.message_hash.bytes, _message_hash, 32); + + // set chain hash + storeforward.commit_hash.size = 32; + memcpy(storeforward.commit_hash.bytes, _commit_hash, 32); + + // set root hash + storeforward.root_hash.size = 32; + memcpy(storeforward.root_hash.bytes, _root_hash, 32); + + storeforward.encapsulated_rxtime = _rx_time; + // storeforward. + meshtastic_MeshPacket *p = allocDataProtobuf(storeforward); + p->to = NODENUM_BROADCAST; + p->decoded.want_response = false; + p->priority = meshtastic_MeshPacket_Priority_BACKGROUND; + p->channel = 0; + LOG_INFO("Send packet to mesh"); + service->sendToMesh(p, RX_SRC_LOCAL, true); +} + +bool StoreForwardPlusPlusModule::isInDB(uint8_t *message_hash_bytes, size_t message_hash_len) +{ + sqlite3_bind_int(checkDup, 1, message_hash_len); + sqlite3_bind_blob(checkDup, 2, message_hash_bytes, message_hash_len, NULL); + sqlite3_step(checkDup); + int numberFound = sqlite3_column_int(checkDup, 0); + sqlite3_reset(checkDup); + if (numberFound > 0) + return true; + return false; +} + +bool StoreForwardPlusPlusModule::isInScratch(uint8_t *message_hash_bytes, size_t message_hash_len) +{ + LOG_WARN("isInScratch"); + sqlite3_bind_int(checkScratch, 1, message_hash_len); + sqlite3_bind_blob(checkScratch, 2, message_hash_bytes, message_hash_len, NULL); + sqlite3_step(checkScratch); + int numberFound = sqlite3_column_int(checkScratch, 0); + sqlite3_reset(checkScratch); + if (numberFound > 0) + return true; + return false; +} + +void StoreForwardPlusPlusModule::removeFromScratch(uint8_t *message_hash_bytes, size_t message_hash_len) +{ + LOG_WARN("removeFromScratch"); + sqlite3_bind_int(removeScratch, 1, message_hash_len); + sqlite3_bind_blob(removeScratch, 2, message_hash_bytes, message_hash_len, NULL); + sqlite3_step(removeScratch); + int numberFound = sqlite3_column_int(removeScratch, 0); + sqlite3_reset(removeScratch); +} + +void StoreForwardPlusPlusModule::updatePayload(uint8_t *message_hash_bytes, size_t message_hash_len, std::string payload) +{ + LOG_WARN("updatePayload"); + sqlite3_bind_text(updatePayloadStmt, 1, payload.c_str(), payload.length(), NULL); + sqlite3_bind_int(updatePayloadStmt, 2, message_hash_len); + sqlite3_bind_blob(updatePayloadStmt, 3, message_hash_bytes, message_hash_len, NULL); + auto res = sqlite3_step(updatePayloadStmt); + const char *_error_mesg = sqlite3_errmsg(ppDb); + LOG_WARN("step %u, %s", res, _error_mesg); + sqlite3_reset(updatePayloadStmt); +} + +StoreForwardPlusPlusModule::link_object StoreForwardPlusPlusModule::getFromScratch(uint8_t *message_hash_bytes, size_t hash_len) +{ + + // vscode wrote this + LOG_WARN("getFromScratch"); + link_object lo; + + sqlite3_bind_int(fromScratchByHashStmt, 1, hash_len); + sqlite3_bind_blob(fromScratchByHashStmt, 2, message_hash_bytes, hash_len, NULL); + auto res = sqlite3_step(fromScratchByHashStmt); + const char *_error_mesg = sqlite3_errmsg(ppDb); + LOG_WARN("step %u, %s", res, _error_mesg); + lo.to = sqlite3_column_int(fromScratchByHashStmt, 0); + lo.from = sqlite3_column_int(fromScratchByHashStmt, 1); + lo.id = sqlite3_column_int(fromScratchByHashStmt, 2); + + uint8_t *encrypted_bytes = (uint8_t *)sqlite3_column_blob(fromScratchByHashStmt, 3); + lo.encrypted_len = sqlite3_column_bytes(fromScratchByHashStmt, 3); + memcpy(lo.encrypted_bytes, encrypted_bytes, lo.encrypted_len); + uint8_t *message_hash = (uint8_t *)sqlite3_column_blob(fromScratchByHashStmt, 4); + memcpy(lo.message_hash, message_hash, 32); + lo.rx_time = sqlite3_column_int(fromScratchByHashStmt, 5); + lo.channel_hash - sqlite3_column_int(fromScratchByHashStmt, 6); + lo.payload = + std::string((char *)sqlite3_column_text(fromScratchByHashStmt, 7), sqlite3_column_bytes(fromScratchByHashStmt, 7)); + sqlite3_reset(fromScratchByHashStmt); + return lo; +} + +// should not need size considerations +StoreForwardPlusPlusModule::link_object +StoreForwardPlusPlusModule::ingestTextPacket(const meshtastic_MeshPacket &mp, const meshtastic_MeshPacket *encrypted_meshpacket) +{ + link_object lo; + SHA256 message_hash; + lo.to = mp.to; + lo.from = mp.from; + lo.id = mp.id; + lo.rx_time = mp.rx_time; + lo.channel_hash = encrypted_meshpacket->channel; + memcpy(lo.encrypted_bytes, encrypted_meshpacket->encrypted.bytes, encrypted_meshpacket->encrypted.size); + lo.encrypted_len = encrypted_meshpacket->encrypted.size; + lo.payload = std::string((char *)mp.decoded.payload.bytes, mp.decoded.payload.size); + + message_hash.reset(); + message_hash.update(encrypted_meshpacket->encrypted.bytes, encrypted_meshpacket->encrypted.size); + message_hash.update(&mp.to, sizeof(mp.to)); + message_hash.update(&mp.from, sizeof(mp.from)); + message_hash.update(&mp.id, sizeof(mp.id)); + message_hash.finalize(lo.message_hash, 32); + lo.message_hash_len = 32; + + getOrAddRootFromChannelHash(encrypted_meshpacket->channel, lo.root_hash); + return lo; +} + +StoreForwardPlusPlusModule::link_object StoreForwardPlusPlusModule::ingestLinkMessage(meshtastic_StoreForwardPlusPlus *t) +{ + link_object lo; + + lo.to = t->encapsulated_to; + lo.from = t->encapsulated_from; + lo.id = t->encapsulated_id; + lo.rx_time = t->encapsulated_rxtime; + + // What if we don't have this root hash? Should drop this packet before this point. + lo.channel_hash = getChannelHashFromRoot(t->root_hash.bytes, t->root_hash.size); + SHA256 message_hash; + + memcpy(lo.encrypted_bytes, t->message.bytes, t->message.size); + lo.encrypted_len = t->message.size; + + message_hash.reset(); + message_hash.update(lo.encrypted_bytes, lo.encrypted_len); + message_hash.update(&lo.to, sizeof(lo.to)); + message_hash.update(&lo.from, sizeof(lo.from)); + message_hash.update(&lo.id, sizeof(lo.id)); + message_hash.finalize(lo.message_hash, 32); + lo.message_hash_len = 32; + + // look up full root hash and copy over the partial if it matches + if (lookUpFullRootHash(t->root_hash.bytes, t->root_hash.size, lo.root_hash)) { + printBytes("Found full root hash: 0x", lo.root_hash, 32); + lo.root_hash_len = 32; + } else { + LOG_WARN("root hash does not match %d bytes", t->root_hash.size); + printBytes("Using partial root hash: 0x", t->root_hash.bytes, t->root_hash.size); + lo.root_hash_len = 0; + } + + if (t->commit_hash.size > 0) { + // calculate the full commit hash and replace the partial if it matches + if (checkCommitHash(lo, t->commit_hash.bytes, t->commit_hash.size)) { + printBytes("commit hash matches: 0x", t->commit_hash.bytes, t->commit_hash.size); + } else { + LOG_WARN("commit hash does not match"); + lo.commit_hash_len = 0; + lo.validObject = false; + } + } + + // we don't ever get the payload here, so it's always an empty string + lo.payload = ""; + + return lo; +} + +void StoreForwardPlusPlusModule::rebroadcastLinkObject(StoreForwardPlusPlusModule::link_object &lo) +{ + LOG_WARN("Attempting to Rebroadcast1"); + meshtastic_MeshPacket *p = router->allocForSending(); + p->to = lo.to; + p->from = lo.from; + p->id = lo.id; + p->channel = lo.channel_hash; + p->which_payload_variant = meshtastic_MeshPacket_encrypted_tag; + p->encrypted.size = lo.encrypted_len; + memcpy(p->encrypted.bytes, lo.encrypted_bytes, lo.encrypted_len); + p->transport_mechanism = meshtastic_MeshPacket_TransportMechanism_TRANSPORT_LORA; // only a tiny white lie + service->sendToMesh(p, RX_SRC_RADIO, true); // Send to mesh, cc to phone +} + +bool StoreForwardPlusPlusModule::checkCommitHash(StoreForwardPlusPlusModule::link_object &lo, uint8_t *commit_hash_bytes, + size_t hash_len) +{ + SHA256 commit_hash; + + link_object chain_end = getChainEndLinkObject(lo.root_hash, lo.root_hash_len); + + commit_hash.reset(); + + if (chain_end.commit_hash_len == 32) { + printBytes("last message: 0x", chain_end.commit_hash, 32); + commit_hash.update(chain_end.commit_hash, 32); + } else { + if (lo.root_hash_len != 32) { + LOG_WARN("Short root hash in link object, cannot create new chain"); + return false; + } + printBytes("new chain root: 0x", lo.root_hash, 32); + commit_hash.update(lo.root_hash, 32); + } + + commit_hash.update(lo.message_hash, 32); + commit_hash.finalize(lo.commit_hash, 32); + lo.commit_hash_len = 32; + + if (hash_len == 0 || memcmp(commit_hash_bytes, lo.commit_hash, hash_len) == 0) { + return true; + } + return false; +} + +bool StoreForwardPlusPlusModule::lookUpFullRootHash(uint8_t *partial_root_hash, size_t partial_root_hash_len, + uint8_t *full_root_hash) +{ + LOG_WARN("lookUpFullRootHash"); + printBytes("partial_root_hash", partial_root_hash, partial_root_hash_len); + sqlite3_bind_int(getFullRootHashStmt, 1, partial_root_hash_len); + sqlite3_bind_blob(getFullRootHashStmt, 2, partial_root_hash, partial_root_hash_len, NULL); + sqlite3_step(getFullRootHashStmt); + uint8_t *tmp_root_hash = (uint8_t *)sqlite3_column_blob(getFullRootHashStmt, 0); + if (tmp_root_hash) { + LOG_WARN("Found full root hash!"); + memcpy(full_root_hash, tmp_root_hash, 32); + sqlite3_reset(getFullRootHashStmt); + return true; + } + sqlite3_reset(getFullRootHashStmt); + return false; +} + +void StoreForwardPlusPlusModule::setChainCount(uint8_t *root_hash, size_t root_hash_len, uint32_t count) +{ + sqlite3_bind_blob(setChainCountStmt, 1, root_hash, root_hash_len, NULL); + sqlite3_bind_int(setChainCountStmt, 2, count); + sqlite3_step(setChainCountStmt); + sqlite3_reset(setChainCountStmt); +} + +uint32_t StoreForwardPlusPlusModule::getChainCount(uint8_t *root_hash, size_t root_hash_len) +{ + sqlite3_bind_blob(getChainCountStmt, 1, root_hash, root_hash_len, NULL); + sqlite3_step(getChainCountStmt); + uint32_t count = sqlite3_column_int(getChainCountStmt, 0); + sqlite3_reset(getChainCountStmt); + return count; +} \ No newline at end of file diff --git a/src/modules/Native/StoreForwardPlusPlus.h b/src/modules/Native/StoreForwardPlusPlus.h new file mode 100644 index 000000000..bf2127874 --- /dev/null +++ b/src/modules/Native/StoreForwardPlusPlus.h @@ -0,0 +1,212 @@ +#pragma once +#include "Channels.h" +#include "ProtobufModule.h" +#include "Router.h" +#include "SinglePortModule.h" +#include "sqlite3.h" + +/** + * Store and forward ++ module + * There's an obvious need for a store-and-forward mechanism in Meshtastic. + * This module takes heavy inspiration from Git, building a chain of messages that can be synced between nodes. + * Each message is hashed, and the chain is built by hashing the previous commit hash and the current message hash. + * Nodes can request missing messages by requesting the next message after a given commit hash. + * + * The current focus is text messages, limited to the primary channel. + * + * Each chain is identified by a root hash, which is derived from the channelHash, the local nodenum, and the timestamp when + * created. + * + * Each message is also given a message hash, derived from the encrypted payload, the to, from, id. + * Notably not the timestamp, as we want these to match across nodes, even if the timestamps differ. + * + * The authoritative node for the chain will generate a commit hash for each message when adding it to the chain. + * The first message's commit hash is derived from the root hash and the message hash. + * Subsequent messages' commit hashes are derived from the previous commit hash and the current message hash. + * This allows a node to see only the last commit hash, and confirm it hasn't missed any messages. + * + * Nodes can request the next message in the chain by sending a LINK_REQUEST message with the root hash and the last known commit + * hash. Any node that has the next message can respond with a LINK_PROVIDE message containing the next message. + * + * When a satellite node sees a new text message, it stores it in a scratch database. + * These messages are periodically offered to the authoritative node for inclusion in the chain. + * + * The LINK_PROVIDE message does double-duty, sending both on-chain and off-chain messages. + * The differentiator is whether the commit hash is set or left empty. + * + * When a satellite node receives a canonical link message, it checks if it has the message in scratch. + * And evicts it when adding it to the canonical chain. + * + * This approach allows a node to know whether it has seen a given message before, or if it is new coming via SFPP. + * If new, and the timestamp is within the rebroadcast timeout, it will process that message as if it were just received from the + * mesh, allowing it to be decrypted, shown to the user, and rebroadcast. + */ +class StoreForwardPlusPlusModule : public ProtobufModule, private concurrency::OSThread +{ + struct link_object { + uint32_t to; + uint32_t from; + uint32_t id; + uint32_t rx_time = 0; + ChannelHash channel_hash; + uint8_t encrypted_bytes[256] = {0}; + size_t encrypted_len; + uint8_t message_hash[32] = {0}; + size_t message_hash_len = 0; + uint8_t root_hash[32] = {0}; + size_t root_hash_len = 0; + uint8_t commit_hash[32] = {0}; + size_t commit_hash_len = 0; + uint32_t counter = 0; + std::string payload; + bool validObject = true; // set this false when a chain calulation fails, etc. + }; + + public: + /** Constructor + * + */ + StoreForwardPlusPlusModule(); + + /* + -Override the wantPacket method. + */ + virtual bool wantPacket(const meshtastic_MeshPacket *p) override + { + switch (p->decoded.portnum) { + case meshtastic_PortNum_TEXT_MESSAGE_APP: + case meshtastic_PortNum_STORE_FORWARD_PLUSPLUS_APP: + return true; + default: + return false; + } + } + + protected: + /** Called to handle a particular incoming message + @return ProcessMessage::STOP if you've guaranteed you've handled this message and no other handlers should be considered for + it + */ + virtual ProcessMessage handleReceived(const meshtastic_MeshPacket &mp) override; + virtual bool handleReceivedProtobuf(const meshtastic_MeshPacket &mp, meshtastic_StoreForwardPlusPlus *t) override; + + virtual int32_t runOnce() override; + + private: + sqlite3 *ppDb; + sqlite3_stmt *chain_insert_stmt; + sqlite3_stmt *scratch_insert_stmt; + sqlite3_stmt *checkDup; + sqlite3_stmt *checkScratch; + sqlite3_stmt *removeScratch; + sqlite3_stmt *updatePayloadStmt; + sqlite3_stmt *getPayloadFromScratchStmt; + sqlite3_stmt *fromScratchStmt; + sqlite3_stmt *fromScratchByHashStmt; + sqlite3_stmt *getNextHashStmt; + sqlite3_stmt *getChainEndStmt; + sqlite3_stmt *getLinkStmt; + sqlite3_stmt *getHashFromRootStmt; + sqlite3_stmt *addRootToMappingsStmt; + sqlite3_stmt *getRootFromChannelHashStmt; + sqlite3_stmt *getFullRootHashStmt; + sqlite3_stmt *setChainCountStmt; + sqlite3_stmt *getChainCountStmt; + + // For a given Meshtastic ChannelHash, fills the root_hash buffer with a 32-byte root hash + // returns true if the root hash was found + bool getRootFromChannelHash(ChannelHash, uint8_t *); + + // For a given root hash, returns the ChannelHash + // can handle partial root hashes + ChannelHash getChannelHashFromRoot(uint8_t *_root_hash, size_t); + + // given a root hash and commit hash, returns the next commit hash in the chain + // can handle partial root and commit hashes, always fills the buffer with 32 bytes + // returns true if a next hash was found + bool getNextHash(uint8_t *, size_t, uint8_t *, size_t, uint8_t *); + + // For a given Meshtastic ChannelHash, fills the root_hash buffer with a 32-byte root hash + // but this function will add the root hash if it is not already present + // returns true if the hash is new + bool getOrAddRootFromChannelHash(ChannelHash, uint8_t *); + + // adds the ChannelHash and root_hash to the mappings table + void addRootToMappings(ChannelHash, uint8_t *); + + // gets the tip of the chain for the given root hash + link_object getChainEndLinkObject(uint8_t *, size_t); + + // requests the next message in the chain from the mesh network + // Sends a LINK_REQUEST message + void requestNextMessage(uint8_t *, size_t, uint8_t *, size_t); + + // sends a LINK_PROVIDE message broadcasting the given link object + void broadcastLink(uint8_t *, size_t); + + // sends a LINK_PROVIDE message broadcasting the given link object from scratch message store + bool sendFromScratch(uint8_t); + + // Adds the given link object to the canonical chain database + bool addToChain(link_object &); + + // Adds an incoming text message to the scratch database + bool addToScratch(link_object &); + + // sends a CANON_ANNOUNCE message, specifying the given root and commit hashes + void canonAnnounce(uint8_t *, uint8_t *, uint8_t *, uint32_t); + + // checks if the message hash is present in the canonical chain database + bool isInDB(uint8_t *, size_t); + + // checks if the message hash is present in the scratch database + bool isInScratch(uint8_t *, size_t); + + // retrieves a link object from the scratch database + link_object getFromScratch(uint8_t *, size_t); + + // removes a link object from the scratch database + void removeFromScratch(uint8_t *, size_t); + + // fills the payload section with the decrypted data for the given message hash + // probably not needed for production, but useful for testing + void updatePayload(uint8_t *, size_t, std::string); + + // Takes the decrypted MeshPacket and the encrypted packet copy, and builds a link_object + // Generates a message hash, but does not set the commit hash + link_object ingestTextPacket(const meshtastic_MeshPacket &, const meshtastic_MeshPacket *); + + // ingests a LINK_PROVIDE message and builds a link_object + // confirms the root hash and commit hash + link_object ingestLinkMessage(meshtastic_StoreForwardPlusPlus *); + + // retrieves a link object from the canonical chain database given a message hash + link_object getLink(uint8_t *, size_t); + + // puts the encrypted payload back into the queue as if it were just received + void rebroadcastLinkObject(link_object &); + + // Check if an incoming link object's commit hash matches the calculated commit hash + bool checkCommitHash(link_object &lo, uint8_t *commit_hash_bytes, size_t hash_len); + + // given a partial root hash, looks up the full 32-byte root hash + // returns true if found + bool lookUpFullRootHash(uint8_t *partial_root_hash, size_t partial_root_hash_len, uint8_t *full_root_hash); + + // update the mappings table to set the chain count for the given root hash + void setChainCount(uint8_t *, size_t, uint32_t); + + // query the mappings table for the chain count for the given root hash + uint32_t getChainCount(uint8_t *, size_t); + + // Track if we have a scheduled runOnce pending + // useful to not accudentally delay a scheduled runOnce + bool pendingRun = false; + + // Once we have multiple chain types, we can extend this + enum chain_types { + channel_chain = 0, + }; + + uint32_t rebroadcastTimeout = 3600; // Messages older than this (in seconds) will not be rebroadcast +}; diff --git a/src/platform/portduino/PortduinoGlue.cpp b/src/platform/portduino/PortduinoGlue.cpp index 1b601f9b4..f2ba7baf6 100644 --- a/src/platform/portduino/PortduinoGlue.cpp +++ b/src/platform/portduino/PortduinoGlue.cpp @@ -786,6 +786,10 @@ bool loadConfig(const char *configPath) } } + if (yamlConfig["StoreAndForward"]) { + portduino_config.sfpp_stratum0 = (yamlConfig["StoreAndForward"]["Stratum0"]).as(false); + } + if (yamlConfig["General"]) { portduino_config.MaxNodes = (yamlConfig["General"]["MaxNodes"]).as(200); portduino_config.maxtophone = (yamlConfig["General"]["MaxMessageQueue"]).as(100); diff --git a/src/platform/portduino/PortduinoGlue.h b/src/platform/portduino/PortduinoGlue.h index 9335be90a..4b830dbbe 100644 --- a/src/platform/portduino/PortduinoGlue.h +++ b/src/platform/portduino/PortduinoGlue.h @@ -169,6 +169,9 @@ extern struct portduino_config_struct { int configDisplayMode = 0; bool has_configDisplayMode = false; + // Store and Forward++ + bool sfpp_stratum0 = false; + // General std::string mac_address = ""; bool mac_address_explicit = false; diff --git a/variants/native/portduino/platformio.ini b/variants/native/portduino/platformio.ini index d87c74532..3381dd460 100644 --- a/variants/native/portduino/platformio.ini +++ b/variants/native/portduino/platformio.ini @@ -45,6 +45,7 @@ build_flags = ${native_base.build_flags} -Os -lX11 -linput -lxkbcommon -ffunctio !pkg-config --libs openssl --silence-errors || : !pkg-config --cflags --libs sdl2 --silence-errors || : !pkg-config --cflags --libs libbsd-overlay --silence-errors || : + !pkg-config --cflags --libs sqlite3 --silence-errors || : build_src_filter = ${native_base.build_src_filter}