273 lines
11 KiB
Python
273 lines
11 KiB
Python
"""Tests for the feedback API backend."""
|
|
import pytest
|
|
from unittest.mock import patch, MagicMock
|
|
from pathlib import Path
|
|
|
|
|
|
# ── mask_pii ──────────────────────────────────────────────────────────────────
|
|
|
|
def test_mask_pii_email():
    """An email address inside a sentence is replaced by the redaction marker."""
    from scripts.feedback_api import mask_pii

    masked = mask_pii("contact foo@bar.com please")
    assert masked == "contact [email redacted] please"
|
|
|
|
|
|
def test_mask_pii_phone_dashes():
    """A dash-separated phone number is replaced by the redaction marker."""
    from scripts.feedback_api import mask_pii

    masked = mask_pii("call 555-123-4567 now")
    assert masked == "call [phone redacted] now"
|
|
|
|
|
|
def test_mask_pii_phone_parens():
    """A phone number with a parenthesised area code is fully redacted."""
    from scripts.feedback_api import mask_pii

    masked = mask_pii("(510) 764-3155")
    assert masked == "[phone redacted]"
|
|
|
|
|
|
def test_mask_pii_clean_text():
    """Text without emails or phone numbers passes through unchanged."""
    from scripts.feedback_api import mask_pii

    harmless = "no sensitive data here"
    assert mask_pii(harmless) == "no sensitive data here"
|
|
|
|
|
|
def test_mask_pii_multiple_emails():
    """Every email in the input is redacted, not just the first one."""
    from scripts.feedback_api import mask_pii

    expected = "[email redacted] and [email redacted]"
    assert mask_pii("a@b.com and c@d.com") == expected
|
|
|
|
|
|
# ── collect_context ───────────────────────────────────────────────────────────
|
|
|
|
def test_collect_context_required_keys():
    """The context returned for a page contains every required key."""
    from scripts.feedback_api import collect_context

    required = ("page", "version", "tier", "llm_backend", "os", "timestamp")
    context = collect_context("Home")
    # Checked one at a time so the failure message names the first missing key.
    for key in required:
        assert key in context, f"missing key: {key}"
|
|
|
|
|
|
def test_collect_context_page_value():
    """The page name passed in is echoed back under the "page" key."""
    from scripts.feedback_api import collect_context

    context = collect_context("MyPage")
    assert context["page"] == "MyPage"
|
|
|
|
|
|
def test_collect_context_timestamp_is_utc():
    """The context timestamp carries the trailing "Z" UTC designator."""
    from scripts.feedback_api import collect_context

    timestamp = collect_context("X")["timestamp"]
    assert timestamp.endswith("Z")
|
|
|
|
|
|
# ── collect_logs ──────────────────────────────────────────────────────────────
|
|
|
|
def test_collect_logs_returns_string(tmp_path):
    """collect_logs returns a str that includes the last line of the log."""
    from scripts.feedback_api import collect_logs

    log_file = tmp_path / ".streamlit.log"
    log_file.write_text("line1\nline2\nline3\n")

    output = collect_logs(log_path=log_file, n=10)

    assert isinstance(output, str)
    assert "line3" in output
|
|
|
|
|
|
def test_collect_logs_tails_n_lines(tmp_path):
    """With n=10 on a 200-line log, the last line is kept and the first dropped."""
    from scripts.feedback_api import collect_logs

    log_file = tmp_path / ".streamlit.log"
    lines = [f"line{i}" for i in range(200)]
    log_file.write_text("\n".join(lines))

    output = collect_logs(log_path=log_file, n=10)

    assert "line199" in output
    assert "line0" not in output
|
|
|
|
|
|
def test_collect_logs_masks_pii(tmp_path):
    """Emails present in the log file are redacted in the collected output."""
    from scripts.feedback_api import collect_logs

    log_file = tmp_path / "test.log"
    log_file.write_text("user foo@bar.com connected\n")

    output = collect_logs(log_path=log_file)

    assert "foo@bar.com" not in output
    assert "[email redacted]" in output
|
|
|
|
|
|
def test_collect_logs_missing_file(tmp_path):
    """A nonexistent log path yields a "no log file" message (case-insensitive)."""
    from scripts.feedback_api import collect_logs

    missing = tmp_path / "nonexistent.log"
    message = collect_logs(log_path=missing)
    assert "no log file" in message.lower()
|
|
|
|
|
|
# ── collect_listings ──────────────────────────────────────────────────────────
|
|
|
|
def test_collect_listings_safe_fields_only(tmp_path):
    """Collected listings expose exactly title, company and url — nothing else."""
    from scripts.db import init_db, insert_job
    from scripts.feedback_api import collect_listings

    db_file = tmp_path / "test.db"
    init_db(db_file)

    # The job carries extra fields (source, location, description, ...) that
    # must not appear in the collected listing.
    job = {
        "title": "CSM", "company": "Acme", "url": "https://example.com/1",
        "source": "linkedin", "location": "Remote", "is_remote": True,
        "salary": "", "description": "great role", "date_found": "2026-03-01",
    }
    insert_job(db_file, job)

    listings = collect_listings(db_path=db_file, n=5)

    assert len(listings) == 1
    first = listings[0]
    assert set(first.keys()) == {"title", "company", "url"}
    assert first["title"] == "CSM"
|
|
|
|
|
|
def test_collect_listings_respects_n(tmp_path):
    """With ten jobs stored, asking for n=3 returns exactly three listings."""
    from scripts.db import init_db, insert_job
    from scripts.feedback_api import collect_listings

    db_file = tmp_path / "test.db"
    init_db(db_file)

    for i in range(10):
        job = {
            "title": f"Job {i}", "company": "Acme", "url": f"https://example.com/{i}",
            "source": "linkedin", "location": "Remote", "is_remote": False,
            "salary": "", "description": "", "date_found": "2026-03-01",
        }
        insert_job(db_file, job)

    listings = collect_listings(db_path=db_file, n=3)
    assert len(listings) == 3
|
|
|
|
|
|
# ── build_issue_body ──────────────────────────────────────────────────────────
|
|
|
|
def test_build_issue_body_contains_description():
    """The issue body includes the description plus page and version context."""
    from scripts.feedback_api import build_issue_body

    form = {"type": "bug", "title": "Test", "description": "it broke", "repro": ""}
    context = {
        "page": "Home", "version": "v1.0", "tier": "free",
        "llm_backend": "ollama", "os": "Linux", "timestamp": "2026-03-03T00:00:00Z",
    }

    body = build_issue_body(form, context, {})

    assert "it broke" in body
    assert "Home" in body
    assert "v1.0" in body
|
|
|
|
|
|
def test_build_issue_body_bug_includes_repro():
    """A bug report's body has a Reproduction section with the given steps."""
    from scripts.feedback_api import build_issue_body

    bug_form = {"type": "bug", "title": "X", "description": "desc", "repro": "step 1\nstep 2"}

    rendered = build_issue_body(bug_form, {}, {})

    assert "step 1" in rendered
    assert "Reproduction" in rendered
|
|
|
|
|
|
def test_build_issue_body_no_repro_for_feature():
    """A feature request gets no Reproduction section even if repro is filled in."""
    from scripts.feedback_api import build_issue_body

    feature_form = {"type": "feature", "title": "X", "description": "add dark mode", "repro": "ignored"}

    rendered = build_issue_body(feature_form, {}, {})

    assert "Reproduction" not in rendered
|
|
|
|
|
|
def test_build_issue_body_logs_in_collapsible():
    """Attached logs appear in the body together with a <details> element."""
    from scripts.feedback_api import build_issue_body

    form = {"type": "other", "title": "X", "description": "Y", "repro": ""}
    attachments = {"logs": "log line 1\nlog line 2"}

    rendered = build_issue_body(form, {}, attachments)

    assert "<details>" in rendered
    assert "log line 1" in rendered
|
|
|
|
|
|
def test_build_issue_body_omits_logs_when_not_provided():
    """Without attachments, the body contains no <details> element."""
    from scripts.feedback_api import build_issue_body

    form = {"type": "bug", "title": "X", "description": "Y", "repro": ""}

    rendered = build_issue_body(form, {}, {})

    assert "<details>" not in rendered
|
|
|
|
|
|
def test_build_issue_body_submitter_attribution():
    """The submitter's name shows up in the body when a submitter is attached."""
    from scripts.feedback_api import build_issue_body

    form = {"type": "bug", "title": "X", "description": "Y", "repro": ""}
    attachments = {"submitter": "Jane Doe <jane@example.com>"}

    rendered = build_issue_body(form, {}, attachments)

    assert "Jane Doe" in rendered
|
|
|
|
|
|
def test_build_issue_body_listings_shown():
    """Attached listings contribute their title and company to the body."""
    from scripts.feedback_api import build_issue_body

    form = {"type": "bug", "title": "X", "description": "Y", "repro": ""}
    attachments = {
        "listings": [{"title": "CSM", "company": "Acme", "url": "https://example.com/1"}],
    }

    rendered = build_issue_body(form, {}, attachments)

    assert "CSM" in rendered
    assert "Acme" in rendered
|
|
|
|
|
|
# ── Forgejo API ───────────────────────────────────────────────────────────────
|
|
|
|
@patch("scripts.feedback_api.requests.get")
@patch("scripts.feedback_api.requests.post")
def test_ensure_labels_uses_existing(mock_post, mock_get):
    """Labels already returned by GET are reused by id and nothing is POSTed."""
    from scripts.feedback_api import _ensure_labels

    # Decorators apply bottom-up, so the POST mock is the first argument.
    listing = mock_get.return_value
    listing.ok = True
    listing.json.return_value = [
        {"name": "beta-feedback", "id": 1},
        {"name": "bug", "id": 2},
    ]

    wanted = ["beta-feedback", "bug"]
    auth = {"Authorization": "token x"}
    label_ids = _ensure_labels(wanted, "https://example.com/api/v1", auth, "owner/repo")

    assert label_ids == [1, 2]
    mock_post.assert_not_called()
|
|
|
|
|
|
@patch("scripts.feedback_api.requests.get")
@patch("scripts.feedback_api.requests.post")
def test_ensure_labels_creates_missing(mock_post, mock_get):
    """When GET lists no labels, the id from the POST response is returned."""
    from scripts.feedback_api import _ensure_labels

    # The repository reports no existing labels.
    listing = mock_get.return_value
    listing.ok = True
    listing.json.return_value = []

    # The POST response carries the new label's id.
    created = mock_post.return_value
    created.ok = True
    created.json.return_value = {"id": 99}

    auth = {"Authorization": "token x"}
    label_ids = _ensure_labels(["needs-triage"], "https://example.com/api/v1", auth, "owner/repo")

    assert 99 in label_ids
|
|
|
|
|
|
@patch("scripts.feedback_api._ensure_labels", return_value=[1, 2])
@patch("scripts.feedback_api.requests.post")
def test_create_forgejo_issue_success(mock_post, mock_labels, monkeypatch):
    """A 201 response is surfaced as a result with the issue number and URL."""
    from scripts.feedback_api import create_forgejo_issue

    env = {
        "FORGEJO_API_TOKEN": "testtoken",
        "FORGEJO_REPO": "owner/repo",
        "FORGEJO_API_URL": "https://example.com/api/v1",
    }
    for name, value in env.items():
        monkeypatch.setenv(name, value)

    response = mock_post.return_value
    response.status_code = 201
    response.raise_for_status.return_value = None  # success: nothing raised
    response.json.return_value = {"number": 42, "html_url": "https://example.com/issues/42"}

    issue = create_forgejo_issue("Test issue", "body text", ["beta-feedback", "bug"])

    assert issue["number"] == 42
    assert "42" in issue["url"]
|
|
|
|
|
|
@patch("scripts.feedback_api.requests.post")
def test_upload_attachment_returns_url(mock_post, monkeypatch):
    """upload_attachment returns the browser_download_url from the API response."""
    from scripts.feedback_api import upload_attachment

    env = {
        "FORGEJO_API_TOKEN": "testtoken",
        "FORGEJO_REPO": "owner/repo",
        "FORGEJO_API_URL": "https://example.com/api/v1",
    }
    for name, value in env.items():
        monkeypatch.setenv(name, value)

    response = mock_post.return_value
    response.status_code = 201
    response.raise_for_status.return_value = None  # success: nothing raised
    response.json.return_value = {
        "uuid": "abc", "browser_download_url": "https://example.com/assets/abc"
    }

    download_url = upload_attachment(42, b"\x89PNG", "screenshot.png")

    assert download_url == "https://example.com/assets/abc"
|
|
|
|
|
|
# ── screenshot_page ───────────────────────────────────────────────────────────
|
|
|
|
def test_screenshot_page_returns_none_on_failure(monkeypatch):
    """screenshot_page returns None gracefully when capture fails.

    ``playwright.sync_api.sync_playwright`` is replaced with a callable that
    raises ``RuntimeError``; the test passes only if ``screenshot_page``
    returns ``None`` instead of propagating the error.
    """
    from scripts.feedback_api import screenshot_page
    import playwright.sync_api as pw_api

    def bad_playwright():
        raise RuntimeError("browser unavailable")

    # monkeypatch undoes setattr at teardown, so the real sync_playwright does
    # not need to be saved and restored by hand.
    # NOTE(review): this only takes effect if screenshot_page looks up
    # sync_playwright on the module at call time — confirm against the
    # implementation.
    monkeypatch.setattr(pw_api, "sync_playwright", bad_playwright)

    result = screenshot_page(port=9999)

    assert result is None
|
|
|
|
|
|
@patch("playwright.sync_api.sync_playwright")
def test_screenshot_page_returns_bytes(mock_pw):
    """screenshot_page hands back the bytes produced by the page screenshot."""
    from scripts.feedback_api import screenshot_page

    png_header = b"\x89PNG\r\n\x1a\n"

    # Entering the sync_playwright() context manager yields this object.
    playwright_obj = MagicMock()
    manager = mock_pw.return_value
    manager.__enter__.return_value = playwright_obj
    manager.__exit__.return_value = False  # do not swallow exceptions

    # Wire chromium.launch() -> browser -> new_page() -> screenshot().
    browser = playwright_obj.chromium.launch.return_value
    page = browser.new_page.return_value
    page.screenshot.return_value = png_header

    captured = screenshot_page(port=8502)

    assert captured == png_header
|