From 3ae331eb899cd8c587500d4aebf5a0c764cca8f1 Mon Sep 17 00:00:00 2001 From: Jonathan Bennett Date: Wed, 7 Jan 2026 12:44:22 -0600 Subject: [PATCH] Add canon_scratch to SF++ --- src/modules/Native/StoreForwardPlusPlus.cpp | 617 +++++++++++++++----- src/modules/Native/StoreForwardPlusPlus.h | 28 +- 2 files changed, 493 insertions(+), 152 deletions(-) diff --git a/src/modules/Native/StoreForwardPlusPlus.cpp b/src/modules/Native/StoreForwardPlusPlus.cpp index f4dcc94ed..b2bd8aaf6 100644 --- a/src/modules/Native/StoreForwardPlusPlus.cpp +++ b/src/modules/Native/StoreForwardPlusPlus.cpp @@ -107,6 +107,33 @@ StoreForwardPlusPlusModule::StoreForwardPlusPlusModule() LOG_ERROR("%s", err); sqlite3_free(err); + // add canon_scratch table + // where we move message out of scratch when we've been informed that message exists on the canon chain, but we're not ready + // to commit it on our own chain + + res = sqlite3_exec(ppDb, " \ + CREATE TABLE IF NOT EXISTS \ + canon_scratch( \ + destination INT NOT NULL, \ + sender INT NOT NULL, \ + packet_id INT NOT NULL, \ + rx_time INT NOT NULL, \ + root_hash BLOB NOT NULL, \ + encrypted_bytes BLOB NOT NULL, \ + message_hash BLOB NOT NULL, \ + commit_hash BLOB NOT NULL, \ + payload TEXT, \ + counter INT NOT NULL, \ + PRIMARY KEY (message_hash) \ + );", + NULL, NULL, &err); + if (res != SQLITE_OK) { + LOG_ERROR("Failed to create table: %s", sqlite3_errmsg(ppDb)); + } + if (err != nullptr) + LOG_ERROR("%s", err); + sqlite3_free(err); + // mappings table -- connects the root hashes to channel hashes and DM identifiers res = sqlite3_exec(ppDb, " \ CREATE TABLE IF NOT EXISTS \ @@ -196,6 +223,12 @@ StoreForwardPlusPlusModule::StoreForwardPlusPlusModule() from channel_messages where substr(commit_hash,1,?)=?;", -1, &getLinkStmt, NULL); + sqlite3_prepare_v2( + ppDb, + "select destination, sender, packet_id, encrypted_bytes, message_hash, rx_time, commit_hash, root_hash, counter, payload \ + from channel_messages where substr(message_hash,1,?)=?;", + -1, &getLinkFromMessageHashStmt, NULL); + sqlite3_prepare_v2(ppDb, "select identifier from mappings where substr(root_hash,1,?)=?;", -1, &getHashFromRootStmt, NULL); sqlite3_prepare_v2(ppDb, "INSERT INTO mappings (chain_type, identifier, root_hash) VALUES(?, ?, ?);", -1, @@ -233,9 +266,27 @@ StoreForwardPlusPlusModule::StoreForwardPlusPlusModule() sqlite3_prepare_v2(ppDb, "DELETE FROM channel_messages WHERE substr(root_hash,1,?)=?;", -1, &clearChainStmt, NULL); + sqlite3_prepare_v2(ppDb, "INSERT INTO canon_scratch (destination, sender, packet_id, root_hash, \ + encrypted_bytes, message_hash, rx_time, commit_hash, payload, counter) VALUES(?, ?, ?, ?, ?, ?, ?, ?, ?, ?);", + -1, &canon_scratch_insert_stmt, NULL); + + sqlite3_prepare_v2(ppDb, "SELECT count(*) FROM canon_scratch WHERE substr(message_hash,1,?)=?;", -1, + &getCanonScratchCountStmt, NULL); + + sqlite3_prepare_v2( + ppDb, + "select destination, sender, packet_id, encrypted_bytes, message_hash, rx_time, commit_hash, root_hash, counter, payload \ + from canon_scratch where substr(root_hash,1,?)=? ORDER BY counter ASC;", + -1, &getCanonScratchStmt, NULL); + + sqlite3_prepare_v2(ppDb, "DELETE from canon_scratch where substr(message_hash,1,?)=?", -1, &removeCanonScratch, NULL); + + sqlite3_prepare_v2(ppDb, "DELETE from canon_scratch where substr(root_hash,1,?)=? AND counter < ?;", -1, + &clearCanonScratchStmt, NULL); + encryptedOk = true; - this->setInterval(portduino_config.sfpp_announce_interval * 60 * 1000); + this->setInterval(15 * 1000); } int32_t StoreForwardPlusPlusModule::runOnce() @@ -286,6 +337,12 @@ int32_t StoreForwardPlusPlusModule::runOnce() return portduino_config.sfpp_announce_interval * 60 * 1000; } + if (did_announce_last && sendFromScratch(root_hash_bytes)) { + LOG_DEBUG("Send from scratch queue"); + did_announce_last = false; + return portduino_config.sfpp_announce_interval * 60 * 1000; + } + // get tip of chain for this channel link_object chain_end = getLinkFromCount(0, root_hash_bytes, SFPP_HASH_SIZE); @@ -315,13 +372,14 @@ int32_t StoreForwardPlusPlusModule::runOnce() LOG_INFO("Send packet to mesh payload size %u", p->decoded.payload.size); service->sendToMesh(p, RX_SRC_LOCAL, true); + did_announce_last = true; return portduino_config.sfpp_announce_interval * 60 * 1000; } // broadcast the tip of the chain // todo just send the link object - canonAnnounce(chain_end, chain_end.message_hash, chain_end.commit_hash, root_hash_bytes, chain_end.rx_time); - + canonAnnounce(chain_end); + did_announce_last = true; // eventually timeout things on the scratch queue return portduino_config.sfpp_announce_interval * 60 * 1000; } @@ -367,7 +425,8 @@ ProcessMessage StoreForwardPlusPlusModule::handleReceived(const meshtastic_MeshP } if (!portduino_config.sfpp_stratum0) { - if (!isInDB(lo.message_hash, lo.message_hash_len)) { + if (!isInDB(lo.message_hash, lo.message_hash_len) && !isInScratch(lo.message_hash, lo.message_hash_len) && + !isInCanonScratch(lo.message_hash, lo.message_hash_len)) { if (lo.root_hash_len == 0) { LOG_DEBUG("StoreForwardpp Received text message, but no chain. Possibly no Stratum0 on local mesh."); return ProcessMessage::CONTINUE; @@ -381,7 +440,7 @@ ProcessMessage StoreForwardPlusPlusModule::handleReceived(const meshtastic_MeshP addToChain(lo); if (!pendingRun) { - setIntervalFromNow(30 * 1000); // run again in 30 seconds to announce the new tip of chain + setIntervalFromNow(10 * 1000); // run again in 30 seconds to announce the new tip of chain pendingRun = true; } return ProcessMessage::CONTINUE; // Let others look at this message also if they want @@ -403,6 +462,7 @@ bool StoreForwardPlusPlusModule::handleReceivedProtobuf(const meshtastic_MeshPac LOG_DEBUG("StoreForwardpp node %u sent us sf++ packet", mp.from); printBytes("StoreForwardpp commit_hash ", t->commit_hash.bytes, t->commit_hash.size); printBytes("StoreForwardpp root_hash ", t->root_hash.bytes, t->root_hash.size); + printBytes("StoreForwardpp message_hash ", t->message_hash.bytes, t->message_hash.size); link_object incoming_link; incoming_link.validObject = false; @@ -459,11 +519,15 @@ bool StoreForwardPlusPlusModule::handleReceivedProtobuf(const meshtastic_MeshPac if (links_behind > portduino_config.sfpp_backlog_limit) { LOG_INFO("StoreForwardpp Chain behind limit, dumping DB"); clearChain(t->root_hash.bytes, t->root_hash.size); + clearCanonScratch(t->root_hash.bytes, t->root_hash.size, + t->chain_count - portduino_config.sfpp_backlog_limit); return true; } } // We just got an end of chain announce, checking if we have seen this message and have it in scratch. + LOG_DEBUG("Checking if in scratch"); if (isInScratch(t->message_hash.bytes, t->message_hash.size)) { + LOG_DEBUG("Found in scratch"); link_object scratch_object = getFromScratch(t->message_hash.bytes, t->message_hash.size); // if this matches, we don't need to request the message // we know exactly what it is @@ -473,12 +537,13 @@ bool StoreForwardPlusPlusModule::handleReceivedProtobuf(const meshtastic_MeshPac scratch_object.rx_time = t->encapsulated_rxtime; addToChain(scratch_object); removeFromScratch(scratch_object.message_hash, scratch_object.message_hash_len); + maybeMoveFromCanonScratch(scratch_object.root_hash, scratch_object.root_hash_len); // short circuit and return // falls through to a request for the message return true; } else { - // TODO: start with earliest scratch message, and calculate a speculative chain up to the announced - // commit hash if we find a match, add all the messages to the chain + + // TODO move into a function LOG_INFO("StoreForwardpp Earliest scratch message commit hash does not match announced commit hash, " "speculating chain"); if (speculateScratchChain(t->commit_hash.bytes, t->commit_hash.size, scratch_object.root_hash, @@ -499,11 +564,23 @@ bool StoreForwardPlusPlusModule::handleReceivedProtobuf(const meshtastic_MeshPac t->commit_hash.size); } while (memcmp(chain_end.commit_hash, t->commit_hash.bytes, t->commit_hash.size) != 0); LOG_INFO("StoreForwardpp added %d links from scratch", count); + maybeMoveFromCanonScratch(scratch_object.root_hash, scratch_object.root_hash_len); + // We have an object from Scratch that we know is a commit, but we can't put it on the chain yet } else { - LOG_INFO("StoreForwardpp Could not speculate chain to announced commit hash"); + LOG_INFO("StoreForwardpp Could not speculate chain to announced commit hash. Moving from " + "local_messages to canon_scratch"); + + scratch_object.rx_time = t->encapsulated_rxtime; + scratch_object.counter = t->chain_count; + scratch_object.commit_hash_len = t->commit_hash.size; + memcpy(scratch_object.commit_hash, t->commit_hash.bytes, t->commit_hash.size); + addToCanonScratch(scratch_object); + removeFromScratch(scratch_object.message_hash, scratch_object.message_hash_len); } } + } else { + LOG_DEBUG("StoreForwardpp Not in scratch"); } if (airTime->isTxAllowedChannelUtil(true)) { requestNextMessage(t->root_hash.bytes, t->root_hash.size, chain_end.commit_hash, SFPP_HASH_SIZE); @@ -537,148 +614,161 @@ bool StoreForwardPlusPlusModule::handleReceivedProtobuf(const meshtastic_MeshPac // if root and chain hashes are the same, grab the first message on the chain // if different, get the message directly after. + } else if (t->sfpp_message_type == meshtastic_StoreForwardPlusPlus_SFPP_message_type_LINK_PROVIDE || + t->sfpp_message_type == meshtastic_StoreForwardPlusPlus_SFPP_message_type_LINK_PROVIDE_FIRSTHALF || + t->sfpp_message_type == meshtastic_StoreForwardPlusPlus_SFPP_message_type_LINK_PROVIDE_SECONDHALF) { - } else if (t->sfpp_message_type == meshtastic_StoreForwardPlusPlus_SFPP_message_type_LINK_PROVIDE) { - LOG_DEBUG("StoreForwardpp Link Provide received!"); - if (t->message_hash.size >= 8 && isInDB(t->message_hash.bytes, t->message_hash.size)) { - LOG_INFO("StoreForwardpp Received link already in chain"); - return true; - } + if (t->sfpp_message_type == meshtastic_StoreForwardPlusPlus_SFPP_message_type_LINK_PROVIDE) { + LOG_DEBUG("StoreForwardpp Link Provide received!"); - incoming_link = ingestLinkMessage(t); - } else if (t->sfpp_message_type == meshtastic_StoreForwardPlusPlus_SFPP_message_type_LINK_PROVIDE_FIRSTHALF) { - LOG_DEBUG("StoreForwardpp Link Provide First Half received!"); - split_link_in = ingestLinkMessage(t, false); - doing_split_receive = true; - split_link_in.validObject = true; - return true; - - } else if (t->sfpp_message_type == meshtastic_StoreForwardPlusPlus_SFPP_message_type_LINK_PROVIDE_SECONDHALF) { - LOG_DEBUG("StoreForwardpp Link Provide Second Half received!"); - if (!doing_split_receive) { - LOG_DEBUG("StoreForwardpp Received second half without first half, ignoring"); - return true; - } - if (!split_link_in.validObject) { - LOG_WARN("StoreForwardpp No first half stored, cannot combine"); - doing_split_receive = false; - return true; - } - link_object second_half = ingestLinkMessage(t, false); - if (split_link_in.encrypted_len + second_half.encrypted_len > 256) { - LOG_WARN("StoreForwardpp Combined link too large"); - return true; - } - - if (split_link_in.from == second_half.from && split_link_in.to == second_half.to && - split_link_in.root_hash_len == second_half.root_hash_len && - memcmp(split_link_in.root_hash, second_half.root_hash, split_link_in.root_hash_len) == 0 && - split_link_in.message_hash_len == second_half.message_hash_len && - memcmp(split_link_in.message_hash, second_half.message_hash, split_link_in.message_hash_len) == 0) { - incoming_link = split_link_in; - memcpy(&incoming_link.encrypted_bytes[split_link_in.encrypted_len], second_half.encrypted_bytes, - second_half.encrypted_len); - incoming_link.encrypted_len = split_link_in.encrypted_len + second_half.encrypted_len; - - // append the encrypted bytes - - // clear first half - split_link_in = link_object(); - split_link_in.validObject = false; - doing_split_receive = false; - // do the recalcualte step we skipped - if (!recalculateHash(incoming_link, t->root_hash.bytes, t->root_hash.size, t->commit_hash.bytes, - t->commit_hash.size)) { - LOG_WARN("StoreForwardpp Recalculated hash does not match"); + // Is this already in our canon DB? If so, log and drop + if (t->commit_hash.size >= 8 && t->message_hash.size >= 8 && isInDB(t->message_hash.bytes, t->message_hash.size)) { + LOG_INFO("StoreForwardpp Received link already in chain1"); return true; } - } else { - LOG_WARN("StoreForwardpp No first half stored, cannot combine"); + incoming_link = ingestLinkMessage(t, false); + } else if (t->sfpp_message_type == meshtastic_StoreForwardPlusPlus_SFPP_message_type_LINK_PROVIDE_FIRSTHALF) { + LOG_DEBUG("StoreForwardpp Link Provide First Half received!"); + split_link_in = ingestLinkMessage(t, false); + doing_split_receive = true; + split_link_in.validObject = true; return true; - } - } - if (incoming_link.validObject) { - if (incoming_link.root_hash_len == 0) { - LOG_WARN("StoreForwardpp Hash bytes not found for incoming link"); - return true; - } - - if (!incoming_link.validObject) { - LOG_WARN("StoreForwardpp commit byte mismatch"); - return true; - } - - if (isCommitInDB(incoming_link.commit_hash, incoming_link.commit_hash_len) || - isInDB(incoming_link.message_hash, incoming_link.message_hash_len)) { - LOG_INFO("StoreForwardpp Received link already in chain"); - // TODO: respond with next link? - return true; - } - if (portduino_config.sfpp_stratum0) { - - // calculate the commit_hash - addToChain(incoming_link); - if (!pendingRun) { - setIntervalFromNow(30 * 1000); // run again in 30 seconds to announce the new tip of chain - pendingRun = true; + } else if (t->sfpp_message_type == meshtastic_StoreForwardPlusPlus_SFPP_message_type_LINK_PROVIDE_SECONDHALF) { + LOG_DEBUG("StoreForwardpp Link Provide Second Half received!"); + if (!doing_split_receive) { + LOG_DEBUG("StoreForwardpp Received second half without first half, ignoring"); + return true; } - // timebox to no more than an hour old - if (incoming_link.rx_time > getValidTime(RTCQuality::RTCQualityNTP, true) - rebroadcastTimeout) { - // if this packet is new to us, we rebroadcast it - rebroadcastLinkObject(incoming_link); + if (!split_link_in.validObject) { + LOG_WARN("StoreForwardpp No first half stored, cannot combine"); + doing_split_receive = false; + return true; + } + link_object second_half = ingestLinkMessage(t, false); + if (split_link_in.encrypted_len + second_half.encrypted_len > 256) { + LOG_WARN("StoreForwardpp Combined link too large"); + return true; + } + + if (split_link_in.from == second_half.from && split_link_in.to == second_half.to && + split_link_in.root_hash_len == second_half.root_hash_len && + memcmp(split_link_in.root_hash, second_half.root_hash, split_link_in.root_hash_len) == 0 && + split_link_in.message_hash_len == second_half.message_hash_len && + memcmp(split_link_in.message_hash, second_half.message_hash, split_link_in.message_hash_len) == 0) { + incoming_link = split_link_in; + memcpy(&incoming_link.encrypted_bytes[split_link_in.encrypted_len], second_half.encrypted_bytes, + second_half.encrypted_len); + incoming_link.encrypted_len = split_link_in.encrypted_len + second_half.encrypted_len; + + // append the encrypted bytes + + // clear first half + split_link_in = link_object(); + split_link_in.validObject = false; + doing_split_receive = false; + // do the recalcualte step we skipped + /*if (!recalculateHash(incoming_link, t->root_hash.bytes, t->root_hash.size, t->commit_hash.bytes, + t->commit_hash.size)) { + LOG_WARN("StoreForwardpp Recalculated hash does not match"); + return true; + }*/ + } else { - LOG_DEBUG("StoreForwardpp Got previously unseen text, but not rebroadcasting because rxtime was %u", - incoming_link.rx_time); + LOG_WARN("StoreForwardpp No first half stored, cannot combine"); + return true; + } + } + + if (recalculateHash(incoming_link, t->root_hash.bytes, t->root_hash.size, t->commit_hash.bytes, t->commit_hash.size)) { + if (incoming_link.root_hash_len == 0) { + LOG_WARN("StoreForwardpp Hash bytes not found for incoming link"); + return true; } - } else { - if (incoming_link.commit_hash_len == SFPP_HASH_SIZE) { - addToChain(incoming_link); - if (isInScratch(incoming_link.message_hash, incoming_link.message_hash_len)) { - link_object scratch_object = getFromScratch(incoming_link.message_hash, incoming_link.message_hash_len); - if (scratch_object.payload != "") { - updatePayload(incoming_link.message_hash, incoming_link.message_hash_len, scratch_object.payload); - } - removeFromScratch(incoming_link.message_hash, incoming_link.message_hash_len); - } else { - // if this packet is new to us, we rebroadcast it, but only up to an hour old - if (incoming_link.rx_time > getValidTime(RTCQuality::RTCQualityNTP, true) - rebroadcastTimeout) { - rebroadcastLinkObject(incoming_link); - } else { - LOG_DEBUG("StoreForwardpp Got previously unseen text, but not rebroadcasting because rxtime was %u", - incoming_link.rx_time); - } + if (isCommitInDB(incoming_link.commit_hash, incoming_link.commit_hash_len) || + isInDB(incoming_link.message_hash, incoming_link.message_hash_len)) { + LOG_INFO("StoreForwardpp Received link already in chain"); + if (t->commit_hash.size == 0) { + link_object link_to_announce = + getLinkFromMessageHash(incoming_link.message_hash, incoming_link.message_hash_len); + canonAnnounce(link_to_announce); } - if (chain_end.rx_time != 0) { - int64_t links_behind = 0; - if (t->chain_count != 0 && t->chain_count > chain_end.counter) { - links_behind = t->chain_count - chain_end.counter; + return true; + } + if (portduino_config.sfpp_stratum0) { - LOG_DEBUG("StoreForwardpp observed link that is link ahead of us: %ld", links_behind); - if (links_behind > portduino_config.sfpp_backlog_limit) { - LOG_INFO("StoreForwardpp Chain behind limit, dumping DB"); - clearChain(t->root_hash.bytes, t->root_hash.size); - return true; + // calculate the commit_hash + addToChain(incoming_link); + if (!pendingRun) { + setIntervalFromNow(10 * 1000); // run again in 30 seconds to announce the new tip of chain + pendingRun = true; + } + // timebox to no more than an hour old + if (incoming_link.rx_time > getValidTime(RTCQuality::RTCQualityNTP, true) - rebroadcastTimeout) { + // if this packet is new to us, we rebroadcast it + rebroadcastLinkObject(incoming_link); + } else { + LOG_DEBUG("StoreForwardpp Got previously unseen text, but not rebroadcasting because rxtime was %u", + incoming_link.rx_time); + } + + } else { + if (incoming_link.commit_hash_len == SFPP_HASH_SIZE) { + addToChain(incoming_link); + if (isInScratch(incoming_link.message_hash, incoming_link.message_hash_len)) { + link_object scratch_object = getFromScratch(incoming_link.message_hash, incoming_link.message_hash_len); + if (scratch_object.payload != "") { + updatePayload(incoming_link.message_hash, incoming_link.message_hash_len, scratch_object.payload); + } + removeFromScratch(incoming_link.message_hash, incoming_link.message_hash_len); + } else { + // if this packet is new to us, we rebroadcast it, but only up to an hour old + if (incoming_link.rx_time > getValidTime(RTCQuality::RTCQualityNTP, true) - rebroadcastTimeout) { + rebroadcastLinkObject(incoming_link); + } else { + LOG_DEBUG("StoreForwardpp Got previously unseen text, but not rebroadcasting because rxtime was %u", + incoming_link.rx_time); + } + } + maybeMoveFromCanonScratch(incoming_link.root_hash, incoming_link.root_hash_len); + if (chain_end.rx_time != 0) { + int64_t links_behind = 0; + if (t->chain_count != 0 && t->chain_count > chain_end.counter) { + links_behind = t->chain_count - chain_end.counter; + + if (links_behind > 1) + LOG_DEBUG("StoreForwardpp observed link that is links ahead of us: %ld", links_behind); + if (links_behind > portduino_config.sfpp_backlog_limit) { + LOG_INFO("StoreForwardpp Chain behind limit, dumping DB"); + clearChain(t->root_hash.bytes, t->root_hash.size); + clearCanonScratch(t->root_hash.bytes, t->root_hash.size, + t->chain_count - portduino_config.sfpp_backlog_limit); + return true; + } + } + } + requestNextMessage(incoming_link.root_hash, incoming_link.root_hash_len, incoming_link.commit_hash, + incoming_link.commit_hash_len); + } else { + if (!isInScratch(incoming_link.message_hash, incoming_link.message_hash_len) && + !isInDB(incoming_link.message_hash, incoming_link.message_hash_len) && + !isInCanonScratch(incoming_link.message_hash, incoming_link.message_hash_len)) { + addToScratch(incoming_link); + LOG_INFO("StoreForwardpp added incoming non-canon message to scratch"); + if (incoming_link.rx_time > getValidTime(RTCQuality::RTCQualityNTP, true) - rebroadcastTimeout) { + rebroadcastLinkObject(incoming_link); + } else { + LOG_DEBUG("StoreForwardpp Got previously unseen text, but not rebroadcasting because rxtime was &u", + incoming_link.rx_time); } } } - requestNextMessage(incoming_link.root_hash, incoming_link.root_hash_len, incoming_link.commit_hash, - incoming_link.commit_hash_len); - } else { - if (!isInScratch(incoming_link.message_hash, incoming_link.message_hash_len) && - !isInDB(incoming_link.message_hash, incoming_link.message_hash_len)) { - addToScratch(incoming_link); - LOG_INFO("StoreForwardpp added incoming non-canon message to scratch"); - if (incoming_link.rx_time > getValidTime(RTCQuality::RTCQualityNTP, true) - rebroadcastTimeout) { - rebroadcastLinkObject(incoming_link); - } else { - LOG_DEBUG("StoreForwardpp Got previously unseen text, but not rebroadcasting because rxtime was &u", - incoming_link.rx_time); - } - } } + } else { + // We've received a link provide, and it doesn't fit. But it may be legit. Add it to canon_scratch + addToCanonScratch(incoming_link); } } @@ -878,7 +968,6 @@ void StoreForwardPlusPlusModule::broadcastLink(uint8_t *_commit_hash, size_t _co broadcastLink(lo, false); } -// TODO if stratum0, send the chain count void StoreForwardPlusPlusModule::broadcastLink(link_object &lo, bool full_commit_hash, bool is_split_second_half) { meshtastic_StoreForwardPlusPlus storeforward = meshtastic_StoreForwardPlusPlus_init_zero; @@ -984,6 +1073,47 @@ StoreForwardPlusPlusModule::link_object StoreForwardPlusPlusModule::getLink(uint return lo; } +StoreForwardPlusPlusModule::link_object StoreForwardPlusPlusModule::getLinkFromMessageHash(uint8_t *_message_hash, + size_t _message_hash_len) +{ + link_object lo; + sqlite3_bind_int(getLinkFromMessageHashStmt, 1, _message_hash_len); + sqlite3_bind_blob(getLinkFromMessageHashStmt, 2, _message_hash, _message_hash_len, NULL); + int res = sqlite3_step(getLinkFromMessageHashStmt); + + lo.to = sqlite3_column_int(getLinkFromMessageHashStmt, 0); + lo.from = sqlite3_column_int(getLinkFromMessageHashStmt, 1); + lo.id = sqlite3_column_int(getLinkFromMessageHashStmt, 2); + + uint8_t *_payload = (uint8_t *)sqlite3_column_blob(getLinkFromMessageHashStmt, 3); + lo.encrypted_len = sqlite3_column_bytes(getLinkFromMessageHashStmt, 3); + memcpy(lo.encrypted_bytes, _payload, lo.encrypted_len); + + uint8_t *link_message_hash = (uint8_t *)sqlite3_column_blob(getLinkFromMessageHashStmt, 4); + lo.message_hash_len = SFPP_HASH_SIZE; + memcpy(lo.message_hash, link_message_hash, lo.message_hash_len); + + lo.rx_time = sqlite3_column_int(getLinkFromMessageHashStmt, 5); + + uint8_t *_tmp_commit_hash = (uint8_t *)sqlite3_column_blob(getLinkFromMessageHashStmt, 6); + lo.commit_hash_len = SFPP_HASH_SIZE; + memcpy(lo.commit_hash, _tmp_commit_hash, lo.commit_hash_len); + + uint8_t *_root_hash = (uint8_t *)sqlite3_column_blob(getLinkFromMessageHashStmt, 7); + lo.root_hash_len = SFPP_HASH_SIZE; + memcpy(lo.root_hash, _root_hash, lo.root_hash_len); + + lo.counter = sqlite3_column_int(getLinkFromMessageHashStmt, 8); + + lo.payload = std::string((char *)sqlite3_column_text(getLinkFromMessageHashStmt, 9)); + + lo.channel_hash = getChannelHashFromRoot(lo.root_hash, lo.root_hash_len); + + sqlite3_reset(getLinkFromMessageHashStmt); + + return lo; +} + bool StoreForwardPlusPlusModule::sendFromScratch(uint8_t *root_hash) { link_object lo = getNextScratchObject(root_hash); @@ -1025,6 +1155,7 @@ bool StoreForwardPlusPlusModule::addToChain(link_object &lo) if (lo.counter == 0) { lo.counter = chain_end.counter + 1; } + LOG_DEBUG("Adding link %u to Canon Chain", lo.counter); // push a message into the local chain DB // destination @@ -1053,6 +1184,7 @@ bool StoreForwardPlusPlusModule::addToChain(link_object &lo) } sqlite3_reset(chain_insert_stmt); setChainCount(lo.root_hash, SFPP_HASH_SIZE, lo.counter); + removeFromCanonScratch(lo.message_hash, lo.message_hash_len); return true; } @@ -1085,27 +1217,38 @@ bool StoreForwardPlusPlusModule::addToScratch(link_object &lo) return true; } -void StoreForwardPlusPlusModule::canonAnnounce(link_object &lo, uint8_t *_message_hash, uint8_t *_commit_hash, - uint8_t *_root_hash, uint32_t _rx_time) +void StoreForwardPlusPlusModule::canonAnnounce(link_object &lo) { meshtastic_StoreForwardPlusPlus storeforward = meshtastic_StoreForwardPlusPlus_init_zero; storeforward.sfpp_message_type = meshtastic_StoreForwardPlusPlus_SFPP_message_type_CANON_ANNOUNCE; // set root hash // set message hash + if (lo.message_hash_len < 8) { + LOG_WARN("Attempt canonAnnounce without message hash"); + return; + } storeforward.message_hash.size = 8; - memcpy(storeforward.message_hash.bytes, _message_hash, 8); + memcpy(storeforward.message_hash.bytes, lo.message_hash, 8); // set chain hash + if (lo.commit_hash_len < 8) { + LOG_WARN("Attempt canonAnnounce without commit hash"); + return; + } storeforward.commit_hash.size = 8; - memcpy(storeforward.commit_hash.bytes, _commit_hash, 8); + memcpy(storeforward.commit_hash.bytes, lo.commit_hash, 8); // set root hash // needs to be the full hash to bootstrap + if (lo.root_hash_len < SFPP_HASH_SIZE) { + LOG_WARN("Attempt canonAnnounce without root hash"); + return; + } storeforward.root_hash.size = SFPP_HASH_SIZE; - memcpy(storeforward.root_hash.bytes, _root_hash, SFPP_HASH_SIZE); + memcpy(storeforward.root_hash.bytes, lo.root_hash, SFPP_HASH_SIZE); - storeforward.encapsulated_rxtime = _rx_time; + storeforward.encapsulated_rxtime = lo.rx_time; storeforward.chain_count = lo.counter; // storeforward. meshtastic_MeshPacket *p = allocDataProtobuf(storeforward); @@ -1258,7 +1401,7 @@ void StoreForwardPlusPlusModule::updatePayload(uint8_t *message_hash_bytes, size auto res = sqlite3_step(updatePayloadStmt); if (res != SQLITE_OK) { const char *_error_mesg = sqlite3_errmsg(ppDb); - LOG_WARN("StoreForwardpp step error %u, %s", res, _error_mesg); + LOG_WARN("StoreForwardpp updatePayloadStmt step error %u, %s", res, _error_mesg); } sqlite3_reset(updatePayloadStmt); } @@ -1272,7 +1415,7 @@ StoreForwardPlusPlusModule::link_object StoreForwardPlusPlusModule::getFromScrat auto res = sqlite3_step(fromScratchByHashStmt); if (res != SQLITE_ROW && res != SQLITE_OK) { const char *_error_mesg = sqlite3_errmsg(ppDb); - LOG_WARN("StoreForwardpp step error %u, %s", res, _error_mesg); + LOG_WARN("StoreForwardpp fromScratchByHashStmt step error %u, %s", res, _error_mesg); } lo.to = sqlite3_column_int(fromScratchByHashStmt, 0); lo.from = sqlite3_column_int(fromScratchByHashStmt, 1); @@ -1281,15 +1424,18 @@ StoreForwardPlusPlusModule::link_object StoreForwardPlusPlusModule::getFromScrat uint8_t *encrypted_bytes = (uint8_t *)sqlite3_column_blob(fromScratchByHashStmt, 3); lo.encrypted_len = sqlite3_column_bytes(fromScratchByHashStmt, 3); memcpy(lo.encrypted_bytes, encrypted_bytes, lo.encrypted_len); + uint8_t *message_hash = (uint8_t *)sqlite3_column_blob(fromScratchByHashStmt, 4); + lo.message_hash_len = sqlite3_column_bytes(fromScratchByHashStmt, 4); memcpy(lo.message_hash, message_hash, SFPP_HASH_SIZE); + lo.rx_time = sqlite3_column_int(fromScratchByHashStmt, 5); + uint8_t *root_hash = (uint8_t *)sqlite3_column_blob(fromScratchByHashStmt, 6); + lo.root_hash_len = SFPP_HASH_SIZE; memcpy(lo.root_hash, root_hash, SFPP_HASH_SIZE); lo.payload = std::string((char *)sqlite3_column_text(fromScratchByHashStmt, 7), sqlite3_column_bytes(fromScratchByHashStmt, 7)); - lo.message_hash_len = hash_len; - memcpy(lo.message_hash, message_hash_bytes, hash_len); sqlite3_reset(fromScratchByHashStmt); return lo; } @@ -1324,7 +1470,6 @@ StoreForwardPlusPlusModule::ingestTextPacket(const meshtastic_MeshPacket &mp, co StoreForwardPlusPlusModule::link_object StoreForwardPlusPlusModule::ingestLinkMessage(meshtastic_StoreForwardPlusPlus *t, bool recalc) { - // TODO: If not stratum0, injest the chain count link_object lo; lo.to = t->encapsulated_to; @@ -1334,7 +1479,7 @@ StoreForwardPlusPlusModule::link_object StoreForwardPlusPlusModule::ingestLinkMe lo.from = t->encapsulated_from; lo.id = t->encapsulated_id; lo.rx_time = t->encapsulated_rxtime; - lo.counter = t->chain_count; + lo.counter = t->chain_count; // Should this be skipped if stratum0? // What if we don't have this root hash? Should drop this packet before this point. lo.channel_hash = getChannelHashFromRoot(t->root_hash.bytes, t->root_hash.size); @@ -1384,6 +1529,7 @@ bool StoreForwardPlusPlusModule::checkCommitHash(StoreForwardPlusPlusModule::lin size_t hash_len) { SHA256 commit_hash; + uint8_t tmp_commit_hash[SFPP_HASH_SIZE]; link_object chain_end = getLinkFromCount(0, lo.root_hash, lo.root_hash_len); @@ -1402,10 +1548,11 @@ bool StoreForwardPlusPlusModule::checkCommitHash(StoreForwardPlusPlusModule::lin } commit_hash.update(lo.message_hash, SFPP_HASH_SIZE); - commit_hash.finalize(lo.commit_hash, SFPP_HASH_SIZE); - lo.commit_hash_len = SFPP_HASH_SIZE; + commit_hash.finalize(tmp_commit_hash, SFPP_HASH_SIZE); if (hash_len == 0 || memcmp(commit_hash_bytes, lo.commit_hash, hash_len) == 0) { + lo.commit_hash_len = SFPP_HASH_SIZE; + memcpy(lo.commit_hash, tmp_commit_hash, SFPP_HASH_SIZE); return true; } return false; @@ -1490,7 +1637,8 @@ StoreForwardPlusPlusModule::link_object StoreForwardPlusPlusModule::getLinkFromC void StoreForwardPlusPlusModule::pruneScratchQueue() { - sqlite3_bind_int(pruneScratchQueueStmt, 1, time(nullptr) - 60 * 60 * 24); + // Very little utility in holding on to very old scratch messages + sqlite3_bind_int(pruneScratchQueueStmt, 1, time(nullptr) - 60 * 60 * 6); int res = sqlite3_step(pruneScratchQueueStmt); if (res != SQLITE_OK && res != SQLITE_DONE) { LOG_ERROR("StoreForwardpp Prune Scratch sqlite error %u, %s", res, sqlite3_errmsg(ppDb)); @@ -1555,7 +1703,6 @@ bool StoreForwardPlusPlusModule::recalculateHash(StoreForwardPlusPlusModule::lin printBytes("StoreForwardpp commit hash matches: 0x", _commit_hash_bytes, _commit_hash_len); } else { LOG_WARN("StoreForwardpp commit hash does not match, rejecting link."); - lo.commit_hash_len = 0; lo.validObject = false; return false; } @@ -1625,4 +1772,174 @@ void StoreForwardPlusPlusModule::updatePeers(const meshtastic_MeshPacket &mp, // if not new, run the calculations and update the numbers } +void StoreForwardPlusPlusModule::maybeMoveFromCanonScratch(uint8_t *root_hash, size_t root_hash_len) +{ + + // get the earliest chain count. See if it's the next one. Try to commit + // if not, speculate and try to commit + link_object lo = getfromCanonScratch(root_hash, root_hash_len); + if (!lo.validObject) + return; + link_object chain_end = getLinkFromCount(0, root_hash, root_hash_len); + if (!chain_end.validObject) + return; + + if (lo.counter == chain_end.counter + 1 && + recalculateHash(lo, root_hash, root_hash_len, lo.commit_hash, lo.commit_hash_len)) { + + addToChain(lo); + maybeMoveFromCanonScratch(root_hash, root_hash_len); // recursion! + return; + } + // TODO check hash lengths here + LOG_INFO("speculating chain, attempting to commit from canon scratch"); + if (speculateScratchChain(lo.commit_hash, lo.commit_hash_len, root_hash, lo.commit_hash)) { + int count = 0; + do { + count++; + link_object next_scratch_object = getNextScratchObject(root_hash); + if (!next_scratch_object.validObject) { + LOG_ERROR("StoreForwardpp Speculation commit possibly failed, no next scratch object"); + break; + } + addToChain(next_scratch_object); + removeFromScratch(next_scratch_object.message_hash, next_scratch_object.message_hash_len); + chain_end = getLinkFromCount(0, root_hash, root_hash_len); + } while (memcmp(chain_end.commit_hash, lo.commit_hash, lo.commit_hash_len) != 0); + LOG_INFO("StoreForwardpp added %d links from scratch", count); + maybeMoveFromCanonScratch(root_hash, root_hash_len); // recursion! + + // We have an object from Scratch that we know is a commit, but we can't put it on the chain yet + } +} + +void StoreForwardPlusPlusModule::addToCanonScratch(link_object &lo) +{ + + // recalculate full message hash? + LOG_INFO("addToCanonScratch"); + logLinkObject(lo); + + // push a message into the local chain DB + // destination + sqlite3_bind_int(canon_scratch_insert_stmt, 1, lo.to); + // sender + sqlite3_bind_int(canon_scratch_insert_stmt, 2, lo.from); + // packet_id + sqlite3_bind_int(canon_scratch_insert_stmt, 3, lo.id); + // root_hash + sqlite3_bind_blob(canon_scratch_insert_stmt, 4, lo.root_hash, lo.root_hash_len, NULL); + // encrypted_bytes + sqlite3_bind_blob(canon_scratch_insert_stmt, 5, lo.encrypted_bytes, lo.encrypted_len, NULL); + // message_hash + sqlite3_bind_blob(canon_scratch_insert_stmt, 6, lo.message_hash, lo.message_hash_len, NULL); + // rx_time + sqlite3_bind_int(canon_scratch_insert_stmt, 7, lo.rx_time); + // commit_hash + sqlite3_bind_blob(canon_scratch_insert_stmt, 8, lo.commit_hash, lo.commit_hash_len, NULL); + // payload + sqlite3_bind_text(canon_scratch_insert_stmt, 9, lo.payload.c_str(), lo.payload.length(), NULL); + + sqlite3_bind_int(canon_scratch_insert_stmt, 10, lo.counter); + int res = sqlite3_step(canon_scratch_insert_stmt); + if (res != SQLITE_OK && res != SQLITE_DONE) { + LOG_ERROR("StoreForwardpp canon_scratch_insert_stmt Cannot step %u: %s", res, sqlite3_errmsg(ppDb)); + } + sqlite3_reset(canon_scratch_insert_stmt); +} + +StoreForwardPlusPlusModule::link_object StoreForwardPlusPlusModule::getfromCanonScratch(uint8_t *root_hash_bytes, size_t hash_len) +{ + + link_object lo; + sqlite3_bind_int(getCanonScratchStmt, 1, hash_len); + sqlite3_bind_blob(getCanonScratchStmt, 2, root_hash_bytes, hash_len, NULL); + auto res = sqlite3_step(getCanonScratchStmt); + if (res != SQLITE_ROW && res != SQLITE_OK) { + const char *_error_mesg = sqlite3_errmsg(ppDb); + LOG_ERROR("StoreForwardpp getCanonScratchStmt step error %u, %s", res, _error_mesg); + lo.validObject = false; + return lo; + } + lo.to = sqlite3_column_int(getCanonScratchStmt, 0); + lo.from = sqlite3_column_int(getCanonScratchStmt, 1); + lo.id = sqlite3_column_int(getCanonScratchStmt, 2); + + uint8_t *_payload = (uint8_t *)sqlite3_column_blob(getCanonScratchStmt, 3); + lo.encrypted_len = sqlite3_column_bytes(getCanonScratchStmt, 3); + memcpy(lo.encrypted_bytes, _payload, lo.encrypted_len); + + uint8_t *_message_hash = (uint8_t *)sqlite3_column_blob(getCanonScratchStmt, 4); + lo.message_hash_len = sqlite3_column_bytes(getCanonScratchStmt, 4); + memcpy(lo.message_hash, _message_hash, lo.message_hash_len); + + lo.rx_time = sqlite3_column_int(getCanonScratchStmt, 5); + + uint8_t *_tmp_commit_hash = (uint8_t *)sqlite3_column_blob(getCanonScratchStmt, 6); + lo.commit_hash_len = sqlite3_column_bytes(getCanonScratchStmt, 6); + memcpy(lo.commit_hash, _tmp_commit_hash, lo.commit_hash_len); + + uint8_t *_root_hash = (uint8_t *)sqlite3_column_blob(getCanonScratchStmt, 7); + lo.root_hash_len = sqlite3_column_bytes(getCanonScratchStmt, 7); + memcpy(lo.root_hash, _root_hash, lo.root_hash_len); + + lo.counter = sqlite3_column_int(getCanonScratchStmt, 8); + + lo.payload = std::string((char *)sqlite3_column_text(getCanonScratchStmt, 9)); + + sqlite3_reset(getCanonScratchStmt); + + return lo; +} + +void StoreForwardPlusPlusModule::removeFromCanonScratch(uint8_t *message_hash_bytes, size_t message_hash_len) +{ + printBytes("StoreForwardpp removing from canon scratch: ", message_hash_bytes, message_hash_len); + sqlite3_bind_int(removeCanonScratch, 1, message_hash_len); + sqlite3_bind_blob(removeCanonScratch, 2, message_hash_bytes, message_hash_len, NULL); + sqlite3_step(removeCanonScratch); + sqlite3_reset(removeCanonScratch); +} + +bool StoreForwardPlusPlusModule::isInCanonScratch(uint8_t *message_hash, size_t message_hash_len) +{ + if (message_hash_len < SFPP_SHORT_HASH_SIZE) { + return false; + } + sqlite3_bind_int(getCanonScratchCountStmt, 1, message_hash_len); + sqlite3_bind_blob(getCanonScratchCountStmt, 2, message_hash, message_hash_len, NULL); + sqlite3_step(getCanonScratchCountStmt); + int count = sqlite3_column_int(getCanonScratchCountStmt, 0); + sqlite3_reset(getCanonScratchCountStmt); + if (count > 0) + return true; + return false; +} + +void StoreForwardPlusPlusModule::clearCanonScratch(uint8_t *root_hash, size_t root_hash_len, uint32_t new_count) +{ + sqlite3_bind_int(clearCanonScratchStmt, 1, root_hash_len); + sqlite3_bind_blob(clearCanonScratchStmt, 2, root_hash, root_hash_len, NULL); + sqlite3_bind_int(clearCanonScratchStmt, 3, new_count); + int res = sqlite3_step(clearCanonScratchStmt); + if (res != SQLITE_OK && res != SQLITE_DONE) { + LOG_ERROR("StoreForwardpp Clear Canon Scratch sqlite error %u, %s", res, sqlite3_errmsg(ppDb)); + } + sqlite3_reset(clearCanonScratchStmt); +} + +void StoreForwardPlusPlusModule::logLinkObject(link_object &lo) +{ + LOG_DEBUG("To: %u, From: %u, id: %u, rx_time: %u, counter: %u, channel_hash: 0x%02x", lo.to, lo.from, lo.id, lo.rx_time, + lo.counter, lo.channel_hash); + printBytes("encrypted_bytes: ", lo.encrypted_bytes, lo.encrypted_len); + printBytes("message_hash: ", lo.message_hash, lo.message_hash_len); + printBytes("root_hash: ", lo.root_hash, lo.root_hash_len); + printBytes("commit_hash: ", lo.commit_hash, lo.commit_hash_len); + LOG_DEBUG("payload: %s", lo.payload.c_str()); + if (lo.validObject) { + LOG_DEBUG("Valid Object"); + } +} + #endif // has include sqlite3 \ No newline at end of file diff --git a/src/modules/Native/StoreForwardPlusPlus.h b/src/modules/Native/StoreForwardPlusPlus.h index 3732ee53f..db940244b 100644 --- a/src/modules/Native/StoreForwardPlusPlus.h +++ b/src/modules/Native/StoreForwardPlusPlus.h @@ -112,6 +112,7 @@ class StoreForwardPlusPlusModule : public ProtobufModule