mirror of
https://github.com/meshtastic/firmware.git
synced 2025-12-20 17:52:35 +00:00
Add QueueStatus sending to the firmware (#1820)
* Yank mqtt service envelope queue * trybuildfix mqtt system * removed too much * no excessive heap debugging on release builds * send QueueStatus messages The QueueStatus message is sent as a response to the attempt to queue an outgoing MeshPacket and contains statuses of the last queue attempt, TX Queue space and capacity and MeshPacket.id that was queued. When TX Queue changes status from completely full to at least a single slot free a QueueStatus message is also sent to notify that user can queue more messages. Signed-off-by: Pavel Boldin <pavel.b@techspark.engineering> * WIP: update protobufs Signed-off-by: Pavel Boldin <pavel.b@techspark.engineering> * update protobufs * regen protos Signed-off-by: Pavel Boldin <pavel.b@techspark.engineering> Co-authored-by: Ben Meadors <benmmeadors@gmail.com> Co-authored-by: Thomas Göttgens <tgoettgens@gmail.com> Co-authored-by: Sacha Weatherstone <sachaw100@hotmail.com>
This commit is contained in:
@@ -21,10 +21,6 @@ String statusTopic = "msh/2/stat/";
|
||||
String cryptTopic = "msh/2/c/"; // msh/2/c/CHANNELID/NODEID
|
||||
String jsonTopic = "msh/2/json/"; // msh/2/json/CHANNELID/NODEID
|
||||
|
||||
static MemoryDynamic<ServiceEnvelope> staticMqttPool;
|
||||
|
||||
Allocator<ServiceEnvelope> &mqttPool = staticMqttPool;
|
||||
|
||||
void MQTT::mqttCallback(char *topic, byte *payload, unsigned int length)
|
||||
{
|
||||
mqtt->onPublish(topic, payload, length);
|
||||
@@ -126,7 +122,7 @@ void mqttInit()
|
||||
new MQTT();
|
||||
}
|
||||
|
||||
MQTT::MQTT() : concurrency::OSThread("mqtt"), pubSub(mqttClient), mqttQueue(MAX_MQTT_QUEUE)
|
||||
MQTT::MQTT() : concurrency::OSThread("mqtt"), pubSub(mqttClient)
|
||||
{
|
||||
if(moduleConfig.mqtt.enabled) {
|
||||
|
||||
@@ -253,36 +249,9 @@ int32_t MQTT::runOnce()
|
||||
if (!pubSub.loop()) {
|
||||
if (wantConnection) {
|
||||
reconnect();
|
||||
|
||||
// If we succeeded, empty the queue one by one and start reading rapidly, else try again in 30 seconds (TCP connections are EXPENSIVE so try rarely)
|
||||
if (pubSub.connected()) {
|
||||
if (!mqttQueue.isEmpty()) {
|
||||
// FIXME - this size calculation is super sloppy, but it will go away once we dynamically alloc meshpackets
|
||||
ServiceEnvelope *env = mqttQueue.dequeuePtr(0);
|
||||
static uint8_t bytes[MeshPacket_size + 64];
|
||||
size_t numBytes = pb_encode_to_bytes(bytes, sizeof(bytes), &ServiceEnvelope_msg, env);
|
||||
|
||||
String topic = cryptTopic + env->channel_id + "/" + owner.id;
|
||||
LOG_INFO("publish %s, %u bytes from queue\n", topic.c_str(), numBytes);
|
||||
|
||||
|
||||
pubSub.publish(topic.c_str(), bytes, numBytes, false);
|
||||
|
||||
if (moduleConfig.mqtt.json_enabled) {
|
||||
// handle json topic
|
||||
auto jsonString = this->downstreamPacketToJson(env->packet);
|
||||
if (jsonString.length() != 0) {
|
||||
String topicJson = jsonTopic + env->channel_id + "/" + owner.id;
|
||||
LOG_INFO("JSON publish message to %s, %u bytes: %s\n", topicJson.c_str(), jsonString.length(), jsonString.c_str());
|
||||
pubSub.publish(topicJson.c_str(), jsonString.c_str(), false);
|
||||
}
|
||||
}
|
||||
mqttPool.release(env);
|
||||
}
|
||||
return 20;
|
||||
} else {
|
||||
return 30000;
|
||||
}
|
||||
|
||||
// If we succeeded, start reading rapidly, else try again in 30 seconds (TCP connections are EXPENSIVE so try rarely)
|
||||
return pubSub.connected() ? 20 : 30000;
|
||||
} else
|
||||
return 5000; // If we don't want connection now, check again in 5 secs
|
||||
} else {
|
||||
@@ -304,17 +273,17 @@ void MQTT::onSend(const MeshPacket &mp, ChannelIndex chIndex)
|
||||
if (ch.settings.uplink_enabled) {
|
||||
const char *channelId = channels.getGlobalId(chIndex); // FIXME, for now we just use the human name for the channel
|
||||
|
||||
ServiceEnvelope *env = mqttPool.allocZeroed();
|
||||
env->channel_id = (char *)channelId;
|
||||
env->gateway_id = owner.id;
|
||||
env->packet = (MeshPacket *)∓
|
||||
ServiceEnvelope env = ServiceEnvelope_init_default;
|
||||
env.channel_id = (char *)channelId;
|
||||
env.gateway_id = owner.id;
|
||||
env.packet = (MeshPacket *)∓
|
||||
|
||||
// don't bother sending if not connected...
|
||||
if (pubSub.connected()) {
|
||||
|
||||
// FIXME - this size calculation is super sloppy, but it will go away once we dynamically alloc meshpackets
|
||||
static uint8_t bytes[MeshPacket_size + 64];
|
||||
size_t numBytes = pb_encode_to_bytes(bytes, sizeof(bytes), &ServiceEnvelope_msg, env);
|
||||
size_t numBytes = pb_encode_to_bytes(bytes, sizeof(bytes), &ServiceEnvelope_msg, &env);
|
||||
|
||||
String topic = cryptTopic + channelId + "/" + owner.id;
|
||||
LOG_DEBUG("publish %s, %u bytes\n", topic.c_str(), numBytes);
|
||||
@@ -330,19 +299,7 @@ void MQTT::onSend(const MeshPacket &mp, ChannelIndex chIndex)
|
||||
pubSub.publish(topicJson.c_str(), jsonString.c_str(), false);
|
||||
}
|
||||
}
|
||||
} else {
|
||||
LOG_INFO("MQTT not connected, queueing packet\n");
|
||||
if (mqttQueue.numFree() == 0) {
|
||||
LOG_WARN("NOTE: MQTT queue is full, discarding oldest\n");
|
||||
ServiceEnvelope *d = mqttQueue.dequeuePtr(0);
|
||||
if (d)
|
||||
mqttPool.release(d);
|
||||
}
|
||||
// make a copy of serviceEnvelope and queue it
|
||||
ServiceEnvelope *copied = mqttPool.allocCopy(*env);
|
||||
assert(mqttQueue.enqueue(copied, 0));
|
||||
}
|
||||
mqttPool.release(env);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user