Add canon_scratch to SF++

This commit is contained in:
Jonathan Bennett
2026-01-07 12:44:22 -06:00
parent 74a6c9f447
commit 3ae331eb89
2 changed files with 493 additions and 152 deletions

View File

@@ -107,6 +107,33 @@ StoreForwardPlusPlusModule::StoreForwardPlusPlusModule()
LOG_ERROR("%s", err);
sqlite3_free(err);
// add canon_scratch table
// where we move message out of scratch when we've been informed that message exists on the canon chain, but we're not ready
// to commit it on our own chain
res = sqlite3_exec(ppDb, " \
CREATE TABLE IF NOT EXISTS \
canon_scratch( \
destination INT NOT NULL, \
sender INT NOT NULL, \
packet_id INT NOT NULL, \
rx_time INT NOT NULL, \
root_hash BLOB NOT NULL, \
encrypted_bytes BLOB NOT NULL, \
message_hash BLOB NOT NULL, \
commit_hash BLOB NOT NULL, \
payload TEXT, \
counter INT NOT NULL, \
PRIMARY KEY (message_hash) \
);",
NULL, NULL, &err);
if (res != SQLITE_OK) {
LOG_ERROR("Failed to create table: %s", sqlite3_errmsg(ppDb));
}
if (err != nullptr)
LOG_ERROR("%s", err);
sqlite3_free(err);
// mappings table -- connects the root hashes to channel hashes and DM identifiers
res = sqlite3_exec(ppDb, " \
CREATE TABLE IF NOT EXISTS \
@@ -196,6 +223,12 @@ StoreForwardPlusPlusModule::StoreForwardPlusPlusModule()
from channel_messages where substr(commit_hash,1,?)=?;",
-1, &getLinkStmt, NULL);
sqlite3_prepare_v2(
ppDb,
"select destination, sender, packet_id, encrypted_bytes, message_hash, rx_time, commit_hash, root_hash, counter, payload \
from channel_messages where substr(message_hash,1,?)=?;",
-1, &getLinkFromMessageHashStmt, NULL);
sqlite3_prepare_v2(ppDb, "select identifier from mappings where substr(root_hash,1,?)=?;", -1, &getHashFromRootStmt, NULL);
sqlite3_prepare_v2(ppDb, "INSERT INTO mappings (chain_type, identifier, root_hash) VALUES(?, ?, ?);", -1,
@@ -233,9 +266,27 @@ StoreForwardPlusPlusModule::StoreForwardPlusPlusModule()
sqlite3_prepare_v2(ppDb, "DELETE FROM channel_messages WHERE substr(root_hash,1,?)=?;", -1, &clearChainStmt, NULL);
sqlite3_prepare_v2(ppDb, "INSERT INTO canon_scratch (destination, sender, packet_id, root_hash, \
encrypted_bytes, message_hash, rx_time, commit_hash, payload, counter) VALUES(?, ?, ?, ?, ?, ?, ?, ?, ?, ?);",
-1, &canon_scratch_insert_stmt, NULL);
sqlite3_prepare_v2(ppDb, "SELECT count(*) FROM canon_scratch WHERE substr(message_hash,1,?)=?;", -1,
&getCanonScratchCountStmt, NULL);
sqlite3_prepare_v2(
ppDb,
"select destination, sender, packet_id, encrypted_bytes, message_hash, rx_time, commit_hash, root_hash, counter, payload \
from canon_scratch where substr(root_hash,1,?)=? ORDER BY counter ASC;",
-1, &getCanonScratchStmt, NULL);
sqlite3_prepare_v2(ppDb, "DELETE from canon_scratch where substr(message_hash,1,?)=?", -1, &removeCanonScratch, NULL);
sqlite3_prepare_v2(ppDb, "DELETE from canon_scratch where substr(root_hash,1,?)=? AND counter < ?;", -1,
&clearCanonScratchStmt, NULL);
encryptedOk = true;
this->setInterval(portduino_config.sfpp_announce_interval * 60 * 1000);
this->setInterval(15 * 1000);
}
int32_t StoreForwardPlusPlusModule::runOnce()
@@ -286,6 +337,12 @@ int32_t StoreForwardPlusPlusModule::runOnce()
return portduino_config.sfpp_announce_interval * 60 * 1000;
}
if (did_announce_last && sendFromScratch(root_hash_bytes)) {
LOG_DEBUG("Send from scratch queue");
did_announce_last = false;
return portduino_config.sfpp_announce_interval * 60 * 1000;
}
// get tip of chain for this channel
link_object chain_end = getLinkFromCount(0, root_hash_bytes, SFPP_HASH_SIZE);
@@ -315,13 +372,14 @@ int32_t StoreForwardPlusPlusModule::runOnce()
LOG_INFO("Send packet to mesh payload size %u", p->decoded.payload.size);
service->sendToMesh(p, RX_SRC_LOCAL, true);
did_announce_last = true;
return portduino_config.sfpp_announce_interval * 60 * 1000;
}
// broadcast the tip of the chain
// todo just send the link object
canonAnnounce(chain_end, chain_end.message_hash, chain_end.commit_hash, root_hash_bytes, chain_end.rx_time);
canonAnnounce(chain_end);
did_announce_last = true;
// eventually timeout things on the scratch queue
return portduino_config.sfpp_announce_interval * 60 * 1000;
}
@@ -367,7 +425,8 @@ ProcessMessage StoreForwardPlusPlusModule::handleReceived(const meshtastic_MeshP
}
if (!portduino_config.sfpp_stratum0) {
if (!isInDB(lo.message_hash, lo.message_hash_len)) {
if (!isInDB(lo.message_hash, lo.message_hash_len) && !isInScratch(lo.message_hash, lo.message_hash_len) &&
!isInCanonScratch(lo.message_hash, lo.message_hash_len)) {
if (lo.root_hash_len == 0) {
LOG_DEBUG("StoreForwardpp Received text message, but no chain. Possibly no Stratum0 on local mesh.");
return ProcessMessage::CONTINUE;
@@ -381,7 +440,7 @@ ProcessMessage StoreForwardPlusPlusModule::handleReceived(const meshtastic_MeshP
addToChain(lo);
if (!pendingRun) {
setIntervalFromNow(30 * 1000); // run again in 30 seconds to announce the new tip of chain
setIntervalFromNow(10 * 1000); // run again in 30 seconds to announce the new tip of chain
pendingRun = true;
}
return ProcessMessage::CONTINUE; // Let others look at this message also if they want
@@ -403,6 +462,7 @@ bool StoreForwardPlusPlusModule::handleReceivedProtobuf(const meshtastic_MeshPac
LOG_DEBUG("StoreForwardpp node %u sent us sf++ packet", mp.from);
printBytes("StoreForwardpp commit_hash ", t->commit_hash.bytes, t->commit_hash.size);
printBytes("StoreForwardpp root_hash ", t->root_hash.bytes, t->root_hash.size);
printBytes("StoreForwardpp message_hash ", t->message_hash.bytes, t->message_hash.size);
link_object incoming_link;
incoming_link.validObject = false;
@@ -459,11 +519,15 @@ bool StoreForwardPlusPlusModule::handleReceivedProtobuf(const meshtastic_MeshPac
if (links_behind > portduino_config.sfpp_backlog_limit) {
LOG_INFO("StoreForwardpp Chain behind limit, dumping DB");
clearChain(t->root_hash.bytes, t->root_hash.size);
clearCanonScratch(t->root_hash.bytes, t->root_hash.size,
t->chain_count - portduino_config.sfpp_backlog_limit);
return true;
}
}
// We just got an end of chain announce, checking if we have seen this message and have it in scratch.
LOG_DEBUG("Checking if in scratch");
if (isInScratch(t->message_hash.bytes, t->message_hash.size)) {
LOG_DEBUG("Found in scratch");
link_object scratch_object = getFromScratch(t->message_hash.bytes, t->message_hash.size);
// if this matches, we don't need to request the message
// we know exactly what it is
@@ -473,12 +537,13 @@ bool StoreForwardPlusPlusModule::handleReceivedProtobuf(const meshtastic_MeshPac
scratch_object.rx_time = t->encapsulated_rxtime;
addToChain(scratch_object);
removeFromScratch(scratch_object.message_hash, scratch_object.message_hash_len);
maybeMoveFromCanonScratch(scratch_object.root_hash, scratch_object.root_hash_len);
// short circuit and return
// falls through to a request for the message
return true;
} else {
// TODO: start with earliest scratch message, and calculate a speculative chain up to the announced
// commit hash if we find a match, add all the messages to the chain
// TODO move into a function
LOG_INFO("StoreForwardpp Earliest scratch message commit hash does not match announced commit hash, "
"speculating chain");
if (speculateScratchChain(t->commit_hash.bytes, t->commit_hash.size, scratch_object.root_hash,
@@ -499,11 +564,23 @@ bool StoreForwardPlusPlusModule::handleReceivedProtobuf(const meshtastic_MeshPac
t->commit_hash.size);
} while (memcmp(chain_end.commit_hash, t->commit_hash.bytes, t->commit_hash.size) != 0);
LOG_INFO("StoreForwardpp added %d links from scratch", count);
maybeMoveFromCanonScratch(scratch_object.root_hash, scratch_object.root_hash_len);
// We have an object from Scratch that we know is a commit, but we can't put it on the chain yet
} else {
LOG_INFO("StoreForwardpp Could not speculate chain to announced commit hash");
LOG_INFO("StoreForwardpp Could not speculate chain to announced commit hash. Moving from "
"local_messages to canon_scratch");
scratch_object.rx_time = t->encapsulated_rxtime;
scratch_object.counter = t->chain_count;
scratch_object.commit_hash_len = t->commit_hash.size;
memcpy(scratch_object.commit_hash, t->commit_hash.bytes, t->commit_hash.size);
addToCanonScratch(scratch_object);
removeFromScratch(scratch_object.message_hash, scratch_object.message_hash_len);
}
}
} else {
LOG_DEBUG("StoreForwardpp Not in scratch");
}
if (airTime->isTxAllowedChannelUtil(true)) {
requestNextMessage(t->root_hash.bytes, t->root_hash.size, chain_end.commit_hash, SFPP_HASH_SIZE);
@@ -537,148 +614,161 @@ bool StoreForwardPlusPlusModule::handleReceivedProtobuf(const meshtastic_MeshPac
// if root and chain hashes are the same, grab the first message on the chain
// if different, get the message directly after.
} else if (t->sfpp_message_type == meshtastic_StoreForwardPlusPlus_SFPP_message_type_LINK_PROVIDE ||
t->sfpp_message_type == meshtastic_StoreForwardPlusPlus_SFPP_message_type_LINK_PROVIDE_FIRSTHALF ||
t->sfpp_message_type == meshtastic_StoreForwardPlusPlus_SFPP_message_type_LINK_PROVIDE_SECONDHALF) {
} else if (t->sfpp_message_type == meshtastic_StoreForwardPlusPlus_SFPP_message_type_LINK_PROVIDE) {
LOG_DEBUG("StoreForwardpp Link Provide received!");
if (t->message_hash.size >= 8 && isInDB(t->message_hash.bytes, t->message_hash.size)) {
LOG_INFO("StoreForwardpp Received link already in chain");
return true;
}
if (t->sfpp_message_type == meshtastic_StoreForwardPlusPlus_SFPP_message_type_LINK_PROVIDE) {
LOG_DEBUG("StoreForwardpp Link Provide received!");
incoming_link = ingestLinkMessage(t);
} else if (t->sfpp_message_type == meshtastic_StoreForwardPlusPlus_SFPP_message_type_LINK_PROVIDE_FIRSTHALF) {
LOG_DEBUG("StoreForwardpp Link Provide First Half received!");
split_link_in = ingestLinkMessage(t, false);
doing_split_receive = true;
split_link_in.validObject = true;
return true;
} else if (t->sfpp_message_type == meshtastic_StoreForwardPlusPlus_SFPP_message_type_LINK_PROVIDE_SECONDHALF) {
LOG_DEBUG("StoreForwardpp Link Provide Second Half received!");
if (!doing_split_receive) {
LOG_DEBUG("StoreForwardpp Received second half without first half, ignoring");
return true;
}
if (!split_link_in.validObject) {
LOG_WARN("StoreForwardpp No first half stored, cannot combine");
doing_split_receive = false;
return true;
}
link_object second_half = ingestLinkMessage(t, false);
if (split_link_in.encrypted_len + second_half.encrypted_len > 256) {
LOG_WARN("StoreForwardpp Combined link too large");
return true;
}
if (split_link_in.from == second_half.from && split_link_in.to == second_half.to &&
split_link_in.root_hash_len == second_half.root_hash_len &&
memcmp(split_link_in.root_hash, second_half.root_hash, split_link_in.root_hash_len) == 0 &&
split_link_in.message_hash_len == second_half.message_hash_len &&
memcmp(split_link_in.message_hash, second_half.message_hash, split_link_in.message_hash_len) == 0) {
incoming_link = split_link_in;
memcpy(&incoming_link.encrypted_bytes[split_link_in.encrypted_len], second_half.encrypted_bytes,
second_half.encrypted_len);
incoming_link.encrypted_len = split_link_in.encrypted_len + second_half.encrypted_len;
// append the encrypted bytes
// clear first half
split_link_in = link_object();
split_link_in.validObject = false;
doing_split_receive = false;
// do the recalcualte step we skipped
if (!recalculateHash(incoming_link, t->root_hash.bytes, t->root_hash.size, t->commit_hash.bytes,
t->commit_hash.size)) {
LOG_WARN("StoreForwardpp Recalculated hash does not match");
// Is this already in our canon DB? If so, log and drop
if (t->commit_hash.size >= 8 && t->message_hash.size >= 8 && isInDB(t->message_hash.bytes, t->message_hash.size)) {
LOG_INFO("StoreForwardpp Received link already in chain1");
return true;
}
} else {
LOG_WARN("StoreForwardpp No first half stored, cannot combine");
incoming_link = ingestLinkMessage(t, false);
} else if (t->sfpp_message_type == meshtastic_StoreForwardPlusPlus_SFPP_message_type_LINK_PROVIDE_FIRSTHALF) {
LOG_DEBUG("StoreForwardpp Link Provide First Half received!");
split_link_in = ingestLinkMessage(t, false);
doing_split_receive = true;
split_link_in.validObject = true;
return true;
}
}
if (incoming_link.validObject) {
if (incoming_link.root_hash_len == 0) {
LOG_WARN("StoreForwardpp Hash bytes not found for incoming link");
return true;
}
if (!incoming_link.validObject) {
LOG_WARN("StoreForwardpp commit byte mismatch");
return true;
}
if (isCommitInDB(incoming_link.commit_hash, incoming_link.commit_hash_len) ||
isInDB(incoming_link.message_hash, incoming_link.message_hash_len)) {
LOG_INFO("StoreForwardpp Received link already in chain");
// TODO: respond with next link?
return true;
}
if (portduino_config.sfpp_stratum0) {
// calculate the commit_hash
addToChain(incoming_link);
if (!pendingRun) {
setIntervalFromNow(30 * 1000); // run again in 30 seconds to announce the new tip of chain
pendingRun = true;
} else if (t->sfpp_message_type == meshtastic_StoreForwardPlusPlus_SFPP_message_type_LINK_PROVIDE_SECONDHALF) {
LOG_DEBUG("StoreForwardpp Link Provide Second Half received!");
if (!doing_split_receive) {
LOG_DEBUG("StoreForwardpp Received second half without first half, ignoring");
return true;
}
// timebox to no more than an hour old
if (incoming_link.rx_time > getValidTime(RTCQuality::RTCQualityNTP, true) - rebroadcastTimeout) {
// if this packet is new to us, we rebroadcast it
rebroadcastLinkObject(incoming_link);
if (!split_link_in.validObject) {
LOG_WARN("StoreForwardpp No first half stored, cannot combine");
doing_split_receive = false;
return true;
}
link_object second_half = ingestLinkMessage(t, false);
if (split_link_in.encrypted_len + second_half.encrypted_len > 256) {
LOG_WARN("StoreForwardpp Combined link too large");
return true;
}
if (split_link_in.from == second_half.from && split_link_in.to == second_half.to &&
split_link_in.root_hash_len == second_half.root_hash_len &&
memcmp(split_link_in.root_hash, second_half.root_hash, split_link_in.root_hash_len) == 0 &&
split_link_in.message_hash_len == second_half.message_hash_len &&
memcmp(split_link_in.message_hash, second_half.message_hash, split_link_in.message_hash_len) == 0) {
incoming_link = split_link_in;
memcpy(&incoming_link.encrypted_bytes[split_link_in.encrypted_len], second_half.encrypted_bytes,
second_half.encrypted_len);
incoming_link.encrypted_len = split_link_in.encrypted_len + second_half.encrypted_len;
// append the encrypted bytes
// clear first half
split_link_in = link_object();
split_link_in.validObject = false;
doing_split_receive = false;
// do the recalcualte step we skipped
/*if (!recalculateHash(incoming_link, t->root_hash.bytes, t->root_hash.size, t->commit_hash.bytes,
t->commit_hash.size)) {
LOG_WARN("StoreForwardpp Recalculated hash does not match");
return true;
}*/
} else {
LOG_DEBUG("StoreForwardpp Got previously unseen text, but not rebroadcasting because rxtime was %u",
incoming_link.rx_time);
LOG_WARN("StoreForwardpp No first half stored, cannot combine");
return true;
}
}
if (recalculateHash(incoming_link, t->root_hash.bytes, t->root_hash.size, t->commit_hash.bytes, t->commit_hash.size)) {
if (incoming_link.root_hash_len == 0) {
LOG_WARN("StoreForwardpp Hash bytes not found for incoming link");
return true;
}
} else {
if (incoming_link.commit_hash_len == SFPP_HASH_SIZE) {
addToChain(incoming_link);
if (isInScratch(incoming_link.message_hash, incoming_link.message_hash_len)) {
link_object scratch_object = getFromScratch(incoming_link.message_hash, incoming_link.message_hash_len);
if (scratch_object.payload != "") {
updatePayload(incoming_link.message_hash, incoming_link.message_hash_len, scratch_object.payload);
}
removeFromScratch(incoming_link.message_hash, incoming_link.message_hash_len);
} else {
// if this packet is new to us, we rebroadcast it, but only up to an hour old
if (incoming_link.rx_time > getValidTime(RTCQuality::RTCQualityNTP, true) - rebroadcastTimeout) {
rebroadcastLinkObject(incoming_link);
} else {
LOG_DEBUG("StoreForwardpp Got previously unseen text, but not rebroadcasting because rxtime was %u",
incoming_link.rx_time);
}
if (isCommitInDB(incoming_link.commit_hash, incoming_link.commit_hash_len) ||
isInDB(incoming_link.message_hash, incoming_link.message_hash_len)) {
LOG_INFO("StoreForwardpp Received link already in chain");
if (t->commit_hash.size == 0) {
link_object link_to_announce =
getLinkFromMessageHash(incoming_link.message_hash, incoming_link.message_hash_len);
canonAnnounce(link_to_announce);
}
if (chain_end.rx_time != 0) {
int64_t links_behind = 0;
if (t->chain_count != 0 && t->chain_count > chain_end.counter) {
links_behind = t->chain_count - chain_end.counter;
return true;
}
if (portduino_config.sfpp_stratum0) {
LOG_DEBUG("StoreForwardpp observed link that is link ahead of us: %ld", links_behind);
if (links_behind > portduino_config.sfpp_backlog_limit) {
LOG_INFO("StoreForwardpp Chain behind limit, dumping DB");
clearChain(t->root_hash.bytes, t->root_hash.size);
return true;
// calculate the commit_hash
addToChain(incoming_link);
if (!pendingRun) {
setIntervalFromNow(10 * 1000); // run again in 30 seconds to announce the new tip of chain
pendingRun = true;
}
// timebox to no more than an hour old
if (incoming_link.rx_time > getValidTime(RTCQuality::RTCQualityNTP, true) - rebroadcastTimeout) {
// if this packet is new to us, we rebroadcast it
rebroadcastLinkObject(incoming_link);
} else {
LOG_DEBUG("StoreForwardpp Got previously unseen text, but not rebroadcasting because rxtime was %u",
incoming_link.rx_time);
}
} else {
if (incoming_link.commit_hash_len == SFPP_HASH_SIZE) {
addToChain(incoming_link);
if (isInScratch(incoming_link.message_hash, incoming_link.message_hash_len)) {
link_object scratch_object = getFromScratch(incoming_link.message_hash, incoming_link.message_hash_len);
if (scratch_object.payload != "") {
updatePayload(incoming_link.message_hash, incoming_link.message_hash_len, scratch_object.payload);
}
removeFromScratch(incoming_link.message_hash, incoming_link.message_hash_len);
} else {
// if this packet is new to us, we rebroadcast it, but only up to an hour old
if (incoming_link.rx_time > getValidTime(RTCQuality::RTCQualityNTP, true) - rebroadcastTimeout) {
rebroadcastLinkObject(incoming_link);
} else {
LOG_DEBUG("StoreForwardpp Got previously unseen text, but not rebroadcasting because rxtime was %u",
incoming_link.rx_time);
}
}
maybeMoveFromCanonScratch(incoming_link.root_hash, incoming_link.root_hash_len);
if (chain_end.rx_time != 0) {
int64_t links_behind = 0;
if (t->chain_count != 0 && t->chain_count > chain_end.counter) {
links_behind = t->chain_count - chain_end.counter;
if (links_behind > 1)
LOG_DEBUG("StoreForwardpp observed link that is links ahead of us: %ld", links_behind);
if (links_behind > portduino_config.sfpp_backlog_limit) {
LOG_INFO("StoreForwardpp Chain behind limit, dumping DB");
clearChain(t->root_hash.bytes, t->root_hash.size);
clearCanonScratch(t->root_hash.bytes, t->root_hash.size,
t->chain_count - portduino_config.sfpp_backlog_limit);
return true;
}
}
}
requestNextMessage(incoming_link.root_hash, incoming_link.root_hash_len, incoming_link.commit_hash,
incoming_link.commit_hash_len);
} else {
if (!isInScratch(incoming_link.message_hash, incoming_link.message_hash_len) &&
!isInDB(incoming_link.message_hash, incoming_link.message_hash_len) &&
!isInCanonScratch(incoming_link.message_hash, incoming_link.message_hash_len)) {
addToScratch(incoming_link);
LOG_INFO("StoreForwardpp added incoming non-canon message to scratch");
if (incoming_link.rx_time > getValidTime(RTCQuality::RTCQualityNTP, true) - rebroadcastTimeout) {
rebroadcastLinkObject(incoming_link);
} else {
LOG_DEBUG("StoreForwardpp Got previously unseen text, but not rebroadcasting because rxtime was &u",
incoming_link.rx_time);
}
}
}
requestNextMessage(incoming_link.root_hash, incoming_link.root_hash_len, incoming_link.commit_hash,
incoming_link.commit_hash_len);
} else {
if (!isInScratch(incoming_link.message_hash, incoming_link.message_hash_len) &&
!isInDB(incoming_link.message_hash, incoming_link.message_hash_len)) {
addToScratch(incoming_link);
LOG_INFO("StoreForwardpp added incoming non-canon message to scratch");
if (incoming_link.rx_time > getValidTime(RTCQuality::RTCQualityNTP, true) - rebroadcastTimeout) {
rebroadcastLinkObject(incoming_link);
} else {
LOG_DEBUG("StoreForwardpp Got previously unseen text, but not rebroadcasting because rxtime was &u",
incoming_link.rx_time);
}
}
}
} else {
// We've received a link provide, and it doesn't fit. But it may be legit. Add it to canon_scratch
addToCanonScratch(incoming_link);
}
}
@@ -878,7 +968,6 @@ void StoreForwardPlusPlusModule::broadcastLink(uint8_t *_commit_hash, size_t _co
broadcastLink(lo, false);
}
// TODO if stratum0, send the chain count
void StoreForwardPlusPlusModule::broadcastLink(link_object &lo, bool full_commit_hash, bool is_split_second_half)
{
meshtastic_StoreForwardPlusPlus storeforward = meshtastic_StoreForwardPlusPlus_init_zero;
@@ -984,6 +1073,47 @@ StoreForwardPlusPlusModule::link_object StoreForwardPlusPlusModule::getLink(uint
return lo;
}
StoreForwardPlusPlusModule::link_object StoreForwardPlusPlusModule::getLinkFromMessageHash(uint8_t *_message_hash,
size_t _message_hash_len)
{
link_object lo;
sqlite3_bind_int(getLinkFromMessageHashStmt, 1, _message_hash_len);
sqlite3_bind_blob(getLinkFromMessageHashStmt, 2, _message_hash, _message_hash_len, NULL);
int res = sqlite3_step(getLinkFromMessageHashStmt);
lo.to = sqlite3_column_int(getLinkFromMessageHashStmt, 0);
lo.from = sqlite3_column_int(getLinkFromMessageHashStmt, 1);
lo.id = sqlite3_column_int(getLinkFromMessageHashStmt, 2);
uint8_t *_payload = (uint8_t *)sqlite3_column_blob(getLinkFromMessageHashStmt, 3);
lo.encrypted_len = sqlite3_column_bytes(getLinkFromMessageHashStmt, 3);
memcpy(lo.encrypted_bytes, _payload, lo.encrypted_len);
uint8_t *link_message_hash = (uint8_t *)sqlite3_column_blob(getLinkFromMessageHashStmt, 4);
lo.message_hash_len = SFPP_HASH_SIZE;
memcpy(lo.message_hash, link_message_hash, lo.message_hash_len);
lo.rx_time = sqlite3_column_int(getLinkFromMessageHashStmt, 5);
uint8_t *_tmp_commit_hash = (uint8_t *)sqlite3_column_blob(getLinkFromMessageHashStmt, 6);
lo.commit_hash_len = SFPP_HASH_SIZE;
memcpy(lo.commit_hash, _tmp_commit_hash, lo.commit_hash_len);
uint8_t *_root_hash = (uint8_t *)sqlite3_column_blob(getLinkFromMessageHashStmt, 7);
lo.root_hash_len = SFPP_HASH_SIZE;
memcpy(lo.root_hash, _root_hash, lo.root_hash_len);
lo.counter = sqlite3_column_int(getLinkFromMessageHashStmt, 8);
lo.payload = std::string((char *)sqlite3_column_text(getLinkFromMessageHashStmt, 9));
lo.channel_hash = getChannelHashFromRoot(lo.root_hash, lo.root_hash_len);
sqlite3_reset(getLinkFromMessageHashStmt);
return lo;
}
bool StoreForwardPlusPlusModule::sendFromScratch(uint8_t *root_hash)
{
link_object lo = getNextScratchObject(root_hash);
@@ -1025,6 +1155,7 @@ bool StoreForwardPlusPlusModule::addToChain(link_object &lo)
if (lo.counter == 0) {
lo.counter = chain_end.counter + 1;
}
LOG_DEBUG("Adding link %u to Canon Chain", lo.counter);
// push a message into the local chain DB
// destination
@@ -1053,6 +1184,7 @@ bool StoreForwardPlusPlusModule::addToChain(link_object &lo)
}
sqlite3_reset(chain_insert_stmt);
setChainCount(lo.root_hash, SFPP_HASH_SIZE, lo.counter);
removeFromCanonScratch(lo.message_hash, lo.message_hash_len);
return true;
}
@@ -1085,27 +1217,38 @@ bool StoreForwardPlusPlusModule::addToScratch(link_object &lo)
return true;
}
void StoreForwardPlusPlusModule::canonAnnounce(link_object &lo, uint8_t *_message_hash, uint8_t *_commit_hash,
uint8_t *_root_hash, uint32_t _rx_time)
void StoreForwardPlusPlusModule::canonAnnounce(link_object &lo)
{
meshtastic_StoreForwardPlusPlus storeforward = meshtastic_StoreForwardPlusPlus_init_zero;
storeforward.sfpp_message_type = meshtastic_StoreForwardPlusPlus_SFPP_message_type_CANON_ANNOUNCE;
// set root hash
// set message hash
if (lo.message_hash_len < 8) {
LOG_WARN("Attempt canonAnnounce without message hash");
return;
}
storeforward.message_hash.size = 8;
memcpy(storeforward.message_hash.bytes, _message_hash, 8);
memcpy(storeforward.message_hash.bytes, lo.message_hash, 8);
// set chain hash
if (lo.commit_hash_len < 8) {
LOG_WARN("Attempt canonAnnounce without commit hash");
return;
}
storeforward.commit_hash.size = 8;
memcpy(storeforward.commit_hash.bytes, _commit_hash, 8);
memcpy(storeforward.commit_hash.bytes, lo.commit_hash, 8);
// set root hash
// needs to be the full hash to bootstrap
if (lo.root_hash_len < SFPP_HASH_SIZE) {
LOG_WARN("Attempt canonAnnounce without root hash");
return;
}
storeforward.root_hash.size = SFPP_HASH_SIZE;
memcpy(storeforward.root_hash.bytes, _root_hash, SFPP_HASH_SIZE);
memcpy(storeforward.root_hash.bytes, lo.root_hash, SFPP_HASH_SIZE);
storeforward.encapsulated_rxtime = _rx_time;
storeforward.encapsulated_rxtime = lo.rx_time;
storeforward.chain_count = lo.counter;
// storeforward.
meshtastic_MeshPacket *p = allocDataProtobuf(storeforward);
@@ -1258,7 +1401,7 @@ void StoreForwardPlusPlusModule::updatePayload(uint8_t *message_hash_bytes, size
auto res = sqlite3_step(updatePayloadStmt);
if (res != SQLITE_OK) {
const char *_error_mesg = sqlite3_errmsg(ppDb);
LOG_WARN("StoreForwardpp step error %u, %s", res, _error_mesg);
LOG_WARN("StoreForwardpp updatePayloadStmt step error %u, %s", res, _error_mesg);
}
sqlite3_reset(updatePayloadStmt);
}
@@ -1272,7 +1415,7 @@ StoreForwardPlusPlusModule::link_object StoreForwardPlusPlusModule::getFromScrat
auto res = sqlite3_step(fromScratchByHashStmt);
if (res != SQLITE_ROW && res != SQLITE_OK) {
const char *_error_mesg = sqlite3_errmsg(ppDb);
LOG_WARN("StoreForwardpp step error %u, %s", res, _error_mesg);
LOG_WARN("StoreForwardpp fromScratchByHashStmt step error %u, %s", res, _error_mesg);
}
lo.to = sqlite3_column_int(fromScratchByHashStmt, 0);
lo.from = sqlite3_column_int(fromScratchByHashStmt, 1);
@@ -1281,15 +1424,18 @@ StoreForwardPlusPlusModule::link_object StoreForwardPlusPlusModule::getFromScrat
uint8_t *encrypted_bytes = (uint8_t *)sqlite3_column_blob(fromScratchByHashStmt, 3);
lo.encrypted_len = sqlite3_column_bytes(fromScratchByHashStmt, 3);
memcpy(lo.encrypted_bytes, encrypted_bytes, lo.encrypted_len);
uint8_t *message_hash = (uint8_t *)sqlite3_column_blob(fromScratchByHashStmt, 4);
lo.message_hash_len = sqlite3_column_bytes(fromScratchByHashStmt, 4);
memcpy(lo.message_hash, message_hash, SFPP_HASH_SIZE);
lo.rx_time = sqlite3_column_int(fromScratchByHashStmt, 5);
uint8_t *root_hash = (uint8_t *)sqlite3_column_blob(fromScratchByHashStmt, 6);
lo.root_hash_len = SFPP_HASH_SIZE;
memcpy(lo.root_hash, root_hash, SFPP_HASH_SIZE);
lo.payload =
std::string((char *)sqlite3_column_text(fromScratchByHashStmt, 7), sqlite3_column_bytes(fromScratchByHashStmt, 7));
lo.message_hash_len = hash_len;
memcpy(lo.message_hash, message_hash_bytes, hash_len);
sqlite3_reset(fromScratchByHashStmt);
return lo;
}
@@ -1324,7 +1470,6 @@ StoreForwardPlusPlusModule::ingestTextPacket(const meshtastic_MeshPacket &mp, co
StoreForwardPlusPlusModule::link_object StoreForwardPlusPlusModule::ingestLinkMessage(meshtastic_StoreForwardPlusPlus *t,
bool recalc)
{
// TODO: If not stratum0, injest the chain count
link_object lo;
lo.to = t->encapsulated_to;
@@ -1334,7 +1479,7 @@ StoreForwardPlusPlusModule::link_object StoreForwardPlusPlusModule::ingestLinkMe
lo.from = t->encapsulated_from;
lo.id = t->encapsulated_id;
lo.rx_time = t->encapsulated_rxtime;
lo.counter = t->chain_count;
lo.counter = t->chain_count; // Should this be skipped if stratum0?
// What if we don't have this root hash? Should drop this packet before this point.
lo.channel_hash = getChannelHashFromRoot(t->root_hash.bytes, t->root_hash.size);
@@ -1384,6 +1529,7 @@ bool StoreForwardPlusPlusModule::checkCommitHash(StoreForwardPlusPlusModule::lin
size_t hash_len)
{
SHA256 commit_hash;
uint8_t tmp_commit_hash[SFPP_HASH_SIZE];
link_object chain_end = getLinkFromCount(0, lo.root_hash, lo.root_hash_len);
@@ -1402,10 +1548,11 @@ bool StoreForwardPlusPlusModule::checkCommitHash(StoreForwardPlusPlusModule::lin
}
commit_hash.update(lo.message_hash, SFPP_HASH_SIZE);
commit_hash.finalize(lo.commit_hash, SFPP_HASH_SIZE);
lo.commit_hash_len = SFPP_HASH_SIZE;
commit_hash.finalize(tmp_commit_hash, SFPP_HASH_SIZE);
if (hash_len == 0 || memcmp(commit_hash_bytes, lo.commit_hash, hash_len) == 0) {
lo.commit_hash_len = SFPP_HASH_SIZE;
memcpy(lo.commit_hash, tmp_commit_hash, SFPP_HASH_SIZE);
return true;
}
return false;
@@ -1490,7 +1637,8 @@ StoreForwardPlusPlusModule::link_object StoreForwardPlusPlusModule::getLinkFromC
void StoreForwardPlusPlusModule::pruneScratchQueue()
{
sqlite3_bind_int(pruneScratchQueueStmt, 1, time(nullptr) - 60 * 60 * 24);
// Very little utility in holding on to very old scratch messages
sqlite3_bind_int(pruneScratchQueueStmt, 1, time(nullptr) - 60 * 60 * 6);
int res = sqlite3_step(pruneScratchQueueStmt);
if (res != SQLITE_OK && res != SQLITE_DONE) {
LOG_ERROR("StoreForwardpp Prune Scratch sqlite error %u, %s", res, sqlite3_errmsg(ppDb));
@@ -1555,7 +1703,6 @@ bool StoreForwardPlusPlusModule::recalculateHash(StoreForwardPlusPlusModule::lin
printBytes("StoreForwardpp commit hash matches: 0x", _commit_hash_bytes, _commit_hash_len);
} else {
LOG_WARN("StoreForwardpp commit hash does not match, rejecting link.");
lo.commit_hash_len = 0;
lo.validObject = false;
return false;
}
@@ -1625,4 +1772,174 @@ void StoreForwardPlusPlusModule::updatePeers(const meshtastic_MeshPacket &mp,
// if not new, run the calculations and update the numbers
}
void StoreForwardPlusPlusModule::maybeMoveFromCanonScratch(uint8_t *root_hash, size_t root_hash_len)
{
// get the earliest chain count. See if it's the next one. Try to commit
// if not, speculate and try to commit
link_object lo = getfromCanonScratch(root_hash, root_hash_len);
if (!lo.validObject)
return;
link_object chain_end = getLinkFromCount(0, root_hash, root_hash_len);
if (!chain_end.validObject)
return;
if (lo.counter == chain_end.counter + 1 &&
recalculateHash(lo, root_hash, root_hash_len, lo.commit_hash, lo.commit_hash_len)) {
addToChain(lo);
maybeMoveFromCanonScratch(root_hash, root_hash_len); // recursion!
return;
}
// TODO check hash lengths here
LOG_INFO("speculating chain, attempting to commit from canon scratch");
if (speculateScratchChain(lo.commit_hash, lo.commit_hash_len, root_hash, lo.commit_hash)) {
int count = 0;
do {
count++;
link_object next_scratch_object = getNextScratchObject(root_hash);
if (!next_scratch_object.validObject) {
LOG_ERROR("StoreForwardpp Speculation commit possibly failed, no next scratch object");
break;
}
addToChain(next_scratch_object);
removeFromScratch(next_scratch_object.message_hash, next_scratch_object.message_hash_len);
chain_end = getLinkFromCount(0, root_hash, root_hash_len);
} while (memcmp(chain_end.commit_hash, lo.commit_hash, lo.commit_hash_len) != 0);
LOG_INFO("StoreForwardpp added %d links from scratch", count);
maybeMoveFromCanonScratch(root_hash, root_hash_len); // recursion!
// We have an object from Scratch that we know is a commit, but we can't put it on the chain yet
}
}
void StoreForwardPlusPlusModule::addToCanonScratch(link_object &lo)
{
// recalculate full message hash?
LOG_INFO("addToCanonScratch");
logLinkObject(lo);
// push a message into the local chain DB
// destination
sqlite3_bind_int(canon_scratch_insert_stmt, 1, lo.to);
// sender
sqlite3_bind_int(canon_scratch_insert_stmt, 2, lo.from);
// packet_id
sqlite3_bind_int(canon_scratch_insert_stmt, 3, lo.id);
// root_hash
sqlite3_bind_blob(canon_scratch_insert_stmt, 4, lo.root_hash, lo.root_hash_len, NULL);
// encrypted_bytes
sqlite3_bind_blob(canon_scratch_insert_stmt, 5, lo.encrypted_bytes, lo.encrypted_len, NULL);
// message_hash
sqlite3_bind_blob(canon_scratch_insert_stmt, 6, lo.message_hash, lo.message_hash_len, NULL);
// rx_time
sqlite3_bind_int(canon_scratch_insert_stmt, 7, lo.rx_time);
// commit_hash
sqlite3_bind_blob(canon_scratch_insert_stmt, 8, lo.commit_hash, lo.commit_hash_len, NULL);
// payload
sqlite3_bind_text(canon_scratch_insert_stmt, 9, lo.payload.c_str(), lo.payload.length(), NULL);
sqlite3_bind_int(canon_scratch_insert_stmt, 10, lo.counter);
int res = sqlite3_step(canon_scratch_insert_stmt);
if (res != SQLITE_OK && res != SQLITE_DONE) {
LOG_ERROR("StoreForwardpp canon_scratch_insert_stmt Cannot step %u: %s", res, sqlite3_errmsg(ppDb));
}
sqlite3_reset(canon_scratch_insert_stmt);
}
StoreForwardPlusPlusModule::link_object StoreForwardPlusPlusModule::getfromCanonScratch(uint8_t *root_hash_bytes, size_t hash_len)
{
link_object lo;
sqlite3_bind_int(getCanonScratchStmt, 1, hash_len);
sqlite3_bind_blob(getCanonScratchStmt, 2, root_hash_bytes, hash_len, NULL);
auto res = sqlite3_step(getCanonScratchStmt);
if (res != SQLITE_ROW && res != SQLITE_OK) {
const char *_error_mesg = sqlite3_errmsg(ppDb);
LOG_ERROR("StoreForwardpp getCanonScratchStmt step error %u, %s", res, _error_mesg);
lo.validObject = false;
return lo;
}
lo.to = sqlite3_column_int(getCanonScratchStmt, 0);
lo.from = sqlite3_column_int(getCanonScratchStmt, 1);
lo.id = sqlite3_column_int(getCanonScratchStmt, 2);
uint8_t *_payload = (uint8_t *)sqlite3_column_blob(getCanonScratchStmt, 3);
lo.encrypted_len = sqlite3_column_bytes(getCanonScratchStmt, 3);
memcpy(lo.encrypted_bytes, _payload, lo.encrypted_len);
uint8_t *_message_hash = (uint8_t *)sqlite3_column_blob(getCanonScratchStmt, 4);
lo.message_hash_len = sqlite3_column_bytes(getCanonScratchStmt, 4);
memcpy(lo.message_hash, _message_hash, lo.message_hash_len);
lo.rx_time = sqlite3_column_int(getCanonScratchStmt, 5);
uint8_t *_tmp_commit_hash = (uint8_t *)sqlite3_column_blob(getCanonScratchStmt, 6);
lo.commit_hash_len = sqlite3_column_bytes(getCanonScratchStmt, 6);
memcpy(lo.commit_hash, _tmp_commit_hash, lo.commit_hash_len);
uint8_t *_root_hash = (uint8_t *)sqlite3_column_blob(getCanonScratchStmt, 7);
lo.root_hash_len = sqlite3_column_bytes(getCanonScratchStmt, 7);
memcpy(lo.root_hash, _root_hash, lo.root_hash_len);
lo.counter = sqlite3_column_int(getCanonScratchStmt, 8);
lo.payload = std::string((char *)sqlite3_column_text(getCanonScratchStmt, 9));
sqlite3_reset(getCanonScratchStmt);
return lo;
}
void StoreForwardPlusPlusModule::removeFromCanonScratch(uint8_t *message_hash_bytes, size_t message_hash_len)
{
printBytes("StoreForwardpp removing from canon scratch: ", message_hash_bytes, message_hash_len);
sqlite3_bind_int(removeCanonScratch, 1, message_hash_len);
sqlite3_bind_blob(removeCanonScratch, 2, message_hash_bytes, message_hash_len, NULL);
sqlite3_step(removeCanonScratch);
sqlite3_reset(removeCanonScratch);
}
bool StoreForwardPlusPlusModule::isInCanonScratch(uint8_t *message_hash, size_t message_hash_len)
{
if (message_hash_len < SFPP_SHORT_HASH_SIZE) {
return false;
}
sqlite3_bind_int(getCanonScratchCountStmt, 1, message_hash_len);
sqlite3_bind_blob(getCanonScratchCountStmt, 2, message_hash, message_hash_len, NULL);
sqlite3_step(getCanonScratchCountStmt);
int count = sqlite3_column_int(getCanonScratchCountStmt, 0);
sqlite3_reset(getCanonScratchCountStmt);
if (count > 0)
return true;
return false;
}
void StoreForwardPlusPlusModule::clearCanonScratch(uint8_t *root_hash, size_t root_hash_len, uint32_t new_count)
{
sqlite3_bind_int(clearCanonScratchStmt, 1, root_hash_len);
sqlite3_bind_blob(clearCanonScratchStmt, 2, root_hash, root_hash_len, NULL);
sqlite3_bind_int(clearCanonScratchStmt, 3, new_count);
int res = sqlite3_step(clearCanonScratchStmt);
if (res != SQLITE_OK && res != SQLITE_DONE) {
LOG_ERROR("StoreForwardpp Clear Canon Scratch sqlite error %u, %s", res, sqlite3_errmsg(ppDb));
}
sqlite3_reset(clearCanonScratchStmt);
}
void StoreForwardPlusPlusModule::logLinkObject(link_object &lo)
{
LOG_DEBUG("To: %u, From: %u, id: %u, rx_time: %u, counter: %u, channel_hash: 0x%02x", lo.to, lo.from, lo.id, lo.rx_time,
lo.counter, lo.channel_hash);
printBytes("encrypted_bytes: ", lo.encrypted_bytes, lo.encrypted_len);
printBytes("message_hash: ", lo.message_hash, lo.message_hash_len);
printBytes("root_hash: ", lo.root_hash, lo.root_hash_len);
printBytes("commit_hash: ", lo.commit_hash, lo.commit_hash_len);
LOG_DEBUG("payload: %s", lo.payload.c_str());
if (lo.validObject) {
LOG_DEBUG("Valid Object");
}
}
#endif // has include sqlite3

View File

@@ -112,6 +112,7 @@ class StoreForwardPlusPlusModule : public ProtobufModule<meshtastic_StoreForward
sqlite3_stmt *getNextHashStmt;
sqlite3_stmt *getChainEndStmt;
sqlite3_stmt *getLinkStmt;
sqlite3_stmt *getLinkFromMessageHashStmt;
sqlite3_stmt *getHashFromRootStmt;
sqlite3_stmt *addRootToMappingsStmt;
sqlite3_stmt *getRootFromChannelHashStmt;
@@ -124,6 +125,11 @@ class StoreForwardPlusPlusModule : public ProtobufModule<meshtastic_StoreForward
sqlite3_stmt *getPeerStmt;
sqlite3_stmt *updatePeerStmt;
sqlite3_stmt *clearChainStmt;
sqlite3_stmt *canon_scratch_insert_stmt;
sqlite3_stmt *getCanonScratchCountStmt;
sqlite3_stmt *getCanonScratchStmt;
sqlite3_stmt *removeCanonScratch;
sqlite3_stmt *clearCanonScratchStmt;
// For a given Meshtastic ChannelHash, fills the root_hash buffer with a 32-byte root hash
// returns true if the root hash was found
@@ -170,7 +176,7 @@ class StoreForwardPlusPlusModule : public ProtobufModule<meshtastic_StoreForward
bool addToScratch(link_object &);
// sends a CANON_ANNOUNCE message, specifying the given root and commit hashes
void canonAnnounce(link_object &, uint8_t *, uint8_t *, uint8_t *, uint32_t);
void canonAnnounce(link_object &);
// checks if the message hash is present in the canonical chain database
bool isInDB(uint8_t *, size_t);
@@ -205,9 +211,12 @@ class StoreForwardPlusPlusModule : public ProtobufModule<meshtastic_StoreForward
// confirms the root hash and commit hash
link_object ingestLinkMessage(meshtastic_StoreForwardPlusPlus *, bool = true);
// retrieves a link object from the canonical chain database given a message hash
// retrieves a link object from the canonical chain database given a commit hash
link_object getLink(uint8_t *, size_t);
// retrieves a link object from the canonical chain database given a message hash
link_object getLinkFromMessageHash(uint8_t *, size_t);
// puts the encrypted payload back into the queue as if it were just received
void rebroadcastLinkObject(link_object &);
@@ -238,6 +247,19 @@ class StoreForwardPlusPlusModule : public ProtobufModule<meshtastic_StoreForward
void updatePeers(const meshtastic_MeshPacket &, meshtastic_StoreForwardPlusPlus_SFPP_message_type);
void maybeMoveFromCanonScratch(uint8_t *, size_t);
void addToCanonScratch(link_object &);
link_object getfromCanonScratch(uint8_t *, size_t);
void removeFromCanonScratch(uint8_t *, size_t);
void clearCanonScratch(uint8_t *, size_t, uint32_t);
bool isInCanonScratch(uint8_t *, size_t);
void logLinkObject(link_object &);
// Track if we have a scheduled runOnce pending
// useful to not accudentally delay a scheduled runOnce
bool pendingRun = false;
@@ -253,5 +275,7 @@ class StoreForwardPlusPlusModule : public ProtobufModule<meshtastic_StoreForward
bool doing_split_receive = false;
link_object split_link_in;
bool did_announce_last = false;
};
#endif