turnstone/app/glean/mqtt_subscriber.py
pyr0ball 7f49961ec4 fix(db): add timeout=30s to all sqlite3.connect() calls across app
Watcher, REST endpoints, services (search, incidents, blocklist),
MCP server, context retriever, embedder, glean_scheduler, and
doc_upload all used the default 5-second SQLite busy timeout.
During collect glean write phases, watcher flush threads were hitting
'database is locked' errors when the glean held the write lock longer
than 5 seconds.

All connections now use timeout=30.0, matching the pipeline fix
from commit 5a9281a. No logic changes.
2026-05-26 23:12:48 -07:00

166 lines
5.3 KiB
Python

"""Live MQTT glean subscriber for Turnstone.
Reads ``type: mqtt`` entries from sources.yaml and subscribes to each broker
in the background. Incoming messages are normalized to RetrievedEntry and
written to the Turnstone SQLite database as they arrive.
This runs as an asyncio task alongside the batch glean scheduler. It is
started from the FastAPI lifespan in rest.py.
MQTT source config format in sources.yaml::
sources:
- id: meshtastic-home
type: mqtt
broker_host: 10.1.10.5
broker_port: 1883 # optional, default 1883
broker_username: ~ # optional
broker_password: ~ # optional
topics:
- msh/# # one or more topic patterns
severity: INFO # optional default severity for all messages
- id: iot-sensors
type: mqtt
broker_host: localhost
topics:
- home/+/temperature
- home/+/humidity
"""
from __future__ import annotations
import asyncio
import hashlib
import json
import logging
import sqlite3
from datetime import datetime, timezone
from pathlib import Path
import yaml
from app.services.models import RetrievedEntry
logger = logging.getLogger(__name__)
def _load_mqtt_sources(sources_file: Path) -> list[dict]:
"""Return only the ``type: mqtt`` entries from sources.yaml."""
if not sources_file.exists():
return []
with sources_file.open() as f:
data = yaml.safe_load(f) or {}
return [s for s in data.get("sources", []) if s.get("type") == "mqtt"]
def _make_entry_id(source_id: str, seq: int, text: str) -> str:
h = hashlib.sha1(f"{source_id}:{seq}:{text}".encode()).hexdigest()[:16]
return f"{source_id}:{seq}:{h}"
def _write_entry(db_path: Path, entry: RetrievedEntry) -> None:
with sqlite3.connect(db_path, timeout=30.0) as conn:
conn.execute(
"""
INSERT OR IGNORE INTO log_entries
(id, source_id, sequence, timestamp_raw, timestamp_iso,
ingest_time, severity, repeat_count, out_of_order,
matched_patterns, text)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
""",
(
entry.entry_id,
entry.source_id,
entry.sequence,
entry.timestamp_raw,
entry.timestamp_iso,
entry.ingest_time,
entry.severity,
entry.repeat_count,
1 if entry.out_of_order else 0,
json.dumps(entry.matched_patterns),
entry.text,
),
)
async def _run_source_subscriber(source: dict, db_path: Path) -> None:
"""Maintain a subscription to one MQTT source, reconnecting on error."""
try:
from circuitforge_core.mqtt import MQTTClient, MQTTConfig
except ImportError:
logger.error(
"circuitforge-core[mqtt] is not installed — MQTT source %r skipped. "
"Run: pip install circuitforge-core[mqtt]",
source.get("id"),
)
return
source_id: str = source["id"]
host: str = source["broker_host"]
port: int = int(source.get("broker_port", 1883))
username: str | None = source.get("broker_username") or source.get("username")
password: str | None = source.get("broker_password") or source.get("password")
topics: list[str] = source.get("topics", ["#"])
default_severity: str = source.get("severity", "INFO")
cfg = MQTTConfig(
host=host,
port=port,
username=username,
password=password,
client_id=f"turnstone-{source_id}",
)
client = MQTTClient(cfg)
seq = 0
for topic in topics:
@client.on(topic)
async def _handle(msg, _src=source_id, _sev=default_severity):
nonlocal seq
seq += 1
now = datetime.now(tz=timezone.utc).isoformat()
text = msg.text()
entry = RetrievedEntry(
entry_id=_make_entry_id(_src, seq, text),
source_id=_src,
sequence=seq,
timestamp_raw=now,
timestamp_iso=now,
ingest_time=now,
severity=_sev,
repeat_count=1,
out_of_order=False,
matched_patterns=[],
text=f"[{msg.topic}] {text}",
)
_write_entry(db_path, entry)
logger.debug("MQTT[%s] %s: %s", _src, msg.topic, text[:120])
logger.info("MQTT subscriber starting: %s%s:%d topics=%s", source_id, host, port, topics)
await client.run()
async def run_mqtt_subscribers(sources_file: Path, db_path: Path) -> None:
"""Start one subscriber task per MQTT source. Runs until cancelled."""
sources = _load_mqtt_sources(sources_file)
if not sources:
logger.debug("No MQTT sources configured in %s", sources_file)
return
logger.info("Starting %d MQTT subscriber(s)", len(sources))
tasks = [
asyncio.create_task(
_run_source_subscriber(src, db_path),
name=f"mqtt-{src.get('id', i)}",
)
for i, src in enumerate(sources)
]
try:
await asyncio.gather(*tasks)
except asyncio.CancelledError:
for t in tasks:
t.cancel()
await asyncio.gather(*tasks, return_exceptions=True)
raise