From 80b0d5fd34e4738c3ef61f49a8b60598032f56c1 Mon Sep 17 00:00:00 2001 From: pyr0ball Date: Wed, 8 Apr 2026 23:17:18 -0700 Subject: [PATCH] =?UTF-8?q?feat:=20v0.9.0=20=E2=80=94=20cf-text,=20pipelin?= =?UTF-8?q?e=20crystallization=20engine,=20multimodal=20pipeline,=20a11y?= =?UTF-8?q?=20preferences?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Closes #33, #37, #38, #41, #42. ## cf-text (closes #41) - New module: `circuitforge_core.text` — direct local inference bypassing ollama/vllm - Backends: llama.cpp (GGUF), transformers (HF), mock - Auto-detects backend from file extension; CF_TEXT_BACKEND env override - Optional 4-bit/8-bit quantisation via bitsandbytes (CF_TEXT_4BIT / CF_TEXT_8BIT) - process-level singleton + per-request `make_backend()` path ## Pipeline crystallization engine (closes #33, #37) - FPGA→ASIC model: LLM-discovered paths → deterministic workflows after N approvals - `models.py`: PipelineRun (incl. review_duration_ms + output_modified per #37), CrystallizedWorkflow, Step, hash_input() - `recorder.py`: append-only JSON run log under ~/.config/circuitforge/pipeline/ - `crystallizer.py`: threshold check, majority/most-recent step strategy, rubber-stamp warning (review_duration_ms < 5s triggers warnings.warn) - `registry.py`: exact + fuzzy match, deactivate-without-delete, colon-safe filenames - `executor.py`: deterministic steps with transparent LLM fallback ## Multimodal chunked pipeline (closes #42) - `pipeline/multimodal.py`: cf-docuvision pages → cf-text streaming - `run()` yields PageResult per page (progressive, no full-doc buffer) - `stream()` yields (page_idx, token) tuples for token-level UI rendering - `vram_serialise` flag + `swap_fn` hook for 8GB GPU VRAM management - `prompt_fn` callback for product-specific prompt construction ## Accessibility preferences (closes #38) - `preferences/accessibility.py`: PREF_REDUCED_MOTION, PREF_HIGH_CONTRAST, PREF_FONT_SIZE, PREF_SCREEN_READER with get/set 
helpers - Exported from preferences package __init__ ## LLM router fix - cf-orch backends: skip reachability pre-check; allocation starts the service - Static backends: reachability check remains in place --- circuitforge_core/llm/router.py | 11 +- circuitforge_core/pipeline/__init__.py | 42 +++- circuitforge_core/pipeline/crystallizer.py | 177 +++++++++++++ circuitforge_core/pipeline/executor.py | 157 ++++++++++++ circuitforge_core/pipeline/models.py | 216 ++++++++++++++++ circuitforge_core/pipeline/multimodal.py | 234 ++++++++++++++++++ circuitforge_core/pipeline/recorder.py | 70 ++++++ circuitforge_core/pipeline/registry.py | 134 ++++++++++ circuitforge_core/preferences/__init__.py | 3 + .../preferences/accessibility.py | 73 ++++++ circuitforge_core/text/__init__.py | 144 +++++++++++ circuitforge_core/text/backends/__init__.py | 10 + circuitforge_core/text/backends/base.py | 182 ++++++++++++++ circuitforge_core/text/backends/llamacpp.py | 192 ++++++++++++++ circuitforge_core/text/backends/mock.py | 104 ++++++++ .../text/backends/transformers.py | 197 +++++++++++++++ pyproject.toml | 2 +- tests/test_pipeline/__init__.py | 0 tests/test_pipeline/test_crystallizer.py | 198 +++++++++++++++ tests/test_pipeline/test_executor.py | 96 +++++++ tests/test_pipeline/test_models.py | 102 ++++++++ tests/test_pipeline/test_multimodal.py | 198 +++++++++++++++ tests/test_pipeline/test_recorder.py | 66 +++++ tests/test_pipeline/test_registry.py | 104 ++++++++ tests/test_preferences.py | 87 +++++++ tests/test_text/__init__.py | 0 tests/test_text/test_backend.py | 190 ++++++++++++++ 27 files changed, 2983 insertions(+), 6 deletions(-) create mode 100644 circuitforge_core/pipeline/crystallizer.py create mode 100644 circuitforge_core/pipeline/executor.py create mode 100644 circuitforge_core/pipeline/models.py create mode 100644 circuitforge_core/pipeline/multimodal.py create mode 100644 circuitforge_core/pipeline/recorder.py create mode 100644 circuitforge_core/pipeline/registry.py 
create mode 100644 circuitforge_core/preferences/accessibility.py create mode 100644 circuitforge_core/text/__init__.py create mode 100644 circuitforge_core/text/backends/__init__.py create mode 100644 circuitforge_core/text/backends/base.py create mode 100644 circuitforge_core/text/backends/llamacpp.py create mode 100644 circuitforge_core/text/backends/mock.py create mode 100644 circuitforge_core/text/backends/transformers.py create mode 100644 tests/test_pipeline/__init__.py create mode 100644 tests/test_pipeline/test_crystallizer.py create mode 100644 tests/test_pipeline/test_executor.py create mode 100644 tests/test_pipeline/test_models.py create mode 100644 tests/test_pipeline/test_multimodal.py create mode 100644 tests/test_pipeline/test_recorder.py create mode 100644 tests/test_pipeline/test_registry.py create mode 100644 tests/test_text/__init__.py create mode 100644 tests/test_text/test_backend.py diff --git a/circuitforge_core/llm/router.py b/circuitforge_core/llm/router.py index b09a3b4..2545895 100644 --- a/circuitforge_core/llm/router.py +++ b/circuitforge_core/llm/router.py @@ -199,15 +199,18 @@ class LLMRouter: continue elif backend["type"] == "openai_compat": - if not self._is_reachable(backend["base_url"]): - print(f"[LLMRouter] {name}: unreachable, skipping") - continue - # --- cf_orch: optionally override base_url with coordinator-allocated URL --- + # cf_orch: try allocation first — this may start the service on-demand. + # Do NOT reachability-check before allocating; the service may be stopped + # and the allocation is what starts it. orch_ctx = orch_alloc = None orch_result = self._try_cf_orch_alloc(backend) if orch_result is not None: orch_ctx, orch_alloc = orch_result backend = {**backend, "base_url": orch_alloc.url + "/v1"} + elif not self._is_reachable(backend["base_url"]): + # Static backend (no cf-orch) — skip if not reachable. 
+ print(f"[LLMRouter] {name}: unreachable, skipping") + continue try: client = OpenAI( base_url=backend["base_url"], diff --git a/circuitforge_core/pipeline/__init__.py b/circuitforge_core/pipeline/__init__.py index 5f6751a..9e65f92 100644 --- a/circuitforge_core/pipeline/__init__.py +++ b/circuitforge_core/pipeline/__init__.py @@ -1,3 +1,43 @@ +# circuitforge_core/pipeline — FPGA→ASIC crystallization engine +# +# Public API: call pipeline.run() from product code instead of llm.router directly. +# The module transparently checks for crystallized workflows first, falls back +# to LLM when none match, and records each run for future crystallization. +from __future__ import annotations + +from typing import Any, Callable + +from .crystallizer import CrystallizerConfig, crystallize, evaluate_new_run, should_crystallize +from .executor import ExecutionResult, Executor, StepResult +from .models import CrystallizedWorkflow, PipelineRun, Step, hash_input +from .multimodal import MultimodalConfig, MultimodalPipeline, PageResult +from .recorder import Recorder +from .registry import Registry from .staging import StagingDB -__all__ = ["StagingDB"] +__all__ = [ + # models + "PipelineRun", + "CrystallizedWorkflow", + "Step", + "hash_input", + # recorder + "Recorder", + # crystallizer + "CrystallizerConfig", + "crystallize", + "evaluate_new_run", + "should_crystallize", + # registry + "Registry", + # executor + "Executor", + "ExecutionResult", + "StepResult", + # multimodal + "MultimodalPipeline", + "MultimodalConfig", + "PageResult", + # legacy stub + "StagingDB", +] diff --git a/circuitforge_core/pipeline/crystallizer.py b/circuitforge_core/pipeline/crystallizer.py new file mode 100644 index 0000000..9d6034e --- /dev/null +++ b/circuitforge_core/pipeline/crystallizer.py @@ -0,0 +1,177 @@ +# circuitforge_core/pipeline/crystallizer.py — promote approved runs → workflows +# +# MIT — pure logic, no inference backends. 
+from __future__ import annotations + +import logging +import warnings +from collections import Counter +from dataclasses import dataclass, field +from datetime import datetime, timezone +from typing import Literal + +from .models import CrystallizedWorkflow, PipelineRun, Step +from .recorder import Recorder + +log = logging.getLogger(__name__) + +# Minimum milliseconds of review that counts as "genuine". +# Runs shorter than this are accepted but trigger a warning. +_RUBBER_STAMP_THRESHOLD_MS = 5_000 + + +@dataclass +class CrystallizerConfig: + """Tuning knobs for one product/task-type pair. + + threshold: + Minimum number of approved runs required before crystallization. + Osprey sets this to 1 (first successful IVR navigation is enough); + Peregrine uses 3+ for cover-letter templates. + min_review_ms: + Approved runs with review_duration_ms below this value generate a + warning. Set to 0 to silence the check (tests, automated approvals). + strategy: + ``"most_recent"`` — use the latest approved run's steps verbatim. + ``"majority"`` — pick each step by majority vote across runs (requires + runs to have the same step count; falls back to most_recent otherwise). 
+ """ + threshold: int = 3 + min_review_ms: int = _RUBBER_STAMP_THRESHOLD_MS + strategy: Literal["most_recent", "majority"] = "most_recent" + + +# ── Helpers ─────────────────────────────────────────────────────────────────── + +def _majority_steps(runs: list[PipelineRun]) -> list[Step] | None: + """Return majority-voted steps, or None if run lengths differ.""" + lengths = {len(r.steps) for r in runs} + if len(lengths) != 1: + return None + n = lengths.pop() + result: list[Step] = [] + for i in range(n): + counter: Counter[str] = Counter() + step_by_action: dict[str, Step] = {} + for r in runs: + s = r.steps[i] + counter[s.action] += 1 + step_by_action[s.action] = s + winner = counter.most_common(1)[0][0] + result.append(step_by_action[winner]) + return result + + +def _check_review_quality(runs: list[PipelineRun], + min_review_ms: int) -> None: + """Warn if any run has a suspiciously short review duration.""" + if min_review_ms <= 0: + return + flagged = [r for r in runs if r.review_duration_ms < min_review_ms] + if flagged: + ids = ", ".join(r.run_id for r in flagged) + warnings.warn( + f"Crystallizing from {len(flagged)} run(s) with review_duration_ms " + f"< {min_review_ms} ms — possible rubber-stamp approval: [{ids}]. " + "Verify these were genuinely human-reviewed before deployment.", + stacklevel=3, + ) + + +# ── Public API ──────────────────────────────────────────────────────────────── + +def should_crystallize(runs: list[PipelineRun], + config: CrystallizerConfig) -> bool: + """Return True if *runs* meet the threshold for crystallization.""" + approved = [r for r in runs if r.approved] + return len(approved) >= config.threshold + + +def crystallize(runs: list[PipelineRun], + config: CrystallizerConfig, + existing_version: int = 0) -> CrystallizedWorkflow: + """Promote *runs* into a CrystallizedWorkflow. 
+ + Raises + ------ + ValueError + If fewer approved runs than ``config.threshold``, or if the runs + span more than one (product, task_type, input_hash) triple. + """ + approved = [r for r in runs if r.approved] + if len(approved) < config.threshold: + raise ValueError( + f"Need {config.threshold} approved runs, got {len(approved)}." + ) + + # Validate homogeneity + products = {r.product for r in approved} + task_types = {r.task_type for r in approved} + hashes = {r.input_hash for r in approved} + if len(products) != 1 or len(task_types) != 1 or len(hashes) != 1: + raise ValueError( + "All runs must share the same product, task_type, and input_hash. " + f"Got products={products}, task_types={task_types}, hashes={hashes}." + ) + + product = products.pop() + task_type = task_types.pop() + input_hash = hashes.pop() + + _check_review_quality(approved, config.min_review_ms) + + # Pick canonical steps + if config.strategy == "majority": + steps = _majority_steps(approved) or approved[-1].steps + else: + steps = sorted(approved, key=lambda r: r.timestamp)[-1].steps + + avg_ms = sum(r.review_duration_ms for r in approved) // len(approved) + all_unmodified = all(not r.output_modified for r in approved) + + workflow_id = f"{product}:{task_type}:{input_hash[:12]}" + return CrystallizedWorkflow( + workflow_id=workflow_id, + product=product, + task_type=task_type, + input_hash=input_hash, + steps=steps, + crystallized_at=datetime.now(timezone.utc).isoformat(), + run_ids=[r.run_id for r in approved], + approval_count=len(approved), + avg_review_duration_ms=avg_ms, + all_output_unmodified=all_unmodified, + version=existing_version + 1, + ) + + +def evaluate_new_run( + run: PipelineRun, + recorder: Recorder, + config: CrystallizerConfig, + existing_version: int = 0, +) -> CrystallizedWorkflow | None: + """Record *run* and return a new workflow if the threshold is now met. + + Products call this after each human-approved execution. 
Returns a + ``CrystallizedWorkflow`` if crystallization was triggered, ``None`` + otherwise. + """ + recorder.record(run) + if not run.approved: + return None + + all_runs = recorder.load_approved(run.product, run.task_type, run.input_hash) + if not should_crystallize(all_runs, config): + log.debug( + "pipeline: %d/%d approved runs for %s:%s — not yet crystallizing", + len(all_runs), config.threshold, run.product, run.task_type, + ) + return None + + workflow = crystallize(all_runs, config, existing_version=existing_version) + log.info( + "pipeline: crystallized %s after %d approvals", + workflow.workflow_id, workflow.approval_count, + ) + return workflow diff --git a/circuitforge_core/pipeline/executor.py b/circuitforge_core/pipeline/executor.py new file mode 100644 index 0000000..83f4a80 --- /dev/null +++ b/circuitforge_core/pipeline/executor.py @@ -0,0 +1,157 @@ +# circuitforge_core/pipeline/executor.py — deterministic execution with LLM fallback +# +# MIT — orchestration logic only; calls product-supplied callables. +from __future__ import annotations + +import logging +from dataclasses import dataclass, field +from typing import Any, Callable + +from .models import CrystallizedWorkflow, Step + +log = logging.getLogger(__name__) + + +@dataclass +class StepResult: + step: Step + success: bool + output: Any = None + error: str | None = None + + +@dataclass +class ExecutionResult: + """Result of running a workflow (deterministic or LLM-assisted). + + Attributes + ---------- + success: + True if all steps completed without error. + used_deterministic: + True if a crystallized workflow was used; False if LLM was called. + step_results: + Per-step outcomes from the deterministic path. + llm_output: + Raw output from the LLM fallback path, if used. + workflow_id: + ID of the workflow used, or None for LLM path. + error: + Error message if the run failed entirely. 
+ """ + success: bool + used_deterministic: bool + step_results: list[StepResult] = field(default_factory=list) + llm_output: Any = None + workflow_id: str | None = None + error: str | None = None + + +# ── Executor ────────────────────────────────────────────────────────────────── + +class Executor: + """Runs crystallized workflows with transparent LLM fallback. + + Parameters + ---------- + step_fn: + Called for each Step: ``step_fn(step) -> (success, output)``. + The product supplies this — it knows how to turn a Step into a real + action (DTMF dial, HTTP call, form field write, etc.). + llm_fn: + Called when no workflow matches or a step fails: ``llm_fn() -> output``. + Products wire this to ``cf_core.llm.router`` or equivalent. + llm_fallback: + If False, raise RuntimeError instead of calling llm_fn on miss. + """ + + def __init__( + self, + step_fn: Callable[[Step], tuple[bool, Any]], + llm_fn: Callable[[], Any], + llm_fallback: bool = True, + ) -> None: + self._step_fn = step_fn + self._llm_fn = llm_fn + self._llm_fallback = llm_fallback + + def execute( + self, + workflow: CrystallizedWorkflow, + ) -> ExecutionResult: + """Run *workflow* deterministically. + + If a step fails, falls back to LLM (if ``llm_fallback`` is enabled). 
+ """ + step_results: list[StepResult] = [] + for step in workflow.steps: + try: + success, output = self._step_fn(step) + except Exception as exc: + log.warning("step %s raised: %s", step.action, exc) + success, output = False, None + error_str = str(exc) + else: + error_str = None if success else "step_fn returned success=False" + + step_results.append(StepResult(step=step, success=success, + output=output, error=error_str)) + if not success: + log.info( + "workflow %s: step %s failed — triggering LLM fallback", + workflow.workflow_id, step.action, + ) + return self._llm_fallback_result( + step_results, workflow.workflow_id + ) + + log.info("workflow %s: all %d steps succeeded", + workflow.workflow_id, len(workflow.steps)) + return ExecutionResult( + success=True, + used_deterministic=True, + step_results=step_results, + workflow_id=workflow.workflow_id, + ) + + def run_with_fallback( + self, + workflow: CrystallizedWorkflow | None, + ) -> ExecutionResult: + """Run *workflow* if provided; otherwise call the LLM directly.""" + if workflow is None: + return self._llm_fallback_result([], workflow_id=None) + return self.execute(workflow) + + # ── Internal ────────────────────────────────────────────────────────────── + + def _llm_fallback_result( + self, + partial_steps: list[StepResult], + workflow_id: str | None, + ) -> ExecutionResult: + if not self._llm_fallback: + return ExecutionResult( + success=False, + used_deterministic=True, + step_results=partial_steps, + workflow_id=workflow_id, + error="LLM fallback disabled and deterministic path failed.", + ) + try: + llm_output = self._llm_fn() + except Exception as exc: + return ExecutionResult( + success=False, + used_deterministic=False, + step_results=partial_steps, + workflow_id=workflow_id, + error=f"LLM fallback raised: {exc}", + ) + return ExecutionResult( + success=True, + used_deterministic=False, + step_results=partial_steps, + llm_output=llm_output, + workflow_id=workflow_id, + ) diff --git 
a/circuitforge_core/pipeline/models.py b/circuitforge_core/pipeline/models.py new file mode 100644 index 0000000..426eca6 --- /dev/null +++ b/circuitforge_core/pipeline/models.py @@ -0,0 +1,216 @@ +# circuitforge_core/pipeline/models.py — crystallization data models +# +# MIT — protocol and model types only; no inference backends. +from __future__ import annotations + +import hashlib +import json +from dataclasses import dataclass, field +from datetime import datetime, timezone +from typing import Any + + +# ── Utilities ───────────────────────────────────────────────────────────────── + +def hash_input(features: dict[str, Any]) -> str: + """Return a stable SHA-256 hex digest of *features*. + + Sorts keys before serialising so insertion order doesn't affect the hash. + Only call this on already-normalised, PII-free feature dicts — the hash is + opaque but the source dict should never contain raw user data. + """ + canonical = json.dumps(features, sort_keys=True, ensure_ascii=True) + return hashlib.sha256(canonical.encode()).hexdigest() + + +# ── Step ────────────────────────────────────────────────────────────────────── + +@dataclass +class Step: + """One atomic action in a deterministic workflow. + + The ``action`` string is product-defined (e.g. ``"dtmf"``, ``"field_fill"``, + ``"api_call"``). ``params`` carries action-specific values; ``description`` + is a plain-English summary for the approval UI. + """ + action: str + params: dict[str, Any] + description: str = "" + + def to_dict(self) -> dict[str, Any]: + return {"action": self.action, "params": self.params, + "description": self.description} + + @classmethod + def from_dict(cls, d: dict[str, Any]) -> "Step": + return cls(action=d["action"], params=d.get("params", {}), + description=d.get("description", "")) + + +# ── PipelineRun ─────────────────────────────────────────────────────────────── + +@dataclass +class PipelineRun: + """Record of one LLM-assisted execution — the raw material for crystallization. 
+ + Fields + ------ + run_id: + UUID or unique string identifying this run. + product: + CF product code (``"osprey"``, ``"falcon"``, ``"peregrine"`` …). + task_type: + Product-defined task category (``"ivr_navigate"``, ``"form_fill"`` …). + input_hash: + SHA-256 of normalised, PII-free input features. Never store raw input. + steps: + Ordered list of Steps the LLM proposed. + approved: + True if a human approved this run before execution. + review_duration_ms: + Wall-clock milliseconds between displaying the proposal and the approval + click. Values under ~5 000 ms indicate a rubber-stamp — the + crystallizer may reject runs with suspiciously short reviews. + output_modified: + True if the user edited any step before approving. Modifications suggest + the LLM proposal was imperfect; too-easy crystallization from unmodified + runs may mean the task is already deterministic and the LLM is just + echoing a fixed pattern. + timestamp: + ISO 8601 UTC creation time. + llm_model: + Model ID that generated the steps, e.g. ``"llama3:8b-instruct"``. + metadata: + Freeform dict for product-specific extra fields. 
+ """ + + run_id: str + product: str + task_type: str + input_hash: str + steps: list[Step] + approved: bool + review_duration_ms: int + output_modified: bool + timestamp: str = field(default_factory=lambda: datetime.now(timezone.utc).isoformat()) + llm_model: str | None = None + metadata: dict[str, Any] = field(default_factory=dict) + + def to_dict(self) -> dict[str, Any]: + return { + "run_id": self.run_id, + "product": self.product, + "task_type": self.task_type, + "input_hash": self.input_hash, + "steps": [s.to_dict() for s in self.steps], + "approved": self.approved, + "review_duration_ms": self.review_duration_ms, + "output_modified": self.output_modified, + "timestamp": self.timestamp, + "llm_model": self.llm_model, + "metadata": self.metadata, + } + + @classmethod + def from_dict(cls, d: dict[str, Any]) -> "PipelineRun": + return cls( + run_id=d["run_id"], + product=d["product"], + task_type=d["task_type"], + input_hash=d["input_hash"], + steps=[Step.from_dict(s) for s in d.get("steps", [])], + approved=d["approved"], + review_duration_ms=d["review_duration_ms"], + output_modified=d.get("output_modified", False), + timestamp=d.get("timestamp", ""), + llm_model=d.get("llm_model"), + metadata=d.get("metadata", {}), + ) + + +# ── CrystallizedWorkflow ────────────────────────────────────────────────────── + +@dataclass +class CrystallizedWorkflow: + """A deterministic workflow promoted from N approved PipelineRuns. + + Once crystallized, the executor runs ``steps`` directly — no LLM required + unless an edge case is encountered. + + Fields + ------ + workflow_id: + Unique identifier (typically ``{product}:{task_type}:{input_hash[:12]}``). + product / task_type / input_hash: + Same semantics as PipelineRun; the hash is the lookup key. + steps: + Canonical deterministic step sequence (majority-voted or most-recent, + per CrystallizerConfig.strategy). + crystallized_at: + ISO 8601 UTC timestamp. 
+ run_ids: + IDs of the source PipelineRuns that contributed to this workflow. + approval_count: + Number of approved runs that went into crystallization. + avg_review_duration_ms: + Mean review_duration_ms across all source runs — low values are a + warning sign that approvals may not have been genuine. + all_output_unmodified: + True if every contributing run had output_modified=False. Combined with + a very short avg_review_duration_ms this can flag workflows that may + have crystallized from rubber-stamp approvals. + active: + Whether this workflow is in use. Set to False to disable without + deleting the record. + version: + Increments each time the workflow is re-crystallized from new runs. + """ + + workflow_id: str + product: str + task_type: str + input_hash: str + steps: list[Step] + crystallized_at: str + run_ids: list[str] + approval_count: int + avg_review_duration_ms: int + all_output_unmodified: bool + active: bool = True + version: int = 1 + metadata: dict[str, Any] = field(default_factory=dict) + + def to_dict(self) -> dict[str, Any]: + return { + "workflow_id": self.workflow_id, + "product": self.product, + "task_type": self.task_type, + "input_hash": self.input_hash, + "steps": [s.to_dict() for s in self.steps], + "crystallized_at": self.crystallized_at, + "run_ids": self.run_ids, + "approval_count": self.approval_count, + "avg_review_duration_ms": self.avg_review_duration_ms, + "all_output_unmodified": self.all_output_unmodified, + "active": self.active, + "version": self.version, + "metadata": self.metadata, + } + + @classmethod + def from_dict(cls, d: dict[str, Any]) -> "CrystallizedWorkflow": + return cls( + workflow_id=d["workflow_id"], + product=d["product"], + task_type=d["task_type"], + input_hash=d["input_hash"], + steps=[Step.from_dict(s) for s in d.get("steps", [])], + crystallized_at=d["crystallized_at"], + run_ids=d.get("run_ids", []), + approval_count=d["approval_count"], + avg_review_duration_ms=d["avg_review_duration_ms"], + 
# circuitforge_core/pipeline/multimodal.py — cf-docuvision + cf-text pipeline
#
# MIT — orchestration only; vision and text inference stay in their own modules.
#
# Usage (minimal):
#
#     from circuitforge_core.pipeline.multimodal import MultimodalPipeline, MultimodalConfig
#
#     pipe = MultimodalPipeline(MultimodalConfig())
#     for result in pipe.run(page_bytes_list):
#         print(f"Page {result.page_idx}: {result.generated[:80]}")
#
# Streaming (token-by-token):
#
#     for page_idx, token in pipe.stream(page_bytes_list):
#         ui.append(page_idx, token)
#
from __future__ import annotations

import logging
from collections.abc import Callable, Iterable, Iterator
from dataclasses import dataclass, field
from typing import Any

from circuitforge_core.documents.client import DocuvisionClient
from circuitforge_core.documents.models import StructuredDocument

log = logging.getLogger(__name__)


# ── Config ──────────────────────────────────────────────────────────────────

def _default_prompt(page_idx: int, doc: StructuredDocument) -> str:
    """Build a generation prompt from a StructuredDocument.

    Page 0 gets the raw text alone; later pages are prefixed with a
    "[Page N]" marker so multi-page context stays distinguishable.
    """
    header = f"[Page {page_idx + 1}]\n" if page_idx > 0 else ""
    return header + doc.raw_text


@dataclass
class MultimodalConfig:
    """Configuration for MultimodalPipeline.

    vision_url:
        Base URL of the cf-docuvision service.
    hint:
        Docuvision extraction hint — ``"auto"`` | ``"document"`` | ``"form"``
        | ``"table"`` | ``"figure"``.
    max_tokens:
        Passed to cf-text generate per page.
    temperature:
        Sampling temperature for text generation.
    vram_serialise:
        When True, ``swap_fn`` is called between the vision and text steps
        on each page. Use this on 8GB GPUs where Dolphin-v2 and the text
        model cannot be resident simultaneously.
    prompt_fn:
        Callable ``(page_idx, StructuredDocument) -> str`` that builds the
        generation prompt. Defaults to using ``doc.raw_text`` directly.
        Products override this to add system context, few-shot examples, etc.
    vision_timeout:
        HTTP timeout in seconds for each cf-docuvision request.
    """
    vision_url: str = "http://localhost:8003"
    hint: str = "auto"
    max_tokens: int = 512
    temperature: float = 0.7
    vram_serialise: bool = False
    # default_factory wraps the function so the dataclass stores a plain
    # callable attribute rather than treating it as a field default value.
    prompt_fn: Callable[[int, StructuredDocument], str] = field(
        default_factory=lambda: _default_prompt
    )
    vision_timeout: int = 60


# ── Results ─────────────────────────────────────────────────────────────────

@dataclass
class PageResult:
    """Result of processing one page through the vision + text pipeline.

    page_idx:
        Zero-based page index.
    doc:
        StructuredDocument from cf-docuvision (None when extraction failed).
    generated:
        Full text output from cf-text for this page ("" on any failure).
    error:
        Non-None if extraction or generation failed for this page.
    """
    page_idx: int
    doc: StructuredDocument | None
    generated: str
    error: str | None = None


# ── Pipeline ────────────────────────────────────────────────────────────────

class MultimodalPipeline:
    """Chunk a multi-page document through vision extraction + text generation.

    Parameters
    ----------
    config:
        Pipeline configuration.
    swap_fn:
        Optional callable with no arguments, called between the vision and text
        steps on each page when ``config.vram_serialise=True``. Products using
        cf-orch wire this to the VRAM budget API so Dolphin-v2 can offload
        before the text model loads. A no-op lambda works for testing.
    generate_fn:
        Text generation callable: ``(prompt, max_tokens, temperature) -> str``.
        Defaults to ``circuitforge_core.text.generate``. Override in tests or
        when the product manages its own text backend.
    stream_fn:
        Streaming text callable: ``(prompt, max_tokens, temperature) -> Iterator[str]``.
        Defaults to ``circuitforge_core.text.generate`` with ``stream=True``.
    """

    def __init__(
        self,
        config: MultimodalConfig | None = None,
        *,
        swap_fn: Callable[[], None] | None = None,
        generate_fn: Callable[..., str] | None = None,
        stream_fn: Callable[..., Iterator[str]] | None = None,
    ) -> None:
        self._cfg = config or MultimodalConfig()
        self._vision = DocuvisionClient(
            base_url=self._cfg.vision_url,
            timeout=self._cfg.vision_timeout,
        )
        self._swap_fn = swap_fn
        self._generate_fn = generate_fn
        self._stream_fn = stream_fn

    # ── Public ──────────────────────────────────────────────────────────────

    def run(self, pages: Iterable[bytes]) -> Iterator[PageResult]:
        """Process each page and yield a PageResult as soon as it is ready.

        Callers receive pages one at a time — the UI can begin rendering
        page 0 while pages 1..N are still being extracted and generated.
        """
        for page_idx, page_bytes in enumerate(pages):
            yield self._process_page(page_idx, page_bytes)

    def stream(self, pages: Iterable[bytes]) -> Iterator[tuple[int, str]]:
        """Yield ``(page_idx, token)`` tuples for token-level progressive rendering.

        Each page is fully extracted before text generation begins, but tokens
        are yielded as the text model produces them rather than waiting for the
        full page output. Failures surface in-band as a single bracketed
        pseudo-token (``"[extraction error: …]"`` / ``"[generation error: …]"``)
        so UI consumers need no separate error channel.
        """
        for page_idx, page_bytes in enumerate(pages):
            doc, err = self._extract(page_idx, page_bytes)
            if err:
                yield (page_idx, f"[extraction error: {err}]")
                continue

            self._maybe_swap()

            prompt = self._cfg.prompt_fn(page_idx, doc)
            try:
                for token in self._stream_tokens(prompt):
                    yield (page_idx, token)
            except Exception as exc:
                log.error("page %d text streaming failed: %s", page_idx, exc)
                yield (page_idx, f"[generation error: {exc}]")

    # ── Internal ────────────────────────────────────────────────────────────

    def _process_page(self, page_idx: int, page_bytes: bytes) -> PageResult:
        # vision → (optional VRAM swap) → prompt → text generation, with
        # failures at either stage captured into PageResult.error.
        doc, err = self._extract(page_idx, page_bytes)
        if err:
            return PageResult(page_idx=page_idx, doc=None, generated="", error=err)

        self._maybe_swap()

        prompt = self._cfg.prompt_fn(page_idx, doc)
        try:
            text = self._generate(prompt)
        except Exception as exc:
            log.error("page %d generation failed: %s", page_idx, exc)
            return PageResult(page_idx=page_idx, doc=doc, generated="",
                              error=str(exc))

        return PageResult(page_idx=page_idx, doc=doc, generated=text)

    def _extract(
        self, page_idx: int, page_bytes: bytes
    ) -> tuple[StructuredDocument | None, str | None]:
        # Returns (doc, None) on success or (None, error-string) on failure —
        # exactly one of the pair is non-None.
        try:
            doc = self._vision.extract(page_bytes, hint=self._cfg.hint)
            log.debug("page %d extracted: %d chars", page_idx, len(doc.raw_text))
            return doc, None
        except Exception as exc:
            log.error("page %d vision extraction failed: %s", page_idx, exc)
            return None, str(exc)

    def _maybe_swap(self) -> None:
        # Only swap when BOTH the config flag is set and a hook was provided.
        if self._cfg.vram_serialise and self._swap_fn is not None:
            log.debug("vram_serialise: calling swap_fn")
            self._swap_fn()

    def _generate(self, prompt: str) -> str:
        if self._generate_fn is not None:
            return self._generate_fn(
                prompt,
                max_tokens=self._cfg.max_tokens,
                temperature=self._cfg.temperature,
            )
        # Imported lazily so the default text backend is only loaded when no
        # generate_fn override was supplied.
        from circuitforge_core.text import generate
        result = generate(
            prompt,
            max_tokens=self._cfg.max_tokens,
            temperature=self._cfg.temperature,
        )
        # NOTE(review): assumes the non-streaming generate() returns an object
        # with a .text attribute — confirm against circuitforge_core.text.
        return result.text

    def _stream_tokens(self, prompt: str) -> Iterator[str]:
        if self._stream_fn is not None:
            yield from self._stream_fn(
                prompt,
                max_tokens=self._cfg.max_tokens,
                temperature=self._cfg.temperature,
            )
            return
        # Lazy import, same rationale as _generate.
        from circuitforge_core.text import generate
        tokens = generate(
            prompt,
            max_tokens=self._cfg.max_tokens,
            temperature=self._cfg.temperature,
            stream=True,
        )
        # NOTE(review): assumes stream=True makes generate() return an
        # iterator of str tokens — confirm against circuitforge_core.text.
        yield from tokens
return result.text + + def _stream_tokens(self, prompt: str) -> Iterator[str]: + if self._stream_fn is not None: + yield from self._stream_fn( + prompt, + max_tokens=self._cfg.max_tokens, + temperature=self._cfg.temperature, + ) + return + from circuitforge_core.text import generate + tokens = generate( + prompt, + max_tokens=self._cfg.max_tokens, + temperature=self._cfg.temperature, + stream=True, + ) + yield from tokens diff --git a/circuitforge_core/pipeline/recorder.py b/circuitforge_core/pipeline/recorder.py new file mode 100644 index 0000000..5633129 --- /dev/null +++ b/circuitforge_core/pipeline/recorder.py @@ -0,0 +1,70 @@ +# circuitforge_core/pipeline/recorder.py — write and load PipelineRun records +# +# MIT — local file I/O only; no inference. +from __future__ import annotations + +import json +import logging +from pathlib import Path +from typing import Iterable + +from .models import PipelineRun + +log = logging.getLogger(__name__) + +_DEFAULT_ROOT = Path.home() / ".config" / "circuitforge" / "pipeline" / "runs" + + +class Recorder: + """Writes PipelineRun JSON records to a local directory tree. + + Layout:: + + {root}/{product}/{task_type}/{run_id}.json + + The recorder is intentionally append-only — it never deletes or modifies + existing records. Old runs accumulate as an audit trail; products that + want retention limits should prune the directory themselves. 
+ """ + + def __init__(self, root: Path | None = None) -> None: + self._root = Path(root) if root else _DEFAULT_ROOT + + # ── Write ───────────────────────────────────────────────────────────────── + + def record(self, run: PipelineRun) -> Path: + """Persist *run* to disk and return the file path written.""" + dest = self._path_for(run.product, run.task_type, run.run_id) + dest.parent.mkdir(parents=True, exist_ok=True) + dest.write_text(json.dumps(run.to_dict(), indent=2), encoding="utf-8") + log.debug("recorded pipeline run %s → %s", run.run_id, dest) + return dest + + # ── Read ────────────────────────────────────────────────────────────────── + + def load_runs(self, product: str, task_type: str) -> list[PipelineRun]: + """Return all runs for *(product, task_type)*, newest-first.""" + directory = self._root / product / task_type + if not directory.is_dir(): + return [] + runs: list[PipelineRun] = [] + for p in directory.glob("*.json"): + try: + runs.append(PipelineRun.from_dict(json.loads(p.read_text()))) + except Exception: + log.warning("skipping unreadable run file %s", p) + runs.sort(key=lambda r: r.timestamp, reverse=True) + return runs + + def load_approved(self, product: str, task_type: str, + input_hash: str) -> list[PipelineRun]: + """Return approved runs that match *input_hash*, newest-first.""" + return [ + r for r in self.load_runs(product, task_type) + if r.approved and r.input_hash == input_hash + ] + + # ── Internal ────────────────────────────────────────────────────────────── + + def _path_for(self, product: str, task_type: str, run_id: str) -> Path: + return self._root / product / task_type / f"{run_id}.json" diff --git a/circuitforge_core/pipeline/registry.py b/circuitforge_core/pipeline/registry.py new file mode 100644 index 0000000..e295517 --- /dev/null +++ b/circuitforge_core/pipeline/registry.py @@ -0,0 +1,134 @@ +# circuitforge_core/pipeline/registry.py — workflow lookup +# +# MIT — file I/O and matching logic only. 
+from __future__ import annotations + +import json +import logging +from pathlib import Path +from typing import Callable + +from .models import CrystallizedWorkflow + +log = logging.getLogger(__name__) + +_DEFAULT_ROOT = Path.home() / ".config" / "circuitforge" / "pipeline" / "workflows" + + +class Registry: + """Loads and matches CrystallizedWorkflows from the local filesystem. + + Layout:: + + {root}/{product}/{task_type}/{workflow_id}.json + + Exact matching is always available. Products that need fuzzy/semantic + matching can supply a ``similarity_fn`` — a callable that takes two input + hashes and returns a float in [0, 1]. The registry returns the first + active workflow whose similarity score meets ``fuzzy_threshold``. + """ + + def __init__( + self, + root: Path | None = None, + similarity_fn: Callable[[str, str], float] | None = None, + fuzzy_threshold: float = 0.8, + ) -> None: + self._root = Path(root) if root else _DEFAULT_ROOT + self._similarity_fn = similarity_fn + self._fuzzy_threshold = fuzzy_threshold + + # ── Write ───────────────────────────────────────────────────────────────── + + def register(self, workflow: CrystallizedWorkflow) -> Path: + """Persist *workflow* and return the path written.""" + dest = self._path_for(workflow.product, workflow.task_type, + workflow.workflow_id) + dest.parent.mkdir(parents=True, exist_ok=True) + dest.write_text(json.dumps(workflow.to_dict(), indent=2), encoding="utf-8") + log.info("registered workflow %s (v%d)", workflow.workflow_id, + workflow.version) + return dest + + def deactivate(self, workflow_id: str, product: str, + task_type: str) -> bool: + """Set ``active=False`` on a stored workflow. 
Returns True if found.""" + path = self._path_for(product, task_type, workflow_id) + if not path.exists(): + return False + data = json.loads(path.read_text()) + data["active"] = False + path.write_text(json.dumps(data, indent=2), encoding="utf-8") + log.info("deactivated workflow %s", workflow_id) + return True + + # ── Read ────────────────────────────────────────────────────────────────── + + def load_all(self, product: str, task_type: str) -> list[CrystallizedWorkflow]: + """Return all (including inactive) workflows for *(product, task_type)*.""" + directory = self._root / product / task_type + if not directory.is_dir(): + return [] + workflows: list[CrystallizedWorkflow] = [] + for p in directory.glob("*.json"): + try: + workflows.append( + CrystallizedWorkflow.from_dict(json.loads(p.read_text())) + ) + except Exception: + log.warning("skipping unreadable workflow file %s", p) + return workflows + + # ── Match ───────────────────────────────────────────────────────────────── + + def match(self, product: str, task_type: str, + input_hash: str) -> CrystallizedWorkflow | None: + """Return the active workflow for an exact input_hash match, or None.""" + for wf in self.load_all(product, task_type): + if wf.active and wf.input_hash == input_hash: + log.debug("registry exact match: %s", wf.workflow_id) + return wf + return None + + def fuzzy_match(self, product: str, task_type: str, + input_hash: str) -> CrystallizedWorkflow | None: + """Return a workflow above the similarity threshold, or None. + + Requires a ``similarity_fn`` to have been supplied at construction. + If none was provided, raises ``RuntimeError``. + """ + if self._similarity_fn is None: + raise RuntimeError( + "fuzzy_match() requires a similarity_fn — none was supplied " + "to Registry.__init__()." 
+ ) + best: CrystallizedWorkflow | None = None + best_score = 0.0 + for wf in self.load_all(product, task_type): + if not wf.active: + continue + score = self._similarity_fn(wf.input_hash, input_hash) + if score >= self._fuzzy_threshold and score > best_score: + best = wf + best_score = score + if best: + log.debug("registry fuzzy match: %s (score=%.2f)", best.workflow_id, + best_score) + return best + + def find(self, product: str, task_type: str, + input_hash: str) -> CrystallizedWorkflow | None: + """Exact match first; fuzzy match second (if similarity_fn is set).""" + exact = self.match(product, task_type, input_hash) + if exact: + return exact + if self._similarity_fn is not None: + return self.fuzzy_match(product, task_type, input_hash) + return None + + # ── Internal ────────────────────────────────────────────────────────────── + + def _path_for(self, product: str, task_type: str, + workflow_id: str) -> Path: + safe_id = workflow_id.replace(":", "_") + return self._root / product / task_type / f"{safe_id}.json" diff --git a/circuitforge_core/preferences/__init__.py b/circuitforge_core/preferences/__init__.py index 113629f..78157ff 100644 --- a/circuitforge_core/preferences/__init__.py +++ b/circuitforge_core/preferences/__init__.py @@ -40,8 +40,11 @@ def set_user_preference( s.set(user_id=user_id, path=path, value=value) +from . import accessibility as accessibility + __all__ = [ "get_path", "set_path", "get_user_preference", "set_user_preference", "LocalFileStore", "PreferenceStore", + "accessibility", ] diff --git a/circuitforge_core/preferences/accessibility.py b/circuitforge_core/preferences/accessibility.py new file mode 100644 index 0000000..e40974c --- /dev/null +++ b/circuitforge_core/preferences/accessibility.py @@ -0,0 +1,73 @@ +# circuitforge_core/preferences/accessibility.py — a11y preference keys +# +# First-class accessibility preferences so every product UI reads from +# the same store path without each implementing it separately. 
+# +# All keys use the "accessibility.*" namespace in the preference store. +# Products read these via get_user_preference() or the convenience helpers here. +from __future__ import annotations + +from circuitforge_core.preferences import get_user_preference, set_user_preference + +# ── Preference key constants ────────────────────────────────────────────────── + +PREF_REDUCED_MOTION = "accessibility.prefers_reduced_motion" +PREF_HIGH_CONTRAST = "accessibility.high_contrast" +PREF_FONT_SIZE = "accessibility.font_size" # "default" | "large" | "xlarge" +PREF_SCREEN_READER = "accessibility.screen_reader_mode" # reduces decorative content + +_DEFAULTS: dict[str, object] = { + PREF_REDUCED_MOTION: False, + PREF_HIGH_CONTRAST: False, + PREF_FONT_SIZE: "default", + PREF_SCREEN_READER: False, +} + + +# ── Convenience helpers ─────────────────────────────────────────────────────── + +def is_reduced_motion_preferred( + user_id: str | None = None, + store=None, +) -> bool: + """ + Return True if the user has requested reduced motion. + + Products must honour this in all animated UI elements: transitions, + auto-playing content, parallax, loaders. This maps to the CSS + `prefers-reduced-motion: reduce` media query and is the canonical + source of truth across all CF product UIs. + + Default: False. 
+ """ + val = get_user_preference( + user_id, PREF_REDUCED_MOTION, default=False, store=store + ) + return bool(val) + + +def is_high_contrast(user_id: str | None = None, store=None) -> bool: + """Return True if the user has requested high-contrast mode.""" + return bool(get_user_preference(user_id, PREF_HIGH_CONTRAST, default=False, store=store)) + + +def get_font_size(user_id: str | None = None, store=None) -> str: + """Return the user's preferred font size: 'default' | 'large' | 'xlarge'.""" + val = get_user_preference(user_id, PREF_FONT_SIZE, default="default", store=store) + if val not in ("default", "large", "xlarge"): + return "default" + return str(val) + + +def is_screen_reader_mode(user_id: str | None = None, store=None) -> bool: + """Return True if the user has requested screen reader optimised output.""" + return bool(get_user_preference(user_id, PREF_SCREEN_READER, default=False, store=store)) + + +def set_reduced_motion( + value: bool, + user_id: str | None = None, + store=None, +) -> None: + """Persist the user's reduced-motion preference.""" + set_user_preference(user_id, PREF_REDUCED_MOTION, value, store=store) diff --git a/circuitforge_core/text/__init__.py b/circuitforge_core/text/__init__.py new file mode 100644 index 0000000..761bdc0 --- /dev/null +++ b/circuitforge_core/text/__init__.py @@ -0,0 +1,144 @@ +""" +circuitforge_core.text — direct text generation service module. + +Provides lightweight, low-overhead text generation that bypasses ollama/vllm +for products that need fast, frequent inference from small local models. 
+ +Quick start (mock mode — no model required): + + import os; os.environ["CF_TEXT_MOCK"] = "1" + from circuitforge_core.text import generate, chat, ChatMessage + + result = generate("Write a short cover letter intro.") + print(result.text) + + reply = chat([ + ChatMessage("system", "You are a helpful recipe assistant."), + ChatMessage("user", "What can I make with eggs, spinach, and feta?"), + ]) + print(reply.text) + +Real inference (GGUF model): + + export CF_TEXT_MODEL=/Library/Assets/LLM/qwen2.5-3b-instruct-q4_k_m.gguf + from circuitforge_core.text import generate + result = generate("Summarise this job posting in 2 sentences: ...") + +Backend selection (CF_TEXT_BACKEND env or explicit): + + from circuitforge_core.text import make_backend + backend = make_backend("/path/to/model.gguf", backend="llamacpp") + +cf-orch service profile: + + service_type: cf-text + max_mb: per-model (3B Q4 ≈ 2048, 7B Q4 ≈ 4096) + preferred_compute: 7.5 minimum (INT8 tensor cores) + max_concurrent: 2 + shared: true +""" +from __future__ import annotations + +import os + +from circuitforge_core.text.backends.base import ( + ChatMessage, + GenerateResult, + TextBackend, + make_text_backend, +) +from circuitforge_core.text.backends.mock import MockTextBackend + +# ── Process-level singleton backend ────────────────────────────────────────── +# Lazily initialised on first call to generate() or chat(). +# Products that need per-user or per-request backends should use make_backend(). + +_backend: TextBackend | None = None + + +def _get_backend() -> TextBackend: + global _backend + if _backend is None: + model_path = os.environ.get("CF_TEXT_MODEL", "mock") + mock = model_path == "mock" or os.environ.get("CF_TEXT_MOCK", "") == "1" + _backend = make_text_backend(model_path, mock=mock) + return _backend + + +def make_backend( + model_path: str, + backend: str | None = None, + mock: bool | None = None, +) -> TextBackend: + """ + Create a TextBackend for the given model. 
+ + Use this when you need a dedicated backend per request or per user, + rather than the process-level singleton used by generate() and chat(). + """ + return make_text_backend(model_path, backend=backend, mock=mock) + + +# ── Convenience functions (singleton path) ──────────────────────────────────── + + +def generate( + prompt: str, + *, + model: str | None = None, + max_tokens: int = 512, + temperature: float = 0.7, + stream: bool = False, + stop: list[str] | None = None, +): + """ + Generate text from a prompt using the process-level backend. + + stream=True returns an Iterator[str] of tokens instead of GenerateResult. + model is accepted for API symmetry with LLMRouter but ignored by the + singleton path — set CF_TEXT_MODEL to change the loaded model. + """ + backend = _get_backend() + if stream: + return backend.generate_stream(prompt, max_tokens=max_tokens, temperature=temperature, stop=stop) + return backend.generate(prompt, max_tokens=max_tokens, temperature=temperature, stop=stop) + + +def chat( + messages: list[ChatMessage], + *, + model: str | None = None, + max_tokens: int = 512, + temperature: float = 0.7, + stream: bool = False, +) -> GenerateResult: + """ + Chat completion using the process-level backend. + + messages should be a list of ChatMessage(role, content) objects. + stream=True is not yet supported on the chat path; pass stream=False. + """ + if stream: + raise NotImplementedError( + "stream=True is not yet supported for chat(). " + "Use generate_stream() directly on a backend instance." + ) + return _get_backend().chat(messages, max_tokens=max_tokens, temperature=temperature) + + +def reset_backend() -> None: + """Reset the process-level singleton. 
Test teardown only.""" + global _backend + _backend = None + + +__all__ = [ + "ChatMessage", + "GenerateResult", + "TextBackend", + "MockTextBackend", + "make_backend", + "generate", + "chat", + "reset_backend", +] diff --git a/circuitforge_core/text/backends/__init__.py b/circuitforge_core/text/backends/__init__.py new file mode 100644 index 0000000..06981e8 --- /dev/null +++ b/circuitforge_core/text/backends/__init__.py @@ -0,0 +1,10 @@ +from .base import ChatMessage, GenerateResult, TextBackend, make_text_backend +from .mock import MockTextBackend + +__all__ = [ + "ChatMessage", + "GenerateResult", + "TextBackend", + "MockTextBackend", + "make_text_backend", +] diff --git a/circuitforge_core/text/backends/base.py b/circuitforge_core/text/backends/base.py new file mode 100644 index 0000000..8233165 --- /dev/null +++ b/circuitforge_core/text/backends/base.py @@ -0,0 +1,182 @@ +# circuitforge_core/text/backends/base.py — TextBackend Protocol + factory +# +# MIT licensed. The Protocol and mock backend are always importable. +# Real backends (LlamaCppBackend, TransformersBackend) require optional extras. +from __future__ import annotations + +import os +from typing import AsyncIterator, Iterator, Protocol, runtime_checkable + + +# ── Shared result types ─────────────────────────────────────────────────────── + + +class GenerateResult: + """Result from a single non-streaming generate() call.""" + + def __init__(self, text: str, tokens_used: int = 0, model: str = "") -> None: + self.text = text + self.tokens_used = tokens_used + self.model = model + + def __repr__(self) -> str: + return f"GenerateResult(text={self.text!r:.40}, tokens={self.tokens_used})" + + +class ChatMessage: + """A single message in a chat conversation.""" + + def __init__(self, role: str, content: str) -> None: + if role not in ("system", "user", "assistant"): + raise ValueError(f"Invalid role {role!r}. 
Must be system, user, or assistant.") + self.role = role + self.content = content + + def to_dict(self) -> dict: + return {"role": self.role, "content": self.content} + + +# ── TextBackend Protocol ────────────────────────────────────────────────────── + + +@runtime_checkable +class TextBackend(Protocol): + """ + Abstract interface for direct text generation backends. + + All generate/chat methods have both sync and async variants. + Streaming variants yield str tokens rather than a complete result. + + Implementations must be safe to construct once and call concurrently + (the model is loaded at construction time and reused across calls). + """ + + def generate( + self, + prompt: str, + *, + max_tokens: int = 512, + temperature: float = 0.7, + stop: list[str] | None = None, + ) -> GenerateResult: + """Synchronous generate — blocks until the full response is produced.""" + ... + + def generate_stream( + self, + prompt: str, + *, + max_tokens: int = 512, + temperature: float = 0.7, + stop: list[str] | None = None, + ) -> Iterator[str]: + """Synchronous streaming — yields tokens as they are produced.""" + ... + + async def generate_async( + self, + prompt: str, + *, + max_tokens: int = 512, + temperature: float = 0.7, + stop: list[str] | None = None, + ) -> GenerateResult: + """Async generate — runs in thread pool, never blocks the event loop.""" + ... + + async def generate_stream_async( + self, + prompt: str, + *, + max_tokens: int = 512, + temperature: float = 0.7, + stop: list[str] | None = None, + ) -> AsyncIterator[str]: + """Async streaming — yields tokens without blocking the event loop.""" + ... + + def chat( + self, + messages: list[ChatMessage], + *, + max_tokens: int = 512, + temperature: float = 0.7, + ) -> GenerateResult: + """Chat completion — formats messages into a prompt and generates.""" + ... + + @property + def model_name(self) -> str: + """Identifier for the loaded model (path stem or HF repo ID).""" + ... 
+ + @property + def vram_mb(self) -> int: + """Approximate VRAM footprint in MB. Used by cf-orch service registry.""" + ... + + +# ── Backend selection ───────────────────────────────────────────────────────── + + +def _select_backend(model_path: str, backend: str | None) -> str: + """ + Return "llamacpp" or "transformers" for the given model path. + + Parameters + ---------- + model_path Path to the model file or HuggingFace repo ID (e.g. "Qwen/Qwen2.5-3B"). + backend Explicit override from the caller ("llamacpp" | "transformers" | None). + When provided, trust it without inspection. + + Return "llamacpp" or "transformers". Raise ValueError for unrecognised values. + """ + _VALID = ("llamacpp", "transformers") + + # 1. Caller-supplied override — highest trust, no inspection needed. + resolved = backend or os.environ.get("CF_TEXT_BACKEND") + if resolved: + if resolved not in _VALID: + raise ValueError( + f"CF_TEXT_BACKEND={resolved!r} is not valid. Choose: {', '.join(_VALID)}" + ) + return resolved + + # 2. Format detection — GGUF files are unambiguously llama-cpp territory. + if model_path.lower().endswith(".gguf"): + return "llamacpp" + + # 3. Safe default — transformers covers HF repo IDs and safetensors dirs. + return "transformers" + + +# ── Factory ─────────────────────────────────────────────────────────────────── + + +def make_text_backend( + model_path: str, + backend: str | None = None, + mock: bool | None = None, +) -> "TextBackend": + """ + Return a TextBackend for the given model. 
+ + mock=True or CF_TEXT_MOCK=1 → MockTextBackend (no GPU, no model file needed) + Otherwise → backend resolved via _select_backend() + """ + use_mock = mock if mock is not None else os.environ.get("CF_TEXT_MOCK", "") == "1" + if use_mock: + from circuitforge_core.text.backends.mock import MockTextBackend + return MockTextBackend(model_name=model_path) + + resolved = _select_backend(model_path, backend) + + if resolved == "llamacpp": + from circuitforge_core.text.backends.llamacpp import LlamaCppBackend + return LlamaCppBackend(model_path=model_path) + + if resolved == "transformers": + from circuitforge_core.text.backends.transformers import TransformersBackend + return TransformersBackend(model_path=model_path) + + raise ValueError(f"Unknown backend {resolved!r}. Expected 'llamacpp' or 'transformers'.") diff --git a/circuitforge_core/text/backends/llamacpp.py b/circuitforge_core/text/backends/llamacpp.py new file mode 100644 index 0000000..2ddc932 --- /dev/null +++ b/circuitforge_core/text/backends/llamacpp.py @@ -0,0 +1,192 @@ +# circuitforge_core/text/backends/llamacpp.py — llama-cpp-python backend +# +# BSL 1.1: real inference. Requires llama-cpp-python + a GGUF model file. +# Install: pip install circuitforge-core[text-llamacpp] +# +# VRAM estimates (Q4_K_M quant): +# 1B → ~700MB 3B → ~2048MB 7B → ~4096MB +# 13B → ~7500MB 70B → ~40000MB +from __future__ import annotations + +import asyncio +import logging +import os +from pathlib import Path +from typing import AsyncIterator, Iterator + +from circuitforge_core.text.backends.base import ChatMessage, GenerateResult + +logger = logging.getLogger(__name__) + +# Q4_K_M is the recommended default — best accuracy/size tradeoff for local use. +_DEFAULT_N_CTX = int(os.environ.get("CF_TEXT_CTX", "4096")) +_DEFAULT_N_GPU_LAYERS = int(os.environ.get("CF_TEXT_GPU_LAYERS", "-1")) # -1 = all layers + + +def _estimate_vram_mb(model_path: str) -> int: + """Rough VRAM estimate from file size. 
Accurate enough for cf-orch budgeting.""" + try: + size_mb = Path(model_path).stat().st_size // (1024 * 1024) + # GGUF models typically need ~1.1× file size in VRAM (KV cache overhead) + return int(size_mb * 1.1) + except OSError: + return 4096 # conservative default + + +class LlamaCppBackend: + """ + Direct llama-cpp-python inference backend for GGUF models. + + The model is loaded once at construction. All inference runs in a thread + pool executor so async callers never block the event loop. + + Context window, GPU layers, and thread count are configurable via env: + CF_TEXT_CTX token context window (default 4096) + CF_TEXT_GPU_LAYERS GPU layers to offload, -1 = all (default -1) + CF_TEXT_THREADS CPU thread count (default: auto) + + Requires: pip install circuitforge-core[text-llamacpp] + """ + + def __init__(self, model_path: str) -> None: + try: + from llama_cpp import Llama # type: ignore[import] + except ImportError as exc: + raise ImportError( + "llama-cpp-python is required for LlamaCppBackend. " + "Install with: pip install circuitforge-core[text-llamacpp]" + ) from exc + + if not Path(model_path).exists(): + raise FileNotFoundError( + f"GGUF model not found: {model_path}\n" + "Download a GGUF model and set CF_TEXT_MODEL to its path." 
+ ) + + n_threads = int(os.environ.get("CF_TEXT_THREADS", "0")) or None + logger.info( + "Loading GGUF model %s (ctx=%d, gpu_layers=%d)", + model_path, _DEFAULT_N_CTX, _DEFAULT_N_GPU_LAYERS, + ) + self._llm = Llama( + model_path=model_path, + n_ctx=_DEFAULT_N_CTX, + n_gpu_layers=_DEFAULT_N_GPU_LAYERS, + n_threads=n_threads, + verbose=False, + ) + self._model_path = model_path + self._vram_mb = _estimate_vram_mb(model_path) + + @property + def model_name(self) -> str: + return Path(self._model_path).stem + + @property + def vram_mb(self) -> int: + return self._vram_mb + + def generate( + self, + prompt: str, + *, + max_tokens: int = 512, + temperature: float = 0.7, + stop: list[str] | None = None, + ) -> GenerateResult: + output = self._llm( + prompt, + max_tokens=max_tokens, + temperature=temperature, + stop=stop or [], + stream=False, + ) + text = output["choices"][0]["text"] + tokens_used = output["usage"]["completion_tokens"] + return GenerateResult(text=text, tokens_used=tokens_used, model=self.model_name) + + def generate_stream( + self, + prompt: str, + *, + max_tokens: int = 512, + temperature: float = 0.7, + stop: list[str] | None = None, + ) -> Iterator[str]: + for chunk in self._llm( + prompt, + max_tokens=max_tokens, + temperature=temperature, + stop=stop or [], + stream=True, + ): + yield chunk["choices"][0]["text"] + + async def generate_async( + self, + prompt: str, + *, + max_tokens: int = 512, + temperature: float = 0.7, + stop: list[str] | None = None, + ) -> GenerateResult: + loop = asyncio.get_event_loop() + return await loop.run_in_executor( + None, + lambda: self.generate(prompt, max_tokens=max_tokens, temperature=temperature, stop=stop), + ) + + async def generate_stream_async( + self, + prompt: str, + *, + max_tokens: int = 512, + temperature: float = 0.7, + stop: list[str] | None = None, + ) -> AsyncIterator[str]: + # llama_cpp streaming is synchronous — run in executor and re-emit tokens + import queue + import threading + + token_queue: 
queue.Queue = queue.Queue() + _DONE = object() + + def _produce() -> None: + try: + for chunk in self._llm( + prompt, + max_tokens=max_tokens, + temperature=temperature, + stop=stop or [], + stream=True, + ): + token_queue.put(chunk["choices"][0]["text"]) + finally: + token_queue.put(_DONE) + + thread = threading.Thread(target=_produce, daemon=True) + thread.start() + + loop = asyncio.get_event_loop() + while True: + token = await loop.run_in_executor(None, token_queue.get) + if token is _DONE: + break + yield token + + def chat( + self, + messages: list[ChatMessage], + *, + max_tokens: int = 512, + temperature: float = 0.7, + ) -> GenerateResult: + # llama-cpp-python has native chat_completion for instruct models + output = self._llm.create_chat_completion( + messages=[m.to_dict() for m in messages], + max_tokens=max_tokens, + temperature=temperature, + ) + text = output["choices"][0]["message"]["content"] + tokens_used = output["usage"]["completion_tokens"] + return GenerateResult(text=text, tokens_used=tokens_used, model=self.model_name) diff --git a/circuitforge_core/text/backends/mock.py b/circuitforge_core/text/backends/mock.py new file mode 100644 index 0000000..373b8c9 --- /dev/null +++ b/circuitforge_core/text/backends/mock.py @@ -0,0 +1,104 @@ +# circuitforge_core/text/backends/mock.py — synthetic text backend +# +# MIT licensed. No model file, no GPU, no extras required. +# Used in dev, CI, and free-tier nodes below the minimum VRAM threshold. +from __future__ import annotations + +import asyncio +from typing import AsyncIterator, Iterator + +from circuitforge_core.text.backends.base import ChatMessage, GenerateResult + +_MOCK_RESPONSE = ( + "This is a synthetic response from MockTextBackend. " + "Install a real backend (llama-cpp-python or transformers) and provide a model path " + "to generate real text." +) + + +class MockTextBackend: + """ + Deterministic synthetic text backend for development and CI. 
+ + Always returns the same fixed response so tests are reproducible without + a GPU or model file. Streaming emits the response word-by-word with a + configurable delay so UI streaming paths can be exercised. + """ + + def __init__( + self, + model_name: str = "mock", + token_delay_s: float = 0.0, + ) -> None: + self._model_name = model_name + self._token_delay_s = token_delay_s + + @property + def model_name(self) -> str: + return self._model_name + + @property + def vram_mb(self) -> int: + return 0 + + def _response_for(self, prompt_or_messages: str) -> str: + return _MOCK_RESPONSE + + def generate( + self, + prompt: str, + *, + max_tokens: int = 512, + temperature: float = 0.7, + stop: list[str] | None = None, + ) -> GenerateResult: + text = self._response_for(prompt) + return GenerateResult(text=text, tokens_used=len(text.split()), model=self._model_name) + + def generate_stream( + self, + prompt: str, + *, + max_tokens: int = 512, + temperature: float = 0.7, + stop: list[str] | None = None, + ) -> Iterator[str]: + import time + for word in self._response_for(prompt).split(): + yield word + " " + if self._token_delay_s: + time.sleep(self._token_delay_s) + + async def generate_async( + self, + prompt: str, + *, + max_tokens: int = 512, + temperature: float = 0.7, + stop: list[str] | None = None, + ) -> GenerateResult: + return self.generate(prompt, max_tokens=max_tokens, temperature=temperature, stop=stop) + + async def generate_stream_async( + self, + prompt: str, + *, + max_tokens: int = 512, + temperature: float = 0.7, + stop: list[str] | None = None, + ) -> AsyncIterator[str]: + for word in self._response_for(prompt).split(): + yield word + " " + if self._token_delay_s: + await asyncio.sleep(self._token_delay_s) + + def chat( + self, + messages: list[ChatMessage], + *, + max_tokens: int = 512, + temperature: float = 0.7, + ) -> GenerateResult: + # Format messages into a simple prompt for the mock response + prompt = "\n".join(f"{m.role}: {m.content}" for m 
in messages) + return self.generate(prompt, max_tokens=max_tokens, temperature=temperature) diff --git a/circuitforge_core/text/backends/transformers.py b/circuitforge_core/text/backends/transformers.py new file mode 100644 index 0000000..bb17591 --- /dev/null +++ b/circuitforge_core/text/backends/transformers.py @@ -0,0 +1,197 @@ +# circuitforge_core/text/backends/transformers.py — HuggingFace transformers backend +# +# BSL 1.1: real inference. Requires torch + transformers + a model checkpoint. +# Install: pip install circuitforge-core[text-transformers] +# +# Best for: HF repo IDs, safetensors checkpoints, models without GGUF versions. +# For GGUF models prefer LlamaCppBackend — lower overhead, smaller install. +from __future__ import annotations + +import asyncio +import logging +import os +from typing import AsyncIterator, Iterator + +from circuitforge_core.text.backends.base import ChatMessage, GenerateResult + +logger = logging.getLogger(__name__) + +_DEFAULT_MAX_NEW_TOKENS = 512 +_LOAD_IN_4BIT = os.environ.get("CF_TEXT_4BIT", "0") == "1" +_LOAD_IN_8BIT = os.environ.get("CF_TEXT_8BIT", "0") == "1" + + +class TransformersBackend: + """ + HuggingFace transformers inference backend. + + Loads any causal LM available on HuggingFace Hub or a local checkpoint dir. + Supports 4-bit and 8-bit quantization via bitsandbytes when VRAM is limited: + CF_TEXT_4BIT=1 — load_in_4bit (requires bitsandbytes) + CF_TEXT_8BIT=1 — load_in_8bit (requires bitsandbytes) + + Chat completion uses the tokenizer's apply_chat_template() when available, + falling back to a simple "User: / Assistant:" prompt format. + + Requires: pip install circuitforge-core[text-transformers] + """ + + def __init__(self, model_path: str) -> None: + try: + import torch + from transformers import AutoModelForCausalLM, AutoTokenizer, TextIteratorStreamer + except ImportError as exc: + raise ImportError( + "torch and transformers are required for TransformersBackend. 
" + "Install with: pip install circuitforge-core[text-transformers]" + ) from exc + + self._device = "cuda" if torch.cuda.is_available() else "cpu" + logger.info("Loading transformers model %s on %s", model_path, self._device) + + load_kwargs: dict = {"device_map": "auto" if self._device == "cuda" else None} + if _LOAD_IN_4BIT: + load_kwargs["load_in_4bit"] = True + elif _LOAD_IN_8BIT: + load_kwargs["load_in_8bit"] = True + + self._tokenizer = AutoTokenizer.from_pretrained(model_path) + self._model = AutoModelForCausalLM.from_pretrained(model_path, **load_kwargs) + if self._device == "cpu": + self._model = self._model.to("cpu") + + self._model_path = model_path + self._TextIteratorStreamer = TextIteratorStreamer + + @property + def model_name(self) -> str: + # HF repo IDs contain "/" — use the part after the slash as a short name + return self._model_path.split("/")[-1] + + @property + def vram_mb(self) -> int: + try: + import torch + if torch.cuda.is_available(): + return torch.cuda.memory_allocated() // (1024 * 1024) + except Exception: + pass + return 0 + + def _build_inputs(self, prompt: str): + return self._tokenizer(prompt, return_tensors="pt").to(self._device) + + def generate( + self, + prompt: str, + *, + max_tokens: int = 512, + temperature: float = 0.7, + stop: list[str] | None = None, + ) -> GenerateResult: + inputs = self._build_inputs(prompt) + input_len = inputs["input_ids"].shape[1] + outputs = self._model.generate( + **inputs, + max_new_tokens=max_tokens, + temperature=temperature, + do_sample=temperature > 0, + pad_token_id=self._tokenizer.eos_token_id, + ) + new_tokens = outputs[0][input_len:] + text = self._tokenizer.decode(new_tokens, skip_special_tokens=True) + return GenerateResult(text=text, tokens_used=len(new_tokens), model=self.model_name) + + def generate_stream( + self, + prompt: str, + *, + max_tokens: int = 512, + temperature: float = 0.7, + stop: list[str] | None = None, + ) -> Iterator[str]: + import threading + + inputs = 
self._build_inputs(prompt) + streamer = self._TextIteratorStreamer( + self._tokenizer, skip_prompt=True, skip_special_tokens=True + ) + gen_kwargs = dict( + **inputs, + max_new_tokens=max_tokens, + temperature=temperature, + do_sample=temperature > 0, + streamer=streamer, + pad_token_id=self._tokenizer.eos_token_id, + ) + thread = threading.Thread(target=self._model.generate, kwargs=gen_kwargs, daemon=True) + thread.start() + yield from streamer + + async def generate_async( + self, + prompt: str, + *, + max_tokens: int = 512, + temperature: float = 0.7, + stop: list[str] | None = None, + ) -> GenerateResult: + loop = asyncio.get_event_loop() + return await loop.run_in_executor( + None, + lambda: self.generate(prompt, max_tokens=max_tokens, temperature=temperature, stop=stop), + ) + + async def generate_stream_async( + self, + prompt: str, + *, + max_tokens: int = 512, + temperature: float = 0.7, + stop: list[str] | None = None, + ) -> AsyncIterator[str]: + import queue + import threading + + token_queue: queue.Queue = queue.Queue() + _DONE = object() + + def _produce() -> None: + try: + for token in self.generate_stream( + prompt, max_tokens=max_tokens, temperature=temperature + ): + token_queue.put(token) + finally: + token_queue.put(_DONE) + + threading.Thread(target=_produce, daemon=True).start() + loop = asyncio.get_event_loop() + while True: + token = await loop.run_in_executor(None, token_queue.get) + if token is _DONE: + break + yield token + + def chat( + self, + messages: list[ChatMessage], + *, + max_tokens: int = 512, + temperature: float = 0.7, + ) -> GenerateResult: + # Use the tokenizer's chat template when available (instruct models) + if hasattr(self._tokenizer, "apply_chat_template") and self._tokenizer.chat_template: + prompt = self._tokenizer.apply_chat_template( + [m.to_dict() for m in messages], + tokenize=False, + add_generation_prompt=True, + ) + else: + prompt = "\n".join( + f"{'User' if m.role == 'user' else 'Assistant'}: {m.content}" + 
# NOTE: this patch chunk also bumps pyproject.toml version 0.8.0 -> 0.9.0 and
# adds an empty tests/test_pipeline/__init__.py alongside this module.
"""Tests for pipeline.crystallizer — the core promotion logic."""
import warnings

import pytest

from circuitforge_core.pipeline.crystallizer import (
    CrystallizerConfig,
    crystallize,
    evaluate_new_run,
    should_crystallize,
)
from circuitforge_core.pipeline.models import PipelineRun, Step
from circuitforge_core.pipeline.recorder import Recorder


# ── Fixtures / helpers ────────────────────────────────────────────────────────

def _run(run_id, approved=True, review_ms=8000, modified=False,
         steps=None, input_hash="fixedhash",
         ts="2026-04-08T00:00:00+00:00") -> PipelineRun:
    """Build a PipelineRun with crystallizer-friendly defaults."""
    return PipelineRun(
        run_id=run_id,
        product="osprey",
        task_type="ivr_navigate",
        input_hash=input_hash,
        steps=steps or [Step("dtmf", {"digits": "1"})],
        approved=approved,
        review_duration_ms=review_ms,
        output_modified=modified,
        timestamp=ts,
    )


_CFG = CrystallizerConfig(threshold=3, min_review_ms=5_000)


# ── should_crystallize ────────────────────────────────────────────────────────

class TestShouldCrystallize:
    def test_returns_false_below_threshold(self):
        runs = [_run(f"r{i}") for i in range(2)]
        assert should_crystallize(runs, _CFG) is False

    def test_returns_true_at_threshold(self):
        runs = [_run(f"r{i}") for i in range(3)]
        assert should_crystallize(runs, _CFG) is True

    def test_returns_true_above_threshold(self):
        runs = [_run(f"r{i}") for i in range(10)]
        assert should_crystallize(runs, _CFG) is True

    def test_unapproved_runs_not_counted(self):
        approved = [_run(f"r{i}") for i in range(2)]
        unapproved = [_run(f"u{i}", approved=False) for i in range(10)]
        assert should_crystallize(approved + unapproved, _CFG) is False

    def test_threshold_one(self):
        cfg = CrystallizerConfig(threshold=1)
        assert should_crystallize([_run("r1")], cfg) is True


# ── crystallize ───────────────────────────────────────────────────────────────

class TestCrystallize:
    def _approved_runs(self, n=3, review_ms=8000):
        return [_run(f"r{i}", review_ms=review_ms) for i in range(n)]

    def test_produces_workflow(self):
        wf = crystallize(self._approved_runs(), _CFG)
        assert wf.product == "osprey"
        assert wf.task_type == "ivr_navigate"
        assert wf.approval_count == 3

    def test_workflow_id_format(self):
        wf = crystallize(self._approved_runs(), _CFG)
        assert wf.workflow_id.startswith("osprey:ivr_navigate:")

    def test_avg_review_duration_computed(self):
        runs = [_run("r0", review_ms=6000), _run("r1", review_ms=10000),
                _run("r2", review_ms=8000)]
        wf = crystallize(runs, _CFG)
        assert wf.avg_review_duration_ms == 8000

    def test_all_output_unmodified_true(self):
        runs = self._approved_runs()
        wf = crystallize(runs, _CFG)
        assert wf.all_output_unmodified is True

    def test_all_output_unmodified_false_when_any_modified(self):
        runs = [_run("r0"), _run("r1"), _run("r2", modified=True)]
        wf = crystallize(runs, _CFG)
        assert wf.all_output_unmodified is False

    def test_raises_below_threshold(self):
        with pytest.raises(ValueError, match="Need 3"):
            crystallize([_run("r0"), _run("r1")], _CFG)

    def test_raises_on_mixed_products(self):
        r1 = _run("r1")
        r2 = PipelineRun(
            run_id="r2", product="falcon", task_type="ivr_navigate",
            input_hash="fixedhash", steps=r1.steps, approved=True,
            review_duration_ms=8000, output_modified=False,
        )
        with pytest.raises(ValueError, match="product"):
            crystallize([r1, r2, r1], _CFG)

    def test_raises_on_mixed_hashes(self):
        runs = [_run("r0", input_hash="hash_a"),
                _run("r1", input_hash="hash_b"),
                _run("r2", input_hash="hash_a")]
        with pytest.raises(ValueError, match="input_hash"):
            crystallize(runs, _CFG)

    def test_rubber_stamp_warning(self):
        # Reviews well under min_review_ms should raise the rubber-stamp warning.
        runs = [_run(f"r{i}", review_ms=100) for i in range(3)]
        with warnings.catch_warnings(record=True) as caught:
            warnings.simplefilter("always")
            crystallize(runs, _CFG)
        assert any("rubber-stamp" in str(w.message) for w in caught)

    def test_no_warning_when_min_review_ms_zero(self):
        cfg = CrystallizerConfig(threshold=3, min_review_ms=0)
        runs = [_run(f"r{i}", review_ms=1) for i in range(3)]
        with warnings.catch_warnings(record=True) as caught:
            warnings.simplefilter("always")
            crystallize(runs, cfg)
        assert not any("rubber-stamp" in str(w.message) for w in caught)

    def test_version_increments(self):
        wf = crystallize(self._approved_runs(), _CFG, existing_version=2)
        assert wf.version == 3

    def test_strategy_most_recent_uses_latest(self):
        steps_old = [Step("dtmf", {"digits": "9"})]
        steps_new = [Step("dtmf", {"digits": "1"})]
        runs = [
            _run("r0", steps=steps_old, ts="2026-01-01T00:00:00+00:00"),
            _run("r1", steps=steps_old, ts="2026-01-02T00:00:00+00:00"),
            _run("r2", steps=steps_new, ts="2026-04-08T00:00:00+00:00"),
        ]
        cfg = CrystallizerConfig(threshold=3, strategy="most_recent")
        wf = crystallize(runs, cfg)
        assert wf.steps[0].params["digits"] == "1"

    def test_strategy_majority_picks_common_action(self):
        steps_a = [Step("dtmf", {"digits": "1"})]
        steps_b = [Step("press_key", {"key": "2"})]
        runs = [
            _run("r0", steps=steps_a),
            _run("r1", steps=steps_a),
            _run("r2", steps=steps_b),
        ]
        cfg = CrystallizerConfig(threshold=3, strategy="majority")
        wf = crystallize(runs, cfg)
        assert wf.steps[0].action == "dtmf"

    def test_strategy_majority_falls_back_on_length_mismatch(self):
        runs = [
            _run("r0", steps=[Step("dtmf", {"digits": "1"})]),
            _run("r1", steps=[Step("dtmf", {"digits": "1"}),
                              Step("dtmf", {"digits": "2"})]),
            _run("r2", steps=[Step("dtmf", {"digits": "1"})],
                 ts="2026-04-08T00:00:00+00:00"),
        ]
        cfg = CrystallizerConfig(threshold=3, strategy="majority")
        # Should not raise — falls back to most_recent
        wf = crystallize(runs, cfg)
        assert wf.steps is not None


# ── evaluate_new_run ──────────────────────────────────────────────────────────

class TestEvaluateNewRun:
    def test_returns_none_before_threshold(self, tmp_path):
        rec = Recorder(root=tmp_path)
        cfg = CrystallizerConfig(threshold=3, min_review_ms=0)
        result = evaluate_new_run(_run("r1"), rec, cfg)
        assert result is None

    def test_returns_workflow_at_threshold(self, tmp_path):
        rec = Recorder(root=tmp_path)
        cfg = CrystallizerConfig(threshold=3, min_review_ms=0)
        for i in range(2):
            evaluate_new_run(_run(f"r{i}"), rec, cfg)
        wf = evaluate_new_run(_run("r2"), rec, cfg)
        assert wf is not None
        assert wf.approval_count == 3

    def test_unapproved_run_does_not_trigger(self, tmp_path):
        rec = Recorder(root=tmp_path)
        cfg = CrystallizerConfig(threshold=1, min_review_ms=0)
        result = evaluate_new_run(_run("r1", approved=False), rec, cfg)
        assert result is None

    def test_run_is_recorded_even_if_not_approved(self, tmp_path):
        rec = Recorder(root=tmp_path)
        cfg = CrystallizerConfig(threshold=3, min_review_ms=0)
        evaluate_new_run(_run("r1", approved=False), rec, cfg)
        assert len(rec.load_runs("osprey", "ivr_navigate")) == 1
# ── tests/test_pipeline/test_executor.py ─────────────────────────────────────
"""Tests for pipeline.Executor."""
import pytest

from circuitforge_core.pipeline.executor import Executor, ExecutionResult
from circuitforge_core.pipeline.models import CrystallizedWorkflow, Step


def _wf(steps=None) -> CrystallizedWorkflow:
    """Minimal two-step workflow (or custom *steps*) for executor tests."""
    return CrystallizedWorkflow(
        workflow_id="osprey:ivr_navigate:abc",
        product="osprey",
        task_type="ivr_navigate",
        input_hash="abc",
        steps=[Step("dtmf", {"digits": "1"}), Step("dtmf", {"digits": "2"})]
        if steps is None else steps,
        crystallized_at="2026-04-08T00:00:00+00:00",
        run_ids=["r1"],
        approval_count=1,
        avg_review_duration_ms=8000,
        all_output_unmodified=True,
    )


def _ok_step(_step):
    return True, "ok"


def _fail_step(_step):
    return False, None


def _raise_step(_step):
    raise RuntimeError("hardware error")


def _llm():
    return "llm-output"


class TestExecutor:
    def test_all_steps_succeed(self):
        ex = Executor(step_fn=_ok_step, llm_fn=_llm)
        result = ex.execute(_wf())
        assert result.success is True
        assert result.used_deterministic is True
        assert len(result.step_results) == 2

    def test_failed_step_triggers_llm_fallback(self):
        ex = Executor(step_fn=_fail_step, llm_fn=_llm)
        result = ex.execute(_wf())
        assert result.success is True
        assert result.used_deterministic is False
        assert result.llm_output == "llm-output"

    def test_raising_step_triggers_llm_fallback(self):
        ex = Executor(step_fn=_raise_step, llm_fn=_llm)
        result = ex.execute(_wf())
        assert result.success is True
        assert result.used_deterministic is False

    def test_llm_fallback_disabled_returns_failure(self):
        ex = Executor(step_fn=_fail_step, llm_fn=_llm, llm_fallback=False)
        result = ex.execute(_wf())
        assert result.success is False
        assert "disabled" in (result.error or "")

    def test_run_with_fallback_no_workflow_calls_llm(self):
        ex = Executor(step_fn=_ok_step, llm_fn=_llm)
        result = ex.run_with_fallback(workflow=None)
        assert result.success is True
        assert result.used_deterministic is False
        assert result.llm_output == "llm-output"

    def test_run_with_fallback_uses_workflow_when_given(self):
        ex = Executor(step_fn=_ok_step, llm_fn=_llm)
        result = ex.run_with_fallback(workflow=_wf())
        assert result.used_deterministic is True

    def test_llm_fn_raises_returns_failure(self):
        def _bad_llm():
            raise ValueError("no model")

        ex = Executor(step_fn=_fail_step, llm_fn=_bad_llm)
        result = ex.execute(_wf())
        assert result.success is False
        assert "no model" in (result.error or "")

    def test_workflow_id_preserved_in_result(self):
        ex = Executor(step_fn=_ok_step, llm_fn=_llm)
        result = ex.execute(_wf())
        assert result.workflow_id == "osprey:ivr_navigate:abc"

    def test_empty_workflow_succeeds_immediately(self):
        ex = Executor(step_fn=_ok_step, llm_fn=_llm)
        result = ex.execute(_wf(steps=[]))
        assert result.success is True
        assert result.step_results == []


# ── tests/test_pipeline/test_models.py ───────────────────────────────────────
# Tests for pipeline models and the hash_input utility.
from circuitforge_core.pipeline.models import (
    CrystallizedWorkflow as _CW,
    PipelineRun,
    hash_input,
)


class TestHashInput:
    def test_stable_across_calls(self):
        feat = {"agency": "FTB", "menu_depth": 2}
        assert hash_input(feat) == hash_input(feat)

    def test_key_order_irrelevant(self):
        a = hash_input({"b": 2, "a": 1})
        b = hash_input({"a": 1, "b": 2})
        assert a == b

    def test_different_values_differ(self):
        assert hash_input({"a": 1}) != hash_input({"a": 2})

    def test_returns_hex_string(self):
        h = hash_input({"x": "y"})
        assert isinstance(h, str)
        assert len(h) == 64  # SHA-256 hex

class TestStep:
    def test_roundtrip(self):
        s = Step(action="dtmf", params={"digits": "1"}, description="Press 1")
        assert Step.from_dict(s.to_dict()) == s

    def test_description_optional(self):
        s = Step.from_dict({"action": "dtmf", "params": {}})
        assert s.description == ""


class TestPipelineRun:
    def _run(self, **kwargs) -> PipelineRun:
        defaults = dict(
            run_id="r1",
            product="osprey",
            task_type="ivr_navigate",
            input_hash="abc123",
            steps=[Step("dtmf", {"digits": "1"})],
            approved=True,
            review_duration_ms=8000,
            output_modified=False,
        )
        defaults.update(kwargs)
        return PipelineRun(**defaults)

    def test_roundtrip(self):
        run = self._run()
        assert PipelineRun.from_dict(run.to_dict()).run_id == "r1"

    def test_output_modified_false_default(self):
        d = self._run().to_dict()
        d.pop("output_modified", None)
        run = PipelineRun.from_dict(d)
        assert run.output_modified is False

    def test_timestamp_auto_set(self):
        run = self._run()
        assert run.timestamp  # non-empty


class TestCrystallizedWorkflow:
    def _wf(self) -> CrystallizedWorkflow:
        return CrystallizedWorkflow(
            workflow_id="osprey:ivr_navigate:abc123abc123",
            product="osprey",
            task_type="ivr_navigate",
            input_hash="abc123",
            steps=[Step("dtmf", {"digits": "1"})],
            crystallized_at="2026-04-08T00:00:00+00:00",
            run_ids=["r1", "r2", "r3"],
            approval_count=3,
            avg_review_duration_ms=9000,
            all_output_unmodified=True,
        )

    def test_roundtrip(self):
        wf = self._wf()
        restored = CrystallizedWorkflow.from_dict(wf.to_dict())
        assert restored.workflow_id == wf.workflow_id
        assert restored.avg_review_duration_ms == 9000
        assert restored.all_output_unmodified is True

    def test_active_default_true(self):
        d = self._wf().to_dict()
        d.pop("active", None)
        wf = CrystallizedWorkflow.from_dict(d)
        assert wf.active is True

    def test_version_default_one(self):
        d = self._wf().to_dict()
        d.pop("version", None)
        wf = CrystallizedWorkflow.from_dict(d)
        assert wf.version == 1
"""Tests for pipeline.MultimodalPipeline — mock vision and text backends."""
import pytest
from unittest.mock import MagicMock, patch

from circuitforge_core.documents.models import Element, StructuredDocument
from circuitforge_core.pipeline.multimodal import (
    MultimodalConfig,
    MultimodalPipeline,
    PageResult,
    _default_prompt,
)


# ── Helpers ──────────────────────────────────────────────────────────────────

def _doc(text="extracted text", page=0) -> StructuredDocument:
    return StructuredDocument(
        elements=[Element(type="paragraph", text=text)],
        raw_text=text,
    )


def _vision_ok(text="extracted text"):
    """Mock DocuvisionClient.extract that returns a StructuredDocument."""
    mock = MagicMock()
    mock.extract.return_value = _doc(text)
    return mock


def _vision_fail(exc=None):
    mock = MagicMock()
    mock.extract.side_effect = exc or ConnectionError("service down")
    return mock


def _generate_fn(prompt, max_tokens=512, temperature=0.7):
    return f"generated: {prompt[:20]}"


def _stream_fn(prompt, max_tokens=512, temperature=0.7):
    yield "tok1"
    yield "tok2"
    yield "tok3"


def _pipe(vision_mock=None, generate_fn=None, stream_fn=None,
          vram_serialise=False, swap_fn=None, prompt_fn=None) -> MultimodalPipeline:
    """Assemble a pipeline wired to mocks, optionally patching the vision client."""
    cfg = MultimodalConfig(vram_serialise=vram_serialise)
    if prompt_fn:
        cfg.prompt_fn = prompt_fn
    pipe = MultimodalPipeline(cfg, generate_fn=generate_fn or _generate_fn,
                              stream_fn=stream_fn, swap_fn=swap_fn)
    if vision_mock is not None:
        pipe._vision = vision_mock
    return pipe


# ── DefaultPrompt ────────────────────────────────────────────────────────────

class TestDefaultPrompt:
    def test_page_zero_no_header(self):
        doc = _doc("hello")
        assert _default_prompt(0, doc) == "hello"

    def test_page_one_has_header(self):
        doc = _doc("content")
        prompt = _default_prompt(1, doc)
        assert "[Page 2]" in prompt
        assert "content" in prompt


# ── run() ────────────────────────────────────────────────────────────────────

class TestMultimodalPipelineRun:
    def test_single_page_success(self):
        pipe = _pipe(vision_mock=_vision_ok("resume text"))
        results = list(pipe.run([b"page0_bytes"]))
        assert len(results) == 1
        assert results[0].page_idx == 0
        assert results[0].error is None
        assert "generated" in results[0].generated

    def test_multiple_pages_all_yielded(self):
        pipe = _pipe(vision_mock=_vision_ok())
        results = list(pipe.run([b"p0", b"p1", b"p2"]))
        assert len(results) == 3
        assert [r.page_idx for r in results] == [0, 1, 2]

    def test_vision_failure_yields_error_page(self):
        pipe = _pipe(vision_mock=_vision_fail())
        results = list(pipe.run([b"p0"]))
        assert results[0].error is not None
        assert results[0].doc is None
        assert results[0].generated == ""

    def test_partial_failure_does_not_stop_pipeline(self):
        """One bad page should not prevent subsequent pages from processing."""
        mock = MagicMock()
        mock.extract.side_effect = [
            ConnectionError("fail"),
            _doc("good text"),
        ]
        pipe = _pipe(vision_mock=mock)
        results = list(pipe.run([b"p0", b"p1"]))
        assert results[0].error is not None
        assert results[1].error is None

    def test_generation_failure_yields_error_page(self):
        def _bad_gen(prompt, **kw):
            raise RuntimeError("model OOM")

        pipe = _pipe(vision_mock=_vision_ok(), generate_fn=_bad_gen)
        results = list(pipe.run([b"p0"]))
        assert results[0].error is not None
        assert "OOM" in results[0].error

    def test_doc_attached_to_result(self):
        pipe = _pipe(vision_mock=_vision_ok("some text"))
        results = list(pipe.run([b"p0"]))
        assert results[0].doc is not None
        assert results[0].doc.raw_text == "some text"

    def test_empty_pages_yields_nothing(self):
        pipe = _pipe(vision_mock=_vision_ok())
        assert list(pipe.run([])) == []

    def test_custom_prompt_fn_called(self):
        calls = []

        def _prompt_fn(page_idx, doc):
            calls.append((page_idx, doc.raw_text))
            return f"custom:{doc.raw_text}"

        pipe = _pipe(vision_mock=_vision_ok("txt"), prompt_fn=_prompt_fn)
        list(pipe.run([b"p0"]))
        assert calls == [(0, "txt")]

    def test_vram_serialise_calls_swap_fn(self):
        swaps = []
        pipe = _pipe(vision_mock=_vision_ok(), vram_serialise=True,
                     swap_fn=lambda: swaps.append(1))
        list(pipe.run([b"p0", b"p1"]))
        assert len(swaps) == 2  # once per page

    def test_vram_serialise_false_no_swap_called(self):
        swaps = []
        pipe = _pipe(vision_mock=_vision_ok(), vram_serialise=False,
                     swap_fn=lambda: swaps.append(1))
        list(pipe.run([b"p0"]))
        assert swaps == []

    def test_swap_fn_none_does_not_raise(self):
        pipe = _pipe(vision_mock=_vision_ok(), vram_serialise=True, swap_fn=None)
        results = list(pipe.run([b"p0"]))
        assert results[0].error is None


# ── stream() ─────────────────────────────────────────────────────────────────

class TestMultimodalPipelineStream:
    def test_yields_page_idx_token_tuples(self):
        pipe = _pipe(vision_mock=_vision_ok(), stream_fn=_stream_fn)
        tokens = list(pipe.stream([b"p0"]))
        assert all(isinstance(t, tuple) and len(t) == 2 for t in tokens)
        assert tokens[0][0] == 0  # page_idx
        assert tokens[0][1] == "tok1"

    def test_multiple_pages_interleaved_by_page(self):
        pipe = _pipe(vision_mock=_vision_ok(), stream_fn=_stream_fn)
        tokens = list(pipe.stream([b"p0", b"p1"]))
        page_indices = [t[0] for t in tokens]
        # All page-0 tokens come before page-1 tokens (pages are sequential)
        assert page_indices == sorted(page_indices)

    def test_vision_failure_yields_error_token(self):
        pipe = _pipe(vision_mock=_vision_fail(), stream_fn=_stream_fn)
        tokens = list(pipe.stream([b"p0"]))
        assert len(tokens) == 1
        assert "extraction error" in tokens[0][1]

    def test_stream_fn_error_yields_error_token(self):
        def _bad_stream(prompt, **kw):
            raise RuntimeError("GPU gone")
            yield  # make it a generator

        pipe = _pipe(vision_mock=_vision_ok(), stream_fn=_bad_stream)
        tokens = list(pipe.stream([b"p0"]))
        assert any("generation error" in t[1] for t in tokens)

    def test_empty_pages_yields_nothing(self):
        pipe = _pipe(vision_mock=_vision_ok(), stream_fn=_stream_fn)
        assert list(pipe.stream([])) == []


# ── Import check ─────────────────────────────────────────────────────────────

def test_exported_from_pipeline_package():
    from circuitforge_core.pipeline import MultimodalPipeline, MultimodalConfig, PageResult
    assert MultimodalPipeline is not None
# ── tests/test_pipeline/test_recorder.py ─────────────────────────────────────
"""Tests for pipeline.Recorder."""
import pytest

from circuitforge_core.pipeline.models import PipelineRun, Step
from circuitforge_core.pipeline.recorder import Recorder


def _run(run_id="r1", approved=True, input_hash="abc", review_ms=8000,
         modified=False, ts="2026-04-08T01:00:00+00:00") -> PipelineRun:
    """Single-step osprey/ivr_navigate run with overridable fields."""
    return PipelineRun(
        run_id=run_id,
        product="osprey",
        task_type="ivr_navigate",
        input_hash=input_hash,
        steps=[Step("dtmf", {"digits": "1"})],
        approved=approved,
        review_duration_ms=review_ms,
        output_modified=modified,
        timestamp=ts,
    )


class TestRecorder:
    def test_record_creates_file(self, tmp_path):
        rec = Recorder(root=tmp_path)
        path = rec.record(_run())
        assert path.exists()

    def test_load_runs_empty_when_no_directory(self, tmp_path):
        rec = Recorder(root=tmp_path)
        assert rec.load_runs("osprey", "ivr_navigate") == []

    def test_load_runs_returns_recorded(self, tmp_path):
        rec = Recorder(root=tmp_path)
        rec.record(_run("r1"))
        rec.record(_run("r2"))
        runs = rec.load_runs("osprey", "ivr_navigate")
        assert len(runs) == 2

    def test_load_runs_newest_first(self, tmp_path):
        rec = Recorder(root=tmp_path)
        rec.record(_run("r_old", ts="2026-01-01T00:00:00+00:00"))
        rec.record(_run("r_new", ts="2026-04-08T00:00:00+00:00"))
        runs = rec.load_runs("osprey", "ivr_navigate")
        assert runs[0].run_id == "r_new"

    def test_load_approved_filters(self, tmp_path):
        rec = Recorder(root=tmp_path)
        rec.record(_run("r1", approved=True))
        rec.record(_run("r2", approved=False))
        approved = rec.load_approved("osprey", "ivr_navigate", "abc")
        assert all(r.approved for r in approved)
        assert len(approved) == 1

    def test_load_approved_filters_by_hash(self, tmp_path):
        rec = Recorder(root=tmp_path)
        rec.record(_run("r1", input_hash="hash_a"))
        rec.record(_run("r2", input_hash="hash_b"))
        result = rec.load_approved("osprey", "ivr_navigate", "hash_a")
        assert len(result) == 1
        assert result[0].run_id == "r1"

    def test_record_is_append_only(self, tmp_path):
        rec = Recorder(root=tmp_path)
        for i in range(5):
            rec.record(_run(f"r{i}"))
        assert len(rec.load_runs("osprey", "ivr_navigate")) == 5


# ── tests/test_pipeline/test_registry.py ─────────────────────────────────────
# Tests for pipeline.Registry — workflow lookup.
from circuitforge_core.pipeline.models import CrystallizedWorkflow
from circuitforge_core.pipeline.registry import Registry


def _wf(input_hash="abc", active=True, wf_id=None) -> CrystallizedWorkflow:
    wid = wf_id or f"osprey:ivr_navigate:{input_hash[:12]}"
    return CrystallizedWorkflow(
        workflow_id=wid,
        product="osprey",
        task_type="ivr_navigate",
        input_hash=input_hash,
        steps=[Step("dtmf", {"digits": "1"})],
        crystallized_at="2026-04-08T00:00:00+00:00",
        run_ids=["r1", "r2", "r3"],
        approval_count=3,
        avg_review_duration_ms=9000,
        all_output_unmodified=True,
        active=active,
    )


class TestRegistry:
    def test_register_creates_file(self, tmp_path):
        reg = Registry(root=tmp_path)
        path = reg.register(_wf())
        assert path.exists()

    def test_load_all_empty_when_no_directory(self, tmp_path):
        reg = Registry(root=tmp_path)
        assert reg.load_all("osprey", "ivr_navigate") == []

    def test_load_all_returns_registered(self, tmp_path):
        reg = Registry(root=tmp_path)
        reg.register(_wf("hash_a", wf_id="osprey:ivr_navigate:hash_a"))
        reg.register(_wf("hash_b", wf_id="osprey:ivr_navigate:hash_b"))
        assert len(reg.load_all("osprey", "ivr_navigate")) == 2

    def test_match_exact_hit(self, tmp_path):
        reg = Registry(root=tmp_path)
        reg.register(_wf("abc123"))
        wf = reg.match("osprey", "ivr_navigate", "abc123")
        assert wf is not None
        assert wf.input_hash == "abc123"

    def test_match_returns_none_on_miss(self, tmp_path):
        reg = Registry(root=tmp_path)
        reg.register(_wf("abc123"))
        assert reg.match("osprey", "ivr_navigate", "different") is None

    def test_match_ignores_inactive(self, tmp_path):
        reg = Registry(root=tmp_path)
        reg.register(_wf("abc123", active=False))
        assert reg.match("osprey", "ivr_navigate", "abc123") is None

    def test_deactivate_sets_active_false(self, tmp_path):
        reg = Registry(root=tmp_path)
        wf = _wf("abc123")
        reg.register(wf)
        reg.deactivate(wf.workflow_id, "osprey", "ivr_navigate")
        assert reg.match("osprey", "ivr_navigate", "abc123") is None

    def test_deactivate_returns_false_when_not_found(self, tmp_path):
        reg = Registry(root=tmp_path)
        assert reg.deactivate("nonexistent", "osprey", "ivr_navigate") is False

    def test_find_falls_through_to_fuzzy(self, tmp_path):
        reg = Registry(root=tmp_path,
                       similarity_fn=lambda a, b: 1.0 if a == b else 0.5,
                       fuzzy_threshold=0.4)
        reg.register(_wf("hash_stored"))
        # No exact match for "hash_query" but similarity returns 0.5 >= 0.4
        wf = reg.find("osprey", "ivr_navigate", "hash_query")
        assert wf is not None

    def test_fuzzy_match_raises_without_fn(self, tmp_path):
        reg = Registry(root=tmp_path)
        with pytest.raises(RuntimeError, match="similarity_fn"):
            reg.fuzzy_match("osprey", "ivr_navigate", "any")

    def test_fuzzy_match_below_threshold_returns_none(self, tmp_path):
        reg = Registry(root=tmp_path,
                       similarity_fn=lambda a, b: 0.1,
                       fuzzy_threshold=0.8)
        reg.register(_wf("hash_stored"))
        assert reg.fuzzy_match("osprey", "ivr_navigate", "hash_query") is None

    def test_find_exact_takes_priority(self, tmp_path):
        reg = Registry(root=tmp_path,
                       similarity_fn=lambda a, b: 0.9,
                       fuzzy_threshold=0.8)
        reg.register(_wf("exact_hash"))
        wf = reg.find("osprey", "ivr_navigate", "exact_hash")
        # Should be the exact-match workflow
        assert wf.input_hash == "exact_hash"

    def test_workflow_id_colon_safe_in_filename(self, tmp_path):
        """Colons in workflow_id must not break file creation on any OS."""
        reg = Registry(root=tmp_path)
        wf = _wf("abc", wf_id="osprey:ivr_navigate:abc123abc123")
        path = reg.register(wf)
        assert path.exists()
        assert ":" not in path.name
# NOTE(review): this class lives in tests/test_preferences.py and relies on the
# module-level imports added by the same patch hunk (get_user_preference,
# set_user_preference, and the circuitforge_core.preferences.accessibility
# names: is_reduced_motion_preferred, is_high_contrast, get_font_size,
# is_screen_reader_mode, set_reduced_motion, PREF_* constants) plus the
# pre-existing LocalFileStore. The enclosing module's other classes are
# unchanged by this edit.
class TestAccessibilityPreferences:
    def _store(self, tmp_path) -> LocalFileStore:
        """Fresh file-backed store isolated under pytest's tmp_path."""
        return LocalFileStore(prefs_path=tmp_path / "preferences.yaml")

    def test_reduced_motion_default_false(self, tmp_path):
        store = self._store(tmp_path)
        assert is_reduced_motion_preferred(store=store) is False

    def test_set_reduced_motion_persists(self, tmp_path):
        store = self._store(tmp_path)
        set_reduced_motion(True, store=store)
        assert is_reduced_motion_preferred(store=store) is True

    def test_reduced_motion_false_roundtrip(self, tmp_path):
        store = self._store(tmp_path)
        set_reduced_motion(True, store=store)
        set_reduced_motion(False, store=store)
        assert is_reduced_motion_preferred(store=store) is False

    def test_high_contrast_default_false(self, tmp_path):
        store = self._store(tmp_path)
        assert is_high_contrast(store=store) is False

    def test_high_contrast_set_and_read(self, tmp_path):
        store = self._store(tmp_path)
        store.set(user_id=None, path=PREF_HIGH_CONTRAST, value=True)
        assert is_high_contrast(store=store) is True

    def test_font_size_default(self, tmp_path):
        store = self._store(tmp_path)
        assert get_font_size(store=store) == "default"

    def test_font_size_large(self, tmp_path):
        store = self._store(tmp_path)
        store.set(user_id=None, path=PREF_FONT_SIZE, value="large")
        assert get_font_size(store=store) == "large"

    def test_font_size_xlarge(self, tmp_path):
        store = self._store(tmp_path)
        store.set(user_id=None, path=PREF_FONT_SIZE, value="xlarge")
        assert get_font_size(store=store) == "xlarge"

    def test_font_size_invalid_falls_back_to_default(self, tmp_path):
        store = self._store(tmp_path)
        store.set(user_id=None, path=PREF_FONT_SIZE, value="gigantic")
        assert get_font_size(store=store) == "default"

    def test_screen_reader_mode_default_false(self, tmp_path):
        store = self._store(tmp_path)
        assert is_screen_reader_mode(store=store) is False

    def test_screen_reader_mode_set(self, tmp_path):
        store = self._store(tmp_path)
        store.set(user_id=None, path=PREF_SCREEN_READER, value=True)
        assert is_screen_reader_mode(store=store) is True

    def test_preferences_are_independent(self, tmp_path):
        """Setting one a11y pref doesn't affect others."""
        store = self._store(tmp_path)
        set_reduced_motion(True, store=store)
        assert is_high_contrast(store=store) is False
        assert get_font_size(store=store) == "default"
        assert is_screen_reader_mode(store=store) is False

    def test_user_id_threaded_through(self, tmp_path):
        """user_id param is accepted (LocalFileStore ignores it, but must not error)."""
        store = self._store(tmp_path)
        set_reduced_motion(True, user_id="u999", store=store)
        assert is_reduced_motion_preferred(user_id="u999", store=store) is True

    def test_accessibility_exported_from_package(self):
        from circuitforge_core.preferences import accessibility
        assert hasattr(accessibility, "is_reduced_motion_preferred")
        assert hasattr(accessibility, "PREF_REDUCED_MOTION")
+ + def test_env_override_invalid_raises(self, monkeypatch): + monkeypatch.setenv("CF_TEXT_BACKEND", "ctransformers") + with pytest.raises(ValueError): + _select_backend("model.gguf", None) + + def test_caller_beats_env(self, monkeypatch): + monkeypatch.setenv("CF_TEXT_BACKEND", "transformers") + assert _select_backend("model.gguf", "llamacpp") == "llamacpp" + + def test_gguf_extension_selects_llamacpp(self, monkeypatch): + monkeypatch.delenv("CF_TEXT_BACKEND", raising=False) + assert _select_backend("/models/qwen2.5-3b-q4.gguf", None) == "llamacpp" + + def test_gguf_uppercase_extension(self, monkeypatch): + monkeypatch.delenv("CF_TEXT_BACKEND", raising=False) + assert _select_backend("/models/model.GGUF", None) == "llamacpp" + + def test_hf_repo_id_selects_transformers(self, monkeypatch): + monkeypatch.delenv("CF_TEXT_BACKEND", raising=False) + assert _select_backend("Qwen/Qwen2.5-3B-Instruct", None) == "transformers" + + def test_safetensors_dir_selects_transformers(self, monkeypatch): + monkeypatch.delenv("CF_TEXT_BACKEND", raising=False) + assert _select_backend("/models/qwen2.5-3b/", None) == "transformers" + + +# ── ChatMessage ─────────────────────────────────────────────────────────────── + +class TestChatMessage: + def test_valid_roles(self): + for role in ("system", "user", "assistant"): + msg = ChatMessage(role, "hello") + assert msg.role == role + + def test_invalid_role_raises(self): + with pytest.raises(ValueError, match="Invalid role"): + ChatMessage("bot", "hello") + + def test_to_dict(self): + msg = ChatMessage("user", "hello") + assert msg.to_dict() == {"role": "user", "content": "hello"} + + +# ── MockTextBackend ─────────────────────────────────────────────────────────── + +class TestMockTextBackend: + def test_generate_returns_result(self): + backend = MockTextBackend() + result = backend.generate("write something") + assert isinstance(result, GenerateResult) + assert len(result.text) > 0 + + def test_vram_mb_is_zero(self): + assert 
MockTextBackend().vram_mb == 0 + + def test_model_name(self): + assert MockTextBackend(model_name="test-model").model_name == "test-model" + + def test_generate_stream_yields_tokens(self): + backend = MockTextBackend() + tokens = list(backend.generate_stream("hello")) + assert len(tokens) > 0 + assert "".join(tokens).strip() == backend.generate("hello").text.strip() + + @pytest.mark.asyncio + async def test_generate_async(self): + backend = MockTextBackend() + result = await backend.generate_async("hello") + assert isinstance(result, GenerateResult) + + @pytest.mark.asyncio + async def test_generate_stream_async(self): + backend = MockTextBackend() + tokens = [] + async for token in backend.generate_stream_async("hello"): + tokens.append(token) + assert len(tokens) > 0 + + def test_chat(self): + backend = MockTextBackend() + messages = [ChatMessage("user", "hello")] + result = backend.chat(messages) + assert isinstance(result, GenerateResult) + + def test_isinstance_protocol(self): + assert isinstance(MockTextBackend(), TextBackend) + + +# ── make_text_backend ───────────────────────────────────────────────────────── + +class TestMakeTextBackend: + def test_mock_flag(self): + backend = make_text_backend("any-model", mock=True) + assert isinstance(backend, MockTextBackend) + + def test_mock_env(self, monkeypatch): + monkeypatch.setenv("CF_TEXT_MOCK", "1") + backend = make_text_backend("any-model") + assert isinstance(backend, MockTextBackend) + + def test_real_gguf_raises_import_error(self, monkeypatch): + monkeypatch.delenv("CF_TEXT_MOCK", raising=False) + monkeypatch.delenv("CF_TEXT_BACKEND", raising=False) + with pytest.raises((ImportError, FileNotFoundError)): + make_text_backend("/nonexistent/model.gguf", mock=False) + + def test_real_transformers_nonexistent_model_raises(self, monkeypatch): + monkeypatch.delenv("CF_TEXT_MOCK", raising=False) + monkeypatch.setenv("CF_TEXT_BACKEND", "transformers") + # Use a clearly nonexistent local path — avoids a network hit 
and HF download + with pytest.raises(Exception): + make_text_backend("/nonexistent/local/model-dir", mock=False) + + +# ── Public API (singleton) ──────────────────────────────────────────────────── + +class TestPublicAPI: + def setup_method(self): + reset_backend() + + def teardown_method(self): + reset_backend() + + def test_generate_mock(self, monkeypatch): + monkeypatch.setenv("CF_TEXT_MOCK", "1") + result = generate("write something") + assert isinstance(result, GenerateResult) + + def test_generate_stream_mock(self, monkeypatch): + monkeypatch.setenv("CF_TEXT_MOCK", "1") + tokens = list(generate("hello", stream=True)) + assert len(tokens) > 0 + + def test_chat_mock(self, monkeypatch): + monkeypatch.setenv("CF_TEXT_MOCK", "1") + result = chat([ChatMessage("user", "hello")]) + assert isinstance(result, GenerateResult) + + def test_chat_stream_raises(self, monkeypatch): + monkeypatch.setenv("CF_TEXT_MOCK", "1") + with pytest.raises(NotImplementedError): + chat([ChatMessage("user", "hello")], stream=True) + + def test_make_backend_returns_mock(self): + backend = make_backend("any", mock=True) + assert isinstance(backend, MockTextBackend) + + def test_singleton_reused(self, monkeypatch): + monkeypatch.setenv("CF_TEXT_MOCK", "1") + r1 = generate("a") + r2 = generate("b") + # Both calls should succeed (singleton loaded once) + assert isinstance(r1, GenerateResult) + assert isinstance(r2, GenerateResult)