sfpp Split Messages

This commit is contained in:
Jonathan Bennett
2025-12-29 21:57:42 -06:00
parent f8c27d1714
commit d272b28ed4
2 changed files with 123 additions and 58 deletions

View File

@@ -1,6 +1,3 @@
// TODO: Split packets when needed
// TODO: roll messages out of DB
// Eventual TODO: non-stratum0 nodes need to be pointed at their upstream source? Maybe
#if __has_include("sqlite3.h")
@@ -219,6 +216,14 @@ int32_t StoreForwardPlusPlusModule::runOnce()
return portduino_config.sfpp_announce_interval * 60 * 1000;
}
if (doing_split_send) {
LOG_DEBUG("Sending split second half");
broadcastLink(split_link_out, true, true);
split_link_out = link_object();
split_link_out.validObject = false;
return portduino_config.sfpp_announce_interval * 60 * 1000;
}
// get tip of chain for this channel
link_object chain_end = getLinkFromCount(0, root_hash_bytes, SFPP_HASH_SIZE);
@@ -326,6 +331,9 @@ bool StoreForwardPlusPlusModule::handleReceivedProtobuf(const meshtastic_MeshPac
printBytes("commit_hash ", t->commit_hash.bytes, t->commit_hash.size);
printBytes("root_hash ", t->root_hash.bytes, t->root_hash.size);
link_object incoming_link;
incoming_link.validObject = false;
if (t->sfpp_message_type == meshtastic_StoreForwardPlusPlus_SFPP_message_type_CANON_ANNOUNCE) {
// TODO: Regardless of where we are in the chain, if we have a newer message, send it back.
@@ -420,6 +428,56 @@ bool StoreForwardPlusPlusModule::handleReceivedProtobuf(const meshtastic_MeshPac
LOG_DEBUG("Link Provide received!");
link_object incoming_link = ingestLinkMessage(t);
} else if (t->sfpp_message_type == meshtastic_StoreForwardPlusPlus_SFPP_message_type_LINK_PROVIDE_FIRSTHALF) {
LOG_DEBUG("Link Provide First Half received!");
split_link_in = ingestLinkMessage(t);
doing_split_receive = true;
return true;
} else if (t->sfpp_message_type == meshtastic_StoreForwardPlusPlus_SFPP_message_type_LINK_PROVIDE_SECONDHALF) {
LOG_DEBUG("Link Provide Second Half received!");
if (!doing_split_receive) {
LOG_DEBUG("Received second half without first half, ignoring");
return true;
}
if (!split_link_in.validObject) {
LOG_WARN("No first half stored, cannot combine");
doing_split_receive = false;
return true;
}
link_object second_half = ingestLinkMessage(t);
if (second_half.validObject == false) {
LOG_WARN("Second half invalid");
return true;
}
if (split_link_in.encrypted_len + second_half.encrypted_len > 256) {
LOG_WARN("Combined link too large");
return true;
}
if (split_link_in.from == second_half.from && split_link_in.to == second_half.to &&
split_link_in.root_hash_len == second_half.root_hash_len &&
memcmp(split_link_in.root_hash, second_half.root_hash, split_link_in.root_hash_len) == 0 &&
split_link_in.message_hash_len == second_half.message_hash_len &&
memcmp(split_link_in.message_hash, second_half.message_hash, split_link_in.message_hash_len) == 0) {
incoming_link = split_link_in;
memcpy(&incoming_link.encrypted_bytes[split_link_in.encrypted_len], second_half.encrypted_bytes,
second_half.encrypted_len);
incoming_link.encrypted_len = split_link_in.encrypted_len + second_half.encrypted_len;
// append the encrypted bytes
// clear first half
split_link_in = link_object();
split_link_in.validObject = false;
doing_split_receive = false;
} else {
LOG_WARN("No first half stored, cannot combine");
return true;
}
}
if (incoming_link.validObject) {
if (incoming_link.root_hash_len == 0) {
LOG_WARN("Hash bytes not found for incoming link");
return true;
@@ -464,7 +522,8 @@ bool StoreForwardPlusPlusModule::handleReceivedProtobuf(const meshtastic_MeshPac
rebroadcastLinkObject(incoming_link);
}
}
requestNextMessage(t->root_hash.bytes, t->root_hash.size, t->commit_hash.bytes, t->commit_hash.size);
requestNextMessage(incoming_link.root_hash, incoming_link.root_hash_len, incoming_link.commit_hash,
incoming_link.commit_hash_len);
} else {
if (!isInScratch(incoming_link.message_hash, incoming_link.message_hash_len)) {
addToScratch(incoming_link);
@@ -640,51 +699,63 @@ void StoreForwardPlusPlusModule::broadcastLink(uint8_t *_commit_hash, size_t _co
sqlite3_bind_int(getLinkStmt, 1, _commit_hash_len);
sqlite3_bind_blob(getLinkStmt, 2, _commit_hash, _commit_hash_len, NULL);
int res = sqlite3_step(getLinkStmt);
link_object lo;
meshtastic_StoreForwardPlusPlus storeforward = meshtastic_StoreForwardPlusPlus_init_zero;
storeforward.sfpp_message_type = meshtastic_StoreForwardPlusPlus_SFPP_message_type_LINK_PROVIDE;
storeforward.encapsulated_to = sqlite3_column_int(getLinkStmt, 0);
if (storeforward.encapsulated_to == NODENUM_BROADCAST) {
storeforward.encapsulated_to = 0;
}
storeforward.encapsulated_from = sqlite3_column_int(getLinkStmt, 1);
storeforward.encapsulated_id = sqlite3_column_int(getLinkStmt, 2);
lo.to = sqlite3_column_int(getLinkStmt, 0);
lo.from = sqlite3_column_int(getLinkStmt, 1);
lo.id = sqlite3_column_int(getLinkStmt, 2);
uint8_t *_payload = (uint8_t *)sqlite3_column_blob(getLinkStmt, 3);
storeforward.message.size = sqlite3_column_bytes(getLinkStmt, 3);
memcpy(storeforward.message.bytes, _payload, storeforward.message.size);
lo.encrypted_len = sqlite3_column_bytes(getLinkStmt, 3);
memcpy(lo.encrypted_bytes, _payload, lo.encrypted_len);
uint8_t *_message_hash = (uint8_t *)sqlite3_column_blob(getLinkStmt, 4);
lo.message_hash_len = SFPP_HASH_SIZE;
memcpy(lo.message_hash, _message_hash, lo.message_hash_len);
storeforward.encapsulated_rxtime = sqlite3_column_int(getLinkStmt, 5);
lo.rx_time = sqlite3_column_int(getLinkStmt, 5);
uint8_t *_tmp_commit_hash = (uint8_t *)sqlite3_column_blob(getLinkStmt, 6);
storeforward.commit_hash.size = 8;
memcpy(storeforward.commit_hash.bytes, _tmp_commit_hash, storeforward.commit_hash.size);
lo.commit_hash_len = 8;
memcpy(lo.commit_hash, _tmp_commit_hash, lo.commit_hash_len);
uint8_t *_root_hash = (uint8_t *)sqlite3_column_blob(getLinkStmt, 7);
storeforward.root_hash.size = 8;
memcpy(storeforward.root_hash.bytes, _root_hash, storeforward.root_hash.size);
lo.root_hash_len = 8;
memcpy(lo.root_hash, _root_hash, lo.root_hash_len);
sqlite3_reset(getLinkStmt);
meshtastic_MeshPacket *p = allocDataProtobuf(storeforward);
p->to = NODENUM_BROADCAST;
p->decoded.want_response = false;
p->priority = meshtastic_MeshPacket_Priority_BACKGROUND;
p->channel = 0;
p->hop_limit = portduino_config.sfpp_hops;
p->hop_start = portduino_config.sfpp_hops;
LOG_INFO("Send link to mesh");
service->sendToMesh(p, RX_SRC_LOCAL, true);
broadcastLink(lo, false);
}
// TODO if stratum0, send the chain count
void StoreForwardPlusPlusModule::broadcastLink(link_object &lo, bool full_commit_hash)
void StoreForwardPlusPlusModule::broadcastLink(link_object &lo, bool full_commit_hash, bool is_split_second_half)
{
meshtastic_StoreForwardPlusPlus storeforward = meshtastic_StoreForwardPlusPlus_init_zero;
storeforward.sfpp_message_type = meshtastic_StoreForwardPlusPlus_SFPP_message_type_LINK_PROVIDE;
if (lo.encrypted_len > 200) {
LOG_WARN("Link too large to send (%u bytes)", lo.encrypted_len);
doing_split_send = true;
storeforward.message_hash.size = SFPP_SHORT_HASH_SIZE;
memcpy(storeforward.message_hash.bytes, lo.message_hash, storeforward.message_hash.size);
link_object full_link = lo;
split_link_out = lo;
size_t half_size = lo.encrypted_len / 2;
storeforward.sfpp_message_type = meshtastic_StoreForwardPlusPlus_SFPP_message_type_LINK_PROVIDE_FIRSTHALF;
lo.encrypted_len = half_size;
split_link_out.encrypted_len = full_link.encrypted_len - half_size;
memcpy(split_link_out.encrypted_bytes, &full_link.encrypted_bytes[half_size], split_link_out.encrypted_len);
setIntervalFromNow(30 * 1000); // send second half in 30 seconds
} else if (is_split_second_half) {
storeforward.sfpp_message_type = meshtastic_StoreForwardPlusPlus_SFPP_message_type_LINK_PROVIDE_SECONDHALF;
storeforward.message_hash.size = SFPP_SHORT_HASH_SIZE;
memcpy(storeforward.message_hash.bytes, lo.message_hash, storeforward.message_hash.size);
doing_split_send = false;
}
storeforward.encapsulated_to = lo.to;
if (storeforward.encapsulated_to == NODENUM_BROADCAST) {
@@ -704,11 +775,11 @@ void StoreForwardPlusPlusModule::broadcastLink(link_object &lo, bool full_commit
if (full_commit_hash)
storeforward.commit_hash.size = lo.commit_hash_len;
else
storeforward.commit_hash.size = 8;
storeforward.commit_hash.size = SFPP_SHORT_HASH_SIZE;
memcpy(storeforward.commit_hash.bytes, lo.commit_hash, storeforward.commit_hash.size);
}
storeforward.root_hash.size = 8;
storeforward.root_hash.size = SFPP_SHORT_HASH_SIZE;
memcpy(storeforward.root_hash.bytes, lo.root_hash, storeforward.root_hash.size);
sqlite3_reset(getLinkStmt);
@@ -767,45 +838,33 @@ StoreForwardPlusPlusModule::link_object StoreForwardPlusPlusModule::getLink(uint
bool StoreForwardPlusPlusModule::sendFromScratch(uint8_t *root_hash)
{
link_object lo;
sqlite3_bind_blob(fromScratchStmt, 1, root_hash, SFPP_HASH_SIZE, NULL);
if (sqlite3_step(fromScratchStmt) == SQLITE_DONE) {
return false;
}
meshtastic_StoreForwardPlusPlus storeforward = meshtastic_StoreForwardPlusPlus_init_zero;
storeforward.sfpp_message_type = meshtastic_StoreForwardPlusPlus_SFPP_message_type_LINK_PROVIDE;
storeforward.encapsulated_to = sqlite3_column_int(fromScratchStmt, 0);
if (storeforward.encapsulated_to == NODENUM_BROADCAST) {
storeforward.encapsulated_to = 0;
}
storeforward.encapsulated_from = sqlite3_column_int(fromScratchStmt, 1);
storeforward.encapsulated_id = sqlite3_column_int(fromScratchStmt, 2);
lo.to = sqlite3_column_int(fromScratchStmt, 0);
lo.from = sqlite3_column_int(fromScratchStmt, 1);
lo.id = sqlite3_column_int(fromScratchStmt, 2);
uint8_t *_encrypted = (uint8_t *)sqlite3_column_blob(fromScratchStmt, 3);
storeforward.message.size = sqlite3_column_bytes(fromScratchStmt, 3);
memcpy(storeforward.message.bytes, _encrypted, storeforward.message.size);
lo.encrypted_len = sqlite3_column_bytes(fromScratchStmt, 3);
memcpy(lo.encrypted_bytes, _encrypted, lo.encrypted_len);
uint8_t *_message_hash = (uint8_t *)sqlite3_column_blob(fromScratchStmt, 4);
storeforward.encapsulated_rxtime = sqlite3_column_int(fromScratchStmt, 5);
lo.rx_time = sqlite3_column_int(fromScratchStmt, 5);
storeforward.root_hash.size = SFPP_SHORT_HASH_SIZE;
memcpy(storeforward.root_hash.bytes, root_hash, SFPP_SHORT_HASH_SIZE);
lo.root_hash_len = SFPP_SHORT_HASH_SIZE;
memcpy(lo.root_hash, root_hash, lo.root_hash_len);
sqlite3_reset(fromScratchStmt);
meshtastic_MeshPacket *p = allocDataProtobuf(storeforward);
p->to = NODENUM_BROADCAST;
p->decoded.want_response = false;
p->priority = meshtastic_MeshPacket_Priority_BACKGROUND;
p->channel = 0;
p->hop_limit = portduino_config.sfpp_hops;
p->hop_start = portduino_config.sfpp_hops;
printBytes("Send link to mesh ", _message_hash, 8);
LOG_WARN("Size: %d", storeforward.message.size);
printBytes("encrypted ", storeforward.message.bytes, storeforward.message.size);
service->sendToMesh(p, RX_SRC_LOCAL, true);
LOG_WARN("Size: %d", lo.encrypted_len);
printBytes("encrypted ", lo.encrypted_bytes, lo.encrypted_len);
broadcastLink(lo, false);
return true;
}
@@ -1085,6 +1144,7 @@ StoreForwardPlusPlusModule::link_object StoreForwardPlusPlusModule::ingestLinkMe
// we don't ever get the payload here, so it's always an empty string
lo.payload = "";
lo.validObject = true;
return lo;
}
@@ -1170,7 +1230,7 @@ uint32_t StoreForwardPlusPlusModule::getChainCount(uint8_t *root_hash, size_t ro
sqlite3_bind_blob(getChainCountStmt, 2, root_hash, root_hash_len, NULL);
int res = sqlite3_step(getChainCountStmt);
if (res != SQLITE_OK && res != SQLITE_DONE) {
if (res != SQLITE_OK && res != SQLITE_DONE && res != SQLITE_ROW) {
LOG_ERROR("getChainCount sqlite error %u, %s", res, sqlite3_errmsg(ppDb));
}
uint32_t count = sqlite3_column_int(getChainCountStmt, 0);

View File

@@ -153,7 +153,7 @@ class StoreForwardPlusPlusModule : public ProtobufModule<meshtastic_StoreForward
void broadcastLink(uint8_t *, size_t);
// sends a LINK_PROVIDE message broadcasting the given link object
void broadcastLink(link_object &, bool);
void broadcastLink(link_object &, bool, bool = false);
// sends a LINK_PROVIDE message broadcasting the given link object from scratch message store
bool sendFromScratch(uint8_t *);
@@ -226,5 +226,10 @@ class StoreForwardPlusPlusModule : public ProtobufModule<meshtastic_StoreForward
};
uint32_t rebroadcastTimeout = 3600; // Messages older than this (in seconds) will not be rebroadcast
bool doing_split_send = false;
link_object split_link_out;
bool doing_split_receive = false;
link_object split_link_in;
};
#endif