mirror of
https://github.com/meshtastic/firmware.git
synced 2025-12-14 06:42:34 +00:00
Compare commits
6 Commits
81799af73d
...
end-2-end-
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
700397d3ed | ||
|
|
cd65497ac4 | ||
|
|
3c90a65a66 | ||
|
|
9e8f0814ab | ||
|
|
36cadd03ee | ||
|
|
15ee827efd |
@@ -39,4 +39,4 @@ def readProps(prefsLoc):
|
|||||||
return verObj
|
return verObj
|
||||||
|
|
||||||
|
|
||||||
# print("path is" + ','.join(sys.path))
|
# print("path is" + ','.join(sys.path))
|
||||||
58
test/end2end/flash.py
Normal file
58
test/end2end/flash.py
Normal file
@@ -0,0 +1,58 @@
|
|||||||
|
# trunk-ignore-all(bandit/B404)
|
||||||
|
import subprocess
|
||||||
|
|
||||||
|
import usb.core
|
||||||
|
|
||||||
|
|
||||||
|
def find_usb_device(vendor_id, product_id):
|
||||||
|
# Find USB devices
|
||||||
|
dev = usb.core.find(find_all=True)
|
||||||
|
# Loop through devices, printing vendor and product ids in decimal and hex
|
||||||
|
for cfg in dev:
|
||||||
|
if cfg.idVendor == vendor_id and cfg.idProduct == product_id:
|
||||||
|
return cfg
|
||||||
|
return None
|
||||||
|
|
||||||
|
|
||||||
|
# Flash esp32 target
|
||||||
|
def flash_esp32(pio_env, port):
|
||||||
|
# trunk-ignore(bandit/B603)
|
||||||
|
# trunk-ignore(bandit/B607)
|
||||||
|
subprocess.run(
|
||||||
|
["platformio", "run", "-e", pio_env, "-t", "upload", "--upload-port", port]
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
def flash_nrf52(pio_env, port):
|
||||||
|
# trunk-ignore(bandit/B603)
|
||||||
|
# trunk-ignore(bandit/B607)
|
||||||
|
subprocess.run(
|
||||||
|
["platformio", "run", "-e", pio_env, "-t", "upload", "--upload-port", port]
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
def find_usb_device(vendor_id, product_id):
|
||||||
|
# Find USB devices
|
||||||
|
dev = usb.core.find(find_all=True)
|
||||||
|
# Loop through devices, printing vendor and product ids in decimal and hex
|
||||||
|
for cfg in dev:
|
||||||
|
if cfg.idVendor == vendor_id and cfg.idProduct == product_id:
|
||||||
|
return cfg
|
||||||
|
return None
|
||||||
|
|
||||||
|
|
||||||
|
# Flash esp32 target
|
||||||
|
def flash_esp32(pio_env, port):
|
||||||
|
# trunk-ignore(bandit/B603)
|
||||||
|
# trunk-ignore(bandit/B607)
|
||||||
|
subprocess.run(
|
||||||
|
["platformio", "run", "-e", pio_env, "-t", "upload", "--upload-port", port]
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
def flash_nrf52(pio_env, port):
|
||||||
|
# trunk-ignore(bandit/B603)
|
||||||
|
# trunk-ignore(bandit/B607)
|
||||||
|
subprocess.run(
|
||||||
|
["platformio", "run", "-e", pio_env, "-t", "upload", "--upload-port", port]
|
||||||
|
)
|
||||||
42
test/end2end/readprops.py
Normal file
42
test/end2end/readprops.py
Normal file
@@ -0,0 +1,42 @@
|
|||||||
|
import configparser
|
||||||
|
import subprocess
|
||||||
|
|
||||||
|
|
||||||
|
def readProps(prefsLoc):
|
||||||
|
"""Read the version of our project as a string"""
|
||||||
|
|
||||||
|
config = configparser.RawConfigParser()
|
||||||
|
config.read(prefsLoc)
|
||||||
|
version = dict(config.items("VERSION"))
|
||||||
|
verObj = dict(
|
||||||
|
short="{}.{}.{}".format(version["major"], version["minor"], version["build"]),
|
||||||
|
long="unset",
|
||||||
|
)
|
||||||
|
|
||||||
|
# Try to find current build SHA if if the workspace is clean. This could fail if git is not installed
|
||||||
|
try:
|
||||||
|
sha = (
|
||||||
|
subprocess.check_output(["git", "rev-parse", "--short", "HEAD"])
|
||||||
|
.decode("utf-8")
|
||||||
|
.strip()
|
||||||
|
)
|
||||||
|
isDirty = (
|
||||||
|
subprocess.check_output(["git", "diff", "HEAD"]).decode("utf-8").strip()
|
||||||
|
)
|
||||||
|
suffix = sha
|
||||||
|
# if isDirty:
|
||||||
|
# # short for 'dirty', we want to keep our verstrings source for protobuf reasons
|
||||||
|
# suffix = sha + "-d"
|
||||||
|
verObj["long"] = "{}.{}.{}.{}".format(
|
||||||
|
version["major"], version["minor"], version["build"], suffix
|
||||||
|
)
|
||||||
|
except:
|
||||||
|
# print("Unexpected error:", sys.exc_info()[0])
|
||||||
|
# traceback.print_exc()
|
||||||
|
verObj["long"] = verObj["short"]
|
||||||
|
|
||||||
|
# print("firmware version " + verStr)
|
||||||
|
return verObj
|
||||||
|
|
||||||
|
|
||||||
|
# print("path is" + ','.join(sys.path))
|
||||||
59
test/end2end/setup.py
Normal file
59
test/end2end/setup.py
Normal file
@@ -0,0 +1,59 @@
|
|||||||
|
import time
|
||||||
|
|
||||||
|
import flash
|
||||||
|
import meshtastic
|
||||||
|
import meshtastic.serial_interface
|
||||||
|
from readprops import readProps
|
||||||
|
|
||||||
|
version = readProps("version.properties")["long"]
|
||||||
|
|
||||||
|
|
||||||
|
def setup_users_prefs(prefsLoc):
|
||||||
|
with open(prefsLoc, "r") as file:
|
||||||
|
filedata = file.read()
|
||||||
|
filedata = filedata.replace(
|
||||||
|
"// #define CONFIG_LORA_REGION_USERPREFS",
|
||||||
|
"#define CONFIG_LORA_REGION_USERPREFS",
|
||||||
|
)
|
||||||
|
filedata = filedata.replace(
|
||||||
|
"// #define LORACONFIG_CHANNEL_NUM_USERPREFS",
|
||||||
|
"#define LORACONFIG_CHANNEL_NUM_USERPREFS",
|
||||||
|
)
|
||||||
|
filedata = filedata.replace(
|
||||||
|
"// #define CHANNEL_0_PRECISION", "#define CHANNEL_0_PRECISION"
|
||||||
|
)
|
||||||
|
with open(prefsLoc, "w") as file:
|
||||||
|
file.write(filedata)
|
||||||
|
|
||||||
|
|
||||||
|
def setup_device(port, pio_env, arch):
|
||||||
|
interface = meshtastic.serial_interface.SerialInterface(port)
|
||||||
|
try:
|
||||||
|
interface.waitForConfig()
|
||||||
|
if interface.metadata.firmware_version == version:
|
||||||
|
print("Already at local ref version", version)
|
||||||
|
else:
|
||||||
|
print(
|
||||||
|
"Device has version",
|
||||||
|
interface.metadata.firmware_version,
|
||||||
|
" updating to",
|
||||||
|
version,
|
||||||
|
)
|
||||||
|
interface.close()
|
||||||
|
time.sleep(1)
|
||||||
|
flash_device(port, pio_env, arch)
|
||||||
|
time.sleep(2)
|
||||||
|
except:
|
||||||
|
interface.close()
|
||||||
|
time.sleep(1)
|
||||||
|
flash_device(port, pio_env, arch)
|
||||||
|
time.sleep(2)
|
||||||
|
interface = meshtastic.serial_interface.SerialInterface(port)
|
||||||
|
interface.waitForConfig()
|
||||||
|
|
||||||
|
|
||||||
|
def flash_device(port, pio_env, arch):
|
||||||
|
if arch == "esp32":
|
||||||
|
flash.flash_esp32(pio_env=pio_env, port=port)
|
||||||
|
elif arch == "nrf52":
|
||||||
|
flash.flash_nrf52(pio_env=pio_env, port=port)
|
||||||
121
test/end2end/test.py
Normal file
121
test/end2end/test.py
Normal file
@@ -0,0 +1,121 @@
|
|||||||
|
import time
|
||||||
|
from typing import Dict, List, NamedTuple
|
||||||
|
|
||||||
|
import meshtastic
|
||||||
|
import meshtastic.serial_interface
|
||||||
|
import pytest
|
||||||
|
from dotmap import DotMap
|
||||||
|
from pubsub import pub # type: ignore[import-untyped]
|
||||||
|
from setup import setup_device, setup_users_prefs # type: ignore[import-untyped]
|
||||||
|
|
||||||
|
|
||||||
|
class ConnectedDevice(NamedTuple):
|
||||||
|
port: str
|
||||||
|
pio_env: str
|
||||||
|
arch: str
|
||||||
|
interface: meshtastic.serial_interface.SerialInterface
|
||||||
|
mesh_packets: List[meshtastic.mesh_pb2.FromRadio]
|
||||||
|
|
||||||
|
|
||||||
|
devices: Dict[str, ConnectedDevice] = {}
|
||||||
|
|
||||||
|
heltec_v3 = ["/dev/cu.usbserial-0001", "heltec-v3", "esp32"]
|
||||||
|
rak4631 = ["/dev/cu.usbmodem14101", "rak4631", "nrf52"]
|
||||||
|
tbeam = ["/dev/", "rak4631", "nrf52"]
|
||||||
|
|
||||||
|
|
||||||
|
setup_users_prefs("userPrefs.h")
|
||||||
|
|
||||||
|
for port_device in [heltec_v3, rak4631]:
|
||||||
|
print("Setting up device", port_device[1], "on port", port_device[0])
|
||||||
|
setup_device(port=port_device[0], pio_env=port_device[1], arch=port_device[2])
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.fixture(scope="module", params=[rak4631, heltec_v3])
|
||||||
|
def device(request):
|
||||||
|
port = request.param[0]
|
||||||
|
pio_env = request.param[1]
|
||||||
|
arch = request.param[2]
|
||||||
|
|
||||||
|
if devices.get(port) is not None and devices[port].interface.isConnected:
|
||||||
|
yield devices[port]
|
||||||
|
else:
|
||||||
|
time.sleep(1)
|
||||||
|
devices[port] = ConnectedDevice(
|
||||||
|
port=port,
|
||||||
|
pio_env=pio_env,
|
||||||
|
arch=arch,
|
||||||
|
interface=meshtastic.serial_interface.SerialInterface(port),
|
||||||
|
mesh_packets=[],
|
||||||
|
)
|
||||||
|
yield devices[port]
|
||||||
|
# Tear down devices
|
||||||
|
devices[port].interface.close()
|
||||||
|
|
||||||
|
|
||||||
|
def default_on_receive(packet, interface):
|
||||||
|
print("Received packet", packet["decoded"], "interface", interface)
|
||||||
|
# find the device that sent the packet
|
||||||
|
for port in devices:
|
||||||
|
if devices[port].interface == interface:
|
||||||
|
devices[port].mesh_packets.append(packet)
|
||||||
|
|
||||||
|
|
||||||
|
# Test want_config responses from device
|
||||||
|
def test_should_get_and_set_config(device: ConnectedDevice):
|
||||||
|
assert device is not None, "Expected connected device"
|
||||||
|
assert (
|
||||||
|
len(device.interface.nodes) > 0
|
||||||
|
), "Expected at least one node in the device NodeDB"
|
||||||
|
assert (
|
||||||
|
device.interface.localNode.localConfig is not None
|
||||||
|
), "Expected LocalConfig to be set"
|
||||||
|
assert (
|
||||||
|
device.interface.localNode.moduleConfig is not None
|
||||||
|
), "Expected ModuleConfig to be set"
|
||||||
|
assert (
|
||||||
|
len(device.interface.localNode.channels) > 0
|
||||||
|
), "Expected at least one channel in the device"
|
||||||
|
pub.subscribe(default_on_receive, "meshtastic.receive")
|
||||||
|
device.interface.waitForAckNak
|
||||||
|
|
||||||
|
|
||||||
|
def test_should_send_text_message_and_receive_ack(device: ConnectedDevice):
|
||||||
|
# Send a text message
|
||||||
|
print("Sending text from device", device.pio_env)
|
||||||
|
device.interface.sendText(text="Test broadcast", wantAck=True)
|
||||||
|
# time.sleep(1)
|
||||||
|
# device.interface.waitForAckNak()
|
||||||
|
print("Received ack from device")
|
||||||
|
time.sleep(2)
|
||||||
|
# for port in devices:
|
||||||
|
# if devices[port].port != device.port:
|
||||||
|
# print("Checking device", devices[port].pio_env, "for received message")
|
||||||
|
# print(devices[port].mesh_packets)
|
||||||
|
# # Assert should have received a message
|
||||||
|
# # find text message in packets
|
||||||
|
# textPackets = list(
|
||||||
|
# filter(
|
||||||
|
# lambda packet: packet["decoded"]["portnum"]
|
||||||
|
# == meshtastic.portnums_pb2.TEXT_MESSAGE_APP
|
||||||
|
# and packet["decoded"]["payload"].decode("utf-8")
|
||||||
|
# == "Test broadcast",
|
||||||
|
# devices[port].mesh_packets,
|
||||||
|
# )
|
||||||
|
# )
|
||||||
|
# assert (
|
||||||
|
# len(textPackets) > 0
|
||||||
|
# ), "Expected a text message received on other device"
|
||||||
|
# # Assert should have received an ack
|
||||||
|
# ackPackets = list(
|
||||||
|
# filter(
|
||||||
|
# lambda packet: packet["decoded"]["portnum"]
|
||||||
|
# == meshtastic.portnums_pb2.ROUTING_APP,
|
||||||
|
# device.mesh_packets,
|
||||||
|
# )
|
||||||
|
# )
|
||||||
|
# assert len(ackPackets) > 0, "Expected an ack from the device"
|
||||||
|
|
||||||
|
|
||||||
|
if __name__ == "__main__":
|
||||||
|
pytest.main()
|
||||||
Reference in New Issue
Block a user