diff --git a/app/core/config.py b/app/core/config.py
index 315fa90..42375e3 100644
--- a/app/core/config.py
+++ b/app/core/config.py
@@ -35,6 +35,14 @@ class Settings:
     # Database
     DB_PATH: Path = Path(os.environ.get("DB_PATH", str(DATA_DIR / "kiwi.db")))
 
+    # Pre-computed browse counts cache (small SQLite, separate from corpus).
+    # Written by the nightly refresh task and by infer_recipe_tags.py.
+    # Set BROWSE_COUNTS_PATH to a bind-mounted path if you want the host
+    # pipeline to share counts with the container without re-running FTS.
+    BROWSE_COUNTS_PATH: Path = Path(
+        os.environ.get("BROWSE_COUNTS_PATH", str(DATA_DIR / "browse_counts.db"))
+    )
+
     # Community feature settings
     COMMUNITY_DB_URL: str | None = os.environ.get("COMMUNITY_DB_URL") or None
     COMMUNITY_PSEUDONYM_SALT: str = os.environ.get(
diff --git a/app/main.py b/app/main.py
index 218e08a..65ce214 100644
--- a/app/main.py
+++ b/app/main.py
@@ -1,7 +1,9 @@
 #!/usr/bin/env python
 # app/main.py
 
+import asyncio
 import logging
+import os
 from contextlib import asynccontextmanager
 
 from fastapi import FastAPI
@@ -16,6 +18,38 @@ from app.services.meal_plan.affiliates import register_kiwi_programs
 logging.basicConfig(level=logging.INFO, format="%(levelname)s:%(name)s: %(message)s")
 logger = logging.getLogger(__name__)
 
+_BROWSE_REFRESH_INTERVAL_H = 24
+
+# Strong references to fire-and-forget tasks: the event loop keeps only weak
+# references, so an otherwise-unreferenced task can be garbage-collected mid-flight.
+_background_tasks: set[asyncio.Task] = set()
+
+
+def _spawn(coro) -> asyncio.Task:
+    """Create a background task and hold a reference until it finishes."""
+    task = asyncio.create_task(coro)
+    _background_tasks.add(task)
+    task.add_done_callback(_background_tasks.discard)
+    return task
+
+
+async def _browse_counts_refresh_loop(corpus_path: str) -> None:
+    """Refresh browse counts every 24 h while the container is running."""
+    from app.db.store import _COUNT_CACHE
+    from app.services.recipe.browse_counts_cache import load_into_memory, refresh
+
+    while True:
+        await asyncio.sleep(_BROWSE_REFRESH_INTERVAL_H * 3600)
+        try:
+            logger.info("browse_counts: starting scheduled refresh...")
+            computed = await asyncio.to_thread(
+                refresh, corpus_path, settings.BROWSE_COUNTS_PATH
+            )
+            load_into_memory(settings.BROWSE_COUNTS_PATH, _COUNT_CACHE, corpus_path)
+            logger.info("browse_counts: scheduled refresh complete (%d sets)", computed)
+        except Exception as exc:
+            logger.warning("browse_counts: scheduled refresh failed: %s", exc)
+
 
 @asynccontextmanager
 async def lifespan(app: FastAPI):
@@ -32,6 +66,32 @@ async def lifespan(app: FastAPI):
     from app.api.endpoints.community import init_community_store
     init_community_store(settings.COMMUNITY_DB_URL)
 
+    # Browse counts cache — warm in-memory cache from disk, refresh if stale.
+    # Uses the corpus path the store will attach to at request time.
+    corpus_path = os.environ.get("RECIPE_DB_PATH", str(settings.DB_PATH))
+    try:
+        from app.db.store import _COUNT_CACHE
+        from app.services.recipe.browse_counts_cache import (
+            is_stale, load_into_memory, refresh,
+        )
+        if is_stale(settings.BROWSE_COUNTS_PATH):
+            logger.info("browse_counts: cache stale — refreshing in background...")
+
+            async def _startup_refresh() -> None:
+                # Warm the in-memory cache as soon as the background refresh
+                # lands, instead of leaving it cold until the nightly cycle.
+                await asyncio.to_thread(refresh, corpus_path, settings.BROWSE_COUNTS_PATH)
+                load_into_memory(settings.BROWSE_COUNTS_PATH, _COUNT_CACHE, corpus_path)
+
+            _spawn(_startup_refresh())
+        else:
+            load_into_memory(settings.BROWSE_COUNTS_PATH, _COUNT_CACHE, corpus_path)
+    except Exception as exc:
+        logger.warning("browse_counts: startup init failed (live FTS fallback active): %s", exc)
+
+    # Nightly background refresh loop
+    _spawn(_browse_counts_refresh_loop(corpus_path))
+
     yield
 
     # Graceful scheduler shutdown
diff --git a/app/services/recipe/browse_counts_cache.py b/app/services/recipe/browse_counts_cache.py
new file mode 100644
index 0000000..2a7497e
--- /dev/null
+++ b/app/services/recipe/browse_counts_cache.py
@@ -0,0 +1,188 @@
+"""
+Browse counts cache — pre-computes and persists recipe counts for all
+browse domain keyword sets so category/subcategory page loads never
+hit the 3.8 GB FTS index at request time.
+
+Counts change only when the corpus changes (after a pipeline run).
+The cache is a small SQLite file separate from both the read-only
+corpus DB and per-user kiwi.db files, so the container can write it.
+
+Refresh triggers:
+  1. Startup — if cache is missing or older than STALE_DAYS
+  2. Nightly — asyncio background task started in main.py lifespan
+  3. Pipeline — infer_recipe_tags.py calls refresh() at end of run
+
+The in-memory _COUNT_CACHE in store.py is pre-warmed from this file
+on startup, so FTS queries are never needed for known keyword sets.
+"""
+from __future__ import annotations
+
+import logging
+import sqlite3
+from contextlib import closing
+from datetime import datetime, timezone
+from pathlib import Path
+
+logger = logging.getLogger(__name__)
+
+STALE_DAYS = 7
+
+
+# ---------------------------------------------------------------------------
+# Internal helpers
+# ---------------------------------------------------------------------------
+
+def _kw_key(keywords: list[str]) -> str:
+    """Stable string key for a keyword list — sorted and pipe-joined."""
+    return "|".join(sorted(keywords))
+
+
+def _fts_match_expr(keywords: list[str]) -> str:
+    """Build an FTS5 MATCH expression OR-ing each keyword as a quoted phrase."""
+    phrases = ['"' + kw.replace('"', '""') + '"' for kw in keywords]
+    return " OR ".join(phrases)
+
+
+def _ensure_schema(conn: sqlite3.Connection) -> None:
+    conn.execute("""
+        CREATE TABLE IF NOT EXISTS browse_counts (
+            keywords_key TEXT PRIMARY KEY,
+            count INTEGER NOT NULL,
+            computed_at TEXT NOT NULL
+        )
+    """)
+    conn.execute("""
+        CREATE TABLE IF NOT EXISTS browse_counts_meta (
+            key TEXT PRIMARY KEY,
+            value TEXT NOT NULL
+        )
+    """)
+    conn.commit()
+
+
+# ---------------------------------------------------------------------------
+# Public API
+# ---------------------------------------------------------------------------
+
+def is_stale(cache_path: Path, max_age_days: int = STALE_DAYS) -> bool:
+    """Return True if the cache is missing, empty, or older than max_age_days."""
+    if not cache_path.exists():
+        return True
+    try:
+        # closing() guarantees the handle is released even if the query raises.
+        with closing(sqlite3.connect(cache_path)) as conn:
+            row = conn.execute(
+                "SELECT value FROM browse_counts_meta WHERE key = 'refreshed_at'"
+            ).fetchone()
+        if row is None:
+            return True
+        age = (datetime.now(timezone.utc) - datetime.fromisoformat(row[0])).days
+        return age >= max_age_days
+    except Exception:
+        return True
+
+
+def load_into_memory(cache_path: Path, count_cache: dict, corpus_path: str) -> int:
+    """
+    Load all rows from the cache file into the in-memory count_cache dict.
+
+    Uses corpus_path (the current RECIPE_DB_PATH env value) as the cache key,
+    not what was stored in the file — the file may have been built against a
+    different mount path (e.g. pipeline ran on host, container sees a different
+    path). Counts are corpus-content-derived and path-independent.
+
+    Returns the number of entries loaded.
+    """
+    if not cache_path.exists():
+        return 0
+    try:
+        # closing() guarantees the handle is released even if the query raises.
+        with closing(sqlite3.connect(cache_path)) as conn:
+            rows = conn.execute("SELECT keywords_key, count FROM browse_counts").fetchall()
+        loaded = 0
+        for kw_key, count in rows:
+            keywords = kw_key.split("|") if kw_key else []
+            cache_key = (corpus_path, *sorted(keywords))
+            count_cache[cache_key] = count
+            loaded += 1
+        logger.info("browse_counts: warmed %d entries from %s", loaded, cache_path)
+        return loaded
+    except Exception as exc:
+        logger.warning("browse_counts: load failed: %s", exc)
+        return 0
+
+
+def refresh(corpus_path: str, cache_path: Path) -> int:
+    """
+    Run FTS5 queries for every keyword set in browser_domains.DOMAINS
+    and write results to cache_path.
+
+    Safe to call from both the host pipeline script and the in-container
+    nightly task. The corpus_path must be reachable and readable from
+    the calling process.
+
+    Returns the number of keyword sets computed.
+    """
+    from app.services.recipe.browser_domains import DOMAINS  # local import — avoid circular
+
+    cache_path.parent.mkdir(parents=True, exist_ok=True)
+
+    # Collect every unique keyword list across all domains/categories/subcategories.
+    # DOMAINS structure: {domain: {label: str, categories: {cat_name: {keywords, subcategories}}}}
+    seen: dict[str, list[str]] = {}
+    for domain_data in DOMAINS.values():
+        for cat_data in domain_data.get("categories", {}).values():
+            if not isinstance(cat_data, dict):
+                continue
+            top_kws = cat_data.get("keywords", [])
+            if top_kws:
+                seen[_kw_key(top_kws)] = top_kws
+            for subcat_kws in cat_data.get("subcategories", {}).values():
+                if subcat_kws:
+                    seen[_kw_key(subcat_kws)] = subcat_kws
+
+    # closing() ensures the cache handle is released on every exit path,
+    # including a failure inside _ensure_schema.
+    with closing(sqlite3.connect(cache_path)) as cache_conn:
+        _ensure_schema(cache_conn)
+
+        try:
+            corpus_conn = sqlite3.connect(f"file:{corpus_path}?mode=ro", uri=True)
+        except Exception as exc:
+            logger.error("browse_counts: cannot open corpus %s: %s", corpus_path, exc)
+            return 0
+
+        now = datetime.now(timezone.utc).isoformat()
+        computed = 0
+
+        try:
+            for kw_key, kws in seen.items():
+                try:
+                    row = corpus_conn.execute(
+                        "SELECT count(*) FROM recipe_browser_fts WHERE recipe_browser_fts MATCH ?",
+                        (_fts_match_expr(kws),),
+                    ).fetchone()
+                    count = row[0] if row else 0
+                    cache_conn.execute(
+                        "INSERT OR REPLACE INTO browse_counts (keywords_key, count, computed_at)"
+                        " VALUES (?, ?, ?)",
+                        (kw_key, count, now),
+                    )
+                    computed += 1
+                except Exception as exc:
+                    logger.warning("browse_counts: query failed key=%r: %s", kw_key[:60], exc)
+
+            cache_conn.execute(
+                "INSERT OR REPLACE INTO browse_counts_meta (key, value) VALUES ('refreshed_at', ?)",
+                (now,),
+            )
+            cache_conn.execute(
+                "INSERT OR REPLACE INTO browse_counts_meta (key, value) VALUES ('corpus_path', ?)",
+                (corpus_path,),
+            )
+            cache_conn.commit()
+            logger.info("browse_counts: wrote %d counts → %s", computed, cache_path)
+        finally:
+            corpus_conn.close()
+
+    return computed
diff --git a/scripts/pipeline/infer_recipe_tags.py b/scripts/pipeline/infer_recipe_tags.py
index 954d46e..00fc0a5 100644
--- a/scripts/pipeline/infer_recipe_tags.py
+++ b/scripts/pipeline/infer_recipe_tags.py
@@ -18,6 +18,7 @@ from __future__ import annotations
 
 import argparse
 import json
+import os
 import sqlite3
 import sys
 from pathlib import Path
@@ -248,8 +249,37 @@ if __name__ == "__main__":
     parser.add_argument("--batch-size", type=int, default=2000)
     parser.add_argument("--force", action="store_true",
                         help="Re-derive tags even if inferred_tags is already set.")
+    parser.add_argument(
+        "--browse-counts-path",
+        type=Path,
+        default=None,
+        metavar="PATH",
+        help=(
+            "Path to the browse_counts.db cache file to refresh after tagging. "
+            "Defaults to DATA_DIR/browse_counts.db if DATA_DIR env var is set, "
+            "otherwise skipped."
+        ),
+    )
     args = parser.parse_args()
     if not args.db.exists():
         print(f"DB not found: {args.db}")
         sys.exit(1)
     run(args.db, args.batch_size, args.force)
+
+    # Refresh browse counts cache after a successful run so the app picks up
+    # the updated FTS index without restarting. Skipped if no cache path given
+    # and DATA_DIR env var is not set.
+    cache_path = args.browse_counts_path
+    if cache_path is None:
+        data_dir = os.environ.get("DATA_DIR")
+        if data_dir:
+            cache_path = Path(data_dir) / "browse_counts.db"
+
+    if cache_path is not None:
+        print(f"Refreshing browse counts cache → {cache_path} ...")
+        try:
+            from app.services.recipe.browse_counts_cache import refresh as _refresh
+            computed = _refresh(str(args.db), cache_path)
+            print(f"Browse counts cache refreshed ({computed} keyword sets).")
+        except Exception as exc:
+            print(f"Browse counts refresh skipped: {exc}")