"""Verify context SSE event and LLM prompt injection.""" import asyncio import sqlite3 from pathlib import Path from unittest.mock import patch import pytest from app.db.schema import ensure_schema, ensure_context_schema from app.services.llm import summarize from app.services.search import SearchResult def _entry(text: str) -> SearchResult: return SearchResult( entry_id="x", source_id="svc", sequence=0, timestamp_iso="2026-05-13T00:00:00+00:00", severity="ERROR", text=text, matched_patterns=[], repeat_count=1, out_of_order=False, rank=0.0, ) def test_summarize_includes_context_block(): captured = {} def fake_post(url, json=None, headers=None, timeout=None): captured["json"] = json raise ConnectionError("offline") with patch("app.services.llm.httpx.post", side_effect=fake_post): summarize( "plex stopped", [_entry("plex error")], llm_url="http://localhost:11434", llm_model="llama3", context_block="Known environment facts:\n [service] plex: port:32400", ) messages = captured.get("json", {}).get("messages", []) content = " ".join(m.get("content", "") for m in messages) assert "Known environment facts" in content assert "plex: port:32400" in content def test_summarize_without_context_block_unchanged(): """When context_block is None the prompt must not contain the context header.""" captured = {} def fake_post(url, json=None, headers=None, timeout=None): captured["json"] = json raise ConnectionError("offline") with patch("app.services.llm.httpx.post", side_effect=fake_post): summarize( "plex stopped", [_entry("plex error")], llm_url="http://localhost:11434", llm_model="llama3", context_block=None, ) messages = captured.get("json", {}).get("messages", []) content = " ".join(m.get("content", "") for m in messages) assert "Known environment facts" not in content @pytest.fixture def db_with_facts(tmp_path): db_path = tmp_path / "t.db" ensure_schema(db_path) ensure_context_schema(db_path) conn = sqlite3.connect(str(db_path)) conn.execute( "INSERT INTO context_facts(id, tenant_id, category, key, value, source, created_at) " "VALUES (?,?,?,?,?,?,?)", ("f1", "", "service", "plex", "port:32400", "wizard", "2026-05-13T00:00:00+00:00"), ) conn.commit() conn.close() return db_path def test_diagnose_stream_emits_context_event(db_with_facts): from app.services.diagnose import diagnose_stream events = [] async def collect(): async for evt in diagnose_stream( db_path=db_with_facts, query="plex stopped", ): events.append(evt) asyncio.run(collect()) types = [e["type"] for e in events] assert "context" in types ctx_event = next(e for e in events if e["type"] == "context") assert "facts" in ctx_event assert any(f["key"] == "plex" for f in ctx_event["facts"])