MQTT client proxying (#2587)

* WIP on MQTT proxy message queue

* Fix copy paste goof

* Progress on uplink

* Has packets

* Avoid trying to connect if we're proxying

* Pointer correctly

* Remove wifi guards

* Client proxy subscribe

* Fixed method that got bababababorked somehow... personally I blame CoPilot

* Short circuit logic

* Remove canned settings

* Missed some stuff in the move

* Guard pubsub client for non-networked variants

* Has networking guard

* else

* Return statement for fall-thru

* More gaurd removals

* Removed source filters. No wonder I was confused

* Bounding

* Scope guard around else and fix return

* Portduino

* Defs instead

* Move macro up to actually fix portduino

* Size_t

* Unsigned int

* Thread interval

* Protos

* Protobufs ref
This commit is contained in:
Ben Meadors
2023-07-08 20:37:04 -05:00
committed by GitHub
parent da389eb787
commit 6e96216ba3
21 changed files with 339 additions and 103 deletions

View File

@@ -18,6 +18,8 @@
#error ToRadio is too big
#endif
#include "mqtt/MQTT.h"
PhoneAPI::PhoneAPI()
{
lastContactMsec = millis();
@@ -54,6 +56,7 @@ void PhoneAPI::close()
unobserve(&xModem.packetReady);
releasePhonePacket(); // Don't leak phone packets on shutdown
releaseQueueStatusPhonePacket();
releaseMqttClientProxyPhonePacket();
onConnectionChanged(false);
}
@@ -98,6 +101,12 @@ bool PhoneAPI::handleToRadio(const uint8_t *buf, size_t bufLength)
LOG_INFO("Got xmodem packet\n");
xModem.handlePacket(toRadioScratch.xmodemPacket);
break;
case meshtastic_ToRadio_mqttClientProxyMessage_tag:
LOG_INFO("Got MqttClientProxy message\n");
if (mqtt && moduleConfig.mqtt.proxy_to_client_enabled) {
mqtt->onClientProxyReceive(toRadioScratch.mqttClientProxyMessage);
}
break;
default:
// Ignore nop messages
// LOG_DEBUG("Error: unexpected ToRadio variant\n");
@@ -295,12 +304,16 @@ size_t PhoneAPI::getFromRadio(uint8_t *buf)
break;
case STATE_SEND_PACKETS:
// Do we have a message from the mesh?
// Do we have a message from the mesh or packet from the local device?
LOG_INFO("getFromRadio=STATE_SEND_PACKETS\n");
if (queueStatusPacketForPhone) {
fromRadioScratch.which_payload_variant = meshtastic_FromRadio_queueStatus_tag;
fromRadioScratch.queueStatus = *queueStatusPacketForPhone;
releaseQueueStatusPhonePacket();
} else if (mqttClientProxyMessageForPhone) {
fromRadioScratch.which_payload_variant = meshtastic_FromRadio_mqttClientProxyMessage_tag;
fromRadioScratch.mqttClientProxyMessage = *mqttClientProxyMessageForPhone;
releaseMqttClientProxyPhonePacket();
} else if (xmodemPacketForPhone.control != meshtastic_XModem_Control_NUL) {
fromRadioScratch.which_payload_variant = meshtastic_FromRadio_xmodemPacket_tag;
fromRadioScratch.xmodemPacket = xmodemPacketForPhone;
@@ -353,6 +366,14 @@ void PhoneAPI::releaseQueueStatusPhonePacket()
}
}
void PhoneAPI::releaseMqttClientProxyPhonePacket()
{
if (mqttClientProxyMessageForPhone) {
service.releaseMqttClientProxyMessageToPool(mqttClientProxyMessageForPhone);
mqttClientProxyMessageForPhone = NULL;
}
}
/**
* Return true if we have data available to send to the phone
*/
@@ -381,7 +402,9 @@ bool PhoneAPI::available()
case STATE_SEND_PACKETS: {
if (!queueStatusPacketForPhone)
queueStatusPacketForPhone = service.getQueueStatusForPhone();
bool hasPacket = !!queueStatusPacketForPhone;
if (!mqttClientProxyMessageForPhone)
mqttClientProxyMessageForPhone = service.getMqttClientProxyMessageForPhone();
bool hasPacket = !!queueStatusPacketForPhone || !!mqttClientProxyMessageForPhone;
if (hasPacket)
return true;