#!/usr/bin/env python3
"""Turnstone Harvester — collect logs and ship them to a Turnstone instance.

Subcommands:
  push      Read sources.yaml and POST each log file to
            <url>/turnstone/api/glean/upload
  incident  Tag an incident on the remote Turnstone instance
            (POST to <url>/turnstone/api/incidents)

Usage:
  # Push all configured sources
  python harvester.py push --url http://turnstone:8534 --sources /patterns/sources.yaml

  # Tag an incident
  python harvester.py incident "jellyseerr went down" \\
      --url http://turnstone:8534 \\
      --started "2026-05-19 10:00" --ended "2026-05-19 10:30" \\
      --type crash --severity HIGH

Environment variables (used as defaults; explicit flags take precedence):
  TURNSTONE_URL      Base URL of the Turnstone instance
  TURNSTONE_SOURCES  Path to sources.yaml
"""

from __future__ import annotations

import argparse
import json
import logging
import os
import sys
import urllib.error
import urllib.parse
import urllib.request
import uuid
from pathlib import Path

try:
    import yaml
except ImportError:
    # PyYAML is only needed by `push`; `incident` must keep working without it.
    yaml = None

logging.basicConfig(level=logging.INFO, format="%(levelname)s %(message)s")
logger = logging.getLogger("harvester")


# ---------------------------------------------------------------------------
# HTTP helpers
# ---------------------------------------------------------------------------

def _post_json(url: str, payload: dict) -> dict:
    """POST *payload* as JSON to *url* and return the decoded JSON response.

    Raises:
        urllib.error.HTTPError: the server answered with an error status.
        urllib.error.URLError: the request could not be sent.
        ValueError: *url* is malformed or the response body is not JSON.
    """
    data = json.dumps(payload).encode()
    req = urllib.request.Request(
        url,
        data=data,
        headers={"Content-Type": "application/json"},
        method="POST",
    )
    with urllib.request.urlopen(req, timeout=30) as resp:
        return json.loads(resp.read())


def _escape_filename(name: str) -> str:
    """Escape characters that would break a quoted multipart header value."""
    return name.replace('"', "%22").replace("\r", "%0D").replace("\n", "%0A")


def _encode_multipart_file(path: Path) -> tuple[bytes, str]:
    """Encode the file at *path* as a one-part multipart/form-data body.

    Returns:
        ``(body, content_type)`` where *content_type* carries the boundary.
    """
    content = path.read_bytes()

    # A fixed boundary could occur inside a log file and cut the part short,
    # so draw a random one and make sure the payload does not contain it.
    boundary = f"----TurnstoneHarvesterBoundary{uuid.uuid4().hex}"
    while boundary.encode() in content:
        boundary = f"----TurnstoneHarvesterBoundary{uuid.uuid4().hex}"

    part_header = (
        f"--{boundary}\r\n"
        f'Content-Disposition: form-data; name="file"; '
        f'filename="{_escape_filename(path.name)}"\r\n'
        f"Content-Type: text/plain\r\n\r\n"
    ).encode()
    closing = f"--{boundary}--\r\n".encode()

    body = b"".join((part_header, content, b"\r\n", closing))
    return body, f"multipart/form-data; boundary={boundary}"


def _post_file(url: str, path: Path, source_id: str) -> dict:
    """POST a log file as multipart/form-data.

    The file is sent in a form field named ``file``; *source_id* travels as
    the ``source_id`` query parameter. Returns the decoded JSON response.
    """
    body, content_type = _encode_multipart_file(path)
    params = urllib.parse.urlencode({"source_id": source_id})
    req = urllib.request.Request(
        f"{url}?{params}",
        data=body,
        headers={"Content-Type": content_type},
        method="POST",
    )
    with urllib.request.urlopen(req, timeout=60) as resp:
        return json.loads(resp.read())


# ---------------------------------------------------------------------------
# push subcommand
# ---------------------------------------------------------------------------

def cmd_push(args: argparse.Namespace) -> int:
    """Upload every source listed in the sources file.

    Reads ``args.sources`` (YAML with a top-level ``sources`` list whose
    entries carry ``path`` and optionally ``id``) and posts each existing
    file to ``args.url``. Source files that do not exist are skipped with a
    warning; malformed entries and failed uploads are counted as errors
    without aborting the remaining sources.

    Returns:
        0 when nothing failed (including an empty source list), 1 otherwise.
    """
    sources_path = Path(args.sources)
    if not sources_path.exists():
        logger.error("sources file not found: %s", sources_path)
        return 1

    if yaml is None:
        logger.error("PyYAML is required for 'push' but is not installed")
        return 1

    try:
        # Binary mode lets PyYAML detect the encoding itself instead of
        # depending on the platform's locale default.
        with open(sources_path, "rb") as f:
            config = yaml.safe_load(f) or {}
    except (OSError, yaml.YAMLError) as exc:
        logger.error("could not read %s: %s", sources_path, exc)
        return 1

    if not isinstance(config, dict):
        logger.error("%s: top level must be a mapping with a 'sources' key", sources_path)
        return 1

    sources = config.get("sources") or []
    if not sources:
        logger.warning("No sources defined in %s", sources_path)
        return 0
    if not isinstance(sources, list):
        logger.error("%s: 'sources' must be a list", sources_path)
        return 1

    upload_url = args.url.rstrip("/") + "/turnstone/api/glean/upload"
    total_gleaned = 0
    errors = 0

    for src in sources:
        raw_path = src.get("path") if isinstance(src, dict) else None
        if not isinstance(raw_path, str) or not raw_path:
            # One bad entry must not abort the whole run.
            logger.error("Invalid source entry (need a mapping with a 'path'): %r", src)
            errors += 1
            continue

        src_id = src.get("id", "unknown")
        src_path = Path(raw_path)
        if not src_path.exists():
            logger.warning("Source %r not found, skipping: %s", src_id, src_path)
            continue

        logger.info("Pushing %s (%s) ...", src_id, src_path)
        try:
            result = _post_file(upload_url, src_path, src_id)
            count = result.get("gleaned", 0)
            total_gleaned += count
            logger.info("  %s: %d entries gleaned", src_id, count)
        except urllib.error.HTTPError as exc:
            logger.error("  %s: HTTP %d — %s", src_id, exc.code, exc.read().decode(errors="replace"))
            errors += 1
        except Exception as exc:
            # Per-source boundary: record the failure and keep going.
            logger.error("  %s: %s", src_id, exc)
            errors += 1

    logger.info("Done. Total gleaned: %d entries, errors: %d", total_gleaned, errors)
    return 1 if errors else 0


# ---------------------------------------------------------------------------
# incident subcommand
# ---------------------------------------------------------------------------

def cmd_incident(args: argparse.Namespace) -> int:
    """Create an incident record on the remote instance.

    Builds the payload from the parsed CLI arguments (missing optional
    values become empty strings, severity falls back to ``MEDIUM``) and
    posts it as JSON.

    Returns:
        0 on success, 1 when the request fails for any reason.
    """
    payload = {
        "label": args.label,
        "issue_type": args.type or "",
        "started_at": args.started or "",
        "ended_at": args.ended or "",
        "notes": args.notes or "",
        "severity": args.severity or "MEDIUM",
    }
    url = args.url.rstrip("/") + "/turnstone/api/incidents"
    try:
        result = _post_json(url, payload)
        logger.info("Incident created: %s", result.get("id", result))
        return 0
    except urllib.error.HTTPError as exc:
        logger.error("HTTP %d — %s", exc.code, exc.read().decode(errors="replace"))
        return 1
    except Exception as exc:
        logger.error("%s", exc)
        return 1


# ---------------------------------------------------------------------------
# CLI
# ---------------------------------------------------------------------------

def build_parser() -> argparse.ArgumentParser:
    """Build the argument parser with the ``push`` and ``incident`` subcommands.

    ``TURNSTONE_URL`` and ``TURNSTONE_SOURCES`` are read when this function
    is called and only supply defaults; flags given on the command line win.
    """
    default_url = os.environ.get("TURNSTONE_URL", "http://localhost:8534")
    default_sources = os.environ.get("TURNSTONE_SOURCES", "/patterns/sources.yaml")

    parser = argparse.ArgumentParser(
        description="Turnstone Harvester — ship logs and tag incidents",
        formatter_class=argparse.RawDescriptionHelpFormatter,
    )
    sub = parser.add_subparsers(dest="cmd", required=True)

    # push
    p_push = sub.add_parser("push", help="Push log files to Turnstone")
    p_push.add_argument("--url", default=default_url,
                        help="Turnstone base URL (default: %(default)s)")
    p_push.add_argument("--sources", default=default_sources,
                        help="Path to sources.yaml (default: %(default)s)")

    # incident
    p_inc = sub.add_parser("incident", help="Tag an incident on the Turnstone instance")
    p_inc.add_argument("label", help="Short description of the incident")
    p_inc.add_argument("--url", default=default_url,
                       help="Turnstone base URL (default: %(default)s)")
    p_inc.add_argument("--started", help="Start time (ISO or natural language)")
    p_inc.add_argument("--ended", help="End time (ISO or natural language)")
    p_inc.add_argument("--type", dest="type",
                       help="Issue type tag (e.g. crash, oom, auth_fail)")
    p_inc.add_argument("--severity", default="MEDIUM",
                       choices=["LOW", "MEDIUM", "HIGH", "CRITICAL"],
                       help="Incident severity (default: MEDIUM)")
    p_inc.add_argument("--notes", help="Additional notes")

    return parser


def main(argv: list[str] | None = None) -> int:
    """Parse *argv* (``sys.argv[1:]`` when None) and run the chosen subcommand.

    Returns the process exit code.
    """
    parser = build_parser()
    args = parser.parse_args(argv)

    handlers = {"push": cmd_push, "incident": cmd_incident}
    handler = handlers.get(args.cmd)
    if handler is None:
        # Defensive fallback; argparse already rejects unknown subcommands.
        parser.print_help()
        return 1
    return handler(args)


if __name__ == "__main__":
    sys.exit(main())