kiwi/app/services/recipe/browse_counts_cache.py
pyr0ball 9697c7b64f feat(recipe-tags): merge accepted community tags into browse counts + FTS fallback
browse_counts_cache.py: after FTS counts, _merge_community_tag_counts() queries
  accepted tags (upvotes>=2) grouped by (domain,category,subcategory) and adds
  distinct recipe_id counts to the cached keyword-set totals. Skips silently
  when community Postgres is unavailable.

store.py: fetch_recipes_by_ids() fetches corpus recipes by explicit ID list,
  used by the FTS fallback when a subcategory returns zero FTS results.

recipes.py (browse endpoint): when FTS total==0 for a subcategory, queries
  community store for accepted tag IDs and serves those recipes directly.
  Sets community_tagged=True in the response so the UI can surface context.
  Refs kiwi#118.
2026-04-22 12:37:44 -07:00

256 lines
9.4 KiB
Python

"""
Browse counts cache — pre-computes and persists recipe counts for all
browse domain keyword sets so category/subcategory page loads never
hit the 3.8 GB FTS index at request time.
Counts change only when the corpus changes (after a pipeline run).
The cache is a small SQLite file separate from both the read-only
corpus DB and per-user kiwi.db files, so the container can write it.
Refresh triggers:
1. Startup — if cache is missing or older than STALE_DAYS
2. Nightly — asyncio background task started in main.py lifespan
3. Pipeline — infer_recipe_tags.py calls refresh() at end of run
The in-memory _COUNT_CACHE in store.py is pre-warmed from this file
on startup, so FTS queries are never needed for known keyword sets.
"""
from __future__ import annotations
import logging
import sqlite3
from datetime import datetime, timezone
from pathlib import Path
logger = logging.getLogger(__name__)
STALE_DAYS = 7
# ---------------------------------------------------------------------------
# Internal helpers
# ---------------------------------------------------------------------------
def _kw_key(keywords: list[str]) -> str:
"""Stable string key for a keyword list — sorted and pipe-joined."""
return "|".join(sorted(keywords))
def _fts_match_expr(keywords: list[str]) -> str:
phrases = ['"' + kw.replace('"', '""') + '"' for kw in keywords]
return " OR ".join(phrases)
def _ensure_schema(conn: sqlite3.Connection) -> None:
conn.execute("""
CREATE TABLE IF NOT EXISTS browse_counts (
keywords_key TEXT PRIMARY KEY,
count INTEGER NOT NULL,
computed_at TEXT NOT NULL
)
""")
conn.execute("""
CREATE TABLE IF NOT EXISTS browse_counts_meta (
key TEXT PRIMARY KEY,
value TEXT NOT NULL
)
""")
conn.commit()
# ---------------------------------------------------------------------------
# Public API
# ---------------------------------------------------------------------------
def is_stale(cache_path: Path, max_age_days: int = STALE_DAYS) -> bool:
    """Return True if the cache is missing, unreadable, or older than *max_age_days*.

    Any failure (corrupt file, missing meta table, unparsable timestamp) is
    treated as "stale" so the caller simply rebuilds the cache.

    Fix: the original leaked the SQLite handle when ``execute`` raised —
    the broad ``except`` returned before ``conn.close()`` ran. ``closing()``
    guarantees release on every path.
    """
    from contextlib import closing  # stdlib; local import keeps module imports unchanged

    if not cache_path.exists():
        return True
    try:
        with closing(sqlite3.connect(cache_path)) as conn:
            row = conn.execute(
                "SELECT value FROM browse_counts_meta WHERE key = 'refreshed_at'"
            ).fetchone()
        if row is None:
            return True
        # refreshed_at is stored as a timezone-aware ISO timestamp by refresh().
        age = (datetime.now(timezone.utc) - datetime.fromisoformat(row[0])).days
        return age >= max_age_days
    except Exception:
        # Foreign/corrupt file or bad timestamp: rebuild rather than crash.
        return True
def load_into_memory(cache_path: Path, count_cache: dict, corpus_path: str) -> int:
    """
    Load all rows from the cache file into the in-memory count_cache dict.

    Uses corpus_path (the current RECIPE_DB_PATH env value) as the cache key,
    not what was stored in the file — the file may have been built against a
    different mount path (e.g. pipeline ran on host, container sees a different
    path). Counts are corpus-content-derived and path-independent.

    Returns the number of entries loaded (0 when the file is missing or
    unreadable).

    Fix: the original leaked the SQLite handle when the SELECT raised —
    ``conn.close()`` was skipped on the exception path. ``closing()``
    guarantees release.
    """
    from contextlib import closing  # stdlib; local import keeps module imports unchanged

    if not cache_path.exists():
        return 0
    try:
        with closing(sqlite3.connect(cache_path)) as conn:
            rows = conn.execute(
                "SELECT keywords_key, count FROM browse_counts"
            ).fetchall()
        for kw_key, count in rows:
            keywords = kw_key.split("|") if kw_key else []
            # Key shape must match the in-memory _COUNT_CACHE in store.py:
            # (corpus_path, *sorted(keywords)).
            count_cache[(corpus_path, *sorted(keywords))] = count
        logger.info("browse_counts: warmed %d entries from %s", len(rows), cache_path)
        return len(rows)
    except Exception as exc:
        logger.warning("browse_counts: load failed: %s", exc)
        return 0
def refresh(corpus_path: str, cache_path: Path) -> int:
    """
    Run FTS5 queries for every keyword set in browser_domains.DOMAINS
    and write results to cache_path.

    Safe to call from both the host pipeline script and the in-container
    nightly task. The corpus_path must be reachable and readable from
    the calling process.

    Returns the number of keyword sets computed (0 if the corpus cannot
    be opened).
    """
    from app.services.recipe.browser_domains import DOMAINS  # local import — avoid circular

    cache_path.parent.mkdir(parents=True, exist_ok=True)
    cache_conn = sqlite3.connect(cache_path)
    _ensure_schema(cache_conn)

    # Collect every unique keyword list across all domains/categories/subcategories.
    # DOMAINS structure: {domain: {label: str, categories: {cat_name: {keywords, subcategories}}}}
    # De-duplicated via the stable key so identical keyword sets are queried once.
    seen: dict[str, list[str]] = {}
    for domain_data in DOMAINS.values():
        for cat_data in domain_data.get("categories", {}).values():
            if not isinstance(cat_data, dict):
                continue
            top_kws = cat_data.get("keywords", [])
            if top_kws:
                seen[_kw_key(top_kws)] = top_kws
            for subcat_kws in cat_data.get("subcategories", {}).values():
                if subcat_kws:
                    seen[_kw_key(subcat_kws)] = subcat_kws

    # Open the corpus read-only (uri mode=ro): refresh must never mutate it.
    try:
        corpus_conn = sqlite3.connect(f"file:{corpus_path}?mode=ro", uri=True)
    except Exception as exc:
        logger.error("browse_counts: cannot open corpus %s: %s", corpus_path, exc)
        cache_conn.close()
        return 0

    now = datetime.now(timezone.utc).isoformat()
    computed = 0
    try:
        for kw_key, kws in seen.items():
            try:
                row = corpus_conn.execute(
                    "SELECT count(*) FROM recipe_browser_fts WHERE recipe_browser_fts MATCH ?",
                    (_fts_match_expr(kws),),
                ).fetchone()
                count = row[0] if row else 0
                cache_conn.execute(
                    "INSERT OR REPLACE INTO browse_counts (keywords_key, count, computed_at)"
                    " VALUES (?, ?, ?)",
                    (kw_key, count, now),
                )
                computed += 1
            except Exception as exc:
                # One bad keyword set must not abort the whole refresh;
                # key is truncated to keep the log line bounded.
                logger.warning("browse_counts: query failed key=%r: %s", kw_key[:60], exc)
        # Merge accepted community tags into counts.
        # For each (domain, category, subcategory) that has accepted community
        # tags, add the count of distinct tagged recipe_ids to the FTS count.
        # The two overlap rarely (community tags exist precisely because FTS
        # missed those recipes), so simple addition is accurate enough.
        try:
            _merge_community_tag_counts(cache_conn, DOMAINS, now)
        except Exception as exc:
            # Best-effort: community store may be unreachable from this process.
            logger.warning("browse_counts: community merge skipped: %s", exc)
        # Record refresh metadata; corpus_path is informational only —
        # load_into_memory() deliberately ignores it (see its docstring).
        cache_conn.execute(
            "INSERT OR REPLACE INTO browse_counts_meta (key, value) VALUES ('refreshed_at', ?)",
            (now,),
        )
        cache_conn.execute(
            "INSERT OR REPLACE INTO browse_counts_meta (key, value) VALUES ('corpus_path', ?)",
            (corpus_path,),
        )
        cache_conn.commit()
        logger.info("browse_counts: wrote %d counts → %s", computed, cache_path)
    finally:
        corpus_conn.close()
        cache_conn.close()
    return computed
def _merge_community_tag_counts(
cache_conn: sqlite3.Connection,
domains: dict,
now: str,
threshold: int = 2,
) -> None:
"""Add accepted community tag counts on top of FTS counts in the cache.
Queries the community PostgreSQL store (if available) for accepted tags
grouped by (domain, category, subcategory), maps each back to its keyword
set key, then increments the cached count.
Silently skips if community features are unavailable.
"""
try:
from app.api.endpoints.community import _get_community_store
store = _get_community_store()
if store is None:
return
except Exception:
return
for domain_id, domain_data in domains.items():
for cat_name, cat_data in domain_data.get("categories", {}).items():
if not isinstance(cat_data, dict):
continue
# Check subcategories
for subcat_name, subcat_kws in cat_data.get("subcategories", {}).items():
if not subcat_kws:
continue
ids = store.get_accepted_recipe_ids_for_subcategory(
domain=domain_id,
category=cat_name,
subcategory=subcat_name,
threshold=threshold,
)
if not ids:
continue
kw_key = _kw_key(subcat_kws)
cache_conn.execute(
"UPDATE browse_counts SET count = count + ? WHERE keywords_key = ?",
(len(ids), kw_key),
)
# Check category-level tags (subcategory IS NULL)
top_kws = cat_data.get("keywords", [])
if top_kws:
ids = store.get_accepted_recipe_ids_for_subcategory(
domain=domain_id,
category=cat_name,
subcategory=None,
threshold=threshold,
)
if ids:
kw_key = _kw_key(top_kws)
cache_conn.execute(
"UPDATE browse_counts SET count = count + ? WHERE keywords_key = ?",
(len(ids), kw_key),
)
logger.info("browse_counts: community tag counts merged")