kiwi/app/db/store.py
pyr0ball 1a493e0ad9 feat: recipe engine — assembly templates, prep notes, FTS fixes, texture backfill
- Assembly template system (13 templates: burrito, fried rice, omelette, stir fry,
  pasta, sandwich, grain bowl, soup/stew, casserole, pancakes, porridge, pie, pudding)
  with role-based matching, whole-word single-keyword guard, deterministic titles
  via MD5 pantry hash
- Prep-state stripping: strips 'melted butter' → 'butter' for coverage checks;
  reconstructs actionable states as 'Before you start:' cooking instructions
  (NutritionPanel prep_notes field + RecipesView.vue display block)
- FTS5 fixes: always double-quote all terms; strip apostrophes to prevent
  syntax errors on brands like "Stouffer's"; 'plant-based' → bare 'based' crash
- Bidirectional synonym expansion: alt-meat, alt-chicken, alt-beef, alt-pork
  mapped to canonical texture class; pantry expansion covers 'hamburger' from
  'burger patties' etc.
- Texture profile backfill script (378K ingredient_profiles rows) with macro-derived
  classification in priority order (fatty → creamy → starchy → firm → fibrous →
  tender → liquid → neutral); oats/legumes starchy-first fix
- LLM prompt: ban flavoured/sweetened ingredients (vanilla yoghurt) from savoury
- Migrations 014 (nutrition macros) + 015 (recipe FTS index)
- Nutrition estimation pipeline script
- gitignore MagicMock sqlite test artifacts
2026-04-02 22:12:35 -07:00

737 lines
30 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""
SQLite data store for Kiwi.
Uses circuitforge-core for connection management and migrations.
"""
from __future__ import annotations
import json
import sqlite3
from pathlib import Path
from typing import Any
from circuitforge_core.db.base import get_connection
from circuitforge_core.db.migrations import run_migrations
# Directory holding this package's SQL migration files, resolved relative
# to this module so it works regardless of the process's working directory.
MIGRATIONS_DIR = Path(__file__).parent / "migrations"
class Store:
    """SQLite-backed data store for Kiwi.

    Wraps a single sqlite3 connection obtained from circuitforge-core and
    applies any pending schema migrations at construction time.
    """

    def __init__(self, db_path: Path, key: str = "") -> None:
        # `key` is forwarded verbatim to circuitforge-core's get_connection
        # (presumably a database encryption key — confirm against that API).
        self.conn: sqlite3.Connection = get_connection(db_path, key)
        # WAL improves read concurrency; foreign-key enforcement is OFF by
        # default in SQLite and must be enabled per-connection. Both pragmas
        # run before migrations so migration DDL sees the same settings.
        self.conn.execute("PRAGMA journal_mode=WAL")
        self.conn.execute("PRAGMA foreign_keys=ON")
        run_migrations(self.conn, MIGRATIONS_DIR)

    def close(self) -> None:
        """Close the underlying database connection."""
        self.conn.close()
# ── helpers ───────────────────────────────────────────────────────────
def _row_to_dict(self, row: sqlite3.Row) -> dict[str, Any]:
d = dict(row)
# Deserialise any TEXT columns that contain JSON
for key in ("metadata", "nutrition_data", "source_data", "items",
"metrics", "improvement_suggestions", "confidence_scores",
"warnings",
# recipe columns
"ingredients", "ingredient_names", "directions",
"keywords", "element_coverage"):
if key in d and isinstance(d[key], str):
try:
d[key] = json.loads(d[key])
except (json.JSONDecodeError, TypeError):
pass
return d
def _fetch_one(self, sql: str, params: tuple = ()) -> dict[str, Any] | None:
self.conn.row_factory = sqlite3.Row
row = self.conn.execute(sql, params).fetchone()
return self._row_to_dict(row) if row else None
def _fetch_all(self, sql: str, params: tuple = ()) -> list[dict[str, Any]]:
self.conn.row_factory = sqlite3.Row
rows = self.conn.execute(sql, params).fetchall()
return [self._row_to_dict(r) for r in rows]
def _dump(self, value: Any) -> str:
    """Serialise a Python object to a JSON string for storage.

    Counterpart of the json.loads pass in _row_to_dict; uses json.dumps
    defaults (ASCII-escaped, default separators).
    """
    return json.dumps(value)
# ── receipts ──────────────────────────────────────────────────────────
def _insert_returning(self, sql: str, params: tuple = ()) -> dict[str, Any]:
"""Execute an INSERT ... RETURNING * and return the new row as a dict.
Fetches the row BEFORE committing — SQLite requires the cursor to be
fully consumed before the transaction is committed."""
self.conn.row_factory = sqlite3.Row
cur = self.conn.execute(sql, params)
row = self._row_to_dict(cur.fetchone())
self.conn.commit()
return row
def create_receipt(self, filename: str, original_path: str) -> dict[str, Any]:
return self._insert_returning(
"INSERT INTO receipts (filename, original_path) VALUES (?, ?) RETURNING *",
(filename, original_path),
)
def get_receipt(self, receipt_id: int) -> dict[str, Any] | None:
return self._fetch_one("SELECT * FROM receipts WHERE id = ?", (receipt_id,))
def list_receipts(self, limit: int = 50, offset: int = 0) -> list[dict[str, Any]]:
return self._fetch_all(
"SELECT * FROM receipts ORDER BY created_at DESC LIMIT ? OFFSET ?",
(limit, offset),
)
def update_receipt_status(self, receipt_id: int, status: str,
error: str | None = None) -> None:
self.conn.execute(
"UPDATE receipts SET status = ?, error = ?, updated_at = datetime('now') WHERE id = ?",
(status, error, receipt_id),
)
self.conn.commit()
def update_receipt_metadata(self, receipt_id: int, metadata: dict) -> None:
self.conn.execute(
"UPDATE receipts SET metadata = ?, updated_at = datetime('now') WHERE id = ?",
(self._dump(metadata), receipt_id),
)
self.conn.commit()
# ── quality assessments ───────────────────────────────────────────────
def upsert_quality_assessment(self, receipt_id: int, overall_score: float,
is_acceptable: bool, metrics: dict,
suggestions: list) -> dict[str, Any]:
self.conn.execute(
"""INSERT INTO quality_assessments
(receipt_id, overall_score, is_acceptable, metrics, improvement_suggestions)
VALUES (?, ?, ?, ?, ?)
ON CONFLICT (receipt_id) DO UPDATE SET
overall_score = excluded.overall_score,
is_acceptable = excluded.is_acceptable,
metrics = excluded.metrics,
improvement_suggestions = excluded.improvement_suggestions""",
(receipt_id, overall_score, int(is_acceptable),
self._dump(metrics), self._dump(suggestions)),
)
self.conn.commit()
return self._fetch_one(
"SELECT * FROM quality_assessments WHERE receipt_id = ?", (receipt_id,)
)
# ── products ──────────────────────────────────────────────────────────
def get_or_create_product(self, name: str, barcode: str | None = None,
**kwargs) -> tuple[dict[str, Any], bool]:
"""Returns (product, created). Looks up by barcode first, then name."""
if barcode:
existing = self._fetch_one(
"SELECT * FROM products WHERE barcode = ?", (barcode,)
)
if existing:
return existing, False
existing = self._fetch_one("SELECT * FROM products WHERE name = ?", (name,))
if existing:
return existing, False
row = self._insert_returning(
"""INSERT INTO products (name, barcode, brand, category, description,
image_url, nutrition_data, source, source_data)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?) RETURNING *""",
(
name, barcode,
kwargs.get("brand"), kwargs.get("category"),
kwargs.get("description"), kwargs.get("image_url"),
self._dump(kwargs.get("nutrition_data", {})),
kwargs.get("source", "manual"),
self._dump(kwargs.get("source_data", {})),
),
)
return row, True
def get_product(self, product_id: int) -> dict[str, Any] | None:
return self._fetch_one("SELECT * FROM products WHERE id = ?", (product_id,))
def list_products(self) -> list[dict[str, Any]]:
return self._fetch_all("SELECT * FROM products ORDER BY name")
# ── inventory ─────────────────────────────────────────────────────────
def add_inventory_item(self, product_id: int, location: str,
quantity: float = 1.0, unit: str = "count",
**kwargs) -> dict[str, Any]:
return self._insert_returning(
"""INSERT INTO inventory_items
(product_id, receipt_id, quantity, unit, location, sublocation,
purchase_date, expiration_date, notes, source)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?) RETURNING *""",
(
product_id, kwargs.get("receipt_id"),
quantity, unit, location, kwargs.get("sublocation"),
kwargs.get("purchase_date"), kwargs.get("expiration_date"),
kwargs.get("notes"), kwargs.get("source", "manual"),
),
)
def get_inventory_item(self, item_id: int) -> dict[str, Any] | None:
return self._fetch_one(
"""SELECT i.*, p.name as product_name, p.barcode, p.category
FROM inventory_items i
JOIN products p ON p.id = i.product_id
WHERE i.id = ?""",
(item_id,),
)
def list_inventory(self, location: str | None = None,
status: str = "available") -> list[dict[str, Any]]:
if location:
return self._fetch_all(
"""SELECT i.*, p.name as product_name, p.barcode, p.category
FROM inventory_items i
JOIN products p ON p.id = i.product_id
WHERE i.status = ? AND i.location = ?
ORDER BY i.expiration_date ASC NULLS LAST""",
(status, location),
)
return self._fetch_all(
"""SELECT i.*, p.name as product_name, p.barcode, p.category
FROM inventory_items i
JOIN products p ON p.id = i.product_id
WHERE i.status = ?
ORDER BY i.expiration_date ASC NULLS LAST""",
(status,),
)
def update_inventory_item(self, item_id: int, **kwargs) -> dict[str, Any] | None:
allowed = {"quantity", "unit", "location", "sublocation",
"expiration_date", "status", "notes", "consumed_at"}
updates = {k: v for k, v in kwargs.items() if k in allowed}
if not updates:
return self.get_inventory_item(item_id)
sets = ", ".join(f"{k} = ?" for k in updates)
values = list(updates.values()) + [item_id]
self.conn.execute(
f"UPDATE inventory_items SET {sets}, updated_at = datetime('now') WHERE id = ?",
values,
)
self.conn.commit()
return self.get_inventory_item(item_id)
def expiring_soon(self, days: int = 7) -> list[dict[str, Any]]:
return self._fetch_all(
"""SELECT i.*, p.name as product_name, p.category
FROM inventory_items i
JOIN products p ON p.id = i.product_id
WHERE i.status = 'available'
AND i.expiration_date IS NOT NULL
AND date(i.expiration_date) <= date('now', ? || ' days')
ORDER BY i.expiration_date ASC""",
(str(days),),
)
def recalculate_expiry(
    self,
    tier: str = "local",
    has_byok: bool = False,
) -> tuple[int, int]:
    """Re-run the expiration predictor over all available inventory items.

    Uses each item's existing purchase_date (falling back to today when it
    is NULL or unparseable) and its current location. Items whose expiry
    came from a receipt or was entered by the user cannot be distinguished
    from auto-predicted ones, so ALL available items are recalculated; an
    item is skipped only when the predictor returns no prediction.

    Args:
        tier: prediction tier forwarded to the predictor (default "local").
        has_byok: whether the user brought their own API key — forwarded
            to the predictor (semantics live in ExpirationPredictor;
            confirm there).

    Returns:
        (updated_count, skipped_count).
    """
    # NOTE(review): imported at call time, presumably to avoid a circular
    # import between app.services and this db layer — confirm.
    from datetime import date
    from app.services.expiration_predictor import ExpirationPredictor
    predictor = ExpirationPredictor()
    rows = self._fetch_all(
        """SELECT i.id, i.location, i.purchase_date,
                  p.name AS product_name, p.category AS product_category
           FROM inventory_items i
           JOIN products p ON p.id = i.product_id
           WHERE i.status = 'available'""",
        (),
    )
    updated = skipped = 0
    for row in rows:
        # Re-derive the food category from name / stored category / location.
        cat = predictor.get_category_from_product(
            row["product_name"] or "",
            product_category=row.get("product_category"),
            location=row.get("location"),
        )
        purchase_date_raw = row.get("purchase_date")
        try:
            # NULL or malformed purchase dates fall back to today.
            purchase_date = (
                date.fromisoformat(purchase_date_raw)
                if purchase_date_raw
                else date.today()
            )
        except (ValueError, TypeError):
            purchase_date = date.today()
        exp = predictor.predict_expiration(
            cat,
            row["location"] or "pantry",
            purchase_date=purchase_date,
            product_name=row["product_name"],
            tier=tier,
            has_byok=has_byok,
        )
        if exp is None:
            # Predictor declined — leave this item's expiry untouched.
            skipped += 1
            continue
        self.conn.execute(
            "UPDATE inventory_items SET expiration_date = ?, updated_at = datetime('now') WHERE id = ?",
            (str(exp), row["id"]),
        )
        updated += 1
    # Single commit after the loop: the whole recalculation lands atomically.
    self.conn.commit()
    return updated, skipped
# ── receipt_data ──────────────────────────────────────────────────────
def upsert_receipt_data(self, receipt_id: int, data: dict) -> dict[str, Any]:
fields = [
"merchant_name", "merchant_address", "merchant_phone", "merchant_email",
"merchant_website", "merchant_tax_id", "transaction_date", "transaction_time",
"receipt_number", "register_number", "cashier_name", "transaction_id",
"items", "subtotal", "tax", "discount", "tip", "total",
"payment_method", "amount_paid", "change_given",
"raw_text", "confidence_scores", "warnings", "processing_time",
]
json_fields = {"items", "confidence_scores", "warnings"}
cols = ", ".join(fields)
placeholders = ", ".join("?" for _ in fields)
values = [
self._dump(data.get(f)) if f in json_fields and data.get(f) is not None
else data.get(f)
for f in fields
]
self.conn.execute(
f"""INSERT INTO receipt_data (receipt_id, {cols})
VALUES (?, {placeholders})
ON CONFLICT (receipt_id) DO UPDATE SET
{', '.join(f'{f} = excluded.{f}' for f in fields)},
updated_at = datetime('now')""",
[receipt_id] + values,
)
self.conn.commit()
return self._fetch_one(
"SELECT * FROM receipt_data WHERE receipt_id = ?", (receipt_id,)
)
# ── recipes ───────────────────────────────────────────────────────────
def _fts_ready(self) -> bool:
"""Return True if the recipes_fts virtual table exists."""
row = self._fetch_one(
"SELECT 1 FROM sqlite_master WHERE type='table' AND name='recipes_fts'"
)
return row is not None
# Words that carry no recipe-ingredient signal and should be filtered
# out when tokenising multi-word product names for FTS expansion.
# NOTE(review): _normalize_for_fts already drops tokens of length <= 3
# before consulting this set, so the 1-3-letter entries ("a", "of", "oz",
# "lb", ...) are redundant — harmless, but they could be removed.
_FTS_TOKEN_STOPWORDS: frozenset[str] = frozenset({
    # Common English stopwords
    "a", "an", "the", "of", "in", "for", "with", "and", "or", "to",
    "from", "at", "by", "as", "on", "into",
    # Brand / marketing words that appear in product names
    "lean", "cuisine", "healthy", "choice", "stouffer", "original",
    "classic", "deluxe", "homestyle", "family", "style", "grade",
    "premium", "select", "natural", "organic", "fresh", "lite",
    "ready", "quick", "easy", "instant", "microwave", "frozen",
    "brand", "size", "large", "small", "medium", "extra",
    # Plant-based / alt-meat brand names
    # NOTE(review): "simulate" appears twice — set literals de-duplicate,
    # so this is harmless, but one copy could be deleted.
    "daring", "gardein", "morningstar", "lightlife", "tofurky",
    "quorn", "omni", "nuggs", "simulate", "simulate",
    # Preparation states — "cut up chicken" is still chicken
    "cut", "diced", "sliced", "chopped", "minced", "shredded",
    "cooked", "raw", "whole", "boneless", "skinless", "trimmed",
    "pre", "prepared", "marinated", "seasoned", "breaded", "battered",
    "grilled", "roasted", "smoked", "canned", "dried", "dehydrated",
    "pieces", "piece", "strips", "strip", "chunks", "chunk",
    "fillets", "fillet", "cutlets", "cutlet", "tenders", "nuggets",
    # Units / packaging
    "oz", "lb", "lbs", "pkg", "pack", "box", "can", "bag", "jar",
})
# Maps substrings found in product-label names to canonical recipe-corpus
# ingredient terms. Checked as substring matches against the lower-cased
# full product name, then against each individual token (the token pass
# only hits exact key matches — see _normalize_for_fts).
# NOTE(review): a name can match several patterns, each appending its
# canonical term, so duplicates may appear in the expanded term list;
# _build_fts_query de-duplicates before emitting the query.
_FTS_SYNONYMS: dict[str, str] = {
    # Ground / minced beef
    "burger patt": "hamburger",
    "beef patt": "hamburger",
    "ground beef": "hamburger",
    "ground chuck": "hamburger",
    "ground round": "hamburger",
    "mince": "hamburger",
    "veggie burger": "hamburger",
    "beyond burger": "hamburger",
    "impossible burger": "hamburger",
    "plant burger": "hamburger",
    "chicken patt": "hamburger",  # FTS match only — recipe scoring still works
    # Sausages
    "kielbasa": "sausage",
    "bratwurst": "sausage",
    "brat ": "sausage",  # trailing space = word boundary (avoids e.g. "bratty")
    "frankfurter": "hotdog",
    "wiener": "hotdog",
    # Chicken cuts + plant-based chicken → generic chicken for broader matching
    "chicken breast": "chicken",
    "chicken thigh": "chicken",
    "chicken drumstick": "chicken",
    "chicken wing": "chicken",
    "rotisserie chicken": "chicken",
    "chicken tender": "chicken",
    "chicken strip": "chicken",
    "chicken piece": "chicken",
    "fake chicken": "chicken",
    "plant chicken": "chicken",
    "vegan chicken": "chicken",
    "daring": "chicken",  # Daring Foods brand
    "gardein chick": "chicken",
    "quorn chick": "chicken",
    "chick'n": "chicken",
    "chikn": "chicken",
    "not-chicken": "chicken",
    "no-chicken": "chicken",
    # Plant-based beef subs — map to broad "beef" not "hamburger"
    # (texture varies: strips ≠ ground; let corpus handle the specific form)
    "not-beef": "beef",
    "no-beef": "beef",
    "plant beef": "beef",
    "vegan beef": "beef",
    # Plant-based pork subs
    "not-pork": "pork",
    "no-pork": "pork",
    "plant pork": "pork",
    "vegan pork": "pork",
    "omnipork": "pork",
    "omni pork": "pork",
    # Generic alt-meat catch-alls → broad "beef" (safer than hamburger)
    "fake meat": "beef",
    "plant meat": "beef",
    "vegan meat": "beef",
    "meat-free": "beef",
    "meatless": "beef",
    # Pork cuts
    "pork chop": "pork",
    "pork loin": "pork",
    "pork tenderloin": "pork",
    # Tomato-based sauces
    "marinara": "tomato sauce",
    "pasta sauce": "tomato sauce",
    "spaghetti sauce": "tomato sauce",
    "pizza sauce": "tomato sauce",
    # Pasta shapes — map to generic "pasta" so FTS finds any pasta recipe
    "macaroni": "pasta",
    "noodles": "pasta",
    "spaghetti": "pasta",
    "penne": "pasta",
    "fettuccine": "pasta",
    "rigatoni": "pasta",
    "linguine": "pasta",
    "rotini": "pasta",
    "farfalle": "pasta",
    # Cheese variants → "cheese" for broad matching
    "shredded cheese": "cheese",
    "sliced cheese": "cheese",
    "american cheese": "cheese",
    "cheddar": "cheese",
    "mozzarella": "cheese",
    # Cream variants
    "heavy cream": "cream",
    "whipping cream": "cream",
    "half and half": "cream",
    # Buns / rolls
    "burger bun": "buns",
    "hamburger bun": "buns",
    "hot dog bun": "buns",
    "bread roll": "buns",
    "dinner roll": "buns",
    # Tortillas / wraps
    "flour tortilla": "tortillas",
    "corn tortilla": "tortillas",
    "tortilla wrap": "tortillas",
    "soft taco shell": "tortillas",
    "taco shell": "taco shells",
    "pita bread": "pita",
    # NOTE(review): identity mapping — a no-op in practice (the full name
    # is already included as a term and duplicates are filtered later).
    "flatbread": "flatbread",
    # Canned beans
    "black bean": "beans",
    "pinto bean": "beans",
    "kidney bean": "beans",
    "refried bean": "beans",
    "chickpea": "beans",
    "garbanzo": "beans",
    # Rice variants
    "white rice": "rice",
    "brown rice": "rice",
    "jasmine rice": "rice",
    "basmati rice": "rice",
    "instant rice": "rice",
    "microwavable rice": "rice",
    # Salsa / hot sauce
    "hot sauce": "salsa",
    "taco sauce": "salsa",
    "enchilada sauce": "salsa",
    # Sour cream substitute
    "greek yogurt": "sour cream",
    # Prepackaged meals
    "lean cuisine": "casserole",
    "stouffer": "casserole",
    "healthy choice": "casserole",
    "marie callender": "casserole",
}
@staticmethod
def _normalize_for_fts(name: str) -> list[str]:
"""Expand one pantry item to all FTS search terms it should contribute.
Returns the original name plus:
- Any synonym-map canonical terms (handles product-label → corpus name)
- Individual significant tokens from multi-word product names
(handles packaged meals like "Lean Cuisine Chicken Alfredo" → also
searches for "chicken" and "alfredo" independently)
"""
lower = name.lower().strip()
if not lower:
return []
terms: list[str] = [lower]
# Substring synonym check on full name
for pattern, canonical in Store._FTS_SYNONYMS.items():
if pattern in lower:
terms.append(canonical)
# For multi-word product names, also add individual significant tokens
if " " in lower:
for token in lower.split():
if len(token) <= 3 or token in Store._FTS_TOKEN_STOPWORDS:
continue
if token not in terms:
terms.append(token)
# Synonym-expand individual tokens too
if token in Store._FTS_SYNONYMS:
canonical = Store._FTS_SYNONYMS[token]
if canonical not in terms:
terms.append(canonical)
return terms
@staticmethod
def _build_fts_query(ingredient_names: list[str]) -> str:
"""Build an FTS5 MATCH expression ORing all ingredient terms.
Each pantry item is expanded via _normalize_for_fts so that
product-label names (e.g. "burger patties") also search for their
recipe-corpus equivalents (e.g. "hamburger"), and multi-word packaged
product names contribute their individual ingredient tokens.
"""
parts: list[str] = []
seen: set[str] = set()
for name in ingredient_names:
for term in Store._normalize_for_fts(name):
# Strip characters that break FTS5 query syntax
clean = term.replace('"', "").replace("'", "")
if not clean or clean in seen:
continue
seen.add(clean)
parts.append(f'"{clean}"')
return " OR ".join(parts)
def search_recipes_by_ingredients(
self,
ingredient_names: list[str],
limit: int = 20,
category: str | None = None,
max_calories: float | None = None,
max_sugar_g: float | None = None,
max_carbs_g: float | None = None,
max_sodium_mg: float | None = None,
excluded_ids: list[int] | None = None,
) -> list[dict]:
"""Find recipes containing any of the given ingredient names.
Scores by match count and returns highest-scoring first.
Uses FTS5 index (migration 015) when available — O(log N) per query.
Falls back to LIKE scans on older databases.
Nutrition filters use NULL-passthrough: rows without nutrition data
always pass (they may be estimated or absent entirely).
"""
if not ingredient_names:
return []
extra_clauses: list[str] = []
extra_params: list = []
if category:
extra_clauses.append("r.category = ?")
extra_params.append(category)
if max_calories is not None:
extra_clauses.append("(r.calories IS NULL OR r.calories <= ?)")
extra_params.append(max_calories)
if max_sugar_g is not None:
extra_clauses.append("(r.sugar_g IS NULL OR r.sugar_g <= ?)")
extra_params.append(max_sugar_g)
if max_carbs_g is not None:
extra_clauses.append("(r.carbs_g IS NULL OR r.carbs_g <= ?)")
extra_params.append(max_carbs_g)
if max_sodium_mg is not None:
extra_clauses.append("(r.sodium_mg IS NULL OR r.sodium_mg <= ?)")
extra_params.append(max_sodium_mg)
if excluded_ids:
placeholders = ",".join("?" * len(excluded_ids))
extra_clauses.append(f"r.id NOT IN ({placeholders})")
extra_params.extend(excluded_ids)
where_extra = (" AND " + " AND ".join(extra_clauses)) if extra_clauses else ""
if self._fts_ready():
return self._search_recipes_fts(
ingredient_names, limit, where_extra, extra_params
)
return self._search_recipes_like(
ingredient_names, limit, where_extra, extra_params
)
def _search_recipes_fts(
self,
ingredient_names: list[str],
limit: int,
where_extra: str,
extra_params: list,
) -> list[dict]:
"""FTS5-backed ingredient search. Candidates fetched via inverted index;
match_count computed in Python over the small candidate set."""
fts_query = self._build_fts_query(ingredient_names)
if not fts_query:
return []
# Pull up to 10× limit candidates so ranking has enough headroom.
sql = f"""
SELECT r.*
FROM recipes_fts
JOIN recipes r ON r.id = recipes_fts.rowid
WHERE recipes_fts MATCH ?
{where_extra}
LIMIT ?
"""
rows = self._fetch_all(sql, (fts_query, *extra_params, limit * 10))
pantry_set = {n.lower().strip() for n in ingredient_names}
scored: list[dict] = []
for row in rows:
raw = row.get("ingredient_names") or []
names: list[str] = raw if isinstance(raw, list) else json.loads(raw or "[]")
match_count = sum(1 for n in names if n.lower() in pantry_set)
scored.append({**row, "match_count": match_count})
scored.sort(key=lambda r: (-r["match_count"], r["id"]))
return scored[:limit]
def _search_recipes_like(
self,
ingredient_names: list[str],
limit: int,
where_extra: str,
extra_params: list,
) -> list[dict]:
"""Legacy LIKE-based ingredient search (O(N×rows) — slow on large corpora)."""
like_params = [f'%"{n}"%' for n in ingredient_names]
like_clauses = " OR ".join(
"r.ingredient_names LIKE ?" for _ in ingredient_names
)
match_score = " + ".join(
"CASE WHEN r.ingredient_names LIKE ? THEN 1 ELSE 0 END"
for _ in ingredient_names
)
sql = f"""
SELECT r.*, ({match_score}) AS match_count
FROM recipes r
WHERE ({like_clauses})
{where_extra}
ORDER BY match_count DESC, r.id ASC
LIMIT ?
"""
all_params = like_params + like_params + extra_params + [limit]
return self._fetch_all(sql, tuple(all_params))
def get_recipe(self, recipe_id: int) -> dict | None:
return self._fetch_one("SELECT * FROM recipes WHERE id = ?", (recipe_id,))
# ── rate limits ───────────────────────────────────────────────────────
def check_and_increment_rate_limit(
self, feature: str, daily_max: int
) -> tuple[bool, int]:
"""Check daily counter for feature; only increment if under the limit.
Returns (allowed, current_count). Rejected calls do not consume quota."""
from datetime import date
today = date.today().isoformat()
row = self._fetch_one(
"SELECT count FROM rate_limits WHERE feature = ? AND window_date = ?",
(feature, today),
)
current = row["count"] if row else 0
if current >= daily_max:
return (False, current)
self.conn.execute("""
INSERT INTO rate_limits (feature, window_date, count)
VALUES (?, ?, 1)
ON CONFLICT(feature, window_date) DO UPDATE SET count = count + 1
""", (feature, today))
self.conn.commit()
return (True, current + 1)
# ── user settings ────────────────────────────────────────────────────
def get_setting(self, key: str) -> str | None:
"""Return the value for a settings key, or None if not set."""
row = self._fetch_one(
"SELECT value FROM user_settings WHERE key = ?", (key,)
)
return row["value"] if row else None
def set_setting(self, key: str, value: str) -> None:
"""Upsert a settings key-value pair."""
self.conn.execute(
"INSERT INTO user_settings (key, value) VALUES (?, ?)"
" ON CONFLICT(key) DO UPDATE SET value = excluded.value",
(key, value),
)
self.conn.commit()
# ── substitution feedback ─────────────────────────────────────────────
def log_substitution_feedback(
self,
original: str,
substitute: str,
constraint: str | None,
compensation_used: list[str],
approved: bool,
opted_in: bool,
) -> None:
self.conn.execute("""
INSERT INTO substitution_feedback
(original_name, substitute_name, constraint_label,
compensation_used, approved, opted_in)
VALUES (?,?,?,?,?,?)
""", (
original, substitute, constraint,
self._dump(compensation_used),
int(approved), int(opted_in),
))
self.conn.commit()