circuitforge-core/docs/modules/pipeline.md
pyr0ball 383897f990
Some checks are pending
CI / test (push) Waiting to run
Mirror / mirror (push) Waiting to run
Release — PyPI / release (push) Waiting to run
feat: platforms module + docs + scripts
- platforms/: eBay platform adapter (snipe integration layer)
- docs/: developer guide, module reference, getting-started docs
- scripts/: utility scripts for development and deployment
2026-04-24 15:23:16 -07:00

2.1 KiB

pipeline

Staging queue base class. Stub — partially implemented.

from circuitforge_core.pipeline import StagingDB  # base class

Purpose

StagingDB is the base class for the staging layer that sits between discovery/ingestion and the main product workflow. Products subclass it to add their concrete schema.

The pattern:

Source (scraper / scan / upload)
    → StagingDB (raw, unreviewed records)
    → Human review / approval
    → Main product DB (approved records)

This is explicit in Peregrine (jobs go from pendingapprovedapplied) and analogous in Kiwi (receipts go from uploadedparsedpantry).

Crystallization engine

The pipeline module also contains the crystallization engine: a system for promoting AI-generated drafts through a series of structured human-approval checkpoints before the output "crystallizes" into a permanent record.

Each stage in the pipeline has:

  • An AI step that produces a draft
  • A human approval gate that must be explicitly cleared
  • A rollback path back to the previous stage if rejected

This is the architectural embodiment of the "LLMs as drafts, never decisions" principle.

Current status

StagingDB base class exists and is used by Peregrine's job pipeline. The crystallization engine design is documented in circuitforge-plans/shared/superpowers/specs/ and is being extracted into this module as it stabilizes across products.

StagingDB base class

class StagingDB:
    def __init__(self, db: Connection):
        self.db = db

    def stage(self, record: dict) -> str:
        """Insert a record into staging. Returns record ID."""
        raise NotImplementedError

    def approve(self, record_id: str, reviewer_id: str | None = None):
        """Promote a record past the approval gate."""
        raise NotImplementedError

    def reject(self, record_id: str, reason: str | None = None):
        """Mark a record as rejected."""
        raise NotImplementedError

    def pending(self) -> list[dict]:
        """Return all records awaiting review."""
        raise NotImplementedError