mirror of
https://github.com/meshtastic/firmware.git
synced 2025-12-20 17:52:35 +00:00
Merge branch 'develop' into nice-threads
This commit is contained in:
@@ -42,7 +42,7 @@ void MQTT::onPublish(char *topic, byte *payload, unsigned int length)
|
||||
payloadStr[length] = 0; // null terminated string
|
||||
JSONValue *json_value = JSON::Parse(payloadStr);
|
||||
if (json_value != NULL) {
|
||||
DEBUG_MSG("JSON Received on MQTT, parsing..\n");
|
||||
LOG_INFO("JSON Received on MQTT, parsing..\n");
|
||||
// check if it is a valid envelope
|
||||
JSONObject json;
|
||||
json = json_value->AsObject();
|
||||
@@ -50,7 +50,7 @@ void MQTT::onPublish(char *topic, byte *payload, unsigned int length)
|
||||
// this is a valid envelope
|
||||
if (json["payload"]->IsString() && json["type"]->IsString() && (json["sender"]->AsString().compare(owner.id) != 0)) {
|
||||
std::string jsonPayloadStr = json["payload"]->AsString();
|
||||
DEBUG_MSG("JSON payload %s, length %u\n", jsonPayloadStr.c_str(), jsonPayloadStr.length());
|
||||
LOG_INFO("JSON payload %s, length %u\n", jsonPayloadStr.c_str(), jsonPayloadStr.length());
|
||||
|
||||
// construct protobuf data packet using TEXT_MESSAGE, send it to the mesh
|
||||
MeshPacket *p = router->allocForSending();
|
||||
@@ -61,10 +61,10 @@ void MQTT::onPublish(char *topic, byte *payload, unsigned int length)
|
||||
MeshPacket *packet = packetPool.allocCopy(*p);
|
||||
service.sendToMesh(packet, RX_SRC_LOCAL);
|
||||
} else {
|
||||
DEBUG_MSG("Received MQTT json payload too long, dropping\n");
|
||||
LOG_WARN("Received MQTT json payload too long, dropping\n");
|
||||
}
|
||||
} else {
|
||||
DEBUG_MSG("JSON Ignoring downlink message we originally sent.\n");
|
||||
LOG_DEBUG("JSON Ignoring downlink message we originally sent.\n");
|
||||
}
|
||||
} else if ((json.find("sender") != json.end()) && (json.find("payload") != json.end()) && (json.find("type") != json.end()) && json["type"]->IsString() && (json["type"]->AsString().compare("sendposition") == 0)) {
|
||||
//invent the "sendposition" type for a valid envelope
|
||||
@@ -84,26 +84,26 @@ void MQTT::onPublish(char *topic, byte *payload, unsigned int length)
|
||||
service.sendToMesh(p, RX_SRC_LOCAL);
|
||||
|
||||
} else {
|
||||
DEBUG_MSG("JSON Ignoring downlink message we originally sent.\n");
|
||||
LOG_DEBUG("JSON Ignoring downlink message we originally sent.\n");
|
||||
}
|
||||
} else{
|
||||
DEBUG_MSG("JSON Received payload on MQTT but not a valid envelope\n");
|
||||
LOG_ERROR("JSON Received payload on MQTT but not a valid envelope\n");
|
||||
}
|
||||
} else {
|
||||
// no json, this is an invalid payload
|
||||
DEBUG_MSG("Invalid MQTT service envelope, topic %s, len %u!\n", topic, length);
|
||||
LOG_ERROR("Invalid MQTT service envelope, topic %s, len %u!\n", topic, length);
|
||||
}
|
||||
delete json_value;
|
||||
} else {
|
||||
if (!pb_decode_from_bytes(payload, length, &ServiceEnvelope_msg, &e)) {
|
||||
DEBUG_MSG("Invalid MQTT service envelope, topic %s, len %u!\n", topic, length);
|
||||
LOG_ERROR("Invalid MQTT service envelope, topic %s, len %u!\n", topic, length);
|
||||
return;
|
||||
}else {
|
||||
if (strcmp(e.gateway_id, owner.id) == 0)
|
||||
DEBUG_MSG("Ignoring downlink message we originally sent.\n");
|
||||
LOG_INFO("Ignoring downlink message we originally sent.\n");
|
||||
else {
|
||||
if (e.packet) {
|
||||
DEBUG_MSG("Received MQTT topic %s, len=%u\n", topic, length);
|
||||
LOG_INFO("Received MQTT topic %s, len=%u\n", topic, length);
|
||||
MeshPacket *p = packetPool.allocCopy(*e.packet);
|
||||
|
||||
// ignore messages sent by us or if we don't have the channel key
|
||||
@@ -170,23 +170,24 @@ void MQTT::reconnect()
|
||||
serverAddr = server.c_str();
|
||||
}
|
||||
pubSub.setServer(serverAddr, serverPort);
|
||||
pubSub.setBufferSize(512);
|
||||
|
||||
DEBUG_MSG("Connecting to MQTT server %s, port: %d, username: %s, password: %s\n", serverAddr, serverPort, mqttUsername, mqttPassword);
|
||||
LOG_INFO("Connecting to MQTT server %s, port: %d, username: %s, password: %s\n", serverAddr, serverPort, mqttUsername, mqttPassword);
|
||||
auto myStatus = (statusTopic + owner.id);
|
||||
bool connected = pubSub.connect(owner.id, mqttUsername, mqttPassword, myStatus.c_str(), 1, true, "offline");
|
||||
if (connected) {
|
||||
DEBUG_MSG("MQTT connected\n");
|
||||
LOG_INFO("MQTT connected\n");
|
||||
enabled = true; // Start running background process again
|
||||
runASAP = true;
|
||||
reconnectCount = 0;
|
||||
|
||||
/// FIXME, include more information in the status text
|
||||
bool ok = pubSub.publish(myStatus.c_str(), "online", true);
|
||||
DEBUG_MSG("published %d\n", ok);
|
||||
LOG_INFO("published %d\n", ok);
|
||||
|
||||
sendSubscriptions();
|
||||
} else {
|
||||
DEBUG_MSG("Failed to contact MQTT server (%d/10)...\n",reconnectCount);
|
||||
LOG_ERROR("Failed to contact MQTT server (%d/10)...\n",reconnectCount);
|
||||
#if HAS_WIFI && !defined(ARCH_PORTDUINO)
|
||||
if (reconnectCount > 9) {
|
||||
needReconnect = true;
|
||||
@@ -205,11 +206,11 @@ void MQTT::sendSubscriptions()
|
||||
auto &ch = channels.getByIndex(i);
|
||||
if (ch.settings.downlink_enabled) {
|
||||
String topic = cryptTopic + channels.getGlobalId(i) + "/#";
|
||||
DEBUG_MSG("Subscribing to %s\n", topic.c_str());
|
||||
LOG_INFO("Subscribing to %s\n", topic.c_str());
|
||||
pubSub.subscribe(topic.c_str(), 1); // FIXME, is QOS 1 right?
|
||||
if (moduleConfig.mqtt.json_enabled == true) {
|
||||
String topicDecoded = jsonTopic + channels.getGlobalId(i) + "/#";
|
||||
DEBUG_MSG("Subscribing to %s\n", topicDecoded.c_str());
|
||||
LOG_INFO("Subscribing to %s\n", topicDecoded.c_str());
|
||||
pubSub.subscribe(topicDecoded.c_str(), 1); // FIXME, is QOS 1 right?
|
||||
}
|
||||
}
|
||||
@@ -262,7 +263,7 @@ int32_t MQTT::runOnce()
|
||||
size_t numBytes = pb_encode_to_bytes(bytes, sizeof(bytes), &ServiceEnvelope_msg, env);
|
||||
|
||||
String topic = cryptTopic + env->channel_id + "/" + owner.id;
|
||||
DEBUG_MSG("publish %s, %u bytes from queue\n", topic.c_str(), numBytes);
|
||||
LOG_INFO("publish %s, %u bytes from queue\n", topic.c_str(), numBytes);
|
||||
|
||||
|
||||
pubSub.publish(topic.c_str(), bytes, numBytes, false);
|
||||
@@ -272,7 +273,7 @@ int32_t MQTT::runOnce()
|
||||
auto jsonString = this->downstreamPacketToJson(env->packet);
|
||||
if (jsonString.length() != 0) {
|
||||
String topicJson = jsonTopic + env->channel_id + "/" + owner.id;
|
||||
DEBUG_MSG("JSON publish message to %s, %u bytes: %s\n", topicJson.c_str(), jsonString.length(), jsonString.c_str());
|
||||
LOG_INFO("JSON publish message to %s, %u bytes: %s\n", topicJson.c_str(), jsonString.length(), jsonString.c_str());
|
||||
pubSub.publish(topicJson.c_str(), jsonString.c_str(), false);
|
||||
}
|
||||
}
|
||||
@@ -287,7 +288,7 @@ int32_t MQTT::runOnce()
|
||||
} else {
|
||||
// we are connected to server, check often for new requests on the TCP port
|
||||
if (!wantConnection) {
|
||||
DEBUG_MSG("MQTT link not needed, dropping\n");
|
||||
LOG_INFO("MQTT link not needed, dropping\n");
|
||||
pubSub.disconnect();
|
||||
}
|
||||
|
||||
@@ -316,7 +317,7 @@ void MQTT::onSend(const MeshPacket &mp, ChannelIndex chIndex)
|
||||
size_t numBytes = pb_encode_to_bytes(bytes, sizeof(bytes), &ServiceEnvelope_msg, env);
|
||||
|
||||
String topic = cryptTopic + channelId + "/" + owner.id;
|
||||
DEBUG_MSG("publish %s, %u bytes\n", topic.c_str(), numBytes);
|
||||
LOG_DEBUG("publish %s, %u bytes\n", topic.c_str(), numBytes);
|
||||
|
||||
pubSub.publish(topic.c_str(), bytes, numBytes, false);
|
||||
|
||||
@@ -325,14 +326,14 @@ void MQTT::onSend(const MeshPacket &mp, ChannelIndex chIndex)
|
||||
auto jsonString = this->downstreamPacketToJson((MeshPacket *)&mp);
|
||||
if (jsonString.length() != 0) {
|
||||
String topicJson = jsonTopic + channelId + "/" + owner.id;
|
||||
DEBUG_MSG("JSON publish message to %s, %u bytes: %s\n", topicJson.c_str(), jsonString.length(), jsonString.c_str());
|
||||
LOG_INFO("JSON publish message to %s, %u bytes: %s\n", topicJson.c_str(), jsonString.length(), jsonString.c_str());
|
||||
pubSub.publish(topicJson.c_str(), jsonString.c_str(), false);
|
||||
}
|
||||
}
|
||||
} else {
|
||||
DEBUG_MSG("MQTT not connected, queueing packet\n");
|
||||
LOG_INFO("MQTT not connected, queueing packet\n");
|
||||
if (mqttQueue.numFree() == 0) {
|
||||
DEBUG_MSG("NOTE: MQTT queue is full, discarding oldest\n");
|
||||
LOG_WARN("NOTE: MQTT queue is full, discarding oldest\n");
|
||||
ServiceEnvelope *d = mqttQueue.dequeuePtr(0);
|
||||
if (d)
|
||||
mqttPool.release(d);
|
||||
@@ -358,20 +359,20 @@ std::string MQTT::downstreamPacketToJson(MeshPacket *mp)
|
||||
case PortNum_TEXT_MESSAGE_APP: {
|
||||
msgType = "text";
|
||||
// convert bytes to string
|
||||
DEBUG_MSG("got text message of size %u\n", mp->decoded.payload.size);
|
||||
LOG_DEBUG("got text message of size %u\n", mp->decoded.payload.size);
|
||||
char payloadStr[(mp->decoded.payload.size) + 1];
|
||||
memcpy(payloadStr, mp->decoded.payload.bytes, mp->decoded.payload.size);
|
||||
payloadStr[mp->decoded.payload.size] = 0; // null terminated string
|
||||
// check if this is a JSON payload
|
||||
JSONValue *json_value = JSON::Parse(payloadStr);
|
||||
if (json_value != NULL) {
|
||||
DEBUG_MSG("text message payload is of type json\n");
|
||||
LOG_INFO("text message payload is of type json\n");
|
||||
// if it is, then we can just use the json object
|
||||
jsonObj["payload"] = json_value;
|
||||
} else {
|
||||
// if it isn't, then we need to create a json object
|
||||
// with the string as the value
|
||||
DEBUG_MSG("text message payload is of type plaintext\n");
|
||||
LOG_INFO("text message payload is of type plaintext\n");
|
||||
msgPayload["text"] = new JSONValue(payloadStr);
|
||||
jsonObj["payload"] = new JSONValue(msgPayload);
|
||||
}
|
||||
@@ -400,7 +401,7 @@ std::string MQTT::downstreamPacketToJson(MeshPacket *mp)
|
||||
}
|
||||
jsonObj["payload"] = new JSONValue(msgPayload);
|
||||
} else
|
||||
DEBUG_MSG("Error decoding protobuf for telemetry message!\n");
|
||||
LOG_ERROR("Error decoding protobuf for telemetry message!\n");
|
||||
};
|
||||
break;
|
||||
}
|
||||
@@ -418,7 +419,7 @@ std::string MQTT::downstreamPacketToJson(MeshPacket *mp)
|
||||
msgPayload["hardware"] = new JSONValue(decoded->hw_model);
|
||||
jsonObj["payload"] = new JSONValue(msgPayload);
|
||||
} else
|
||||
DEBUG_MSG("Error decoding protobuf for nodeinfo message!\n");
|
||||
LOG_ERROR("Error decoding protobuf for nodeinfo message!\n");
|
||||
};
|
||||
break;
|
||||
}
|
||||
@@ -437,7 +438,7 @@ std::string MQTT::downstreamPacketToJson(MeshPacket *mp)
|
||||
if((int)decoded->altitude){msgPayload["altitude"] = new JSONValue((int)decoded->altitude);}
|
||||
jsonObj["payload"] = new JSONValue(msgPayload);
|
||||
} else {
|
||||
DEBUG_MSG("Error decoding protobuf for position message!\n");
|
||||
LOG_ERROR("Error decoding protobuf for position message!\n");
|
||||
}
|
||||
};
|
||||
break;
|
||||
@@ -460,7 +461,7 @@ std::string MQTT::downstreamPacketToJson(MeshPacket *mp)
|
||||
msgPayload["longitude_i"] = new JSONValue((int)decoded->longitude_i);
|
||||
jsonObj["payload"] = new JSONValue(msgPayload);
|
||||
} else {
|
||||
DEBUG_MSG("Error decoding protobuf for position message!\n");
|
||||
LOG_ERROR("Error decoding protobuf for position message!\n");
|
||||
}
|
||||
};
|
||||
break;
|
||||
@@ -482,7 +483,7 @@ std::string MQTT::downstreamPacketToJson(MeshPacket *mp)
|
||||
JSONValue *value = new JSONValue(jsonObj);
|
||||
std::string jsonStr = value->Stringify();
|
||||
|
||||
DEBUG_MSG("serialized json message: %s\n", jsonStr.c_str());
|
||||
LOG_INFO("serialized json message: %s\n", jsonStr.c_str());
|
||||
|
||||
delete value;
|
||||
return jsonStr;
|
||||
|
||||
Reference in New Issue
Block a user