Add chain speculation from scratch

This commit is contained in:
Jonathan Bennett
2026-01-02 14:43:18 -06:00
parent 87798429fa
commit dd4fb6b0bc
2 changed files with 125 additions and 30 deletions

View File

@@ -2,8 +2,6 @@
// Track the nodenums we've gotten SFPP traffic from, to build a network graph
// store nodes, packet counts, and hops away in a new peers table
// TODO: When evaluating a canon announce, try speculatively commiting multiple messages from scratch.
// Eventual TODO: non-stratum0 nodes need to be pointed at their upstream source? Maybe
#if __has_include("sqlite3.h")
@@ -163,7 +161,7 @@ StoreForwardPlusPlusModule::StoreForwardPlusPlusModule()
-1, &scratch_insert_stmt, NULL);
sqlite3_prepare_v2(ppDb, "select destination, sender, packet_id, encrypted_bytes, message_hash, rx_time, root_hash \
from local_messages where root_hash=? order by rx_time asc LIMIT 1;", // earliest first
from local_messages where root_hash=? order by rx_time asc;", // earliest first
-1, &fromScratchStmt, NULL);
sqlite3_prepare_v2(ppDb,
@@ -455,12 +453,40 @@ bool StoreForwardPlusPlusModule::handleReceivedProtobuf(const meshtastic_MeshPac
// we know exactly what it is
if (t->message_hash.size >= 8 &&
checkCommitHash(scratch_object, t->commit_hash.bytes, t->message_hash.size)) {
LOG_INFO("StoreForwardpp Found announced message in scratch, adding to chain");
scratch_object.rx_time = t->encapsulated_rxtime;
addToChain(scratch_object);
removeFromScratch(scratch_object.message_hash, scratch_object.message_hash_len);
// short circuit and return
// falls through to a request for the message
return true;
} else {
// TODO: start with earliest scratch message, and calculate a speculative chain up to the announced
// commit hash if we find a match, add all the messages to the chain
LOG_INFO("StoreForwardpp Earliest scratch message commit hash does not match announced commit hash, "
"speculating chain");
if (speculateScratchChain(t->commit_hash.bytes, t->commit_hash.size, scratch_object.root_hash,
chain_end.commit_hash)) {
int count = 0;
do {
count++;
link_object next_scratch_object = getNextScratchObject(scratch_object.root_hash);
if (!next_scratch_object.validObject) {
LOG_ERROR("StoreForwardpp Speculation commit possibly failed, no next scratch object");
break;
}
addToChain(next_scratch_object);
removeFromScratch(next_scratch_object.message_hash, next_scratch_object.message_hash_len);
chain_end = getLinkFromCount(0, t->root_hash.bytes, t->root_hash.size);
printBytes("StoreForwardpp local final commit hash: ", chain_end.commit_hash, SFPP_HASH_SIZE);
printBytes("StoreForwardpp Announced commit hash: ", t->commit_hash.bytes,
t->commit_hash.size);
} while (memcmp(chain_end.commit_hash, t->commit_hash.bytes, t->commit_hash.size) != 0);
LOG_INFO("StoreForwardpp added %d links from scratch", count);
} else {
LOG_INFO("StoreForwardpp Could not speculate chain to announced commit hash");
}
}
}
if (airTime->isTxAllowedChannelUtil(true)) {
@@ -925,34 +951,18 @@ StoreForwardPlusPlusModule::link_object StoreForwardPlusPlusModule::getLink(uint
bool StoreForwardPlusPlusModule::sendFromScratch(uint8_t *root_hash)
{
link_object lo;
sqlite3_bind_blob(fromScratchStmt, 1, root_hash, SFPP_HASH_SIZE, NULL);
if (sqlite3_step(fromScratchStmt) == SQLITE_DONE) {
link_object lo = getNextScratchObject(root_hash);
if (lo.validObject) {
printBytes("StoreForwardpp Send link to mesh ", lo.message_hash, 8);
LOG_WARN("StoreForwardpp Size: %d", lo.encrypted_len);
printBytes("StoreForwardpp encrypted ", lo.encrypted_bytes, lo.encrypted_len);
broadcastLink(lo, false);
return true;
} else {
LOG_INFO("StoreForwardpp No messages in scratch for this root hash");
return false;
}
lo.to = sqlite3_column_int(fromScratchStmt, 0);
lo.from = sqlite3_column_int(fromScratchStmt, 1);
lo.id = sqlite3_column_int(fromScratchStmt, 2);
uint8_t *_encrypted = (uint8_t *)sqlite3_column_blob(fromScratchStmt, 3);
lo.encrypted_len = sqlite3_column_bytes(fromScratchStmt, 3);
memcpy(lo.encrypted_bytes, _encrypted, lo.encrypted_len);
uint8_t *_message_hash = (uint8_t *)sqlite3_column_blob(fromScratchStmt, 4);
lo.rx_time = sqlite3_column_int(fromScratchStmt, 5);
lo.root_hash_len = SFPP_SHORT_HASH_SIZE;
memcpy(lo.root_hash, root_hash, lo.root_hash_len);
sqlite3_reset(fromScratchStmt);
printBytes("StoreForwardpp Send link to mesh ", _message_hash, 8);
LOG_WARN("StoreForwardpp Size: %d", lo.encrypted_len);
printBytes("StoreForwardpp encrypted ", lo.encrypted_bytes, lo.encrypted_len);
broadcastLink(lo, false);
return true;
}
bool StoreForwardPlusPlusModule::addToChain(link_object &lo)
@@ -999,7 +1009,7 @@ bool StoreForwardPlusPlusModule::addToChain(link_object &lo)
sqlite3_bind_int(chain_insert_stmt, 10, lo.counter);
int res = sqlite3_step(chain_insert_stmt);
if (res != SQLITE_OK) {
if (res != SQLITE_OK && res != SQLITE_DONE) {
LOG_ERROR("StoreForwardpp Cannot step: %s", sqlite3_errmsg(ppDb));
}
sqlite3_reset(chain_insert_stmt);
@@ -1118,6 +1128,85 @@ void StoreForwardPlusPlusModule::removeFromScratch(uint8_t *message_hash_bytes,
sqlite3_reset(removeScratch);
}
bool StoreForwardPlusPlusModule::speculateScratchChain(uint8_t *commit_hash_bytes, size_t commit_hash_len, uint8_t *root_hash,
uint8_t *current_commit_hash)
{
int count = 0;
uint32_t tmp_to = 0;
u_int32_t tmp_from = 0;
uint32_t tmp_id = 0;
uint8_t *tmp_encrypted = nullptr;
uint8_t *tmp_message_hash = nullptr;
uint32_t tmp_rx_time = 0;
SHA256 commit_hash_obj;
uint8_t tmp_commit_hash[SFPP_HASH_SIZE];
if (commit_hash_len < SFPP_SHORT_HASH_SIZE) {
return false;
}
memcpy(tmp_commit_hash, current_commit_hash, SFPP_HASH_SIZE);
sqlite3_bind_blob(fromScratchStmt, 1, root_hash, SFPP_HASH_SIZE, NULL);
while (sqlite3_step(fromScratchStmt) == SQLITE_ROW) {
count++;
tmp_to = sqlite3_column_int(fromScratchStmt, 0);
tmp_from = sqlite3_column_int(fromScratchStmt, 1);
tmp_id = sqlite3_column_int(fromScratchStmt, 2);
tmp_encrypted = (uint8_t *)sqlite3_column_blob(fromScratchStmt, 3);
size_t encrypted_len = sqlite3_column_bytes(fromScratchStmt, 3);
tmp_message_hash = (uint8_t *)sqlite3_column_blob(fromScratchStmt, 4);
tmp_rx_time = sqlite3_column_int(fromScratchStmt, 5);
// calculate the commit hash for that object
commit_hash_obj.reset();
commit_hash_obj.update(tmp_commit_hash, SFPP_HASH_SIZE);
commit_hash_obj.update(tmp_message_hash, SFPP_HASH_SIZE);
commit_hash_obj.finalize(tmp_commit_hash, SFPP_HASH_SIZE);
if (memcmp(tmp_commit_hash, commit_hash_bytes, commit_hash_len) == 0) {
// found it
LOG_INFO("StoreForwardpp found next scratch object in chain after %d tries", count);
sqlite3_reset(fromScratchStmt);
return true;
}
}
// loop
// get the oldest scratch object with the given root hash
// calculate the commit hash for that object
sqlite3_reset(fromScratchStmt);
return false;
}
StoreForwardPlusPlusModule::link_object StoreForwardPlusPlusModule::getNextScratchObject(uint8_t *root_hash)
{
link_object lo;
lo.validObject = true;
sqlite3_bind_blob(fromScratchStmt, 1, root_hash, SFPP_HASH_SIZE, NULL);
if (sqlite3_step(fromScratchStmt) == SQLITE_DONE) {
lo.validObject = false;
return lo;
}
lo.to = sqlite3_column_int(fromScratchStmt, 0);
lo.from = sqlite3_column_int(fromScratchStmt, 1);
lo.id = sqlite3_column_int(fromScratchStmt, 2);
uint8_t *_encrypted = (uint8_t *)sqlite3_column_blob(fromScratchStmt, 3);
lo.encrypted_len = sqlite3_column_bytes(fromScratchStmt, 3);
memcpy(lo.encrypted_bytes, _encrypted, lo.encrypted_len);
uint8_t *_message_hash = (uint8_t *)sqlite3_column_blob(fromScratchStmt, 4);
lo.message_hash_len = sqlite3_column_bytes(fromScratchStmt, 4);
memcpy(lo.message_hash, _message_hash, lo.message_hash_len);
lo.rx_time = sqlite3_column_int(fromScratchStmt, 5);
lo.root_hash_len = SFPP_HASH_SIZE;
memcpy(lo.root_hash, root_hash, lo.root_hash_len);
sqlite3_reset(fromScratchStmt);
return lo;
}
void StoreForwardPlusPlusModule::updatePayload(uint8_t *message_hash_bytes, size_t message_hash_len, std::string payload)
{
LOG_WARN("StoreForwardpp updatePayload");

View File

@@ -186,6 +186,12 @@ class StoreForwardPlusPlusModule : public ProtobufModule<meshtastic_StoreForward
// removes a link object from the scratch database
void removeFromScratch(uint8_t *, size_t);
// iterate through our scratch database, and see if we can speculate a chain up to the given commit hash
bool speculateScratchChain(uint8_t *, size_t, uint8_t *, uint8_t *);
// retrieves the next link object from scratch given a root hash
link_object getNextScratchObject(uint8_t *);
// fills the payload section with the decrypted data for the given message hash
// probably not needed for production, but useful for testing
void updatePayload(uint8_t *, size_t, std::string);