305 lines
9.9 KiB
Python
305 lines
9.9 KiB
Python
"""Ingest pipeline: auto-detect format, parse, write to SQLite."""
|
|
from __future__ import annotations
|
|
|
|
import json
|
|
import logging
|
|
import re
|
|
import sqlite3
|
|
from pathlib import Path
|
|
from typing import Iterator
|
|
|
|
import yaml
|
|
|
|
from app.ingest import caddy, dmesg_log, docker_log, journald, plaintext, plex, qbittorrent, servarr, syslog
|
|
from app.ingest.base import _compile, load_patterns, now_iso
|
|
from app.services.models import LogPattern, RetrievedEntry
|
|
from app.services.search import build_fts_index
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
_SCHEMA = """
|
|
CREATE TABLE IF NOT EXISTS log_entries (
|
|
id TEXT PRIMARY KEY,
|
|
source_id TEXT NOT NULL,
|
|
sequence INTEGER NOT NULL,
|
|
timestamp_raw TEXT,
|
|
timestamp_iso TEXT,
|
|
ingest_time TEXT NOT NULL,
|
|
severity TEXT,
|
|
repeat_count INTEGER DEFAULT 1,
|
|
out_of_order INTEGER DEFAULT 0,
|
|
matched_patterns TEXT DEFAULT '[]',
|
|
text TEXT NOT NULL
|
|
);
|
|
CREATE INDEX IF NOT EXISTS idx_source ON log_entries(source_id);
|
|
CREATE INDEX IF NOT EXISTS idx_timestamp ON log_entries(timestamp_iso);
|
|
CREATE INDEX IF NOT EXISTS idx_ts_repeat ON log_entries(timestamp_iso, repeat_count);
|
|
CREATE INDEX IF NOT EXISTS idx_severity ON log_entries(severity);
|
|
CREATE INDEX IF NOT EXISTS idx_patterns ON log_entries(matched_patterns);
|
|
|
|
CREATE TABLE IF NOT EXISTS incidents (
|
|
id TEXT PRIMARY KEY,
|
|
label TEXT NOT NULL,
|
|
issue_type TEXT NOT NULL DEFAULT '',
|
|
started_at TEXT,
|
|
ended_at TEXT,
|
|
notes TEXT NOT NULL DEFAULT '',
|
|
created_at TEXT NOT NULL,
|
|
severity TEXT NOT NULL DEFAULT 'medium'
|
|
);
|
|
CREATE INDEX IF NOT EXISTS idx_incidents_time ON incidents(started_at, ended_at);
|
|
|
|
CREATE TABLE IF NOT EXISTS received_bundles (
|
|
id TEXT PRIMARY KEY,
|
|
source_host TEXT NOT NULL,
|
|
issue_type TEXT NOT NULL DEFAULT '',
|
|
label TEXT NOT NULL,
|
|
severity TEXT NOT NULL DEFAULT 'medium',
|
|
started_at TEXT,
|
|
bundled_at TEXT NOT NULL,
|
|
entry_count INTEGER NOT NULL DEFAULT 0,
|
|
bundle_json TEXT NOT NULL
|
|
);
|
|
CREATE INDEX IF NOT EXISTS idx_bundles_bundled ON received_bundles(bundled_at);
|
|
CREATE INDEX IF NOT EXISTS idx_bundles_type ON received_bundles(issue_type);
|
|
|
|
CREATE TABLE IF NOT EXISTS context_facts (
|
|
id TEXT PRIMARY KEY,
|
|
category TEXT NOT NULL,
|
|
key TEXT NOT NULL,
|
|
value TEXT NOT NULL,
|
|
source TEXT,
|
|
created_at TEXT NOT NULL
|
|
);
|
|
CREATE INDEX IF NOT EXISTS idx_facts_category ON context_facts(category);
|
|
CREATE INDEX IF NOT EXISTS idx_facts_key ON context_facts(key);
|
|
|
|
CREATE TABLE IF NOT EXISTS context_documents (
|
|
id TEXT PRIMARY KEY,
|
|
filename TEXT NOT NULL,
|
|
doc_type TEXT NOT NULL,
|
|
full_text TEXT NOT NULL,
|
|
file_size INTEGER,
|
|
uploaded_at TEXT NOT NULL
|
|
);
|
|
|
|
CREATE TABLE IF NOT EXISTS context_chunks (
|
|
id TEXT PRIMARY KEY,
|
|
document_id TEXT NOT NULL REFERENCES context_documents(id) ON DELETE CASCADE,
|
|
chunk_index INTEGER NOT NULL,
|
|
text TEXT NOT NULL,
|
|
embedding BLOB
|
|
);
|
|
CREATE INDEX IF NOT EXISTS idx_chunks_doc ON context_chunks(document_id);
|
|
"""
|
|
|
|
|
|
def ensure_schema(db_path: Path) -> None:
|
|
"""Create all tables and apply additive migrations. Safe to call on every startup."""
|
|
conn = sqlite3.connect(str(db_path))
|
|
conn.execute("PRAGMA journal_mode=WAL")
|
|
conn.executescript(_SCHEMA)
|
|
# Additive column migrations — ALTER TABLE silently skips if column exists
|
|
for stmt in [
|
|
"ALTER TABLE incidents ADD COLUMN issue_type TEXT NOT NULL DEFAULT ''",
|
|
]:
|
|
try:
|
|
conn.execute(stmt)
|
|
except sqlite3.OperationalError:
|
|
pass
|
|
conn.commit()
|
|
conn.close()
|
|
|
|
|
|
def _detect_format(first_line: str) -> str:
|
|
try:
|
|
obj = json.loads(first_line)
|
|
if "__REALTIME_TIMESTAMP" in obj:
|
|
return "journald"
|
|
if "SOURCE" in obj and str(obj.get("SOURCE", "")).startswith("docker:"):
|
|
return "docker"
|
|
if "ts" in obj and ("msg" in obj or "message" in obj or "request" in obj):
|
|
return "caddy"
|
|
except (json.JSONDecodeError, AttributeError):
|
|
pass
|
|
if plex.is_plex_log(first_line):
|
|
return "plex"
|
|
if qbittorrent.is_qbit_log(first_line):
|
|
return "qbittorrent"
|
|
if servarr.is_servarr_log(first_line):
|
|
return "servarr"
|
|
if dmesg_log.is_dmesg_log(first_line):
|
|
return "dmesg"
|
|
if syslog.is_syslog(first_line):
|
|
return "syslog"
|
|
return "plaintext"
|
|
|
|
|
|
def _parse_file(
|
|
path: Path,
|
|
compiled: list[tuple[LogPattern, object]],
|
|
ingest_time: str,
|
|
source_id: str | None = None,
|
|
) -> Iterator[RetrievedEntry]:
|
|
source_id = source_id or path.stem
|
|
|
|
with path.open("r", errors="replace") as f:
|
|
lines = iter(f)
|
|
try:
|
|
first = next(lines)
|
|
except StopIteration:
|
|
return
|
|
|
|
fmt = _detect_format(first.strip())
|
|
logger.info("Detected format %r for %s", fmt, path.name)
|
|
|
|
def all_lines():
|
|
yield first
|
|
yield from lines
|
|
|
|
if fmt == "journald":
|
|
yield from journald.parse(all_lines(), source_id, compiled, ingest_time)
|
|
elif fmt == "docker":
|
|
yield from docker_log.parse(all_lines(), source_id, compiled, ingest_time)
|
|
elif fmt == "caddy":
|
|
yield from caddy.parse(all_lines(), source_id, compiled, ingest_time)
|
|
elif fmt == "plex":
|
|
yield from plex.parse(all_lines(), source_id, compiled, ingest_time)
|
|
elif fmt == "qbittorrent":
|
|
yield from qbittorrent.parse(all_lines(), source_id, compiled, ingest_time)
|
|
elif fmt == "servarr":
|
|
yield from servarr.parse(all_lines(), source_id, compiled, ingest_time)
|
|
elif fmt == "dmesg":
|
|
yield from dmesg_log.parse(all_lines(), source_id, compiled, ingest_time)
|
|
elif fmt == "syslog":
|
|
yield from syslog.parse(all_lines(), source_id, compiled, ingest_time)
|
|
else:
|
|
yield from plaintext.parse(all_lines(), source_id, compiled, ingest_time)
|
|
|
|
|
|
def _write_batch(conn: sqlite3.Connection, batch: list[RetrievedEntry]) -> None:
|
|
conn.executemany(
|
|
"""
|
|
INSERT OR IGNORE INTO log_entries
|
|
(id, source_id, sequence, timestamp_raw, timestamp_iso,
|
|
ingest_time, severity, repeat_count, out_of_order,
|
|
matched_patterns, text)
|
|
VALUES (?,?,?,?,?,?,?,?,?,?,?)
|
|
""",
|
|
[
|
|
(
|
|
e.entry_id, e.source_id, e.sequence,
|
|
e.timestamp_raw, e.timestamp_iso, e.ingest_time,
|
|
e.severity, e.repeat_count, int(e.out_of_order),
|
|
json.dumps(list(e.matched_patterns)), e.text,
|
|
)
|
|
for e in batch
|
|
],
|
|
)
|
|
|
|
|
|
def _ingest_files(
|
|
files: list[Path],
|
|
db_path: Path,
|
|
pattern_file: Path | None = None,
|
|
batch_size: int = 1000,
|
|
source_id_map: dict[Path, str] | None = None,
|
|
) -> dict[str, int]:
|
|
pattern_file = pattern_file or Path("patterns/default.yaml")
|
|
patterns = load_patterns(pattern_file)
|
|
compiled = _compile(patterns)
|
|
ingest_time = now_iso()
|
|
source_id_map = source_id_map or {}
|
|
|
|
conn = sqlite3.connect(str(db_path))
|
|
conn.execute("PRAGMA journal_mode=WAL")
|
|
conn.executescript(_SCHEMA)
|
|
conn.commit()
|
|
|
|
stats: dict[str, int] = {}
|
|
|
|
for log_file in files:
|
|
source_id = source_id_map.get(log_file, log_file.stem)
|
|
count = 0
|
|
batch: list[RetrievedEntry] = []
|
|
for entry in _parse_file(log_file, compiled, ingest_time, source_id=source_id):
|
|
batch.append(entry)
|
|
if len(batch) >= batch_size:
|
|
_write_batch(conn, batch)
|
|
conn.commit()
|
|
count += len(batch)
|
|
batch.clear()
|
|
if batch:
|
|
_write_batch(conn, batch)
|
|
conn.commit()
|
|
count += len(batch)
|
|
stats[source_id] = stats.get(source_id, 0) + count
|
|
logger.info("Ingested %d entries from %s (source: %s)", count, log_file.name, source_id)
|
|
|
|
conn.close()
|
|
|
|
logger.info("Building FTS index...")
|
|
build_fts_index(db_path)
|
|
logger.info("FTS index ready")
|
|
|
|
return stats
|
|
|
|
|
|
def ingest(
|
|
corpus_dir: Path,
|
|
db_path: Path,
|
|
pattern_file: Path | None = None,
|
|
batch_size: int = 1000,
|
|
) -> dict[str, int]:
|
|
"""Ingest all .jsonl and .log files from a corpus directory."""
|
|
files = sorted(corpus_dir.glob("*.jsonl")) + sorted(corpus_dir.glob("*.log"))
|
|
return _ingest_files(files, db_path, pattern_file, batch_size)
|
|
|
|
|
|
def ingest_file(
|
|
log_file: Path,
|
|
db_path: Path,
|
|
pattern_file: Path | None = None,
|
|
) -> dict[str, int]:
|
|
"""Ingest a single log file (any supported format)."""
|
|
return _ingest_files([log_file], db_path, pattern_file)
|
|
|
|
|
|
def ingest_sources(
|
|
sources_file: Path,
|
|
db_path: Path,
|
|
pattern_file: Path | None = None,
|
|
batch_size: int = 1000,
|
|
) -> dict[str, int]:
|
|
"""Ingest all sources listed in a sources.yaml config file.
|
|
|
|
sources.yaml format:
|
|
sources:
|
|
- id: sonarr
|
|
path: /opt/sonarr/config/logs/sonarr.0.txt
|
|
- id: qbittorrent
|
|
path: /opt/qbittorrent/config/data/logs/qbittorrent.log
|
|
|
|
Missing paths are skipped with a warning so the cron keeps running
|
|
when a service is temporarily down.
|
|
"""
|
|
with open(sources_file) as f:
|
|
config = yaml.safe_load(f)
|
|
|
|
files: list[Path] = []
|
|
source_id_map: dict[Path, str] = {}
|
|
|
|
for src in config.get("sources", []):
|
|
path = Path(src["path"])
|
|
if not path.exists():
|
|
logger.warning("Source %r not found, skipping: %s", src.get("id", "?"), path)
|
|
continue
|
|
files.append(path)
|
|
if "id" in src:
|
|
source_id_map[path] = src["id"]
|
|
|
|
if not files:
|
|
logger.warning("No source files found — check sources.yaml paths")
|
|
return {}
|
|
|
|
return _ingest_files(files, db_path, pattern_file, batch_size, source_id_map)
|