Add peers table, check for null

This commit is contained in:
Jonathan Bennett
2026-01-01 13:47:50 -06:00
parent 1fecdc7603
commit baccd0c532
2 changed files with 109 additions and 70 deletions

View File

@@ -1,3 +1,7 @@
// ideas:
// Track the nodenums we've gotten SFPP traffic from, to build a network graph
// store nodes, packet counts, and hops away in a new peers table
// Eventual TODO: non-stratum0 nodes need to be pointed at their upstream source? Maybe
#if __has_include("sqlite3.h")
@@ -122,6 +126,29 @@ StoreForwardPlusPlusModule::StoreForwardPlusPlusModule()
LOG_ERROR("%s", err);
sqlite3_free(err);
// mappings table -- connects the root hashes to channel hashes and DM identifiers
res = sqlite3_exec(ppDb, " \
CREATE TABLE IF NOT EXISTS \
peers( \
nodenum INT NOT NULL, \
announce_count INT DEFAULT 0, \
query_count INT DEFAULT 0, \
request_count INT DEFAULT 0, \
provide_count INT DEFAULT 0, \
split_count INT DEFAULT 0, \
total_count INT DEFAULT 0, \
average_hops INT DEFAULT 0, \
PRIMARY KEY (nodenum) \
);",
NULL, NULL, &err);
if (res != SQLITE_OK) {
LOG_ERROR("Failed to create table: %s", sqlite3_errmsg(ppDb));
}
if (err != nullptr)
LOG_ERROR("%s", err);
sqlite3_free(err);
// store schema version somewhere
// prepared statements *should* make this faster.
@@ -186,6 +213,10 @@ StoreForwardPlusPlusModule::StoreForwardPlusPlusModule()
"substr(root_hash,1,?)=? ORDER BY rowid ASC LIMIT 1);",
-1, &trimOldestLinkStmt, NULL);
sqlite3_prepare_v2(ppDb,
"INSERT INTO peers (nodenum, total_count, average_hops) VALUES(?, ?, ?) ON CONFLICT(nodenum) DO NOTHING;",
-1, &maybeAddPeerStmt, NULL);
encryptedOk = true;
this->setInterval(portduino_config.sfpp_announce_interval * 60 * 1000);
@@ -293,6 +324,11 @@ ProcessMessage StoreForwardPlusPlusModule::handleReceived(const meshtastic_MeshP
return ProcessMessage::CONTINUE; // Let others look at this message also if they want
}
if (router == nullptr || router->p_encrypted == nullptr) {
LOG_WARN("StoreForward++ cannot process message, due to null pointer");
return ProcessMessage::CONTINUE;
}
// will eventually host DMs and other undecodable messages
if (mp.which_payload_variant != meshtastic_MeshPacket_decoded_tag) {
return ProcessMessage::CONTINUE; // Let others look at this message also if they want
@@ -349,6 +385,8 @@ bool StoreForwardPlusPlusModule::handleReceivedProtobuf(const meshtastic_MeshPac
link_object incoming_link;
incoming_link.validObject = false;
maybeAddPeer(mp);
if (t->sfpp_message_type == meshtastic_StoreForwardPlusPlus_SFPP_message_type_CANON_ANNOUNCE) {
// TODO: Regardless of where we are in the chain, if we have a newer message, send it back.
@@ -484,42 +522,12 @@ bool StoreForwardPlusPlusModule::handleReceivedProtobuf(const meshtastic_MeshPac
split_link_in.validObject = false;
doing_split_receive = false;
// do the recalcualte step we skipped
// TODO put this in a function
SHA256 message_hash;
message_hash.reset();
message_hash.update(incoming_link.encrypted_bytes, incoming_link.encrypted_len);
message_hash.update(&incoming_link.to, sizeof(incoming_link.to));
message_hash.update(&incoming_link.from, sizeof(incoming_link.from));
message_hash.update(&incoming_link.id, sizeof(incoming_link.id));
message_hash.finalize(incoming_link.message_hash, SFPP_HASH_SIZE);
incoming_link.message_hash_len = SFPP_HASH_SIZE;
// look up full root hash and copy over the partial if it matches
if (lookUpFullRootHash(t->root_hash.bytes, t->root_hash.size, incoming_link.root_hash)) {
printBytes("Found full root hash: 0x", incoming_link.root_hash, SFPP_HASH_SIZE);
incoming_link.root_hash_len = SFPP_HASH_SIZE;
} else {
LOG_WARN("root hash does not match %d bytes", t->root_hash.size);
incoming_link.root_hash_len = 0;
incoming_link.validObject = false;
if (!recalculateHash(incoming_link, t->root_hash.bytes, t->root_hash.size, t->commit_hash.bytes,
t->commit_hash.size)) {
LOG_WARN("Recalculated hash does not match");
return true;
}
if (t->commit_hash.size == SFPP_HASH_SIZE && getChainCount(t->root_hash.bytes, t->root_hash.size) == 0 &&
portduino_config.sfpp_initial_sync != 0 && !portduino_config.sfpp_stratum0) {
incoming_link.commit_hash_len = SFPP_HASH_SIZE;
memcpy(incoming_link.commit_hash, t->commit_hash.bytes, SFPP_HASH_SIZE);
} else if (t->commit_hash.size > 0) {
// calculate the full commit hash and replace the partial if it matches
if (checkCommitHash(incoming_link, t->commit_hash.bytes, t->commit_hash.size)) {
printBytes("commit hash matches: 0x", t->commit_hash.bytes, t->commit_hash.size);
} else {
LOG_WARN("commit hash does not match, rejecting link.");
incoming_link.commit_hash_len = 0;
incoming_link.validObject = false;
}
}
} else {
LOG_WARN("No first half stored, cannot combine");
return true;
@@ -537,12 +545,13 @@ bool StoreForwardPlusPlusModule::handleReceivedProtobuf(const meshtastic_MeshPac
return true;
}
if (isInDB(incoming_link.message_hash, incoming_link.message_hash_len)) {
LOG_INFO("Received link already in chain");
// TODO: respond with next link?
return true;
}
if (portduino_config.sfpp_stratum0) {
if (isInDB(incoming_link.message_hash, incoming_link.message_hash_len)) {
LOG_INFO("Received link already in chain");
// TODO: respond with next link?
return true;
}
// calculate the commit_hash
addToChain(incoming_link);
@@ -1153,45 +1162,14 @@ StoreForwardPlusPlusModule::link_object StoreForwardPlusPlusModule::ingestLinkMe
// What if we don't have this root hash? Should drop this packet before this point.
lo.channel_hash = getChannelHashFromRoot(t->root_hash.bytes, t->root_hash.size);
SHA256 message_hash;
memcpy(lo.encrypted_bytes, t->message.bytes, t->message.size);
lo.encrypted_len = t->message.size;
if (recalc) {
message_hash.reset();
message_hash.update(lo.encrypted_bytes, lo.encrypted_len);
message_hash.update(&lo.to, sizeof(lo.to));
message_hash.update(&lo.from, sizeof(lo.from));
message_hash.update(&lo.id, sizeof(lo.id));
message_hash.finalize(lo.message_hash, SFPP_HASH_SIZE);
lo.message_hash_len = SFPP_HASH_SIZE;
// look up full root hash and copy over the partial if it matches
if (lookUpFullRootHash(t->root_hash.bytes, t->root_hash.size, lo.root_hash)) {
printBytes("Found full root hash: 0x", lo.root_hash, SFPP_HASH_SIZE);
lo.root_hash_len = SFPP_HASH_SIZE;
} else {
LOG_WARN("root hash does not match %d bytes", t->root_hash.size);
lo.root_hash_len = 0;
if (!recalculateHash(lo, t->root_hash.bytes, t->root_hash.size, t->commit_hash.bytes, t->commit_hash.size)) {
lo.validObject = false;
return lo;
}
if (t->commit_hash.size == SFPP_HASH_SIZE && getChainCount(t->root_hash.bytes, t->root_hash.size) == 0 &&
portduino_config.sfpp_initial_sync != 0 && !portduino_config.sfpp_stratum0) {
lo.commit_hash_len = SFPP_HASH_SIZE;
memcpy(lo.commit_hash, t->commit_hash.bytes, SFPP_HASH_SIZE);
} else if (t->commit_hash.size > 0) {
// calculate the full commit hash and replace the partial if it matches
if (checkCommitHash(lo, t->commit_hash.bytes, t->commit_hash.size)) {
printBytes("commit hash matches: 0x", t->commit_hash.bytes, t->commit_hash.size);
} else {
LOG_WARN("commit hash does not match, rejecting link.");
lo.commit_hash_len = 0;
lo.validObject = false;
}
}
} else {
memcpy(lo.message_hash, t->message_hash.bytes, t->message_hash.size);
lo.message_hash_len = t->message_hash.size;
@@ -1354,4 +1332,58 @@ void StoreForwardPlusPlusModule::trimOldestLink(uint8_t *root_hash, size_t root_
sqlite3_reset(trimOldestLinkStmt);
}
bool StoreForwardPlusPlusModule::recalculateHash(StoreForwardPlusPlusModule::link_object &lo, uint8_t *_root_hash_bytes,
size_t _root_hash_len, uint8_t *_commit_hash_bytes, size_t _commit_hash_len)
{
SHA256 message_hash;
message_hash.reset();
message_hash.update(lo.encrypted_bytes, lo.encrypted_len);
message_hash.update(&lo.to, sizeof(lo.to));
message_hash.update(&lo.from, sizeof(lo.from));
message_hash.update(&lo.id, sizeof(lo.id));
message_hash.finalize(lo.message_hash, SFPP_HASH_SIZE);
lo.message_hash_len = SFPP_HASH_SIZE;
// look up full root hash and copy over the partial if it matches
if (lookUpFullRootHash(_root_hash_bytes, _root_hash_len, lo.root_hash)) {
printBytes("Found full root hash: 0x", lo.root_hash, SFPP_HASH_SIZE);
lo.root_hash_len = SFPP_HASH_SIZE;
} else {
LOG_WARN("root hash does not match %d bytes", _root_hash_len);
lo.root_hash_len = 0;
lo.validObject = false;
return false;
}
if (_commit_hash_len == SFPP_HASH_SIZE && getChainCount(_root_hash_bytes, _root_hash_len) == 0 &&
portduino_config.sfpp_initial_sync != 0 && !portduino_config.sfpp_stratum0) {
lo.commit_hash_len = SFPP_HASH_SIZE;
memcpy(lo.commit_hash, _commit_hash_bytes, SFPP_HASH_SIZE);
} else if (_commit_hash_len > 0) {
// calculate the full commit hash and replace the partial if it matches
if (checkCommitHash(lo, _commit_hash_bytes, _commit_hash_len)) {
printBytes("commit hash matches: 0x", _commit_hash_bytes, _commit_hash_len);
} else {
LOG_WARN("commit hash does not match, rejecting link.");
lo.commit_hash_len = 0;
lo.validObject = false;
return false;
}
}
return true;
}
void StoreForwardPlusPlusModule::maybeAddPeer(const meshtastic_MeshPacket &mp)
{
sqlite3_bind_int(maybeAddPeerStmt, 1, mp.from);
sqlite3_bind_int(maybeAddPeerStmt, 2, 1);
sqlite3_bind_int(maybeAddPeerStmt, 3, mp.hop_start - mp.hop_limit);
int res = sqlite3_step(maybeAddPeerStmt);
if (res != SQLITE_OK && res != SQLITE_DONE) {
LOG_ERROR("Maybe Add Peer sqlite error %u, %s", res, sqlite3_errmsg(ppDb));
}
sqlite3_reset(maybeAddPeerStmt);
}
#endif // has include sqlite3

View File

@@ -119,6 +119,7 @@ class StoreForwardPlusPlusModule : public ProtobufModule<meshtastic_StoreForward
sqlite3_stmt *getChainCountStmt;
sqlite3_stmt *pruneScratchQueueStmt;
sqlite3_stmt *trimOldestLinkStmt;
sqlite3_stmt *maybeAddPeerStmt;
// For a given Meshtastic ChannelHash, fills the root_hash buffer with a 32-byte root hash
// returns true if the root hash was found
@@ -216,6 +217,12 @@ class StoreForwardPlusPlusModule : public ProtobufModule<meshtastic_StoreForward
void trimOldestLink(uint8_t *, size_t);
// given a link object with a payload and other fields, recalculates the message hash
// returns true if a match
bool recalculateHash(link_object &, uint8_t *, size_t, uint8_t *, size_t);
void maybeAddPeer(const meshtastic_MeshPacket &);
// Track if we have a scheduled runOnce pending
// useful to not accudentally delay a scheduled runOnce
bool pendingRun = false;