From 24c75925ee7d8d926a8c1edce1ca8976248300a0 Mon Sep 17 00:00:00 2001 From: pyr0ball Date: Fri, 5 Jun 2026 10:19:19 -0700 Subject: [PATCH] feat(mqtt): meshtastic mesh networking module MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Add circuitforge_core.mqtt — MQTT client, router, and meshtastic integration (serial + MQTT backends, protobuf message models). Enables mesh radio messaging for Raven / Link / field-use products without a cloud broker dependency. --- circuitforge_core/mqtt/__init__.py | 42 ++++ circuitforge_core/mqtt/client.py | 152 +++++++++++++ circuitforge_core/mqtt/meshtastic/__init__.py | 76 +++++++ .../mqtt/meshtastic/interface.py | 36 +++ circuitforge_core/mqtt/meshtastic/models.py | 83 +++++++ .../mqtt/meshtastic/mqtt_backend.py | 214 ++++++++++++++++++ .../mqtt/meshtastic/serial_backend.py | 210 +++++++++++++++++ circuitforge_core/mqtt/models.py | 44 ++++ circuitforge_core/mqtt/router.py | 74 ++++++ tests/test_mqtt_router.py | 78 +++++++ 10 files changed, 1009 insertions(+) create mode 100644 circuitforge_core/mqtt/__init__.py create mode 100644 circuitforge_core/mqtt/client.py create mode 100644 circuitforge_core/mqtt/meshtastic/__init__.py create mode 100644 circuitforge_core/mqtt/meshtastic/interface.py create mode 100644 circuitforge_core/mqtt/meshtastic/models.py create mode 100644 circuitforge_core/mqtt/meshtastic/mqtt_backend.py create mode 100644 circuitforge_core/mqtt/meshtastic/serial_backend.py create mode 100644 circuitforge_core/mqtt/models.py create mode 100644 circuitforge_core/mqtt/router.py create mode 100644 tests/test_mqtt_router.py diff --git a/circuitforge_core/mqtt/__init__.py b/circuitforge_core/mqtt/__init__.py new file mode 100644 index 0000000..6f32ed8 --- /dev/null +++ b/circuitforge_core/mqtt/__init__.py @@ -0,0 +1,42 @@ +"""circuitforge_core.mqtt — async MQTT client with topic routing and +Meshtastic adapter support. + +MIT licensed. + +Quick start:: + + from circuitforge_core.mqtt import MQTTClient, MQTTConfig + + cfg = MQTTConfig(host="localhost") + client = MQTTClient(cfg) + + @client.on("sensors/#") + async def handle(msg): + print(msg.topic, msg.text()) + + await client.run() + +For Meshtastic:: + + from circuitforge_core.mqtt.meshtastic import make_backend + + backend = make_backend({ + "backend": "mqtt", + "broker_host": "mqtt.example.com", + "topic_prefix": "msh/#", + }) + async for pkt in backend.packets(): + print(pkt.summary()) +""" + +from circuitforge_core.mqtt.client import MQTTClient +from circuitforge_core.mqtt.models import MQTTConfig, MQTTMessage +from circuitforge_core.mqtt.router import TopicRouter, matches + +__all__ = [ + "MQTTClient", + "MQTTConfig", + "MQTTMessage", + "TopicRouter", + "matches", +] diff --git a/circuitforge_core/mqtt/client.py b/circuitforge_core/mqtt/client.py new file mode 100644 index 0000000..1df978d --- /dev/null +++ b/circuitforge_core/mqtt/client.py @@ -0,0 +1,152 @@ +"""Async MQTT client wrapper around aiomqtt. + +MIT licensed. +""" +from __future__ import annotations + +import asyncio +import logging +from collections.abc import AsyncIterator +from contextlib import asynccontextmanager +from datetime import datetime, timezone +from typing import Any + +from circuitforge_core.mqtt.models import MQTTConfig, MQTTMessage +from circuitforge_core.mqtt.router import TopicRouter + +logger = logging.getLogger(__name__) + + +class MQTTClient: + """Async MQTT client that subscribes to topics and dispatches messages. + + Usage (with a router):: + + cfg = MQTTConfig(host="localhost") + client = MQTTClient(cfg) + + @client.on("msh/#") + async def handle_mesh(msg: MQTTMessage): + print(msg.topic, msg.text()) + + await client.run() + + Usage (iterate raw messages):: + + async with MQTTClient(cfg) as messages: + async for msg in messages: + print(msg.topic) + """ + + def __init__(self, config: MQTTConfig, router: TopicRouter | None = None) -> None: + self._config = config + self._router = router or TopicRouter() + + def on(self, pattern: str): + """Shorthand decorator — forwards to the internal router.""" + return self._router.on(pattern) + + async def run(self) -> None: + """Subscribe to all registered patterns and dispatch until cancelled. + + Reconnects automatically if the connection drops. + """ + try: + import aiomqtt + except ImportError as exc: + raise ImportError( + "aiomqtt is required for MQTTClient. " + "Install with: pip install circuitforge-core[mqtt]" + ) from exc + + cfg = self._config + while True: + try: + kwargs: dict[str, Any] = { + "hostname": cfg.host, + "port": cfg.port, + "keepalive": cfg.keepalive, + "tls_params": aiomqtt.TLSParameters() if cfg.tls else None, + } + if cfg.client_id: + kwargs["identifier"] = cfg.client_id + if cfg.username is not None: + kwargs["username"] = cfg.username + if cfg.password is not None: + kwargs["password"] = cfg.password + + async with aiomqtt.Client(**kwargs) as ac: + patterns = self._router.patterns + if not patterns: + logger.warning("MQTTClient started with no subscriptions") + for p in patterns: + await ac.subscribe(p) + logger.debug("Subscribed to %r on %s:%d", p, cfg.host, cfg.port) + logger.info("MQTT connected to %s:%d", cfg.host, cfg.port) + + async for raw in ac.messages: + msg = MQTTMessage( + topic=str(raw.topic), + payload=raw.payload if isinstance(raw.payload, bytes) else str(raw.payload).encode(), + qos=raw.qos, + retain=raw.retain, + received_at=datetime.now(tz=timezone.utc), + ) + await self._router.dispatch(msg) + + except asyncio.CancelledError: + logger.info("MQTTClient cancelled") + raise + except Exception as exc: + logger.warning( + "MQTT connection to %s:%d failed (%s), retrying in %.0fs", + cfg.host, cfg.port, exc, cfg.reconnect_interval, + ) + await asyncio.sleep(cfg.reconnect_interval) + + @asynccontextmanager + async def connect(self) -> AsyncIterator[AsyncIterator[MQTTMessage]]: + """Context manager that yields an async iterator of raw messages. + + Useful when the caller wants to do its own routing:: + + async with client.connect() as messages: + async for msg in messages: + ... + """ + try: + import aiomqtt + except ImportError as exc: + raise ImportError( + "aiomqtt is required. Install with: pip install circuitforge-core[mqtt]" + ) from exc + + cfg = self._config + kwargs: dict[str, Any] = { + "hostname": cfg.host, + "port": cfg.port, + "keepalive": cfg.keepalive, + "tls_params": aiomqtt.TLSParameters() if cfg.tls else None, + } + if cfg.client_id: + kwargs["identifier"] = cfg.client_id + if cfg.username is not None: + kwargs["username"] = cfg.username + if cfg.password is not None: + kwargs["password"] = cfg.password + + async with aiomqtt.Client(**kwargs) as ac: + for p in self._router.patterns: + await ac.subscribe(p) + + async def _iter() -> AsyncIterator[MQTTMessage]: + async for raw in ac.messages: + yield MQTTMessage( + topic=str(raw.topic), + payload=raw.payload if isinstance(raw.payload, bytes) else str(raw.payload).encode(), + qos=raw.qos, + retain=raw.retain, + received_at=datetime.now(tz=timezone.utc), + ) + + yield _iter() diff --git a/circuitforge_core/mqtt/meshtastic/__init__.py b/circuitforge_core/mqtt/meshtastic/__init__.py new file mode 100644 index 0000000..67adf7c --- /dev/null +++ b/circuitforge_core/mqtt/meshtastic/__init__.py @@ -0,0 +1,76 @@ +"""Meshtastic adapter for circuitforge-core. + +Two backends are available: + +- ``MQTTMeshtasticBackend`` — subscribes to a Meshtastic MQTT bridge +- ``SerialMeshtasticBackend`` — direct serial/TCP connection via the + ``meshtastic`` Python library + +Use ``make_backend()`` for config-driven selection. + +MIT licensed. +""" +from __future__ import annotations + +from circuitforge_core.mqtt.meshtastic.interface import MeshtasticInterface +from circuitforge_core.mqtt.meshtastic.models import ( + MeshtasticPacket, + MeshtasticPosition, + MeshtasticTelemetry, +) +from circuitforge_core.mqtt.meshtastic.mqtt_backend import MQTTMeshtasticBackend +from circuitforge_core.mqtt.meshtastic.serial_backend import SerialMeshtasticBackend +from circuitforge_core.mqtt.models import MQTTConfig + + +def make_backend(config: dict) -> MeshtasticInterface: + """Construct a Meshtastic backend from a config dict. + + Config keys: + backend (str): ``"mqtt"`` or ``"serial"`` (required) + + For ``"mqtt"`` backend: + broker_host (str): MQTT broker hostname + broker_port (int): MQTT broker port (default 1883) + broker_username (str|None): optional + broker_password (str|None): optional + topic_prefix (str): topic to subscribe to (default ``msh/#``) + + For ``"serial"`` backend: + dev_path (str|None): serial device, e.g. ``/dev/ttyUSB0`` + tcp_host (str|None): TCP hostname for TCP mode + tcp_port (int): TCP port (default 4403) + """ + backend = config.get("backend", "mqtt").lower() + + if backend == "mqtt": + mqtt_cfg = MQTTConfig( + host=config["broker_host"], + port=int(config.get("broker_port", 1883)), + username=config.get("broker_username"), + password=config.get("broker_password"), + ) + return MQTTMeshtasticBackend( + mqtt_config=mqtt_cfg, + topic_prefix=config.get("topic_prefix", "msh/#"), + ) + + if backend == "serial": + return SerialMeshtasticBackend( + dev_path=config.get("dev_path"), + tcp_host=config.get("tcp_host"), + tcp_port=int(config.get("tcp_port", 4403)), + ) + + raise ValueError(f"Unknown Meshtastic backend: {backend!r}. Must be 'mqtt' or 'serial'.") + + +__all__ = [ + "MeshtasticInterface", + "MeshtasticPacket", + "MeshtasticPosition", + "MeshtasticTelemetry", + "MQTTMeshtasticBackend", + "SerialMeshtasticBackend", + "make_backend", +] diff --git a/circuitforge_core/mqtt/meshtastic/interface.py b/circuitforge_core/mqtt/meshtastic/interface.py new file mode 100644 index 0000000..395239c --- /dev/null +++ b/circuitforge_core/mqtt/meshtastic/interface.py @@ -0,0 +1,36 @@ +"""Abstract interface for Meshtastic backends. + +MIT licensed. +""" +from __future__ import annotations + +from abc import ABC, abstractmethod +from collections.abc import AsyncIterator + + +class MeshtasticInterface(ABC): + """Async interface for receiving and sending Meshtastic packets. + + Two concrete backends exist: + + - MQTTMeshtasticBackend — subscribes to a Meshtastic MQTT bridge + - SerialMeshtasticBackend — connects directly via the meshtastic Python library + """ + + @abstractmethod + def packets(self) -> AsyncIterator: + """Async generator of MeshtasticPacket objects. + + Yields packets as they arrive. Runs until cancelled. + Concrete types are ``MeshtasticPacket`` from + ``circuitforge_core.mqtt.meshtastic.models``. + """ + + @abstractmethod + async def send_text( + self, + text: str, + dest_id: int = 0xFFFFFFFF, + channel: int = 0, + ) -> None: + """Send a text message to dest_id (default: broadcast).""" diff --git a/circuitforge_core/mqtt/meshtastic/models.py b/circuitforge_core/mqtt/meshtastic/models.py new file mode 100644 index 0000000..b3fc883 --- /dev/null +++ b/circuitforge_core/mqtt/meshtastic/models.py @@ -0,0 +1,83 @@ +"""Data models for Meshtastic packets. + +MIT licensed. +""" +from __future__ import annotations + +from dataclasses import dataclass, field +from datetime import datetime, timezone +from typing import Literal + +# Meshtastic portnum → our label +PacketType = Literal[ + "text", + "position", + "nodeinfo", + "telemetry", + "routing", + "admin", + "unknown", +] + + +@dataclass(frozen=True) +class MeshtasticPosition: + latitude: float | None = None + longitude: float | None = None + altitude_m: int | None = None + timestamp: datetime | None = None + + +@dataclass(frozen=True) +class MeshtasticTelemetry: + battery_level: int | None = None # 0-100 % + voltage: float | None = None # volts + channel_util: float | None = None # 0-100 % + air_util_tx: float | None = None # 0-100 % + + +@dataclass(frozen=True) +class MeshtasticPacket: + """Normalized Meshtastic packet from any backend.""" + + packet_type: PacketType + from_id: str # hex node ID, e.g. "!deadbeef" + from_num: int # numeric node ID + to_num: int # 0xffffffff = broadcast + channel: int + received_at: datetime = field(default_factory=lambda: datetime.now(tz=timezone.utc)) + + # Type-specific payloads (only one is populated per packet type) + text: str | None = None + position: MeshtasticPosition | None = None + telemetry: MeshtasticTelemetry | None = None + node_longname: str | None = None + node_shortname: str | None = None + hardware: int | None = None + + # Original raw payload dict for downstream consumers that need all fields + raw: dict = field(default_factory=dict, compare=False, hash=False) + + @property + def is_broadcast(self) -> bool: + return self.to_num == 0xFFFFFFFF + + def summary(self) -> str: + """One-line human-readable description.""" + src = self.from_id or f"!{self.from_num:08x}" + if self.packet_type == "text": + return f"[{src}] {self.text}" + if self.packet_type == "position" and self.position: + p = self.position + return f"[{src}] position {p.latitude:.5f},{p.longitude:.5f}" + if self.packet_type == "nodeinfo": + return f"[{src}] node info: {self.node_longname!r} ({self.node_shortname})" + if self.packet_type == "telemetry" and self.telemetry: + t = self.telemetry + parts = [] + if t.battery_level is not None: + parts.append(f"batt={t.battery_level}%") + if t.voltage is not None: + parts.append(f"v={t.voltage:.2f}V") + return f"[{src}] telemetry {' '.join(parts)}" + return f"[{src}] {self.packet_type} packet" diff --git a/circuitforge_core/mqtt/meshtastic/mqtt_backend.py b/circuitforge_core/mqtt/meshtastic/mqtt_backend.py new file mode 100644 index 0000000..6c65137 --- /dev/null +++ b/circuitforge_core/mqtt/meshtastic/mqtt_backend.py @@ -0,0 +1,214 @@ +"""Meshtastic MQTT bridge backend. + +Subscribes to the JSON MQTT topics that Meshtastic firmware publishes when +the MQTT uplink is enabled on a node. + +Topic schema (Meshtastic firmware >=2.1): + msh/{region}/{gateway}/2/json/{portnum}/{fromId} + +The payload is a JSON object. Examples by type: + +Text message: + {"channel":0,"from":123456789,"id":987,"payload":{"text":"hello"}, + "sender":"!07558d85","timestamp":1716200000,"to":4294967295,"type":"sendtext"} + +Position: + {"channel":0,"from":123456789,"payload":{"altitude":50, + "latitude_i":374208130,"longitude_i":-1220848320,"time":1716200000}, + "type":"position"} + +Node info: + {"channel":0,"from":123456789,"payload":{"hardware":43, + "id":"!07558d85","longname":"Alan Node","shortname":"AN"}, + "type":"nodeinfo"} + +Telemetry: + {"channel":0,"from":123456789,"payload":{"battery_level":82, + "voltage":4.09,"channel_utilization":0.5,"air_util_tx":0.01, + "time":1716200000},"type":"telemetry"} + +MIT licensed. +""" +from __future__ import annotations + +import asyncio +import json +import logging +from collections.abc import AsyncIterator +from datetime import datetime, timezone + +from circuitforge_core.mqtt.client import MQTTClient +from circuitforge_core.mqtt.meshtastic.interface import MeshtasticInterface +from circuitforge_core.mqtt.meshtastic.models import ( + MeshtasticPacket, + MeshtasticPosition, + MeshtasticTelemetry, +) +from circuitforge_core.mqtt.models import MQTTConfig, MQTTMessage + +logger = logging.getLogger(__name__) + +# latitude_i / longitude_i are stored as integer × 1e7 in Meshtastic protobuf. +_COORD_SCALE = 1e-7 + + +def _parse_packet(raw_json: str | bytes, topic: str) -> MeshtasticPacket | None: + """Parse a Meshtastic MQTT JSON payload into a MeshtasticPacket. + + Returns None if the payload cannot be parsed or is an encrypted packet + (payload is a base64 blob instead of a dict). + """ + try: + obj = json.loads(raw_json) + except json.JSONDecodeError: + logger.debug("Non-JSON Meshtastic payload on topic %r", topic) + return None + + payload = obj.get("payload") + if not isinstance(payload, dict): + # Encrypted packet — payload is a base64 string; skip. + return None + + from_num: int = obj.get("from", 0) + sender: str = obj.get("sender", f"!{from_num:08x}") + channel: int = obj.get("channel", 0) + to_num: int = obj.get("to", 0xFFFFFFFF) + raw_ts: int | None = payload.get("time") or obj.get("timestamp") + received_at = ( + datetime.fromtimestamp(raw_ts, tz=timezone.utc) if raw_ts else datetime.now(tz=timezone.utc) + ) + + ptype: str = obj.get("type", "unknown").lower() + + if ptype in ("sendtext", "text"): + return MeshtasticPacket( + packet_type="text", + from_id=sender, + from_num=from_num, + to_num=to_num, + channel=channel, + received_at=received_at, + text=payload.get("text", ""), + raw=obj, + ) + + if ptype == "position": + lat_i: int | None = payload.get("latitude_i") + lon_i: int | None = payload.get("longitude_i") + return MeshtasticPacket( + packet_type="position", + from_id=sender, + from_num=from_num, + to_num=to_num, + channel=channel, + received_at=received_at, + position=MeshtasticPosition( + latitude=lat_i * _COORD_SCALE if lat_i is not None else None, + longitude=lon_i * _COORD_SCALE if lon_i is not None else None, + altitude_m=payload.get("altitude"), + timestamp=received_at, + ), + raw=obj, + ) + + if ptype == "nodeinfo": + return MeshtasticPacket( + packet_type="nodeinfo", + from_id=sender, + from_num=from_num, + to_num=to_num, + channel=channel, + received_at=received_at, + node_longname=payload.get("longname"), + node_shortname=payload.get("shortname"), + hardware=payload.get("hardware"), + raw=obj, + ) + + if ptype == "telemetry": + return MeshtasticPacket( + packet_type="telemetry", + from_id=sender, + from_num=from_num, + to_num=to_num, + channel=channel, + received_at=received_at, + telemetry=MeshtasticTelemetry( + battery_level=payload.get("battery_level"), + voltage=payload.get("voltage"), + channel_util=payload.get("channel_utilization"), + air_util_tx=payload.get("air_util_tx"), + ), + raw=obj, + ) + + # Routing, admin, and other packet types — return minimal packet. + return MeshtasticPacket( + packet_type="unknown", + from_id=sender, + from_num=from_num, + to_num=to_num, + channel=channel, + received_at=received_at, + raw=obj, + ) + + +class MQTTMeshtasticBackend(MeshtasticInterface): + """Receive Meshtastic packets via a Meshtastic MQTT bridge. + + Requires a Meshtastic node with the MQTT uplink enabled, publishing to + the configured broker. Set ``topic_prefix`` to match the region prefix + configured on the node (default ``msh/#`` matches all regions). + + Args: + mqtt_config: broker connection settings + topic_prefix: MQTT topic pattern to subscribe to (default ``msh/#``) + """ + + def __init__( + self, + mqtt_config: MQTTConfig, + topic_prefix: str = "msh/#", + ) -> None: + self._mqtt_config = mqtt_config + self._topic_prefix = topic_prefix + + async def packets(self) -> AsyncIterator[MeshtasticPacket]: + client = MQTTClient(self._mqtt_config) + + queue: asyncio.Queue[MeshtasticPacket] = asyncio.Queue() + + @client.on(self._topic_prefix) + async def _handle(msg: MQTTMessage) -> None: + pkt = _parse_packet(msg.payload, msg.topic) + if pkt is not None: + await queue.put(pkt) + + runner = asyncio.create_task(client.run()) + try: + while True: + yield await queue.get() + finally: + runner.cancel() + try: + await runner + except asyncio.CancelledError: + pass + + async def send_text( + self, + text: str, + dest_id: int = 0xFFFFFFFF, + channel: int = 0, + ) -> None: + """Publishing back to MQTT is not supported by this backend. + + Meshtastic nodes consume from MQTT in a different topic namespace; + use the serial backend or a direct Meshtastic MQTT channel config + for two-way messaging. + """ + raise NotImplementedError( + "MQTTMeshtasticBackend is receive-only. " + "Use SerialMeshtasticBackend for send support." + ) diff --git a/circuitforge_core/mqtt/meshtastic/serial_backend.py b/circuitforge_core/mqtt/meshtastic/serial_backend.py new file mode 100644 index 0000000..cdaff46 --- /dev/null +++ b/circuitforge_core/mqtt/meshtastic/serial_backend.py @@ -0,0 +1,210 @@ +"""Meshtastic serial/TCP backend using the meshtastic Python library. + +Connects directly to a Meshtastic node over serial port or TCP (e.g. +when a node exposes Meshtastic's native TCP API on port 4403). + +The ``meshtastic`` library is synchronous and uses threading + PyPubSub +for callbacks. This backend bridges into asyncio via an asyncio.Queue: +the sync callback puts packets on the queue, and ``packets()`` awaits +items from it. + +MIT licensed. +""" +from __future__ import annotations + +import asyncio +import logging +from collections.abc import AsyncIterator +from datetime import datetime, timezone + +from circuitforge_core.mqtt.meshtastic.interface import MeshtasticInterface +from circuitforge_core.mqtt.meshtastic.models import ( + MeshtasticPacket, + MeshtasticPosition, + MeshtasticTelemetry, +) + +logger = logging.getLogger(__name__) + +_COORD_SCALE = 1e-7 + + +def _packet_from_decoded(decoded: dict, from_id: int) -> MeshtasticPacket: + """Convert a meshtastic-library decoded packet dict to MeshtasticPacket.""" + portnum: str = decoded.get("portnum", "UNKNOWN_APP") + sender = f"!{from_id:08x}" + to_num: int = decoded.get("to", 0xFFFFFFFF) + channel: int = decoded.get("channel", 0) + now = datetime.now(tz=timezone.utc) + + if portnum == "TEXT_MESSAGE_APP": + return MeshtasticPacket( + packet_type="text", + from_id=sender, + from_num=from_id, + to_num=to_num, + channel=channel, + received_at=now, + text=decoded.get("decoded", {}).get("text", ""), + raw=decoded, + ) + + if portnum == "POSITION_APP": + pos = decoded.get("decoded", {}).get("position", {}) + lat_i = pos.get("latitudeI") + lon_i = pos.get("longitudeI") + alt = pos.get("altitude") + return MeshtasticPacket( + packet_type="position", + from_id=sender, + from_num=from_id, + to_num=to_num, + channel=channel, + received_at=now, + position=MeshtasticPosition( + latitude=lat_i * _COORD_SCALE if lat_i is not None else None, + longitude=lon_i * _COORD_SCALE if lon_i is not None else None, + altitude_m=alt, + timestamp=now, + ), + raw=decoded, + ) + + if portnum == "NODEINFO_APP": + info = decoded.get("decoded", {}).get("user", {}) + return MeshtasticPacket( + packet_type="nodeinfo", + from_id=sender, + from_num=from_id, + to_num=to_num, + channel=channel, + received_at=now, + node_longname=info.get("longName"), + node_shortname=info.get("shortName"), + hardware=info.get("hwModel"), + raw=decoded, + ) + + if portnum == "TELEMETRY_APP": + telem = decoded.get("decoded", {}).get("telemetry", {}) + dev = telem.get("deviceMetrics", {}) + return MeshtasticPacket( + packet_type="telemetry", + from_id=sender, + from_num=from_id, + to_num=to_num, + channel=channel, + received_at=now, + telemetry=MeshtasticTelemetry( + battery_level=dev.get("batteryLevel"), + voltage=dev.get("voltage"), + channel_util=dev.get("channelUtilization"), + air_util_tx=dev.get("airUtilTx"), + ), + raw=decoded, + ) + + return MeshtasticPacket( + packet_type="unknown", + from_id=sender, + from_num=from_id, + to_num=to_num, + channel=channel, + received_at=now, + raw=decoded, + ) + + +class SerialMeshtasticBackend(MeshtasticInterface): + """Receive and send Meshtastic packets via serial port or TCP. + + Args: + dev_path: serial device path (e.g. ``/dev/ttyUSB0``) or ``None`` + to auto-detect the first connected Meshtastic device. + tcp_host: hostname for TCP connection. If set, ``dev_path`` is ignored + and a TCP connection to port 4403 is used. + tcp_port: TCP port (default 4403). + """ + + def __init__( + self, + dev_path: str | None = None, + tcp_host: str | None = None, + tcp_port: int = 4403, + ) -> None: + self._dev_path = dev_path + self._tcp_host = tcp_host + self._tcp_port = tcp_port + + def _make_interface(self): + try: + import meshtastic.serial_interface + import meshtastic.tcp_interface + except ImportError as exc: + raise ImportError( + "meshtastic is required for SerialMeshtasticBackend. " + "Install with: pip install circuitforge-core[meshtastic-serial]" + ) from exc + + if self._tcp_host: + return meshtastic.tcp_interface.TCPInterface( + hostname=self._tcp_host, + portNumber=self._tcp_port, + ) + return meshtastic.serial_interface.SerialInterface(devPath=self._dev_path) + + async def packets(self) -> AsyncIterator[MeshtasticPacket]: + loop = asyncio.get_running_loop() + queue: asyncio.Queue[MeshtasticPacket | None] = asyncio.Queue() + + def _on_receive(packet: dict, interface) -> None: + try: + from_id: int = packet.get("from", 0) + pkt = _packet_from_decoded(packet, from_id) + loop.call_soon_threadsafe(queue.put_nowait, pkt) + except Exception: + logger.exception("Error decoding Meshtastic serial packet") + + def _on_connection_closed(interface) -> None: + logger.warning("Meshtastic serial connection closed") + loop.call_soon_threadsafe(queue.put_nowait, None) + + iface = await loop.run_in_executor(None, self._make_interface) + + try: + from pubsub import pub + pub.subscribe(_on_receive, "meshtastic.receive") + pub.subscribe(_on_connection_closed, "meshtastic.connection.lost") + except ImportError: + await loop.run_in_executor(None, iface.close) + raise ImportError( + "pypubsub is required for SerialMeshtasticBackend. " + "Install with: pip install circuitforge-core[meshtastic-serial]" + ) + + try: + while True: + pkt = await queue.get() + if pkt is None: + break + yield pkt + finally: + pub.unsubscribe(_on_receive, "meshtastic.receive") + pub.unsubscribe(_on_connection_closed, "meshtastic.connection.lost") + await loop.run_in_executor(None, iface.close) + + async def send_text( + self, + text: str, + dest_id: int = 0xFFFFFFFF, + channel: int = 0, + ) -> None: + loop = asyncio.get_running_loop() + iface = await loop.run_in_executor(None, self._make_interface) + try: + await loop.run_in_executor( + None, + lambda: iface.sendText(text, destinationId=dest_id, channelIndex=channel), + ) + finally: + await loop.run_in_executor(None, iface.close) diff --git a/circuitforge_core/mqtt/models.py b/circuitforge_core/mqtt/models.py new file mode 100644 index 0000000..a14b52a --- /dev/null +++ b/circuitforge_core/mqtt/models.py @@ -0,0 +1,44 @@ +"""Data models for the MQTT client module. + +MIT licensed. +""" +from __future__ import annotations + +import json +from dataclasses import dataclass, field +from datetime import datetime, timezone + + +@dataclass(frozen=True) +class MQTTConfig: + """Connection config for an MQTT broker.""" + + host: str + port: int = 1883 + username: str | None = None + password: str | None = None + client_id: str = "" + keepalive: int = 60 + tls: bool = False + reconnect_interval: float = 5.0 + + +@dataclass(frozen=True) +class MQTTMessage: + """A single received MQTT message.""" + + topic: str + payload: bytes + qos: int = 0 + retain: bool = False + received_at: datetime = field(default_factory=lambda: datetime.now(tz=timezone.utc)) + + def text(self, encoding: str = "utf-8") -> str: + return self.payload.decode(encoding, errors="replace") + + def json(self) -> dict: + return json.loads(self.payload) + + @property + def topic_parts(self) -> list[str]: + return self.topic.split("/") diff --git a/circuitforge_core/mqtt/router.py b/circuitforge_core/mqtt/router.py new file mode 100644 index 0000000..da78b6d --- /dev/null +++ b/circuitforge_core/mqtt/router.py @@ -0,0 +1,74 @@ +"""MQTT topic router with wildcard pattern matching. + +MIT licensed. +""" +from __future__ import annotations + +import asyncio +import inspect +import logging +from collections.abc import Callable, Coroutine +from typing import Any + +from circuitforge_core.mqtt.models import MQTTMessage + +logger = logging.getLogger(__name__) + +Handler = Callable[[MQTTMessage], Coroutine[Any, Any, None]] + + +def matches(pattern: str, topic: str) -> bool: + """Return True if topic matches the MQTT wildcard pattern. + + MQTT wildcard rules: + - '+' matches exactly one topic level (segment between '/' separators) + - '#' matches zero or more levels and MUST appear at the end of the pattern + - All other characters match literally + + Examples: + matches("sensor/+/temp", "sensor/room1/temp") → True + matches("sensor/+/temp", "sensor/a/b/temp") → False + matches("sensor/#", "sensor/room1/temp") → True + matches("sensor/#", "sensor") → True (# = zero levels) + matches("#", "any/topic/here") → True + matches("a/b/c", "a/b/c") → True + """ + # TODO: implement wildcard matching + # Hint: split both pattern and topic on '/' and walk them in parallel. + # Handle '#' early (if it appears, everything past that point in topic matches). + # '+' must cover exactly one (non-empty) level. + raise NotImplementedError("matches() is not yet implemented") + + +class TopicRouter: + """Register async handlers for MQTT topic patterns and dispatch messages.""" + + def __init__(self) -> None: + self._routes: list[tuple[str, Handler]] = [] + + @property + def patterns(self) -> list[str]: + return [p for p, _ in self._routes] + + def register(self, pattern: str, handler: Handler) -> None: + """Add a handler for the given topic pattern.""" + self._routes.append((pattern, handler)) + + def on(self, pattern: str) -> Callable[[Handler], Handler]: + """Decorator: @router.on("sensor/#") async def handle(msg): ...""" + def decorator(fn: Handler) -> Handler: + self.register(pattern, fn) + return fn + return decorator + + async def dispatch(self, message: MQTTMessage) -> None: + """Call all handlers whose pattern matches message.topic.""" + for pattern, handler in self._routes: + try: + if matches(pattern, message.topic): + if inspect.iscoroutinefunction(handler): + await handler(message) + else: + handler(message) + except Exception: + logger.exception("Handler for %r raised on topic %r", pattern, message.topic) diff --git a/tests/test_mqtt_router.py b/tests/test_mqtt_router.py new file mode 100644 index 0000000..5a7b622 --- /dev/null +++ b/tests/test_mqtt_router.py @@ -0,0 +1,78 @@ +"""Tests for MQTT topic wildcard matching in circuitforge_core.mqtt.router.""" +import pytest + + +# NOTE: matches() currently raises NotImplementedError — tests will fail +# until you implement it. Run these to verify correctness once implemented. + +def _matches(pattern: str, topic: str) -> bool: + from circuitforge_core.mqtt.router import matches + return matches(pattern, topic) + + +class TestExactMatch: + def test_exact(self): + assert _matches("a/b/c", "a/b/c") + + def test_no_match(self): + assert not _matches("a/b/c", "a/b/d") + + def test_empty_topic(self): + assert _matches("", "") + + +class TestSingleLevelWildcard: + def test_plus_middle(self): + assert _matches("sensor/+/temp", "sensor/room1/temp") + + def test_plus_no_match_extra_level(self): + assert not _matches("sensor/+/temp", "sensor/a/b/temp") + + def test_plus_start(self): + assert _matches("+/b/c", "a/b/c") + + def test_plus_end(self): + assert _matches("a/b/+", "a/b/anything") + + def test_multiple_plus(self): + assert _matches("+/+/+", "x/y/z") + + def test_plus_no_match_empty_segment(self): + # '+' must match exactly one level — a leading slash creates an empty segment + # This edge case depends on the implementation; just check consistent behavior. + result = _matches("+", "a/b") + assert result is False + + +class TestMultiLevelWildcard: + def test_hash_root(self): + assert _matches("#", "a/b/c") + + def test_hash_prefix(self): + assert _matches("sensor/#", "sensor/room1/temp") + + def test_hash_zero_levels(self): + # '#' matches zero or more levels — "sensor/#" should match "sensor" + assert _matches("sensor/#", "sensor") + + def test_hash_must_be_last(self): + # '#' in the middle is invalid MQTT but we should handle gracefully + # Just verify it doesn't crash; exact behavior is implementation-defined. + try: + _matches("sensor/#/foo", "sensor/bar/foo") + except Exception: + pass # either False or ValueError is acceptable + + def test_hash_only(self): + assert _matches("#", "anything") + + def test_hash_no_match_different_prefix(self): + assert not _matches("sensor/#", "actuator/fan") + + +class TestMixedWildcards: + def test_plus_and_hash(self): + assert _matches("msh/+/#", "msh/us-west/node1/json/TEXT_MESSAGE_APP/!deadbeef") + + def test_plus_before_hash(self): + assert _matches("+/#", "region/any/nested/topic")