feat: v0.9.0 — cf-text, pipeline crystallization engine, multimodal pipeline, a11y preferences
Some checks failed
CI / test (push) Waiting to run
Mirror / mirror (push) Has been cancelled
Release — PyPI / release (push) Has been cancelled

Closes #33, #37, #38, #41, #42.

## cf-text (closes #41)
- New module: `circuitforge_core.text` — direct local inference bypassing ollama/vllm
- Backends: llama.cpp (GGUF), transformers (HF), mock
- Auto-detects backend from file extension; CF_TEXT_BACKEND env override
- Optional 4-bit/8-bit quantisation via bitsandbytes (CF_TEXT_4BIT / CF_TEXT_8BIT)
- process-level singleton + per-request `make_backend()` path

## Pipeline crystallization engine (closes #33, #37)
- FPGA→ASIC model: LLM-discovered paths → deterministic workflows after N approvals
- `models.py`: PipelineRun (incl. review_duration_ms + output_modified per #37),
  CrystallizedWorkflow, Step, hash_input()
- `recorder.py`: append-only JSON run log under ~/.config/circuitforge/pipeline/
- `crystallizer.py`: threshold check, majority/most-recent step strategy,
  rubber-stamp warning (review_duration_ms < 5s triggers warnings.warn)
- `registry.py`: exact + fuzzy match, deactivate-without-delete, colon-safe filenames
- `executor.py`: deterministic steps with transparent LLM fallback

## Multimodal chunked pipeline (closes #42)
- `pipeline/multimodal.py`: cf-docuvision pages → cf-text streaming
- `run()` yields PageResult per page (progressive, no full-doc buffer)
- `stream()` yields (page_idx, token) tuples for token-level UI rendering
- `vram_serialise` flag + `swap_fn` hook for 8GB GPU VRAM management
- `prompt_fn` callback for product-specific prompt construction

## Accessibility preferences (closes #38)
- `preferences/accessibility.py`: PREF_REDUCED_MOTION, PREF_HIGH_CONTRAST,
  PREF_FONT_SIZE, PREF_SCREEN_READER with get/set helpers
- Exported from preferences package __init__

## LLM router fix
- cf-orch backends: skip reachability pre-check; allocation starts the service
- Static backends: reachability check remains in place
This commit is contained in:
pyr0ball 2026-04-08 23:17:18 -07:00
parent 3075e5d3da
commit 80b0d5fd34
27 changed files with 2983 additions and 6 deletions

View file

@ -199,15 +199,18 @@ class LLMRouter:
continue
elif backend["type"] == "openai_compat":
if not self._is_reachable(backend["base_url"]):
print(f"[LLMRouter] {name}: unreachable, skipping")
continue
# --- cf_orch: optionally override base_url with coordinator-allocated URL ---
# cf_orch: try allocation first — this may start the service on-demand.
# Do NOT reachability-check before allocating; the service may be stopped
# and the allocation is what starts it.
orch_ctx = orch_alloc = None
orch_result = self._try_cf_orch_alloc(backend)
if orch_result is not None:
orch_ctx, orch_alloc = orch_result
backend = {**backend, "base_url": orch_alloc.url + "/v1"}
elif not self._is_reachable(backend["base_url"]):
# Static backend (no cf-orch) — skip if not reachable.
print(f"[LLMRouter] {name}: unreachable, skipping")
continue
try:
client = OpenAI(
base_url=backend["base_url"],

View file

@ -1,3 +1,43 @@
# circuitforge_core/pipeline — FPGA→ASIC crystallization engine
#
# Public API: call pipeline.run() from product code instead of llm.router directly.
# The module transparently checks for crystallized workflows first, falls back
# to LLM when none match, and records each run for future crystallization.
from __future__ import annotations
from typing import Any, Callable
from .crystallizer import CrystallizerConfig, crystallize, evaluate_new_run, should_crystallize
from .executor import ExecutionResult, Executor, StepResult
from .models import CrystallizedWorkflow, PipelineRun, Step, hash_input
from .multimodal import MultimodalConfig, MultimodalPipeline, PageResult
from .recorder import Recorder
from .registry import Registry
from .staging import StagingDB
# Single authoritative export list.  (A previous revision assigned __all__
# twice — the first assignment, ["StagingDB"], was dead code immediately
# overwritten by the full list; it has been removed.)
__all__ = [
    # models
    "PipelineRun",
    "CrystallizedWorkflow",
    "Step",
    "hash_input",
    # recorder
    "Recorder",
    # crystallizer
    "CrystallizerConfig",
    "crystallize",
    "evaluate_new_run",
    "should_crystallize",
    # registry
    "Registry",
    # executor
    "Executor",
    "ExecutionResult",
    "StepResult",
    # multimodal
    "MultimodalPipeline",
    "MultimodalConfig",
    "PageResult",
    # legacy stub
    "StagingDB",
]

View file

@ -0,0 +1,177 @@
# circuitforge_core/pipeline/crystallizer.py — promote approved runs → workflows
#
# MIT — pure logic, no inference backends.
from __future__ import annotations
import logging
import warnings
from collections import Counter
from dataclasses import dataclass, field
from datetime import datetime, timezone
from typing import Literal
from .models import CrystallizedWorkflow, PipelineRun, Step
from .recorder import Recorder
log = logging.getLogger(__name__)
# Minimum milliseconds of review that counts as "genuine".
# Runs shorter than this are accepted but trigger a warning.
_RUBBER_STAMP_THRESHOLD_MS = 5_000
@dataclass
class CrystallizerConfig:
    """Tuning knobs for one product/task-type pair.

    threshold:
        Minimum number of approved runs required before crystallization.
        Osprey sets this to 1 (first successful IVR navigation is enough);
        Peregrine uses 3+ for cover-letter templates.
    min_review_ms:
        Approved runs with review_duration_ms below this value generate a
        warning. Set to 0 to silence the check (tests, automated approvals).
    strategy:
        ``"most_recent"`` — use the latest approved run's steps verbatim.
        ``"majority"`` — pick each step by majority vote across runs (requires
        runs to have the same step count; falls back to most_recent otherwise).
    """
    threshold: int = 3
    min_review_ms: int = _RUBBER_STAMP_THRESHOLD_MS
    strategy: Literal["most_recent", "majority"] = "most_recent"
# ── Helpers ───────────────────────────────────────────────────────────────────
def _majority_steps(runs: list[PipelineRun]) -> list[Step] | None:
    """Return majority-voted steps, or None if run lengths differ."""
    # A column-wise vote only makes sense when every run proposes the
    # same number of steps.
    if len({len(r.steps) for r in runs}) != 1:
        return None
    voted: list[Step] = []
    # zip(*) groups the i-th step of every run into one "column".
    for column in zip(*(r.steps for r in runs)):
        tally = Counter(s.action for s in column)
        winning_action = tally.most_common(1)[0][0]
        # Keep the latest concrete Step carrying the winning action, which
        # mirrors the original dict-overwrite behavior (later runs win).
        winner = next(s for s in reversed(column) if s.action == winning_action)
        voted.append(winner)
    return voted
def _check_review_quality(runs: list[PipelineRun],
                          min_review_ms: int) -> None:
    """Warn if any run has a suspiciously short review duration."""
    if min_review_ms <= 0:
        # Check disabled (tests, automated approvals).
        return
    suspicious = [r for r in runs if r.review_duration_ms < min_review_ms]
    if not suspicious:
        return
    id_list = ", ".join(r.run_id for r in suspicious)
    warnings.warn(
        f"Crystallizing from {len(suspicious)} run(s) with review_duration_ms "
        f"< {min_review_ms} ms — possible rubber-stamp approval: [{id_list}]. "
        "Verify these were genuinely human-reviewed before deployment.",
        stacklevel=3,
    )
# ── Public API ────────────────────────────────────────────────────────────────
def should_crystallize(runs: list[PipelineRun],
                       config: CrystallizerConfig) -> bool:
    """Return True if *runs* meet the threshold for crystallization."""
    approved_count = sum(1 for r in runs if r.approved)
    return approved_count >= config.threshold
def crystallize(runs: list[PipelineRun],
                config: CrystallizerConfig,
                existing_version: int = 0) -> CrystallizedWorkflow:
    """Promote *runs* into a CrystallizedWorkflow.

    Raises
    ------
    ValueError
        If fewer approved runs than ``config.threshold``, or if the runs
        span more than one (product, task_type, input_hash) triple.
    """
    approved = [r for r in runs if r.approved]
    if len(approved) < config.threshold:
        raise ValueError(
            f"Need {config.threshold} approved runs, got {len(approved)}."
        )
    # All contributing runs must describe the same task on the same input.
    products = {r.product for r in approved}
    task_types = {r.task_type for r in approved}
    hashes = {r.input_hash for r in approved}
    if not (len(products) == 1 and len(task_types) == 1 and len(hashes) == 1):
        raise ValueError(
            "All runs must share the same product, task_type, and input_hash. "
            f"Got products={products}, task_types={task_types}, hashes={hashes}."
        )
    product = products.pop()
    task_type = task_types.pop()
    input_hash = hashes.pop()
    # May emit a rubber-stamp warning; never raises.
    _check_review_quality(approved, config.min_review_ms)
    # Canonical step sequence, chosen per configured strategy.
    if config.strategy == "majority":
        # Vote is impossible when step counts differ — fall back to the
        # last run in list order, as before.
        steps = _majority_steps(approved) or approved[-1].steps
    else:
        steps = sorted(approved, key=lambda r: r.timestamp)[-1].steps
    mean_review_ms = sum(r.review_duration_ms for r in approved) // len(approved)
    untouched = all(not r.output_modified for r in approved)
    return CrystallizedWorkflow(
        workflow_id=f"{product}:{task_type}:{input_hash[:12]}",
        product=product,
        task_type=task_type,
        input_hash=input_hash,
        steps=steps,
        crystallized_at=datetime.now(timezone.utc).isoformat(),
        run_ids=[r.run_id for r in approved],
        approval_count=len(approved),
        avg_review_duration_ms=mean_review_ms,
        all_output_unmodified=untouched,
        version=existing_version + 1,
    )
def evaluate_new_run(
    run: PipelineRun,
    recorder: Recorder,
    config: CrystallizerConfig,
    existing_version: int = 0,
) -> CrystallizedWorkflow | None:
    """Record *run* and return a new workflow if the threshold is now met.

    Products call this after each human-approved execution. Returns a
    ``CrystallizedWorkflow`` if crystallization was triggered, ``None``
    otherwise.
    """
    # Always persist first — rejected runs stay in the audit trail too.
    recorder.record(run)
    if not run.approved:
        return None
    matching = recorder.load_approved(run.product, run.task_type, run.input_hash)
    if should_crystallize(matching, config):
        workflow = crystallize(matching, config, existing_version=existing_version)
        log.info(
            "pipeline: crystallized %s after %d approvals",
            workflow.workflow_id, workflow.approval_count,
        )
        return workflow
    log.debug(
        "pipeline: %d/%d approved runs for %s:%s — not yet crystallizing",
        len(matching), config.threshold, run.product, run.task_type,
    )
    return None

View file

@ -0,0 +1,157 @@
# circuitforge_core/pipeline/executor.py — deterministic execution with LLM fallback
#
# MIT — orchestration logic only; calls product-supplied callables.
from __future__ import annotations
import logging
from dataclasses import dataclass, field
from typing import Any, Callable
from .models import CrystallizedWorkflow, Step
log = logging.getLogger(__name__)
@dataclass
class StepResult:
    """Outcome of executing a single deterministic Step."""
    step: Step                # the Step that was attempted
    success: bool             # True if step_fn reported success
    output: Any = None        # product-defined output from step_fn, if any
    error: str | None = None  # failure reason; None on success
@dataclass
class ExecutionResult:
    """Result of running a workflow (deterministic or LLM-assisted).

    Attributes
    ----------
    success:
        True if all steps completed without error.
    used_deterministic:
        True if a crystallized workflow was used; False if LLM was called.
    step_results:
        Per-step outcomes from the deterministic path. May be partial when
        a step failed and the LLM fallback took over.
    llm_output:
        Raw output from the LLM fallback path, if used.
    workflow_id:
        ID of the workflow used, or None for LLM path.
    error:
        Error message if the run failed entirely.
    """
    success: bool
    used_deterministic: bool
    step_results: list[StepResult] = field(default_factory=list)
    llm_output: Any = None
    workflow_id: str | None = None
    error: str | None = None
# ── Executor ──────────────────────────────────────────────────────────────────
class Executor:
    """Runs crystallized workflows with transparent LLM fallback.

    Parameters
    ----------
    step_fn:
        Called for each Step: ``step_fn(step) -> (success, output)``.
        The product supplies this — it knows how to turn a Step into a real
        action (DTMF dial, HTTP call, form field write, etc.).
    llm_fn:
        Called when no workflow matches or a step fails: ``llm_fn() -> output``.
        Products wire this to ``cf_core.llm.router`` or equivalent.
    llm_fallback:
        If False, a failed ExecutionResult is returned instead of calling
        llm_fn on miss (no exception is raised).
    """
    def __init__(
        self,
        step_fn: Callable[[Step], tuple[bool, Any]],
        llm_fn: Callable[[], Any],
        llm_fallback: bool = True,
    ) -> None:
        self._step_fn = step_fn
        self._llm_fn = llm_fn
        self._llm_fallback = llm_fallback
    def execute(
        self,
        workflow: CrystallizedWorkflow,
    ) -> ExecutionResult:
        """Run *workflow* deterministically.

        If a step fails, falls back to LLM (if ``llm_fallback`` is enabled).
        """
        step_results: list[StepResult] = []
        for step in workflow.steps:
            try:
                success, output = self._step_fn(step)
            except Exception as exc:
                # A raising step_fn is treated like a failed step, not fatal.
                log.warning("step %s raised: %s", step.action, exc)
                success, output = False, None
                error_str = str(exc)
            else:
                error_str = None if success else "step_fn returned success=False"
            step_results.append(StepResult(step=step, success=success,
                                           output=output, error=error_str))
            if not success:
                # Abort the deterministic run; partial step_results are
                # preserved in the fallback result.
                log.info(
                    "workflow %s: step %s failed — triggering LLM fallback",
                    workflow.workflow_id, step.action,
                )
                return self._llm_fallback_result(
                    step_results, workflow.workflow_id
                )
        log.info("workflow %s: all %d steps succeeded",
                 workflow.workflow_id, len(workflow.steps))
        return ExecutionResult(
            success=True,
            used_deterministic=True,
            step_results=step_results,
            workflow_id=workflow.workflow_id,
        )
    def run_with_fallback(
        self,
        workflow: CrystallizedWorkflow | None,
    ) -> ExecutionResult:
        """Run *workflow* if provided; otherwise call the LLM directly."""
        if workflow is None:
            return self._llm_fallback_result([], workflow_id=None)
        return self.execute(workflow)
    # ── Internal ──────────────────────────────────────────────────────────────
    def _llm_fallback_result(
        self,
        partial_steps: list[StepResult],
        workflow_id: str | None,
    ) -> ExecutionResult:
        # Build the result for the LLM path (or the failure result when the
        # fallback is disabled).
        if not self._llm_fallback:
            # used_deterministic=True: only the deterministic path ran.
            return ExecutionResult(
                success=False,
                used_deterministic=True,
                step_results=partial_steps,
                workflow_id=workflow_id,
                error="LLM fallback disabled and deterministic path failed.",
            )
        try:
            llm_output = self._llm_fn()
        except Exception as exc:
            return ExecutionResult(
                success=False,
                used_deterministic=False,
                step_results=partial_steps,
                workflow_id=workflow_id,
                error=f"LLM fallback raised: {exc}",
            )
        return ExecutionResult(
            success=True,
            used_deterministic=False,
            step_results=partial_steps,
            llm_output=llm_output,
            workflow_id=workflow_id,
        )

View file

@ -0,0 +1,216 @@
# circuitforge_core/pipeline/models.py — crystallization data models
#
# MIT — protocol and model types only; no inference backends.
from __future__ import annotations
import hashlib
import json
from dataclasses import dataclass, field
from datetime import datetime, timezone
from typing import Any
# ── Utilities ─────────────────────────────────────────────────────────────────
def hash_input(features: dict[str, Any]) -> str:
    """Return a stable SHA-256 hex digest of *features*.

    Keys are sorted before serialising, so insertion order does not affect
    the hash. Only call this on already-normalised, PII-free feature dicts —
    the hash is opaque, but the source dict should never contain raw user
    data.
    """
    payload = json.dumps(features, sort_keys=True, ensure_ascii=True).encode()
    return hashlib.sha256(payload).hexdigest()
# ── Step ──────────────────────────────────────────────────────────────────────
@dataclass
class Step:
    """One atomic action in a deterministic workflow.

    ``action`` is a product-defined verb (e.g. ``"dtmf"``, ``"field_fill"``,
    ``"api_call"``); ``params`` carries action-specific values and
    ``description`` is a plain-English summary for the approval UI.
    """
    action: str
    params: dict[str, Any]
    description: str = ""

    def to_dict(self) -> dict[str, Any]:
        """Serialise to a plain JSON-compatible dict."""
        return {
            "action": self.action,
            "params": self.params,
            "description": self.description,
        }

    @classmethod
    def from_dict(cls, d: dict[str, Any]) -> "Step":
        """Rebuild a Step from ``to_dict`` output; missing keys get defaults."""
        return cls(
            action=d["action"],
            params=d.get("params", {}),
            description=d.get("description", ""),
        )
# ── PipelineRun ───────────────────────────────────────────────────────────────
@dataclass
class PipelineRun:
    """Record of one LLM-assisted execution — the raw material for crystallization.

    Fields
    ------
    run_id:
        UUID or unique string identifying this run.
    product:
        CF product code (e.g. ``"osprey"``, ``"falcon"``, ``"peregrine"``).
    task_type:
        Product-defined task category (e.g. ``"ivr_navigate"``, ``"form_fill"``).
    input_hash:
        SHA-256 of normalised, PII-free input features. Never store raw input.
    steps:
        Ordered list of Steps the LLM proposed.
    approved:
        True if a human approved this run before execution.
    review_duration_ms:
        Wall-clock milliseconds between displaying the proposal and the approval
        click. Values under ~5 000 ms indicate a rubber-stamp — the
        crystallizer may reject runs with suspiciously short reviews.
    output_modified:
        True if the user edited any step before approving. Modifications suggest
        the LLM proposal was imperfect; too-easy crystallization from unmodified
        runs may mean the task is already deterministic and the LLM is just
        echoing a fixed pattern.
    timestamp:
        ISO 8601 UTC creation time.
    llm_model:
        Model ID that generated the steps, e.g. ``"llama3:8b-instruct"``.
    metadata:
        Freeform dict for product-specific extra fields.
    """
    run_id: str
    product: str
    task_type: str
    input_hash: str
    steps: list[Step]
    approved: bool
    review_duration_ms: int
    output_modified: bool
    timestamp: str = field(default_factory=lambda: datetime.now(timezone.utc).isoformat())
    llm_model: str | None = None
    metadata: dict[str, Any] = field(default_factory=dict)

    def to_dict(self) -> dict[str, Any]:
        # Flat JSON-compatible dict; steps serialised via Step.to_dict().
        return {
            "run_id": self.run_id,
            "product": self.product,
            "task_type": self.task_type,
            "input_hash": self.input_hash,
            "steps": [s.to_dict() for s in self.steps],
            "approved": self.approved,
            "review_duration_ms": self.review_duration_ms,
            "output_modified": self.output_modified,
            "timestamp": self.timestamp,
            "llm_model": self.llm_model,
            "metadata": self.metadata,
        }

    @classmethod
    def from_dict(cls, d: dict[str, Any]) -> "PipelineRun":
        # Optional keys are defaulted so older records remain loadable.
        return cls(
            run_id=d["run_id"],
            product=d["product"],
            task_type=d["task_type"],
            input_hash=d["input_hash"],
            steps=[Step.from_dict(s) for s in d.get("steps", [])],
            approved=d["approved"],
            review_duration_ms=d["review_duration_ms"],
            output_modified=d.get("output_modified", False),
            timestamp=d.get("timestamp", ""),
            llm_model=d.get("llm_model"),
            metadata=d.get("metadata", {}),
        )
# ── CrystallizedWorkflow ──────────────────────────────────────────────────────
@dataclass
class CrystallizedWorkflow:
    """A deterministic workflow promoted from N approved PipelineRuns.

    Once crystallized, the executor runs ``steps`` directly — no LLM required
    unless an edge case is encountered.

    Fields
    ------
    workflow_id:
        Unique identifier (typically ``{product}:{task_type}:{input_hash[:12]}``).
    product / task_type / input_hash:
        Same semantics as PipelineRun; the hash is the lookup key.
    steps:
        Canonical deterministic step sequence (majority-voted or most-recent,
        per CrystallizerConfig.strategy).
    crystallized_at:
        ISO 8601 UTC timestamp.
    run_ids:
        IDs of the source PipelineRuns that contributed to this workflow.
    approval_count:
        Number of approved runs that went into crystallization.
    avg_review_duration_ms:
        Mean review_duration_ms across all source runs — low values are a
        warning sign that approvals may not have been genuine.
    all_output_unmodified:
        True if every contributing run had output_modified=False. Combined with
        a very short avg_review_duration_ms this can flag workflows that may
        have crystallized from rubber-stamp approvals.
    active:
        Whether this workflow is in use. Set to False to disable without
        deleting the record.
    version:
        Increments each time the workflow is re-crystallized from new runs.
    """
    workflow_id: str
    product: str
    task_type: str
    input_hash: str
    steps: list[Step]
    crystallized_at: str
    run_ids: list[str]
    approval_count: int
    avg_review_duration_ms: int
    all_output_unmodified: bool
    active: bool = True
    version: int = 1
    metadata: dict[str, Any] = field(default_factory=dict)

    def to_dict(self) -> dict[str, Any]:
        # Flat JSON-compatible dict; steps serialised via Step.to_dict().
        return {
            "workflow_id": self.workflow_id,
            "product": self.product,
            "task_type": self.task_type,
            "input_hash": self.input_hash,
            "steps": [s.to_dict() for s in self.steps],
            "crystallized_at": self.crystallized_at,
            "run_ids": self.run_ids,
            "approval_count": self.approval_count,
            "avg_review_duration_ms": self.avg_review_duration_ms,
            "all_output_unmodified": self.all_output_unmodified,
            "active": self.active,
            "version": self.version,
            "metadata": self.metadata,
        }

    @classmethod
    def from_dict(cls, d: dict[str, Any]) -> "CrystallizedWorkflow":
        # Optional keys are defaulted so older records remain loadable.
        return cls(
            workflow_id=d["workflow_id"],
            product=d["product"],
            task_type=d["task_type"],
            input_hash=d["input_hash"],
            steps=[Step.from_dict(s) for s in d.get("steps", [])],
            crystallized_at=d["crystallized_at"],
            run_ids=d.get("run_ids", []),
            approval_count=d["approval_count"],
            avg_review_duration_ms=d["avg_review_duration_ms"],
            all_output_unmodified=d.get("all_output_unmodified", True),
            active=d.get("active", True),
            version=d.get("version", 1),
            metadata=d.get("metadata", {}),
        )

View file

@ -0,0 +1,234 @@
# circuitforge_core/pipeline/multimodal.py — cf-docuvision + cf-text pipeline
#
# MIT — orchestration only; vision and text inference stay in their own modules.
#
# Usage (minimal):
#
# from circuitforge_core.pipeline.multimodal import MultimodalPipeline, MultimodalConfig
#
# pipe = MultimodalPipeline(MultimodalConfig())
# for result in pipe.run(page_bytes_list):
# print(f"Page {result.page_idx}: {result.generated[:80]}")
#
# Streaming (token-by-token):
#
# for page_idx, token in pipe.stream(page_bytes_list):
# ui.append(page_idx, token)
#
from __future__ import annotations
import logging
from collections.abc import Callable, Iterable, Iterator
from dataclasses import dataclass, field
from typing import Any
from circuitforge_core.documents.client import DocuvisionClient
from circuitforge_core.documents.models import StructuredDocument
log = logging.getLogger(__name__)
# ── Config ────────────────────────────────────────────────────────────────────
def _default_prompt(page_idx: int, doc: StructuredDocument) -> str:
    """Build a generation prompt from a StructuredDocument."""
    if page_idx <= 0:
        # First page carries no header — raw text only.
        return doc.raw_text
    return f"[Page {page_idx + 1}]\n" + doc.raw_text
@dataclass
class MultimodalConfig:
    """Configuration for MultimodalPipeline.

    vision_url:
        Base URL of the cf-docuvision service.
    hint:
        Docuvision extraction hint — ``"auto"`` | ``"document"`` | ``"form"``
        | ``"table"`` | ``"figure"``.
    max_tokens:
        Passed to cf-text generate per page.
    temperature:
        Sampling temperature for text generation.
    vram_serialise:
        When True, ``swap_fn`` is called between the vision and text steps
        on each page. Use this on 8GB GPUs where Dolphin-v2 and the text
        model cannot be resident simultaneously.
    prompt_fn:
        Callable ``(page_idx, StructuredDocument) -> str`` that builds the
        generation prompt. Defaults to using ``doc.raw_text`` directly.
        Products override this to add system context, few-shot examples, etc.
    vision_timeout:
        HTTP timeout in seconds for each cf-docuvision request.
    """
    vision_url: str = "http://localhost:8003"
    hint: str = "auto"
    max_tokens: int = 512
    temperature: float = 0.7
    vram_serialise: bool = False
    # NOTE(review): the default_factory wrapper presumably avoids the function
    # being stored as a plain class-attribute default — confirm intent.
    prompt_fn: Callable[[int, StructuredDocument], str] = field(
        default_factory=lambda: _default_prompt
    )
    vision_timeout: int = 60
# ── Results ───────────────────────────────────────────────────────────────────
@dataclass
class PageResult:
    """Result of processing one page through the vision + text pipeline.

    page_idx:
        Zero-based page index.
    doc:
        StructuredDocument from cf-docuvision (None when extraction failed).
    generated:
        Full text output from cf-text for this page (empty string when
        ``error`` is set).
    error:
        Non-None if extraction or generation failed for this page.
    """
    page_idx: int
    doc: StructuredDocument | None
    generated: str
    error: str | None = None
# ── Pipeline ──────────────────────────────────────────────────────────────────
class MultimodalPipeline:
    """Chunk a multi-page document through vision extraction + text generation.

    Parameters
    ----------
    config:
        Pipeline configuration.
    swap_fn:
        Optional callable with no arguments, called between the vision and text
        steps on each page when ``config.vram_serialise=True``. Products using
        cf-orch wire this to the VRAM budget API so Dolphin-v2 can offload
        before the text model loads. A no-op lambda works for testing.
    generate_fn:
        Text generation callable: ``(prompt, max_tokens, temperature) -> str``.
        Defaults to ``circuitforge_core.text.generate``. Override in tests or
        when the product manages its own text backend.
    stream_fn:
        Streaming text callable: ``(prompt, max_tokens, temperature) -> Iterator[str]``.
        Defaults to ``circuitforge_core.text.generate`` with ``stream=True``.
    """
    def __init__(
        self,
        config: MultimodalConfig | None = None,
        *,
        swap_fn: Callable[[], None] | None = None,
        generate_fn: Callable[..., str] | None = None,
        stream_fn: Callable[..., Iterator[str]] | None = None,
    ) -> None:
        self._cfg = config or MultimodalConfig()
        self._vision = DocuvisionClient(
            base_url=self._cfg.vision_url,
            timeout=self._cfg.vision_timeout,
        )
        self._swap_fn = swap_fn
        self._generate_fn = generate_fn
        self._stream_fn = stream_fn
    # ── Public ────────────────────────────────────────────────────────────────
    def run(self, pages: Iterable[bytes]) -> Iterator[PageResult]:
        """Process each page and yield a PageResult as soon as it is ready.

        Callers receive pages one at a time — the UI can begin rendering
        page 0 while pages 1..N are still being extracted and generated.
        """
        for page_idx, page_bytes in enumerate(pages):
            yield self._process_page(page_idx, page_bytes)
    def stream(self, pages: Iterable[bytes]) -> Iterator[tuple[int, str]]:
        """Yield ``(page_idx, token)`` tuples for token-level progressive rendering.

        Each page is fully extracted before text generation begins, but tokens
        are yielded as the text model produces them rather than waiting for the
        full page output.
        """
        for page_idx, page_bytes in enumerate(pages):
            doc, err = self._extract(page_idx, page_bytes)
            if err:
                # Errors are surfaced in-band as a synthetic token so the
                # UI stream never stalls.
                yield (page_idx, f"[extraction error: {err}]")
                continue
            self._maybe_swap()
            prompt = self._cfg.prompt_fn(page_idx, doc)
            try:
                for token in self._stream_tokens(prompt):
                    yield (page_idx, token)
            except Exception as exc:
                log.error("page %d text streaming failed: %s", page_idx, exc)
                yield (page_idx, f"[generation error: {exc}]")
    # ── Internal ──────────────────────────────────────────────────────────────
    def _process_page(self, page_idx: int, page_bytes: bytes) -> PageResult:
        # Vision extraction → optional VRAM swap → text generation.
        doc, err = self._extract(page_idx, page_bytes)
        if err:
            return PageResult(page_idx=page_idx, doc=None, generated="", error=err)
        self._maybe_swap()
        prompt = self._cfg.prompt_fn(page_idx, doc)
        try:
            text = self._generate(prompt)
        except Exception as exc:
            log.error("page %d generation failed: %s", page_idx, exc)
            return PageResult(page_idx=page_idx, doc=doc, generated="",
                              error=str(exc))
        return PageResult(page_idx=page_idx, doc=doc, generated=text)
    def _extract(
        self, page_idx: int, page_bytes: bytes
    ) -> tuple[StructuredDocument | None, str | None]:
        # Returns (doc, None) on success, (None, error_message) on failure.
        try:
            doc = self._vision.extract(page_bytes, hint=self._cfg.hint)
            log.debug("page %d extracted: %d chars", page_idx, len(doc.raw_text))
            return doc, None
        except Exception as exc:
            log.error("page %d vision extraction failed: %s", page_idx, exc)
            return None, str(exc)
    def _maybe_swap(self) -> None:
        # No-op unless both the flag is set and a hook was supplied.
        if self._cfg.vram_serialise and self._swap_fn is not None:
            log.debug("vram_serialise: calling swap_fn")
            self._swap_fn()
    def _generate(self, prompt: str) -> str:
        if self._generate_fn is not None:
            return self._generate_fn(
                prompt,
                max_tokens=self._cfg.max_tokens,
                temperature=self._cfg.temperature,
            )
        # Deferred import — only needed when no generate_fn override is set.
        from circuitforge_core.text import generate
        result = generate(
            prompt,
            max_tokens=self._cfg.max_tokens,
            temperature=self._cfg.temperature,
        )
        return result.text
    def _stream_tokens(self, prompt: str) -> Iterator[str]:
        if self._stream_fn is not None:
            yield from self._stream_fn(
                prompt,
                max_tokens=self._cfg.max_tokens,
                temperature=self._cfg.temperature,
            )
            return
        # Deferred import — only needed when no stream_fn override is set.
        from circuitforge_core.text import generate
        tokens = generate(
            prompt,
            max_tokens=self._cfg.max_tokens,
            temperature=self._cfg.temperature,
            stream=True,
        )
        yield from tokens

View file

@ -0,0 +1,70 @@
# circuitforge_core/pipeline/recorder.py — write and load PipelineRun records
#
# MIT — local file I/O only; no inference.
from __future__ import annotations
import json
import logging
from pathlib import Path
from typing import Iterable
from .models import PipelineRun
log = logging.getLogger(__name__)
_DEFAULT_ROOT = Path.home() / ".config" / "circuitforge" / "pipeline" / "runs"
class Recorder:
    """Writes PipelineRun JSON records to a local directory tree.

    Layout::

        {root}/{product}/{task_type}/{run_id}.json

    The recorder is intentionally append-only — it never deletes or modifies
    existing records. Old runs accumulate as an audit trail; products that
    want retention limits should prune the directory themselves.
    """
    def __init__(self, root: Path | None = None) -> None:
        # Fall back to the shared config location when no root is given.
        self._root = Path(root) if root else _DEFAULT_ROOT
    # ── Write ─────────────────────────────────────────────────────────────────
    def record(self, run: PipelineRun) -> Path:
        """Persist *run* to disk and return the file path written."""
        dest = self._path_for(run.product, run.task_type, run.run_id)
        dest.parent.mkdir(parents=True, exist_ok=True)
        dest.write_text(json.dumps(run.to_dict(), indent=2), encoding="utf-8")
        # FIX: format string was "%s%s", which rendered the run id and the
        # path concatenated with no separator.
        log.debug("recorded pipeline run %s -> %s", run.run_id, dest)
        return dest
    # ── Read ──────────────────────────────────────────────────────────────────
    def load_runs(self, product: str, task_type: str) -> list[PipelineRun]:
        """Return all runs for *(product, task_type)*, newest-first."""
        directory = self._root / product / task_type
        if not directory.is_dir():
            return []
        runs: list[PipelineRun] = []
        for p in directory.glob("*.json"):
            try:
                # FIX: read as UTF-8 to match record(); the platform default
                # encoding may differ (e.g. on Windows).
                runs.append(PipelineRun.from_dict(
                    json.loads(p.read_text(encoding="utf-8"))))
            except Exception:
                # Corrupt/partial files are skipped, never fatal.
                log.warning("skipping unreadable run file %s", p)
        runs.sort(key=lambda r: r.timestamp, reverse=True)
        return runs
    def load_approved(self, product: str, task_type: str,
                      input_hash: str) -> list[PipelineRun]:
        """Return approved runs that match *input_hash*, newest-first."""
        return [
            r for r in self.load_runs(product, task_type)
            if r.approved and r.input_hash == input_hash
        ]
    # ── Internal ──────────────────────────────────────────────────────────────
    def _path_for(self, product: str, task_type: str, run_id: str) -> Path:
        """Compute the canonical on-disk path for one run record."""
        return self._root / product / task_type / f"{run_id}.json"

View file

@ -0,0 +1,134 @@
# circuitforge_core/pipeline/registry.py — workflow lookup
#
# MIT — file I/O and matching logic only.
from __future__ import annotations
import json
import logging
from pathlib import Path
from typing import Callable
from .models import CrystallizedWorkflow
log = logging.getLogger(__name__)
_DEFAULT_ROOT = Path.home() / ".config" / "circuitforge" / "pipeline" / "workflows"
class Registry:
"""Loads and matches CrystallizedWorkflows from the local filesystem.
Layout::
{root}/{product}/{task_type}/{workflow_id}.json
Exact matching is always available. Products that need fuzzy/semantic
matching can supply a ``similarity_fn`` a callable that takes two input
hashes and returns a float in [0, 1]. The registry returns the first
active workflow whose similarity score meets ``fuzzy_threshold``.
"""
def __init__(
self,
root: Path | None = None,
similarity_fn: Callable[[str, str], float] | None = None,
fuzzy_threshold: float = 0.8,
) -> None:
self._root = Path(root) if root else _DEFAULT_ROOT
self._similarity_fn = similarity_fn
self._fuzzy_threshold = fuzzy_threshold
# ── Write ─────────────────────────────────────────────────────────────────
    def register(self, workflow: CrystallizedWorkflow) -> Path:
        """Persist *workflow* and return the path written.

        Overwrites any existing record with the same workflow_id (used
        when a workflow is re-crystallized at a higher version).
        """
        dest = self._path_for(workflow.product, workflow.task_type,
                              workflow.workflow_id)
        # Create the product/task_type directory tree on first use.
        dest.parent.mkdir(parents=True, exist_ok=True)
        dest.write_text(json.dumps(workflow.to_dict(), indent=2), encoding="utf-8")
        log.info("registered workflow %s (v%d)", workflow.workflow_id,
                 workflow.version)
        return dest
def deactivate(self, workflow_id: str, product: str,
task_type: str) -> bool:
"""Set ``active=False`` on a stored workflow. Returns True if found."""
path = self._path_for(product, task_type, workflow_id)
if not path.exists():
return False
data = json.loads(path.read_text())
data["active"] = False
path.write_text(json.dumps(data, indent=2), encoding="utf-8")
log.info("deactivated workflow %s", workflow_id)
return True
# ── Read ──────────────────────────────────────────────────────────────────
def load_all(self, product: str, task_type: str) -> list[CrystallizedWorkflow]:
"""Return all (including inactive) workflows for *(product, task_type)*."""
directory = self._root / product / task_type
if not directory.is_dir():
return []
workflows: list[CrystallizedWorkflow] = []
for p in directory.glob("*.json"):
try:
workflows.append(
CrystallizedWorkflow.from_dict(json.loads(p.read_text()))
)
except Exception:
log.warning("skipping unreadable workflow file %s", p)
return workflows
# ── Match ─────────────────────────────────────────────────────────────────
def match(self, product: str, task_type: str,
input_hash: str) -> CrystallizedWorkflow | None:
"""Return the active workflow for an exact input_hash match, or None."""
for wf in self.load_all(product, task_type):
if wf.active and wf.input_hash == input_hash:
log.debug("registry exact match: %s", wf.workflow_id)
return wf
return None
def fuzzy_match(self, product: str, task_type: str,
input_hash: str) -> CrystallizedWorkflow | None:
"""Return a workflow above the similarity threshold, or None.
Requires a ``similarity_fn`` to have been supplied at construction.
If none was provided, raises ``RuntimeError``.
"""
if self._similarity_fn is None:
raise RuntimeError(
"fuzzy_match() requires a similarity_fn — none was supplied "
"to Registry.__init__()."
)
best: CrystallizedWorkflow | None = None
best_score = 0.0
for wf in self.load_all(product, task_type):
if not wf.active:
continue
score = self._similarity_fn(wf.input_hash, input_hash)
if score >= self._fuzzy_threshold and score > best_score:
best = wf
best_score = score
if best:
log.debug("registry fuzzy match: %s (score=%.2f)", best.workflow_id,
best_score)
return best
def find(self, product: str, task_type: str,
input_hash: str) -> CrystallizedWorkflow | None:
"""Exact match first; fuzzy match second (if similarity_fn is set)."""
exact = self.match(product, task_type, input_hash)
if exact:
return exact
if self._similarity_fn is not None:
return self.fuzzy_match(product, task_type, input_hash)
return None
# ── Internal ──────────────────────────────────────────────────────────────
def _path_for(self, product: str, task_type: str,
workflow_id: str) -> Path:
safe_id = workflow_id.replace(":", "_")
return self._root / product / task_type / f"{safe_id}.json"

View file

@ -40,8 +40,11 @@ def set_user_preference(
s.set(user_id=user_id, path=path, value=value)
from . import accessibility as accessibility
__all__ = [
"get_path", "set_path",
"get_user_preference", "set_user_preference",
"LocalFileStore", "PreferenceStore",
"accessibility",
]

View file

@ -0,0 +1,73 @@
# circuitforge_core/preferences/accessibility.py — a11y preference keys
#
# First-class accessibility preferences so every product UI reads from
# the same store path without each implementing it separately.
#
# All keys use the "accessibility.*" namespace in the preference store.
# Products read these via get_user_preference() or the convenience helpers here.
from __future__ import annotations
from circuitforge_core.preferences import get_user_preference, set_user_preference
# ── Preference key constants ──────────────────────────────────────────────────
PREF_REDUCED_MOTION = "accessibility.prefers_reduced_motion"
PREF_HIGH_CONTRAST = "accessibility.high_contrast"
PREF_FONT_SIZE = "accessibility.font_size" # "default" | "large" | "xlarge"
PREF_SCREEN_READER = "accessibility.screen_reader_mode" # reduces decorative content
_DEFAULTS: dict[str, object] = {
PREF_REDUCED_MOTION: False,
PREF_HIGH_CONTRAST: False,
PREF_FONT_SIZE: "default",
PREF_SCREEN_READER: False,
}
# ── Convenience helpers ───────────────────────────────────────────────────────
def is_reduced_motion_preferred(
    user_id: str | None = None,
    store=None,
) -> bool:
    """
    Return True if the user has requested reduced motion.

    Products must honour this in all animated UI elements: transitions,
    auto-playing content, parallax, loaders. This maps to the CSS
    `prefers-reduced-motion: reduce` media query and is the canonical
    source of truth across all CF product UIs.

    Default: False (see _DEFAULTS).
    """
    # Pull the fallback from _DEFAULTS so the default value is defined in
    # exactly one place instead of being duplicated per helper.
    val = get_user_preference(
        user_id, PREF_REDUCED_MOTION,
        default=_DEFAULTS[PREF_REDUCED_MOTION], store=store,
    )
    return bool(val)
def is_high_contrast(user_id: str | None = None, store=None) -> bool:
    """Return True if the user has requested high-contrast mode (default False)."""
    # Fallback sourced from _DEFAULTS — one definition of the default.
    return bool(get_user_preference(
        user_id, PREF_HIGH_CONTRAST,
        default=_DEFAULTS[PREF_HIGH_CONTRAST], store=store,
    ))
def get_font_size(user_id: str | None = None, store=None) -> str:
    """Return the user's preferred font size: 'default' | 'large' | 'xlarge'.

    Unknown or corrupt stored values fall back to 'default' rather than
    propagating an invalid size into UI code.
    """
    # Fallback sourced from _DEFAULTS — one definition of the default.
    val = get_user_preference(
        user_id, PREF_FONT_SIZE, default=_DEFAULTS[PREF_FONT_SIZE], store=store
    )
    if val not in ("default", "large", "xlarge"):
        return "default"
    return str(val)
def is_screen_reader_mode(user_id: str | None = None, store=None) -> bool:
    """Return True if the user wants screen-reader-optimised output (default False)."""
    # Fallback sourced from _DEFAULTS — one definition of the default.
    return bool(get_user_preference(
        user_id, PREF_SCREEN_READER,
        default=_DEFAULTS[PREF_SCREEN_READER], store=store,
    ))
def set_reduced_motion(
    value: bool,
    user_id: str | None = None,
    store=None,
) -> None:
    """Persist the user's reduced-motion preference.

    Writes the ``accessibility.prefers_reduced_motion`` key through the
    preference store; read it back via is_reduced_motion_preferred().
    """
    set_user_preference(user_id, PREF_REDUCED_MOTION, value, store=store)

View file

@ -0,0 +1,144 @@
"""
circuitforge_core.text — direct text generation service module.
Provides lightweight, low-overhead text generation that bypasses ollama/vllm
for products that need fast, frequent inference from small local models.
Quick start (mock mode — no model required):
import os; os.environ["CF_TEXT_MOCK"] = "1"
from circuitforge_core.text import generate, chat, ChatMessage
result = generate("Write a short cover letter intro.")
print(result.text)
reply = chat([
ChatMessage("system", "You are a helpful recipe assistant."),
ChatMessage("user", "What can I make with eggs, spinach, and feta?"),
])
print(reply.text)
Real inference (GGUF model):
export CF_TEXT_MODEL=/Library/Assets/LLM/qwen2.5-3b-instruct-q4_k_m.gguf
from circuitforge_core.text import generate
result = generate("Summarise this job posting in 2 sentences: ...")
Backend selection (CF_TEXT_BACKEND env or explicit):
from circuitforge_core.text import make_backend
backend = make_backend("/path/to/model.gguf", backend="llamacpp")
cf-orch service profile:
service_type: cf-text
max_mb: per-model (3B Q4 2048, 7B Q4 4096)
preferred_compute: 7.5 minimum (INT8 tensor cores)
max_concurrent: 2
shared: true
"""
from __future__ import annotations
import os
from circuitforge_core.text.backends.base import (
ChatMessage,
GenerateResult,
TextBackend,
make_text_backend,
)
from circuitforge_core.text.backends.mock import MockTextBackend
# ── Process-level singleton backend ──────────────────────────────────────────
# Lazily initialised on first call to generate() or chat().
# Products that need per-user or per-request backends should use make_backend().
_backend: TextBackend | None = None
def _get_backend() -> TextBackend:
    """Return the process-level backend, constructing it lazily on first use.

    Model path comes from CF_TEXT_MODEL (default "mock"); mock mode is forced
    when the path is "mock" or CF_TEXT_MOCK=1.
    """
    global _backend
    if _backend is not None:
        return _backend
    model_path = os.environ.get("CF_TEXT_MODEL", "mock")
    use_mock = os.environ.get("CF_TEXT_MOCK", "") == "1" or model_path == "mock"
    _backend = make_text_backend(model_path, mock=use_mock)
    return _backend
def make_backend(
    model_path: str,
    backend: str | None = None,
    mock: bool | None = None,
) -> TextBackend:
    """
    Build a dedicated TextBackend instance for *model_path*.

    Prefer this over the process-level singleton (used by generate() and
    chat()) when each request or user needs its own backend.
    """
    return make_text_backend(model_path, backend=backend, mock=mock)
# ── Convenience functions (singleton path) ────────────────────────────────────
def generate(
    prompt: str,
    *,
    model: str | None = None,
    max_tokens: int = 512,
    temperature: float = 0.7,
    stream: bool = False,
    stop: list[str] | None = None,
):
    """
    Generate text from *prompt* using the process-level backend.

    With stream=True an Iterator[str] of tokens is returned instead of a
    GenerateResult. The *model* argument exists only for API symmetry with
    LLMRouter and is ignored on this singleton path — set CF_TEXT_MODEL to
    change the loaded model.
    """
    backend = _get_backend()
    call_kwargs = dict(max_tokens=max_tokens, temperature=temperature, stop=stop)
    if stream:
        return backend.generate_stream(prompt, **call_kwargs)
    return backend.generate(prompt, **call_kwargs)
def chat(
    messages: list[ChatMessage],
    *,
    model: str | None = None,
    max_tokens: int = 512,
    temperature: float = 0.7,
    stream: bool = False,
) -> GenerateResult:
    """
    Run a chat completion on the process-level backend.

    *messages* is an ordered list of ChatMessage(role, content) turns. The
    *model* argument is ignored on this singleton path (API symmetry with
    LLMRouter). Streaming is not yet wired up here — stream must be False.

    Raises:
        NotImplementedError: if stream=True is requested.
    """
    if stream:
        raise NotImplementedError(
            "stream=True is not yet supported for chat(). "
            "Use generate_stream() directly on a backend instance."
        )
    backend = _get_backend()
    return backend.chat(messages, max_tokens=max_tokens, temperature=temperature)
def reset_backend() -> None:
    """Drop the cached process-level backend. Test teardown only.

    The next generate()/chat() call re-reads CF_TEXT_MODEL / CF_TEXT_MOCK
    and lazily constructs a fresh backend.
    """
    global _backend
    _backend = None
__all__ = [
"ChatMessage",
"GenerateResult",
"TextBackend",
"MockTextBackend",
"make_backend",
"generate",
"chat",
"reset_backend",
]

View file

@ -0,0 +1,10 @@
from .base import ChatMessage, GenerateResult, TextBackend, make_text_backend
from .mock import MockTextBackend
__all__ = [
"ChatMessage",
"GenerateResult",
"TextBackend",
"MockTextBackend",
"make_text_backend",
]

View file

@ -0,0 +1,182 @@
# circuitforge_core/text/backends/base.py — TextBackend Protocol + factory
#
# MIT licensed. The Protocol and mock backend are always importable.
# Real backends (LlamaCppBackend, TransformersBackend) require optional extras.
from __future__ import annotations
import os
from typing import AsyncIterator, Iterator, Protocol, runtime_checkable
# ── Shared result types ───────────────────────────────────────────────────────
class GenerateResult:
    """Container for the output of one non-streaming generate() call.

    Attributes:
        text: the generated completion.
        tokens_used: completion token count (0 when the backend doesn't report it).
        model: short identifier of the model that produced the text.
    """

    def __init__(self, text: str, tokens_used: int = 0, model: str = "") -> None:
        self.text = text
        self.tokens_used = tokens_used
        self.model = model

    def __repr__(self) -> str:
        # ``:.40`` truncates the repr'd text so long completions stay readable.
        return f"GenerateResult(text={self.text!r:.40}, tokens={self.tokens_used})"
class ChatMessage:
    """One turn of a chat conversation: a role plus its content.

    Role is validated eagerly so malformed conversations fail at construction
    rather than deep inside a backend.
    """

    def __init__(self, role: str, content: str) -> None:
        if role not in ("system", "user", "assistant"):
            raise ValueError(f"Invalid role {role!r}. Must be system, user, or assistant.")
        self.role = role
        self.content = content

    def to_dict(self) -> dict:
        """Return the OpenAI-style ``{"role": ..., "content": ...}`` mapping."""
        return {"role": self.role, "content": self.content}
# ── TextBackend Protocol ──────────────────────────────────────────────────────
@runtime_checkable
class TextBackend(Protocol):
    """
    Abstract interface for direct text generation backends.

    All generate/chat methods have both sync and async variants.
    Streaming variants yield str tokens rather than a complete result.

    Implementations must be safe to construct once and call concurrently
    (the model is loaded at construction time and reused across calls).

    @runtime_checkable makes ``isinstance(obj, TextBackend)`` check
    structural conformance (method presence only — not signatures).
    """

    def generate(
        self,
        prompt: str,
        *,
        max_tokens: int = 512,
        temperature: float = 0.7,
        stop: list[str] | None = None,
    ) -> GenerateResult:
        """Synchronous generate — blocks until the full response is produced."""
        ...

    def generate_stream(
        self,
        prompt: str,
        *,
        max_tokens: int = 512,
        temperature: float = 0.7,
        stop: list[str] | None = None,
    ) -> Iterator[str]:
        """Synchronous streaming — yields tokens as they are produced."""
        ...

    async def generate_async(
        self,
        prompt: str,
        *,
        max_tokens: int = 512,
        temperature: float = 0.7,
        stop: list[str] | None = None,
    ) -> GenerateResult:
        """Async generate — runs in thread pool, never blocks the event loop."""
        ...

    async def generate_stream_async(
        self,
        prompt: str,
        *,
        max_tokens: int = 512,
        temperature: float = 0.7,
        stop: list[str] | None = None,
    ) -> AsyncIterator[str]:
        """Async streaming — yields tokens without blocking the event loop."""
        ...

    def chat(
        self,
        messages: list[ChatMessage],
        *,
        max_tokens: int = 512,
        temperature: float = 0.7,
    ) -> GenerateResult:
        """Chat completion — formats messages into a prompt and generates."""
        ...

    @property
    def model_name(self) -> str:
        """Identifier for the loaded model (path stem or HF repo ID)."""
        ...

    @property
    def vram_mb(self) -> int:
        """Approximate VRAM footprint in MB. Used by cf-orch service registry."""
        ...
# ── Backend selection ─────────────────────────────────────────────────────────
def _select_backend(model_path: str, backend: str | None) -> str:
    """
    Resolve which real backend should load *model_path*.

    Parameters
    ----------
    model_path:
        Path to the model file or HuggingFace repo ID (e.g. "Qwen/Qwen2.5-3B").
    backend:
        Explicit override from the caller ("llamacpp" | "transformers" | None).
        When provided, it is trusted without inspecting the path.

    Returns "llamacpp" or "transformers". Raises ValueError for an
    unrecognised override value (argument or CF_TEXT_BACKEND env var).
    """
    _VALID = ("llamacpp", "transformers")
    # 1. Caller-supplied override first, then the CF_TEXT_BACKEND env var.
    resolved = backend or os.environ.get("CF_TEXT_BACKEND")
    if resolved:
        if resolved not in _VALID:
            # Name the actual source of the bad value so the error is
            # actionable — previously the message always blamed the env var,
            # even when the caller had passed backend= explicitly.
            source = "backend" if backend else "CF_TEXT_BACKEND"
            raise ValueError(
                f"{source}={resolved!r} is not valid. Choose: {', '.join(_VALID)}"
            )
        return resolved
    # 2. Format detection — GGUF files are unambiguously llama-cpp territory.
    if model_path.lower().endswith(".gguf"):
        return "llamacpp"
    # 3. Safe default — transformers covers HF repo IDs and safetensors dirs.
    return "transformers"
# ── Factory ───────────────────────────────────────────────────────────────────
def make_text_backend(
    model_path: str,
    backend: str | None = None,
    mock: bool | None = None,
) -> "TextBackend":
    """
    Factory: return a TextBackend for *model_path*.

    mock=True or CF_TEXT_MOCK=1 → MockTextBackend (no GPU, no model file needed).
    Otherwise the concrete backend is resolved via _select_backend().
    Backend modules are imported lazily so the optional extras are only
    required when actually selected.
    """
    if mock is None:
        mock = os.environ.get("CF_TEXT_MOCK", "") == "1"
    if mock:
        from circuitforge_core.text.backends.mock import MockTextBackend
        return MockTextBackend(model_name=model_path)
    choice = _select_backend(model_path, backend)
    if choice == "llamacpp":
        from circuitforge_core.text.backends.llamacpp import LlamaCppBackend
        return LlamaCppBackend(model_path=model_path)
    if choice == "transformers":
        from circuitforge_core.text.backends.transformers import TransformersBackend
        return TransformersBackend(model_path=model_path)
    # Defensive guard — _select_backend only returns the two names above.
    raise ValueError(f"Unknown backend {choice!r}. Expected 'llamacpp' or 'transformers'.")

View file

@ -0,0 +1,192 @@
# circuitforge_core/text/backends/llamacpp.py — llama-cpp-python backend
#
# BSL 1.1: real inference. Requires llama-cpp-python + a GGUF model file.
# Install: pip install circuitforge-core[text-llamacpp]
#
# VRAM estimates (Q4_K_M quant):
# 1B → ~700MB 3B → ~2048MB 7B → ~4096MB
# 13B → ~7500MB 70B → ~40000MB
from __future__ import annotations
import asyncio
import logging
import os
from pathlib import Path
from typing import AsyncIterator, Iterator
from circuitforge_core.text.backends.base import ChatMessage, GenerateResult
logger = logging.getLogger(__name__)
# Q4_K_M is the recommended default — best accuracy/size tradeoff for local use.
_DEFAULT_N_CTX = int(os.environ.get("CF_TEXT_CTX", "4096"))
_DEFAULT_N_GPU_LAYERS = int(os.environ.get("CF_TEXT_GPU_LAYERS", "-1")) # -1 = all layers
def _estimate_vram_mb(model_path: str) -> int:
    """Rough VRAM estimate from file size. Accurate enough for cf-orch budgeting."""
    try:
        file_bytes = Path(model_path).stat().st_size
    except OSError:
        return 4096  # file missing/unreadable — assume a mid-size model
    size_mb = file_bytes // (1024 * 1024)
    # GGUF models typically need ~1.1× file size in VRAM (KV cache overhead).
    return int(size_mb * 1.1)
class LlamaCppBackend:
    """
    Direct llama-cpp-python inference backend for GGUF models.

    The model is loaded once at construction. All inference runs in a thread
    pool executor so async callers never block the event loop.

    Context window, GPU layers, and thread count are configurable via env:
        CF_TEXT_CTX         token context window (default 4096)
        CF_TEXT_GPU_LAYERS  GPU layers to offload, -1 = all (default -1)
        CF_TEXT_THREADS     CPU thread count (default: auto)

    Requires: pip install circuitforge-core[text-llamacpp]
    """

    def __init__(self, model_path: str) -> None:
        """Load the GGUF model at *model_path*.

        Raises:
            ImportError: llama-cpp-python is not installed.
            FileNotFoundError: *model_path* does not exist on disk.
        """
        try:
            from llama_cpp import Llama  # type: ignore[import]
        except ImportError as exc:
            raise ImportError(
                "llama-cpp-python is required for LlamaCppBackend. "
                "Install with: pip install circuitforge-core[text-llamacpp]"
            ) from exc
        if not Path(model_path).exists():
            raise FileNotFoundError(
                f"GGUF model not found: {model_path}\n"
                "Download a GGUF model and set CF_TEXT_MODEL to its path."
            )
        # "0" (the unset default) is falsy → pass None so llama.cpp auto-picks.
        n_threads = int(os.environ.get("CF_TEXT_THREADS", "0")) or None
        logger.info(
            "Loading GGUF model %s (ctx=%d, gpu_layers=%d)",
            model_path, _DEFAULT_N_CTX, _DEFAULT_N_GPU_LAYERS,
        )
        self._llm = Llama(
            model_path=model_path,
            n_ctx=_DEFAULT_N_CTX,
            n_gpu_layers=_DEFAULT_N_GPU_LAYERS,
            n_threads=n_threads,
            verbose=False,
        )
        self._model_path = model_path
        self._vram_mb = _estimate_vram_mb(model_path)

    @property
    def model_name(self) -> str:
        """Model identifier: the GGUF file's stem (filename without extension)."""
        return Path(self._model_path).stem

    @property
    def vram_mb(self) -> int:
        """Approximate VRAM footprint in MB, estimated from the file size."""
        return self._vram_mb

    def generate(
        self,
        prompt: str,
        *,
        max_tokens: int = 512,
        temperature: float = 0.7,
        stop: list[str] | None = None,
    ) -> GenerateResult:
        """Blocking completion for *prompt*; returns text + completion tokens."""
        output = self._llm(
            prompt,
            max_tokens=max_tokens,
            temperature=temperature,
            stop=stop or [],
            stream=False,
        )
        text = output["choices"][0]["text"]
        tokens_used = output["usage"]["completion_tokens"]
        return GenerateResult(text=text, tokens_used=tokens_used, model=self.model_name)

    def generate_stream(
        self,
        prompt: str,
        *,
        max_tokens: int = 512,
        temperature: float = 0.7,
        stop: list[str] | None = None,
    ) -> Iterator[str]:
        """Yield completion text chunks as llama.cpp produces them."""
        for chunk in self._llm(
            prompt,
            max_tokens=max_tokens,
            temperature=temperature,
            stop=stop or [],
            stream=True,
        ):
            yield chunk["choices"][0]["text"]

    async def generate_async(
        self,
        prompt: str,
        *,
        max_tokens: int = 512,
        temperature: float = 0.7,
        stop: list[str] | None = None,
    ) -> GenerateResult:
        """Run generate() in the default executor without blocking the loop."""
        # get_running_loop(), not the deprecated get_event_loop(): we are
        # always inside a coroutine here, so a loop is guaranteed running.
        loop = asyncio.get_running_loop()
        return await loop.run_in_executor(
            None,
            lambda: self.generate(prompt, max_tokens=max_tokens, temperature=temperature, stop=stop),
        )

    async def generate_stream_async(
        self,
        prompt: str,
        *,
        max_tokens: int = 512,
        temperature: float = 0.7,
        stop: list[str] | None = None,
    ) -> AsyncIterator[str]:
        """Async token stream bridged from llama.cpp's synchronous iterator."""
        # llama_cpp streaming is synchronous — produce on a worker thread and
        # hand tokens to the event loop through a queue; _DONE is the sentinel.
        import queue
        import threading
        token_queue: queue.Queue = queue.Queue()
        _DONE = object()

        def _produce() -> None:
            try:
                for chunk in self._llm(
                    prompt,
                    max_tokens=max_tokens,
                    temperature=temperature,
                    stop=stop or [],
                    stream=True,
                ):
                    token_queue.put(chunk["choices"][0]["text"])
            finally:
                # Always signal completion, even if llama.cpp raised.
                token_queue.put(_DONE)

        thread = threading.Thread(target=_produce, daemon=True)
        thread.start()
        loop = asyncio.get_running_loop()
        while True:
            # queue.get blocks — run it in the executor to keep the loop free.
            token = await loop.run_in_executor(None, token_queue.get)
            if token is _DONE:
                break
            yield token

    def chat(
        self,
        messages: list[ChatMessage],
        *,
        max_tokens: int = 512,
        temperature: float = 0.7,
    ) -> GenerateResult:
        """Chat completion via llama-cpp-python's native chat API."""
        # create_chat_completion applies the model's own chat template.
        output = self._llm.create_chat_completion(
            messages=[m.to_dict() for m in messages],
            max_tokens=max_tokens,
            temperature=temperature,
        )
        text = output["choices"][0]["message"]["content"]
        tokens_used = output["usage"]["completion_tokens"]
        return GenerateResult(text=text, tokens_used=tokens_used, model=self.model_name)

View file

@ -0,0 +1,104 @@
# circuitforge_core/text/backends/mock.py — synthetic text backend
#
# MIT licensed. No model file, no GPU, no extras required.
# Used in dev, CI, and free-tier nodes below the minimum VRAM threshold.
from __future__ import annotations
import asyncio
from typing import AsyncIterator, Iterator
from circuitforge_core.text.backends.base import ChatMessage, GenerateResult
_MOCK_RESPONSE = (
    "This is a synthetic response from MockTextBackend. "
    "Install a real backend (llama-cpp-python or transformers) and provide a model path "
    "to generate real text."
)


class MockTextBackend:
    """
    Deterministic synthetic text backend for development and CI.

    Every call returns the same canned response, so tests are reproducible
    without a GPU or model file. Streaming emits the response word-by-word
    with an optional per-token delay so UI streaming paths can be exercised.
    """

    def __init__(
        self,
        model_name: str = "mock",
        token_delay_s: float = 0.0,
    ) -> None:
        self._model_name = model_name
        self._token_delay_s = token_delay_s

    @property
    def model_name(self) -> str:
        """Configured identifier (defaults to "mock")."""
        return self._model_name

    @property
    def vram_mb(self) -> int:
        """The mock backend consumes no GPU memory."""
        return 0

    def _response_for(self, prompt_or_messages: str) -> str:
        # One canned answer regardless of input — determinism by design.
        return _MOCK_RESPONSE

    def generate(
        self,
        prompt: str,
        *,
        max_tokens: int = 512,
        temperature: float = 0.7,
        stop: list[str] | None = None,
    ) -> GenerateResult:
        """Return the canned response; tokens_used is its word count."""
        reply = self._response_for(prompt)
        return GenerateResult(text=reply, tokens_used=len(reply.split()), model=self._model_name)

    def generate_stream(
        self,
        prompt: str,
        *,
        max_tokens: int = 512,
        temperature: float = 0.7,
        stop: list[str] | None = None,
    ) -> Iterator[str]:
        """Yield the canned response word-by-word; each token ends with a space."""
        import time
        delay = self._token_delay_s
        for word in self._response_for(prompt).split():
            yield word + " "
            if delay:
                time.sleep(delay)

    async def generate_async(
        self,
        prompt: str,
        *,
        max_tokens: int = 512,
        temperature: float = 0.7,
        stop: list[str] | None = None,
    ) -> GenerateResult:
        """Async variant — simply delegates to the synchronous generate()."""
        return self.generate(prompt, max_tokens=max_tokens, temperature=temperature, stop=stop)

    async def generate_stream_async(
        self,
        prompt: str,
        *,
        max_tokens: int = 512,
        temperature: float = 0.7,
        stop: list[str] | None = None,
    ) -> AsyncIterator[str]:
        """Async streaming — awaits asyncio.sleep between words when delayed."""
        delay = self._token_delay_s
        for word in self._response_for(prompt).split():
            yield word + " "
            if delay:
                await asyncio.sleep(delay)

    def chat(
        self,
        messages: list[ChatMessage],
        *,
        max_tokens: int = 512,
        temperature: float = 0.7,
    ) -> GenerateResult:
        """Flatten the conversation to "role: content" lines and generate()."""
        transcript = "\n".join(f"{m.role}: {m.content}" for m in messages)
        return self.generate(transcript, max_tokens=max_tokens, temperature=temperature)

View file

@ -0,0 +1,197 @@
# circuitforge_core/text/backends/transformers.py — HuggingFace transformers backend
#
# BSL 1.1: real inference. Requires torch + transformers + a model checkpoint.
# Install: pip install circuitforge-core[text-transformers]
#
# Best for: HF repo IDs, safetensors checkpoints, models without GGUF versions.
# For GGUF models prefer LlamaCppBackend — lower overhead, smaller install.
from __future__ import annotations
import asyncio
import logging
import os
from typing import AsyncIterator, Iterator
from circuitforge_core.text.backends.base import ChatMessage, GenerateResult
logger = logging.getLogger(__name__)
_DEFAULT_MAX_NEW_TOKENS = 512
_LOAD_IN_4BIT = os.environ.get("CF_TEXT_4BIT", "0") == "1"
_LOAD_IN_8BIT = os.environ.get("CF_TEXT_8BIT", "0") == "1"
class TransformersBackend:
    """
    HuggingFace transformers inference backend.

    Loads any causal LM available on HuggingFace Hub or a local checkpoint dir.

    Supports 4-bit and 8-bit quantization via bitsandbytes when VRAM is limited:
        CF_TEXT_4BIT=1   load_in_4bit (requires bitsandbytes)
        CF_TEXT_8BIT=1   load_in_8bit (requires bitsandbytes)

    Chat completion uses the tokenizer's apply_chat_template() when available,
    falling back to a simple "User: / Assistant:" prompt format.

    NOTE: the ``stop`` parameter on generate()/generate_stream() is accepted
    for interface compatibility but not currently enforced by this backend.

    Requires: pip install circuitforge-core[text-transformers]
    """

    def __init__(self, model_path: str) -> None:
        """Load tokenizer + model for *model_path* (HF repo ID or local dir).

        Raises:
            ImportError: torch/transformers are not installed.
        """
        try:
            import torch
            from transformers import AutoModelForCausalLM, AutoTokenizer, TextIteratorStreamer
        except ImportError as exc:
            raise ImportError(
                "torch and transformers are required for TransformersBackend. "
                "Install with: pip install circuitforge-core[text-transformers]"
            ) from exc
        self._device = "cuda" if torch.cuda.is_available() else "cpu"
        logger.info("Loading transformers model %s on %s", model_path, self._device)
        load_kwargs: dict = {"device_map": "auto" if self._device == "cuda" else None}
        # NOTE(review): newer transformers versions deprecate the bare
        # load_in_4bit/load_in_8bit kwargs in favour of BitsAndBytesConfig —
        # confirm against the pinned transformers version before upgrading.
        if _LOAD_IN_4BIT:
            load_kwargs["load_in_4bit"] = True
        elif _LOAD_IN_8BIT:
            load_kwargs["load_in_8bit"] = True
        self._tokenizer = AutoTokenizer.from_pretrained(model_path)
        self._model = AutoModelForCausalLM.from_pretrained(model_path, **load_kwargs)
        if self._device == "cpu":
            self._model = self._model.to("cpu")
        self._model_path = model_path
        # Keep a reference so methods don't have to re-import transformers.
        self._TextIteratorStreamer = TextIteratorStreamer

    @property
    def model_name(self) -> str:
        # HF repo IDs contain "/" — use the part after the slash as a short name
        return self._model_path.split("/")[-1]

    @property
    def vram_mb(self) -> int:
        """Currently-allocated CUDA memory in MB; 0 on CPU or if torch fails."""
        try:
            import torch
            if torch.cuda.is_available():
                return torch.cuda.memory_allocated() // (1024 * 1024)
        except Exception:
            pass
        return 0

    def _build_inputs(self, prompt: str):
        # Tokenize and move tensors onto the model's device.
        return self._tokenizer(prompt, return_tensors="pt").to(self._device)

    def generate(
        self,
        prompt: str,
        *,
        max_tokens: int = 512,
        temperature: float = 0.7,
        stop: list[str] | None = None,
    ) -> GenerateResult:
        """Blocking completion; decodes only the newly-generated tokens."""
        inputs = self._build_inputs(prompt)
        input_len = inputs["input_ids"].shape[1]
        outputs = self._model.generate(
            **inputs,
            max_new_tokens=max_tokens,
            temperature=temperature,
            # temperature == 0 → greedy decoding (sampling disabled).
            do_sample=temperature > 0,
            pad_token_id=self._tokenizer.eos_token_id,
        )
        # generate() returns prompt + completion; slice the prompt away.
        new_tokens = outputs[0][input_len:]
        text = self._tokenizer.decode(new_tokens, skip_special_tokens=True)
        return GenerateResult(text=text, tokens_used=len(new_tokens), model=self.model_name)

    def generate_stream(
        self,
        prompt: str,
        *,
        max_tokens: int = 512,
        temperature: float = 0.7,
        stop: list[str] | None = None,
    ) -> Iterator[str]:
        """Yield decoded text pieces as the model produces them."""
        import threading
        inputs = self._build_inputs(prompt)
        streamer = self._TextIteratorStreamer(
            self._tokenizer, skip_prompt=True, skip_special_tokens=True
        )
        gen_kwargs = dict(
            **inputs,
            max_new_tokens=max_tokens,
            temperature=temperature,
            do_sample=temperature > 0,
            streamer=streamer,
            pad_token_id=self._tokenizer.eos_token_id,
        )
        # generate() blocks, so it runs on a daemon thread while the streamer
        # is consumed here; the streamer iterator ends when generation does.
        thread = threading.Thread(target=self._model.generate, kwargs=gen_kwargs, daemon=True)
        thread.start()
        yield from streamer

    async def generate_async(
        self,
        prompt: str,
        *,
        max_tokens: int = 512,
        temperature: float = 0.7,
        stop: list[str] | None = None,
    ) -> GenerateResult:
        """Run generate() in the default executor without blocking the loop."""
        # get_running_loop(), not the deprecated get_event_loop(): we are
        # always inside a coroutine here, so a loop is guaranteed running.
        loop = asyncio.get_running_loop()
        return await loop.run_in_executor(
            None,
            lambda: self.generate(prompt, max_tokens=max_tokens, temperature=temperature, stop=stop),
        )

    async def generate_stream_async(
        self,
        prompt: str,
        *,
        max_tokens: int = 512,
        temperature: float = 0.7,
        stop: list[str] | None = None,
    ) -> AsyncIterator[str]:
        """Async token stream bridged from the synchronous generate_stream()."""
        import queue
        import threading
        token_queue: queue.Queue = queue.Queue()
        _DONE = object()

        def _produce() -> None:
            try:
                for token in self.generate_stream(
                    prompt, max_tokens=max_tokens, temperature=temperature
                ):
                    token_queue.put(token)
            finally:
                # Always signal completion, even if generation raised.
                token_queue.put(_DONE)

        threading.Thread(target=_produce, daemon=True).start()
        loop = asyncio.get_running_loop()
        while True:
            # queue.get blocks — run it in the executor to keep the loop free.
            token = await loop.run_in_executor(None, token_queue.get)
            if token is _DONE:
                break
            yield token

    def chat(
        self,
        messages: list[ChatMessage],
        *,
        max_tokens: int = 512,
        temperature: float = 0.7,
    ) -> GenerateResult:
        """Chat completion via the model's chat template, with a plain fallback."""
        # Use the tokenizer's chat template when available (instruct models)
        if hasattr(self._tokenizer, "apply_chat_template") and self._tokenizer.chat_template:
            prompt = self._tokenizer.apply_chat_template(
                [m.to_dict() for m in messages],
                tokenize=False,
                add_generation_prompt=True,
            )
        else:
            # Fallback format drops system messages — base (non-instruct)
            # models have no convention for them.
            prompt = "\n".join(
                f"{'User' if m.role == 'user' else 'Assistant'}: {m.content}"
                for m in messages
                if m.role != "system"
            ) + "\nAssistant:"
        return self.generate(prompt, max_tokens=max_tokens, temperature=temperature)

View file

@ -4,7 +4,7 @@ build-backend = "setuptools.build_meta"
[project]
name = "circuitforge-core"
version = "0.8.0"
version = "0.9.0"
description = "Shared scaffold for CircuitForge products (MIT)"
requires-python = ">=3.11"
dependencies = [

View file

View file

@ -0,0 +1,198 @@
"""Tests for pipeline.crystallizer — the core promotion logic."""
import warnings
import pytest
from circuitforge_core.pipeline.crystallizer import (
CrystallizerConfig,
crystallize,
evaluate_new_run,
should_crystallize,
)
from circuitforge_core.pipeline.models import PipelineRun, Step
from circuitforge_core.pipeline.recorder import Recorder
# ── Fixtures / helpers ────────────────────────────────────────────────────────
def _run(run_id, approved=True, review_ms=8000, modified=False,
         steps=None, input_hash="fixedhash",
         ts="2026-04-08T00:00:00+00:00") -> PipelineRun:
    """Build a PipelineRun with test-friendly defaults; only run_id is required."""
    # Note: a falsy steps value (None or []) falls back to a single dtmf step.
    default_steps = [Step("dtmf", {"digits": "1"})]
    return PipelineRun(
        run_id=run_id,
        product="osprey",
        task_type="ivr_navigate",
        input_hash=input_hash,
        steps=steps or default_steps,
        approved=approved,
        review_duration_ms=review_ms,
        output_modified=modified,
        timestamp=ts,
    )
_CFG = CrystallizerConfig(threshold=3, min_review_ms=5_000)
# ── should_crystallize ────────────────────────────────────────────────────────
class TestShouldCrystallize:
    """Threshold gating: only approved runs count toward crystallization."""

    def test_returns_false_below_threshold(self):
        assert should_crystallize([_run(f"r{i}") for i in range(2)], _CFG) is False

    def test_returns_true_at_threshold(self):
        assert should_crystallize([_run(f"r{i}") for i in range(3)], _CFG) is True

    def test_returns_true_above_threshold(self):
        assert should_crystallize([_run(f"r{i}") for i in range(10)], _CFG) is True

    def test_unapproved_runs_not_counted(self):
        # Ten rejected runs must not push two approvals over a threshold of 3.
        approved = [_run(f"r{i}") for i in range(2)]
        rejected = [_run(f"u{i}", approved=False) for i in range(10)]
        assert should_crystallize(approved + rejected, _CFG) is False

    def test_threshold_one(self):
        single = CrystallizerConfig(threshold=1)
        assert should_crystallize([_run("r1")], single) is True
# ── crystallize ───────────────────────────────────────────────────────────────
class TestCrystallize:
def _approved_runs(self, n=3, review_ms=8000):
return [_run(f"r{i}", review_ms=review_ms) for i in range(n)]
def test_produces_workflow(self):
wf = crystallize(self._approved_runs(), _CFG)
assert wf.product == "osprey"
assert wf.task_type == "ivr_navigate"
assert wf.approval_count == 3
def test_workflow_id_format(self):
wf = crystallize(self._approved_runs(), _CFG)
assert wf.workflow_id.startswith("osprey:ivr_navigate:")
def test_avg_review_duration_computed(self):
runs = [_run("r0", review_ms=6000), _run("r1", review_ms=10000),
_run("r2", review_ms=8000)]
wf = crystallize(runs, _CFG)
assert wf.avg_review_duration_ms == 8000
def test_all_output_unmodified_true(self):
runs = self._approved_runs()
wf = crystallize(runs, _CFG)
assert wf.all_output_unmodified is True
def test_all_output_unmodified_false_when_any_modified(self):
runs = [_run("r0"), _run("r1"), _run("r2", modified=True)]
wf = crystallize(runs, _CFG)
assert wf.all_output_unmodified is False
def test_raises_below_threshold(self):
with pytest.raises(ValueError, match="Need 3"):
crystallize([_run("r0"), _run("r1")], _CFG)
def test_raises_on_mixed_products(self):
r1 = _run("r1")
r2 = PipelineRun(
run_id="r2", product="falcon", task_type="ivr_navigate",
input_hash="fixedhash", steps=r1.steps, approved=True,
review_duration_ms=8000, output_modified=False,
)
with pytest.raises(ValueError, match="product"):
crystallize([r1, r2, r1], _CFG)
def test_raises_on_mixed_hashes(self):
runs = [_run("r0", input_hash="hash_a"),
_run("r1", input_hash="hash_b"),
_run("r2", input_hash="hash_a")]
with pytest.raises(ValueError, match="input_hash"):
crystallize(runs, _CFG)
def test_rubber_stamp_warning(self):
runs = [_run(f"r{i}", review_ms=100) for i in range(3)]
with warnings.catch_warnings(record=True) as caught:
warnings.simplefilter("always")
crystallize(runs, _CFG)
assert any("rubber-stamp" in str(w.message) for w in caught)
def test_no_warning_when_min_review_ms_zero(self):
cfg = CrystallizerConfig(threshold=3, min_review_ms=0)
runs = [_run(f"r{i}", review_ms=1) for i in range(3)]
with warnings.catch_warnings(record=True) as caught:
warnings.simplefilter("always")
crystallize(runs, cfg)
assert not any("rubber-stamp" in str(w.message) for w in caught)
def test_version_increments(self):
wf = crystallize(self._approved_runs(), _CFG, existing_version=2)
assert wf.version == 3
def test_strategy_most_recent_uses_latest(self):
steps_old = [Step("dtmf", {"digits": "9"})]
steps_new = [Step("dtmf", {"digits": "1"})]
runs = [
_run("r0", steps=steps_old, ts="2026-01-01T00:00:00+00:00"),
_run("r1", steps=steps_old, ts="2026-01-02T00:00:00+00:00"),
_run("r2", steps=steps_new, ts="2026-04-08T00:00:00+00:00"),
]
cfg = CrystallizerConfig(threshold=3, strategy="most_recent")
wf = crystallize(runs, cfg)
assert wf.steps[0].params["digits"] == "1"
def test_strategy_majority_picks_common_action(self):
    """Majority voting selects the action shared by most runs."""
    common = [Step("dtmf", {"digits": "1"})]
    outlier = [Step("press_key", {"key": "2"})]
    history = [
        _run("r0", steps=common),
        _run("r1", steps=common),
        _run("r2", steps=outlier),
    ]
    cfg = CrystallizerConfig(threshold=3, strategy="majority")
    workflow = crystallize(history, cfg)
    assert workflow.steps[0].action == "dtmf"
def test_strategy_majority_falls_back_on_length_mismatch(self):
    """Majority voting over unequal step counts degrades to most_recent."""
    history = [
        _run("r0", steps=[Step("dtmf", {"digits": "1"})]),
        _run("r1", steps=[Step("dtmf", {"digits": "1"}),
                          Step("dtmf", {"digits": "2"})]),
        _run("r2", steps=[Step("dtmf", {"digits": "1"})],
             ts="2026-04-08T00:00:00+00:00"),
    ]
    cfg = CrystallizerConfig(threshold=3, strategy="majority")
    # Should not raise — falls back to most_recent
    workflow = crystallize(history, cfg)
    assert workflow.steps is not None
# ── evaluate_new_run ──────────────────────────────────────────────────────────
class TestEvaluateNewRun:
    """Behaviour of evaluate_new_run around the crystallization threshold."""

    def test_returns_none_before_threshold(self, tmp_path):
        recorder = Recorder(root=tmp_path)
        config = CrystallizerConfig(threshold=3, min_review_ms=0)
        assert evaluate_new_run(_run("r1"), recorder, config) is None

    def test_returns_workflow_at_threshold(self, tmp_path):
        recorder = Recorder(root=tmp_path)
        config = CrystallizerConfig(threshold=3, min_review_ms=0)
        # Two runs below threshold, the third one crosses it.
        evaluate_new_run(_run("r0"), recorder, config)
        evaluate_new_run(_run("r1"), recorder, config)
        workflow = evaluate_new_run(_run("r2"), recorder, config)
        assert workflow is not None
        assert workflow.approval_count == 3

    def test_unapproved_run_does_not_trigger(self, tmp_path):
        recorder = Recorder(root=tmp_path)
        config = CrystallizerConfig(threshold=1, min_review_ms=0)
        outcome = evaluate_new_run(_run("r1", approved=False), recorder, config)
        assert outcome is None

    def test_run_is_recorded_even_if_not_approved(self, tmp_path):
        recorder = Recorder(root=tmp_path)
        config = CrystallizerConfig(threshold=3, min_review_ms=0)
        evaluate_new_run(_run("r1", approved=False), recorder, config)
        recorded = recorder.load_runs("osprey", "ivr_navigate")
        assert len(recorded) == 1

View file

@ -0,0 +1,96 @@
"""Tests for pipeline.Executor."""
import pytest
from circuitforge_core.pipeline.executor import Executor, ExecutionResult
from circuitforge_core.pipeline.models import CrystallizedWorkflow, Step
def _wf(steps=None) -> CrystallizedWorkflow:
    """Build a minimal workflow fixture; *steps* defaults to two DTMF presses."""
    if steps is None:
        steps = [Step("dtmf", {"digits": "1"}), Step("dtmf", {"digits": "2"})]
    return CrystallizedWorkflow(
        workflow_id="osprey:ivr_navigate:abc",
        product="osprey",
        task_type="ivr_navigate",
        input_hash="abc",
        steps=steps,
        crystallized_at="2026-04-08T00:00:00+00:00",
        run_ids=["r1"],
        approval_count=1,
        avg_review_duration_ms=8000,
        all_output_unmodified=True,
    )
def _ok_step(_step):
return True, "ok"
def _fail_step(_step):
return False, None
def _raise_step(_step):
raise RuntimeError("hardware error")
def _llm():
return "llm-output"
class TestExecutor:
    """Deterministic step execution plus transparent LLM fallback."""

    def test_all_steps_succeed(self):
        executor = Executor(step_fn=_ok_step, llm_fn=_llm)
        outcome = executor.execute(_wf())
        assert outcome.success is True
        assert outcome.used_deterministic is True
        assert len(outcome.step_results) == 2

    def test_failed_step_triggers_llm_fallback(self):
        executor = Executor(step_fn=_fail_step, llm_fn=_llm)
        outcome = executor.execute(_wf())
        assert outcome.success is True
        assert outcome.used_deterministic is False
        assert outcome.llm_output == "llm-output"

    def test_raising_step_triggers_llm_fallback(self):
        executor = Executor(step_fn=_raise_step, llm_fn=_llm)
        outcome = executor.execute(_wf())
        assert outcome.success is True
        assert outcome.used_deterministic is False

    def test_llm_fallback_disabled_returns_failure(self):
        executor = Executor(step_fn=_fail_step, llm_fn=_llm, llm_fallback=False)
        outcome = executor.execute(_wf())
        assert outcome.success is False
        assert "disabled" in (outcome.error or "")

    def test_run_with_fallback_no_workflow_calls_llm(self):
        executor = Executor(step_fn=_ok_step, llm_fn=_llm)
        outcome = executor.run_with_fallback(workflow=None)
        assert outcome.success is True
        assert outcome.used_deterministic is False
        assert outcome.llm_output == "llm-output"

    def test_run_with_fallback_uses_workflow_when_given(self):
        executor = Executor(step_fn=_ok_step, llm_fn=_llm)
        outcome = executor.run_with_fallback(workflow=_wf())
        assert outcome.used_deterministic is True

    def test_llm_fn_raises_returns_failure(self):
        def _broken_llm():
            raise ValueError("no model")

        executor = Executor(step_fn=_fail_step, llm_fn=_broken_llm)
        outcome = executor.execute(_wf())
        assert outcome.success is False
        assert "no model" in (outcome.error or "")

    def test_workflow_id_preserved_in_result(self):
        executor = Executor(step_fn=_ok_step, llm_fn=_llm)
        outcome = executor.execute(_wf())
        assert outcome.workflow_id == "osprey:ivr_navigate:abc"

    def test_empty_workflow_succeeds_immediately(self):
        executor = Executor(step_fn=_ok_step, llm_fn=_llm)
        outcome = executor.execute(_wf(steps=[]))
        assert outcome.success is True
        assert outcome.step_results == []

View file

@ -0,0 +1,102 @@
"""Tests for pipeline models and hash_input utility."""
import pytest
from circuitforge_core.pipeline.models import (
CrystallizedWorkflow,
PipelineRun,
Step,
hash_input,
)
class TestHashInput:
    """hash_input must be deterministic, key-order-insensitive, and SHA-256."""

    def test_stable_across_calls(self):
        features = {"agency": "FTB", "menu_depth": 2}
        assert hash_input(features) == hash_input(features)

    def test_key_order_irrelevant(self):
        assert hash_input({"b": 2, "a": 1}) == hash_input({"a": 1, "b": 2})

    def test_different_values_differ(self):
        assert hash_input({"a": 1}) != hash_input({"a": 2})

    def test_returns_hex_string(self):
        digest = hash_input({"x": "y"})
        assert isinstance(digest, str)
        assert len(digest) == 64  # SHA-256 hex
class TestStep:
    """Step serialization round-trip and optional description."""

    def test_roundtrip(self):
        original = Step(action="dtmf", params={"digits": "1"}, description="Press 1")
        assert Step.from_dict(original.to_dict()) == original

    def test_description_optional(self):
        parsed = Step.from_dict({"action": "dtmf", "params": {}})
        assert parsed.description == ""
class TestPipelineRun:
    """PipelineRun construction, serialization, and field defaults."""

    def _run(self, **kwargs) -> PipelineRun:
        fields = {
            "run_id": "r1",
            "product": "osprey",
            "task_type": "ivr_navigate",
            "input_hash": "abc123",
            "steps": [Step("dtmf", {"digits": "1"})],
            "approved": True,
            "review_duration_ms": 8000,
            "output_modified": False,
        }
        fields.update(kwargs)
        return PipelineRun(**fields)

    def test_roundtrip(self):
        serialized = self._run().to_dict()
        assert PipelineRun.from_dict(serialized).run_id == "r1"

    def test_output_modified_false_default(self):
        serialized = self._run().to_dict()
        serialized.pop("output_modified", None)
        restored = PipelineRun.from_dict(serialized)
        assert restored.output_modified is False

    def test_timestamp_auto_set(self):
        assert self._run().timestamp  # auto-populated, never empty
class TestCrystallizedWorkflow:
    """CrystallizedWorkflow serialization and default field values."""

    def _wf(self) -> CrystallizedWorkflow:
        return CrystallizedWorkflow(
            workflow_id="osprey:ivr_navigate:abc123abc123",
            product="osprey",
            task_type="ivr_navigate",
            input_hash="abc123",
            steps=[Step("dtmf", {"digits": "1"})],
            crystallized_at="2026-04-08T00:00:00+00:00",
            run_ids=["r1", "r2", "r3"],
            approval_count=3,
            avg_review_duration_ms=9000,
            all_output_unmodified=True,
        )

    def test_roundtrip(self):
        source = self._wf()
        restored = CrystallizedWorkflow.from_dict(source.to_dict())
        assert restored.workflow_id == source.workflow_id
        assert restored.avg_review_duration_ms == 9000
        assert restored.all_output_unmodified is True

    def test_active_default_true(self):
        payload = self._wf().to_dict()
        payload.pop("active", None)
        assert CrystallizedWorkflow.from_dict(payload).active is True

    def test_version_default_one(self):
        payload = self._wf().to_dict()
        payload.pop("version", None)
        assert CrystallizedWorkflow.from_dict(payload).version == 1

View file

@ -0,0 +1,198 @@
"""Tests for pipeline.MultimodalPipeline — mock vision and text backends."""
import pytest
from unittest.mock import MagicMock, patch
from circuitforge_core.documents.models import Element, StructuredDocument
from circuitforge_core.pipeline.multimodal import (
MultimodalConfig,
MultimodalPipeline,
PageResult,
_default_prompt,
)
# ── Helpers ───────────────────────────────────────────────────────────────────
def _doc(text="extracted text", page=0) -> StructuredDocument:
    """Single-paragraph StructuredDocument wrapping *text* (page is unused)."""
    paragraph = Element(type="paragraph", text=text)
    return StructuredDocument(elements=[paragraph], raw_text=text)
def _vision_ok(text="extracted text"):
    """Mock DocuvisionClient whose extract() returns a StructuredDocument."""
    client = MagicMock()
    client.extract.return_value = _doc(text)
    return client
def _vision_fail(exc=None):
mock = MagicMock()
mock.extract.side_effect = exc or ConnectionError("service down")
return mock
def _generate_fn(prompt, max_tokens=512, temperature=0.7):
return f"generated: {prompt[:20]}"
def _stream_fn(prompt, max_tokens=512, temperature=0.7):
yield "tok1"
yield "tok2"
yield "tok3"
def _pipe(vision_mock=None, generate_fn=None, stream_fn=None,
          vram_serialise=False, swap_fn=None, prompt_fn=None) -> MultimodalPipeline:
    """Assemble a pipeline with mock backends; injects *vision_mock* when given."""
    cfg = MultimodalConfig(vram_serialise=vram_serialise)
    if prompt_fn:
        cfg.prompt_fn = prompt_fn
    pipeline = MultimodalPipeline(
        cfg,
        generate_fn=_generate_fn if generate_fn is None else generate_fn,
        stream_fn=stream_fn,
        swap_fn=swap_fn,
    )
    if vision_mock is not None:
        pipeline._vision = vision_mock
    return pipeline
# ── DefaultPrompt ─────────────────────────────────────────────────────────────
class TestDefaultPrompt:
    """_default_prompt adds a page header for every page after the first."""

    def test_page_zero_no_header(self):
        assert _default_prompt(0, _doc("hello")) == "hello"

    def test_page_one_has_header(self):
        prompt = _default_prompt(1, _doc("content"))
        assert "[Page 2]" in prompt
        assert "content" in prompt
# ── run() ─────────────────────────────────────────────────────────────────────
class TestMultimodalPipelineRun:
    """run(): one PageResult per page, progressive, failure-isolated."""

    def test_single_page_success(self):
        pipeline = _pipe(vision_mock=_vision_ok("resume text"))
        pages = list(pipeline.run([b"page0_bytes"]))
        assert len(pages) == 1
        assert pages[0].page_idx == 0
        assert pages[0].error is None
        assert "generated" in pages[0].generated

    def test_multiple_pages_all_yielded(self):
        pipeline = _pipe(vision_mock=_vision_ok())
        pages = list(pipeline.run([b"p0", b"p1", b"p2"]))
        assert len(pages) == 3
        assert [page.page_idx for page in pages] == [0, 1, 2]

    def test_vision_failure_yields_error_page(self):
        pipeline = _pipe(vision_mock=_vision_fail())
        page = list(pipeline.run([b"p0"]))[0]
        assert page.error is not None
        assert page.doc is None
        assert page.generated == ""

    def test_partial_failure_does_not_stop_pipeline(self):
        """One bad page should not prevent subsequent pages from processing."""
        flaky = MagicMock()
        flaky.extract.side_effect = [
            ConnectionError("fail"),
            _doc("good text"),
        ]
        pipeline = _pipe(vision_mock=flaky)
        pages = list(pipeline.run([b"p0", b"p1"]))
        assert pages[0].error is not None
        assert pages[1].error is None

    def test_generation_failure_yields_error_page(self):
        def _oom_gen(prompt, **kw):
            raise RuntimeError("model OOM")

        pipeline = _pipe(vision_mock=_vision_ok(), generate_fn=_oom_gen)
        page = list(pipeline.run([b"p0"]))[0]
        assert page.error is not None
        assert "OOM" in page.error

    def test_doc_attached_to_result(self):
        pipeline = _pipe(vision_mock=_vision_ok("some text"))
        page = list(pipeline.run([b"p0"]))[0]
        assert page.doc is not None
        assert page.doc.raw_text == "some text"

    def test_empty_pages_yields_nothing(self):
        assert list(_pipe(vision_mock=_vision_ok()).run([])) == []

    def test_custom_prompt_fn_called(self):
        seen = []

        def _record_prompt(page_idx, doc):
            seen.append((page_idx, doc.raw_text))
            return f"custom:{doc.raw_text}"

        pipeline = _pipe(vision_mock=_vision_ok("txt"), prompt_fn=_record_prompt)
        list(pipeline.run([b"p0"]))
        assert seen == [(0, "txt")]

    def test_vram_serialise_calls_swap_fn(self):
        swap_calls = []
        pipeline = _pipe(vision_mock=_vision_ok(), vram_serialise=True,
                         swap_fn=lambda: swap_calls.append(1))
        list(pipeline.run([b"p0", b"p1"]))
        assert len(swap_calls) == 2  # once per page

    def test_vram_serialise_false_no_swap_called(self):
        swap_calls = []
        pipeline = _pipe(vision_mock=_vision_ok(), vram_serialise=False,
                         swap_fn=lambda: swap_calls.append(1))
        list(pipeline.run([b"p0"]))
        assert swap_calls == []

    def test_swap_fn_none_does_not_raise(self):
        pipeline = _pipe(vision_mock=_vision_ok(), vram_serialise=True, swap_fn=None)
        page = list(pipeline.run([b"p0"]))[0]
        assert page.error is None
# ── stream() ──────────────────────────────────────────────────────────────────
class TestMultimodalPipelineStream:
    """stream(): (page_idx, token) tuples with in-band error reporting."""

    def test_yields_page_idx_token_tuples(self):
        pipeline = _pipe(vision_mock=_vision_ok(), stream_fn=_stream_fn)
        emitted = list(pipeline.stream([b"p0"]))
        assert all(isinstance(item, tuple) and len(item) == 2 for item in emitted)
        first_page, first_token = emitted[0]
        assert first_page == 0  # page_idx
        assert first_token == "tok1"

    def test_multiple_pages_interleaved_by_page(self):
        pipeline = _pipe(vision_mock=_vision_ok(), stream_fn=_stream_fn)
        emitted = list(pipeline.stream([b"p0", b"p1"]))
        page_order = [page for page, _ in emitted]
        # Pages are processed sequentially, so indices never go backwards.
        assert page_order == sorted(page_order)

    def test_vision_failure_yields_error_token(self):
        pipeline = _pipe(vision_mock=_vision_fail(), stream_fn=_stream_fn)
        emitted = list(pipeline.stream([b"p0"]))
        assert len(emitted) == 1
        assert "extraction error" in emitted[0][1]

    def test_stream_fn_error_yields_error_token(self):
        def _gpu_gone(prompt, **kw):
            raise RuntimeError("GPU gone")
            yield  # make it a generator

        pipeline = _pipe(vision_mock=_vision_ok(), stream_fn=_gpu_gone)
        emitted = list(pipeline.stream([b"p0"]))
        assert any("generation error" in token for _, token in emitted)

    def test_empty_pages_yields_nothing(self):
        pipeline = _pipe(vision_mock=_vision_ok(), stream_fn=_stream_fn)
        assert list(pipeline.stream([])) == []
# ── Import check ──────────────────────────────────────────────────────────────
def test_exported_from_pipeline_package():
    """The multimodal types must be re-exported by the pipeline package."""
    from circuitforge_core.pipeline import (
        MultimodalConfig,
        MultimodalPipeline,
        PageResult,
    )

    assert MultimodalPipeline is not None

View file

@ -0,0 +1,66 @@
"""Tests for pipeline.Recorder."""
import pytest
from circuitforge_core.pipeline.models import PipelineRun, Step
from circuitforge_core.pipeline.recorder import Recorder
def _run(run_id="r1", approved=True, input_hash="abc", review_ms=8000,
         modified=False, ts="2026-04-08T01:00:00+00:00") -> PipelineRun:
    """One-step osprey/ivr_navigate run fixture with overridable fields."""
    payload = dict(
        run_id=run_id,
        product="osprey",
        task_type="ivr_navigate",
        input_hash=input_hash,
        steps=[Step("dtmf", {"digits": "1"})],
        approved=approved,
        review_duration_ms=review_ms,
        output_modified=modified,
        timestamp=ts,
    )
    return PipelineRun(**payload)
class TestRecorder:
    """Append-only run recording and filtered retrieval."""

    def test_record_creates_file(self, tmp_path):
        recorder = Recorder(root=tmp_path)
        assert recorder.record(_run()).exists()

    def test_load_runs_empty_when_no_directory(self, tmp_path):
        recorder = Recorder(root=tmp_path)
        assert recorder.load_runs("osprey", "ivr_navigate") == []

    def test_load_runs_returns_recorded(self, tmp_path):
        recorder = Recorder(root=tmp_path)
        recorder.record(_run("r1"))
        recorder.record(_run("r2"))
        assert len(recorder.load_runs("osprey", "ivr_navigate")) == 2

    def test_load_runs_newest_first(self, tmp_path):
        recorder = Recorder(root=tmp_path)
        recorder.record(_run("r_old", ts="2026-01-01T00:00:00+00:00"))
        recorder.record(_run("r_new", ts="2026-04-08T00:00:00+00:00"))
        newest = recorder.load_runs("osprey", "ivr_navigate")[0]
        assert newest.run_id == "r_new"

    def test_load_approved_filters(self, tmp_path):
        recorder = Recorder(root=tmp_path)
        recorder.record(_run("r1", approved=True))
        recorder.record(_run("r2", approved=False))
        approved = recorder.load_approved("osprey", "ivr_navigate", "abc")
        assert all(run.approved for run in approved)
        assert len(approved) == 1

    def test_load_approved_filters_by_hash(self, tmp_path):
        recorder = Recorder(root=tmp_path)
        recorder.record(_run("r1", input_hash="hash_a"))
        recorder.record(_run("r2", input_hash="hash_b"))
        matches = recorder.load_approved("osprey", "ivr_navigate", "hash_a")
        assert len(matches) == 1
        assert matches[0].run_id == "r1"

    def test_record_is_append_only(self, tmp_path):
        recorder = Recorder(root=tmp_path)
        for idx in range(5):
            recorder.record(_run(f"r{idx}"))
        assert len(recorder.load_runs("osprey", "ivr_navigate")) == 5

View file

@ -0,0 +1,104 @@
"""Tests for pipeline.Registry — workflow lookup."""
import pytest
from circuitforge_core.pipeline.models import CrystallizedWorkflow, Step
from circuitforge_core.pipeline.registry import Registry
def _wf(input_hash="abc", active=True, wf_id=None) -> CrystallizedWorkflow:
    """Workflow fixture; derives an id from the hash prefix unless *wf_id* given."""
    workflow_id = wf_id if wf_id is not None else f"osprey:ivr_navigate:{input_hash[:12]}"
    return CrystallizedWorkflow(
        workflow_id=workflow_id,
        product="osprey",
        task_type="ivr_navigate",
        input_hash=input_hash,
        steps=[Step("dtmf", {"digits": "1"})],
        crystallized_at="2026-04-08T00:00:00+00:00",
        run_ids=["r1", "r2", "r3"],
        approval_count=3,
        avg_review_duration_ms=9000,
        all_output_unmodified=True,
        active=active,
    )
class TestRegistry:
    """Workflow registration, exact/fuzzy lookup, and deactivation."""

    def test_register_creates_file(self, tmp_path):
        registry = Registry(root=tmp_path)
        assert registry.register(_wf()).exists()

    def test_load_all_empty_when_no_directory(self, tmp_path):
        registry = Registry(root=tmp_path)
        assert registry.load_all("osprey", "ivr_navigate") == []

    def test_load_all_returns_registered(self, tmp_path):
        registry = Registry(root=tmp_path)
        registry.register(_wf("hash_a", wf_id="osprey:ivr_navigate:hash_a"))
        registry.register(_wf("hash_b", wf_id="osprey:ivr_navigate:hash_b"))
        assert len(registry.load_all("osprey", "ivr_navigate")) == 2

    def test_match_exact_hit(self, tmp_path):
        registry = Registry(root=tmp_path)
        registry.register(_wf("abc123"))
        found = registry.match("osprey", "ivr_navigate", "abc123")
        assert found is not None
        assert found.input_hash == "abc123"

    def test_match_returns_none_on_miss(self, tmp_path):
        registry = Registry(root=tmp_path)
        registry.register(_wf("abc123"))
        assert registry.match("osprey", "ivr_navigate", "different") is None

    def test_match_ignores_inactive(self, tmp_path):
        registry = Registry(root=tmp_path)
        registry.register(_wf("abc123", active=False))
        assert registry.match("osprey", "ivr_navigate", "abc123") is None

    def test_deactivate_sets_active_false(self, tmp_path):
        registry = Registry(root=tmp_path)
        workflow = _wf("abc123")
        registry.register(workflow)
        registry.deactivate(workflow.workflow_id, "osprey", "ivr_navigate")
        assert registry.match("osprey", "ivr_navigate", "abc123") is None

    def test_deactivate_returns_false_when_not_found(self, tmp_path):
        registry = Registry(root=tmp_path)
        assert registry.deactivate("nonexistent", "osprey", "ivr_navigate") is False

    def test_find_falls_through_to_fuzzy(self, tmp_path):
        registry = Registry(root=tmp_path,
                            similarity_fn=lambda a, b: 1.0 if a == b else 0.5,
                            fuzzy_threshold=0.4)
        registry.register(_wf("hash_stored"))
        # No exact hit for "hash_query"; similarity 0.5 clears the 0.4 bar.
        assert registry.find("osprey", "ivr_navigate", "hash_query") is not None

    def test_fuzzy_match_raises_without_fn(self, tmp_path):
        registry = Registry(root=tmp_path)
        with pytest.raises(RuntimeError, match="similarity_fn"):
            registry.fuzzy_match("osprey", "ivr_navigate", "any")

    def test_fuzzy_match_below_threshold_returns_none(self, tmp_path):
        registry = Registry(root=tmp_path,
                            similarity_fn=lambda a, b: 0.1,
                            fuzzy_threshold=0.8)
        registry.register(_wf("hash_stored"))
        assert registry.fuzzy_match("osprey", "ivr_navigate", "hash_query") is None

    def test_find_exact_takes_priority(self, tmp_path):
        registry = Registry(root=tmp_path,
                            similarity_fn=lambda a, b: 0.9,
                            fuzzy_threshold=0.8)
        registry.register(_wf("exact_hash"))
        found = registry.find("osprey", "ivr_navigate", "exact_hash")
        # The exact-match workflow wins over any fuzzy candidate.
        assert found.input_hash == "exact_hash"

    def test_workflow_id_colon_safe_in_filename(self, tmp_path):
        """Colons in workflow_id must not break file creation on any OS."""
        registry = Registry(root=tmp_path)
        stored = registry.register(_wf("abc", wf_id="osprey:ivr_navigate:abc123abc123"))
        assert stored.exists()
        assert ":" not in stored.name

View file

@ -120,6 +120,93 @@ class TestLocalFileStore:
from circuitforge_core.preferences import get_user_preference, set_user_preference
from circuitforge_core.preferences.accessibility import (
is_reduced_motion_preferred,
is_high_contrast,
get_font_size,
is_screen_reader_mode,
set_reduced_motion,
PREF_REDUCED_MOTION,
PREF_HIGH_CONTRAST,
PREF_FONT_SIZE,
PREF_SCREEN_READER,
)
class TestAccessibilityPreferences:
    """Accessibility preference helpers backed by a LocalFileStore."""

    def _store(self, tmp_path) -> LocalFileStore:
        return LocalFileStore(prefs_path=tmp_path / "preferences.yaml")

    def test_reduced_motion_default_false(self, tmp_path):
        assert is_reduced_motion_preferred(store=self._store(tmp_path)) is False

    def test_set_reduced_motion_persists(self, tmp_path):
        prefs = self._store(tmp_path)
        set_reduced_motion(True, store=prefs)
        assert is_reduced_motion_preferred(store=prefs) is True

    def test_reduced_motion_false_roundtrip(self, tmp_path):
        prefs = self._store(tmp_path)
        set_reduced_motion(True, store=prefs)
        set_reduced_motion(False, store=prefs)
        assert is_reduced_motion_preferred(store=prefs) is False

    def test_high_contrast_default_false(self, tmp_path):
        assert is_high_contrast(store=self._store(tmp_path)) is False

    def test_high_contrast_set_and_read(self, tmp_path):
        prefs = self._store(tmp_path)
        prefs.set(user_id=None, path=PREF_HIGH_CONTRAST, value=True)
        assert is_high_contrast(store=prefs) is True

    def test_font_size_default(self, tmp_path):
        assert get_font_size(store=self._store(tmp_path)) == "default"

    def test_font_size_large(self, tmp_path):
        prefs = self._store(tmp_path)
        prefs.set(user_id=None, path=PREF_FONT_SIZE, value="large")
        assert get_font_size(store=prefs) == "large"

    def test_font_size_xlarge(self, tmp_path):
        prefs = self._store(tmp_path)
        prefs.set(user_id=None, path=PREF_FONT_SIZE, value="xlarge")
        assert get_font_size(store=prefs) == "xlarge"

    def test_font_size_invalid_falls_back_to_default(self, tmp_path):
        prefs = self._store(tmp_path)
        prefs.set(user_id=None, path=PREF_FONT_SIZE, value="gigantic")
        assert get_font_size(store=prefs) == "default"

    def test_screen_reader_mode_default_false(self, tmp_path):
        assert is_screen_reader_mode(store=self._store(tmp_path)) is False

    def test_screen_reader_mode_set(self, tmp_path):
        prefs = self._store(tmp_path)
        prefs.set(user_id=None, path=PREF_SCREEN_READER, value=True)
        assert is_screen_reader_mode(store=prefs) is True

    def test_preferences_are_independent(self, tmp_path):
        """Setting one a11y pref doesn't affect others."""
        prefs = self._store(tmp_path)
        set_reduced_motion(True, store=prefs)
        assert is_high_contrast(store=prefs) is False
        assert get_font_size(store=prefs) == "default"
        assert is_screen_reader_mode(store=prefs) is False

    def test_user_id_threaded_through(self, tmp_path):
        """user_id param is accepted (LocalFileStore ignores it, but must not error)."""
        prefs = self._store(tmp_path)
        set_reduced_motion(True, user_id="u999", store=prefs)
        assert is_reduced_motion_preferred(user_id="u999", store=prefs) is True

    def test_accessibility_exported_from_package(self):
        from circuitforge_core.preferences import accessibility

        assert hasattr(accessibility, "is_reduced_motion_preferred")
        assert hasattr(accessibility, "PREF_REDUCED_MOTION")
class TestPreferenceHelpers:

View file

View file

@ -0,0 +1,190 @@
"""Tests for cf-text backend selection, mock backend, and public API."""
import os
import pytest
from circuitforge_core.text.backends.base import (
ChatMessage,
GenerateResult,
TextBackend,
_select_backend,
make_text_backend,
)
from circuitforge_core.text.backends.mock import MockTextBackend
from circuitforge_core.text import generate, chat, reset_backend, make_backend
# ── _select_backend ───────────────────────────────────────────────────────────
class TestSelectBackend:
    """Backend choice: caller argument beats env var beats extension sniffing."""

    def test_explicit_llamacpp(self):
        chosen = _select_backend("model.gguf", "llamacpp")
        assert chosen == "llamacpp"

    def test_explicit_transformers(self):
        chosen = _select_backend("model.gguf", "transformers")
        assert chosen == "transformers"

    def test_explicit_invalid_raises(self):
        with pytest.raises(ValueError, match="not valid"):
            _select_backend("model.gguf", "ctransformers")

    def test_env_override_llamacpp(self, monkeypatch):
        monkeypatch.setenv("CF_TEXT_BACKEND", "llamacpp")
        assert _select_backend("Qwen/Qwen2.5-3B", None) == "llamacpp"

    def test_env_override_transformers(self, monkeypatch):
        monkeypatch.setenv("CF_TEXT_BACKEND", "transformers")
        assert _select_backend("model.gguf", None) == "transformers"

    def test_env_override_invalid_raises(self, monkeypatch):
        monkeypatch.setenv("CF_TEXT_BACKEND", "ctransformers")
        with pytest.raises(ValueError):
            _select_backend("model.gguf", None)

    def test_caller_beats_env(self, monkeypatch):
        monkeypatch.setenv("CF_TEXT_BACKEND", "transformers")
        # The explicit argument takes precedence over the env var.
        assert _select_backend("model.gguf", "llamacpp") == "llamacpp"

    def test_gguf_extension_selects_llamacpp(self, monkeypatch):
        monkeypatch.delenv("CF_TEXT_BACKEND", raising=False)
        chosen = _select_backend("/models/qwen2.5-3b-q4.gguf", None)
        assert chosen == "llamacpp"

    def test_gguf_uppercase_extension(self, monkeypatch):
        monkeypatch.delenv("CF_TEXT_BACKEND", raising=False)
        chosen = _select_backend("/models/model.GGUF", None)
        assert chosen == "llamacpp"

    def test_hf_repo_id_selects_transformers(self, monkeypatch):
        monkeypatch.delenv("CF_TEXT_BACKEND", raising=False)
        chosen = _select_backend("Qwen/Qwen2.5-3B-Instruct", None)
        assert chosen == "transformers"

    def test_safetensors_dir_selects_transformers(self, monkeypatch):
        monkeypatch.delenv("CF_TEXT_BACKEND", raising=False)
        chosen = _select_backend("/models/qwen2.5-3b/", None)
        assert chosen == "transformers"
# ── ChatMessage ───────────────────────────────────────────────────────────────
class TestChatMessage:
    """ChatMessage role validation and dict conversion."""

    def test_valid_roles(self):
        for accepted in ("system", "user", "assistant"):
            assert ChatMessage(accepted, "hello").role == accepted

    def test_invalid_role_raises(self):
        with pytest.raises(ValueError, match="Invalid role"):
            ChatMessage("bot", "hello")

    def test_to_dict(self):
        as_dict = ChatMessage("user", "hello").to_dict()
        assert as_dict == {"role": "user", "content": "hello"}
# ── MockTextBackend ───────────────────────────────────────────────────────────
class TestMockTextBackend:
    """Mock backend: sync/async generation, streaming, and protocol shape."""

    def test_generate_returns_result(self):
        outcome = MockTextBackend().generate("write something")
        assert isinstance(outcome, GenerateResult)
        assert len(outcome.text) > 0

    def test_vram_mb_is_zero(self):
        assert MockTextBackend().vram_mb == 0

    def test_model_name(self):
        named = MockTextBackend(model_name="test-model")
        assert named.model_name == "test-model"

    def test_generate_stream_yields_tokens(self):
        backend = MockTextBackend()
        streamed = list(backend.generate_stream("hello"))
        assert len(streamed) > 0
        # Streamed tokens must reassemble into the non-streamed output.
        full_text = backend.generate("hello").text
        assert "".join(streamed).strip() == full_text.strip()

    @pytest.mark.asyncio
    async def test_generate_async(self):
        outcome = await MockTextBackend().generate_async("hello")
        assert isinstance(outcome, GenerateResult)

    @pytest.mark.asyncio
    async def test_generate_stream_async(self):
        collected = []
        async for token in MockTextBackend().generate_stream_async("hello"):
            collected.append(token)
        assert len(collected) > 0

    def test_chat(self):
        conversation = [ChatMessage("user", "hello")]
        outcome = MockTextBackend().chat(conversation)
        assert isinstance(outcome, GenerateResult)

    def test_isinstance_protocol(self):
        assert isinstance(MockTextBackend(), TextBackend)
# ── make_text_backend ─────────────────────────────────────────────────────────
class TestMakeTextBackend:
    """make_text_backend: mock short-circuits; real backends fail loudly."""

    def test_mock_flag(self):
        built = make_text_backend("any-model", mock=True)
        assert isinstance(built, MockTextBackend)

    def test_mock_env(self, monkeypatch):
        monkeypatch.setenv("CF_TEXT_MOCK", "1")
        built = make_text_backend("any-model")
        assert isinstance(built, MockTextBackend)

    def test_real_gguf_raises_import_error(self, monkeypatch):
        monkeypatch.delenv("CF_TEXT_MOCK", raising=False)
        monkeypatch.delenv("CF_TEXT_BACKEND", raising=False)
        with pytest.raises((ImportError, FileNotFoundError)):
            make_text_backend("/nonexistent/model.gguf", mock=False)

    def test_real_transformers_nonexistent_model_raises(self, monkeypatch):
        monkeypatch.delenv("CF_TEXT_MOCK", raising=False)
        monkeypatch.setenv("CF_TEXT_BACKEND", "transformers")
        # Use a clearly nonexistent local path — avoids a network hit and HF download
        with pytest.raises(Exception):
            make_text_backend("/nonexistent/local/model-dir", mock=False)
# ── Public API (singleton) ────────────────────────────────────────────────────
class TestPublicAPI:
def setup_method(self):
reset_backend()
def teardown_method(self):
reset_backend()
def test_generate_mock(self, monkeypatch):
monkeypatch.setenv("CF_TEXT_MOCK", "1")
result = generate("write something")
assert isinstance(result, GenerateResult)
def test_generate_stream_mock(self, monkeypatch):
monkeypatch.setenv("CF_TEXT_MOCK", "1")
tokens = list(generate("hello", stream=True))
assert len(tokens) > 0
def test_chat_mock(self, monkeypatch):
monkeypatch.setenv("CF_TEXT_MOCK", "1")
result = chat([ChatMessage("user", "hello")])
assert isinstance(result, GenerateResult)
def test_chat_stream_raises(self, monkeypatch):
monkeypatch.setenv("CF_TEXT_MOCK", "1")
with pytest.raises(NotImplementedError):
chat([ChatMessage("user", "hello")], stream=True)
def test_make_backend_returns_mock(self):
backend = make_backend("any", mock=True)
assert isinstance(backend, MockTextBackend)
def test_singleton_reused(self, monkeypatch):
monkeypatch.setenv("CF_TEXT_MOCK", "1")
r1 = generate("a")
r2 = generate("b")
# Both calls should succeed (singleton loaded once)
assert isinstance(r1, GenerateResult)
assert isinstance(r2, GenerateResult)