turnstone/app/services/discover.py
pyr0ball 600e5a9eac feat(sources): context-aware filesystem log scanner (#23)
Add scan_log_directories() to discover.py that recursively walks
/var/log and /opt, filters to readable log files, and scores each
candidate by recency (mtime, 0.7 weight), file size (0.3), and
keyword match against an optional problem-context query (shifts
weights to 0.4/0.2/0.4 when a query is provided).

- GET /api/setup/scan?query=...&max_results=N — new API endpoint
- SourcesView: "Scan" button opens a panel with ranked candidates,
  checkboxes, and "Add selected" to write to sources.yaml
- 13 new unit tests, 466 passing total

Closes: #23
2026-06-14 14:01:45 -07:00

285 lines
10 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""Environment auto-discovery for the onboarding wizard.

Every ``discover_*`` check is best-effort: when a probe fails it yields no
candidates (an empty list) rather than raising, so the wizard degrades
gracefully in containers, VMs, and minimal environments.
"""
from __future__ import annotations

import hashlib
import json
import logging
import os
import re
import shutil
import socket
import stat
import subprocess
import time
from pathlib import Path
from typing import Any
logger = logging.getLogger(__name__)
# Well-known log file locations probed by discover_files().
# Each entry is (source id, absolute path, human-readable description).
# Several paths may share one id; discover_files() keeps the first that exists.
_KNOWN_PATHS: list[tuple[str, str, str]] = [
    # Core system logs
    ("syslog",        "/var/log/syslog",             "System syslog (Debian/Ubuntu)"),
    ("syslog",        "/var/log/messages",           "System messages (RHEL/Rocky)"),
    ("auth",          "/var/log/auth.log",           "Auth log"),
    ("kern",          "/var/log/kern.log",           "Kernel log"),
    # Web servers / reverse proxies
    ("nginx-access",  "/var/log/nginx/access.log",   "Nginx access log"),
    ("nginx-error",   "/var/log/nginx/error.log",    "Nginx error log"),
    ("apache",        "/var/log/apache2/access.log", "Apache access log"),
    ("apache-error",  "/var/log/apache2/error.log",  "Apache error log"),
    ("caddy",         "/var/log/caddy/access.log",   "Caddy access log"),
    # Services and security tooling
    ("docker-daemon", "/var/log/docker.log",         "Docker daemon log"),
    ("fail2ban",      "/var/log/fail2ban.log",       "Fail2ban log"),
    ("ufw",           "/var/log/ufw.log",            "UFW firewall log"),
]
def _run(cmd: list[str], timeout: float = 5.0) -> str | None:
    """Execute *cmd* and hand back its captured stdout as text.

    Returns ``None`` instead of raising when the command cannot be started,
    exceeds *timeout* seconds, fails in any other way, or exits non-zero.
    """
    try:
        completed = subprocess.run(
            cmd, capture_output=True, text=True, timeout=timeout
        )
    except Exception:
        # Best-effort probe: any failure simply means "not available".
        return None
    if completed.returncode != 0:
        return None
    return completed.stdout
def discover_journald() -> list[dict[str, Any]]:
    """Return a journald source candidate if ``journalctl`` is on PATH.

    The candidate id embeds this host's name, e.g. ``journal:myhost``.  The
    name is read with ``socket.gethostname()`` instead of spawning the
    ``hostname`` binary, which may be absent in minimal container images.  It
    falls back to ``localhost`` when the name is unavailable or blank.

    Returns:
        A one-element list holding the candidate, or an empty list when
        ``journalctl`` is not installed.
    """
    if not shutil.which("journalctl"):
        return []
    try:
        hostname = socket.gethostname().strip()
    except OSError:
        hostname = ""
    # Strip *before* falling back so blank output can never yield "journal:".
    hostname = hostname or "localhost"
    return [{
        "type": "journald",
        "id": f"journal:{hostname}",
        "label": f"System journal ({hostname})",
        "description": "All systemd journal output from this host",
        "available": True,
    }]
def _parse_container_lines(output: str, runtime: str) -> list[dict[str, Any]]:
    """Convert ``<runtime> ps --format '{{json .}}'`` output into candidates.

    Each non-blank line should be one JSON object describing a container.
    Lines that are not valid JSON, or that decode to something other than an
    object, are skipped instead of aborting the whole parse.
    """
    containers: list[dict[str, Any]] = []
    for raw_line in output.splitlines():
        line = raw_line.strip()
        if not line:
            continue
        try:
            obj = json.loads(line)
        except json.JSONDecodeError:
            continue
        if not isinstance(obj, dict):
            continue
        name = obj.get("Names") or obj.get("Name") or obj.get("ID") or "unknown"
        # podman returns a list for Names; docker returns a string that can
        # hold several comma-separated names.  Keep the first either way.
        if isinstance(name, list):
            name = name[0] if name else "unknown"
        name = str(name).split(",")[0].lstrip("/") or "unknown"
        containers.append({
            "type": "docker",
            "id": f"{runtime}:{name}",
            # Explicit ASCII separator: the previous label ran runtime and
            # name together (e.g. "Dockerweb").
            "label": f"{runtime.capitalize()}: {name}",
            "description": f"Container log stream for {name}",
            "container": name,
            "runtime": runtime,
            "available": True,
        })
    return containers


def discover_docker() -> list[dict[str, Any]]:
    """Return container candidates from the first runtime that reports any.

    Tries ``docker`` then ``podman``.  A runtime is skipped when its binary
    is not on PATH, when ``<runtime> ps`` fails, or when it lists no
    containers.  Returns an empty list when neither runtime yields any.
    """
    for runtime in ("docker", "podman"):
        if not shutil.which(runtime):
            continue
        out = _run([runtime, "ps", "--format", "{{json .}}"])
        if out is None:
            continue
        containers = _parse_container_lines(out, runtime)
        if containers:
            return containers
    return []
def discover_files() -> list[dict[str, Any]]:
    """List the well-known log files from ``_KNOWN_PATHS`` present on this host.

    When several known paths share an id (for example syslog vs. messages),
    only the first one that exists is reported.
    """
    claimed_ids: set[str] = set()
    results: list[dict[str, Any]] = []
    for source_id, path, description in _KNOWN_PATHS:
        # Skip ids already satisfied by an earlier path, and missing files.
        if source_id in claimed_ids or not os.path.exists(path):
            continue
        claimed_ids.add(source_id)
        results.append(
            {
                "type": "file",
                "id": source_id,
                "label": description,
                "path": path,
                "description": f"Read from {path}",
                "available": True,
            }
        )
    return results
def discover_all() -> dict[str, Any]:
    """Aggregate every discovery probe into a single response payload.

    Returns a dict with the combined ``candidates`` list (journald first, then
    containers, then well-known files) plus ``has_journald``, ``has_docker``
    and ``has_files`` flags saying whether any candidate of that type exists.
    """
    candidates: list[dict[str, Any]] = [
        *discover_journald(),
        *discover_docker(),
        *discover_files(),
    ]
    found_types = {candidate["type"] for candidate in candidates}
    return {
        "candidates": candidates,
        "has_journald": "journald" in found_types,
        "has_docker": "docker" in found_types,
        "has_files": "file" in found_types,
    }
# Values matching this pattern are safe to emit as plain (unquoted) YAML
# scalars.  They contain no whitespace, '#', quotes or flow indicators, and
# use ':' only between non-empty segments, so it never reads as a mapping
# indicator.  The first character must be a letter, '_' or '/', which rules
# out leading digits, signs, dots and '@'.
_YAML_PLAIN_SCALAR = re.compile(r"[A-Za-z_/][A-Za-z0-9_./@+-]*(?::[A-Za-z0-9_./@+-]+)*")
# Words a YAML loader may resolve to booleans/null rather than strings.
_YAML_RESERVED_WORDS = frozenset(
    {"null", "true", "false", "yes", "no", "on", "off", "y", "n"}
)


def _yaml_scalar(value: Any) -> str:
    """Render *value* as a YAML scalar, quoting it only when necessary.

    Simple values such as ``docker:web`` or ``/var/log/syslog`` are emitted
    as-is.  Anything else is emitted as a JSON string literal, which is also
    a valid YAML double-quoted scalar.  This covers values with spaces or
    ``#``, empty strings, and values a YAML loader might treat as a
    number/boolean/null.
    """
    text = str(value)
    if _YAML_PLAIN_SCALAR.fullmatch(text) and text.lower() not in _YAML_RESERVED_WORDS:
        return text
    return json.dumps(text)


def build_sources_yaml(selected: list[dict[str, Any]]) -> str:
    """Generate sources.yaml content from a list of selected candidates.

    Each item must have ``type`` (defaults to ``"file"``), ``id``, and the
    type-specific fields:

    - ``journald``: optional ``unit``.
    - ``docker``: ``runtime`` (default ``docker``) and ``container``.  When
      ``container`` is missing it is derived from the part of ``id`` after
      the last ``:``.
    - anything else is written as a file source using ``path``.

    Values are quoted only when emitting them bare would change their YAML
    meaning.  A scanned path such as ``/opt/my app/log #1.txt`` therefore
    round-trips intact instead of being cut at the ``#``.

    Returns:
        The YAML document text, terminated by a newline.
    """
    lines = [
        "# Turnstone log sources — generated by the setup wizard.",
        "# Edit this file to add, remove, or modify sources.",
        "sources:",
    ]
    for src in selected:
        src_type = src.get("type", "file")
        src_id = src.get("id", "unknown")
        # Sequence items are indented two spaces; their keys four, so every
        # key lines up under "id" and belongs to the same mapping.
        lines.append(f"  - id: {_yaml_scalar(src_id)}")
        if src_type == "journald":
            lines.append("    type: journald")
            unit = src.get("unit")
            if unit:
                lines.append(f"    unit: {_yaml_scalar(unit)}")
        elif src_type == "docker":
            runtime = src.get("runtime") or "docker"
            container = src.get("container") or str(src_id).split(":")[-1]
            lines.append("    type: docker")
            lines.append(f"    runtime: {_yaml_scalar(runtime)}")
            lines.append(f"    container: {_yaml_scalar(container)}")
        else:
            # File sources are written without a "type" key; a missing type
            # is treated as "file" by this module's readers.
            lines.append(f"    path: {_yaml_scalar(src.get('path', ''))}")
    return "\n".join(lines) + "\n"
def validate_source(src: dict[str, Any]) -> str | None:
    """Check one source definition for its required fields.

    A source without a ``type`` is treated as a file source.  File sources
    need ``path`` and docker sources need ``container``; other types only
    need ``id``.

    Returns:
        A human-readable message describing the first problem found, or
        ``None`` when the definition is acceptable.
    """
    source_id = src.get("id")
    if not source_id:
        return "Source is missing 'id'"
    kind = src.get("type", "file")
    if kind == "file":
        if not src.get("path"):
            return f"File source '{source_id}' is missing 'path'"
    elif kind == "docker":
        if not src.get("container"):
            return f"Docker source '{source_id}' is missing 'container'"
    return None
# Extensions considered as log files in the filesystem scanner.  The empty
# string admits extension-less files (e.g. "syslog", "messages").
_LOG_EXTENSIONS = {"", ".log", ".txt", ".out", ".err"}
# Max file size to consider (500 MB); the scanner skips larger files.
_MAX_SIZE = 500 * 1024 * 1024
# Recency window in days.  Despite the name this is not a half-life: the
# scanner's recency sub-score falls linearly from 1.0 (modified now) to 0.0
# at this age, and files older than this score exactly 0 on recency.
_RECENCY_HALFLIFE_DAYS = 30
def _path_to_source_id(path: Path) -> str:
    """Convert an absolute path to a kebab-case source ID of at most 64 chars.

    Runs of non-alphanumeric characters become a single ``-`` and the result
    is lower-cased, e.g. ``/var/log/nginx/access.log`` ->
    ``var-log-nginx-access-log``.

    IDs longer than 64 characters are truncated and suffixed with an 8-digit
    hash of the full path.  Plain truncation made long paths that differ only
    past the cut collide on one ID.  Paths that differ only in punctuation or
    case still map to the same ID.
    """
    raw = re.sub(r"[^a-zA-Z0-9]+", "-", str(path)).strip("-").lower()
    if len(raw) <= 64:
        return raw
    # Hash the original path rather than the lossy normalized form.
    # surrogateescape keeps undecodable filename bytes from raising.
    digest = hashlib.sha1(
        str(path).encode("utf-8", "surrogateescape")
    ).hexdigest()[:8]
    # 55 + "-" + 8 == 64; rstrip avoids a "--" at the join.
    return f"{raw[:55].rstrip('-')}-{digest}"
def _score_log_file(
    size: int,
    mtime: float,
    now: float,
    path_lower: str,
    query_stems: list[str],
) -> float:
    """Return the unrounded 0-1 relevance score for one candidate log file."""
    # Recency: linear decay from 1.0 (modified at *now*) to 0.0 once the file
    # is _RECENCY_HALFLIFE_DAYS old.  The upper clamp stops a future mtime
    # (e.g. clock skew) from pushing the score above 1.0.
    age_days = (now - mtime) / 86400
    recency = min(1.0, max(0.0, 1.0 - age_days / _RECENCY_HALFLIFE_DAYS))

    # Size: favour files between 1 KB and 50 MB.
    if size < 1024:
        size_score = 0.3
    elif size <= 50 * 1024 * 1024:
        size_score = 1.0
    else:
        # Large files: linear decay from 50 MB, floored at 0.1.
        size_score = max(0.1, 1.0 - (size - 50 * 1024 * 1024) / _MAX_SIZE)

    if not query_stems:
        return recency * 0.7 + size_score * 0.3

    # Keyword: fraction of query words occurring anywhere in the path.
    matches = sum(1 for stem in query_stems if stem in path_lower)
    keyword_score = matches / len(query_stems)
    return recency * 0.4 + size_score * 0.2 + keyword_score * 0.4


def scan_log_directories(
    query: str | None = None,
    dirs: list[str] | None = None,
    max_depth: int = 4,
    max_results: int = 25,
) -> list[dict[str, Any]]:
    """Scan filesystem directories for log files ranked by recency and keyword match.

    Each root in *dirs* (default ``/var/log`` and ``/opt``) is walked at most
    *max_depth* directory levels deep.  Hidden entries and symlinks are
    skipped.  A file is kept when it is regular, has an extension in
    ``_LOG_EXTENSIONS``, is non-empty, is no larger than ``_MAX_SIZE``, and
    is readable by this process.  Directories that cannot be listed and
    entries that cannot be stat'ed are skipped, never raised.

    Scoring (each sub-score is 0-1):
      - Recency: 1.0 for a file modified now, decaying linearly to 0.0 at
        ``_RECENCY_HALFLIFE_DAYS`` (30) days old.
      - Size: 1.0 for 1 KB to 50 MB, 0.3 below 1 KB, decaying towards 0.1
        above 50 MB.
      - Keyword: fraction of query words (3+ characters, case-insensitive)
        found as substrings of the full path.

    Weights are recency 0.7 / size 0.3 when there is no query word of 3+
    characters, and recency 0.4 / size 0.2 / keyword 0.4 otherwise.

    Args:
        query: optional free-text problem description used for keyword scoring.
        dirs: root directories to scan.
        max_depth: maximum directory depth below each root to descend into.
        max_results: maximum number of candidates returned; values <= 0
            return an empty list.

    Returns:
        Up to *max_results* file-source candidates sorted by descending
        ``score`` (ties broken by path).  A file is reported at most once,
        even when roots overlap.
    """
    if dirs is None:
        dirs = ["/var/log", "/opt"]
    if max_results <= 0:
        # Previously a negative value sliced from the end of the list.
        return []

    now = time.time()
    query_stems: list[str] = []
    if query:
        query_stems = [w.lower() for w in re.split(r"\W+", query) if len(w) >= 3]

    candidates: list[dict[str, Any]] = []
    # (st_dev, st_ino) of files already reported, so overlapping or repeated
    # roots do not produce duplicate candidates.
    seen_files: set[tuple[int, int]] = set()

    def _walk(root: Path, depth: int) -> None:
        if depth > max_depth:
            return
        try:
            entries = list(root.iterdir())
        except OSError:
            return
        for entry in entries:
            if entry.name.startswith("."):
                continue
            # A single guarded lstat per entry.  Path.is_symlink()/is_dir()
            # can raise PermissionError (e.g. inside a directory that may be
            # listed but not searched), which would abort the entire scan.
            try:
                st = entry.lstat()
            except OSError:
                continue
            mode = st.st_mode
            if stat.S_ISLNK(mode):
                continue
            if stat.S_ISDIR(mode):
                _walk(entry, depth + 1)
                continue
            if not stat.S_ISREG(mode):
                continue
            # Compressed rotations (.gz, .xz, ...) fail this check too.
            if entry.suffix.lower() not in _LOG_EXTENSIONS:
                continue
            if st.st_size == 0 or st.st_size > _MAX_SIZE:
                continue
            file_key = (st.st_dev, st.st_ino)
            if file_key in seen_files:
                continue
            if not os.access(entry, os.R_OK):
                continue
            seen_files.add(file_key)

            path_str = str(entry)
            score = _score_log_file(
                st.st_size, st.st_mtime, now, path_str.lower(), query_stems
            )
            candidates.append({
                "type": "file",
                "id": _path_to_source_id(entry),
                "path": path_str,
                "label": entry.name,
                "size_bytes": st.st_size,
                "mtime": st.st_mtime,
                "score": round(score, 3),
                "available": True,
            })

    for d in dirs:
        _walk(Path(d), depth=0)

    # Highest score first; the path tie-breaker makes the order independent
    # of directory-listing order.
    candidates.sort(key=lambda c: (-c["score"], c["path"]))
    return candidates[:max_results]