mirror of
https://github.com/meshtastic/firmware.git
synced 2025-12-20 01:32:40 +00:00
Merge branch 'master' into neighborinfo
This commit is contained in:
@@ -25,12 +25,16 @@ Allocator<meshtastic_ServiceEnvelope> &mqttPool = staticMqttPool;
|
||||
|
||||
void MQTT::mqttCallback(char *topic, byte *payload, unsigned int length)
|
||||
{
|
||||
mqtt->onPublish(topic, payload, length);
|
||||
mqtt->onReceive(topic, payload, length);
|
||||
}
|
||||
|
||||
void MQTT::onPublish(char *topic, byte *payload, unsigned int length)
|
||||
void MQTT::onClientProxyReceive(meshtastic_MqttClientProxyMessage msg)
|
||||
{
|
||||
onReceive(msg.topic, msg.payload_variant.data.bytes, msg.payload_variant.data.size);
|
||||
}
|
||||
|
||||
void MQTT::onReceive(char *topic, byte *payload, size_t length)
|
||||
{
|
||||
// parsing ServiceEnvelope
|
||||
meshtastic_ServiceEnvelope e = meshtastic_ServiceEnvelope_init_default;
|
||||
|
||||
if (moduleConfig.mqtt.json_enabled && (strncmp(topic, jsonTopic.c_str(), jsonTopic.length()) == 0)) {
|
||||
@@ -153,10 +157,13 @@ void mqttInit()
|
||||
new MQTT();
|
||||
}
|
||||
|
||||
#ifdef HAS_NETWORKING
|
||||
MQTT::MQTT() : concurrency::OSThread("mqtt"), pubSub(mqttClient), mqttQueue(MAX_MQTT_QUEUE)
|
||||
#else
|
||||
MQTT::MQTT() : concurrency::OSThread("mqtt"), mqttQueue(MAX_MQTT_QUEUE)
|
||||
#endif
|
||||
{
|
||||
if (moduleConfig.mqtt.enabled) {
|
||||
|
||||
assert(!mqtt);
|
||||
mqtt = this;
|
||||
|
||||
@@ -170,22 +177,77 @@ MQTT::MQTT() : concurrency::OSThread("mqtt"), pubSub(mqttClient), mqttQueue(MAX_
|
||||
jsonTopic = "msh" + jsonTopic;
|
||||
}
|
||||
|
||||
pubSub.setCallback(mqttCallback);
|
||||
|
||||
#ifdef HAS_NETWORKING
|
||||
if (!moduleConfig.mqtt.proxy_to_client_enabled)
|
||||
pubSub.setCallback(mqttCallback);
|
||||
#endif
|
||||
// preflightSleepObserver.observe(&preflightSleep);
|
||||
} else {
|
||||
disable();
|
||||
}
|
||||
}
|
||||
|
||||
bool MQTT::connected()
|
||||
bool MQTT::isConnectedDirectly()
|
||||
{
|
||||
#ifdef HAS_NETWORKING
|
||||
return pubSub.connected();
|
||||
#else
|
||||
return false;
|
||||
#endif
|
||||
}
|
||||
|
||||
bool MQTT::publish(const char *topic, const char *payload, bool retained)
|
||||
{
|
||||
if (moduleConfig.mqtt.proxy_to_client_enabled) {
|
||||
meshtastic_MqttClientProxyMessage *msg = mqttClientProxyMessagePool.allocZeroed();
|
||||
msg->which_payload_variant = meshtastic_MqttClientProxyMessage_text_tag;
|
||||
strcpy(msg->topic, topic);
|
||||
strcpy(msg->payload_variant.text, payload);
|
||||
msg->retained = retained;
|
||||
service.sendMqttMessageToClientProxy(msg);
|
||||
return true;
|
||||
}
|
||||
#ifdef HAS_NETWORKING
|
||||
else if (isConnectedDirectly()) {
|
||||
return pubSub.publish(topic, payload, retained);
|
||||
}
|
||||
#endif
|
||||
return false;
|
||||
}
|
||||
|
||||
bool MQTT::publish(const char *topic, const uint8_t *payload, size_t length, bool retained)
|
||||
{
|
||||
if (moduleConfig.mqtt.proxy_to_client_enabled) {
|
||||
meshtastic_MqttClientProxyMessage *msg = mqttClientProxyMessagePool.allocZeroed();
|
||||
msg->which_payload_variant = meshtastic_MqttClientProxyMessage_data_tag;
|
||||
strcpy(msg->topic, topic);
|
||||
msg->payload_variant.data.size = length;
|
||||
memcpy(msg->payload_variant.data.bytes, payload, length);
|
||||
msg->retained = retained;
|
||||
service.sendMqttMessageToClientProxy(msg);
|
||||
return true;
|
||||
}
|
||||
#ifdef HAS_NETWORKING
|
||||
else if (isConnectedDirectly()) {
|
||||
return pubSub.publish(topic, payload, length, retained);
|
||||
}
|
||||
#endif
|
||||
return false;
|
||||
}
|
||||
|
||||
void MQTT::reconnect()
|
||||
{
|
||||
if (wantsLink()) {
|
||||
if (moduleConfig.mqtt.proxy_to_client_enabled) {
|
||||
LOG_INFO("MQTT connecting via client proxy instead...\n");
|
||||
enabled = true;
|
||||
runASAP = true;
|
||||
reconnectCount = 0;
|
||||
|
||||
publishStatus();
|
||||
return; // Don't try to connect directly to the server
|
||||
}
|
||||
#ifdef HAS_NETWORKING
|
||||
// Defaults
|
||||
int serverPort = 1883;
|
||||
const char *serverAddr = default_mqtt_address;
|
||||
@@ -197,7 +259,6 @@ void MQTT::reconnect()
|
||||
mqttUsername = moduleConfig.mqtt.username;
|
||||
mqttPassword = moduleConfig.mqtt.password;
|
||||
}
|
||||
|
||||
#if HAS_WIFI && !defined(ARCH_PORTDUINO)
|
||||
if (moduleConfig.mqtt.tls_enabled) {
|
||||
// change default for encrypted to 8883
|
||||
@@ -214,7 +275,7 @@ void MQTT::reconnect()
|
||||
LOG_INFO("Using non-TLS-encrypted session\n");
|
||||
pubSub.setClient(mqttClient);
|
||||
}
|
||||
#else
|
||||
#elif HAS_NETWORKING
|
||||
pubSub.setClient(mqttClient);
|
||||
#endif
|
||||
|
||||
@@ -229,8 +290,9 @@ void MQTT::reconnect()
|
||||
pubSub.setServer(serverAddr, serverPort);
|
||||
pubSub.setBufferSize(512);
|
||||
|
||||
LOG_INFO("Connecting to MQTT server %s, port: %d, username: %s, password: %s\n", serverAddr, serverPort, mqttUsername,
|
||||
mqttPassword);
|
||||
LOG_INFO("Attempting to connect directly to MQTT server %s, port: %d, username: %s, password: %s\n", serverAddr,
|
||||
serverPort, mqttUsername, mqttPassword);
|
||||
|
||||
auto myStatus = (statusTopic + owner.id);
|
||||
bool connected = pubSub.connect(owner.id, mqttUsername, mqttPassword, myStatus.c_str(), 1, true, "offline");
|
||||
if (connected) {
|
||||
@@ -239,15 +301,12 @@ void MQTT::reconnect()
|
||||
runASAP = true;
|
||||
reconnectCount = 0;
|
||||
|
||||
/// FIXME, include more information in the status text
|
||||
bool ok = pubSub.publish(myStatus.c_str(), "online", true);
|
||||
LOG_INFO("published %d\n", ok);
|
||||
|
||||
publishStatus();
|
||||
sendSubscriptions();
|
||||
} else {
|
||||
#if HAS_WIFI && !defined(ARCH_PORTDUINO)
|
||||
reconnectCount++;
|
||||
LOG_ERROR("Failed to contact MQTT server (%d/%d)...\n", reconnectCount, reconnectMax);
|
||||
LOG_ERROR("Failed to contact MQTT server directly (%d/%d)...\n", reconnectCount, reconnectMax);
|
||||
if (reconnectCount >= reconnectMax) {
|
||||
needReconnect = true;
|
||||
wifiReconnect->setIntervalFromNow(0);
|
||||
@@ -255,11 +314,13 @@ void MQTT::reconnect()
|
||||
}
|
||||
#endif
|
||||
}
|
||||
#endif
|
||||
}
|
||||
}
|
||||
|
||||
void MQTT::sendSubscriptions()
|
||||
{
|
||||
#ifdef HAS_NETWORKING
|
||||
size_t numChan = channels.getNumChannels();
|
||||
for (size_t i = 0; i < numChan; i++) {
|
||||
auto &ch = channels.getByIndex(i);
|
||||
@@ -274,6 +335,7 @@ void MQTT::sendSubscriptions()
|
||||
}
|
||||
}
|
||||
}
|
||||
#endif
|
||||
}
|
||||
|
||||
bool MQTT::wantsLink() const
|
||||
@@ -291,60 +353,44 @@ bool MQTT::wantsLink() const
|
||||
}
|
||||
}
|
||||
}
|
||||
if (hasChannel && moduleConfig.mqtt.proxy_to_client_enabled)
|
||||
return true;
|
||||
|
||||
#if HAS_WIFI
|
||||
return hasChannel && WiFi.isConnected();
|
||||
#endif
|
||||
#if HAS_ETHERNET
|
||||
return hasChannel && (Ethernet.linkStatus() == LinkON);
|
||||
return hasChannel && Ethernet.linkStatus() == LinkON;
|
||||
#endif
|
||||
return false;
|
||||
}
|
||||
|
||||
int32_t MQTT::runOnce()
|
||||
{
|
||||
if (!moduleConfig.mqtt.enabled) {
|
||||
if (!moduleConfig.mqtt.enabled)
|
||||
return disable();
|
||||
}
|
||||
|
||||
bool wantConnection = wantsLink();
|
||||
|
||||
// If connected poll rapidly, otherwise only occasionally check for a wifi connection change and ability to contact server
|
||||
if (!pubSub.loop()) {
|
||||
if (wantConnection) {
|
||||
if (moduleConfig.mqtt.proxy_to_client_enabled) {
|
||||
publishQueuedMessages();
|
||||
return 200;
|
||||
}
|
||||
#ifdef HAS_NETWORKING
|
||||
else if (!pubSub.loop()) {
|
||||
if (!wantConnection)
|
||||
return 5000; // If we don't want connection now, check again in 5 secs
|
||||
else {
|
||||
reconnect();
|
||||
|
||||
// If we succeeded, empty the queue one by one and start reading rapidly, else try again in 30 seconds (TCP
|
||||
// connections are EXPENSIVE so try rarely)
|
||||
if (pubSub.connected()) {
|
||||
if (!mqttQueue.isEmpty()) {
|
||||
// FIXME - this size calculation is super sloppy, but it will go away once we dynamically alloc meshpackets
|
||||
meshtastic_ServiceEnvelope *env = mqttQueue.dequeuePtr(0);
|
||||
static uint8_t bytes[meshtastic_MeshPacket_size + 64];
|
||||
size_t numBytes = pb_encode_to_bytes(bytes, sizeof(bytes), &meshtastic_ServiceEnvelope_msg, env);
|
||||
|
||||
std::string topic = cryptTopic + env->channel_id + "/" + owner.id;
|
||||
LOG_INFO("publish %s, %u bytes from queue\n", topic.c_str(), numBytes);
|
||||
|
||||
pubSub.publish(topic.c_str(), bytes, numBytes, false);
|
||||
|
||||
if (moduleConfig.mqtt.json_enabled) {
|
||||
// handle json topic
|
||||
auto jsonString = this->downstreamPacketToJson(env->packet);
|
||||
if (jsonString.length() != 0) {
|
||||
std::string topicJson = jsonTopic + env->channel_id + "/" + owner.id;
|
||||
LOG_INFO("JSON publish message to %s, %u bytes: %s\n", topicJson.c_str(), jsonString.length(),
|
||||
jsonString.c_str());
|
||||
pubSub.publish(topicJson.c_str(), jsonString.c_str(), false);
|
||||
}
|
||||
}
|
||||
mqttPool.release(env);
|
||||
}
|
||||
if (isConnectedDirectly()) {
|
||||
publishQueuedMessages();
|
||||
return 200;
|
||||
} else {
|
||||
} else
|
||||
return 30000;
|
||||
}
|
||||
} else
|
||||
return 5000; // If we don't want connection now, check again in 5 secs
|
||||
}
|
||||
} else {
|
||||
// we are connected to server, check often for new requests on the TCP port
|
||||
if (!wantConnection) {
|
||||
@@ -355,6 +401,44 @@ int32_t MQTT::runOnce()
|
||||
powerFSM.trigger(EVENT_CONTACT_FROM_PHONE); // Suppress entering light sleep (because that would turn off bluetooth)
|
||||
return 20;
|
||||
}
|
||||
#endif
|
||||
return 30000;
|
||||
}
|
||||
|
||||
/// FIXME, include more information in the status text
|
||||
void MQTT::publishStatus()
|
||||
{
|
||||
auto myStatus = (statusTopic + owner.id);
|
||||
bool ok = publish(myStatus.c_str(), "online", true);
|
||||
LOG_INFO("published online=%d\n", ok);
|
||||
}
|
||||
|
||||
void MQTT::publishQueuedMessages()
|
||||
{
|
||||
if (!mqttQueue.isEmpty()) {
|
||||
LOG_DEBUG("Publishing enqueued MQTT message\n");
|
||||
// FIXME - this size calculation is super sloppy, but it will go away once we dynamically alloc meshpackets
|
||||
meshtastic_ServiceEnvelope *env = mqttQueue.dequeuePtr(0);
|
||||
static uint8_t bytes[meshtastic_MeshPacket_size + 64];
|
||||
size_t numBytes = pb_encode_to_bytes(bytes, sizeof(bytes), &meshtastic_ServiceEnvelope_msg, env);
|
||||
|
||||
std::string topic = cryptTopic + env->channel_id + "/" + owner.id;
|
||||
LOG_INFO("publish %s, %u bytes from queue\n", topic.c_str(), numBytes);
|
||||
|
||||
publish(topic.c_str(), bytes, numBytes, false);
|
||||
|
||||
if (moduleConfig.mqtt.json_enabled) {
|
||||
// handle json topic
|
||||
auto jsonString = this->meshPacketToJson(env->packet);
|
||||
if (jsonString.length() != 0) {
|
||||
std::string topicJson = jsonTopic + env->channel_id + "/" + owner.id;
|
||||
LOG_INFO("JSON publish message to %s, %u bytes: %s\n", topicJson.c_str(), jsonString.length(),
|
||||
jsonString.c_str());
|
||||
publish(topicJson.c_str(), jsonString.c_str(), false);
|
||||
}
|
||||
}
|
||||
mqttPool.release(env);
|
||||
}
|
||||
}
|
||||
|
||||
void MQTT::onSend(const meshtastic_MeshPacket &mp, ChannelIndex chIndex)
|
||||
@@ -368,27 +452,26 @@ void MQTT::onSend(const meshtastic_MeshPacket &mp, ChannelIndex chIndex)
|
||||
env->channel_id = (char *)channelId;
|
||||
env->gateway_id = owner.id;
|
||||
env->packet = (meshtastic_MeshPacket *)∓
|
||||
LOG_DEBUG("MQTT onSend - Publishing portnum %i message\n", env->packet->decoded.portnum);
|
||||
|
||||
// don't bother sending if not connected...
|
||||
if (pubSub.connected()) {
|
||||
|
||||
if (moduleConfig.mqtt.proxy_to_client_enabled || this->isConnectedDirectly()) {
|
||||
// FIXME - this size calculation is super sloppy, but it will go away once we dynamically alloc meshpackets
|
||||
static uint8_t bytes[meshtastic_MeshPacket_size + 64];
|
||||
size_t numBytes = pb_encode_to_bytes(bytes, sizeof(bytes), &meshtastic_ServiceEnvelope_msg, env);
|
||||
|
||||
std::string topic = cryptTopic + channelId + "/" + owner.id;
|
||||
LOG_DEBUG("publish %s, %u bytes\n", topic.c_str(), numBytes);
|
||||
LOG_DEBUG("MQTT Publish %s, %u bytes\n", topic.c_str(), numBytes);
|
||||
|
||||
pubSub.publish(topic.c_str(), bytes, numBytes, false);
|
||||
publish(topic.c_str(), bytes, numBytes, false);
|
||||
|
||||
if (moduleConfig.mqtt.json_enabled) {
|
||||
// handle json topic
|
||||
auto jsonString = this->downstreamPacketToJson((meshtastic_MeshPacket *)&mp);
|
||||
auto jsonString = this->meshPacketToJson((meshtastic_MeshPacket *)&mp);
|
||||
if (jsonString.length() != 0) {
|
||||
std::string topicJson = jsonTopic + channelId + "/" + owner.id;
|
||||
LOG_INFO("JSON publish message to %s, %u bytes: %s\n", topicJson.c_str(), jsonString.length(),
|
||||
jsonString.c_str());
|
||||
pubSub.publish(topicJson.c_str(), jsonString.c_str(), false);
|
||||
publish(topicJson.c_str(), jsonString.c_str(), false);
|
||||
}
|
||||
}
|
||||
} else {
|
||||
@@ -408,7 +491,7 @@ void MQTT::onSend(const meshtastic_MeshPacket &mp, ChannelIndex chIndex)
|
||||
}
|
||||
|
||||
// converts a downstream packet into a json message
|
||||
std::string MQTT::downstreamPacketToJson(meshtastic_MeshPacket *mp)
|
||||
std::string MQTT::meshPacketToJson(meshtastic_MeshPacket *mp)
|
||||
{
|
||||
// the created jsonObj is immutable after creation, so
|
||||
// we need to do the heavy lifting before assembling it.
|
||||
|
||||
Reference in New Issue
Block a user