mirror of
https://github.com/meshtastic/firmware.git
synced 2025-12-20 01:32:40 +00:00
re-add namespacing in protobufs. Let's see what i missed. Portduino likely ...
Checking in generated on purpose.
This commit is contained in:
@@ -21,9 +21,9 @@ String statusTopic = "msh/2/stat/";
|
||||
String cryptTopic = "msh/2/c/"; // msh/2/c/CHANNELID/NODEID
|
||||
String jsonTopic = "msh/2/json/"; // msh/2/json/CHANNELID/NODEID
|
||||
|
||||
static MemoryDynamic<ServiceEnvelope> staticMqttPool;
|
||||
static MemoryDynamic<meshtastic_ServiceEnvelope> staticMqttPool;
|
||||
|
||||
Allocator<ServiceEnvelope> &mqttPool = staticMqttPool;
|
||||
Allocator<meshtastic_ServiceEnvelope> &mqttPool = staticMqttPool;
|
||||
|
||||
void MQTT::mqttCallback(char *topic, byte *payload, unsigned int length)
|
||||
{
|
||||
@@ -33,7 +33,7 @@ void MQTT::mqttCallback(char *topic, byte *payload, unsigned int length)
|
||||
void MQTT::onPublish(char *topic, byte *payload, unsigned int length)
|
||||
{
|
||||
// parsing ServiceEnvelope
|
||||
ServiceEnvelope e = ServiceEnvelope_init_default;
|
||||
meshtastic_ServiceEnvelope e = meshtastic_ServiceEnvelope_init_default;
|
||||
|
||||
if (moduleConfig.mqtt.json_enabled && (strncmp(topic, jsonTopic.c_str(), jsonTopic.length()) == 0)) {
|
||||
// check if this is a json payload message by comparing the topic start
|
||||
@@ -54,7 +54,7 @@ void MQTT::onPublish(char *topic, byte *payload, unsigned int length)
|
||||
ptr = strtok(NULL, "/");
|
||||
}
|
||||
LOG_DEBUG("Looking for Channel name: %s\n", ptr);
|
||||
Channel sendChannel = channels.getByName(ptr);
|
||||
meshtastic_Channel sendChannel = channels.getByName(ptr);
|
||||
LOG_DEBUG("Found Channel name: %s (Index %d)\n", channels.getGlobalId(sendChannel.settings.channel_num),
|
||||
sendChannel.settings.channel_num);
|
||||
|
||||
@@ -68,14 +68,14 @@ void MQTT::onPublish(char *topic, byte *payload, unsigned int length)
|
||||
LOG_INFO("JSON payload %s, length %u\n", jsonPayloadStr.c_str(), jsonPayloadStr.length());
|
||||
|
||||
// construct protobuf data packet using TEXT_MESSAGE, send it to the mesh
|
||||
MeshPacket *p = router->allocForSending();
|
||||
p->decoded.portnum = PortNum_TEXT_MESSAGE_APP;
|
||||
meshtastic_MeshPacket *p = router->allocForSending();
|
||||
p->decoded.portnum = meshtastic_PortNum_TEXT_MESSAGE_APP;
|
||||
p->channel = sendChannel.settings.channel_num;
|
||||
if (sendChannel.settings.downlink_enabled) {
|
||||
if (jsonPayloadStr.length() <= sizeof(p->decoded.payload.bytes)) {
|
||||
memcpy(p->decoded.payload.bytes, jsonPayloadStr.c_str(), jsonPayloadStr.length());
|
||||
p->decoded.payload.size = jsonPayloadStr.length();
|
||||
MeshPacket *packet = packetPool.allocCopy(*p);
|
||||
meshtastic_MeshPacket *packet = packetPool.allocCopy(*p);
|
||||
service.sendToMesh(packet, RX_SRC_LOCAL);
|
||||
} else {
|
||||
LOG_WARN("Received MQTT json payload too long, dropping\n");
|
||||
@@ -95,19 +95,19 @@ void MQTT::onPublish(char *topic, byte *payload, unsigned int length)
|
||||
(json["sender"]->AsString().compare(owner.id) != 0)) {
|
||||
JSONObject posit;
|
||||
posit = json["payload"]->AsObject(); // get nested JSON Position
|
||||
Position pos = Position_init_default;
|
||||
meshtastic_Position pos = meshtastic_Position_init_default;
|
||||
pos.latitude_i = posit["latitude_i"]->AsNumber();
|
||||
pos.longitude_i = posit["longitude_i"]->AsNumber();
|
||||
pos.altitude = posit["altitude"]->AsNumber();
|
||||
pos.time = posit["time"]->AsNumber();
|
||||
|
||||
// construct protobuf data packet using POSITION, send it to the mesh
|
||||
MeshPacket *p = router->allocForSending();
|
||||
p->decoded.portnum = PortNum_POSITION_APP;
|
||||
meshtastic_MeshPacket *p = router->allocForSending();
|
||||
p->decoded.portnum = meshtastic_PortNum_POSITION_APP;
|
||||
p->channel = sendChannel.settings.channel_num;
|
||||
if (sendChannel.settings.downlink_enabled) {
|
||||
p->decoded.payload.size = pb_encode_to_bytes(p->decoded.payload.bytes, sizeof(p->decoded.payload.bytes),
|
||||
&Position_msg, &pos); // make the Data protobuf from position
|
||||
&meshtastic_Position_msg, &pos); // make the Data protobuf from position
|
||||
service.sendToMesh(p, RX_SRC_LOCAL);
|
||||
} else {
|
||||
LOG_WARN("Received MQTT json payload on channel %s, but downlink is disabled, dropping\n",
|
||||
@@ -125,7 +125,7 @@ void MQTT::onPublish(char *topic, byte *payload, unsigned int length)
|
||||
}
|
||||
delete json_value;
|
||||
} else {
|
||||
if (!pb_decode_from_bytes(payload, length, &ServiceEnvelope_msg, &e)) {
|
||||
if (!pb_decode_from_bytes(payload, length, &meshtastic_ServiceEnvelope_msg, &e)) {
|
||||
LOG_ERROR("Invalid MQTT service envelope, topic %s, len %u!\n", topic, length);
|
||||
return;
|
||||
} else {
|
||||
@@ -134,7 +134,7 @@ void MQTT::onPublish(char *topic, byte *payload, unsigned int length)
|
||||
else {
|
||||
if (e.packet) {
|
||||
LOG_INFO("Received MQTT topic %s, len=%u\n", topic, length);
|
||||
MeshPacket *p = packetPool.allocCopy(*e.packet);
|
||||
meshtastic_MeshPacket *p = packetPool.allocCopy(*e.packet);
|
||||
|
||||
// ignore messages sent by us or if we don't have the channel key
|
||||
if (router && p->from != nodeDB.getNodeNum() && perhapsDecode(p))
|
||||
@@ -293,9 +293,9 @@ int32_t MQTT::runOnce()
|
||||
if (pubSub.connected()) {
|
||||
if (!mqttQueue.isEmpty()) {
|
||||
// FIXME - this size calculation is super sloppy, but it will go away once we dynamically alloc meshpackets
|
||||
ServiceEnvelope *env = mqttQueue.dequeuePtr(0);
|
||||
static uint8_t bytes[MeshPacket_size + 64];
|
||||
size_t numBytes = pb_encode_to_bytes(bytes, sizeof(bytes), &ServiceEnvelope_msg, env);
|
||||
meshtastic_ServiceEnvelope *env = mqttQueue.dequeuePtr(0);
|
||||
static uint8_t bytes[meshtastic_MeshPacket_size + 64];
|
||||
size_t numBytes = pb_encode_to_bytes(bytes, sizeof(bytes), &meshtastic_ServiceEnvelope_msg, env);
|
||||
|
||||
String topic = cryptTopic + env->channel_id + "/" + owner.id;
|
||||
LOG_INFO("publish %s, %u bytes from queue\n", topic.c_str(), numBytes);
|
||||
@@ -332,24 +332,24 @@ int32_t MQTT::runOnce()
|
||||
}
|
||||
}
|
||||
|
||||
void MQTT::onSend(const MeshPacket &mp, ChannelIndex chIndex)
|
||||
void MQTT::onSend(const meshtastic_MeshPacket &mp, ChannelIndex chIndex)
|
||||
{
|
||||
auto &ch = channels.getByIndex(chIndex);
|
||||
|
||||
if (ch.settings.uplink_enabled) {
|
||||
const char *channelId = channels.getGlobalId(chIndex); // FIXME, for now we just use the human name for the channel
|
||||
|
||||
ServiceEnvelope *env = mqttPool.allocZeroed();
|
||||
meshtastic_ServiceEnvelope *env = mqttPool.allocZeroed();
|
||||
env->channel_id = (char *)channelId;
|
||||
env->gateway_id = owner.id;
|
||||
env->packet = (MeshPacket *)∓
|
||||
env->packet = (meshtastic_MeshPacket *)∓
|
||||
|
||||
// don't bother sending if not connected...
|
||||
if (pubSub.connected()) {
|
||||
|
||||
// FIXME - this size calculation is super sloppy, but it will go away once we dynamically alloc meshpackets
|
||||
static uint8_t bytes[MeshPacket_size + 64];
|
||||
size_t numBytes = pb_encode_to_bytes(bytes, sizeof(bytes), &ServiceEnvelope_msg, env);
|
||||
static uint8_t bytes[meshtastic_MeshPacket_size + 64];
|
||||
size_t numBytes = pb_encode_to_bytes(bytes, sizeof(bytes), &meshtastic_ServiceEnvelope_msg, env);
|
||||
|
||||
String topic = cryptTopic + channelId + "/" + owner.id;
|
||||
LOG_DEBUG("publish %s, %u bytes\n", topic.c_str(), numBytes);
|
||||
@@ -358,7 +358,7 @@ void MQTT::onSend(const MeshPacket &mp, ChannelIndex chIndex)
|
||||
|
||||
if (moduleConfig.mqtt.json_enabled) {
|
||||
// handle json topic
|
||||
auto jsonString = this->downstreamPacketToJson((MeshPacket *)&mp);
|
||||
auto jsonString = this->downstreamPacketToJson((meshtastic_MeshPacket *)&mp);
|
||||
if (jsonString.length() != 0) {
|
||||
String topicJson = jsonTopic + channelId + "/" + owner.id;
|
||||
LOG_INFO("JSON publish message to %s, %u bytes: %s\n", topicJson.c_str(), jsonString.length(),
|
||||
@@ -370,12 +370,12 @@ void MQTT::onSend(const MeshPacket &mp, ChannelIndex chIndex)
|
||||
LOG_INFO("MQTT not connected, queueing packet\n");
|
||||
if (mqttQueue.numFree() == 0) {
|
||||
LOG_WARN("NOTE: MQTT queue is full, discarding oldest\n");
|
||||
ServiceEnvelope *d = mqttQueue.dequeuePtr(0);
|
||||
meshtastic_ServiceEnvelope *d = mqttQueue.dequeuePtr(0);
|
||||
if (d)
|
||||
mqttPool.release(d);
|
||||
}
|
||||
// make a copy of serviceEnvelope and queue it
|
||||
ServiceEnvelope *copied = mqttPool.allocCopy(*env);
|
||||
meshtastic_ServiceEnvelope *copied = mqttPool.allocCopy(*env);
|
||||
assert(mqttQueue.enqueue(copied, 0));
|
||||
}
|
||||
mqttPool.release(env);
|
||||
@@ -383,7 +383,7 @@ void MQTT::onSend(const MeshPacket &mp, ChannelIndex chIndex)
|
||||
}
|
||||
|
||||
// converts a downstream packet into a json message
|
||||
std::string MQTT::downstreamPacketToJson(MeshPacket *mp)
|
||||
std::string MQTT::downstreamPacketToJson(meshtastic_MeshPacket *mp)
|
||||
{
|
||||
// the created jsonObj is immutable after creation, so
|
||||
// we need to do the heavy lifting before assembling it.
|
||||
@@ -392,7 +392,7 @@ std::string MQTT::downstreamPacketToJson(MeshPacket *mp)
|
||||
JSONObject jsonObj;
|
||||
|
||||
switch (mp->decoded.portnum) {
|
||||
case PortNum_TEXT_MESSAGE_APP: {
|
||||
case meshtastic_PortNum_TEXT_MESSAGE_APP: {
|
||||
msgType = "text";
|
||||
// convert bytes to string
|
||||
LOG_DEBUG("got text message of size %u\n", mp->decoded.payload.size);
|
||||
@@ -414,20 +414,20 @@ std::string MQTT::downstreamPacketToJson(MeshPacket *mp)
|
||||
}
|
||||
break;
|
||||
}
|
||||
case PortNum_TELEMETRY_APP: {
|
||||
case meshtastic_PortNum_TELEMETRY_APP: {
|
||||
msgType = "telemetry";
|
||||
Telemetry scratch;
|
||||
Telemetry *decoded = NULL;
|
||||
if (mp->which_payload_variant == MeshPacket_decoded_tag) {
|
||||
meshtastic_Telemetry scratch;
|
||||
meshtastic_Telemetry *decoded = NULL;
|
||||
if (mp->which_payload_variant == meshtastic_MeshPacket_decoded_tag) {
|
||||
memset(&scratch, 0, sizeof(scratch));
|
||||
if (pb_decode_from_bytes(mp->decoded.payload.bytes, mp->decoded.payload.size, &Telemetry_msg, &scratch)) {
|
||||
if (pb_decode_from_bytes(mp->decoded.payload.bytes, mp->decoded.payload.size, &meshtastic_Telemetry_msg, &scratch)) {
|
||||
decoded = &scratch;
|
||||
if (decoded->which_variant == Telemetry_device_metrics_tag) {
|
||||
if (decoded->which_variant == meshtastic_Telemetry_device_metrics_tag) {
|
||||
msgPayload["battery_level"] = new JSONValue((int)decoded->variant.device_metrics.battery_level);
|
||||
msgPayload["voltage"] = new JSONValue(decoded->variant.device_metrics.voltage);
|
||||
msgPayload["channel_utilization"] = new JSONValue(decoded->variant.device_metrics.channel_utilization);
|
||||
msgPayload["air_util_tx"] = new JSONValue(decoded->variant.device_metrics.air_util_tx);
|
||||
} else if (decoded->which_variant == Telemetry_environment_metrics_tag) {
|
||||
} else if (decoded->which_variant == meshtastic_Telemetry_environment_metrics_tag) {
|
||||
msgPayload["temperature"] = new JSONValue(decoded->variant.environment_metrics.temperature);
|
||||
msgPayload["relative_humidity"] = new JSONValue(decoded->variant.environment_metrics.relative_humidity);
|
||||
msgPayload["barometric_pressure"] = new JSONValue(decoded->variant.environment_metrics.barometric_pressure);
|
||||
@@ -441,13 +441,13 @@ std::string MQTT::downstreamPacketToJson(MeshPacket *mp)
|
||||
};
|
||||
break;
|
||||
}
|
||||
case PortNum_NODEINFO_APP: {
|
||||
case meshtastic_PortNum_NODEINFO_APP: {
|
||||
msgType = "nodeinfo";
|
||||
User scratch;
|
||||
User *decoded = NULL;
|
||||
if (mp->which_payload_variant == MeshPacket_decoded_tag) {
|
||||
meshtastic_User scratch;
|
||||
meshtastic_User *decoded = NULL;
|
||||
if (mp->which_payload_variant == meshtastic_MeshPacket_decoded_tag) {
|
||||
memset(&scratch, 0, sizeof(scratch));
|
||||
if (pb_decode_from_bytes(mp->decoded.payload.bytes, mp->decoded.payload.size, &User_msg, &scratch)) {
|
||||
if (pb_decode_from_bytes(mp->decoded.payload.bytes, mp->decoded.payload.size, &meshtastic_User_msg, &scratch)) {
|
||||
decoded = &scratch;
|
||||
msgPayload["id"] = new JSONValue(decoded->id);
|
||||
msgPayload["longname"] = new JSONValue(decoded->long_name);
|
||||
@@ -459,13 +459,13 @@ std::string MQTT::downstreamPacketToJson(MeshPacket *mp)
|
||||
};
|
||||
break;
|
||||
}
|
||||
case PortNum_POSITION_APP: {
|
||||
case meshtastic_PortNum_POSITION_APP: {
|
||||
msgType = "position";
|
||||
Position scratch;
|
||||
Position *decoded = NULL;
|
||||
if (mp->which_payload_variant == MeshPacket_decoded_tag) {
|
||||
meshtastic_Position scratch;
|
||||
meshtastic_Position *decoded = NULL;
|
||||
if (mp->which_payload_variant == meshtastic_MeshPacket_decoded_tag) {
|
||||
memset(&scratch, 0, sizeof(scratch));
|
||||
if (pb_decode_from_bytes(mp->decoded.payload.bytes, mp->decoded.payload.size, &Position_msg, &scratch)) {
|
||||
if (pb_decode_from_bytes(mp->decoded.payload.bytes, mp->decoded.payload.size, &meshtastic_Position_msg, &scratch)) {
|
||||
decoded = &scratch;
|
||||
if ((int)decoded->time) {
|
||||
msgPayload["time"] = new JSONValue((int)decoded->time);
|
||||
@@ -486,13 +486,13 @@ std::string MQTT::downstreamPacketToJson(MeshPacket *mp)
|
||||
break;
|
||||
}
|
||||
|
||||
case PortNum_WAYPOINT_APP: {
|
||||
case meshtastic_PortNum_WAYPOINT_APP: {
|
||||
msgType = "position";
|
||||
Waypoint scratch;
|
||||
Waypoint *decoded = NULL;
|
||||
if (mp->which_payload_variant == MeshPacket_decoded_tag) {
|
||||
meshtastic_Waypoint scratch;
|
||||
meshtastic_Waypoint *decoded = NULL;
|
||||
if (mp->which_payload_variant == meshtastic_MeshPacket_decoded_tag) {
|
||||
memset(&scratch, 0, sizeof(scratch));
|
||||
if (pb_decode_from_bytes(mp->decoded.payload.bytes, mp->decoded.payload.size, &Waypoint_msg, &scratch)) {
|
||||
if (pb_decode_from_bytes(mp->decoded.payload.bytes, mp->decoded.payload.size, &meshtastic_Waypoint_msg, &scratch)) {
|
||||
decoded = &scratch;
|
||||
msgPayload["id"] = new JSONValue((int)decoded->id);
|
||||
msgPayload["name"] = new JSONValue(decoded->name);
|
||||
|
||||
Reference in New Issue
Block a user