// I've done a lot of this in SQLite for now, but honestly it needs to happen in memory, and get saved to sqlite during downtime // TODO: Message asking for the message x spots from the tip of the chain. // This would allow individual nodes to apt in, grab the latest fe messages, and not need to sync the entire chain // TODO: custom hops. 1 maybe 0. Configurable? // TODO: non-stratum0 nodes need to be pointed at their upstream source? Maybe // TODO: There will come a point where the chain is too big to sync // TODO: evict messages from scratch after a timeout // things may get weird if there are multiple stratum-0 nodes on a single mesh. Come up with mitigations #if __has_include("sqlite3.h") #include "StoreForwardPlusPlus.h" #include "MeshService.h" #include "RTC.h" #include "SHA256.h" #include "meshUtils.h" #include "modules/RoutingModule.h" StoreForwardPlusPlusModule::StoreForwardPlusPlusModule() : ProtobufModule("StoreForwardpp", meshtastic_PortNum_STORE_FORWARD_PLUSPLUS_APP, &meshtastic_StoreForwardPlusPlus_msg), concurrency::OSThread("StoreForwardpp") { LOG_WARN("StoreForwardPlusPlusModule init"); if (portduino_config.sfpp_stratum0) LOG_WARN("SF++ stratum0"); int res = sqlite3_open("test.db", &ppDb); LOG_WARN("Result1 %u", res); char *err = nullptr; res = sqlite3_exec(ppDb, " \ CREATE TABLE channel_messages( \ destination INT NOT NULL, \ sender INT NOT NULL, \ packet_id INT NOT NULL, \ rx_time INT NOT NULL, \ root_hash BLOB NOT NULL, \ encrypted_bytes BLOB NOT NULL, \ message_hash BLOB NOT NULL, \ commit_hash BLOB NOT NULL, \ payload TEXT, \ counter INT DEFAULT 0, \ PRIMARY KEY (message_hash) \ );", NULL, NULL, &err); LOG_WARN("Result2 %u", res); if (err != nullptr) LOG_ERROR("%s", err); sqlite3_free(err); res = sqlite3_exec(ppDb, " \ CREATE TABLE local_messages( \ destination INT NOT NULL, \ sender INT NOT NULL, \ packet_id INT NOT NULL, \ rx_time INT NOT NULL, \ root_hash BLOB NOT NULL, \ encrypted_bytes BLOB NOT NULL, \ message_hash BLOB NOT NULL, \ payload TEXT, \ PRIMARY KEY (message_hash) \ );", NULL, NULL, &err); LOG_WARN("Result2 %u", res); if (err != nullptr) LOG_ERROR("%s", err); sqlite3_free(err); // create table DMs res = sqlite3_exec(ppDb, " \ CREATE TABLE direct_messages( \ destination INT NOT NULL, \ sender INT NOT NULL, \ packet_id INT NOT NULL, \ rx_time INT NOT NULL, \ root_hash BLOB NOT NULL, \ commit_hash BLOB NOT NULL, \ encrypted_bytes BLOB NOT NULL, \ message_hash BLOB NOT NULL, \ payload TEXT, \ PRIMARY KEY (message_hash) \ );", NULL, NULL, &err); LOG_WARN("Result2 %u", res); if (err != nullptr) LOG_ERROR("%s", err); sqlite3_free(err); // mappings table -- connects the root hashes to channel hashes and DM identifiers res = sqlite3_exec(ppDb, " \ CREATE TABLE mappings( \ chain_type INT NOT NULL, \ identifier INT NOT NULL, \ root_hash BLOB NOT NULL, \ count INT DEFAULT 0, \ PRIMARY KEY (identifier) \ );", NULL, NULL, &err); LOG_WARN("Result2 %u", res); if (err != nullptr) LOG_ERROR("%s", err); sqlite3_free(err); // store schema version somewhere // prepared statements *should* make this faster. sqlite3_prepare_v2(ppDb, "INSERT INTO channel_messages (destination, sender, packet_id, root_hash, \ encrypted_bytes, message_hash, rx_time, commit_hash, payload, counter) VALUES(?, ?, ?, ?, ?, ?, ?, ?, ?, ?);", -1, &chain_insert_stmt, NULL); sqlite3_prepare_v2(ppDb, "INSERT INTO local_messages (destination, sender, packet_id, root_hash, \ encrypted_bytes, message_hash, rx_time, payload) VALUES(?, ?, ?, ?, ?, ?, ?, ?);", -1, &scratch_insert_stmt, NULL); sqlite3_prepare_v2(ppDb, "select destination, sender, packet_id, encrypted_bytes, message_hash, rx_time, root_hash \ from local_messages where root_hash=? order by rx_time asc LIMIT 1;", // earliest first -1, &fromScratchStmt, NULL); sqlite3_prepare_v2(ppDb, "select destination, sender, packet_id, encrypted_bytes, message_hash, rx_time, root_hash, payload \ from local_messages where substr(message_hash,1,?)=? order by rx_time asc LIMIT 1;", // earliest first -1, &fromScratchByHashStmt, NULL); sqlite3_prepare_v2(ppDb, "SELECT COUNT(*) from channel_messages where substr(message_hash,1,?)=?", -1, &checkDup, NULL); sqlite3_prepare_v2(ppDb, "SELECT COUNT(*) from local_messages where substr(message_hash,1,?)=?", -1, &checkScratch, NULL); sqlite3_prepare_v2(ppDb, "DELETE from local_messages where substr(message_hash,1,?)=?", -1, &removeScratch, NULL); sqlite3_prepare_v2(ppDb, "UPDATE channel_messages SET payload=? WHERE substr(message_hash,1,?)=?", -1, &updatePayloadStmt, NULL); sqlite3_prepare_v2(ppDb, "select commit_hash from channel_messages where substr(root_hash,1,?)=? order by rowid ASC;", -1, &getNextHashStmt, NULL); sqlite3_prepare_v2(ppDb, "select commit_hash, message_hash, rx_time from channel_messages where substr(root_hash,1,?)=? order by " "rowid desc LIMIT 1;", -1, &getChainEndStmt, NULL); sqlite3_prepare_v2( ppDb, "select destination, sender, packet_id, encrypted_bytes, message_hash, rx_time, commit_hash, root_hash, counter, payload \ from channel_messages where substr(commit_hash,1,?)=?;", -1, &getLinkStmt, NULL); sqlite3_prepare_v2(ppDb, "select identifier from mappings where substr(root_hash,1,?)=?;", -1, &getHashFromRootStmt, NULL); sqlite3_prepare_v2(ppDb, "INSERT INTO mappings (chain_type, identifier, root_hash) VALUES(?, ?, ?);", -1, &addRootToMappingsStmt, NULL); sqlite3_prepare_v2(ppDb, "select root_hash from mappings where identifier=?;", -1, &getRootFromChannelHashStmt, NULL); sqlite3_prepare_v2(ppDb, "select root_hash from mappings where substr(root_hash,1,?)=?;", -1, &getFullRootHashStmt, NULL); sqlite3_prepare_v2(ppDb, "UPDATE mappings SET count=? WHERE substr(root_hash,1,?)=?;", -1, &setChainCountStmt, NULL); sqlite3_prepare_v2(ppDb, "SELECT count FROM mappings WHERE substr(root_hash,1,?)=?;", -1, &getChainCountStmt, NULL); encryptedOk = true; // wait about 15 seconds after boot for the first runOnce() // TODO: When not doing active development, adjust this to a longer time this->setInterval(15 * 1000); } int32_t StoreForwardPlusPlusModule::runOnce() { LOG_WARN("StoreForward++ runONce"); pendingRun = false; if (getRTCQuality() < RTCQualityNTP) { LOG_WARN("StoreForward++ deferred due to time quality %u", getRTCQuality()); return 5 * 60 * 1000; } uint8_t root_hash_bytes[32] = {0}; ChannelHash hash = channels.getHash(0); getOrAddRootFromChannelHash(hash, root_hash_bytes); // get tip of chain for this channel link_object chain_end = getChainEndLinkObject(root_hash_bytes, 32); if (chain_end.rx_time == 0) { LOG_WARN("Store and Forward++ database lookup returned null"); // first attempt at a chain-only announce with no messages meshtastic_StoreForwardPlusPlus storeforward = meshtastic_StoreForwardPlusPlus_init_zero; storeforward.sfpp_message_type = meshtastic_StoreForwardPlusPlus_SFPP_message_type_CANON_ANNOUNCE; storeforward.root_hash.size = 32; memcpy(storeforward.root_hash.bytes, root_hash_bytes, 32); storeforward.encapsulated_rxtime = 0; // storeforward. meshtastic_MeshPacket *p = allocDataProtobuf(storeforward); p->to = NODENUM_BROADCAST; p->decoded.want_response = false; p->priority = meshtastic_MeshPacket_Priority_BACKGROUND; p->channel = 0; LOG_INFO("Send packet to mesh"); service->sendToMesh(p, RX_SRC_LOCAL, true); return 5 * 60 * 1000; } // broadcast the tip of the chain canonAnnounce(chain_end.message_hash, chain_end.commit_hash, root_hash_bytes, chain_end.rx_time); // eventually timeout things on the scratch queue return 5 * 60 * 1000; } ProcessMessage StoreForwardPlusPlusModule::handleReceived(const meshtastic_MeshPacket &mp) { // To avoid terrible time problems, require NTP or GPS time if (getRTCQuality() < RTCQualityNTP) { return ProcessMessage::CONTINUE; } // For the moment, this is strictly LoRa if (mp.transport_mechanism != meshtastic_MeshPacket_TransportMechanism_TRANSPORT_LORA) { return ProcessMessage::CONTINUE; // Let others look at this message also if they want } // will eventually host DMs and other undecodable messages if (mp.which_payload_variant != meshtastic_MeshPacket_decoded_tag) { return ProcessMessage::CONTINUE; // Let others look at this message also if they want } LOG_WARN("in handleReceived"); if (mp.decoded.portnum == meshtastic_PortNum_TEXT_MESSAGE_APP && mp.to == NODENUM_BROADCAST) { link_object lo = ingestTextPacket(mp, router->p_encrypted); if (isInDB(lo.message_hash, lo.message_hash_len)) { LOG_WARN("found message in db"); // We may have this message already, but we may not have the payload // if we do, we can update the payload in the database if (lo.payload != "") updatePayload(lo.message_hash, lo.message_hash_len, lo.payload); return ProcessMessage::CONTINUE; } if (!portduino_config.sfpp_stratum0) { if (!isInDB(lo.message_hash, lo.message_hash_len)) { // todo check for valid root hash addToScratch(lo); LOG_WARN("added message to scratch"); // send link to upstream? } return ProcessMessage::CONTINUE; } addToChain(lo); if (!pendingRun) { setIntervalFromNow(60 * 1000); // run again in 60 seconds to announce the new tip of chain pendingRun = true; } // canonAnnounce(lo.message_hash, lo.commit_hash, lo.root_hash, lo.rx_time); return ProcessMessage::CONTINUE; // Let others look at this message also if they want } else if (mp.decoded.portnum == meshtastic_PortNum_STORE_FORWARD_PLUSPLUS_APP) { LOG_WARN("Got a STORE_FORWARD++ packet"); meshtastic_StoreForwardPlusPlus scratch; pb_decode_from_bytes(mp.decoded.payload.bytes, mp.decoded.payload.size, meshtastic_StoreForwardPlusPlus_fields, &scratch); handleReceivedProtobuf(mp, &scratch); return ProcessMessage::CONTINUE; } return ProcessMessage::CONTINUE; } bool StoreForwardPlusPlusModule::handleReceivedProtobuf(const meshtastic_MeshPacket &mp, meshtastic_StoreForwardPlusPlus *t) { LOG_WARN("in handleReceivedProtobuf"); LOG_WARN("Sfp++ node %u sent us sf++ packet", mp.from); printBytes("commit_hash ", t->commit_hash.bytes, t->commit_hash.size); printBytes("root_hash ", t->root_hash.bytes, t->root_hash.size); if (t->sfpp_message_type == meshtastic_StoreForwardPlusPlus_SFPP_message_type_CANON_ANNOUNCE) { // TODO: Regardless of where we are in the chain, if we have a newer message, send it back. if (portduino_config.sfpp_stratum0) { LOG_WARN("Received a CANON_ANNOUNCE while stratum 0"); uint8_t next_commit_hash[32] = {0}; if (getNextHash(t->root_hash.bytes, t->root_hash.size, t->commit_hash.bytes, t->commit_hash.size, next_commit_hash)) { printBytes("next chain hash: ", next_commit_hash, 32); if (airTime->isTxAllowedChannelUtil(true)) { broadcastLink(next_commit_hash, 32); } } } else { uint8_t tmp_root_hash_bytes[32] = {0}; LOG_WARN("Received a CANON_ANNOUNCE"); if (getRootFromChannelHash(router->p_encrypted->channel, tmp_root_hash_bytes)) { // we found the hash, check if it's the right one // TODO handle oddball sizes here if (memcmp(tmp_root_hash_bytes, t->root_hash.bytes, t->root_hash.size) != 0) { LOG_WARN("Found root hash, and it doesn't match!"); return true; } } else { // TODO: size check addRootToMappings(router->p_encrypted->channel, t->root_hash.bytes); LOG_WARN("Adding root hash to mappings"); } if (t->encapsulated_rxtime == 0) { LOG_WARN("No encapsulated time, conclude the chain is empty"); return true; } // get tip of chain for this channel link_object chain_end = getChainEndLinkObject(t->root_hash.bytes, t->root_hash.size); // get chain tip if (chain_end.rx_time != 0) { // TODO: size check if (memcmp(chain_end.commit_hash, t->commit_hash.bytes, t->commit_hash.size) == 0) { LOG_WARN("End of chain matches!"); sendFromScratch(chain_end.root_hash); } else { LOG_INFO("End of chain does not match!"); // We just got an end of chain announce, checking if we have seen this message and have it in scratch. if (isInScratch(t->message_hash.bytes, t->message_hash.size)) { link_object scratch_object = getFromScratch(t->message_hash.bytes, t->message_hash.size); // if this matches, we don't need to request the message // we know exactly what it is if (t->message_hash.size >= 8 && checkCommitHash(scratch_object, t->commit_hash.bytes, t->message_hash.size)) { addToChain(scratch_object); removeFromScratch(scratch_object.message_hash, scratch_object.message_hash_len); // short circuit and return // falls through to a request for the message return true; } } if (airTime->isTxAllowedChannelUtil(true)) { requestNextMessage(t->root_hash.bytes, t->root_hash.size, chain_end.commit_hash, 32); } } } else { // if chainEnd() LOG_WARN("No Messages on this chain, request!"); if (airTime->isTxAllowedChannelUtil(true)) { requestNextMessage(t->root_hash.bytes, t->root_hash.size, t->root_hash.bytes, t->root_hash.size); } } } } else if (t->sfpp_message_type == meshtastic_StoreForwardPlusPlus_SFPP_message_type_LINK_REQUEST) { uint8_t next_commit_hash[32] = {0}; LOG_WARN("Received link request"); if (getNextHash(t->root_hash.bytes, t->root_hash.size, t->commit_hash.bytes, t->commit_hash.size, next_commit_hash)) { printBytes("next chain hash: ", next_commit_hash, 32); broadcastLink(next_commit_hash, 32); } // if root and chain hashes are the same, grab the first message on the chain // if different, get the message directly after. // check if the root } else if (t->sfpp_message_type == meshtastic_StoreForwardPlusPlus_SFPP_message_type_LINK_PROVIDE) { LOG_WARN("Link Provide received!"); link_object incoming_link = ingestLinkMessage(t); if (incoming_link.root_hash_len == 0) { LOG_WARN("Hash bytes not found for incoming link"); return true; } if (!incoming_link.validObject) { LOG_WARN("commit byte mismatch"); return true; } if (portduino_config.sfpp_stratum0) { if (isInDB(incoming_link.message_hash, incoming_link.message_hash_len)) { LOG_WARN("Received link already in chain"); // TODO: respond with next link? return true; } // calculate the commit_hash addToChain(incoming_link); if (!pendingRun) { setIntervalFromNow(60 * 1000); // run again in 60 seconds to announce the new tip of chain pendingRun = true; } // timebox to no more than an hour old if (incoming_link.rx_time > getValidTime(RTCQuality::RTCQualityNTP, true) - rebroadcastTimeout) { // if this packet is new to us, we rebroadcast it rebroadcastLinkObject(incoming_link); } } else { if (incoming_link.commit_hash_len == 32) { addToChain(incoming_link); if (isInScratch(incoming_link.message_hash, incoming_link.message_hash_len)) { link_object scratch_object = getFromScratch(incoming_link.message_hash, incoming_link.message_hash_len); if (scratch_object.payload != "") { updatePayload(incoming_link.message_hash, incoming_link.message_hash_len, scratch_object.payload); } removeFromScratch(incoming_link.message_hash, incoming_link.message_hash_len); } else { // if this packet is new to us, we rebroadcast it, but only up to an hour old if (incoming_link.rx_time > getValidTime(RTCQuality::RTCQualityNTP, true) - rebroadcastTimeout) { LOG_WARN("Attempting to Rebroadcast message"); rebroadcastLinkObject(incoming_link); } } requestNextMessage(t->root_hash.bytes, t->root_hash.size, t->commit_hash.bytes, t->commit_hash.size); } else { if (!isInScratch(incoming_link.message_hash, incoming_link.message_hash_len)) { addToScratch(incoming_link); LOG_WARN("added incoming non-canon message to scratch"); if (incoming_link.rx_time > getValidTime(RTCQuality::RTCQualityNTP, true) - rebroadcastTimeout) { LOG_WARN("Attempting to Rebroadcast message"); rebroadcastLinkObject(incoming_link); } } } } } return true; } bool StoreForwardPlusPlusModule::getRootFromChannelHash(ChannelHash _ch_hash, uint8_t *_root_hash) { bool found = false; sqlite3_bind_int(getRootFromChannelHashStmt, 1, _ch_hash); sqlite3_step(getRootFromChannelHashStmt); uint8_t *tmp_root_hash = (uint8_t *)sqlite3_column_blob(getRootFromChannelHashStmt, 0); if (tmp_root_hash) { LOG_WARN("Found root hash!"); memcpy(_root_hash, tmp_root_hash, 32); found = true; } sqlite3_reset(getRootFromChannelHashStmt); return found; } ChannelHash StoreForwardPlusPlusModule::getChannelHashFromRoot(uint8_t *_root_hash, size_t _root_hash_len) { sqlite3_bind_int(getHashFromRootStmt, 1, _root_hash_len); sqlite3_bind_blob(getHashFromRootStmt, 2, _root_hash, _root_hash_len, NULL); sqlite3_step(getHashFromRootStmt); ChannelHash tmp_hash = (ChannelHash)sqlite3_column_int(getHashFromRootStmt, 0); sqlite3_reset(getHashFromRootStmt); return tmp_hash; } // return code indicates newly created chain bool StoreForwardPlusPlusModule::getOrAddRootFromChannelHash(ChannelHash _ch_hash, uint8_t *_root_hash) { LOG_WARN("getOrAddRootFromChannelHash()"); bool isNew = !getRootFromChannelHash(_ch_hash, _root_hash); if (isNew) { if (portduino_config.sfpp_stratum0) { LOG_WARN("Generating Root hash!"); // generate root hash SHA256 root_hash; root_hash.update(&_ch_hash, sizeof(_ch_hash)); NodeNum ourNode = nodeDB->getNodeNum(); root_hash.update(&ourNode, sizeof(ourNode)); uint32_t rtc_sec = getValidTime(RTCQuality::RTCQualityDevice, true); root_hash.update(&rtc_sec, sizeof(rtc_sec)); root_hash.finalize(_root_hash, 32); addRootToMappings(_ch_hash, _root_hash); } } return isNew; } void StoreForwardPlusPlusModule::addRootToMappings(ChannelHash _ch_hash, uint8_t *_root_hash) { LOG_WARN("addRootToMappings()"); printBytes("_root_hash", _root_hash, 32); // write to the table int type = chain_types::channel_chain; // note, must be an int variable sqlite3_bind_int(addRootToMappingsStmt, 1, type); sqlite3_bind_int(addRootToMappingsStmt, 2, _ch_hash); sqlite3_bind_blob(addRootToMappingsStmt, 3, _root_hash, 32, NULL); auto rc = sqlite3_step(addRootToMappingsStmt); LOG_WARN("result %u, %s", rc, sqlite3_errmsg(ppDb)); sqlite3_reset(addRootToMappingsStmt); } StoreForwardPlusPlusModule::link_object StoreForwardPlusPlusModule::getChainEndLinkObject(uint8_t *_root_hash, size_t _root_hash_len) { LOG_WARN("getChainEndLinkObject"); link_object lo; int rc; sqlite3_bind_int(getChainEndStmt, 1, _root_hash_len); sqlite3_bind_blob(getChainEndStmt, 2, _root_hash, _root_hash_len, NULL); sqlite3_step(getChainEndStmt); uint8_t *last_message_commit_hash = (uint8_t *)sqlite3_column_blob(getChainEndStmt, 0); uint8_t *last_message_hash = (uint8_t *)sqlite3_column_blob(getChainEndStmt, 1); uint32_t _rx_time = sqlite3_column_int(getChainEndStmt, 2); if (last_message_commit_hash != nullptr) { lo = getLink(last_message_commit_hash, 32); } sqlite3_reset(getChainEndStmt); return lo; } // TODO: make DM? void StoreForwardPlusPlusModule::requestNextMessage(uint8_t *_root_hash, size_t _root_hash_len, uint8_t *_commit_hash, size_t _commit_hash_len) { meshtastic_StoreForwardPlusPlus storeforward = meshtastic_StoreForwardPlusPlus_init_zero; storeforward.sfpp_message_type = meshtastic_StoreForwardPlusPlus_SFPP_message_type_LINK_REQUEST; // set root hash // set chain hash storeforward.commit_hash.size = _commit_hash_len; memcpy(storeforward.commit_hash.bytes, _commit_hash, _commit_hash_len); // set root hash storeforward.root_hash.size = _root_hash_len; memcpy(storeforward.root_hash.bytes, _root_hash, _root_hash_len); // storeforward. meshtastic_MeshPacket *p = allocDataProtobuf(storeforward); p->to = NODENUM_BROADCAST; p->decoded.want_response = false; p->priority = meshtastic_MeshPacket_Priority_BACKGROUND; p->channel = 0; LOG_INFO("Send packet to mesh"); service->sendToMesh(p, RX_SRC_LOCAL, true); } bool StoreForwardPlusPlusModule::getNextHash(uint8_t *_root_hash, size_t _root_hash_len, uint8_t *_commit_hash, size_t _commit_hash_len, uint8_t *next_commit_hash) { LOG_WARN("getNextHash"); // ChannelHash _channel_hash = getChannelHashFromRoot(_root_hash, _root_hash_len); // LOG_WARN("_channel_hash %u", _channel_hash); int rc; sqlite3_bind_int(getNextHashStmt, 1, _root_hash_len); sqlite3_bind_blob(getNextHashStmt, 2, _root_hash, _root_hash_len, NULL); bool next_hash = false; // asking for the first entry on the chain if (memcmp(_root_hash, _commit_hash, _commit_hash_len) == 0) { rc = sqlite3_step(getNextHashStmt); if (rc != SQLITE_OK) { LOG_WARN("here2 %u, %s", rc, sqlite3_errmsg(ppDb)); } uint8_t *tmp_commit_hash = (uint8_t *)sqlite3_column_blob(getNextHashStmt, 0); if (tmp_commit_hash == nullptr) { LOG_WARN("No next hash found"); sqlite3_reset(getNextHashStmt); return false; } printBytes("commit_hash", tmp_commit_hash, 32); memcpy(next_commit_hash, tmp_commit_hash, 32); next_hash = true; } else { bool found_hash = false; LOG_WARN("Looking for next hashes"); uint8_t *tmp_commit_hash; while (sqlite3_step(getNextHashStmt) != SQLITE_DONE) { tmp_commit_hash = (uint8_t *)sqlite3_column_blob(getNextHashStmt, 0); if (found_hash) { LOG_WARN("Found hash"); memcpy(next_commit_hash, tmp_commit_hash, 32); next_hash = true; break; } if (memcmp(tmp_commit_hash, _commit_hash, _commit_hash_len) == 0) found_hash = true; } } sqlite3_reset(getNextHashStmt); return next_hash; } void StoreForwardPlusPlusModule::broadcastLink(uint8_t *_commit_hash, size_t _commit_hash_len) { int rc; sqlite3_bind_int(getLinkStmt, 1, _commit_hash_len); sqlite3_bind_blob(getLinkStmt, 2, _commit_hash, _commit_hash_len, NULL); LOG_WARN("%d", sqlite3_step(getLinkStmt)); meshtastic_StoreForwardPlusPlus storeforward = meshtastic_StoreForwardPlusPlus_init_zero; storeforward.sfpp_message_type = meshtastic_StoreForwardPlusPlus_SFPP_message_type_LINK_PROVIDE; storeforward.encapsulated_to = sqlite3_column_int(getLinkStmt, 0); storeforward.encapsulated_from = sqlite3_column_int(getLinkStmt, 1); storeforward.encapsulated_id = sqlite3_column_int(getLinkStmt, 2); uint8_t *_payload = (uint8_t *)sqlite3_column_blob(getLinkStmt, 3); storeforward.message.size = sqlite3_column_bytes(getLinkStmt, 3); memcpy(storeforward.message.bytes, _payload, storeforward.message.size); uint8_t *_message_hash = (uint8_t *)sqlite3_column_blob(getLinkStmt, 4); // storeforward.message_hash.size = 8; // memcpy(storeforward.message_hash.bytes, _message_hash, storeforward.message_hash.size); storeforward.encapsulated_rxtime = sqlite3_column_int(getLinkStmt, 5); uint8_t *_tmp_commit_hash = (uint8_t *)sqlite3_column_blob(getLinkStmt, 6); storeforward.commit_hash.size = 8; memcpy(storeforward.commit_hash.bytes, _tmp_commit_hash, storeforward.commit_hash.size); uint8_t *_root_hash = (uint8_t *)sqlite3_column_blob(getLinkStmt, 7); storeforward.root_hash.size = 8; memcpy(storeforward.root_hash.bytes, _root_hash, storeforward.root_hash.size); sqlite3_reset(getLinkStmt); meshtastic_MeshPacket *p = allocDataProtobuf(storeforward); p->to = NODENUM_BROADCAST; p->decoded.want_response = false; p->priority = meshtastic_MeshPacket_Priority_BACKGROUND; p->channel = 0; LOG_INFO("Send link to mesh"); service->sendToMesh(p, RX_SRC_LOCAL, true); } // StoreForwardPlusPlusModule::link_object StoreForwardPlusPlusModule::getLink(uint8_t *_commit_hash, size_t _commit_hash_len) { link_object lo; int rc; sqlite3_bind_int(getLinkStmt, 1, _commit_hash_len); sqlite3_bind_blob(getLinkStmt, 2, _commit_hash, _commit_hash_len, NULL); LOG_WARN("%d", sqlite3_step(getLinkStmt)); lo.to = sqlite3_column_int(getLinkStmt, 0); lo.from = sqlite3_column_int(getLinkStmt, 1); lo.id = sqlite3_column_int(getLinkStmt, 2); uint8_t *_payload = (uint8_t *)sqlite3_column_blob(getLinkStmt, 3); lo.encrypted_len = sqlite3_column_bytes(getLinkStmt, 3); memcpy(lo.encrypted_bytes, _payload, lo.encrypted_len); uint8_t *_message_hash = (uint8_t *)sqlite3_column_blob(getLinkStmt, 4); lo.message_hash_len = 32; memcpy(lo.message_hash, _message_hash, lo.message_hash_len); lo.rx_time = sqlite3_column_int(getLinkStmt, 5); uint8_t *_tmp_commit_hash = (uint8_t *)sqlite3_column_blob(getLinkStmt, 6); lo.commit_hash_len = 32; memcpy(lo.commit_hash, _tmp_commit_hash, lo.commit_hash_len); uint8_t *_root_hash = (uint8_t *)sqlite3_column_blob(getLinkStmt, 7); lo.root_hash_len = 32; memcpy(lo.root_hash, _root_hash, lo.root_hash_len); lo.counter = sqlite3_column_int(getLinkStmt, 8); lo.payload = std::string((char *)sqlite3_column_text(getLinkStmt, 9)); lo.channel_hash = getChannelHashFromRoot(lo.root_hash, lo.root_hash_len); sqlite3_reset(getLinkStmt); return lo; } bool StoreForwardPlusPlusModule::sendFromScratch(uint8_t *root_hash) { LOG_WARN("sendFromScratch"); // "select destination, sender, packet_id, channel_hash, encrypted_bytes, message_hash, rx_time \ // from local_messages order by rx_time desc LIMIT 1;" sqlite3_bind_blob(fromScratchStmt, 1, root_hash, 32, NULL); if (sqlite3_step(fromScratchStmt) == SQLITE_DONE) { LOG_WARN("No messages in scratch to forward"); return false; } meshtastic_StoreForwardPlusPlus storeforward = meshtastic_StoreForwardPlusPlus_init_zero; storeforward.sfpp_message_type = meshtastic_StoreForwardPlusPlus_SFPP_message_type_LINK_PROVIDE; storeforward.encapsulated_to = sqlite3_column_int(fromScratchStmt, 0); storeforward.encapsulated_from = sqlite3_column_int(fromScratchStmt, 1); storeforward.encapsulated_id = sqlite3_column_int(fromScratchStmt, 2); uint8_t *_encrypted = (uint8_t *)sqlite3_column_blob(fromScratchStmt, 3); storeforward.message.size = sqlite3_column_bytes(fromScratchStmt, 3); memcpy(storeforward.message.bytes, _encrypted, storeforward.message.size); uint8_t *_message_hash = (uint8_t *)sqlite3_column_blob(fromScratchStmt, 4); storeforward.message_hash.size = 32; memcpy(storeforward.message_hash.bytes, _message_hash, storeforward.message_hash.size); storeforward.encapsulated_rxtime = sqlite3_column_int(fromScratchStmt, 5); storeforward.root_hash.size = 32; memcpy(storeforward.root_hash.bytes, root_hash, 32); sqlite3_reset(fromScratchStmt); meshtastic_MeshPacket *p = allocDataProtobuf(storeforward); p->to = NODENUM_BROADCAST; p->decoded.want_response = false; p->priority = meshtastic_MeshPacket_Priority_BACKGROUND; p->channel = 0; LOG_INFO("Send link to mesh"); service->sendToMesh(p, RX_SRC_LOCAL, true); return true; } bool StoreForwardPlusPlusModule::addToChain(link_object &lo) { LOG_WARN("Add to chain"); link_object chain_end = getChainEndLinkObject(lo.root_hash, lo.root_hash_len); // we may need to calculate the full commit hash at this point if (lo.commit_hash_len != 32) { SHA256 commit_hash; commit_hash.reset(); if (chain_end.commit_hash_len == 32) { printBytes("last message: 0x", chain_end.commit_hash, 32); commit_hash.update(chain_end.commit_hash, 32); } else { printBytes("new chain root: 0x", lo.root_hash, 32); commit_hash.update(lo.root_hash, 32); } commit_hash.update(lo.message_hash, 32); // message_hash.update(&mp.rx_time, sizeof(mp.rx_time)); commit_hash.finalize(lo.commit_hash, 32); } lo.counter = chain_end.counter + 1; // push a message into the local chain DB // destination sqlite3_bind_int(chain_insert_stmt, 1, lo.to); // sender sqlite3_bind_int(chain_insert_stmt, 2, lo.from); // packet_id sqlite3_bind_int(chain_insert_stmt, 3, lo.id); // root_hash sqlite3_bind_blob(chain_insert_stmt, 4, lo.root_hash, 32, NULL); // encrypted_bytes sqlite3_bind_blob(chain_insert_stmt, 5, lo.encrypted_bytes, lo.encrypted_len, NULL); // message_hash sqlite3_bind_blob(chain_insert_stmt, 6, lo.message_hash, 32, NULL); // rx_time sqlite3_bind_int(chain_insert_stmt, 7, lo.rx_time); // commit_hash sqlite3_bind_blob(chain_insert_stmt, 8, lo.commit_hash, 32, NULL); // payload sqlite3_bind_text(chain_insert_stmt, 9, lo.payload.c_str(), lo.payload.length(), NULL); sqlite3_bind_int(chain_insert_stmt, 10, lo.counter); sqlite3_step(chain_insert_stmt); sqlite3_reset(chain_insert_stmt); setChainCount(lo.root_hash, 32, lo.counter); return true; } bool StoreForwardPlusPlusModule::addToScratch(link_object &lo) { // push a message into the local chain DB // destination sqlite3_bind_int(scratch_insert_stmt, 1, lo.to); // sender sqlite3_bind_int(scratch_insert_stmt, 2, lo.from); // packet_id sqlite3_bind_int(scratch_insert_stmt, 3, lo.id); // root_hash sqlite3_bind_blob(scratch_insert_stmt, 4, lo.root_hash, 32, NULL); // encrypted_bytes sqlite3_bind_blob(scratch_insert_stmt, 5, lo.encrypted_bytes, lo.encrypted_len, NULL); // message_hash sqlite3_bind_blob(scratch_insert_stmt, 6, lo.message_hash, 32, NULL); // rx_time sqlite3_bind_int(scratch_insert_stmt, 7, lo.rx_time); // payload sqlite3_bind_text(scratch_insert_stmt, 8, lo.payload.c_str(), lo.payload.length(), NULL); const char *_error_mesg = sqlite3_errmsg(ppDb); LOG_WARN("step %u, %s", sqlite3_step(scratch_insert_stmt), _error_mesg); sqlite3_reset(scratch_insert_stmt); return true; } void StoreForwardPlusPlusModule::canonAnnounce(uint8_t *_message_hash, uint8_t *_commit_hash, uint8_t *_root_hash, uint32_t _rx_time) { LOG_WARN("canonAnnounce()"); meshtastic_StoreForwardPlusPlus storeforward = meshtastic_StoreForwardPlusPlus_init_zero; storeforward.sfpp_message_type = meshtastic_StoreForwardPlusPlus_SFPP_message_type_CANON_ANNOUNCE; // set root hash // set message hash storeforward.message_hash.size = 8; memcpy(storeforward.message_hash.bytes, _message_hash, 8); // set chain hash storeforward.commit_hash.size = 8; memcpy(storeforward.commit_hash.bytes, _commit_hash, 8); // set root hash // needs to be the full hash to bootstrap storeforward.root_hash.size = 32; memcpy(storeforward.root_hash.bytes, _root_hash, 32); storeforward.encapsulated_rxtime = _rx_time; // storeforward. meshtastic_MeshPacket *p = allocDataProtobuf(storeforward); p->to = NODENUM_BROADCAST; p->decoded.want_response = false; p->priority = meshtastic_MeshPacket_Priority_BACKGROUND; p->channel = 0; LOG_INFO("Send packet to mesh"); service->sendToMesh(p, RX_SRC_LOCAL, true); } bool StoreForwardPlusPlusModule::isInDB(uint8_t *message_hash_bytes, size_t message_hash_len) { sqlite3_bind_int(checkDup, 1, message_hash_len); sqlite3_bind_blob(checkDup, 2, message_hash_bytes, message_hash_len, NULL); sqlite3_step(checkDup); int numberFound = sqlite3_column_int(checkDup, 0); sqlite3_reset(checkDup); if (numberFound > 0) return true; return false; } bool StoreForwardPlusPlusModule::isInScratch(uint8_t *message_hash_bytes, size_t message_hash_len) { LOG_WARN("isInScratch"); sqlite3_bind_int(checkScratch, 1, message_hash_len); sqlite3_bind_blob(checkScratch, 2, message_hash_bytes, message_hash_len, NULL); sqlite3_step(checkScratch); int numberFound = sqlite3_column_int(checkScratch, 0); sqlite3_reset(checkScratch); if (numberFound > 0) return true; return false; } void StoreForwardPlusPlusModule::removeFromScratch(uint8_t *message_hash_bytes, size_t message_hash_len) { LOG_WARN("removeFromScratch"); sqlite3_bind_int(removeScratch, 1, message_hash_len); sqlite3_bind_blob(removeScratch, 2, message_hash_bytes, message_hash_len, NULL); sqlite3_step(removeScratch); int numberFound = sqlite3_column_int(removeScratch, 0); sqlite3_reset(removeScratch); } void StoreForwardPlusPlusModule::updatePayload(uint8_t *message_hash_bytes, size_t message_hash_len, std::string payload) { LOG_WARN("updatePayload"); sqlite3_bind_text(updatePayloadStmt, 1, payload.c_str(), payload.length(), NULL); sqlite3_bind_int(updatePayloadStmt, 2, message_hash_len); sqlite3_bind_blob(updatePayloadStmt, 3, message_hash_bytes, message_hash_len, NULL); auto res = sqlite3_step(updatePayloadStmt); const char *_error_mesg = sqlite3_errmsg(ppDb); LOG_WARN("step %u, %s", res, _error_mesg); sqlite3_reset(updatePayloadStmt); } StoreForwardPlusPlusModule::link_object StoreForwardPlusPlusModule::getFromScratch(uint8_t *message_hash_bytes, size_t hash_len) { // vscode wrote this LOG_WARN("getFromScratch"); link_object lo; sqlite3_bind_int(fromScratchByHashStmt, 1, hash_len); sqlite3_bind_blob(fromScratchByHashStmt, 2, message_hash_bytes, hash_len, NULL); auto res = sqlite3_step(fromScratchByHashStmt); const char *_error_mesg = sqlite3_errmsg(ppDb); LOG_WARN("step %u, %s", res, _error_mesg); lo.to = sqlite3_column_int(fromScratchByHashStmt, 0); lo.from = sqlite3_column_int(fromScratchByHashStmt, 1); lo.id = sqlite3_column_int(fromScratchByHashStmt, 2); uint8_t *encrypted_bytes = (uint8_t *)sqlite3_column_blob(fromScratchByHashStmt, 3); lo.encrypted_len = sqlite3_column_bytes(fromScratchByHashStmt, 3); memcpy(lo.encrypted_bytes, encrypted_bytes, lo.encrypted_len); uint8_t *message_hash = (uint8_t *)sqlite3_column_blob(fromScratchByHashStmt, 4); memcpy(lo.message_hash, message_hash, 32); lo.rx_time = sqlite3_column_int(fromScratchByHashStmt, 5); uint8_t *root_hash = (uint8_t *)sqlite3_column_blob(fromScratchByHashStmt, 6); memcpy(lo.root_hash, root_hash, 32); lo.payload = std::string((char *)sqlite3_column_text(fromScratchByHashStmt, 7), sqlite3_column_bytes(fromScratchByHashStmt, 7)); sqlite3_reset(fromScratchByHashStmt); return lo; } // should not need size considerations StoreForwardPlusPlusModule::link_object StoreForwardPlusPlusModule::ingestTextPacket(const meshtastic_MeshPacket &mp, const meshtastic_MeshPacket *encrypted_meshpacket) { LOG_WARN("ingestTextPacket()"); link_object lo; SHA256 message_hash; lo.to = mp.to; lo.from = mp.from; lo.id = mp.id; lo.rx_time = mp.rx_time; lo.channel_hash = encrypted_meshpacket->channel; memcpy(lo.encrypted_bytes, encrypted_meshpacket->encrypted.bytes, encrypted_meshpacket->encrypted.size); lo.encrypted_len = encrypted_meshpacket->encrypted.size; lo.payload = std::string((char *)mp.decoded.payload.bytes, mp.decoded.payload.size); message_hash.reset(); message_hash.update(encrypted_meshpacket->encrypted.bytes, encrypted_meshpacket->encrypted.size); message_hash.update(&mp.to, sizeof(mp.to)); message_hash.update(&mp.from, sizeof(mp.from)); message_hash.update(&mp.id, sizeof(mp.id)); message_hash.finalize(lo.message_hash, 32); lo.message_hash_len = 32; getOrAddRootFromChannelHash(encrypted_meshpacket->channel, lo.root_hash); return lo; } StoreForwardPlusPlusModule::link_object StoreForwardPlusPlusModule::ingestLinkMessage(meshtastic_StoreForwardPlusPlus *t) { link_object lo; lo.to = t->encapsulated_to; lo.from = t->encapsulated_from; lo.id = t->encapsulated_id; lo.rx_time = t->encapsulated_rxtime; // What if we don't have this root hash? Should drop this packet before this point. lo.channel_hash = getChannelHashFromRoot(t->root_hash.bytes, t->root_hash.size); SHA256 message_hash; memcpy(lo.encrypted_bytes, t->message.bytes, t->message.size); lo.encrypted_len = t->message.size; message_hash.reset(); message_hash.update(lo.encrypted_bytes, lo.encrypted_len); message_hash.update(&lo.to, sizeof(lo.to)); message_hash.update(&lo.from, sizeof(lo.from)); message_hash.update(&lo.id, sizeof(lo.id)); message_hash.finalize(lo.message_hash, 32); lo.message_hash_len = 32; // look up full root hash and copy over the partial if it matches if (lookUpFullRootHash(t->root_hash.bytes, t->root_hash.size, lo.root_hash)) { printBytes("Found full root hash: 0x", lo.root_hash, 32); lo.root_hash_len = 32; } else { LOG_WARN("root hash does not match %d bytes", t->root_hash.size); printBytes("Using partial root hash: 0x", t->root_hash.bytes, t->root_hash.size); lo.root_hash_len = 0; } if (t->commit_hash.size > 0) { // calculate the full commit hash and replace the partial if it matches if (checkCommitHash(lo, t->commit_hash.bytes, t->commit_hash.size)) { printBytes("commit hash matches: 0x", t->commit_hash.bytes, t->commit_hash.size); } else { LOG_WARN("commit hash does not match"); lo.commit_hash_len = 0; lo.validObject = false; } } // we don't ever get the payload here, so it's always an empty string lo.payload = ""; return lo; } void StoreForwardPlusPlusModule::rebroadcastLinkObject(StoreForwardPlusPlusModule::link_object &lo) { LOG_WARN("Attempting to Rebroadcast1"); meshtastic_MeshPacket *p = router->allocForSending(); p->to = lo.to; p->from = lo.from; p->id = lo.id; p->channel = lo.channel_hash; p->which_payload_variant = meshtastic_MeshPacket_encrypted_tag; p->encrypted.size = lo.encrypted_len; memcpy(p->encrypted.bytes, lo.encrypted_bytes, lo.encrypted_len); p->transport_mechanism = meshtastic_MeshPacket_TransportMechanism_TRANSPORT_LORA; // only a tiny white lie service->sendToMesh(p, RX_SRC_RADIO, true); // Send to mesh, cc to phone } bool StoreForwardPlusPlusModule::checkCommitHash(StoreForwardPlusPlusModule::link_object &lo, uint8_t *commit_hash_bytes, size_t hash_len) { SHA256 commit_hash; link_object chain_end = getChainEndLinkObject(lo.root_hash, lo.root_hash_len); commit_hash.reset(); if (chain_end.commit_hash_len == 32) { printBytes("last message: 0x", chain_end.commit_hash, 32); commit_hash.update(chain_end.commit_hash, 32); } else { if (lo.root_hash_len != 32) { LOG_WARN("Short root hash in link object, cannot create new chain"); return false; } printBytes("new chain root: 0x", lo.root_hash, 32); commit_hash.update(lo.root_hash, 32); } commit_hash.update(lo.message_hash, 32); commit_hash.finalize(lo.commit_hash, 32); lo.commit_hash_len = 32; if (hash_len == 0 || memcmp(commit_hash_bytes, lo.commit_hash, hash_len) == 0) { return true; } return false; } bool StoreForwardPlusPlusModule::lookUpFullRootHash(uint8_t *partial_root_hash, size_t partial_root_hash_len, uint8_t *full_root_hash) { LOG_WARN("lookUpFullRootHash"); printBytes("partial_root_hash", partial_root_hash, partial_root_hash_len); sqlite3_bind_int(getFullRootHashStmt, 1, partial_root_hash_len); sqlite3_bind_blob(getFullRootHashStmt, 2, partial_root_hash, partial_root_hash_len, NULL); sqlite3_step(getFullRootHashStmt); uint8_t *tmp_root_hash = (uint8_t *)sqlite3_column_blob(getFullRootHashStmt, 0); if (tmp_root_hash) { LOG_WARN("Found full root hash!"); memcpy(full_root_hash, tmp_root_hash, 32); sqlite3_reset(getFullRootHashStmt); return true; } sqlite3_reset(getFullRootHashStmt); return false; } void StoreForwardPlusPlusModule::setChainCount(uint8_t *root_hash, size_t root_hash_len, uint32_t count) { sqlite3_bind_blob(setChainCountStmt, 1, root_hash, root_hash_len, NULL); sqlite3_bind_int(setChainCountStmt, 2, count); sqlite3_step(setChainCountStmt); sqlite3_reset(setChainCountStmt); } uint32_t StoreForwardPlusPlusModule::getChainCount(uint8_t *root_hash, size_t root_hash_len) { sqlite3_bind_blob(getChainCountStmt, 1, root_hash, root_hash_len, NULL); sqlite3_step(getChainCountStmt); uint32_t count = sqlite3_column_int(getChainCountStmt, 0); sqlite3_reset(getChainCountStmt); return count; } #endif // has include sqlite3