Add peers table

This commit is contained in:
Jonathan Bennett
2026-01-01 22:16:35 -06:00
parent 7d6a0f20c6
commit b7028fff08
2 changed files with 75 additions and 11 deletions

View File

@@ -2,6 +2,8 @@
// Track the nodenums we've gotten SFPP traffic from, to build a network graph
// store nodes, packet counts, and hops away in a new peers table
// TODO: When evaluating a canon announce, try speculatively commiting multiple messages from scratch.
// Eventual TODO: non-stratum0 nodes need to be pointed at their upstream source? Maybe
#if __has_include("sqlite3.h")
@@ -137,7 +139,7 @@ StoreForwardPlusPlusModule::StoreForwardPlusPlusModule()
provide_count INT DEFAULT 0, \
split_count INT DEFAULT 0, \
total_count INT DEFAULT 0, \
average_hops INT DEFAULT 0, \
average_hops REAL DEFAULT 0, \
PRIMARY KEY (nodenum) \
);",
NULL, NULL, &err);
@@ -217,6 +219,16 @@ StoreForwardPlusPlusModule::StoreForwardPlusPlusModule()
"INSERT INTO peers (nodenum, total_count, average_hops) VALUES(?, ?, ?) ON CONFLICT(nodenum) DO NOTHING;",
-1, &maybeAddPeerStmt, NULL);
sqlite3_prepare_v2(ppDb,
"SELECT announce_count, query_count, request_count, provide_count, split_count, total_count, average_hops "
"FROM peers WHERE nodenum=?;",
-1, &getPeerStmt, NULL);
sqlite3_prepare_v2(ppDb,
"UPDATE peers SET announce_count=?, query_count=?, request_count=?, provide_count=?, split_count=?, "
"total_count=?, average_hops=? WHERE nodenum=?;",
-1, &updatePeerStmt, NULL);
encryptedOk = true;
this->setInterval(portduino_config.sfpp_announce_interval * 60 * 1000);
@@ -385,7 +397,7 @@ bool StoreForwardPlusPlusModule::handleReceivedProtobuf(const meshtastic_MeshPac
link_object incoming_link;
incoming_link.validObject = false;
maybeAddPeer(mp);
updatePeers(mp, t->sfpp_message_type);
if (t->sfpp_message_type == meshtastic_StoreForwardPlusPlus_SFPP_message_type_CANON_ANNOUNCE) {
@@ -1375,16 +1387,66 @@ bool StoreForwardPlusPlusModule::recalculateHash(StoreForwardPlusPlusModule::lin
return true;
}
void StoreForwardPlusPlusModule::maybeAddPeer(const meshtastic_MeshPacket &mp)
void StoreForwardPlusPlusModule::updatePeers(const meshtastic_MeshPacket &mp,
meshtastic_StoreForwardPlusPlus_SFPP_message_type type)
{
sqlite3_bind_int(maybeAddPeerStmt, 1, mp.from);
sqlite3_bind_int(maybeAddPeerStmt, 2, 1);
sqlite3_bind_int(maybeAddPeerStmt, 3, mp.hop_start - mp.hop_limit);
int res = sqlite3_step(maybeAddPeerStmt);
if (res != SQLITE_OK && res != SQLITE_DONE) {
LOG_ERROR("StoreForwardpp Maybe Add Peer sqlite error %u, %s", res, sqlite3_errmsg(ppDb));
sqlite3_bind_int(getPeerStmt, 1, mp.from);
int res = sqlite3_step(getPeerStmt);
if (res == SQLITE_ROW) {
auto announce_count = sqlite3_column_int(getPeerStmt, 0);
auto query_count = sqlite3_column_int(getPeerStmt, 1);
auto request_count = sqlite3_column_int(getPeerStmt, 2);
auto provide_count = sqlite3_column_int(getPeerStmt, 3);
auto split_count = sqlite3_column_int(getPeerStmt, 4);
auto total_count = sqlite3_column_int(getPeerStmt, 5);
auto average_hops = sqlite3_column_double(getPeerStmt, 6);
total_count++;
if (type == meshtastic_StoreForwardPlusPlus_SFPP_message_type_CANON_ANNOUNCE) {
announce_count++;
} else if (type == meshtastic_StoreForwardPlusPlus_SFPP_message_type_CHAIN_QUERY) {
query_count++;
} else if (type == meshtastic_StoreForwardPlusPlus_SFPP_message_type_LINK_REQUEST) {
request_count++;
} else if (type == meshtastic_StoreForwardPlusPlus_SFPP_message_type_LINK_PROVIDE) {
provide_count++;
} else if (type == meshtastic_StoreForwardPlusPlus_SFPP_message_type_LINK_PROVIDE_FIRSTHALF ||
type == meshtastic_StoreForwardPlusPlus_SFPP_message_type_LINK_PROVIDE_SECONDHALF) {
split_count++;
}
// recalculate average hops
average_hops = ((average_hops * (total_count - 1)) + (mp.hop_start - mp.hop_limit)) / total_count;
sqlite3_bind_int(updatePeerStmt, 1, announce_count);
sqlite3_bind_int(updatePeerStmt, 2, query_count);
sqlite3_bind_int(updatePeerStmt, 3, request_count);
sqlite3_bind_int(updatePeerStmt, 4, provide_count);
sqlite3_bind_int(updatePeerStmt, 5, split_count);
sqlite3_bind_int(updatePeerStmt, 6, total_count);
sqlite3_bind_double(updatePeerStmt, 7, average_hops);
sqlite3_bind_int(updatePeerStmt, 8, mp.from);
res = sqlite3_step(updatePeerStmt);
if (res != SQLITE_OK && res != SQLITE_DONE) {
LOG_ERROR("StoreForwardpp Update Peer sqlite error %u, %s", res, sqlite3_errmsg(ppDb));
}
sqlite3_reset(updatePeerStmt);
} else {
sqlite3_bind_int(maybeAddPeerStmt, 1, mp.from);
sqlite3_bind_int(maybeAddPeerStmt, 2, 1);
sqlite3_bind_int(maybeAddPeerStmt, 3, mp.hop_start - mp.hop_limit);
res = sqlite3_step(maybeAddPeerStmt);
if (res != SQLITE_OK && res != SQLITE_DONE) {
LOG_ERROR("StoreForwardpp Maybe Add Peer sqlite error %u, %s", res, sqlite3_errmsg(ppDb));
}
sqlite3_reset(maybeAddPeerStmt);
}
sqlite3_reset(maybeAddPeerStmt);
sqlite3_reset(getPeerStmt);
// get the peer row for the from node
// if it's new, just push the new one in
// if not new, run the calculations and update the numbers
}
#endif // has include sqlite3

View File

@@ -120,6 +120,8 @@ class StoreForwardPlusPlusModule : public ProtobufModule<meshtastic_StoreForward
sqlite3_stmt *pruneScratchQueueStmt;
sqlite3_stmt *trimOldestLinkStmt;
sqlite3_stmt *maybeAddPeerStmt;
sqlite3_stmt *getPeerStmt;
sqlite3_stmt *updatePeerStmt;
// For a given Meshtastic ChannelHash, fills the root_hash buffer with a 32-byte root hash
// returns true if the root hash was found
@@ -221,7 +223,7 @@ class StoreForwardPlusPlusModule : public ProtobufModule<meshtastic_StoreForward
// returns true if a match
bool recalculateHash(link_object &, uint8_t *, size_t, uint8_t *, size_t);
void maybeAddPeer(const meshtastic_MeshPacket &);
void updatePeers(const meshtastic_MeshPacket &, meshtastic_StoreForwardPlusPlus_SFPP_message_type);
// Track if we have a scheduled runOnce pending
// useful to not accudentally delay a scheduled runOnce