- Assembly template system (13 templates: burrito, fried rice, omelette, stir fry, pasta, sandwich, grain bowl, soup/stew, casserole, pancakes, porridge, pie, pudding) with role-based matching, whole-word single-keyword guard, deterministic titles via MD5 pantry hash - Prep-state stripping: strips 'melted butter' → 'butter' for coverage checks; reconstructs actionable states as 'Before you start:' cooking instructions (NutritionPanel prep_notes field + RecipesView.vue display block) - FTS5 fixes: always double-quote all terms; strip apostrophes to prevent syntax errors on brands like "Stouffer's"; 'plant-based' → bare 'based' crash - Bidirectional synonym expansion: alt-meat, alt-chicken, alt-beef, alt-pork mapped to canonical texture class; pantry expansion covers 'hamburger' from 'burger patties' etc. - Texture profile backfill script (378K ingredient_profiles rows) with macro-derived classification in priority order (fatty → creamy → starchy → firm → fibrous → tender → liquid → neutral); oats/legumes starchy-first fix - LLM prompt: ban flavoured/sweetened ingredients (vanilla yoghurt) from savoury - Migrations 014 (nutrition macros) + 015 (recipe FTS index) - Nutrition estimation pipeline script - gitignore MagicMock sqlite test artifacts
737 lines
30 KiB
Python
737 lines
30 KiB
Python
"""
|
||
SQLite data store for Kiwi.
|
||
Uses circuitforge-core for connection management and migrations.
|
||
"""
|
||
from __future__ import annotations
|
||
|
||
import json
|
||
import sqlite3
|
||
from pathlib import Path
|
||
from typing import Any
|
||
|
||
from circuitforge_core.db.base import get_connection
|
||
from circuitforge_core.db.migrations import run_migrations
|
||
|
||
MIGRATIONS_DIR = Path(__file__).parent / "migrations"
|
||
|
||
|
||
class Store:
|
||
def __init__(self, db_path: Path, key: str = "") -> None:
|
||
self.conn: sqlite3.Connection = get_connection(db_path, key)
|
||
self.conn.execute("PRAGMA journal_mode=WAL")
|
||
self.conn.execute("PRAGMA foreign_keys=ON")
|
||
run_migrations(self.conn, MIGRATIONS_DIR)
|
||
|
||
def close(self) -> None:
|
||
self.conn.close()
|
||
|
||
# ── helpers ───────────────────────────────────────────────────────────
|
||
|
||
def _row_to_dict(self, row: sqlite3.Row) -> dict[str, Any]:
|
||
d = dict(row)
|
||
# Deserialise any TEXT columns that contain JSON
|
||
for key in ("metadata", "nutrition_data", "source_data", "items",
|
||
"metrics", "improvement_suggestions", "confidence_scores",
|
||
"warnings",
|
||
# recipe columns
|
||
"ingredients", "ingredient_names", "directions",
|
||
"keywords", "element_coverage"):
|
||
if key in d and isinstance(d[key], str):
|
||
try:
|
||
d[key] = json.loads(d[key])
|
||
except (json.JSONDecodeError, TypeError):
|
||
pass
|
||
return d
|
||
|
||
def _fetch_one(self, sql: str, params: tuple = ()) -> dict[str, Any] | None:
|
||
self.conn.row_factory = sqlite3.Row
|
||
row = self.conn.execute(sql, params).fetchone()
|
||
return self._row_to_dict(row) if row else None
|
||
|
||
def _fetch_all(self, sql: str, params: tuple = ()) -> list[dict[str, Any]]:
|
||
self.conn.row_factory = sqlite3.Row
|
||
rows = self.conn.execute(sql, params).fetchall()
|
||
return [self._row_to_dict(r) for r in rows]
|
||
|
||
def _dump(self, value: Any) -> str:
|
||
"""Serialise a Python object to a JSON string for storage."""
|
||
return json.dumps(value)
|
||
|
||
# ── receipts ──────────────────────────────────────────────────────────
|
||
|
||
def _insert_returning(self, sql: str, params: tuple = ()) -> dict[str, Any]:
|
||
"""Execute an INSERT ... RETURNING * and return the new row as a dict.
|
||
Fetches the row BEFORE committing — SQLite requires the cursor to be
|
||
fully consumed before the transaction is committed."""
|
||
self.conn.row_factory = sqlite3.Row
|
||
cur = self.conn.execute(sql, params)
|
||
row = self._row_to_dict(cur.fetchone())
|
||
self.conn.commit()
|
||
return row
|
||
|
||
def create_receipt(self, filename: str, original_path: str) -> dict[str, Any]:
|
||
return self._insert_returning(
|
||
"INSERT INTO receipts (filename, original_path) VALUES (?, ?) RETURNING *",
|
||
(filename, original_path),
|
||
)
|
||
|
||
def get_receipt(self, receipt_id: int) -> dict[str, Any] | None:
|
||
return self._fetch_one("SELECT * FROM receipts WHERE id = ?", (receipt_id,))
|
||
|
||
def list_receipts(self, limit: int = 50, offset: int = 0) -> list[dict[str, Any]]:
|
||
return self._fetch_all(
|
||
"SELECT * FROM receipts ORDER BY created_at DESC LIMIT ? OFFSET ?",
|
||
(limit, offset),
|
||
)
|
||
|
||
def update_receipt_status(self, receipt_id: int, status: str,
|
||
error: str | None = None) -> None:
|
||
self.conn.execute(
|
||
"UPDATE receipts SET status = ?, error = ?, updated_at = datetime('now') WHERE id = ?",
|
||
(status, error, receipt_id),
|
||
)
|
||
self.conn.commit()
|
||
|
||
def update_receipt_metadata(self, receipt_id: int, metadata: dict) -> None:
|
||
self.conn.execute(
|
||
"UPDATE receipts SET metadata = ?, updated_at = datetime('now') WHERE id = ?",
|
||
(self._dump(metadata), receipt_id),
|
||
)
|
||
self.conn.commit()
|
||
|
||
# ── quality assessments ───────────────────────────────────────────────
|
||
|
||
def upsert_quality_assessment(self, receipt_id: int, overall_score: float,
|
||
is_acceptable: bool, metrics: dict,
|
||
suggestions: list) -> dict[str, Any]:
|
||
self.conn.execute(
|
||
"""INSERT INTO quality_assessments
|
||
(receipt_id, overall_score, is_acceptable, metrics, improvement_suggestions)
|
||
VALUES (?, ?, ?, ?, ?)
|
||
ON CONFLICT (receipt_id) DO UPDATE SET
|
||
overall_score = excluded.overall_score,
|
||
is_acceptable = excluded.is_acceptable,
|
||
metrics = excluded.metrics,
|
||
improvement_suggestions = excluded.improvement_suggestions""",
|
||
(receipt_id, overall_score, int(is_acceptable),
|
||
self._dump(metrics), self._dump(suggestions)),
|
||
)
|
||
self.conn.commit()
|
||
return self._fetch_one(
|
||
"SELECT * FROM quality_assessments WHERE receipt_id = ?", (receipt_id,)
|
||
)
|
||
|
||
# ── products ──────────────────────────────────────────────────────────
|
||
|
||
def get_or_create_product(self, name: str, barcode: str | None = None,
|
||
**kwargs) -> tuple[dict[str, Any], bool]:
|
||
"""Returns (product, created). Looks up by barcode first, then name."""
|
||
if barcode:
|
||
existing = self._fetch_one(
|
||
"SELECT * FROM products WHERE barcode = ?", (barcode,)
|
||
)
|
||
if existing:
|
||
return existing, False
|
||
|
||
existing = self._fetch_one("SELECT * FROM products WHERE name = ?", (name,))
|
||
if existing:
|
||
return existing, False
|
||
|
||
row = self._insert_returning(
|
||
"""INSERT INTO products (name, barcode, brand, category, description,
|
||
image_url, nutrition_data, source, source_data)
|
||
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?) RETURNING *""",
|
||
(
|
||
name, barcode,
|
||
kwargs.get("brand"), kwargs.get("category"),
|
||
kwargs.get("description"), kwargs.get("image_url"),
|
||
self._dump(kwargs.get("nutrition_data", {})),
|
||
kwargs.get("source", "manual"),
|
||
self._dump(kwargs.get("source_data", {})),
|
||
),
|
||
)
|
||
return row, True
|
||
|
||
def get_product(self, product_id: int) -> dict[str, Any] | None:
|
||
return self._fetch_one("SELECT * FROM products WHERE id = ?", (product_id,))
|
||
|
||
def list_products(self) -> list[dict[str, Any]]:
|
||
return self._fetch_all("SELECT * FROM products ORDER BY name")
|
||
|
||
# ── inventory ─────────────────────────────────────────────────────────
|
||
|
||
def add_inventory_item(self, product_id: int, location: str,
|
||
quantity: float = 1.0, unit: str = "count",
|
||
**kwargs) -> dict[str, Any]:
|
||
return self._insert_returning(
|
||
"""INSERT INTO inventory_items
|
||
(product_id, receipt_id, quantity, unit, location, sublocation,
|
||
purchase_date, expiration_date, notes, source)
|
||
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?) RETURNING *""",
|
||
(
|
||
product_id, kwargs.get("receipt_id"),
|
||
quantity, unit, location, kwargs.get("sublocation"),
|
||
kwargs.get("purchase_date"), kwargs.get("expiration_date"),
|
||
kwargs.get("notes"), kwargs.get("source", "manual"),
|
||
),
|
||
)
|
||
|
||
def get_inventory_item(self, item_id: int) -> dict[str, Any] | None:
|
||
return self._fetch_one(
|
||
"""SELECT i.*, p.name as product_name, p.barcode, p.category
|
||
FROM inventory_items i
|
||
JOIN products p ON p.id = i.product_id
|
||
WHERE i.id = ?""",
|
||
(item_id,),
|
||
)
|
||
|
||
def list_inventory(self, location: str | None = None,
|
||
status: str = "available") -> list[dict[str, Any]]:
|
||
if location:
|
||
return self._fetch_all(
|
||
"""SELECT i.*, p.name as product_name, p.barcode, p.category
|
||
FROM inventory_items i
|
||
JOIN products p ON p.id = i.product_id
|
||
WHERE i.status = ? AND i.location = ?
|
||
ORDER BY i.expiration_date ASC NULLS LAST""",
|
||
(status, location),
|
||
)
|
||
return self._fetch_all(
|
||
"""SELECT i.*, p.name as product_name, p.barcode, p.category
|
||
FROM inventory_items i
|
||
JOIN products p ON p.id = i.product_id
|
||
WHERE i.status = ?
|
||
ORDER BY i.expiration_date ASC NULLS LAST""",
|
||
(status,),
|
||
)
|
||
|
||
def update_inventory_item(self, item_id: int, **kwargs) -> dict[str, Any] | None:
|
||
allowed = {"quantity", "unit", "location", "sublocation",
|
||
"expiration_date", "status", "notes", "consumed_at"}
|
||
updates = {k: v for k, v in kwargs.items() if k in allowed}
|
||
if not updates:
|
||
return self.get_inventory_item(item_id)
|
||
sets = ", ".join(f"{k} = ?" for k in updates)
|
||
values = list(updates.values()) + [item_id]
|
||
self.conn.execute(
|
||
f"UPDATE inventory_items SET {sets}, updated_at = datetime('now') WHERE id = ?",
|
||
values,
|
||
)
|
||
self.conn.commit()
|
||
return self.get_inventory_item(item_id)
|
||
|
||
def expiring_soon(self, days: int = 7) -> list[dict[str, Any]]:
|
||
return self._fetch_all(
|
||
"""SELECT i.*, p.name as product_name, p.category
|
||
FROM inventory_items i
|
||
JOIN products p ON p.id = i.product_id
|
||
WHERE i.status = 'available'
|
||
AND i.expiration_date IS NOT NULL
|
||
AND date(i.expiration_date) <= date('now', ? || ' days')
|
||
ORDER BY i.expiration_date ASC""",
|
||
(str(days),),
|
||
)
|
||
|
||
def recalculate_expiry(
|
||
self,
|
||
tier: str = "local",
|
||
has_byok: bool = False,
|
||
) -> tuple[int, int]:
|
||
"""Re-run the expiration predictor over all available inventory items.
|
||
|
||
Uses each item's existing purchase_date (falls back to today if NULL)
|
||
and its current location. Skips items that have an explicit
|
||
expiration_date from a source other than auto-prediction (i.e. items
|
||
whose expiry was found on a receipt or entered by the user) cannot be
|
||
distinguished — all available items are recalculated.
|
||
|
||
Returns (updated_count, skipped_count).
|
||
"""
|
||
from datetime import date
|
||
from app.services.expiration_predictor import ExpirationPredictor
|
||
|
||
predictor = ExpirationPredictor()
|
||
rows = self._fetch_all(
|
||
"""SELECT i.id, i.location, i.purchase_date,
|
||
p.name AS product_name, p.category AS product_category
|
||
FROM inventory_items i
|
||
JOIN products p ON p.id = i.product_id
|
||
WHERE i.status = 'available'""",
|
||
(),
|
||
)
|
||
|
||
updated = skipped = 0
|
||
for row in rows:
|
||
cat = predictor.get_category_from_product(
|
||
row["product_name"] or "",
|
||
product_category=row.get("product_category"),
|
||
location=row.get("location"),
|
||
)
|
||
purchase_date_raw = row.get("purchase_date")
|
||
try:
|
||
purchase_date = (
|
||
date.fromisoformat(purchase_date_raw)
|
||
if purchase_date_raw
|
||
else date.today()
|
||
)
|
||
except (ValueError, TypeError):
|
||
purchase_date = date.today()
|
||
|
||
exp = predictor.predict_expiration(
|
||
cat,
|
||
row["location"] or "pantry",
|
||
purchase_date=purchase_date,
|
||
product_name=row["product_name"],
|
||
tier=tier,
|
||
has_byok=has_byok,
|
||
)
|
||
if exp is None:
|
||
skipped += 1
|
||
continue
|
||
|
||
self.conn.execute(
|
||
"UPDATE inventory_items SET expiration_date = ?, updated_at = datetime('now') WHERE id = ?",
|
||
(str(exp), row["id"]),
|
||
)
|
||
updated += 1
|
||
|
||
self.conn.commit()
|
||
return updated, skipped
|
||
|
||
# ── receipt_data ──────────────────────────────────────────────────────
|
||
|
||
def upsert_receipt_data(self, receipt_id: int, data: dict) -> dict[str, Any]:
|
||
fields = [
|
||
"merchant_name", "merchant_address", "merchant_phone", "merchant_email",
|
||
"merchant_website", "merchant_tax_id", "transaction_date", "transaction_time",
|
||
"receipt_number", "register_number", "cashier_name", "transaction_id",
|
||
"items", "subtotal", "tax", "discount", "tip", "total",
|
||
"payment_method", "amount_paid", "change_given",
|
||
"raw_text", "confidence_scores", "warnings", "processing_time",
|
||
]
|
||
json_fields = {"items", "confidence_scores", "warnings"}
|
||
cols = ", ".join(fields)
|
||
placeholders = ", ".join("?" for _ in fields)
|
||
values = [
|
||
self._dump(data.get(f)) if f in json_fields and data.get(f) is not None
|
||
else data.get(f)
|
||
for f in fields
|
||
]
|
||
self.conn.execute(
|
||
f"""INSERT INTO receipt_data (receipt_id, {cols})
|
||
VALUES (?, {placeholders})
|
||
ON CONFLICT (receipt_id) DO UPDATE SET
|
||
{', '.join(f'{f} = excluded.{f}' for f in fields)},
|
||
updated_at = datetime('now')""",
|
||
[receipt_id] + values,
|
||
)
|
||
self.conn.commit()
|
||
return self._fetch_one(
|
||
"SELECT * FROM receipt_data WHERE receipt_id = ?", (receipt_id,)
|
||
)
|
||
|
||
# ── recipes ───────────────────────────────────────────────────────────
|
||
|
||
def _fts_ready(self) -> bool:
|
||
"""Return True if the recipes_fts virtual table exists."""
|
||
row = self._fetch_one(
|
||
"SELECT 1 FROM sqlite_master WHERE type='table' AND name='recipes_fts'"
|
||
)
|
||
return row is not None
|
||
|
||
# Words that carry no recipe-ingredient signal and should be filtered
|
||
# out when tokenising multi-word product names for FTS expansion.
|
||
_FTS_TOKEN_STOPWORDS: frozenset[str] = frozenset({
|
||
# Common English stopwords
|
||
"a", "an", "the", "of", "in", "for", "with", "and", "or", "to",
|
||
"from", "at", "by", "as", "on", "into",
|
||
# Brand / marketing words that appear in product names
|
||
"lean", "cuisine", "healthy", "choice", "stouffer", "original",
|
||
"classic", "deluxe", "homestyle", "family", "style", "grade",
|
||
"premium", "select", "natural", "organic", "fresh", "lite",
|
||
"ready", "quick", "easy", "instant", "microwave", "frozen",
|
||
"brand", "size", "large", "small", "medium", "extra",
|
||
# Plant-based / alt-meat brand names
|
||
"daring", "gardein", "morningstar", "lightlife", "tofurky",
|
||
"quorn", "omni", "nuggs", "simulate", "simulate",
|
||
# Preparation states — "cut up chicken" is still chicken
|
||
"cut", "diced", "sliced", "chopped", "minced", "shredded",
|
||
"cooked", "raw", "whole", "boneless", "skinless", "trimmed",
|
||
"pre", "prepared", "marinated", "seasoned", "breaded", "battered",
|
||
"grilled", "roasted", "smoked", "canned", "dried", "dehydrated",
|
||
"pieces", "piece", "strips", "strip", "chunks", "chunk",
|
||
"fillets", "fillet", "cutlets", "cutlet", "tenders", "nuggets",
|
||
# Units / packaging
|
||
"oz", "lb", "lbs", "pkg", "pack", "box", "can", "bag", "jar",
|
||
})
|
||
|
||
# Maps substrings found in product-label names to canonical recipe-corpus
|
||
# ingredient terms. Checked as substring matches against the lower-cased
|
||
# full product name, then against each individual token.
|
||
_FTS_SYNONYMS: dict[str, str] = {
|
||
# Ground / minced beef
|
||
"burger patt": "hamburger",
|
||
"beef patt": "hamburger",
|
||
"ground beef": "hamburger",
|
||
"ground chuck": "hamburger",
|
||
"ground round": "hamburger",
|
||
"mince": "hamburger",
|
||
"veggie burger": "hamburger",
|
||
"beyond burger": "hamburger",
|
||
"impossible burger": "hamburger",
|
||
"plant burger": "hamburger",
|
||
"chicken patt": "hamburger", # FTS match only — recipe scoring still works
|
||
# Sausages
|
||
"kielbasa": "sausage",
|
||
"bratwurst": "sausage",
|
||
"brat ": "sausage",
|
||
"frankfurter": "hotdog",
|
||
"wiener": "hotdog",
|
||
# Chicken cuts + plant-based chicken → generic chicken for broader matching
|
||
"chicken breast": "chicken",
|
||
"chicken thigh": "chicken",
|
||
"chicken drumstick": "chicken",
|
||
"chicken wing": "chicken",
|
||
"rotisserie chicken": "chicken",
|
||
"chicken tender": "chicken",
|
||
"chicken strip": "chicken",
|
||
"chicken piece": "chicken",
|
||
"fake chicken": "chicken",
|
||
"plant chicken": "chicken",
|
||
"vegan chicken": "chicken",
|
||
"daring": "chicken", # Daring Foods brand
|
||
"gardein chick": "chicken",
|
||
"quorn chick": "chicken",
|
||
"chick'n": "chicken",
|
||
"chikn": "chicken",
|
||
"not-chicken": "chicken",
|
||
"no-chicken": "chicken",
|
||
# Plant-based beef subs — map to broad "beef" not "hamburger"
|
||
# (texture varies: strips ≠ ground; let corpus handle the specific form)
|
||
"not-beef": "beef",
|
||
"no-beef": "beef",
|
||
"plant beef": "beef",
|
||
"vegan beef": "beef",
|
||
# Plant-based pork subs
|
||
"not-pork": "pork",
|
||
"no-pork": "pork",
|
||
"plant pork": "pork",
|
||
"vegan pork": "pork",
|
||
"omnipork": "pork",
|
||
"omni pork": "pork",
|
||
# Generic alt-meat catch-alls → broad "beef" (safer than hamburger)
|
||
"fake meat": "beef",
|
||
"plant meat": "beef",
|
||
"vegan meat": "beef",
|
||
"meat-free": "beef",
|
||
"meatless": "beef",
|
||
# Pork cuts
|
||
"pork chop": "pork",
|
||
"pork loin": "pork",
|
||
"pork tenderloin": "pork",
|
||
# Tomato-based sauces
|
||
"marinara": "tomato sauce",
|
||
"pasta sauce": "tomato sauce",
|
||
"spaghetti sauce": "tomato sauce",
|
||
"pizza sauce": "tomato sauce",
|
||
# Pasta shapes — map to generic "pasta" so FTS finds any pasta recipe
|
||
"macaroni": "pasta",
|
||
"noodles": "pasta",
|
||
"spaghetti": "pasta",
|
||
"penne": "pasta",
|
||
"fettuccine": "pasta",
|
||
"rigatoni": "pasta",
|
||
"linguine": "pasta",
|
||
"rotini": "pasta",
|
||
"farfalle": "pasta",
|
||
# Cheese variants → "cheese" for broad matching
|
||
"shredded cheese": "cheese",
|
||
"sliced cheese": "cheese",
|
||
"american cheese": "cheese",
|
||
"cheddar": "cheese",
|
||
"mozzarella": "cheese",
|
||
# Cream variants
|
||
"heavy cream": "cream",
|
||
"whipping cream": "cream",
|
||
"half and half": "cream",
|
||
# Buns / rolls
|
||
"burger bun": "buns",
|
||
"hamburger bun": "buns",
|
||
"hot dog bun": "buns",
|
||
"bread roll": "buns",
|
||
"dinner roll": "buns",
|
||
# Tortillas / wraps
|
||
"flour tortilla": "tortillas",
|
||
"corn tortilla": "tortillas",
|
||
"tortilla wrap": "tortillas",
|
||
"soft taco shell": "tortillas",
|
||
"taco shell": "taco shells",
|
||
"pita bread": "pita",
|
||
"flatbread": "flatbread",
|
||
# Canned beans
|
||
"black bean": "beans",
|
||
"pinto bean": "beans",
|
||
"kidney bean": "beans",
|
||
"refried bean": "beans",
|
||
"chickpea": "beans",
|
||
"garbanzo": "beans",
|
||
# Rice variants
|
||
"white rice": "rice",
|
||
"brown rice": "rice",
|
||
"jasmine rice": "rice",
|
||
"basmati rice": "rice",
|
||
"instant rice": "rice",
|
||
"microwavable rice": "rice",
|
||
# Salsa / hot sauce
|
||
"hot sauce": "salsa",
|
||
"taco sauce": "salsa",
|
||
"enchilada sauce": "salsa",
|
||
# Sour cream substitute
|
||
"greek yogurt": "sour cream",
|
||
# Prepackaged meals
|
||
"lean cuisine": "casserole",
|
||
"stouffer": "casserole",
|
||
"healthy choice": "casserole",
|
||
"marie callender": "casserole",
|
||
}
|
||
|
||
@staticmethod
|
||
def _normalize_for_fts(name: str) -> list[str]:
|
||
"""Expand one pantry item to all FTS search terms it should contribute.
|
||
|
||
Returns the original name plus:
|
||
- Any synonym-map canonical terms (handles product-label → corpus name)
|
||
- Individual significant tokens from multi-word product names
|
||
(handles packaged meals like "Lean Cuisine Chicken Alfredo" → also
|
||
searches for "chicken" and "alfredo" independently)
|
||
"""
|
||
lower = name.lower().strip()
|
||
if not lower:
|
||
return []
|
||
|
||
terms: list[str] = [lower]
|
||
|
||
# Substring synonym check on full name
|
||
for pattern, canonical in Store._FTS_SYNONYMS.items():
|
||
if pattern in lower:
|
||
terms.append(canonical)
|
||
|
||
# For multi-word product names, also add individual significant tokens
|
||
if " " in lower:
|
||
for token in lower.split():
|
||
if len(token) <= 3 or token in Store._FTS_TOKEN_STOPWORDS:
|
||
continue
|
||
if token not in terms:
|
||
terms.append(token)
|
||
# Synonym-expand individual tokens too
|
||
if token in Store._FTS_SYNONYMS:
|
||
canonical = Store._FTS_SYNONYMS[token]
|
||
if canonical not in terms:
|
||
terms.append(canonical)
|
||
|
||
return terms
|
||
|
||
@staticmethod
|
||
def _build_fts_query(ingredient_names: list[str]) -> str:
|
||
"""Build an FTS5 MATCH expression ORing all ingredient terms.
|
||
|
||
Each pantry item is expanded via _normalize_for_fts so that
|
||
product-label names (e.g. "burger patties") also search for their
|
||
recipe-corpus equivalents (e.g. "hamburger"), and multi-word packaged
|
||
product names contribute their individual ingredient tokens.
|
||
"""
|
||
parts: list[str] = []
|
||
seen: set[str] = set()
|
||
for name in ingredient_names:
|
||
for term in Store._normalize_for_fts(name):
|
||
# Strip characters that break FTS5 query syntax
|
||
clean = term.replace('"', "").replace("'", "")
|
||
if not clean or clean in seen:
|
||
continue
|
||
seen.add(clean)
|
||
parts.append(f'"{clean}"')
|
||
return " OR ".join(parts)
|
||
|
||
def search_recipes_by_ingredients(
|
||
self,
|
||
ingredient_names: list[str],
|
||
limit: int = 20,
|
||
category: str | None = None,
|
||
max_calories: float | None = None,
|
||
max_sugar_g: float | None = None,
|
||
max_carbs_g: float | None = None,
|
||
max_sodium_mg: float | None = None,
|
||
excluded_ids: list[int] | None = None,
|
||
) -> list[dict]:
|
||
"""Find recipes containing any of the given ingredient names.
|
||
Scores by match count and returns highest-scoring first.
|
||
|
||
Uses FTS5 index (migration 015) when available — O(log N) per query.
|
||
Falls back to LIKE scans on older databases.
|
||
|
||
Nutrition filters use NULL-passthrough: rows without nutrition data
|
||
always pass (they may be estimated or absent entirely).
|
||
"""
|
||
if not ingredient_names:
|
||
return []
|
||
|
||
extra_clauses: list[str] = []
|
||
extra_params: list = []
|
||
if category:
|
||
extra_clauses.append("r.category = ?")
|
||
extra_params.append(category)
|
||
if max_calories is not None:
|
||
extra_clauses.append("(r.calories IS NULL OR r.calories <= ?)")
|
||
extra_params.append(max_calories)
|
||
if max_sugar_g is not None:
|
||
extra_clauses.append("(r.sugar_g IS NULL OR r.sugar_g <= ?)")
|
||
extra_params.append(max_sugar_g)
|
||
if max_carbs_g is not None:
|
||
extra_clauses.append("(r.carbs_g IS NULL OR r.carbs_g <= ?)")
|
||
extra_params.append(max_carbs_g)
|
||
if max_sodium_mg is not None:
|
||
extra_clauses.append("(r.sodium_mg IS NULL OR r.sodium_mg <= ?)")
|
||
extra_params.append(max_sodium_mg)
|
||
if excluded_ids:
|
||
placeholders = ",".join("?" * len(excluded_ids))
|
||
extra_clauses.append(f"r.id NOT IN ({placeholders})")
|
||
extra_params.extend(excluded_ids)
|
||
where_extra = (" AND " + " AND ".join(extra_clauses)) if extra_clauses else ""
|
||
|
||
if self._fts_ready():
|
||
return self._search_recipes_fts(
|
||
ingredient_names, limit, where_extra, extra_params
|
||
)
|
||
return self._search_recipes_like(
|
||
ingredient_names, limit, where_extra, extra_params
|
||
)
|
||
|
||
def _search_recipes_fts(
|
||
self,
|
||
ingredient_names: list[str],
|
||
limit: int,
|
||
where_extra: str,
|
||
extra_params: list,
|
||
) -> list[dict]:
|
||
"""FTS5-backed ingredient search. Candidates fetched via inverted index;
|
||
match_count computed in Python over the small candidate set."""
|
||
fts_query = self._build_fts_query(ingredient_names)
|
||
if not fts_query:
|
||
return []
|
||
|
||
# Pull up to 10× limit candidates so ranking has enough headroom.
|
||
sql = f"""
|
||
SELECT r.*
|
||
FROM recipes_fts
|
||
JOIN recipes r ON r.id = recipes_fts.rowid
|
||
WHERE recipes_fts MATCH ?
|
||
{where_extra}
|
||
LIMIT ?
|
||
"""
|
||
rows = self._fetch_all(sql, (fts_query, *extra_params, limit * 10))
|
||
|
||
pantry_set = {n.lower().strip() for n in ingredient_names}
|
||
scored: list[dict] = []
|
||
for row in rows:
|
||
raw = row.get("ingredient_names") or []
|
||
names: list[str] = raw if isinstance(raw, list) else json.loads(raw or "[]")
|
||
match_count = sum(1 for n in names if n.lower() in pantry_set)
|
||
scored.append({**row, "match_count": match_count})
|
||
|
||
scored.sort(key=lambda r: (-r["match_count"], r["id"]))
|
||
return scored[:limit]
|
||
|
||
def _search_recipes_like(
|
||
self,
|
||
ingredient_names: list[str],
|
||
limit: int,
|
||
where_extra: str,
|
||
extra_params: list,
|
||
) -> list[dict]:
|
||
"""Legacy LIKE-based ingredient search (O(N×rows) — slow on large corpora)."""
|
||
like_params = [f'%"{n}"%' for n in ingredient_names]
|
||
like_clauses = " OR ".join(
|
||
"r.ingredient_names LIKE ?" for _ in ingredient_names
|
||
)
|
||
match_score = " + ".join(
|
||
"CASE WHEN r.ingredient_names LIKE ? THEN 1 ELSE 0 END"
|
||
for _ in ingredient_names
|
||
)
|
||
sql = f"""
|
||
SELECT r.*, ({match_score}) AS match_count
|
||
FROM recipes r
|
||
WHERE ({like_clauses})
|
||
{where_extra}
|
||
ORDER BY match_count DESC, r.id ASC
|
||
LIMIT ?
|
||
"""
|
||
all_params = like_params + like_params + extra_params + [limit]
|
||
return self._fetch_all(sql, tuple(all_params))
|
||
|
||
def get_recipe(self, recipe_id: int) -> dict | None:
|
||
return self._fetch_one("SELECT * FROM recipes WHERE id = ?", (recipe_id,))
|
||
|
||
# ── rate limits ───────────────────────────────────────────────────────
|
||
|
||
def check_and_increment_rate_limit(
|
||
self, feature: str, daily_max: int
|
||
) -> tuple[bool, int]:
|
||
"""Check daily counter for feature; only increment if under the limit.
|
||
Returns (allowed, current_count). Rejected calls do not consume quota."""
|
||
from datetime import date
|
||
today = date.today().isoformat()
|
||
row = self._fetch_one(
|
||
"SELECT count FROM rate_limits WHERE feature = ? AND window_date = ?",
|
||
(feature, today),
|
||
)
|
||
current = row["count"] if row else 0
|
||
if current >= daily_max:
|
||
return (False, current)
|
||
self.conn.execute("""
|
||
INSERT INTO rate_limits (feature, window_date, count)
|
||
VALUES (?, ?, 1)
|
||
ON CONFLICT(feature, window_date) DO UPDATE SET count = count + 1
|
||
""", (feature, today))
|
||
self.conn.commit()
|
||
return (True, current + 1)
|
||
|
||
# ── user settings ────────────────────────────────────────────────────
|
||
|
||
def get_setting(self, key: str) -> str | None:
|
||
"""Return the value for a settings key, or None if not set."""
|
||
row = self._fetch_one(
|
||
"SELECT value FROM user_settings WHERE key = ?", (key,)
|
||
)
|
||
return row["value"] if row else None
|
||
|
||
def set_setting(self, key: str, value: str) -> None:
|
||
"""Upsert a settings key-value pair."""
|
||
self.conn.execute(
|
||
"INSERT INTO user_settings (key, value) VALUES (?, ?)"
|
||
" ON CONFLICT(key) DO UPDATE SET value = excluded.value",
|
||
(key, value),
|
||
)
|
||
self.conn.commit()
|
||
|
||
# ── substitution feedback ─────────────────────────────────────────────
|
||
|
||
def log_substitution_feedback(
|
||
self,
|
||
original: str,
|
||
substitute: str,
|
||
constraint: str | None,
|
||
compensation_used: list[str],
|
||
approved: bool,
|
||
opted_in: bool,
|
||
) -> None:
|
||
self.conn.execute("""
|
||
INSERT INTO substitution_feedback
|
||
(original_name, substitute_name, constraint_label,
|
||
compensation_used, approved, opted_in)
|
||
VALUES (?,?,?,?,?,?)
|
||
""", (
|
||
original, substitute, constraint,
|
||
self._dump(compensation_used),
|
||
int(approved), int(opted_in),
|
||
))
|
||
self.conn.commit()
|