MQTT at least talks to server, works in native and esp32

This commit is contained in:
Kevin Hester
2021-04-03 14:54:10 +08:00
parent 2acde3333c
commit dcf64dfacd
16 changed files with 267 additions and 46 deletions

View File

@@ -32,8 +32,9 @@
#include "nimble/BluetoothUtil.h"
#endif
#ifdef PORTDUINO
#if defined(HAS_WIFI) || defined(PORTDUINO)
#include "mesh/wifi/WiFiServerAPI.h"
#include "mqtt/MQTT.h"
#endif
#include "RF95Interface.h"
@@ -541,6 +542,10 @@ void setup()
initApiServer();
#endif
#if defined(PORTDUINO) || defined(HAS_WIFI)
mqttInit();
#endif
// Start airtime logger thread.
airTime = new AirTime();

View File

@@ -79,7 +79,7 @@ extern const pb_msgdesc_t AdminMessage_msg;
#define AdminMessage_fields &AdminMessage_msg
/* Maximum encoded size of messages (where known) */
#define AdminMessage_size 360
#define AdminMessage_size 397
#ifdef __cplusplus
} /* extern "C" */

View File

@@ -6,9 +6,6 @@
#error Regenerate this file with the current version of nanopb generator.
#endif
PB_BIND(ServiceEnvelope, ServiceEnvelope, 2)
PB_BIND(ChannelSet, ChannelSet, AUTO)

View File

@@ -4,7 +4,6 @@
#ifndef PB_APPONLY_PB_H_INCLUDED
#define PB_APPONLY_PB_H_INCLUDED
#include <pb.h>
#include "mesh.pb.h"
#include "channel.pb.h"
#if PB_PROTO_HEADER_VERSION != 40
@@ -16,54 +15,31 @@ typedef struct _ChannelSet {
pb_callback_t settings;
} ChannelSet;
typedef struct _ServiceEnvelope {
bool has_packet;
MeshPacket packet;
pb_callback_t channel_id;
pb_callback_t gateway_id;
} ServiceEnvelope;
#ifdef __cplusplus
extern "C" {
#endif
/* Initializer values for message structs */
#define ServiceEnvelope_init_default {false, MeshPacket_init_default, {{NULL}, NULL}, {{NULL}, NULL}}
#define ChannelSet_init_default {{{NULL}, NULL}}
#define ServiceEnvelope_init_zero {false, MeshPacket_init_zero, {{NULL}, NULL}, {{NULL}, NULL}}
#define ChannelSet_init_zero {{{NULL}, NULL}}
/* Field tags (for use in manual encoding/decoding) */
#define ChannelSet_settings_tag 1
#define ServiceEnvelope_packet_tag 1
#define ServiceEnvelope_channel_id_tag 2
#define ServiceEnvelope_gateway_id_tag 3
/* Struct field encoding specification for nanopb */
#define ServiceEnvelope_FIELDLIST(X, a) \
X(a, STATIC, OPTIONAL, MESSAGE, packet, 1) \
X(a, CALLBACK, SINGULAR, STRING, channel_id, 2) \
X(a, CALLBACK, SINGULAR, STRING, gateway_id, 3)
#define ServiceEnvelope_CALLBACK pb_default_field_callback
#define ServiceEnvelope_DEFAULT NULL
#define ServiceEnvelope_packet_MSGTYPE MeshPacket
#define ChannelSet_FIELDLIST(X, a) \
X(a, CALLBACK, REPEATED, MESSAGE, settings, 1)
#define ChannelSet_CALLBACK pb_default_field_callback
#define ChannelSet_DEFAULT NULL
#define ChannelSet_settings_MSGTYPE ChannelSettings
extern const pb_msgdesc_t ServiceEnvelope_msg;
extern const pb_msgdesc_t ChannelSet_msg;
/* Defines for backwards compatibility with code written before nanopb-0.4.0 */
#define ServiceEnvelope_fields &ServiceEnvelope_msg
#define ChannelSet_fields &ChannelSet_msg
/* Maximum encoded size of messages (where known) */
/* ServiceEnvelope_size depends on runtime parameters */
/* ChannelSet_size depends on runtime parameters */
#ifdef __cplusplus

View File

@@ -0,0 +1,12 @@
/* Automatically generated nanopb constant definitions */
/* Generated by nanopb-0.4.4 */
#include "mqtt.pb.h"
#if PB_PROTO_HEADER_VERSION != 40
#error Regenerate this file with the current version of nanopb generator.
#endif
PB_BIND(ServiceEnvelope, ServiceEnvelope, AUTO)

View File

@@ -0,0 +1,55 @@
/* Automatically generated nanopb header */
/* Generated by nanopb-0.4.4 */
#ifndef PB_MQTT_PB_H_INCLUDED
#define PB_MQTT_PB_H_INCLUDED
#include <pb.h>
#include "mesh.pb.h"
#if PB_PROTO_HEADER_VERSION != 40
#error Regenerate this file with the current version of nanopb generator.
#endif
/* Struct definitions */
typedef struct _ServiceEnvelope {
struct _MeshPacket *packet;
char *channel_id;
char *gateway_id;
} ServiceEnvelope;
#ifdef __cplusplus
extern "C" {
#endif
/* Initializer values for message structs */
#define ServiceEnvelope_init_default {NULL, NULL, NULL}
#define ServiceEnvelope_init_zero {NULL, NULL, NULL}
/* Field tags (for use in manual encoding/decoding) */
#define ServiceEnvelope_packet_tag 1
#define ServiceEnvelope_channel_id_tag 2
#define ServiceEnvelope_gateway_id_tag 3
/* Struct field encoding specification for nanopb */
#define ServiceEnvelope_FIELDLIST(X, a) \
X(a, POINTER, OPTIONAL, MESSAGE, packet, 1) \
X(a, POINTER, SINGULAR, STRING, channel_id, 2) \
X(a, POINTER, SINGULAR, STRING, gateway_id, 3)
#define ServiceEnvelope_CALLBACK NULL
#define ServiceEnvelope_DEFAULT NULL
#define ServiceEnvelope_packet_MSGTYPE MeshPacket
extern const pb_msgdesc_t ServiceEnvelope_msg;
/* Defines for backwards compatibility with code written before nanopb-0.4.0 */
#define ServiceEnvelope_fields &ServiceEnvelope_msg
/* Maximum encoded size of messages (where known) */
/* ServiceEnvelope_size depends on runtime parameters */
#ifdef __cplusplus
} /* extern "C" */
#endif
#endif

View File

@@ -87,6 +87,8 @@ typedef struct _RadioConfig_UserPreferences {
bool fixed_position;
bool serial_disabled;
float frequency_offset;
char mqtt_server[32];
bool mqtt_disabled;
bool factory_reset;
bool debug_log_enabled;
pb_size_t ignore_incoming_count;
@@ -152,9 +154,9 @@ extern "C" {
/* Initializer values for message structs */
#define RadioConfig_init_default {false, RadioConfig_UserPreferences_init_default}
#define RadioConfig_UserPreferences_init_default {0, 0, 0, 0, 0, 0, 0, 0, 0, 0, "", "", 0, _RegionCode_MIN, _ChargeCurrent_MIN, _LocationSharing_MIN, _GpsOperation_MIN, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, {0, 0, 0}, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, _RadioConfig_UserPreferences_EnvironmentalMeasurementSensorType_MIN, 0, 0}
#define RadioConfig_UserPreferences_init_default {0, 0, 0, 0, 0, 0, 0, 0, 0, 0, "", "", 0, _RegionCode_MIN, _ChargeCurrent_MIN, _LocationSharing_MIN, _GpsOperation_MIN, 0, 0, 0, 0, 0, 0, 0, "", 0, 0, 0, 0, {0, 0, 0}, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, _RadioConfig_UserPreferences_EnvironmentalMeasurementSensorType_MIN, 0, 0}
#define RadioConfig_init_zero {false, RadioConfig_UserPreferences_init_zero}
#define RadioConfig_UserPreferences_init_zero {0, 0, 0, 0, 0, 0, 0, 0, 0, 0, "", "", 0, _RegionCode_MIN, _ChargeCurrent_MIN, _LocationSharing_MIN, _GpsOperation_MIN, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, {0, 0, 0}, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, _RadioConfig_UserPreferences_EnvironmentalMeasurementSensorType_MIN, 0, 0}
#define RadioConfig_UserPreferences_init_zero {0, 0, 0, 0, 0, 0, 0, 0, 0, 0, "", "", 0, _RegionCode_MIN, _ChargeCurrent_MIN, _LocationSharing_MIN, _GpsOperation_MIN, 0, 0, 0, 0, 0, 0, 0, "", 0, 0, 0, 0, {0, 0, 0}, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, _RadioConfig_UserPreferences_EnvironmentalMeasurementSensorType_MIN, 0, 0}
/* Field tags (for use in manual encoding/decoding) */
#define RadioConfig_UserPreferences_position_broadcast_secs_tag 1
@@ -181,6 +183,8 @@ extern "C" {
#define RadioConfig_UserPreferences_fixed_position_tag 39
#define RadioConfig_UserPreferences_serial_disabled_tag 40
#define RadioConfig_UserPreferences_frequency_offset_tag 41
#define RadioConfig_UserPreferences_mqtt_server_tag 42
#define RadioConfig_UserPreferences_mqtt_disabled_tag 43
#define RadioConfig_UserPreferences_factory_reset_tag 100
#define RadioConfig_UserPreferences_debug_log_enabled_tag 101
#define RadioConfig_UserPreferences_ignore_incoming_tag 103
@@ -243,6 +247,8 @@ X(a, STATIC, SINGULAR, BOOL, is_low_power, 38) \
X(a, STATIC, SINGULAR, BOOL, fixed_position, 39) \
X(a, STATIC, SINGULAR, BOOL, serial_disabled, 40) \
X(a, STATIC, SINGULAR, FLOAT, frequency_offset, 41) \
X(a, STATIC, SINGULAR, STRING, mqtt_server, 42) \
X(a, STATIC, SINGULAR, BOOL, mqtt_disabled, 43) \
X(a, STATIC, SINGULAR, BOOL, factory_reset, 100) \
X(a, STATIC, SINGULAR, BOOL, debug_log_enabled, 101) \
X(a, STATIC, REPEATED, UINT32, ignore_incoming, 103) \
@@ -282,8 +288,8 @@ extern const pb_msgdesc_t RadioConfig_UserPreferences_msg;
#define RadioConfig_UserPreferences_fields &RadioConfig_UserPreferences_msg
/* Maximum encoded size of messages (where known) */
#define RadioConfig_size 357
#define RadioConfig_UserPreferences_size 354
#define RadioConfig_size 394
#define RadioConfig_UserPreferences_size 391
#ifdef __cplusplus
} /* extern "C" */

76
src/mqtt/MQTT.cpp Normal file
View File

@@ -0,0 +1,76 @@
#include "MQTT.h"
#include "NodeDB.h"
#include <WiFi.h>
#include <assert.h>
MQTT *mqtt;
String statusTopic = "mstat/";
String packetTopic = "mesh/";
void mqttCallback(char *topic, byte *payload, unsigned int length)
{
DEBUG_MSG("MQTT topic %s\n", topic);
// After parsing ServiceEnvelope
// FIXME - make sure to free both strings and the MeshPacket
}
void mqttInit()
{
// FIXME, for now we require the user to specifically set a MQTT server (till tested)
if (radioConfig.preferences.mqtt_disabled || !*radioConfig.preferences.mqtt_server)
DEBUG_MSG("MQTT disabled...\n");
else if (!WiFi.isConnected())
DEBUG_MSG("WiFi is not connected, can not start MQTT\n");
else
new MQTT();
}
MQTT::MQTT() : pubSub(mqttClient)
{
assert(!mqtt);
mqtt = this;
// pubSub.setServer("devsrv.ezdevice.net", 1883); or 192.168.10.188
const char *serverAddr = "test.mosquitto.org"; // "mqtt.meshtastic.org"; // default hostname
if (*radioConfig.preferences.mqtt_server)
serverAddr = radioConfig.preferences.mqtt_server; // Override the default
pubSub.setServer(serverAddr, 1883);
pubSub.setCallback(mqttCallback);
DEBUG_MSG("Connecting to MQTT server: %s\n", serverAddr);
auto myStatus = (statusTopic + nodeId);
// bool connected = pubSub.connect(nodeId.c_str(), "meshdev", "apes4cats", myStatus.c_str(), 1, true, "offline");
bool connected = pubSub.connect(nodeId.c_str(), myStatus.c_str(), 1, true, "offline");
if (connected) {
DEBUG_MSG("MQTT connected\n");
static char subsStr[64]; /* We keep this static because the mqtt lib
might not be copying it */
// snprintf(subsStr, sizeof(subsStr), "/ezd/todev/%s/#", clientId);
// mqtt.subscribe(subsStr, 1); // we use qos 1 because we don't want to miss messages
/// FIXME, include more information in the status text
bool ok = pubSub.publish(myStatus.c_str(), "online", true);
DEBUG_MSG("published %d\n", ok);
}
}
void MQTT::publish(const MeshPacket *mp, String channelId)
{
// DEBUG_MSG("publish %s = %s\n", suffix.c_str(), payload.c_str());
// pubSub.publish(getTopic(suffix), payload.c_str(), retained);
}
const char *MQTT::getTopic(String suffix, const char *direction)
{
static char buf[128];
// "mesh/crypt/CHANNELID/NODEID/PORTID"
snprintf(buf, sizeof(buf), "mesh/%s/%s/%s", direction, nodeId.c_str(), suffix.c_str());
return buf;
}

38
src/mqtt/MQTT.h Normal file
View File

@@ -0,0 +1,38 @@
#pragma once
#include "configuration.h"
#include <PubSubClient.h>
#include <WiFiClient.h>
/**
* Our wrapper/singleton for sending/receiving MQTT "udp" packets. This object isolates the MQTT protocol implementation from
* the two components that use it: MQTTPlugin and MQTTSimInterface.
*/
class MQTT
{
/// Our globally unique node ID
String nodeId = "fixmemode";
// supposedly the current version is busted:
// http://www.iotsharing.com/2017/08/how-to-use-esp32-mqtts-with-mqtts-mosquitto-broker-tls-ssl.html
// WiFiClientSecure wifiClient;
WiFiClient mqttClient;
PubSubClient pubSub;
public:
MQTT();
/**
* Publish a packet on the glboal MQTT server.
* @param channelId must be a globally unique channel ID
*/
void publish(const MeshPacket *mp, String channelId);
private:
const char *getTopic(String suffix, const char *direction = "dev");
};
void mqttInit();
extern MQTT *mqtt;