Improve Wifi Reconnect handling and add outgoing queue for MQTT packets to bridge short connection issues.

This commit is contained in:
Thomas Göttgens
2022-12-16 20:28:28 +01:00
parent 1c8181dc75
commit 68f6cfde0c
4 changed files with 175 additions and 74 deletions

View File

@@ -7,6 +7,7 @@
#include "mesh/Router.h"
#include "mesh/generated/mqtt.pb.h"
#include "mesh/generated/telemetry.pb.h"
#include "mesh/http/WiFiAPClient.h"
#include "sleep.h"
#if HAS_WIFI
#include <WiFi.h>
@@ -20,6 +21,10 @@ String statusTopic = "msh/2/stat/";
String cryptTopic = "msh/2/c/"; // msh/2/c/CHANNELID/NODEID
String jsonTopic = "msh/2/json/"; // msh/2/json/CHANNELID/NODEID
static MemoryDynamic<ServiceEnvelope> staticMqttPool;
Allocator<ServiceEnvelope> &mqttPool = staticMqttPool;
void MQTT::mqttCallback(char *topic, byte *payload, unsigned int length)
{
mqtt->onPublish(topic, payload, length);
@@ -121,7 +126,7 @@ void mqttInit()
new MQTT();
}
MQTT::MQTT() : concurrency::OSThread("mqtt"), pubSub(mqttClient)
MQTT::MQTT() : concurrency::OSThread("mqtt"), pubSub(mqttClient), mqttQueue(MAX_MQTT_QUEUE)
{
assert(!mqtt);
mqtt = this;
@@ -168,14 +173,23 @@ void MQTT::reconnect()
DEBUG_MSG("MQTT connected\n");
enabled = true; // Start running background process again
runASAP = true;
reconnectCount = 0;
/// FIXME, include more information in the status text
bool ok = pubSub.publish(myStatus.c_str(), "online", true);
DEBUG_MSG("published %d\n", ok);
sendSubscriptions();
} else
DEBUG_MSG("Failed to contact MQTT server...\n");
} else {
DEBUG_MSG("Failed to contact MQTT server (%d/10)...\n",reconnectCount);
#if HAS_WIFI
if (reconnectCount > 9) {
needReconnect = true;
wifiReconnect->setIntervalFromNow(1000);
}
#endif
reconnectCount++;
}
}
}
@@ -231,8 +245,35 @@ int32_t MQTT::runOnce()
if (wantConnection) {
reconnect();
// If we succeeded, start reading rapidly, else try again in 30 seconds (TCP connections are EXPENSIVE so try rarely)
return pubSub.connected() ? 20 : 30000;
// If we succeeded, empty the queue one by one and start reading rapidly, else try again in 30 seconds (TCP connections are EXPENSIVE so try rarely)
if (pubSub.connected()) {
if (!mqttQueue.isEmpty()) {
// FIXME - this size calculation is super sloppy, but it will go away once we dynamically alloc meshpackets
ServiceEnvelope *env = mqttQueue.dequeuePtr(0);
static uint8_t bytes[MeshPacket_size + 64];
size_t numBytes = pb_encode_to_bytes(bytes, sizeof(bytes), ServiceEnvelope_fields, env);
String topic = cryptTopic + env->channel_id + "/" + owner.id;
DEBUG_MSG("publish %s, %u bytes from queue\n", topic.c_str(), numBytes);
pubSub.publish(topic.c_str(), bytes, numBytes, false);
if (moduleConfig.mqtt.json_enabled) {
// handle json topic
auto jsonString = this->downstreamPacketToJson(env->packet);
if (jsonString.length() != 0) {
String topicJson = jsonTopic + env->channel_id + "/" + owner.id;
DEBUG_MSG("JSON publish message to %s, %u bytes: %s\n", topicJson.c_str(), jsonString.length(), jsonString.c_str());
pubSub.publish(topicJson.c_str(), jsonString.c_str(), false);
}
}
mqttPool.release(env);
}
return 20;
} else {
return 30000;
}
} else
return 5000; // If we don't want connection now, check again in 5 secs
} else {
@@ -251,33 +292,48 @@ void MQTT::onSend(const MeshPacket &mp, ChannelIndex chIndex)
{
auto &ch = channels.getByIndex(chIndex);
// don't bother sending if not connected...
if (pubSub.connected() && ch.settings.uplink_enabled) {
if (ch.settings.uplink_enabled) {
const char *channelId = channels.getGlobalId(chIndex); // FIXME, for now we just use the human name for the channel
ServiceEnvelope env = ServiceEnvelope_init_default;
env.channel_id = (char *)channelId;
env.gateway_id = owner.id;
env.packet = (MeshPacket *)&mp;
ServiceEnvelope *env = mqttPool.allocZeroed();
env->channel_id = (char *)channelId;
env->gateway_id = owner.id;
env->packet = (MeshPacket *)&mp;
// FIXME - this size calculation is super sloppy, but it will go away once we dynamically alloc meshpackets
static uint8_t bytes[MeshPacket_size + 64];
size_t numBytes = pb_encode_to_bytes(bytes, sizeof(bytes), ServiceEnvelope_fields, &env);
// don't bother sending if not connected...
if (pubSub.connected()) {
String topic = cryptTopic + channelId + "/" + owner.id;
DEBUG_MSG("publish %s, %u bytes\n", topic.c_str(), numBytes);
// FIXME - this size calculation is super sloppy, but it will go away once we dynamically alloc meshpackets
static uint8_t bytes[MeshPacket_size + 64];
size_t numBytes = pb_encode_to_bytes(bytes, sizeof(bytes), ServiceEnvelope_fields, env);
pubSub.publish(topic.c_str(), bytes, numBytes, false);
String topic = cryptTopic + channelId + "/" + owner.id;
DEBUG_MSG("publish %s, %u bytes\n", topic.c_str(), numBytes);
if (moduleConfig.mqtt.json_enabled) {
// handle json topic
auto jsonString = this->downstreamPacketToJson((MeshPacket *)&mp);
if (jsonString.length() != 0) {
String topicJson = jsonTopic + channelId + "/" + owner.id;
DEBUG_MSG("JSON publish message to %s, %u bytes: %s\n", topicJson.c_str(), jsonString.length(), jsonString.c_str());
pubSub.publish(topicJson.c_str(), jsonString.c_str(), false);
pubSub.publish(topic.c_str(), bytes, numBytes, false);
if (moduleConfig.mqtt.json_enabled) {
// handle json topic
auto jsonString = this->downstreamPacketToJson((MeshPacket *)&mp);
if (jsonString.length() != 0) {
String topicJson = jsonTopic + channelId + "/" + owner.id;
DEBUG_MSG("JSON publish message to %s, %u bytes: %s\n", topicJson.c_str(), jsonString.length(), jsonString.c_str());
pubSub.publish(topicJson.c_str(), jsonString.c_str(), false);
}
}
} else {
DEBUG_MSG("MQTT not connected, queueing packet\n");
if (mqttQueue.numFree() == 0) {
DEBUG_MSG("NOTE: MQTT queue is full, discarding oldest\n");
ServiceEnvelope *d = mqttQueue.dequeuePtr(0);
if (d)
mqttPool.release(d);
}
// make a copy of serviceEnvelope and queue it
ServiceEnvelope *copied = mqttPool.allocCopy(*env);
assert(mqttQueue.enqueue(copied, 0));
}
mqttPool.release(env);
}
}