diff --git a/src/modules/Native/StoreForwardPlusPlus.cpp b/src/modules/Native/StoreForwardPlusPlus.cpp index 819bdb470..c2f8ea331 100644 --- a/src/modules/Native/StoreForwardPlusPlus.cpp +++ b/src/modules/Native/StoreForwardPlusPlus.cpp @@ -313,9 +313,15 @@ int32_t StoreForwardPlusPlusModule::runOnce() return portduino_config.sfpp_announce_interval * 60 * 1000; } } + uint8_t root_hash_bytes[SFPP_HASH_SIZE] = {0}; ChannelHash hash = channels.getHash(0); - getOrAddRootFromChannelHash(hash, root_hash_bytes); + + if (getOrAddRootFromChannelHash(hash, root_hash_bytes) == 0) { + LOG_WARN("No root hash found, not sending"); + return portduino_config.sfpp_announce_interval * 60 * 1000; + } + uint32_t chain_count = getChainCount(root_hash_bytes, SFPP_HASH_SIZE); LOG_DEBUG("Chain count is %u", chain_count); while (chain_count > portduino_config.sfpp_max_chain) { @@ -477,8 +483,6 @@ bool StoreForwardPlusPlusModule::handleReceivedProtobuf(const meshtastic_MeshPac updatePeers(mp, t->sfpp_message_type); - // TODO: Clean up this mess of logic - if (t->sfpp_message_type == meshtastic_StoreForwardPlusPlus_SFPP_message_type_CANON_ANNOUNCE) { if (portduino_config.sfpp_stratum0) { @@ -490,114 +494,101 @@ bool StoreForwardPlusPlusModule::handleReceivedProtobuf(const meshtastic_MeshPac broadcastLink(next_commit_hash, SFPP_HASH_SIZE); } } - } else { - uint8_t tmp_root_hash_bytes[SFPP_HASH_SIZE] = {0}; + return true; + } - LOG_DEBUG("StoreForwardpp Received a CANON_ANNOUNCE"); - if (getRootFromChannelHash(router->p_encrypted->channel, tmp_root_hash_bytes)) { - // we found the hash, check if it's the right one - if (memcmp(tmp_root_hash_bytes, t->root_hash.bytes, t->root_hash.size) != 0) { - LOG_INFO("StoreForwardpp Root hash does not match. Possibly two stratum0 nodes on the mesh?"); - return true; - } - } else { - addRootToMappings(router->p_encrypted->channel, t->root_hash.bytes); - LOG_DEBUG("StoreForwardpp Adding root hash to mappings"); + uint8_t tmp_root_hash_bytes[SFPP_HASH_SIZE] = {0}; + + LOG_DEBUG("StoreForwardpp Received a CANON_ANNOUNCE"); + if (getRootFromChannelHash(router->p_encrypted->channel, tmp_root_hash_bytes)) { + // we found the hash, check if it's the right one + if (memcmp(tmp_root_hash_bytes, t->root_hash.bytes, t->root_hash.size) != 0) { + LOG_INFO("StoreForwardpp Root hash does not match. Possibly two stratum0 nodes on the mesh?"); + return true; } - if (t->encapsulated_rxtime == 0) { - LOG_DEBUG("StoreForwardpp No encapsulated time, conclude the chain is empty"); + } else if (t->root_hash.size == SFPP_HASH_SIZE) { + addRootToMappings(router->p_encrypted->channel, t->root_hash.bytes); + LOG_DEBUG("StoreForwardpp Adding root hash to mappings"); + } else { + LOG_WARN("No root hash and unable to add"); + return true; + } + + if (t->encapsulated_rxtime == 0) { + LOG_DEBUG("StoreForwardpp No encapsulated time, conclude the chain is empty"); + return true; + } + + // if we have no messages, just send out a request for a new starter message + if (chain_end.rx_time == 0 && airTime->isTxAllowedChannelUtil(true)) { + LOG_DEBUG("StoreForwardpp New chain, requesting last %u messages", portduino_config.sfpp_initial_sync); + requestMessageCount(t->root_hash.bytes, t->root_hash.size, portduino_config.sfpp_initial_sync); + return true; + } + + // If we got an announce that matches, maybe send a message from scratch, but no other actions needed. + if (t->commit_hash.size >= SFPP_SHORT_HASH_SIZE && + memcmp(chain_end.commit_hash, t->commit_hash.bytes, t->commit_hash.size) == 0) { + LOG_DEBUG("StoreForwardpp End of chain matches!"); + sendFromScratch(chain_end.root_hash); + return true; + } + + LOG_DEBUG("StoreForwardpp End of chain does not match! Checking distance behind."); + int64_t links_behind = 0; + if (t->chain_count != 0 && t->chain_count > chain_end.counter) { + links_behind = t->chain_count - chain_end.counter; + + LOG_DEBUG("StoreForwardpp Links behind: %ld", links_behind); + if (links_behind > portduino_config.sfpp_backlog_limit) { + LOG_INFO("StoreForwardpp Chain behind limit, dumping DB"); + clearChain(t->root_hash.bytes, t->root_hash.size); + clearCanonScratch(t->root_hash.bytes, t->root_hash.size, t->chain_count - portduino_config.sfpp_backlog_limit); + return true; + } + } + + // We just got an end of chain announce, checking if we have seen this message and have it in scratch. + LOG_DEBUG("Checking if in scratch"); + if (isInScratch(t->message_hash.bytes, t->message_hash.size)) { + + LOG_DEBUG("Found in scratch"); + link_object scratch_object = getFromScratch(t->message_hash.bytes, t->message_hash.size); + // if this matches, we don't need to request the message + // we know exactly what it is + if (t->message_hash.size >= 8 && t->commit_hash.size >= 8 && + checkCommitHash(scratch_object, t->commit_hash.bytes, t->message_hash.size)) { + LOG_INFO("StoreForwardpp Found announced message in scratch, adding to chain"); + scratch_object.rx_time = t->encapsulated_rxtime; + addToChain(scratch_object); + removeFromScratch(scratch_object.message_hash, scratch_object.message_hash_len); + maybeMoveFromCanonScratch(scratch_object.root_hash, scratch_object.root_hash_len); + return true; } - if (chain_end.rx_time != 0) { - if (t->commit_hash.size >= SFPP_SHORT_HASH_SIZE && - memcmp(chain_end.commit_hash, t->commit_hash.bytes, t->commit_hash.size) == 0) { - LOG_DEBUG("StoreForwardpp End of chain matches!"); - sendFromScratch(chain_end.root_hash); - } else { - LOG_DEBUG("StoreForwardpp End of chain does not match! Checking distance behind."); - int64_t links_behind = 0; - if (t->chain_count != 0 && t->chain_count > chain_end.counter) { - links_behind = t->chain_count - chain_end.counter; + LOG_INFO("StoreForwardpp Earliest scratch message commit hash does not match announced commit hash, " + "speculating chain"); + if (!speculateScratchChain(t->commit_hash.bytes, t->commit_hash.size, scratch_object.root_hash, + chain_end.commit_hash)) { + LOG_INFO("StoreForwardpp Could not speculate chain to announced commit hash. Moving from " + "local_messages to canon_scratch"); - LOG_DEBUG("StoreForwardpp Links behind: %ld", links_behind); - if (links_behind > portduino_config.sfpp_backlog_limit) { - LOG_INFO("StoreForwardpp Chain behind limit, dumping DB"); - clearChain(t->root_hash.bytes, t->root_hash.size); - clearCanonScratch(t->root_hash.bytes, t->root_hash.size, - t->chain_count - portduino_config.sfpp_backlog_limit); - return true; - } - } - // We just got an end of chain announce, checking if we have seen this message and have it in scratch. - LOG_DEBUG("Checking if in scratch"); - if (isInScratch(t->message_hash.bytes, t->message_hash.size)) { - LOG_DEBUG("Found in scratch"); - link_object scratch_object = getFromScratch(t->message_hash.bytes, t->message_hash.size); - // if this matches, we don't need to request the message - // we know exactly what it is - if (t->message_hash.size >= 8 && t->commit_hash.size >= 8 && - checkCommitHash(scratch_object, t->commit_hash.bytes, t->message_hash.size)) { - LOG_INFO("StoreForwardpp Found announced message in scratch, adding to chain"); - scratch_object.rx_time = t->encapsulated_rxtime; - addToChain(scratch_object); - removeFromScratch(scratch_object.message_hash, scratch_object.message_hash_len); - maybeMoveFromCanonScratch(scratch_object.root_hash, scratch_object.root_hash_len); - // short circuit and return - // falls through to a request for the message - return true; - } else { - - // TODO move into a function - LOG_INFO("StoreForwardpp Earliest scratch message commit hash does not match announced commit hash, " - "speculating chain"); - if (speculateScratchChain(t->commit_hash.bytes, t->commit_hash.size, scratch_object.root_hash, - chain_end.commit_hash)) { - int count = 0; - do { - count++; - link_object next_scratch_object = getNextScratchObject(scratch_object.root_hash); - if (!next_scratch_object.validObject) { - LOG_ERROR("StoreForwardpp Speculation commit possibly failed, no next scratch object"); - break; - } - addToChain(next_scratch_object); - removeFromScratch(next_scratch_object.message_hash, next_scratch_object.message_hash_len); - chain_end = getLinkFromCount(0, t->root_hash.bytes, t->root_hash.size); - printBytes("StoreForwardpp local final commit hash: ", chain_end.commit_hash, SFPP_HASH_SIZE); - printBytes("StoreForwardpp Announced commit hash: ", t->commit_hash.bytes, - t->commit_hash.size); - } while (memcmp(chain_end.commit_hash, t->commit_hash.bytes, t->commit_hash.size) != 0); - LOG_INFO("StoreForwardpp added %d links from scratch", count); - maybeMoveFromCanonScratch(scratch_object.root_hash, scratch_object.root_hash_len); - - // We have an object from Scratch that we know is a commit, but we can't put it on the chain yet - } else { - LOG_INFO("StoreForwardpp Could not speculate chain to announced commit hash. Moving from " - "local_messages to canon_scratch"); - - scratch_object.rx_time = t->encapsulated_rxtime; - scratch_object.counter = t->chain_count; - scratch_object.commit_hash_len = t->commit_hash.size; - memcpy(scratch_object.commit_hash, t->commit_hash.bytes, t->commit_hash.size); - addToCanonScratch(scratch_object); - removeFromScratch(scratch_object.message_hash, scratch_object.message_hash_len); - } - } - } else { - LOG_DEBUG("StoreForwardpp Not in scratch"); - } - if (airTime->isTxAllowedChannelUtil(true)) { - requestNextMessage(t->root_hash.bytes, t->root_hash.size, chain_end.commit_hash, SFPP_HASH_SIZE); - } - } - } else { // if chainEnd() - if (airTime->isTxAllowedChannelUtil(true)) { - LOG_DEBUG("StoreForwardpp New chain, requesting last %u messages", portduino_config.sfpp_initial_sync); - requestMessageCount(t->root_hash.bytes, t->root_hash.size, portduino_config.sfpp_initial_sync); - } + scratch_object.rx_time = t->encapsulated_rxtime; + scratch_object.counter = t->chain_count; + scratch_object.commit_hash_len = t->commit_hash.size; + memcpy(scratch_object.commit_hash, t->commit_hash.bytes, t->commit_hash.size); + addToCanonScratch(scratch_object); + removeFromScratch(scratch_object.message_hash, scratch_object.message_hash_len); } + + } else { + LOG_DEBUG("StoreForwardpp Not in scratch"); } + if (airTime->isTxAllowedChannelUtil(true)) { + requestNextMessage(t->root_hash.bytes, t->root_hash.size, chain_end.commit_hash, SFPP_HASH_SIZE); + } + } else if (t->sfpp_message_type == meshtastic_StoreForwardPlusPlus_SFPP_message_type_LINK_REQUEST) { uint8_t next_commit_hash[SFPP_HASH_SIZE] = {0}; @@ -612,7 +603,7 @@ bool StoreForwardPlusPlusModule::handleReceivedProtobuf(const meshtastic_MeshPac } else if (getNextHash(t->root_hash.bytes, t->root_hash.size, t->commit_hash.bytes, t->commit_hash.size, next_commit_hash)) { - printBytes("StoreForwardpp next chain hash: ", next_commit_hash, SFPP_HASH_SIZE); + printBytes("StoreForwardpp sending next chain hash: ", next_commit_hash, SFPP_HASH_SIZE); broadcastLink(next_commit_hash, SFPP_HASH_SIZE); } else { @@ -675,19 +666,13 @@ bool StoreForwardPlusPlusModule::handleReceivedProtobuf(const meshtastic_MeshPac split_link_in = link_object(); split_link_in.validObject = false; doing_split_receive = false; - // do the recalcualte step we skipped - /*if (!recalculateHash(incoming_link, t->root_hash.bytes, t->root_hash.size, t->commit_hash.bytes, - t->commit_hash.size)) { - LOG_WARN("StoreForwardpp Recalculated hash does not match"); - return true; - }*/ - } else { LOG_WARN("StoreForwardpp No first half stored, cannot combine"); return true; } } + // We have a link. Recalculate the message hash and check if the commit hash matches if (recalculateHash(incoming_link, t->root_hash.bytes, t->root_hash.size, t->commit_hash.bytes, t->commit_hash.size)) { if (incoming_link.root_hash_len == 0) { LOG_WARN("StoreForwardpp Hash bytes not found for incoming link"); @@ -696,18 +681,27 @@ bool StoreForwardPlusPlusModule::handleReceivedProtobuf(const meshtastic_MeshPac if (isCommitInDB(incoming_link.commit_hash, incoming_link.commit_hash_len) || isInDB(incoming_link.message_hash, incoming_link.message_hash_len)) { + // This was sent to us as a scratch message, but we already have it in the chain. + // Respond by announcing this if (t->commit_hash.size == 0) { link_object link_to_announce = getLinkFromMessageHash(incoming_link.message_hash, incoming_link.message_hash_len); canonAnnounce(link_to_announce); - LOG_INFO("StoreForwardpp Received link already in chain # %u", link_to_announce.counter); + LOG_INFO("StoreForwardpp Received link already in chain #%u, announcing next", link_to_announce.counter); } else { LOG_INFO("StoreForwardpp Received link already in chain"); } return true; } + + // This is a scratch provide that we haven't seen if (portduino_config.sfpp_stratum0) { + if (t->commit_hash.size != 0) { + LOG_WARN("Got a link with commit hash, that we don't have on the canon chain"); + return true; + } + // calculate the commit_hash addToChain(incoming_link); if (!pendingRun) { @@ -722,59 +716,66 @@ bool StoreForwardPlusPlusModule::handleReceivedProtobuf(const meshtastic_MeshPac LOG_DEBUG("StoreForwardpp Got previously unseen text, but not rebroadcasting because rxtime was %u", incoming_link.rx_time); } + return true; + } - } else { - if (incoming_link.commit_hash_len == SFPP_HASH_SIZE) { - addToChain(incoming_link); - if (isInScratch(incoming_link.message_hash, incoming_link.message_hash_len)) { - link_object scratch_object = getFromScratch(incoming_link.message_hash, incoming_link.message_hash_len); - if (scratch_object.payload != "") { - updatePayload(incoming_link.message_hash, incoming_link.message_hash_len, scratch_object.payload); - } - removeFromScratch(incoming_link.message_hash, incoming_link.message_hash_len); - } else { - // if this packet is new to us, we rebroadcast it, but only up to an hour old - if (incoming_link.rx_time > getValidTime(RTCQuality::RTCQualityNTP, true) - rebroadcastTimeout) { - rebroadcastLinkObject(incoming_link); - } else { - LOG_DEBUG("StoreForwardpp Got previously unseen text, but not rebroadcasting because rxtime was %u", - incoming_link.rx_time); - } - } - maybeMoveFromCanonScratch(incoming_link.root_hash, incoming_link.root_hash_len); - if (chain_end.rx_time != 0) { - int64_t links_behind = 0; - if (t->chain_count != 0 && t->chain_count > chain_end.counter) { - links_behind = t->chain_count - chain_end.counter; + // Valid hash, and we're not stratum0 - if (links_behind > 1) - LOG_DEBUG("StoreForwardpp observed link that is links ahead of us: %ld", links_behind); - if (links_behind > portduino_config.sfpp_backlog_limit) { - LOG_INFO("StoreForwardpp Chain behind limit, dumping DB"); - clearChain(t->root_hash.bytes, t->root_hash.size); - clearCanonScratch(t->root_hash.bytes, t->root_hash.size, - t->chain_count - portduino_config.sfpp_backlog_limit); - return true; - } - } + // has a commit hash, so on the canon chain + if (incoming_link.commit_hash_len == SFPP_HASH_SIZE) { + addToChain(incoming_link); + if (isInScratch(incoming_link.message_hash, incoming_link.message_hash_len)) { + link_object scratch_object = getFromScratch(incoming_link.message_hash, incoming_link.message_hash_len); + if (scratch_object.payload != "") { + updatePayload(incoming_link.message_hash, incoming_link.message_hash_len, scratch_object.payload); } - requestNextMessage(incoming_link.root_hash, incoming_link.root_hash_len, incoming_link.commit_hash, - incoming_link.commit_hash_len); + removeFromScratch(incoming_link.message_hash, incoming_link.message_hash_len); } else { - if (!isInScratch(incoming_link.message_hash, incoming_link.message_hash_len) && - !isInDB(incoming_link.message_hash, incoming_link.message_hash_len) && - !isInCanonScratch(incoming_link.message_hash, incoming_link.message_hash_len)) { - addToScratch(incoming_link); - LOG_INFO("StoreForwardpp added incoming non-canon message to scratch"); - if (incoming_link.rx_time > getValidTime(RTCQuality::RTCQualityNTP, true) - rebroadcastTimeout) { - rebroadcastLinkObject(incoming_link); - } else { - LOG_DEBUG("StoreForwardpp Got previously unseen text, but not rebroadcasting because rxtime was &u", - incoming_link.rx_time); + // if this packet is new to us, we rebroadcast it, but only up to an hour old + if (incoming_link.rx_time > getValidTime(RTCQuality::RTCQualityNTP, true) - rebroadcastTimeout) { + rebroadcastLinkObject(incoming_link); + } else { + LOG_DEBUG("StoreForwardpp Got previously unseen text, but not rebroadcasting because rxtime was %u", + incoming_link.rx_time); + } + } + maybeMoveFromCanonScratch(incoming_link.root_hash, incoming_link.root_hash_len); + + if (chain_end.rx_time != 0) { + int64_t links_behind = 0; + if (t->chain_count != 0 && t->chain_count > chain_end.counter) { + links_behind = t->chain_count - chain_end.counter; + + if (links_behind > 1) + LOG_DEBUG("StoreForwardpp observed link that is links ahead of us: %ld", links_behind); + if (links_behind > portduino_config.sfpp_backlog_limit) { + LOG_INFO("StoreForwardpp Chain behind limit, dumping DB"); + clearChain(t->root_hash.bytes, t->root_hash.size); + clearCanonScratch(t->root_hash.bytes, t->root_hash.size, + t->chain_count - portduino_config.sfpp_backlog_limit); + return true; } } } + requestNextMessage(incoming_link.root_hash, incoming_link.root_hash_len, incoming_link.commit_hash, + incoming_link.commit_hash_len); + + // possibly not on the canon hash + } else { + if (!isInScratch(incoming_link.message_hash, incoming_link.message_hash_len) && + !isInDB(incoming_link.message_hash, incoming_link.message_hash_len) && + !isInCanonScratch(incoming_link.message_hash, incoming_link.message_hash_len)) { + addToScratch(incoming_link); + LOG_INFO("StoreForwardpp added incoming non-canon message to scratch"); + if (incoming_link.rx_time > getValidTime(RTCQuality::RTCQualityNTP, true) - rebroadcastTimeout) { + rebroadcastLinkObject(incoming_link); + } else { + LOG_DEBUG("StoreForwardpp Got previously unseen text, but not rebroadcasting because rxtime was &u", + incoming_link.rx_time); + } + } } + } else { LOG_INFO("StoreForwardpp Recalculated hash does not match."); if (portduino_config.sfpp_stratum0) @@ -1351,6 +1352,7 @@ void StoreForwardPlusPlusModule::removeFromScratch(uint8_t *message_hash_bytes, bool StoreForwardPlusPlusModule::speculateScratchChain(uint8_t *commit_hash_bytes, size_t commit_hash_len, uint8_t *root_hash, uint8_t *current_commit_hash) { + bool found = false; int count = 0; uint32_t tmp_to = 0; u_int32_t tmp_from = 0; @@ -1386,14 +1388,37 @@ bool StoreForwardPlusPlusModule::speculateScratchChain(uint8_t *commit_hash_byte // found it LOG_INFO("StoreForwardpp found next scratch object in chain after %d tries", count); sqlite3_reset(fromScratchStmt); - return true; + found = true; + break; } } // loop // get the oldest scratch object with the given root hash // calculate the commit hash for that object sqlite3_reset(fromScratchStmt); - return false; + if (!found) + return false; + + int second_count = 0; + link_object chain_end; + + do { + second_count++; + link_object next_scratch_object = getNextScratchObject(root_hash); + if (!next_scratch_object.validObject) { + LOG_ERROR("StoreForwardpp Speculation commit possibly failed, no next scratch object"); + break; + } + addToChain(next_scratch_object); + removeFromScratch(next_scratch_object.message_hash, next_scratch_object.message_hash_len); + chain_end = getLinkFromCount(0, root_hash, SFPP_HASH_SIZE); + printBytes("StoreForwardpp local final commit hash: ", chain_end.commit_hash, SFPP_HASH_SIZE); + printBytes("StoreForwardpp target commit hash: ", commit_hash_bytes, commit_hash_len); + } while (memcmp(chain_end.commit_hash, commit_hash_bytes, commit_hash_len) != 0); + + LOG_INFO("StoreForwardpp added %d links from scratch", second_count); + maybeMoveFromCanonScratch(root_hash, SFPP_HASH_SIZE); + return true; } StoreForwardPlusPlusModule::link_object StoreForwardPlusPlusModule::getNextScratchObject(uint8_t *root_hash) @@ -1833,24 +1858,7 @@ void StoreForwardPlusPlusModule::maybeMoveFromCanonScratch(uint8_t *root_hash, s } // TODO check hash lengths here LOG_INFO("speculating chain, attempting to commit from canon scratch"); - if (speculateScratchChain(lo.commit_hash, lo.commit_hash_len, root_hash, lo.commit_hash)) { - int count = 0; - do { - count++; - link_object next_scratch_object = getNextScratchObject(root_hash); - if (!next_scratch_object.validObject) { - LOG_ERROR("StoreForwardpp Speculation commit possibly failed, no next scratch object"); - break; - } - addToChain(next_scratch_object); - removeFromScratch(next_scratch_object.message_hash, next_scratch_object.message_hash_len); - chain_end = getLinkFromCount(0, root_hash, root_hash_len); - } while (memcmp(chain_end.commit_hash, lo.commit_hash, lo.commit_hash_len) != 0); - LOG_INFO("StoreForwardpp added %d links from scratch", count); - maybeMoveFromCanonScratch(root_hash, root_hash_len); // recursion! - - // We have an object from Scratch that we know is a commit, but we can't put it on the chain yet - } + speculateScratchChain(lo.commit_hash, lo.commit_hash_len, root_hash, lo.commit_hash); } void StoreForwardPlusPlusModule::addToCanonScratch(link_object &lo)