Code refactor

This commit is contained in:
Jonathan Bennett
2026-01-08 13:02:14 -06:00
parent e990198628
commit 5a0644cd4f

View File

@@ -313,9 +313,15 @@ int32_t StoreForwardPlusPlusModule::runOnce()
return portduino_config.sfpp_announce_interval * 60 * 1000;
}
}
uint8_t root_hash_bytes[SFPP_HASH_SIZE] = {0};
ChannelHash hash = channels.getHash(0);
getOrAddRootFromChannelHash(hash, root_hash_bytes);
if (getOrAddRootFromChannelHash(hash, root_hash_bytes) == 0) {
LOG_WARN("No root hash found, not sending");
return portduino_config.sfpp_announce_interval * 60 * 1000;
}
uint32_t chain_count = getChainCount(root_hash_bytes, SFPP_HASH_SIZE);
LOG_DEBUG("Chain count is %u", chain_count);
while (chain_count > portduino_config.sfpp_max_chain) {
@@ -477,8 +483,6 @@ bool StoreForwardPlusPlusModule::handleReceivedProtobuf(const meshtastic_MeshPac
updatePeers(mp, t->sfpp_message_type);
// TODO: Clean up this mess of logic
if (t->sfpp_message_type == meshtastic_StoreForwardPlusPlus_SFPP_message_type_CANON_ANNOUNCE) {
if (portduino_config.sfpp_stratum0) {
@@ -490,114 +494,101 @@ bool StoreForwardPlusPlusModule::handleReceivedProtobuf(const meshtastic_MeshPac
broadcastLink(next_commit_hash, SFPP_HASH_SIZE);
}
}
} else {
uint8_t tmp_root_hash_bytes[SFPP_HASH_SIZE] = {0};
return true;
}
LOG_DEBUG("StoreForwardpp Received a CANON_ANNOUNCE");
if (getRootFromChannelHash(router->p_encrypted->channel, tmp_root_hash_bytes)) {
// we found the hash, check if it's the right one
if (memcmp(tmp_root_hash_bytes, t->root_hash.bytes, t->root_hash.size) != 0) {
LOG_INFO("StoreForwardpp Root hash does not match. Possibly two stratum0 nodes on the mesh?");
return true;
}
} else {
addRootToMappings(router->p_encrypted->channel, t->root_hash.bytes);
LOG_DEBUG("StoreForwardpp Adding root hash to mappings");
uint8_t tmp_root_hash_bytes[SFPP_HASH_SIZE] = {0};
LOG_DEBUG("StoreForwardpp Received a CANON_ANNOUNCE");
if (getRootFromChannelHash(router->p_encrypted->channel, tmp_root_hash_bytes)) {
// we found the hash, check if it's the right one
if (memcmp(tmp_root_hash_bytes, t->root_hash.bytes, t->root_hash.size) != 0) {
LOG_INFO("StoreForwardpp Root hash does not match. Possibly two stratum0 nodes on the mesh?");
return true;
}
if (t->encapsulated_rxtime == 0) {
LOG_DEBUG("StoreForwardpp No encapsulated time, conclude the chain is empty");
} else if (t->root_hash.size == SFPP_HASH_SIZE) {
addRootToMappings(router->p_encrypted->channel, t->root_hash.bytes);
LOG_DEBUG("StoreForwardpp Adding root hash to mappings");
} else {
LOG_WARN("No root hash and unable to add");
return true;
}
if (t->encapsulated_rxtime == 0) {
LOG_DEBUG("StoreForwardpp No encapsulated time, conclude the chain is empty");
return true;
}
// if we have no messages, just send out a request for a new starter message
if (chain_end.rx_time == 0 && airTime->isTxAllowedChannelUtil(true)) {
LOG_DEBUG("StoreForwardpp New chain, requesting last %u messages", portduino_config.sfpp_initial_sync);
requestMessageCount(t->root_hash.bytes, t->root_hash.size, portduino_config.sfpp_initial_sync);
return true;
}
// If we got an announce that matches, maybe send a message from scratch, but no other actions needed.
if (t->commit_hash.size >= SFPP_SHORT_HASH_SIZE &&
memcmp(chain_end.commit_hash, t->commit_hash.bytes, t->commit_hash.size) == 0) {
LOG_DEBUG("StoreForwardpp End of chain matches!");
sendFromScratch(chain_end.root_hash);
return true;
}
LOG_DEBUG("StoreForwardpp End of chain does not match! Checking distance behind.");
int64_t links_behind = 0;
if (t->chain_count != 0 && t->chain_count > chain_end.counter) {
links_behind = t->chain_count - chain_end.counter;
LOG_DEBUG("StoreForwardpp Links behind: %ld", links_behind);
if (links_behind > portduino_config.sfpp_backlog_limit) {
LOG_INFO("StoreForwardpp Chain behind limit, dumping DB");
clearChain(t->root_hash.bytes, t->root_hash.size);
clearCanonScratch(t->root_hash.bytes, t->root_hash.size, t->chain_count - portduino_config.sfpp_backlog_limit);
return true;
}
}
// We just got an end of chain announce, checking if we have seen this message and have it in scratch.
LOG_DEBUG("Checking if in scratch");
if (isInScratch(t->message_hash.bytes, t->message_hash.size)) {
LOG_DEBUG("Found in scratch");
link_object scratch_object = getFromScratch(t->message_hash.bytes, t->message_hash.size);
// if this matches, we don't need to request the message
// we know exactly what it is
if (t->message_hash.size >= 8 && t->commit_hash.size >= 8 &&
checkCommitHash(scratch_object, t->commit_hash.bytes, t->message_hash.size)) {
LOG_INFO("StoreForwardpp Found announced message in scratch, adding to chain");
scratch_object.rx_time = t->encapsulated_rxtime;
addToChain(scratch_object);
removeFromScratch(scratch_object.message_hash, scratch_object.message_hash_len);
maybeMoveFromCanonScratch(scratch_object.root_hash, scratch_object.root_hash_len);
return true;
}
if (chain_end.rx_time != 0) {
if (t->commit_hash.size >= SFPP_SHORT_HASH_SIZE &&
memcmp(chain_end.commit_hash, t->commit_hash.bytes, t->commit_hash.size) == 0) {
LOG_DEBUG("StoreForwardpp End of chain matches!");
sendFromScratch(chain_end.root_hash);
} else {
LOG_DEBUG("StoreForwardpp End of chain does not match! Checking distance behind.");
int64_t links_behind = 0;
if (t->chain_count != 0 && t->chain_count > chain_end.counter) {
links_behind = t->chain_count - chain_end.counter;
LOG_INFO("StoreForwardpp Earliest scratch message commit hash does not match announced commit hash, "
"speculating chain");
if (!speculateScratchChain(t->commit_hash.bytes, t->commit_hash.size, scratch_object.root_hash,
chain_end.commit_hash)) {
LOG_INFO("StoreForwardpp Could not speculate chain to announced commit hash. Moving from "
"local_messages to canon_scratch");
LOG_DEBUG("StoreForwardpp Links behind: %ld", links_behind);
if (links_behind > portduino_config.sfpp_backlog_limit) {
LOG_INFO("StoreForwardpp Chain behind limit, dumping DB");
clearChain(t->root_hash.bytes, t->root_hash.size);
clearCanonScratch(t->root_hash.bytes, t->root_hash.size,
t->chain_count - portduino_config.sfpp_backlog_limit);
return true;
}
}
// We just got an end of chain announce, checking if we have seen this message and have it in scratch.
LOG_DEBUG("Checking if in scratch");
if (isInScratch(t->message_hash.bytes, t->message_hash.size)) {
LOG_DEBUG("Found in scratch");
link_object scratch_object = getFromScratch(t->message_hash.bytes, t->message_hash.size);
// if this matches, we don't need to request the message
// we know exactly what it is
if (t->message_hash.size >= 8 && t->commit_hash.size >= 8 &&
checkCommitHash(scratch_object, t->commit_hash.bytes, t->message_hash.size)) {
LOG_INFO("StoreForwardpp Found announced message in scratch, adding to chain");
scratch_object.rx_time = t->encapsulated_rxtime;
addToChain(scratch_object);
removeFromScratch(scratch_object.message_hash, scratch_object.message_hash_len);
maybeMoveFromCanonScratch(scratch_object.root_hash, scratch_object.root_hash_len);
// short circuit and return
// falls through to a request for the message
return true;
} else {
// TODO move into a function
LOG_INFO("StoreForwardpp Earliest scratch message commit hash does not match announced commit hash, "
"speculating chain");
if (speculateScratchChain(t->commit_hash.bytes, t->commit_hash.size, scratch_object.root_hash,
chain_end.commit_hash)) {
int count = 0;
do {
count++;
link_object next_scratch_object = getNextScratchObject(scratch_object.root_hash);
if (!next_scratch_object.validObject) {
LOG_ERROR("StoreForwardpp Speculation commit possibly failed, no next scratch object");
break;
}
addToChain(next_scratch_object);
removeFromScratch(next_scratch_object.message_hash, next_scratch_object.message_hash_len);
chain_end = getLinkFromCount(0, t->root_hash.bytes, t->root_hash.size);
printBytes("StoreForwardpp local final commit hash: ", chain_end.commit_hash, SFPP_HASH_SIZE);
printBytes("StoreForwardpp Announced commit hash: ", t->commit_hash.bytes,
t->commit_hash.size);
} while (memcmp(chain_end.commit_hash, t->commit_hash.bytes, t->commit_hash.size) != 0);
LOG_INFO("StoreForwardpp added %d links from scratch", count);
maybeMoveFromCanonScratch(scratch_object.root_hash, scratch_object.root_hash_len);
// We have an object from Scratch that we know is a commit, but we can't put it on the chain yet
} else {
LOG_INFO("StoreForwardpp Could not speculate chain to announced commit hash. Moving from "
"local_messages to canon_scratch");
scratch_object.rx_time = t->encapsulated_rxtime;
scratch_object.counter = t->chain_count;
scratch_object.commit_hash_len = t->commit_hash.size;
memcpy(scratch_object.commit_hash, t->commit_hash.bytes, t->commit_hash.size);
addToCanonScratch(scratch_object);
removeFromScratch(scratch_object.message_hash, scratch_object.message_hash_len);
}
}
} else {
LOG_DEBUG("StoreForwardpp Not in scratch");
}
if (airTime->isTxAllowedChannelUtil(true)) {
requestNextMessage(t->root_hash.bytes, t->root_hash.size, chain_end.commit_hash, SFPP_HASH_SIZE);
}
}
} else { // if chainEnd()
if (airTime->isTxAllowedChannelUtil(true)) {
LOG_DEBUG("StoreForwardpp New chain, requesting last %u messages", portduino_config.sfpp_initial_sync);
requestMessageCount(t->root_hash.bytes, t->root_hash.size, portduino_config.sfpp_initial_sync);
}
scratch_object.rx_time = t->encapsulated_rxtime;
scratch_object.counter = t->chain_count;
scratch_object.commit_hash_len = t->commit_hash.size;
memcpy(scratch_object.commit_hash, t->commit_hash.bytes, t->commit_hash.size);
addToCanonScratch(scratch_object);
removeFromScratch(scratch_object.message_hash, scratch_object.message_hash_len);
}
} else {
LOG_DEBUG("StoreForwardpp Not in scratch");
}
if (airTime->isTxAllowedChannelUtil(true)) {
requestNextMessage(t->root_hash.bytes, t->root_hash.size, chain_end.commit_hash, SFPP_HASH_SIZE);
}
} else if (t->sfpp_message_type == meshtastic_StoreForwardPlusPlus_SFPP_message_type_LINK_REQUEST) {
uint8_t next_commit_hash[SFPP_HASH_SIZE] = {0};
@@ -612,7 +603,7 @@ bool StoreForwardPlusPlusModule::handleReceivedProtobuf(const meshtastic_MeshPac
} else if (getNextHash(t->root_hash.bytes, t->root_hash.size, t->commit_hash.bytes, t->commit_hash.size,
next_commit_hash)) {
printBytes("StoreForwardpp next chain hash: ", next_commit_hash, SFPP_HASH_SIZE);
printBytes("StoreForwardpp sending next chain hash: ", next_commit_hash, SFPP_HASH_SIZE);
broadcastLink(next_commit_hash, SFPP_HASH_SIZE);
} else {
@@ -675,19 +666,13 @@ bool StoreForwardPlusPlusModule::handleReceivedProtobuf(const meshtastic_MeshPac
split_link_in = link_object();
split_link_in.validObject = false;
doing_split_receive = false;
// do the recalcualte step we skipped
/*if (!recalculateHash(incoming_link, t->root_hash.bytes, t->root_hash.size, t->commit_hash.bytes,
t->commit_hash.size)) {
LOG_WARN("StoreForwardpp Recalculated hash does not match");
return true;
}*/
} else {
LOG_WARN("StoreForwardpp No first half stored, cannot combine");
return true;
}
}
// We have a link. Recalculate the message hash and check if the commit hash matches
if (recalculateHash(incoming_link, t->root_hash.bytes, t->root_hash.size, t->commit_hash.bytes, t->commit_hash.size)) {
if (incoming_link.root_hash_len == 0) {
LOG_WARN("StoreForwardpp Hash bytes not found for incoming link");
@@ -696,18 +681,27 @@ bool StoreForwardPlusPlusModule::handleReceivedProtobuf(const meshtastic_MeshPac
if (isCommitInDB(incoming_link.commit_hash, incoming_link.commit_hash_len) ||
isInDB(incoming_link.message_hash, incoming_link.message_hash_len)) {
// This was sent to us as a scratch message, but we already have it in the chain.
// Respond by announcing this
if (t->commit_hash.size == 0) {
link_object link_to_announce =
getLinkFromMessageHash(incoming_link.message_hash, incoming_link.message_hash_len);
canonAnnounce(link_to_announce);
LOG_INFO("StoreForwardpp Received link already in chain # %u", link_to_announce.counter);
LOG_INFO("StoreForwardpp Received link already in chain #%u, announcing next", link_to_announce.counter);
} else {
LOG_INFO("StoreForwardpp Received link already in chain");
}
return true;
}
// This is a scratch provide that we haven't seen
if (portduino_config.sfpp_stratum0) {
if (t->commit_hash.size != 0) {
LOG_WARN("Got a link with commit hash, that we don't have on the canon chain");
return true;
}
// calculate the commit_hash
addToChain(incoming_link);
if (!pendingRun) {
@@ -722,59 +716,66 @@ bool StoreForwardPlusPlusModule::handleReceivedProtobuf(const meshtastic_MeshPac
LOG_DEBUG("StoreForwardpp Got previously unseen text, but not rebroadcasting because rxtime was %u",
incoming_link.rx_time);
}
return true;
}
} else {
if (incoming_link.commit_hash_len == SFPP_HASH_SIZE) {
addToChain(incoming_link);
if (isInScratch(incoming_link.message_hash, incoming_link.message_hash_len)) {
link_object scratch_object = getFromScratch(incoming_link.message_hash, incoming_link.message_hash_len);
if (scratch_object.payload != "") {
updatePayload(incoming_link.message_hash, incoming_link.message_hash_len, scratch_object.payload);
}
removeFromScratch(incoming_link.message_hash, incoming_link.message_hash_len);
} else {
// if this packet is new to us, we rebroadcast it, but only up to an hour old
if (incoming_link.rx_time > getValidTime(RTCQuality::RTCQualityNTP, true) - rebroadcastTimeout) {
rebroadcastLinkObject(incoming_link);
} else {
LOG_DEBUG("StoreForwardpp Got previously unseen text, but not rebroadcasting because rxtime was %u",
incoming_link.rx_time);
}
}
maybeMoveFromCanonScratch(incoming_link.root_hash, incoming_link.root_hash_len);
if (chain_end.rx_time != 0) {
int64_t links_behind = 0;
if (t->chain_count != 0 && t->chain_count > chain_end.counter) {
links_behind = t->chain_count - chain_end.counter;
// Valid hash, and we're not stratum0
if (links_behind > 1)
LOG_DEBUG("StoreForwardpp observed link that is links ahead of us: %ld", links_behind);
if (links_behind > portduino_config.sfpp_backlog_limit) {
LOG_INFO("StoreForwardpp Chain behind limit, dumping DB");
clearChain(t->root_hash.bytes, t->root_hash.size);
clearCanonScratch(t->root_hash.bytes, t->root_hash.size,
t->chain_count - portduino_config.sfpp_backlog_limit);
return true;
}
}
// has a commit hash, so on the canon chain
if (incoming_link.commit_hash_len == SFPP_HASH_SIZE) {
addToChain(incoming_link);
if (isInScratch(incoming_link.message_hash, incoming_link.message_hash_len)) {
link_object scratch_object = getFromScratch(incoming_link.message_hash, incoming_link.message_hash_len);
if (scratch_object.payload != "") {
updatePayload(incoming_link.message_hash, incoming_link.message_hash_len, scratch_object.payload);
}
requestNextMessage(incoming_link.root_hash, incoming_link.root_hash_len, incoming_link.commit_hash,
incoming_link.commit_hash_len);
removeFromScratch(incoming_link.message_hash, incoming_link.message_hash_len);
} else {
if (!isInScratch(incoming_link.message_hash, incoming_link.message_hash_len) &&
!isInDB(incoming_link.message_hash, incoming_link.message_hash_len) &&
!isInCanonScratch(incoming_link.message_hash, incoming_link.message_hash_len)) {
addToScratch(incoming_link);
LOG_INFO("StoreForwardpp added incoming non-canon message to scratch");
if (incoming_link.rx_time > getValidTime(RTCQuality::RTCQualityNTP, true) - rebroadcastTimeout) {
rebroadcastLinkObject(incoming_link);
} else {
LOG_DEBUG("StoreForwardpp Got previously unseen text, but not rebroadcasting because rxtime was &u",
incoming_link.rx_time);
// if this packet is new to us, we rebroadcast it, but only up to an hour old
if (incoming_link.rx_time > getValidTime(RTCQuality::RTCQualityNTP, true) - rebroadcastTimeout) {
rebroadcastLinkObject(incoming_link);
} else {
LOG_DEBUG("StoreForwardpp Got previously unseen text, but not rebroadcasting because rxtime was %u",
incoming_link.rx_time);
}
}
maybeMoveFromCanonScratch(incoming_link.root_hash, incoming_link.root_hash_len);
if (chain_end.rx_time != 0) {
int64_t links_behind = 0;
if (t->chain_count != 0 && t->chain_count > chain_end.counter) {
links_behind = t->chain_count - chain_end.counter;
if (links_behind > 1)
LOG_DEBUG("StoreForwardpp observed link that is links ahead of us: %ld", links_behind);
if (links_behind > portduino_config.sfpp_backlog_limit) {
LOG_INFO("StoreForwardpp Chain behind limit, dumping DB");
clearChain(t->root_hash.bytes, t->root_hash.size);
clearCanonScratch(t->root_hash.bytes, t->root_hash.size,
t->chain_count - portduino_config.sfpp_backlog_limit);
return true;
}
}
}
requestNextMessage(incoming_link.root_hash, incoming_link.root_hash_len, incoming_link.commit_hash,
incoming_link.commit_hash_len);
// possibly not on the canon hash
} else {
if (!isInScratch(incoming_link.message_hash, incoming_link.message_hash_len) &&
!isInDB(incoming_link.message_hash, incoming_link.message_hash_len) &&
!isInCanonScratch(incoming_link.message_hash, incoming_link.message_hash_len)) {
addToScratch(incoming_link);
LOG_INFO("StoreForwardpp added incoming non-canon message to scratch");
if (incoming_link.rx_time > getValidTime(RTCQuality::RTCQualityNTP, true) - rebroadcastTimeout) {
rebroadcastLinkObject(incoming_link);
} else {
LOG_DEBUG("StoreForwardpp Got previously unseen text, but not rebroadcasting because rxtime was &u",
incoming_link.rx_time);
}
}
}
} else {
LOG_INFO("StoreForwardpp Recalculated hash does not match.");
if (portduino_config.sfpp_stratum0)
@@ -1351,6 +1352,7 @@ void StoreForwardPlusPlusModule::removeFromScratch(uint8_t *message_hash_bytes,
bool StoreForwardPlusPlusModule::speculateScratchChain(uint8_t *commit_hash_bytes, size_t commit_hash_len, uint8_t *root_hash,
uint8_t *current_commit_hash)
{
bool found = false;
int count = 0;
uint32_t tmp_to = 0;
u_int32_t tmp_from = 0;
@@ -1386,14 +1388,37 @@ bool StoreForwardPlusPlusModule::speculateScratchChain(uint8_t *commit_hash_byte
// found it
LOG_INFO("StoreForwardpp found next scratch object in chain after %d tries", count);
sqlite3_reset(fromScratchStmt);
return true;
found = true;
break;
}
}
// loop
// get the oldest scratch object with the given root hash
// calculate the commit hash for that object
sqlite3_reset(fromScratchStmt);
return false;
if (!found)
return false;
int second_count = 0;
link_object chain_end;
do {
second_count++;
link_object next_scratch_object = getNextScratchObject(root_hash);
if (!next_scratch_object.validObject) {
LOG_ERROR("StoreForwardpp Speculation commit possibly failed, no next scratch object");
break;
}
addToChain(next_scratch_object);
removeFromScratch(next_scratch_object.message_hash, next_scratch_object.message_hash_len);
chain_end = getLinkFromCount(0, root_hash, SFPP_HASH_SIZE);
printBytes("StoreForwardpp local final commit hash: ", chain_end.commit_hash, SFPP_HASH_SIZE);
printBytes("StoreForwardpp target commit hash: ", commit_hash_bytes, commit_hash_len);
} while (memcmp(chain_end.commit_hash, commit_hash_bytes, commit_hash_len) != 0);
LOG_INFO("StoreForwardpp added %d links from scratch", second_count);
maybeMoveFromCanonScratch(root_hash, SFPP_HASH_SIZE);
return true;
}
StoreForwardPlusPlusModule::link_object StoreForwardPlusPlusModule::getNextScratchObject(uint8_t *root_hash)
@@ -1833,24 +1858,7 @@ void StoreForwardPlusPlusModule::maybeMoveFromCanonScratch(uint8_t *root_hash, s
}
// TODO check hash lengths here
LOG_INFO("speculating chain, attempting to commit from canon scratch");
if (speculateScratchChain(lo.commit_hash, lo.commit_hash_len, root_hash, lo.commit_hash)) {
int count = 0;
do {
count++;
link_object next_scratch_object = getNextScratchObject(root_hash);
if (!next_scratch_object.validObject) {
LOG_ERROR("StoreForwardpp Speculation commit possibly failed, no next scratch object");
break;
}
addToChain(next_scratch_object);
removeFromScratch(next_scratch_object.message_hash, next_scratch_object.message_hash_len);
chain_end = getLinkFromCount(0, root_hash, root_hash_len);
} while (memcmp(chain_end.commit_hash, lo.commit_hash, lo.commit_hash_len) != 0);
LOG_INFO("StoreForwardpp added %d links from scratch", count);
maybeMoveFromCanonScratch(root_hash, root_hash_len); // recursion!
// We have an object from Scratch that we know is a commit, but we can't put it on the chain yet
}
speculateScratchChain(lo.commit_hash, lo.commit_hash_len, root_hash, lo.commit_hash);
}
void StoreForwardPlusPlusModule::addToCanonScratch(link_object &lo)