diff --git a/arch/esp32/esp32.ini b/arch/esp32/esp32.ini index 0011cc39f..70654e8ec 100644 --- a/arch/esp32/esp32.ini +++ b/arch/esp32/esp32.ini @@ -26,7 +26,6 @@ build_flags = -DCONFIG_NIMBLE_CPP_LOG_LEVEL=2 -DCONFIG_BT_NIMBLE_MAX_CCCDS=20 -DESP_OPENSSL_SUPPRESS_LEGACY_WARNING - -DDEBUG_HEAP lib_deps = ${arduino_base.lib_deps} diff --git a/arch/esp32/esp32s2.ini b/arch/esp32/esp32s2.ini index ca4f576d6..cd47c4ca1 100644 --- a/arch/esp32/esp32s2.ini +++ b/arch/esp32/esp32s2.ini @@ -27,7 +27,6 @@ build_flags = -DCONFIG_BT_NIMBLE_MAX_CCCDS=20 -DESP_OPENSSL_SUPPRESS_LEGACY_WARNING -DHAS_BLUETOOTH=0 - -DDEBUG_HEAP lib_deps = ${arduino_base.lib_deps} diff --git a/arch/esp32/esp32s3.ini b/arch/esp32/esp32s3.ini index b276ceff9..ce64fdbe2 100644 --- a/arch/esp32/esp32s3.ini +++ b/arch/esp32/esp32s3.ini @@ -26,7 +26,6 @@ build_flags = -DCONFIG_NIMBLE_CPP_LOG_LEVEL=2 -DCONFIG_BT_NIMBLE_MAX_CCCDS=20 -DESP_OPENSSL_SUPPRESS_LEGACY_WARNING - -DDEBUG_HEAP lib_deps = ${arduino_base.lib_deps} diff --git a/protobufs b/protobufs index 6048ecd2f..3b0d871ca 160000 --- a/protobufs +++ b/protobufs @@ -1 +1 @@ -Subproject commit 6048ecd2f8433005bb73ab4505386a7bbdb78c86 +Subproject commit 3b0d871ca1e0f8a2ed823f0696e2d7cf31ed2ebd diff --git a/src/mesh/MeshPacketQueue.h b/src/mesh/MeshPacketQueue.h index 8c93b452e..ee6c6954a 100644 --- a/src/mesh/MeshPacketQueue.h +++ b/src/mesh/MeshPacketQueue.h @@ -26,6 +26,12 @@ class MeshPacketQueue /** return true if the queue is empty */ bool empty(); + /** return amount of free packets in Queue */ + size_t getFree() { return maxLen - queue.size(); } + + /** return total size of the Queue */ + size_t getMaxLen() { return maxLen; } + MeshPacket *dequeue(); MeshPacket *getFront(); diff --git a/src/mesh/MeshService.cpp b/src/mesh/MeshService.cpp index c300104a4..208b1b9ea 100644 --- a/src/mesh/MeshService.cpp +++ b/src/mesh/MeshService.cpp @@ -52,10 +52,15 @@ FIXME in the initial proof of concept we just skip the entire want/deny flow and MeshService service; +static MemoryDynamic staticQueueStatusPool; + +Allocator &queueStatusPool = staticQueueStatusPool; + #include "Router.h" -MeshService::MeshService() : toPhoneQueue(MAX_RX_TOPHONE) +MeshService::MeshService() : toPhoneQueue(MAX_RX_TOPHONE), toPhoneQueueStatusQueue(MAX_RX_TOPHONE) { + lastQueueStatus = { 0, 0, 16, 0 }; // assert(MAX_RX_TOPHONE == 32); // FIXME, delete this, just checking my clever macro } @@ -83,6 +88,11 @@ int MeshService::handleFromRadio(const MeshPacket *mp) /// Do idle processing (mostly processing messages which have been queued from the radio) void MeshService::loop() { + if (lastQueueStatus.free == 0) { // check if there is now free space in TX queue + QueueStatus qs = router->getQueueStatus(); + if (qs.free != lastQueueStatus.free) + (void)sendQueueStatusToPhone(qs, 0, 0); + } if (oldFromNum != fromNum) { // We don't want to generate extra notifies for multiple new packets fromNumChanged.notifyObservers(fromNum); oldFromNum = fromNum; @@ -179,12 +189,43 @@ bool MeshService::cancelSending(PacketId id) return router->cancelSending(nodeDB.getNodeNum(), id); } +ErrorCode MeshService::sendQueueStatusToPhone(const QueueStatus &qs, ErrorCode res, uint32_t mesh_packet_id) +{ + QueueStatus *copied = queueStatusPool.allocCopy(qs); + + copied->res = res; + copied->mesh_packet_id = mesh_packet_id; + + if (toPhoneQueueStatusQueue.numFree() == 0) { + LOG_DEBUG("NOTE: tophone queue status queue is full, discarding oldest\n"); + QueueStatus *d = toPhoneQueueStatusQueue.dequeuePtr(0); + if (d) + releaseQueueStatusToPool(d); + } + + lastQueueStatus = *copied; + + res = toPhoneQueueStatusQueue.enqueue(copied, 0); + fromNum++; + + return res ? ERRNO_OK : ERRNO_UNKNOWN; +} + void MeshService::sendToMesh(MeshPacket *p, RxSource src, bool ccToPhone) { + uint32_t mesh_packet_id = p->id; nodeDB.updateFrom(*p); // update our local DB for this packet (because phone might have sent position packets etc...) // Note: We might return !OK if our fifo was full, at that point the only option we have is to drop it - router->sendLocal(p, src); + ErrorCode res = router->sendLocal(p, src); + + /* NOTE(pboldin): Prepare and send QueueStatus message to the phone as a + * high-priority message. */ + QueueStatus qs = router->getQueueStatus(); + ErrorCode r = sendQueueStatusToPhone(qs, res, mesh_packet_id); + if (r != ERRNO_OK) { + LOG_DEBUG("Can't send status to phone"); + } if (ccToPhone) { sendToPhone(p); diff --git a/src/mesh/MeshService.h b/src/mesh/MeshService.h index c3b3c95d9..31b63bd3f 100644 --- a/src/mesh/MeshService.h +++ b/src/mesh/MeshService.h @@ -14,6 +14,8 @@ #include "../platform/portduino/SimRadio.h" #endif +extern Allocator &queueStatusPool; + /** * Top level app for this service. keeps the mesh, the radio config and the queue of received packets. * @@ -29,6 +31,12 @@ class MeshService /// FIXME - save this to flash on deep sleep PointerQueue toPhoneQueue; + // keep list of QueueStatus packets to be send to the phone + PointerQueue toPhoneQueueStatusQueue; + + // This holds the last QueueStatus send + QueueStatus lastQueueStatus; + /// The current nonce for the newest packet which has been queued for the phone uint32_t fromNum = 0; @@ -56,6 +64,12 @@ class MeshService /// Allows the bluetooth handler to free packets after they have been sent void releaseToPool(MeshPacket *p) { packetPool.release(p); } + /// Return the next QueueStatus packet destined to the phone. + QueueStatus *getQueueStatusForPhone() { return toPhoneQueueStatusQueue.dequeuePtr(0); } + + // Release QueueStatus packet to pool + void releaseQueueStatusToPool(QueueStatus *p) { queueStatusPool.release(p); } + /** * Given a ToRadio buffer parse it and properly handle it (setup radio, owner or send packet into the mesh) * Called by PhoneAPI.handleToRadio. Note: p is a scratch buffer, this function is allowed to write to it but it can not keep @@ -100,6 +114,8 @@ class MeshService /// needs to keep the packet around it makes a copy int handleFromRadio(const MeshPacket *p); friend class RoutingModule; + + ErrorCode sendQueueStatusToPhone(const QueueStatus &qs, ErrorCode res, uint32_t mesh_packet_id); }; extern MeshService service; diff --git a/src/mesh/PhoneAPI.cpp b/src/mesh/PhoneAPI.cpp index 897ddfe8a..c1d56b7ec 100644 --- a/src/mesh/PhoneAPI.cpp +++ b/src/mesh/PhoneAPI.cpp @@ -50,6 +50,7 @@ void PhoneAPI::close() unobserve(&service.fromNumChanged); releasePhonePacket(); // Don't leak phone packets on shutdown + releaseQueueStatusPhonePacket(); onConnectionChanged(false); } @@ -282,14 +283,19 @@ size_t PhoneAPI::getFromRadio(uint8_t *buf) case STATE_SEND_PACKETS: // Do we have a message from the mesh? LOG_INFO("getFromRadio=STATE_SEND_PACKETS\n"); - if (packetForPhone) { + if (queueStatusPacketForPhone) { + + fromRadioScratch.which_payload_variant = FromRadio_queueStatus_tag; + fromRadioScratch.queueStatus = *queueStatusPacketForPhone; + releaseQueueStatusPhonePacket(); + } else if (packetForPhone) { printPacket("phone downloaded packet", packetForPhone); // Encapsulate as a FromRadio packet fromRadioScratch.which_payload_variant = FromRadio_packet_tag; fromRadioScratch.packet = *packetForPhone; + releasePhonePacket(); } - releasePhonePacket(); break; default: @@ -322,6 +328,14 @@ void PhoneAPI::releasePhonePacket() } } +void PhoneAPI::releaseQueueStatusPhonePacket() +{ + if (queueStatusPacketForPhone) { + service.releaseQueueStatusToPool(queueStatusPacketForPhone); + queueStatusPacketForPhone = NULL; + } +} + /** * Return true if we have data available to send to the phone */ @@ -342,9 +356,15 @@ bool PhoneAPI::available() return true; // Always say we have something, because we might need to advance our state machine case STATE_SEND_PACKETS: { + if (!queueStatusPacketForPhone) + queueStatusPacketForPhone = service.getQueueStatusForPhone(); + bool hasPacket = !!queueStatusPacketForPhone; + if (hasPacket) + return true; + if (!packetForPhone) packetForPhone = service.getForPhone(); - bool hasPacket = !!packetForPhone; + hasPacket = !!packetForPhone; // LOG_DEBUG("available hasPacket=%d\n", hasPacket); return hasPacket; } diff --git a/src/mesh/PhoneAPI.h b/src/mesh/PhoneAPI.h index cbac5f688..2f2695807 100644 --- a/src/mesh/PhoneAPI.h +++ b/src/mesh/PhoneAPI.h @@ -42,6 +42,9 @@ class PhoneAPI : public Observer // FIXME, we shouldn't be inheriting /// downloads it MeshPacket *packetForPhone = NULL; + // Keep QueueStatus packet just as packetForPhone + QueueStatus *queueStatusPacketForPhone = NULL; + /// We temporarily keep the nodeInfo here between the call to available and getFromRadio const NodeInfo *nodeInfoForPhone = NULL; @@ -115,6 +118,8 @@ class PhoneAPI : public Observer // FIXME, we shouldn't be inheriting private: void releasePhonePacket(); + void releaseQueueStatusPhonePacket(); + /// begin a new connection void handleStartConfig(); diff --git a/src/mesh/RadioInterface.h b/src/mesh/RadioInterface.h index f50c0ae77..59c63add2 100644 --- a/src/mesh/RadioInterface.h +++ b/src/mesh/RadioInterface.h @@ -115,6 +115,13 @@ class RadioInterface */ virtual ErrorCode send(MeshPacket *p) = 0; + /** Return TX queue status */ + virtual QueueStatus getQueueStatus() { + QueueStatus qs; + qs.res = qs.mesh_packet_id = qs.free = qs.maxlen = 0; + return qs; + } + /** Attempt to cancel a previously sent packet. Returns true if a packet was found we could cancel */ virtual bool cancelSending(NodeNum from, PacketId id) { return false; } diff --git a/src/mesh/RadioLibInterface.cpp b/src/mesh/RadioLibInterface.cpp index 53eaeca60..81a4d803e 100644 --- a/src/mesh/RadioLibInterface.cpp +++ b/src/mesh/RadioLibInterface.cpp @@ -158,6 +158,17 @@ ErrorCode RadioLibInterface::send(MeshPacket *p) #endif } +QueueStatus RadioLibInterface::getQueueStatus() +{ + QueueStatus qs; + + qs.res = qs.mesh_packet_id = 0; + qs.free = txQueue.getFree(); + qs.maxlen = txQueue.getMaxLen(); + + return qs; +} + bool RadioLibInterface::canSleep() { bool res = txQueue.empty(); diff --git a/src/mesh/RadioLibInterface.h b/src/mesh/RadioLibInterface.h index 16495c2f4..628ea863f 100644 --- a/src/mesh/RadioLibInterface.h +++ b/src/mesh/RadioLibInterface.h @@ -153,6 +153,8 @@ class RadioLibInterface : public RadioInterface, protected concurrency::Notified */ virtual void startSend(MeshPacket *txp); + QueueStatus getQueueStatus(); + protected: /** Do any hardware setup needed on entry into send configuration for the radio. Subclasses can customize */ diff --git a/src/mesh/Router.cpp b/src/mesh/Router.cpp index 0a8497463..0f34010ec 100644 --- a/src/mesh/Router.cpp +++ b/src/mesh/Router.cpp @@ -148,6 +148,11 @@ void Router::setReceivedMessage() runASAP = true; } +QueueStatus Router::getQueueStatus() +{ + return iface->getQueueStatus(); +} + ErrorCode Router::sendLocal(MeshPacket *p, RxSource src) { // No need to deliver externally if the destination is the local node diff --git a/src/mesh/Router.h b/src/mesh/Router.h index f7748bb2b..5a9cc0702 100644 --- a/src/mesh/Router.h +++ b/src/mesh/Router.h @@ -55,6 +55,9 @@ class Router : protected concurrency::OSThread */ MeshPacket *allocForSending(); + /** Return Underlying interface's TX queue status */ + QueueStatus getQueueStatus(); + /** * @return our local nodenum */ NodeNum getNodeNum(); diff --git a/src/mesh/generated/mesh.pb.c b/src/mesh/generated/mesh.pb.c index bd749b6cb..29594c612 100644 --- a/src/mesh/generated/mesh.pb.c +++ b/src/mesh/generated/mesh.pb.c @@ -36,6 +36,9 @@ PB_BIND(MyNodeInfo, MyNodeInfo, AUTO) PB_BIND(LogRecord, LogRecord, AUTO) +PB_BIND(QueueStatus, QueueStatus, AUTO) + + PB_BIND(FromRadio, FromRadio, 2) diff --git a/src/mesh/generated/mesh.pb.h b/src/mesh/generated/mesh.pb.h index 31e7817a0..d1f3e5bf5 100644 --- a/src/mesh/generated/mesh.pb.h +++ b/src/mesh/generated/mesh.pb.h @@ -621,6 +621,17 @@ typedef struct _LogRecord { LogRecord_Level level; } LogRecord; +typedef struct _QueueStatus { + /* Last attempt to queue status, ErrorCode */ + int8_t res; + /* Free entries in the outgoing queue */ + uint8_t free; + /* Maximum entries in the outgoing queue */ + uint8_t maxlen; + /* What was mesh packet id that generated this response? */ + uint32_t mesh_packet_id; +} QueueStatus; + /* Packets from the radio to the phone will appear on the fromRadio characteristic. It will support READ and NOTIFY. When a new packet arrives the device will BLE notify? It will sit in that descriptor until consumed by the phone, @@ -657,6 +668,8 @@ typedef struct _FromRadio { ModuleConfig moduleConfig; /* One packet is sent for each channel */ Channel channel; + /* Queue status info */ + QueueStatus queueStatus; }; } FromRadio; @@ -755,6 +768,7 @@ extern "C" { + #define Compressed_portnum_ENUMTYPE PortNum @@ -769,6 +783,7 @@ extern "C" { #define NodeInfo_init_default {0, false, User_init_default, false, Position_init_default, 0, 0, false, DeviceMetrics_init_default} #define MyNodeInfo_init_default {0, 0, 0, "", _CriticalErrorCode_MIN, 0, 0, 0, 0, 0, 0, 0, {0, 0, 0, 0, 0, 0, 0, 0}, 0, {0, 0, 0, 0, 0, 0, 0, 0}, 0, 0, 0} #define LogRecord_init_default {"", 0, "", _LogRecord_Level_MIN} +#define QueueStatus_init_default {0, 0, 0, 0} #define FromRadio_init_default {0, 0, {MeshPacket_init_default}} #define ToRadio_init_default {0, {MeshPacket_init_default}} #define Compressed_init_default {_PortNum_MIN, {0, {0}}} @@ -782,6 +797,7 @@ extern "C" { #define NodeInfo_init_zero {0, false, User_init_zero, false, Position_init_zero, 0, 0, false, DeviceMetrics_init_zero} #define MyNodeInfo_init_zero {0, 0, 0, "", _CriticalErrorCode_MIN, 0, 0, 0, 0, 0, 0, 0, {0, 0, 0, 0, 0, 0, 0, 0}, 0, {0, 0, 0, 0, 0, 0, 0, 0}, 0, 0, 0} #define LogRecord_init_zero {"", 0, "", _LogRecord_Level_MIN} +#define QueueStatus_init_zero {0, 0, 0, 0} #define FromRadio_init_zero {0, 0, {MeshPacket_init_zero}} #define ToRadio_init_zero {0, {MeshPacket_init_zero}} #define Compressed_init_zero {_PortNum_MIN, {0, {0}}} @@ -873,6 +889,10 @@ extern "C" { #define LogRecord_time_tag 2 #define LogRecord_source_tag 3 #define LogRecord_level_tag 4 +#define QueueStatus_res_tag 1 +#define QueueStatus_free_tag 2 +#define QueueStatus_maxlen_tag 3 +#define QueueStatus_mesh_packet_id_tag 4 #define FromRadio_id_tag 1 #define FromRadio_packet_tag 2 #define FromRadio_my_info_tag 3 @@ -883,6 +903,7 @@ extern "C" { #define FromRadio_rebooted_tag 8 #define FromRadio_moduleConfig_tag 9 #define FromRadio_channel_tag 10 +#define FromRadio_queueStatus_tag 11 #define ToRadio_packet_tag 1 #define ToRadio_want_config_id_tag 3 #define ToRadio_disconnect_tag 4 @@ -1022,6 +1043,14 @@ X(a, STATIC, SINGULAR, UENUM, level, 4) #define LogRecord_CALLBACK NULL #define LogRecord_DEFAULT NULL +#define QueueStatus_FIELDLIST(X, a) \ +X(a, STATIC, SINGULAR, INT32, res, 1) \ +X(a, STATIC, SINGULAR, UINT32, free, 2) \ +X(a, STATIC, SINGULAR, UINT32, maxlen, 3) \ +X(a, STATIC, SINGULAR, UINT32, mesh_packet_id, 4) +#define QueueStatus_CALLBACK NULL +#define QueueStatus_DEFAULT NULL + #define FromRadio_FIELDLIST(X, a) \ X(a, STATIC, SINGULAR, UINT32, id, 1) \ X(a, STATIC, ONEOF, MESSAGE, (payload_variant,packet,packet), 2) \ @@ -1032,7 +1061,8 @@ X(a, STATIC, ONEOF, MESSAGE, (payload_variant,log_record,log_record), 6) X(a, STATIC, ONEOF, UINT32, (payload_variant,config_complete_id,config_complete_id), 7) \ X(a, STATIC, ONEOF, BOOL, (payload_variant,rebooted,rebooted), 8) \ X(a, STATIC, ONEOF, MESSAGE, (payload_variant,moduleConfig,moduleConfig), 9) \ -X(a, STATIC, ONEOF, MESSAGE, (payload_variant,channel,channel), 10) +X(a, STATIC, ONEOF, MESSAGE, (payload_variant,channel,channel), 10) \ +X(a, STATIC, ONEOF, MESSAGE, (payload_variant,queueStatus,queueStatus), 11) #define FromRadio_CALLBACK NULL #define FromRadio_DEFAULT NULL #define FromRadio_payload_variant_packet_MSGTYPE MeshPacket @@ -1042,6 +1072,7 @@ X(a, STATIC, ONEOF, MESSAGE, (payload_variant,channel,channel), 10) #define FromRadio_payload_variant_log_record_MSGTYPE LogRecord #define FromRadio_payload_variant_moduleConfig_MSGTYPE ModuleConfig #define FromRadio_payload_variant_channel_MSGTYPE Channel +#define FromRadio_payload_variant_queueStatus_MSGTYPE QueueStatus #define ToRadio_FIELDLIST(X, a) \ X(a, STATIC, ONEOF, MESSAGE, (payload_variant,packet,packet), 1) \ @@ -1067,6 +1098,7 @@ extern const pb_msgdesc_t MeshPacket_msg; extern const pb_msgdesc_t NodeInfo_msg; extern const pb_msgdesc_t MyNodeInfo_msg; extern const pb_msgdesc_t LogRecord_msg; +extern const pb_msgdesc_t QueueStatus_msg; extern const pb_msgdesc_t FromRadio_msg; extern const pb_msgdesc_t ToRadio_msg; extern const pb_msgdesc_t Compressed_msg; @@ -1082,6 +1114,7 @@ extern const pb_msgdesc_t Compressed_msg; #define NodeInfo_fields &NodeInfo_msg #define MyNodeInfo_fields &MyNodeInfo_msg #define LogRecord_fields &LogRecord_msg +#define QueueStatus_fields &QueueStatus_msg #define FromRadio_fields &FromRadio_msg #define ToRadio_fields &ToRadio_msg #define Compressed_fields &Compressed_msg @@ -1095,6 +1128,7 @@ extern const pb_msgdesc_t Compressed_msg; #define MyNodeInfo_size 179 #define NodeInfo_size 258 #define Position_size 137 +#define QueueStatus_size 23 #define RouteDiscovery_size 40 #define Routing_size 42 #define ToRadio_size 324 diff --git a/src/mesh/generated/module_config.pb.h b/src/mesh/generated/module_config.pb.h index 52b551590..da13ac706 100644 --- a/src/mesh/generated/module_config.pb.h +++ b/src/mesh/generated/module_config.pb.h @@ -301,6 +301,7 @@ extern "C" { + #define ModuleConfig_AudioConfig_bitrate_ENUMTYPE ModuleConfig_AudioConfig_Audio_Baud #define ModuleConfig_SerialConfig_baud_ENUMTYPE ModuleConfig_SerialConfig_Serial_Baud diff --git a/src/mqtt/MQTT.cpp b/src/mqtt/MQTT.cpp index f4004139a..49ad586dd 100644 --- a/src/mqtt/MQTT.cpp +++ b/src/mqtt/MQTT.cpp @@ -21,10 +21,6 @@ String statusTopic = "msh/2/stat/"; String cryptTopic = "msh/2/c/"; // msh/2/c/CHANNELID/NODEID String jsonTopic = "msh/2/json/"; // msh/2/json/CHANNELID/NODEID -static MemoryDynamic staticMqttPool; - -Allocator &mqttPool = staticMqttPool; - void MQTT::mqttCallback(char *topic, byte *payload, unsigned int length) { mqtt->onPublish(topic, payload, length); @@ -126,7 +122,7 @@ void mqttInit() new MQTT(); } -MQTT::MQTT() : concurrency::OSThread("mqtt"), pubSub(mqttClient), mqttQueue(MAX_MQTT_QUEUE) +MQTT::MQTT() : concurrency::OSThread("mqtt"), pubSub(mqttClient) { if(moduleConfig.mqtt.enabled) { @@ -253,36 +249,9 @@ int32_t MQTT::runOnce() if (!pubSub.loop()) { if (wantConnection) { reconnect(); - - // If we succeeded, empty the queue one by one and start reading rapidly, else try again in 30 seconds (TCP connections are EXPENSIVE so try rarely) - if (pubSub.connected()) { - if (!mqttQueue.isEmpty()) { - // FIXME - this size calculation is super sloppy, but it will go away once we dynamically alloc meshpackets - ServiceEnvelope *env = mqttQueue.dequeuePtr(0); - static uint8_t bytes[MeshPacket_size + 64]; - size_t numBytes = pb_encode_to_bytes(bytes, sizeof(bytes), &ServiceEnvelope_msg, env); - - String topic = cryptTopic + env->channel_id + "/" + owner.id; - LOG_INFO("publish %s, %u bytes from queue\n", topic.c_str(), numBytes); - - - pubSub.publish(topic.c_str(), bytes, numBytes, false); - - if (moduleConfig.mqtt.json_enabled) { - // handle json topic - auto jsonString = this->downstreamPacketToJson(env->packet); - if (jsonString.length() != 0) { - String topicJson = jsonTopic + env->channel_id + "/" + owner.id; - LOG_INFO("JSON publish message to %s, %u bytes: %s\n", topicJson.c_str(), jsonString.length(), jsonString.c_str()); - pubSub.publish(topicJson.c_str(), jsonString.c_str(), false); - } - } - mqttPool.release(env); - } - return 20; - } else { - return 30000; - } + + // If we succeeded, start reading rapidly, else try again in 30 seconds (TCP connections are EXPENSIVE so try rarely) + return pubSub.connected() ? 20 : 30000; } else return 5000; // If we don't want connection now, check again in 5 secs } else { @@ -304,17 +273,17 @@ void MQTT::onSend(const MeshPacket &mp, ChannelIndex chIndex) if (ch.settings.uplink_enabled) { const char *channelId = channels.getGlobalId(chIndex); // FIXME, for now we just use the human name for the channel - ServiceEnvelope *env = mqttPool.allocZeroed(); - env->channel_id = (char *)channelId; - env->gateway_id = owner.id; - env->packet = (MeshPacket *)∓ + ServiceEnvelope env = ServiceEnvelope_init_default; + env.channel_id = (char *)channelId; + env.gateway_id = owner.id; + env.packet = (MeshPacket *)∓ // don't bother sending if not connected... if (pubSub.connected()) { // FIXME - this size calculation is super sloppy, but it will go away once we dynamically alloc meshpackets static uint8_t bytes[MeshPacket_size + 64]; - size_t numBytes = pb_encode_to_bytes(bytes, sizeof(bytes), &ServiceEnvelope_msg, env); + size_t numBytes = pb_encode_to_bytes(bytes, sizeof(bytes), &ServiceEnvelope_msg, &env); String topic = cryptTopic + channelId + "/" + owner.id; LOG_DEBUG("publish %s, %u bytes\n", topic.c_str(), numBytes); @@ -330,19 +299,7 @@ void MQTT::onSend(const MeshPacket &mp, ChannelIndex chIndex) pubSub.publish(topicJson.c_str(), jsonString.c_str(), false); } } - } else { - LOG_INFO("MQTT not connected, queueing packet\n"); - if (mqttQueue.numFree() == 0) { - LOG_WARN("NOTE: MQTT queue is full, discarding oldest\n"); - ServiceEnvelope *d = mqttQueue.dequeuePtr(0); - if (d) - mqttPool.release(d); - } - // make a copy of serviceEnvelope and queue it - ServiceEnvelope *copied = mqttPool.allocCopy(*env); - assert(mqttQueue.enqueue(copied, 0)); } - mqttPool.release(env); } } diff --git a/src/mqtt/MQTT.h b/src/mqtt/MQTT.h index ddbacbcc4..33fbbb8eb 100644 --- a/src/mqtt/MQTT.h +++ b/src/mqtt/MQTT.h @@ -13,8 +13,6 @@ #include #endif -#define MAX_MQTT_QUEUE 32 - /** * Our wrapper/singleton for sending/receiving MQTT "udp" packets. This object isolates the MQTT protocol implementation from * the two components that use it: MQTTPlugin and MQTTSimInterface. @@ -55,10 +53,9 @@ class MQTT : private concurrency::OSThread bool connected(); protected: - PointerQueue mqttQueue; int reconnectCount = 0; - + virtual int32_t runOnce() override; private: diff --git a/src/platform/portduino/SimRadio.cpp b/src/platform/portduino/SimRadio.cpp index 87800de22..a2611a464 100644 --- a/src/platform/portduino/SimRadio.cpp +++ b/src/platform/portduino/SimRadio.cpp @@ -215,6 +215,16 @@ void SimRadio::startReceive(MeshPacket *p) { handleReceiveInterrupt(p); } +QueueStatus SimRadio::getQueueStatus() +{ + QueueStatus qs; + + qs.res = qs.mesh_packet_id = 0; + qs.free = txQueue.getFree(); + qs.maxlen = txQueue.getMaxLen(); + + return qs; +} void SimRadio::handleReceiveInterrupt(MeshPacket *p) { diff --git a/src/platform/portduino/SimRadio.h b/src/platform/portduino/SimRadio.h index a71cf22f8..d2a36c81e 100644 --- a/src/platform/portduino/SimRadio.h +++ b/src/platform/portduino/SimRadio.h @@ -45,6 +45,9 @@ class SimRadio : public RadioInterface */ virtual void startReceive(MeshPacket *p); + QueueStatus getQueueStatus() override; + + protected: /// are _trying_ to receive a packet currently (note - we might just be waiting for one) bool isReceiving = false;