From cca4c54a62a83e990fef37b7ed87855419a29d56 Mon Sep 17 00:00:00 2001 From: pyr0ball Date: Fri, 5 Jun 2026 10:19:11 -0700 Subject: [PATCH] feat(memory): persistent knowledge graph via mnemo sidecar Add circuitforge_core.memory module: MemoryClient wraps the mnemo HTTP sidecar for entity / relation storage. All operations no-op gracefully when sidecar is unavailable so products can import unconditionally. Adds optional [memory] extras entry in pyproject.toml (mnemo-sdk>=0.1.0). --- circuitforge_core/memory/__init__.py | 54 +++++ circuitforge_core/memory/client.py | 317 +++++++++++++++++++++++++++ circuitforge_core/memory/models.py | 73 ++++++ docs/modules/memory.md | 151 +++++++++++++ pyproject.toml | 3 + tests/test_memory.py | 281 ++++++++++++++++++++++++ 6 files changed, 879 insertions(+) create mode 100644 circuitforge_core/memory/__init__.py create mode 100644 circuitforge_core/memory/client.py create mode 100644 circuitforge_core/memory/models.py create mode 100644 docs/modules/memory.md create mode 100644 tests/test_memory.py diff --git a/circuitforge_core/memory/__init__.py b/circuitforge_core/memory/__init__.py new file mode 100644 index 0000000..689b816 --- /dev/null +++ b/circuitforge_core/memory/__init__.py @@ -0,0 +1,54 @@ +"""circuitforge_core.memory — persistent knowledge graph via mnemo sidecar. + +MIT licensed. + +Requires the mnemo sidecar to be running (https://github.com/zaydmulani09/mnemo). +If the sidecar is not available, all operations silently no-op so products +can call memory methods unconditionally. 
+ +Quick start (in a FastAPI lifespan):: + + from circuitforge_core.memory import MemoryClient, MemoryConfig + + memory = MemoryClient(MemoryConfig.from_env()) + + @asynccontextmanager + async def lifespan(app): + await memory.connect() + yield + await memory.close() + + # In a route: + await memory.remember("User avoids shellfish", source="dietary-prefs") + context = await memory.recall("What are this user's food restrictions?") + +Docker Compose setup:: + + services: + mnemo: + image: ghcr.io/zaydmulani09/mnemo:latest + ports: ["8080:8080"] + environment: + MNEMO_LLM_PROVIDER: ollama + MNEMO_LLM_BASE_URL: http://ollama:11434/v1 + MNEMO_LLM_MODEL: llama3 + volumes: + - mnemo-data:/data + +Environment variables (for MemoryConfig.from_env()):: + + MNEMO_HOST — default: localhost + MNEMO_PORT — default: 8080 + MNEMO_TIMEOUT — default: 10.0 +""" + +from circuitforge_core.memory.client import MemoryClient, MemoryUnavailableError +from circuitforge_core.memory.models import MemoryConfig, MemoryEntity, MemoryStats + +__all__ = [ + "MemoryClient", + "MemoryConfig", + "MemoryEntity", + "MemoryStats", + "MemoryUnavailableError", +] diff --git a/circuitforge_core/memory/client.py b/circuitforge_core/memory/client.py new file mode 100644 index 0000000..91fb41b --- /dev/null +++ b/circuitforge_core/memory/client.py @@ -0,0 +1,317 @@ +"""MemoryClient — async wrapper around the mnemo persistent knowledge graph. + +mnemo is an optional sidecar (https://github.com/zaydmulani09/mnemo). +When the sidecar is not running, all operations silently no-op so products +can call memory methods unconditionally without try/except. + +MIT licensed. +""" +from __future__ import annotations + +import logging +import time +from typing import Any + +from circuitforge_core.memory.models import MemoryConfig, MemoryEntity, MemoryStats + +logger = logging.getLogger(__name__) + +# Backoff schedule: 5 * 2^(failure-1), capped at _MAX_BACKOFF seconds. 
# failure 1 → 5s, 2 → 10s, 3 → 20s, 4 → 40s, 5+ → 60s
_MAX_FAILURES: int = 3
_MAX_BACKOFF: float = 60.0
_BASE_BACKOFF: float = 5.0  # cooldown after the first failure; doubles per failure
# 5 * 2**4 already exceeds _MAX_BACKOFF, so clamping the exponent never changes
# the resulting delay. It only stops 2**n from growing until the int → float
# conversion overflows when the failure counter climbs during a long outage.
_MAX_BACKOFF_EXPONENT: int = 16


class MemoryUnavailableError(RuntimeError):
    """Raised only when strict=True and mnemo is not reachable."""


class MemoryClient:
    """Async interface to the mnemo knowledge graph sidecar.

    Resilience model:
    - If the sidecar is unreachable at connect(), logs once and enters no-op mode.
    - If a live call fails, the failure is counted. Each failure schedules an
      exponentially increasing cooldown before the next reconnect attempt.
    - After _MAX_FAILURES consecutive failures the client is marked unavailable;
      all calls no-op until the cooldown elapses and a reconnect succeeds.
    - A reconnect attempt that fails counts as one more failure and re-arms the
      cooldown, so the client keeps retrying (at most every _MAX_BACKOFF
      seconds) instead of staying disabled forever.
    - Any successful call resets the failure counter and the retry timer.

    Usage (in a FastAPI lifespan)::

        from circuitforge_core.memory import MemoryClient, MemoryConfig

        memory = MemoryClient(MemoryConfig.from_env())

        @asynccontextmanager
        async def lifespan(app):
            await memory.connect()
            yield
            await memory.close()

    Then in handlers::

        await memory.remember("User prefers dark mode", source="settings")
        context = await memory.recall("What are the user's UI preferences?")
    """

    def __init__(self, config: MemoryConfig | None = None, *, strict: bool = False) -> None:
        """
        Args:
            config: connection settings; defaults to MemoryConfig.from_env()
            strict: if True, MemoryUnavailableError is raised on connect failure
                    or after _MAX_FAILURES consecutive call failures
        """
        self._config = config or MemoryConfig.from_env()
        self._strict = strict
        self._available = False
        self._client: Any = None  # mnemo AsyncMnemoClient, set in connect()
        self._failure_count: int = 0
        self._retry_at: float | None = None  # monotonic timestamp; None = no retry pending

    @property
    def available(self) -> bool:
        """True if the mnemo sidecar was reachable at last health check."""
        return self._available

    @property
    def failure_count(self) -> int:
        """Consecutive failures (calls or reconnect attempts) since the last success."""
        return self._failure_count

    # ── Lifecycle ─────────────────────────────────────────────────────────────

    async def connect(self) -> None:
        """Attempt to connect to the mnemo sidecar and run a health check.

        Safe to call multiple times (used internally for reconnect); a client
        left over from a previous call is closed before it is replaced. If the
        sidecar is not reachable, logs a warning and enters no-op mode.

        Raises:
            MemoryUnavailableError: only when strict=True and the health check
                fails or reports a status other than "ok".
        """
        try:
            from mnemo import AsyncMnemoClient
        except ImportError:
            logger.debug(
                "mnemo-sdk not installed — memory module disabled. "
                "Install with: pip install circuitforge-core[memory]"
            )
            self._available = False
            return

        # Release the previous HTTP client (if any) so reconnects do not leak it.
        await self._dispose_client()
        self._client = AsyncMnemoClient(
            base_url=self._config.base_url,
            timeout=self._config.timeout,
        )
        try:
            health = await self._client.health()
            healthy = health.status == "ok"
            reason = "" if healthy else f"health status={health.status!r}"
            llm = (health.provider_type, health.provider_model) if healthy else None
        except Exception as exc:
            self._handle_unavailable("connect", reason=str(exc), cause=exc)
            return

        # Reported outside the try block: in strict mode _handle_unavailable
        # raises, and that error must not be caught and wrapped a second time.
        if not healthy:
            self._handle_unavailable("connect", reason=reason)
            return

        self._available = True
        self._on_call_success()
        logger.info(
            "mnemo memory sidecar connected at %s (LLM: %s/%s)",
            self._config.base_url,
            *llm,
        )

    async def close(self) -> None:
        """Close the underlying HTTP client and cancel any pending reconnect."""
        await self._dispose_client()
        self._available = False
        self._retry_at = None

    # ── Core API ──────────────────────────────────────────────────────────────

    async def remember(
        self,
        text: str,
        *,
        source: str = "cf-core",
        session_id: str | None = None,
    ) -> bool:
        """Store a text fragment in the knowledge graph.

        mnemo extracts named entities and relationships from the text and
        updates its graph. Large texts should be pre-chunked by the caller
        (mnemo stores each call as a single chunk with no sub-splitting).

        Args:
            text: the text to store (conversation turn, fact, note, etc.)
            source: label for the origin (e.g. "chat", "settings", "search")
            session_id: optional session grouping for multi-turn retrieval

        Returns:
            True if stored, False if sidecar unavailable.
        """
        if not await self._maybe_reconnect():
            return False
        try:
            await self._client.ingest(content=text, source=source, session_id=session_id)
            self._on_call_success()
            return True
        except Exception as exc:
            self._on_call_error("remember", exc)
            return False

    async def recall(
        self,
        query: str,
        *,
        session_id: str | None = None,
    ) -> str:
        """Retrieve a formatted context block relevant to query.

        Returns a prompt-ready string (or empty string if unavailable).
        Inject the result directly into a system prompt::

            context = await memory.recall("user dietary restrictions")
            system = f"You are a helpful assistant.\\n\\n{context}"

        Args:
            query: natural language question or topic to retrieve context for
            session_id: restrict retrieval to a specific session (optional)

        Returns:
            Formatted context string, or "" if sidecar unavailable.
        """
        if not await self._maybe_reconnect():
            return ""
        try:
            result = await self._client.get_context(text=query, session_id=session_id)
            # Same bookkeeping as every other call: clear the counter AND the
            # pending retry timestamp, not just the counter.
            self._on_call_success()
            return result
        except Exception as exc:
            self._on_call_error("recall", exc)
            return ""

    async def entities(self, *, limit: int = 50) -> list[MemoryEntity]:
        """Return the most recent named entities in the knowledge graph.

        Args:
            limit: max entities to return (default 50)

        Returns:
            List of MemoryEntity objects, or [] if unavailable.
        """
        if not await self._maybe_reconnect():
            return []
        try:
            raw = await self._client.list_entities(limit=limit)
            self._on_call_success()
            return [MemoryEntity.from_mnemo(e) for e in raw]
        except Exception as exc:
            self._on_call_error("entities", exc)
            return []

    async def stats(self) -> MemoryStats | None:
        """Return knowledge graph statistics, or None if unavailable."""
        if not await self._maybe_reconnect():
            return None
        try:
            s = await self._client.stats()
            self._on_call_success()
            return MemoryStats(
                entity_count=s.entity_count,
                chunk_count=s.chunk_count,
                node_count=s.node_count,
                edge_count=s.edge_count,
                uptime_seconds=s.uptime_seconds,
                available=True,
            )
        except Exception as exc:
            self._on_call_error("stats", exc)
            return None

    async def wipe(self) -> bool:
        """Delete all stored memory. Irreversible.

        Returns True on success, False if unavailable or failed.
        """
        if not await self._maybe_reconnect():
            return False
        try:
            await self._client.wipe()
            self._on_call_success()
            logger.warning("mnemo memory wiped — all entities and chunks deleted")
            return True
        except Exception as exc:
            self._on_call_error("wipe", exc)
            return False

    # ── Internal ──────────────────────────────────────────────────────────────

    async def _dispose_client(self) -> None:
        """Close and forget the current SDK client, if there is one.

        Closing is best-effort: an error while closing is logged at debug level
        and otherwise ignored, because the client is being discarded anyway.
        """
        client, self._client = self._client, None
        if client is None:
            return
        try:
            await client.__aexit__(None, None, None)
        except Exception as exc:
            logger.debug("mnemo: error while closing HTTP client (ignored): %s", exc)

    async def _maybe_reconnect(self) -> bool:
        """Return True if the client is available (or just became available).

        Called at the top of every public method. If the client is unavailable
        but the retry cooldown has elapsed, silently attempts reconnect before
        answering. No-ops immediately if still within the cooldown window or if
        no retry is pending (e.g. connect() was never called).

        Raises:
            MemoryUnavailableError: when strict=True and the reconnect fails.
        """
        if self._available:
            return True
        if self._retry_at is None or time.monotonic() < self._retry_at:
            return False

        logger.info(
            "mnemo: cooldown elapsed after %d failure(s) — attempting reconnect",
            self._failure_count,
        )
        self._retry_at = None
        try:
            await self.connect()
        finally:
            # connect() does not schedule retries itself. Without re-arming the
            # timer here a single failed reconnect would leave _retry_at at
            # None and the client would never try again. `finally` covers the
            # strict-mode path where connect() raises.
            if not self._available:
                backoff = self._schedule_retry()
                logger.info("mnemo: reconnect failed — next attempt in %.0fs", backoff)
        return self._available

    def _on_call_success(self) -> None:
        """Reset failure state after a successful call."""
        self._failure_count = 0
        self._retry_at = None

    def _schedule_retry(self) -> float:
        """Count one more failure and arm the retry timer.

        Returns:
            The cooldown in seconds: 5 * 2^(failure-1), capped at _MAX_BACKOFF.
        """
        self._failure_count += 1
        exponent = min(self._failure_count - 1, _MAX_BACKOFF_EXPONENT)
        backoff = min(_BASE_BACKOFF * (2 ** exponent), _MAX_BACKOFF)
        self._retry_at = time.monotonic() + backoff
        return backoff

    def _handle_unavailable(
        self,
        operation: str,
        reason: str = "",
        *,
        cause: BaseException | None = None,
    ) -> None:
        """Called when the sidecar is unreachable at connect() time.

        Args:
            operation: name of the operation that found the sidecar unavailable
            reason: optional human-readable detail appended to the message
            cause: optional underlying exception, chained onto the raised error

        Raises:
            MemoryUnavailableError: when strict=True.
        """
        self._available = False
        msg = f"mnemo memory sidecar unavailable (operation={operation!r})"
        if reason:
            msg += f": {reason}"
        if self._strict:
            raise MemoryUnavailableError(msg) from cause
        logger.warning("%s — memory features disabled", msg)

    def _on_call_error(self, operation: str, exc: Exception) -> None:
        """Count consecutive failures and schedule exponential backoff retry.

        Backoff: 5 * 2^(failure-1) seconds, capped at 60s.
          failure 1 → 5s
          failure 2 → 10s
          failure 3 → 20s   ← _MAX_FAILURES default; client disabled here
          failure 4 → 40s   (reached when reconnect attempts keep failing)
          failure 5+ → 60s

        After _MAX_FAILURES, _available is set to False and all calls no-op
        until _maybe_reconnect() fires after the cooldown elapses.

        Raises:
            MemoryUnavailableError: when strict=True and the threshold is reached.
        """
        backoff = self._schedule_retry()

        if self._failure_count < _MAX_FAILURES:
            logger.warning(
                "mnemo %r failed (%d/%d): %s — retry in %.0fs",
                operation, self._failure_count, _MAX_FAILURES, exc, backoff,
            )
            return

        self._available = False
        logger.warning(
            "mnemo %r failed %d consecutive times (%s) — disabled, reconnect in %.0fs",
            operation, self._failure_count, exc, backoff,
        )
        if self._strict:
            raise MemoryUnavailableError(
                f"mnemo {operation!r} failed {self._failure_count} consecutive times: {exc}"
            ) from exc


# ── Data models for the cf-core memory module (circuitforge_core/memory/models.py) ──
# MIT licensed.

import os
from dataclasses import dataclass, field
from datetime import datetime


@dataclass(frozen=True)
class MemoryConfig:
    """Connection config for a mnemo sidecar."""

    host: str = "localhost"
    port: int = 8080
    timeout: float = 10.0

    @classmethod
    def from_env(cls) -> MemoryConfig:
        """Read config from environment variables.

        Variables:
            MNEMO_HOST     — default: localhost
            MNEMO_PORT     — default: 8080
            MNEMO_TIMEOUT  — default: 10.0

        Raises:
            ValueError: if MNEMO_PORT or MNEMO_TIMEOUT is set to a non-numeric value.
        """
        return cls(
            host=os.environ.get("MNEMO_HOST", "localhost"),
            port=int(os.environ.get("MNEMO_PORT", "8080")),
            timeout=float(os.environ.get("MNEMO_TIMEOUT", "10.0")),
        )

    @property
    def base_url(self) -> str:
        """HTTP base URL of the sidecar, built from host and port."""
        return f"http://{self.host}:{self.port}"


@dataclass(frozen=True)
class MemoryEntity:
    """A named entity extracted and stored by the mnemo knowledge graph."""

    entity_id: str
    name: str
    entity_type: str
    aliases: list[str] = field(default_factory=list)
    confidence: float = 1.0
    source_count: int = 1

    @classmethod
    def from_mnemo(cls, obj) -> MemoryEntity:
        """Convert a mnemo-sdk Entity object to MemoryEntity.

        Missing (None) aliases, confidence and source_count fall back to the
        dataclass defaults. Explicit zeros are preserved: a confidence of 0.0
        must not be reported as 1.0.
        """
        return cls(
            entity_id=str(obj.id),
            name=obj.name,
            entity_type=obj.entity_type,
            aliases=list(obj.aliases or []),
            confidence=1.0 if obj.confidence is None else float(obj.confidence),
            source_count=1 if obj.source_count is None else int(obj.source_count),
        )


@dataclass(frozen=True)
class MemoryStats:
    """Snapshot of the mnemo knowledge graph state."""

    entity_count: int
    chunk_count: int
    node_count: int
    edge_count: int
    uptime_seconds: float
    available: bool
It: + +- Extracts named entities and relationships from text you feed it +- Persists them in a local SQLite database with WAL mode +- Returns a formatted context block for prompt injection in under 5ms + +`circuitforge_core.memory` wraps mnemo's Python SDK with CF-standard config, +graceful degradation (no-ops when the sidecar is absent), and +exponential backoff with automatic reconnect after transient failures. + +## Install + +```bash +pip install circuitforge-core[memory] +``` + +## Docker Compose setup + +Add the `mnemo` service to your product's `compose.yml` alongside `ollama`. +Peregrine is the reference implementation — copy the block from +`peregrine/compose.yml`: + +```yaml +services: + + mnemo: + image: ghcr.io/zaydmulani09/mnemo:latest + ports: + - "${MNEMO_PORT:-8080}:8080" + volumes: + - mnemo-data:/data + environment: + - MNEMO_DB_PATH=/data/mnemo.db + - MNEMO_LLM_PROVIDER=${MNEMO_LLM_PROVIDER:-ollama} + - MNEMO_LLM_BASE_URL=${MNEMO_LLM_BASE_URL:-http://ollama:11434/v1} + - MNEMO_LLM_API_KEY=${MNEMO_LLM_API_KEY:-ollama} + - MNEMO_LLM_MODEL=${MNEMO_LLM_MODEL:-llama3.2:3b} + depends_on: + - ollama + healthcheck: + test: ["CMD", "wget", "-q", "--spider", "http://localhost:8080/health"] + interval: 15s + timeout: 5s + retries: 3 + profiles: [memory] + restart: unless-stopped + +volumes: + mnemo-data: +``` + +Add these to the product's api service environment: + +```yaml + environment: + - MNEMO_HOST=${MNEMO_HOST:-mnemo} + - MNEMO_PORT=${MNEMO_PORT:-8080} +``` + +Launch with: + +```bash +docker compose --profile memory --profile cpu up -d +# or alongside a GPU profile: +docker compose --profile memory --profile single-gpu up -d +``` + +## Environment variables + +| Variable | Default | Description | +|---|---|---| +| `MNEMO_HOST` | `localhost` | Sidecar hostname (use `mnemo` in Docker) | +| `MNEMO_PORT` | `8080` | Sidecar port | +| `MNEMO_TIMEOUT` | `10.0` | HTTP timeout in seconds | + +The sidecar itself is configured via `MNEMO_LLM_*` env vars (see compose block 
above). + +## FastAPI integration + +```python +from contextlib import asynccontextmanager +from fastapi import FastAPI +from circuitforge_core.memory import MemoryClient, MemoryConfig + +memory = MemoryClient(MemoryConfig.from_env()) + +@asynccontextmanager +async def lifespan(app: FastAPI): + await memory.connect() # no-op + warning if sidecar absent + yield + await memory.close() + +app = FastAPI(lifespan=lifespan) +``` + +## API + +```python +# Store a text fragment (conversation turn, fact, user preference, etc.) +await memory.remember("User avoids shellfish and prefers dark mode", source="settings") + +# Retrieve a prompt-ready context block +context = await memory.recall("What are this user's dietary restrictions?") +system_prompt = f"You are a helpful assistant.\n\n{context}" + +# List extracted entities +entities = await memory.entities(limit=20) + +# Stats snapshot +stats = await memory.stats() # MemoryStats | None + +# Wipe everything (irreversible) +await memory.wipe() +``` + +All methods return empty values (`False`, `""`, `[]`, `None`) when the +sidecar is not available — no try/except needed in product code. + +## Resilience model + +| Event | Behaviour | +|---|---| +| Sidecar absent at startup | `connect()` logs once, enters no-op mode | +| First call failure | Warning logged, 5s backoff scheduled | +| Nth consecutive failure | Backoff doubles each time (5→10→20→40→60s cap) | +| After `_MAX_FAILURES` (3) | Client marked unavailable; all calls no-op | +| Cooldown elapses | Next call silently attempts reconnect | +| Successful call | Failure counter and retry timer reset | +| `strict=True` | `MemoryUnavailableError` raised instead of no-op | + +## Chunking note + +mnemo stores each `remember()` call as a single chunk — it does **not** +automatically split large texts. 
"""Tests for circuitforge_core.memory.

These tests mock the mnemo SDK so no live sidecar is required.
"""
from __future__ import annotations

import sys
import time
from types import ModuleType
from unittest.mock import AsyncMock, MagicMock, patch

import pytest

from circuitforge_core.memory import MemoryClient, MemoryConfig, MemoryUnavailableError
from circuitforge_core.memory.client import _MAX_FAILURES


# ── Helpers ───────────────────────────────────────────────────────────────────

def _make_mock_mnemo(health_ok: bool = True):
    """Return a (mock_module, mock_inner_client) pair.

    The module stands in for the real ``mnemo`` package: its
    ``AsyncMnemoClient`` factory always returns the same ``mock_client``, whose
    health check reports "ok" or "error" depending on ``health_ok``.
    """
    mock_health = MagicMock(
        status="ok" if health_ok else "error",
        provider_type="ollama",
        provider_model="llama3",
    )
    mock_client = AsyncMock()
    mock_client.health = AsyncMock(return_value=mock_health)
    mock_client.ingest = AsyncMock(return_value=MagicMock(chunk_id="abc", entities_extracted=2))
    mock_client.get_context = AsyncMock(return_value="Relevant context: user prefers dark mode")
    mock_client.list_entities = AsyncMock(return_value=[])
    mock_client.stats = AsyncMock(return_value=MagicMock(
        entity_count=5, chunk_count=10, node_count=5, edge_count=3, uptime_seconds=120.0
    ))
    mock_client.wipe = AsyncMock(return_value=None)
    # MemoryClient.close() awaits __aexit__ on the inner client.
    mock_client.__aexit__ = AsyncMock(return_value=None)

    mock_module = ModuleType("mnemo")
    mock_module.AsyncMnemoClient = MagicMock(return_value=mock_client)
    return mock_module, mock_client


async def _connected(health_ok: bool = True):
    """Return a connected MemoryClient with mock inner client attached.

    The mock is exposed as ``client._mock_inner`` so tests can set side effects
    and inspect calls after the ``sys.modules`` patch has been undone.
    """
    mock_module, mock_inner = _make_mock_mnemo(health_ok=health_ok)
    client = MemoryClient(MemoryConfig())
    # Only connect() imports mnemo, so the patch needs to cover just this call.
    with patch.dict(sys.modules, {"mnemo": mock_module}):
        await client.connect()
    client._mock_inner = mock_inner
    return client


# ── Config ────────────────────────────────────────────────────────────────────

class TestMemoryConfig:
    """MemoryConfig defaults, environment parsing and URL construction."""

    def test_defaults(self):
        cfg = MemoryConfig()
        assert cfg.host == "localhost"
        assert cfg.port == 8080
        assert cfg.base_url == "http://localhost:8080"

    def test_from_env(self, monkeypatch):
        monkeypatch.setenv("MNEMO_HOST", "mnemo-sidecar")
        monkeypatch.setenv("MNEMO_PORT", "9090")
        monkeypatch.setenv("MNEMO_TIMEOUT", "30.0")
        cfg = MemoryConfig.from_env()
        assert cfg.host == "mnemo-sidecar"
        assert cfg.port == 9090
        assert cfg.timeout == 30.0

    def test_base_url(self):
        cfg = MemoryConfig(host="10.1.10.5", port=8080)
        assert cfg.base_url == "http://10.1.10.5:8080"


# ── connect() ─────────────────────────────────────────────────────────────────

class TestConnect:
    """connect() outcomes: healthy, unhealthy, unreachable, strict, SDK missing."""

    @pytest.mark.asyncio
    async def test_connect_success(self):
        client = await _connected(health_ok=True)
        assert client.available is True
        assert client.failure_count == 0

    @pytest.mark.asyncio
    async def test_connect_bad_health_status(self):
        client = await _connected(health_ok=False)
        assert client.available is False

    @pytest.mark.asyncio
    async def test_connect_sidecar_unreachable(self):
        mock_module, mock_client = _make_mock_mnemo()
        mock_client.health.side_effect = ConnectionRefusedError("refused")
        client = MemoryClient(MemoryConfig())
        with patch.dict(sys.modules, {"mnemo": mock_module}):
            await client.connect()  # must not raise
        assert client.available is False

    @pytest.mark.asyncio
    async def test_connect_strict_raises(self):
        mock_module, mock_client = _make_mock_mnemo()
        mock_client.health.side_effect = ConnectionRefusedError("refused")
        client = MemoryClient(MemoryConfig(), strict=True)
        with patch.dict(sys.modules, {"mnemo": mock_module}):
            with pytest.raises(MemoryUnavailableError):
                await client.connect()

    @pytest.mark.asyncio
    async def test_connect_missing_sdk(self):
        client = MemoryClient(MemoryConfig())
        # A None entry in sys.modules makes `import mnemo` raise ImportError,
        # which simulates the optional SDK not being installed.
        with patch.dict(sys.modules, {"mnemo": None}):
            await client.connect()
        assert client.available is False


# ── No-op when unavailable ────────────────────────────────────────────────────

class TestNoopWhenUnavailable:
    """Every public method returns its empty value on a never-connected client."""

    @pytest.fixture
    def unavailable(self):
        # connect() is never called, so the client stays unavailable.
        return MemoryClient(MemoryConfig())

    @pytest.mark.asyncio
    async def test_remember_noop(self, unavailable):
        assert await unavailable.remember("text") is False

    @pytest.mark.asyncio
    async def test_recall_noop(self, unavailable):
        assert await unavailable.recall("query") == ""

    @pytest.mark.asyncio
    async def test_entities_noop(self, unavailable):
        assert await unavailable.entities() == []

    @pytest.mark.asyncio
    async def test_stats_noop(self, unavailable):
        assert await unavailable.stats() is None

    @pytest.mark.asyncio
    async def test_wipe_noop(self, unavailable):
        assert await unavailable.wipe() is False


# ── Live calls when connected ─────────────────────────────────────────────────

class TestLiveCalls:
    """Public methods forward to the SDK client and translate its results."""

    @pytest.mark.asyncio
    async def test_remember_calls_ingest(self):
        client = await _connected()
        result = await client.remember("hello world", source="test")
        assert result is True
        client._mock_inner.ingest.assert_awaited_once_with(
            content="hello world", source="test", session_id=None
        )

    @pytest.mark.asyncio
    async def test_remember_resets_failure_count(self):
        client = await _connected()
        client._failure_count = 2  # simulate prior failures
        await client.remember("text")
        assert client.failure_count == 0

    @pytest.mark.asyncio
    async def test_recall_returns_context(self):
        client = await _connected()
        ctx = await client.recall("dark mode preference")
        assert "dark mode" in ctx

    @pytest.mark.asyncio
    async def test_recall_with_session(self):
        client = await _connected()
        await client.recall("query", session_id="user-123")
        client._mock_inner.get_context.assert_awaited_once_with(
            text="query", session_id="user-123"
        )

    @pytest.mark.asyncio
    async def test_stats_returns_memory_stats(self):
        from circuitforge_core.memory import MemoryStats
        client = await _connected()
        result = await client.stats()
        assert isinstance(result, MemoryStats)
        assert result.available is True
        assert result.entity_count == 5


# ── Backoff and reconnect ─────────────────────────────────────────────────────

class TestBackoffAndReconnect:
    """Failure counting, cooldown scheduling and reconnect behaviour."""

    @pytest.mark.asyncio
    async def test_failure_count_increments(self):
        client = await _connected()
        client._mock_inner.ingest.side_effect = ConnectionResetError("reset")
        await client.remember("text")
        assert client.failure_count == 1

    @pytest.mark.asyncio
    async def test_client_disabled_after_max_failures(self):
        client = await _connected()
        client._mock_inner.ingest.side_effect = ConnectionResetError("reset")
        # drive failures to the limit
        for _ in range(_MAX_FAILURES):
            await client.remember("text")
        assert client.available is False

    @pytest.mark.asyncio
    async def test_retry_at_set_after_failure(self):
        client = await _connected()
        client._mock_inner.ingest.side_effect = ConnectionResetError("reset")
        before = time.monotonic()
        await client.remember("text")
        assert client._retry_at is not None
        assert client._retry_at > before

    @pytest.mark.asyncio
    async def test_backoff_increases_with_failures(self):
        client = await _connected()
        client._mock_inner.ingest.side_effect = ConnectionResetError("reset")

        retry_times = []
        t0 = time.monotonic()
        for _ in range(3):
            await client.remember("text")
            # Offset from a fixed t0, so a later deadline shows as a larger value.
            retry_times.append(client._retry_at - t0)

        # Each cooldown should be longer than the previous
        assert retry_times[1] > retry_times[0]
        assert retry_times[2] > retry_times[1]

    @pytest.mark.asyncio
    async def test_reconnect_attempted_after_cooldown(self):
        """Once the retry window elapses, the next call triggers a reconnect."""
        client = await _connected()
        # Force unavailable with an expired retry window
        client._available = False
        client._retry_at = time.monotonic() - 1.0  # already elapsed

        mock_module, mock_inner = _make_mock_mnemo(health_ok=True)
        # The reconnect imports mnemo again, so the call must run inside the patch.
        with patch.dict(sys.modules, {"mnemo": mock_module}):
            result = await client.remember("text after reconnect")

        # Reconnect should have restored availability
        assert client.available is True
        assert result is True

    @pytest.mark.asyncio
    async def test_no_reconnect_during_cooldown(self):
        """Within the cooldown window, calls no-op without attempting reconnect."""
        client = await _connected()
        client._available = False
        client._retry_at = time.monotonic() + 999.0  # far in the future

        mock_module, _ = _make_mock_mnemo(health_ok=True)
        with patch.dict(sys.modules, {"mnemo": mock_module}):
            result = await client.remember("text during cooldown")

        assert result is False
        assert client.available is False  # no reconnect fired

    @pytest.mark.asyncio
    async def test_success_resets_retry_state(self):
        """A successful call clears failure_count and retry_at."""
        client = await _connected()
        client._failure_count = 2
        client._retry_at = time.monotonic() + 30.0

        await client.remember("successful call")

        assert client.failure_count == 0
        assert client._retry_at is None

    @pytest.mark.asyncio
    async def test_strict_raises_after_max_failures(self):
        """strict=True raises MemoryUnavailableError once failure threshold is hit."""
        client = await _connected()
        # Strictness is switched on after connecting: _connected() builds a
        # non-strict client.
        client._strict = True
        client._mock_inner.ingest.side_effect = ConnectionResetError("reset")

        with pytest.raises(MemoryUnavailableError):
            for _ in range(_MAX_FAILURES):
                await client.remember("text")