turnstone/app/glean/mqtt_subscriber.py
pyr0ball 12cd0a23d5 refactor: rename ingest → glean throughout codebase
Renames the app/ingest/ package to app/glean/ and updates all
references across Python modules, shell scripts, Vue components,
tests, and documentation.

Intentionally preserved:
- SQLite column name ingest_time (avoids schema migration)
- RetrievedEntry.ingest_time field (maps to the column above)
- Any public-facing JSON keys that reference ingest_time

Changes by category:
- app/ingest/ → app/glean/ (full package move, all parsers)
- app/tasks/ingest_scheduler.py → app/tasks/glean_scheduler.py
- scripts/ingest_corpus.py → scripts/glean_corpus.py
- tests/test_ingest_*.py → tests/test_glean_*.py
- Docstrings, log messages, comments: ingest → glean
- Env var: TURNSTONE_INGEST_INTERVAL → TURNSTONE_GLEAN_INTERVAL
- Shell scripts: glean.log, glean_corpus.py references
- README.md: multi-source ingest → multi-source glean
- .env.example: updated env var name
- patterns/: new diagnostic patterns from 2026-05-20 SSH incident
  (service_crash_loop, pkg_daemon_restart, ssh_forward_conflict)
- SourcesView.vue: pipeline label updated
- All test import paths updated to app.glean.*

285 tests passing.
2026-05-20 23:02:55 -07:00

166 lines
5.3 KiB
Python

"""Live MQTT glean subscriber for Turnstone.

Reads ``type: mqtt`` entries from sources.yaml and subscribes to each broker
in the background. Incoming messages are normalized to RetrievedEntry and
written to the Turnstone SQLite database as they arrive.

This runs as an asyncio task alongside the batch glean scheduler. It is
started from the FastAPI lifespan in rest.py.

MQTT source config format in sources.yaml::

    sources:
      - id: meshtastic-home
        type: mqtt
        broker_host: 10.1.10.5
        broker_port: 1883        # optional, default 1883
        broker_username: ~       # optional
        broker_password: ~       # optional
        topics:
          - msh/#                # one or more topic patterns
        severity: INFO           # optional default severity for all messages

      - id: iot-sensors
        type: mqtt
        broker_host: localhost
        topics:
          - home/+/temperature
          - home/+/humidity

``username`` / ``password`` are also accepted as aliases for
``broker_username`` / ``broker_password``.
"""
from __future__ import annotations
import asyncio
import hashlib
import json
import logging
import sqlite3
from datetime import datetime, timezone
from pathlib import Path
import yaml
from app.services.models import RetrievedEntry
# Module-level logger, named after this module's import path (``__name__``).
logger = logging.getLogger(__name__)
def _load_mqtt_sources(sources_file: Path) -> list[dict]:
    """Return only the ``type: mqtt`` entries from sources.yaml.

    Args:
        sources_file: Path to the sources YAML file.

    Returns:
        The source mappings whose ``type`` is ``"mqtt"``, in file order.
        Returns an empty list when the file does not exist, is empty, has no
        (or a null) ``sources`` key, or is not a mapping at the top level.
        List items that are not mappings are ignored.
    """
    if not sources_file.exists():
        return []
    with sources_file.open(encoding="utf-8") as f:
        data = yaml.safe_load(f) or {}
    if not isinstance(data, dict):
        # A top-level list/scalar has no ``sources`` key to read from.
        logger.warning(
            "%s: expected a mapping at the top level; no MQTT sources loaded",
            sources_file,
        )
        return []
    # ``sources:`` with no value parses as None; treat it like a missing key.
    entries = data.get("sources") or []
    return [s for s in entries if isinstance(s, dict) and s.get("type") == "mqtt"]
def _make_entry_id(source_id: str, seq: int, text: str) -> str:
    """Build a deterministic entry id ``<source_id>:<seq>:<digest>``.

    ``digest`` is the first 16 hex characters of the SHA-1 of the string
    ``"<source_id>:<seq>:<text>"`` (UTF-8 encoded). The same inputs always
    produce the same id.
    """
    prefix = f"{source_id}:{seq}"
    digest = hashlib.sha1(f"{prefix}:{text}".encode()).hexdigest()
    return prefix + ":" + digest[:16]
def _write_entry(db_path: Path, entry: RetrievedEntry) -> None:
    """Insert one normalized entry into the ``log_entries`` table.

    Uses ``INSERT OR IGNORE``: a row that would violate a uniqueness
    constraint (presumably the ``id`` key — confirm against the schema) is
    skipped without raising. This is a blocking call: it opens a SQLite
    connection, inserts, commits and closes.

    Args:
        db_path: Path to the Turnstone SQLite database.
        entry: The entry to persist. ``matched_patterns`` is stored as a JSON
            string and ``out_of_order`` as 0/1.
    """
    conn = sqlite3.connect(db_path)
    try:
        # ``with conn`` commits on success / rolls back on error, but it does
        # NOT close the connection — hence the explicit close() in finally.
        with conn:
            conn.execute(
                """
                INSERT OR IGNORE INTO log_entries
                    (id, source_id, sequence, timestamp_raw, timestamp_iso,
                     ingest_time, severity, repeat_count, out_of_order,
                     matched_patterns, text)
                VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
                """,
                (
                    entry.entry_id,
                    entry.source_id,
                    entry.sequence,
                    entry.timestamp_raw,
                    entry.timestamp_iso,
                    entry.ingest_time,
                    entry.severity,
                    entry.repeat_count,
                    1 if entry.out_of_order else 0,
                    json.dumps(entry.matched_patterns),
                    entry.text,
                ),
            )
    finally:
        conn.close()
async def _run_source_subscriber(source: dict, db_path: Path) -> None:
    """Subscribe to one MQTT source and persist every message it delivers.

    Builds an ``MQTTClient`` from the source's broker settings, registers one
    handler per configured topic pattern, then awaits ``client.run()``. Each
    message becomes a ``RetrievedEntry`` whose text is ``"[<topic>] <payload>"``
    and is written to *db_path*.

    This function does not retry on its own. Whether the broker connection is
    re-established after an error depends on ``MQTTClient.run()``.
    NOTE(review): confirm reconnect behaviour in circuitforge_core.

    Args:
        source: One ``type: mqtt`` mapping from sources.yaml. ``id`` and
            ``broker_host`` are required. These keys are optional:
            ``broker_port`` (default 1883), ``broker_username``/``username``,
            ``broker_password``/``password``, ``topics`` (default ``["#"]``;
            a single string is accepted) and ``severity`` (default ``"INFO"``).
        db_path: SQLite database that receives the entries.

    Returns early, after logging an error, when the optional
    ``circuitforge-core[mqtt]`` dependency is missing or the source lacks
    ``id``/``broker_host``.
    """
    try:
        from circuitforge_core.mqtt import MQTTClient, MQTTConfig
    except ImportError:
        logger.error(
            "circuitforge-core[mqtt] is not installed — MQTT source %r skipped. "
            "Run: pip install circuitforge-core[mqtt]",
            source.get("id"),
        )
        return

    source_id: str | None = source.get("id")
    host: str | None = source.get("broker_host")
    if not source_id or not host:
        # Log and skip rather than raise a KeyError: an exception here would
        # propagate out of run_mqtt_subscribers' gather().
        logger.error(
            "MQTT source %r is missing required 'id' or 'broker_host' — skipped",
            source_id,
        )
        return

    # YAML ``key: ~`` yields None, so fall back on falsy values rather than
    # only on missing keys.
    port = int(source.get("broker_port") or 1883)
    username: str | None = source.get("broker_username") or source.get("username")
    password: str | None = source.get("broker_password") or source.get("password")
    raw_topics = source.get("topics") or ["#"]
    # A bare string (``topics: msh/#``) would otherwise be iterated per character.
    topics: list[str] = [raw_topics] if isinstance(raw_topics, str) else list(raw_topics)
    default_severity: str = source.get("severity") or "INFO"

    cfg = MQTTConfig(
        host=host,
        port=port,
        username=username,
        password=password,
        client_id=f"turnstone-{source_id}",
    )
    client = MQTTClient(cfg)
    seq = 0  # per-run message counter shared by all of this source's handlers

    for topic in topics:

        @client.on(topic)
        async def _handle(msg, _src=source_id, _sev=default_severity):
            nonlocal seq
            seq += 1
            now = datetime.now(tz=timezone.utc).isoformat()
            try:
                text = msg.text()
                entry_text = f"[{msg.topic}] {text}"
                entry = RetrievedEntry(
                    # seq restarts at 1 on every process start, so hash the
                    # receive time and topic as well. Otherwise a repeated
                    # payload (e.g. a steady sensor reading) reproduces an id
                    # from a previous run, and INSERT OR IGNORE drops it
                    # (assuming ``id`` is unique).
                    entry_id=_make_entry_id(_src, seq, f"{now} {entry_text}"),
                    source_id=_src,
                    sequence=seq,
                    timestamp_raw=now,
                    timestamp_iso=now,
                    ingest_time=now,
                    severity=_sev,
                    repeat_count=1,
                    out_of_order=False,
                    matched_patterns=[],
                    text=entry_text,
                )
                # sqlite3 blocks; run the write off the event loop so a slow
                # commit does not stall other coroutines sharing this loop.
                await asyncio.get_running_loop().run_in_executor(
                    None, _write_entry, db_path, entry
                )
            except Exception:
                # Handler boundary: log and keep the subscription alive
                # instead of letting one bad payload or DB error escape into
                # the MQTT client.
                logger.exception("MQTT[%s] failed to store message on %s", _src, msg.topic)
            else:
                logger.debug("MQTT[%s] %s: %s", _src, msg.topic, text[:120])

    logger.info(
        "MQTT subscriber starting: %s @ %s:%d topics=%s", source_id, host, port, topics
    )
    await client.run()
async def run_mqtt_subscribers(sources_file: Path, db_path: Path) -> None:
    """Start one subscriber task per MQTT source and wait on all of them.

    Intended to run until cancelled. Two events trigger cleanup:
    cancellation, or any subscriber task raising. In either case every
    remaining subscriber task is cancelled and awaited before the exception
    propagates, so no subscriber task outlives this coroutine.

    Args:
        sources_file: Path to sources.yaml; only ``type: mqtt`` entries are used.
        db_path: SQLite database passed to each subscriber.
    """
    sources = _load_mqtt_sources(sources_file)
    if not sources:
        logger.debug("No MQTT sources configured in %s", sources_file)
        return
    logger.info("Starting %d MQTT subscriber(s)", len(sources))
    tasks = [
        asyncio.create_task(
            _run_source_subscriber(src, db_path),
            # Fall back to the list index so sources without an id still get
            # a distinct task name.
            name=f"mqtt-{src.get('id', i)}",
        )
        for i, src in enumerate(sources)
    ]
    try:
        await asyncio.gather(*tasks)
    finally:
        # gather() does not cancel the other tasks when one of them raises,
        # so cancel them here. cancel() is a no-op for already-finished tasks.
        for t in tasks:
            t.cancel()
        await asyncio.gather(*tasks, return_exceptions=True)