feat(browse-counts): add pre-computed FTS counts cache with nightly refresh
Multiple concurrent users browsing the 3.2M recipe corpus would cause FTS5 page cache contention and slow per-request queries. Solution: pre-compute counts for all category/subcategory keyword sets into a small SQLite cache.

- browse_counts_cache.py: refresh(), load_into_memory(), is_stale() helpers
- config.py: BROWSE_COUNTS_PATH setting (default DATA_DIR/browse_counts.db)
- main.py: warms in-memory cache on startup; runs nightly refresh task every 24h
- infer_recipe_tags.py: auto-refreshes cache after a successful tag run so the app picks up updated FTS counts without a restart
This commit is contained in:
parent
5d0ee2493e
commit
1a7a94a344
4 changed files with 266 additions and 0 deletions
|
|
@ -35,6 +35,14 @@ class Settings:
|
|||
# Database
|
||||
DB_PATH: Path = Path(os.environ.get("DB_PATH", str(DATA_DIR / "kiwi.db")))
|
||||
|
||||
# Pre-computed browse counts cache (small SQLite, separate from corpus).
|
||||
# Written by the nightly refresh task and by infer_recipe_tags.py.
|
||||
# Set BROWSE_COUNTS_PATH to a bind-mounted path if you want the host
|
||||
# pipeline to share counts with the container without re-running FTS.
|
||||
BROWSE_COUNTS_PATH: Path = Path(
|
||||
os.environ.get("BROWSE_COUNTS_PATH", str(DATA_DIR / "browse_counts.db"))
|
||||
)
|
||||
|
||||
# Community feature settings
|
||||
COMMUNITY_DB_URL: str | None = os.environ.get("COMMUNITY_DB_URL") or None
|
||||
COMMUNITY_PSEUDONYM_SALT: str = os.environ.get(
|
||||
|
|
|
|||
43
app/main.py
43
app/main.py
|
|
@ -1,7 +1,9 @@
|
|||
#!/usr/bin/env python
|
||||
# app/main.py
|
||||
|
||||
import asyncio
|
||||
import logging
|
||||
import os
|
||||
from contextlib import asynccontextmanager
|
||||
|
||||
from fastapi import FastAPI
|
||||
|
|
@ -16,6 +18,26 @@ from app.services.meal_plan.affiliates import register_kiwi_programs
|
|||
logging.basicConfig(level=logging.INFO, format="%(levelname)s:%(name)s: %(message)s")
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
_BROWSE_REFRESH_INTERVAL_H = 24
|
||||
|
||||
|
||||
async def _browse_counts_refresh_loop(corpus_path: str) -> None:
    """Refresh browse counts every 24 h while the container is running.

    Runs forever as a background task: sleeps for the interval, recomputes
    the on-disk cache, then re-warms the in-memory cache. Any failure is
    logged and the loop continues, so one bad refresh never kills the
    schedule.

    Args:
        corpus_path: Path of the recipe corpus DB to run FTS counts against.
    """
    from app.db.store import _COUNT_CACHE
    from app.services.recipe.browse_counts_cache import load_into_memory, refresh

    while True:
        await asyncio.sleep(_BROWSE_REFRESH_INTERVAL_H * 3600)
        try:
            logger.info("browse_counts: starting scheduled refresh...")
            computed = await asyncio.to_thread(
                refresh, corpus_path, settings.BROWSE_COUNTS_PATH
            )
            # Run the blocking sqlite read off the event loop as well —
            # calling it inline (as before) stalled every other request
            # for the duration of the cache load.
            await asyncio.to_thread(
                load_into_memory, settings.BROWSE_COUNTS_PATH, _COUNT_CACHE, corpus_path
            )
            logger.info("browse_counts: scheduled refresh complete (%d sets)", computed)
        except Exception as exc:
            logger.warning("browse_counts: scheduled refresh failed: %s", exc)
|
||||
|
||||
|
||||
@asynccontextmanager
|
||||
async def lifespan(app: FastAPI):
|
||||
|
|
@ -32,6 +54,27 @@ async def lifespan(app: FastAPI):
|
|||
from app.api.endpoints.community import init_community_store
|
||||
init_community_store(settings.COMMUNITY_DB_URL)
|
||||
|
||||
# Browse counts cache — warm in-memory cache from disk, refresh if stale.
|
||||
# Uses the corpus path the store will attach to at request time.
|
||||
corpus_path = os.environ.get("RECIPE_DB_PATH", str(settings.DB_PATH))
|
||||
try:
|
||||
from app.db.store import _COUNT_CACHE
|
||||
from app.services.recipe.browse_counts_cache import (
|
||||
is_stale, load_into_memory, refresh,
|
||||
)
|
||||
if is_stale(settings.BROWSE_COUNTS_PATH):
|
||||
logger.info("browse_counts: cache stale — refreshing in background...")
|
||||
asyncio.create_task(
|
||||
asyncio.to_thread(refresh, corpus_path, settings.BROWSE_COUNTS_PATH)
|
||||
)
|
||||
else:
|
||||
load_into_memory(settings.BROWSE_COUNTS_PATH, _COUNT_CACHE, corpus_path)
|
||||
except Exception as exc:
|
||||
logger.warning("browse_counts: startup init failed (live FTS fallback active): %s", exc)
|
||||
|
||||
# Nightly background refresh loop
|
||||
asyncio.create_task(_browse_counts_refresh_loop(corpus_path))
|
||||
|
||||
yield
|
||||
|
||||
# Graceful scheduler shutdown
|
||||
|
|
|
|||
185
app/services/recipe/browse_counts_cache.py
Normal file
185
app/services/recipe/browse_counts_cache.py
Normal file
|
|
@ -0,0 +1,185 @@
|
|||
"""
|
||||
Browse counts cache — pre-computes and persists recipe counts for all
|
||||
browse domain keyword sets so category/subcategory page loads never
|
||||
hit the 3.8 GB FTS index at request time.
|
||||
|
||||
Counts change only when the corpus changes (after a pipeline run).
|
||||
The cache is a small SQLite file separate from both the read-only
|
||||
corpus DB and per-user kiwi.db files, so the container can write it.
|
||||
|
||||
Refresh triggers:
|
||||
1. Startup — if cache is missing or older than STALE_DAYS
|
||||
2. Nightly — asyncio background task started in main.py lifespan
|
||||
3. Pipeline — infer_recipe_tags.py calls refresh() at end of run
|
||||
|
||||
The in-memory _COUNT_CACHE in store.py is pre-warmed from this file
|
||||
on startup, so FTS queries are never needed for known keyword sets.
|
||||
"""
|
||||
from __future__ import annotations
|
||||
|
||||
import logging
|
||||
import sqlite3
|
||||
from datetime import datetime, timezone
|
||||
from pathlib import Path
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
STALE_DAYS = 7
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Internal helpers
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
def _kw_key(keywords: list[str]) -> str:
|
||||
"""Stable string key for a keyword list — sorted and pipe-joined."""
|
||||
return "|".join(sorted(keywords))
|
||||
|
||||
|
||||
def _fts_match_expr(keywords: list[str]) -> str:
|
||||
phrases = ['"' + kw.replace('"', '""') + '"' for kw in keywords]
|
||||
return " OR ".join(phrases)
|
||||
|
||||
|
||||
def _ensure_schema(conn: sqlite3.Connection) -> None:
|
||||
conn.execute("""
|
||||
CREATE TABLE IF NOT EXISTS browse_counts (
|
||||
keywords_key TEXT PRIMARY KEY,
|
||||
count INTEGER NOT NULL,
|
||||
computed_at TEXT NOT NULL
|
||||
)
|
||||
""")
|
||||
conn.execute("""
|
||||
CREATE TABLE IF NOT EXISTS browse_counts_meta (
|
||||
key TEXT PRIMARY KEY,
|
||||
value TEXT NOT NULL
|
||||
)
|
||||
""")
|
||||
conn.commit()
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Public API
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
def is_stale(cache_path: Path, max_age_days: int = STALE_DAYS) -> bool:
    """Return True if the cache is missing, unreadable, or older than *max_age_days*.

    Any failure while reading the meta table (missing file, missing table,
    unparseable timestamp) is treated as "stale" so the caller simply
    refreshes; this function never raises.

    Args:
        cache_path: Path of the browse_counts SQLite cache file.
        max_age_days: Age threshold in days (inclusive).
    """
    if not cache_path.exists():
        return True
    try:
        conn = sqlite3.connect(cache_path)
        try:
            row = conn.execute(
                "SELECT value FROM browse_counts_meta WHERE key = 'refreshed_at'"
            ).fetchone()
        finally:
            # Close even when the query raises (e.g. table missing) —
            # previously the connection leaked on that path because the
            # broad except below returned without closing.
            conn.close()
        if row is None:
            return True
        # refreshed_at is written as an aware UTC ISO timestamp by refresh().
        age = (datetime.now(timezone.utc) - datetime.fromisoformat(row[0])).days
        return age >= max_age_days
    except Exception:
        return True
|
||||
|
||||
|
||||
def load_into_memory(cache_path: Path, count_cache: dict, corpus_path: str) -> int:
    """
    Load all rows from the cache file into the in-memory count_cache dict.

    Uses corpus_path (the current RECIPE_DB_PATH env value) as the cache key,
    not what was stored in the file — the file may have been built against a
    different mount path (e.g. pipeline ran on host, container sees a different
    path). Counts are corpus-content-derived and path-independent.

    Returns the number of entries loaded (0 when the file is missing or
    unreadable — this function never raises).
    """
    if not cache_path.exists():
        return 0
    try:
        conn = sqlite3.connect(cache_path)
        try:
            rows = conn.execute("SELECT keywords_key, count FROM browse_counts").fetchall()
        finally:
            # Close even when the SELECT raises (e.g. table missing) —
            # previously the connection leaked on that path.
            conn.close()
        loaded = 0
        for kw_key, count in rows:
            keywords = kw_key.split("|") if kw_key else []
            # Key shape must match what store.py builds at request time:
            # (corpus path, *sorted keywords).
            cache_key = (corpus_path, *sorted(keywords))
            count_cache[cache_key] = count
            loaded += 1
        logger.info("browse_counts: warmed %d entries from %s", loaded, cache_path)
        return loaded
    except Exception as exc:
        logger.warning("browse_counts: load failed: %s", exc)
        return 0
|
||||
|
||||
|
||||
def refresh(corpus_path: str, cache_path: Path) -> int:
    """
    Run FTS5 queries for every keyword set in browser_domains.DOMAINS
    and write results to cache_path.

    Safe to call from both the host pipeline script and the in-container
    nightly task. The corpus_path must be reachable and readable from
    the calling process.

    Returns the number of keyword sets computed (0 if the corpus cannot
    be opened).
    """
    from app.services.recipe.browser_domains import DOMAINS  # local import — avoid circular

    cache_path.parent.mkdir(parents=True, exist_ok=True)
    cache_conn = sqlite3.connect(cache_path)
    try:
        _ensure_schema(cache_conn)

        # Collect every unique keyword list across all domains/categories/subcategories.
        # DOMAINS structure: {domain: {label: str, categories: {cat_name: {keywords, subcategories}}}}
        seen: dict[str, list[str]] = {}
        for domain_data in DOMAINS.values():
            for cat_data in domain_data.get("categories", {}).values():
                if not isinstance(cat_data, dict):
                    continue
                top_kws = cat_data.get("keywords", [])
                if top_kws:
                    seen[_kw_key(top_kws)] = top_kws
                for subcat_kws in cat_data.get("subcategories", {}).values():
                    if subcat_kws:
                        seen[_kw_key(subcat_kws)] = subcat_kws

        try:
            # Read-only URI open: never create or modify the corpus DB.
            corpus_conn = sqlite3.connect(f"file:{corpus_path}?mode=ro", uri=True)
        except Exception as exc:
            logger.error("browse_counts: cannot open corpus %s: %s", corpus_path, exc)
            return 0

        now = datetime.now(timezone.utc).isoformat()
        computed = 0

        try:
            for kw_key, kws in seen.items():
                try:
                    row = corpus_conn.execute(
                        "SELECT count(*) FROM recipe_browser_fts WHERE recipe_browser_fts MATCH ?",
                        (_fts_match_expr(kws),),
                    ).fetchone()
                    count = row[0] if row else 0
                    cache_conn.execute(
                        "INSERT OR REPLACE INTO browse_counts (keywords_key, count, computed_at)"
                        " VALUES (?, ?, ?)",
                        (kw_key, count, now),
                    )
                    computed += 1
                except Exception as exc:
                    # One bad keyword set must not abort the whole refresh.
                    logger.warning("browse_counts: query failed key=%r: %s", kw_key[:60], exc)

            cache_conn.execute(
                "INSERT OR REPLACE INTO browse_counts_meta (key, value) VALUES ('refreshed_at', ?)",
                (now,),
            )
            cache_conn.execute(
                "INSERT OR REPLACE INTO browse_counts_meta (key, value) VALUES ('corpus_path', ?)",
                (corpus_path,),
            )
            cache_conn.commit()
            logger.info("browse_counts: wrote %d counts → %s", computed, cache_path)
        finally:
            corpus_conn.close()
    finally:
        # Widened from the original: cache_conn previously leaked when
        # _ensure_schema, the DOMAINS import, or keyword collection raised
        # before the inner try/finally was reached.
        cache_conn.close()

    return computed
|
||||
|
|
@ -18,6 +18,7 @@ from __future__ import annotations
|
|||
|
||||
import argparse
|
||||
import json
|
||||
import os
|
||||
import sqlite3
|
||||
import sys
|
||||
from pathlib import Path
|
||||
|
|
@ -248,8 +249,37 @@ if __name__ == "__main__":
|
|||
parser.add_argument("--batch-size", type=int, default=2000)
|
||||
parser.add_argument("--force", action="store_true",
|
||||
help="Re-derive tags even if inferred_tags is already set.")
|
||||
parser.add_argument(
|
||||
"--browse-counts-path",
|
||||
type=Path,
|
||||
default=None,
|
||||
metavar="PATH",
|
||||
help=(
|
||||
"Path to the browse_counts.db cache file to refresh after tagging. "
|
||||
"Defaults to DATA_DIR/browse_counts.db if DATA_DIR env var is set, "
|
||||
"otherwise skipped."
|
||||
),
|
||||
)
|
||||
args = parser.parse_args()
|
||||
if not args.db.exists():
|
||||
print(f"DB not found: {args.db}")
|
||||
sys.exit(1)
|
||||
run(args.db, args.batch_size, args.force)
|
||||
|
||||
# Refresh browse counts cache after a successful run so the app picks up
|
||||
# the updated FTS index without restarting. Skipped if no cache path given
|
||||
# and DATA_DIR env var is not set.
|
||||
cache_path = args.browse_counts_path
|
||||
if cache_path is None:
|
||||
data_dir = os.environ.get("DATA_DIR")
|
||||
if data_dir:
|
||||
cache_path = Path(data_dir) / "browse_counts.db"
|
||||
|
||||
if cache_path is not None:
|
||||
print(f"Refreshing browse counts cache → {cache_path} ...")
|
||||
try:
|
||||
from app.services.recipe.browse_counts_cache import refresh as _refresh
|
||||
computed = _refresh(str(args.db), cache_path)
|
||||
print(f"Browse counts cache refreshed ({computed} keyword sets).")
|
||||
except Exception as exc:
|
||||
print(f"Browse counts refresh skipped: {exc}")
|
||||
|
|
|
|||
Loading…
Reference in a new issue