mqtt: automatically start or stop as neede & attempt reconnect

This commit is contained in:
Kevin Hester
2021-04-05 07:48:46 +08:00
parent d19af8b83d
commit e84edc676f
5 changed files with 73 additions and 42 deletions

View File

@@ -20,14 +20,7 @@ void mqttCallback(char *topic, byte *payload, unsigned int length)
void mqttInit()
{
// FIXME, for now we require the user to specifically set a MQTT server (till tested)
if (radioConfig.preferences.mqtt_disabled || !*radioConfig.preferences.mqtt_server)
DEBUG_MSG("MQTT disabled...\n");
else if (!WiFi.isConnected())
DEBUG_MSG("WiFi is not connected, can not start MQTT\n");
else {
new MQTT();
}
new MQTT();
}
MQTT::MQTT() : concurrency::OSThread("mqtt"), pubSub(mqttClient)
@@ -35,6 +28,11 @@ MQTT::MQTT() : concurrency::OSThread("mqtt"), pubSub(mqttClient)
assert(!mqtt);
mqtt = this;
pubSub.setCallback(mqttCallback);
}
void MQTT::reconnect()
{
// pubSub.setServer("devsrv.ezdevice.net", 1883); or 192.168.10.188
const char *serverAddr = "test.mosquitto.org"; // "mqtt.meshtastic.org"; // default hostname
@@ -42,9 +40,8 @@ MQTT::MQTT() : concurrency::OSThread("mqtt"), pubSub(mqttClient)
serverAddr = radioConfig.preferences.mqtt_server; // Override the default
pubSub.setServer(serverAddr, 1883);
pubSub.setCallback(mqttCallback);
DEBUG_MSG("Connecting to MQTT server: %s\n", serverAddr);
DEBUG_MSG("Connecting to MQTT server\n", serverAddr);
auto myStatus = (statusTopic + owner.id);
// bool connected = pubSub.connect(nodeId.c_str(), "meshdev", "apes4cats", myStatus.c_str(), 1, true, "offline");
bool connected = pubSub.connect(owner.id, myStatus.c_str(), 1, true, "offline");
@@ -61,16 +58,53 @@ MQTT::MQTT() : concurrency::OSThread("mqtt"), pubSub(mqttClient)
/// FIXME, include more information in the status text
bool ok = pubSub.publish(myStatus.c_str(), "online", true);
DEBUG_MSG("published %d\n", ok);
} else
DEBUG_MSG("Failed to contact MQTT server...\n");
}
bool MQTT::wantsLink() const
{
bool hasChannel = false;
if (radioConfig.preferences.mqtt_disabled) {
// DEBUG_MSG("MQTT disabled...\n");
} else {
// No need for link if no channel needed it
size_t numChan = channels.getNumChannels();
for (size_t i = 0; i < numChan; i++) {
auto &ch = channels.getByIndex(i);
if (ch.settings.uplink_enabled || ch.settings.downlink_enabled) {
hasChannel = true;
break;
}
}
}
return hasChannel && WiFi.isConnected();
}
int32_t MQTT::runOnce()
{
// If connected poll rapidly, otherwise sleep forever
if (!pubSub.loop())
enabled = false;
bool wantConnection = wantsLink();
return 20;
// If connected poll rapidly, otherwise only occasionally check for a wifi connection change and ability to contact server
if (!pubSub.loop()) {
if (wantConnection) {
reconnect();
// If we succeeded, start reading rapidly, else try again in 30 seconds (TCP connections are EXPENSIVE so try rarely)
return pubSub.connected() ? 20 : 30000;
} else
return 5000; // If we don't want connection now, check again in 5 secs
} else {
// we are connected to server, check often for new requests on the TCP port
if (!wantConnection) {
DEBUG_MSG("MQTT link not needed, dropping\n");
pubSub.disconnect();
}
return 20;
}
}
void MQTT::onSend(const MeshPacket &mp, ChannelIndex chIndex)