diff --git a/src/modules/Native/StoreForwardPlusPlus.cpp b/src/modules/Native/StoreForwardPlusPlus.cpp index 189e459ca..22ea7fa3c 100644 --- a/src/modules/Native/StoreForwardPlusPlus.cpp +++ b/src/modules/Native/StoreForwardPlusPlus.cpp @@ -1,9 +1,12 @@ -// create second module for satellites? +// I've done a lot of this in SQLite for now, but honestly it needs to happen in memory, and get saved to sqlite during downtime -// The central node will generate a 256 or 128 bit value as its seed. This is the value other nodes subscribe to, and serves as -// the root of the chain +// TODO: Put some channel usage limits on this: Should be limited to 25% utilization, for instance -// +// TODO: custom hops. 1 maybe 0 + +// TODO: non-stratum0 nodes need to be pointed at their upstream source? Maybe + +// things may get weird if there are multiple stratum-0 nodes on a single mesh. Come up with mitigations // Basic design: // This module watches a channel for text messages. @@ -29,8 +32,6 @@ // at least initially, there can only be one authoritative host -// first draft is just to save channel 0 in a git-style database - // message objects get a hash value // the message chain gets a commit hash // @@ -46,11 +47,15 @@ StoreForwardPlusPlusModule::StoreForwardPlusPlusModule() concurrency::OSThread("StoreForwardpp") { LOG_WARN("StoreForwardPlusPlusModule init"); + if (portduino_config.sfpp_stratum0) LOG_WARN("SF++ stratum0"); + int res = sqlite3_open("test.db", &ppDb); LOG_WARN("Result1 %u", res); + char *err = nullptr; + res = sqlite3_exec(ppDb, " \ CREATE TABLE channel_messages( \ destination INT NOT NULL, \ @@ -68,8 +73,26 @@ StoreForwardPlusPlusModule::StoreForwardPlusPlusModule() NULL, NULL, &err); LOG_WARN("Result2 %u", res); if (err != nullptr) - ; - LOG_ERROR("%s", err); + LOG_ERROR("%s", err); + sqlite3_free(err); + + res = sqlite3_exec(ppDb, " \ + CREATE TABLE local_messages( \ + destination INT NOT NULL, \ + sender INT NOT NULL, \ + packet_id INT NOT NULL, \ + want_ack BOOL NOT NULL, \ + channel_hash INT NOT NULL, \ + encrypted_bytes BLOB NOT NULL, \ + message_hash BLOB NOT NULL, \ + rx_time INT NOT NULL, \ + payload TEXT, \ + PRIMARY KEY (message_hash) \ + );", + NULL, NULL, &err); + LOG_WARN("Result2 %u", res); + if (err != nullptr) + LOG_ERROR("%s", err); sqlite3_free(err); // create table DMs @@ -90,12 +113,10 @@ StoreForwardPlusPlusModule::StoreForwardPlusPlusModule() NULL, NULL, &err); LOG_WARN("Result2 %u", res); if (err != nullptr) - ; - LOG_ERROR("%s", err); + LOG_ERROR("%s", err); sqlite3_free(err); - // create table mappings - // create table DMs + // mappings table -- connects the root hashes to channel hashes and DM identifiers res = sqlite3_exec(ppDb, " \ CREATE TABLE mappings( \ chain_type INT NOT NULL, \ @@ -106,18 +127,29 @@ StoreForwardPlusPlusModule::StoreForwardPlusPlusModule() NULL, NULL, &err); LOG_WARN("Result2 %u", res); if (err != nullptr) - ; - LOG_ERROR("%s", err); + LOG_ERROR("%s", err); sqlite3_free(err); // store schema version somewhere - std::string insert_statement = "INSERT INTO channel_messages (destination, sender, packet_id, want_ack, channel_hash, \ - encrypted_bytes, message_hash, rx_time, commit_hash, payload) VALUES(?, ?, ?, ?, ?, ?, ?, ?, ?, ?);"; - sqlite3_prepare_v2(ppDb, insert_statement.c_str(), insert_statement.length(), &stmt, NULL); + // prepared statements *should* make this faster. + sqlite3_prepare_v2(ppDb, "INSERT INTO channel_messages (destination, sender, packet_id, want_ack, channel_hash, \ + encrypted_bytes, message_hash, rx_time, commit_hash, payload) VALUES(?, ?, ?, ?, ?, ?, ?, ?, ?, ?);", + -1, &chain_insert_stmt, NULL); + + sqlite3_prepare_v2(ppDb, "INSERT INTO channel_messages (destination, sender, packet_id, want_ack, channel_hash, \ + encrypted_bytes, message_hash, rx_time, payload) VALUES(?, ?, ?, ?, ?, ?, ?, ?, ?);", + -1, &scratch_insert_stmt, NULL); + + sqlite3_prepare_v2(ppDb, "SELECT COUNT(*) from channel_messages where commit_hash=?", -1, &checkDup, NULL); + + sqlite3_prepare_v2(ppDb, "SELECT COUNT(*) from local_messages where commit_hash=?", -1, &checkScratch, NULL); + + sqlite3_prepare_v2(ppDb, "DELETE from local_messages where commit_hash=?", -1, &removeScratch, NULL); encryptedOk = true; + // wait about 15 seconds after boot for the first runOnce() this->setInterval(15 * 1000); } @@ -142,30 +174,7 @@ int32_t StoreForwardPlusPlusModule::runOnce() return 60 * 60 * 1000; } - meshtastic_StoreForwardPlusPlus storeforward = meshtastic_StoreForwardPlusPlus_init_zero; - storeforward.sfpp_message_type = meshtastic_StoreForwardPlusPlus_SFPP_message_type_CANON_ANNOUNCE; - // set root hash - - // set message hash - storeforward.message_hash.size = 32; - memcpy(storeforward.message_hash.bytes, last_message_hash, 32); - - // set chain hash - storeforward.chain_hash.size = 32; - memcpy(storeforward.chain_hash.bytes, last_message_chain_hash, 32); - - // set root hash - storeforward.root_hash.size = 32; - memcpy(storeforward.root_hash.bytes, root_hash_bytes, 32); - - // storeforward. - meshtastic_MeshPacket *p = allocDataProtobuf(storeforward); - p->to = NODENUM_BROADCAST; - p->decoded.want_response = false; - p->priority = meshtastic_MeshPacket_Priority_BACKGROUND; - p->channel = 0; - LOG_INFO("Send packet to mesh"); - service->sendToMesh(p, RX_SRC_LOCAL, true); + canonAnnounce(last_message_hash, last_message_chain_hash, root_hash_bytes); return 60 * 60 * 1000; } @@ -180,6 +189,7 @@ bool StoreForwardPlusPlusModule::handleReceivedProtobuf(const meshtastic_MeshPac if (portduino_config.sfpp_stratum0) { LOG_WARN("Received a CANON_ANNOUNCE while stratum 0"); + // honestly this is fine, and if the announce is not end of chain, just treat it like a request for next } else { uint8_t tmp_root_hash_bytes[32] = {0}; @@ -204,8 +214,11 @@ bool StoreForwardPlusPlusModule::handleReceivedProtobuf(const meshtastic_MeshPac if (getChainEnd(router->p_encrypted->channel, last_message_chain_hash, last_message_hash)) { if (memcmp(last_message_chain_hash, t->chain_hash.bytes, 32) == 0) { LOG_WARN("End of chain matches!"); - } else + // TODO: Send a message from the local queue + } else { ("End of chain does not match!"); + requestNextMessage(t->root_hash.bytes, last_message_chain_hash); + } } else { LOG_WARN("No Messages on this chain, request!"); requestNextMessage(t->root_hash.bytes, t->root_hash.bytes); @@ -232,9 +245,23 @@ bool StoreForwardPlusPlusModule::handleReceivedProtobuf(const meshtastic_MeshPac LOG_WARN("Link Provide received!"); ChannelHash _channel_hash = getChannelHashFromRoot(t->root_hash.bytes); - addToChain(t->encapsulated_to, t->encapsulated_from, t->encapsulated_id, false, _channel_hash, t->message.bytes, - t->message.size, t->message_hash.bytes, t->chain_hash.bytes, t->root_hash.bytes, t->encapsulated_rxtime, "", - 0); + // + if (portduino_config.sfpp_stratum0) { + // check for message_hash in db + // calculate the chain_hash + addToChain(t->encapsulated_to, t->encapsulated_from, t->encapsulated_id, false, _channel_hash, t->message.bytes, + t->message.size, t->message_hash.bytes, t->chain_hash.bytes, t->root_hash.bytes, t->encapsulated_rxtime, + "", 0); + } else { + addToChain(t->encapsulated_to, t->encapsulated_from, t->encapsulated_id, false, _channel_hash, t->message.bytes, + t->message.size, t->message_hash.bytes, t->chain_hash.bytes, t->root_hash.bytes, t->encapsulated_rxtime, + "", 0); + if (isInScratch(t->message_hash.bytes)) + removeFromScratch(t->message_hash.bytes); + // check for message hash in scratch + } + + // turn around and request the next message } return true; @@ -244,7 +271,7 @@ ProcessMessage StoreForwardPlusPlusModule::handleReceived(const meshtastic_MeshP { // To avoid terrible time problems, require NTP or GPS time if (getRTCQuality() < RTCQualityNTP) { - return ProcessMessage::CONTINUE; // Let others look at this message also if they want + return ProcessMessage::CONTINUE; } // the sender+destination pair is an interesting unique id (though ordering) (smaller one goes first?) @@ -272,7 +299,6 @@ ProcessMessage StoreForwardPlusPlusModule::handleReceived(const meshtastic_MeshP if (mp.which_payload_variant != meshtastic_MeshPacket_decoded_tag) { return ProcessMessage::CONTINUE; // Let others look at this message also if they want } - // refuse without valid time? LOG_WARN("in handleReceived"); if (mp.decoded.portnum == meshtastic_PortNum_TEXT_MESSAGE_APP && mp.to == NODENUM_BROADCAST) { @@ -286,62 +312,54 @@ ProcessMessage StoreForwardPlusPlusModule::handleReceived(const meshtastic_MeshP message_hash.update(&mp.id, sizeof(mp.id)); message_hash.finalize(message_hash_bytes, 32); - sqlite3_stmt *checkDup; - sqlite3_prepare_v2(ppDb, "SELECT COUNT(*) from channel_messages where commit_hash=?", -1, &checkDup, NULL); - sqlite3_bind_blob(checkDup, 1, message_hash_bytes, 32, NULL); - sqlite3_step(checkDup); - int numberFound = sqlite3_column_int(checkDup, 0); - LOG_WARN("found %u times in db", numberFound); - - if (numberFound != 0) + if (isInDB(message_hash_bytes)) { + LOG_WARN("found message in db"); + // TODO: For this iteration, we should check if the text payload is present in the db, and update it if not return ProcessMessage::CONTINUE; + } if (!portduino_config.sfpp_stratum0) { - LOG_WARN("TODO: downstream collection of messages"); + if (!isInDB(message_hash_bytes)) { + addToScratch(mp.to, mp.from, mp.id, mp.want_ack, router->p_encrypted->channel, + router->p_encrypted->encrypted.bytes, router->p_encrypted->encrypted.size, message_hash_bytes, + root_hash_bytes, mp.rx_time, (char *)mp.decoded.payload.bytes, mp.decoded.payload.size); + LOG_WARN("added message to scratch"); + // send link to upstream? + } return ProcessMessage::CONTINUE; } // need to resolve the channel hash to the root hash getRootFromChannelHash(router->p_encrypted->channel, root_hash_bytes); - - std::string getEntry_string = - "select commit_hash from channel_messages where channel_hash=? order by rowid desc LIMIT 1;"; - sqlite3_stmt *getEntry; - int rc = sqlite3_prepare_v2(ppDb, getEntry_string.c_str(), getEntry_string.size(), &getEntry, NULL); - sqlite3_bind_int(getEntry, 1, router->p_encrypted->channel); - sqlite3_step(getEntry); - - // this is allocated by sqlite3 and will be deleted when finalize is called - uint8_t *last_message_hash = (uint8_t *)sqlite3_column_blob(getEntry, 0); - if (last_message_hash) { - printBytes("last message: 0x", last_message_hash, 32); - } else { - printBytes("new chain root: 0x", root_hash_bytes, 32); - } - - // look for message_hash_bytes in db - // if found, we bail early + uint8_t *last_message_hash; + uint8_t *last_chain_hash; chain_hash.reset(); - if (last_message_hash) { + + if (getChainEnd(router->p_encrypted->channel, last_chain_hash, last_message_hash)) { + printBytes("last message: 0x", last_chain_hash, 32); chain_hash.update(last_message_hash, 32); } else { + printBytes("new chain root: 0x", root_hash_bytes, 32); chain_hash.update(root_hash_bytes, 32); } + chain_hash.update(message_hash_bytes, 32); // message_hash.update(&mp.rx_time, sizeof(mp.rx_time)); chain_hash.finalize(chain_hash_bytes, 32); - sqlite3_finalize(getEntry); - // select HEX(commit_hash),HEX(channel_hash), payload, destination from channel_messages order by rowid desc; // push a message into the local chain DB + // next, the stratum n+1 node needs to tuck messages away and attempt to update stratum 0 + addToChain(mp.to, mp.from, mp.id, mp.want_ack, router->p_encrypted->channel, router->p_encrypted->encrypted.bytes, router->p_encrypted->encrypted.size, message_hash_bytes, chain_hash_bytes, root_hash_bytes, mp.rx_time, (char *)mp.decoded.payload.bytes, mp.decoded.payload.size); + // TODO: If under 25% usage, go ahead and trigger a canon announce + return ProcessMessage::CONTINUE; // Let others look at this message also if they want // one of the command messages @@ -446,6 +464,10 @@ bool StoreForwardPlusPlusModule::getChainEnd(ChannelHash _ch_hash, uint8_t *_cha sqlite3_step(getEntry); uint8_t *last_message_chain_hash = (uint8_t *)sqlite3_column_blob(getEntry, 0); uint8_t *last_message_hash = (uint8_t *)sqlite3_column_blob(getEntry, 1); + if (last_message_chain_hash != nullptr) + memcpy(_chain_hash, last_message_chain_hash, 32); + if (last_message_hash != nullptr) + memcpy(_message_hash, last_message_hash, 32); if (last_message_chain_hash == nullptr || last_message_hash == nullptr) { LOG_WARN("Store and Forward++ database lookup returned null"); @@ -453,8 +475,6 @@ bool StoreForwardPlusPlusModule::getChainEnd(ChannelHash _ch_hash, uint8_t *_cha return false; } - memcpy(_chain_hash, last_message_chain_hash, 32); - memcpy(_message_hash, last_message_hash, 32); sqlite3_finalize(getEntry); return true; } @@ -580,35 +600,130 @@ bool StoreForwardPlusPlusModule::addToChain(uint32_t to, uint32_t from, uint32_t size_t payload_len) { + // TODO: Make a data structure for this data + // push a message into the local chain DB // destination - sqlite3_bind_int(stmt, 1, to); + sqlite3_bind_int(chain_insert_stmt, 1, to); // sender - sqlite3_bind_int(stmt, 2, from); + sqlite3_bind_int(chain_insert_stmt, 2, from); // packet_id - sqlite3_bind_int(stmt, 3, id); + sqlite3_bind_int(chain_insert_stmt, 3, id); // want_ack - sqlite3_bind_int(stmt, 4, want_ack); + sqlite3_bind_int(chain_insert_stmt, 4, want_ack); // channel_hash - sqlite3_bind_int(stmt, 5, channel_hash); + sqlite3_bind_int(chain_insert_stmt, 5, channel_hash); // encrypted_bytes - sqlite3_bind_blob(stmt, 6, encrypted_bytes, encrypted_len, NULL); + sqlite3_bind_blob(chain_insert_stmt, 6, encrypted_bytes, encrypted_len, NULL); // message_hash - sqlite3_bind_blob(stmt, 7, _message_hash, 32, NULL); + sqlite3_bind_blob(chain_insert_stmt, 7, _message_hash, 32, NULL); // rx_time - sqlite3_bind_int(stmt, 8, _rx_time); + sqlite3_bind_int(chain_insert_stmt, 8, _rx_time); // commit_hash - sqlite3_bind_blob(stmt, 9, _chain_hash, 32, NULL); + sqlite3_bind_blob(chain_insert_stmt, 9, _chain_hash, 32, NULL); // payload - sqlite3_bind_text(stmt, 10, payload_bytes, payload_len, NULL); + sqlite3_bind_text(chain_insert_stmt, 10, payload_bytes, payload_len, NULL); - sqlite3_step(stmt); - sqlite3_reset(stmt); + sqlite3_step(chain_insert_stmt); + sqlite3_reset(chain_insert_stmt); return true; } +bool StoreForwardPlusPlusModule::addToScratch(uint32_t to, uint32_t from, uint32_t id, bool want_ack, ChannelHash channel_hash, + uint8_t *encrypted_bytes, size_t encrypted_len, uint8_t *_message_hash, + uint8_t *_root_hash, uint32_t _rx_time, char *payload_bytes, size_t payload_len) + +{ + // TODO: Make a data structure for this data + + // push a message into the local chain DB + // destination + sqlite3_bind_int(scratch_insert_stmt, 1, to); + // sender + sqlite3_bind_int(scratch_insert_stmt, 2, from); + // packet_id + sqlite3_bind_int(scratch_insert_stmt, 3, id); + // want_ack + sqlite3_bind_int(scratch_insert_stmt, 4, want_ack); + // channel_hash + sqlite3_bind_int(scratch_insert_stmt, 5, channel_hash); + // encrypted_bytes + sqlite3_bind_blob(scratch_insert_stmt, 6, encrypted_bytes, encrypted_len, NULL); + + // message_hash + sqlite3_bind_blob(scratch_insert_stmt, 7, _message_hash, 32, NULL); + // rx_time + sqlite3_bind_int(scratch_insert_stmt, 8, _rx_time); + + // payload + sqlite3_bind_text(scratch_insert_stmt, 10, payload_bytes, payload_len, NULL); + + sqlite3_step(scratch_insert_stmt); + sqlite3_reset(scratch_insert_stmt); + return true; +} + +void StoreForwardPlusPlusModule::canonAnnounce(uint8_t *_message_hash, uint8_t *_chain_hash, uint8_t *_root_hash) +{ + meshtastic_StoreForwardPlusPlus storeforward = meshtastic_StoreForwardPlusPlus_init_zero; + storeforward.sfpp_message_type = meshtastic_StoreForwardPlusPlus_SFPP_message_type_CANON_ANNOUNCE; + // set root hash + + // set message hash + storeforward.message_hash.size = 32; + memcpy(storeforward.message_hash.bytes, _message_hash, 32); + + // set chain hash + storeforward.chain_hash.size = 32; + memcpy(storeforward.chain_hash.bytes, _chain_hash, 32); + + // set root hash + storeforward.root_hash.size = 32; + memcpy(storeforward.root_hash.bytes, _root_hash, 32); + + // storeforward. + meshtastic_MeshPacket *p = allocDataProtobuf(storeforward); + p->to = NODENUM_BROADCAST; + p->decoded.want_response = false; + p->priority = meshtastic_MeshPacket_Priority_BACKGROUND; + p->channel = 0; + LOG_INFO("Send packet to mesh"); + service->sendToMesh(p, RX_SRC_LOCAL, true); +} + +bool StoreForwardPlusPlusModule::isInDB(uint8_t *message_hash_bytes) +{ + sqlite3_bind_blob(checkDup, 1, message_hash_bytes, 32, NULL); + sqlite3_step(checkDup); + int numberFound = sqlite3_column_int(checkDup, 0); + sqlite3_reset(checkDup); + if (numberFound > 0) + return true; + return false; +} + +bool StoreForwardPlusPlusModule::isInScratch(uint8_t *message_hash_bytes) +{ + sqlite3_bind_blob(checkScratch, 1, message_hash_bytes, 32, NULL); + sqlite3_step(checkScratch); + int numberFound = sqlite3_column_int(checkScratch, 0); + sqlite3_reset(checkScratch); + if (numberFound > 0) + return true; + return false; +} + +void StoreForwardPlusPlusModule::removeFromScratch(uint8_t *message_hash_bytes) +{ + LOG_WARN("removeFromScratch"); + sqlite3_bind_blob(removeScratch, 1, message_hash_bytes, 32, NULL); + sqlite3_step(removeScratch); + int numberFound = sqlite3_column_int(removeScratch, 0); + sqlite3_reset(removeScratch); +} + // announce latest hash // chain_end_announce diff --git a/src/modules/Native/StoreForwardPlusPlus.h b/src/modules/Native/StoreForwardPlusPlus.h index 8e2b3fbe3..99a6e62c9 100644 --- a/src/modules/Native/StoreForwardPlusPlus.h +++ b/src/modules/Native/StoreForwardPlusPlus.h @@ -44,7 +44,11 @@ class StoreForwardPlusPlusModule : public ProtobufModule