feat(memory): persistent knowledge graph via mnemo sidecar

Add circuitforge_core.memory module: MemoryClient wraps the mnemo HTTP sidecar
for entity / relation storage. All operations no-op gracefully when sidecar is
unavailable so products can import unconditionally. Adds optional [memory]
extras entry in pyproject.toml (mnemo-sdk>=0.1.0).
This commit is contained in:
pyr0ball 2026-06-05 10:19:11 -07:00
parent 0c43e95991
commit cca4c54a62
6 changed files with 879 additions and 0 deletions

View file

@ -0,0 +1,54 @@
"""circuitforge_core.memory — persistent knowledge graph via mnemo sidecar.
MIT licensed.
Requires the mnemo sidecar to be running (https://github.com/zaydmulani09/mnemo).
If the sidecar is not available, all operations silently no-op so products
can call memory methods unconditionally.
Quick start (in a FastAPI lifespan)::
from circuitforge_core.memory import MemoryClient, MemoryConfig
memory = MemoryClient(MemoryConfig.from_env())
@asynccontextmanager
async def lifespan(app):
await memory.connect()
yield
await memory.close()
# In a route:
await memory.remember("User avoids shellfish", source="dietary-prefs")
context = await memory.recall("What are this user's food restrictions?")
Docker Compose setup::
services:
mnemo:
image: ghcr.io/zaydmulani09/mnemo:latest
ports: ["8080:8080"]
environment:
MNEMO_LLM_PROVIDER: ollama
MNEMO_LLM_BASE_URL: http://ollama:11434/v1
MNEMO_LLM_MODEL: llama3
volumes:
- mnemo-data:/data
Environment variables (for MemoryConfig.from_env())::
MNEMO_HOST default: localhost
MNEMO_PORT default: 8080
MNEMO_TIMEOUT default: 10.0
"""
from circuitforge_core.memory.client import MemoryClient, MemoryUnavailableError
from circuitforge_core.memory.models import MemoryConfig, MemoryEntity, MemoryStats
__all__ = [
"MemoryClient",
"MemoryConfig",
"MemoryEntity",
"MemoryStats",
"MemoryUnavailableError",
]

View file

@ -0,0 +1,317 @@
"""MemoryClient — async wrapper around the mnemo persistent knowledge graph.
mnemo is an optional sidecar (https://github.com/zaydmulani09/mnemo).
When the sidecar is not running, all operations silently no-op so products
can call memory methods unconditionally without try/except.
MIT licensed.
"""
from __future__ import annotations
import logging
import time
from typing import Any
from circuitforge_core.memory.models import MemoryConfig, MemoryEntity, MemoryStats
logger = logging.getLogger(__name__)
# Backoff schedule: 5 * 2^(failure-1), capped at _MAX_BACKOFF seconds.
# failure 1 → 5s, 2 → 10s, 3 → 20s, 4 → 40s, 5+ → 60s
_MAX_FAILURES: int = 3
_MAX_BACKOFF: float = 60.0
class MemoryUnavailableError(RuntimeError):
"""Raised only when strict=True and mnemo is not reachable."""
class MemoryClient:
"""Async interface to the mnemo knowledge graph sidecar.
Resilience model:
- If the sidecar is unreachable at connect(), logs once and enters no-op mode.
- If a live call fails, the failure is counted. Each failure schedules an
exponentially increasing cooldown before the next reconnect attempt.
- After _MAX_FAILURES consecutive failures the client is marked unavailable;
all calls no-op until the cooldown elapses and a reconnect succeeds.
- Any successful call resets the failure counter.
Usage (in a FastAPI lifespan)::
from circuitforge_core.memory import MemoryClient, MemoryConfig
memory = MemoryClient(MemoryConfig.from_env())
@asynccontextmanager
async def lifespan(app):
await memory.connect()
yield
await memory.close()
Then in handlers::
await memory.remember("User prefers dark mode", source="settings")
context = await memory.recall("What are the user's UI preferences?")
"""
def __init__(self, config: MemoryConfig | None = None, *, strict: bool = False) -> None:
"""
Args:
config: connection settings; defaults to MemoryConfig.from_env()
strict: if True, MemoryUnavailableError is raised on connect failure
or after _MAX_FAILURES consecutive call failures
"""
self._config = config or MemoryConfig.from_env()
self._strict = strict
self._available = False
self._client: Any = None # mnemo AsyncMnemoClient, set in connect()
self._failure_count: int = 0
self._retry_at: float | None = None # monotonic timestamp; None = no retry pending
@property
def available(self) -> bool:
"""True if the mnemo sidecar was reachable at last health check."""
return self._available
@property
def failure_count(self) -> int:
"""Consecutive call failures since the last success."""
return self._failure_count
# ── Lifecycle ─────────────────────────────────────────────────────────────
async def connect(self) -> None:
"""Attempt to connect to the mnemo sidecar and run a health check.
Safe to call multiple times (used internally for reconnect). If the
sidecar is not reachable, logs a warning and enters no-op mode.
Does NOT raise unless strict=True.
"""
try:
from mnemo import AsyncMnemoClient
except ImportError:
logger.debug(
"mnemo-sdk not installed — memory module disabled. "
"Install with: pip install circuitforge-core[memory]"
)
self._available = False
return
self._client = AsyncMnemoClient(
base_url=self._config.base_url,
timeout=self._config.timeout,
)
try:
health = await self._client.health()
if health.status == "ok":
self._available = True
self._on_call_success()
logger.info(
"mnemo memory sidecar connected at %s (LLM: %s/%s)",
self._config.base_url,
health.provider_type,
health.provider_model,
)
else:
self._handle_unavailable("connect", reason=f"health status={health.status!r}")
except Exception as exc:
self._handle_unavailable("connect", reason=str(exc))
async def close(self) -> None:
"""Close the underlying HTTP client."""
if self._client is not None:
try:
await self._client.__aexit__(None, None, None)
except Exception:
pass
self._client = None
self._available = False
self._retry_at = None
# ── Core API ──────────────────────────────────────────────────────────────
async def remember(
self,
text: str,
*,
source: str = "cf-core",
session_id: str | None = None,
) -> bool:
"""Store a text fragment in the knowledge graph.
mnemo extracts named entities and relationships from the text and
updates its graph. Large texts should be pre-chunked by the caller
(mnemo stores each call as a single chunk with no sub-splitting).
Args:
text: the text to store (conversation turn, fact, note, etc.)
source: label for the origin (e.g. "chat", "settings", "search")
session_id: optional session grouping for multi-turn retrieval
Returns:
True if stored, False if sidecar unavailable.
"""
if not await self._maybe_reconnect():
return False
try:
await self._client.ingest(content=text, source=source, session_id=session_id)
self._on_call_success()
return True
except Exception as exc:
self._on_call_error("remember", exc)
return False
async def recall(
self,
query: str,
*,
session_id: str | None = None,
) -> str:
"""Retrieve a formatted context block relevant to query.
Returns a prompt-ready string (or empty string if unavailable).
Inject the result directly into a system prompt::
context = await memory.recall("user dietary restrictions")
system = f"You are a helpful assistant.\\n\\n{context}"
Args:
query: natural language question or topic to retrieve context for
session_id: restrict retrieval to a specific session (optional)
Returns:
Formatted context string, or "" if sidecar unavailable.
"""
if not await self._maybe_reconnect():
return ""
try:
result = await self._client.get_context(text=query, session_id=session_id)
self._failure_count = 0
return result
except Exception as exc:
self._on_call_error("recall", exc)
return ""
async def entities(self, *, limit: int = 50) -> list[MemoryEntity]:
"""Return the most recent named entities in the knowledge graph.
Args:
limit: max entities to return (default 50)
Returns:
List of MemoryEntity objects, or [] if unavailable.
"""
if not await self._maybe_reconnect():
return []
try:
raw = await self._client.list_entities(limit=limit)
self._on_call_success()
return [MemoryEntity.from_mnemo(e) for e in raw]
except Exception as exc:
self._on_call_error("entities", exc)
return []
async def stats(self) -> MemoryStats | None:
"""Return knowledge graph statistics, or None if unavailable."""
if not await self._maybe_reconnect():
return None
try:
s = await self._client.stats()
self._on_call_success()
return MemoryStats(
entity_count=s.entity_count,
chunk_count=s.chunk_count,
node_count=s.node_count,
edge_count=s.edge_count,
uptime_seconds=s.uptime_seconds,
available=True,
)
except Exception as exc:
self._on_call_error("stats", exc)
return None
async def wipe(self) -> bool:
"""Delete all stored memory. Irreversible.
Returns True on success, False if unavailable or failed.
"""
if not await self._maybe_reconnect():
return False
try:
await self._client.wipe()
self._on_call_success()
logger.warning("mnemo memory wiped — all entities and chunks deleted")
return True
except Exception as exc:
self._on_call_error("wipe", exc)
return False
# ── Internal ──────────────────────────────────────────────────────────────
async def _maybe_reconnect(self) -> bool:
"""Return True if the client is available (or just became available).
Called at the top of every public method. If the client is unavailable
but the retry cooldown has elapsed, silently attempts reconnect before
answering. No-ops immediately if still within the cooldown window.
"""
if self._available:
return True
if self._retry_at is not None and time.monotonic() >= self._retry_at:
logger.info(
"mnemo: cooldown elapsed after %d failure(s) — attempting reconnect",
self._failure_count,
)
self._retry_at = None
self._client = None
await self.connect()
return self._available
def _on_call_success(self) -> None:
"""Reset failure state after a successful call."""
self._failure_count = 0
self._retry_at = None
def _handle_unavailable(self, operation: str, reason: str = "") -> None:
"""Called when the sidecar is unreachable at connect() time."""
self._available = False
msg = f"mnemo memory sidecar unavailable (operation={operation!r})"
if reason:
msg += f": {reason}"
if self._strict:
raise MemoryUnavailableError(msg)
logger.warning("%s — memory features disabled", msg)
def _on_call_error(self, operation: str, exc: Exception) -> None:
"""Count consecutive failures and schedule exponential backoff retry.
Backoff: 5 * 2^(failure-1) seconds, capped at 60s.
failure 1 5s
failure 2 10s
failure 3 20s _MAX_FAILURES default; client disabled here
failure 4 40s
failure 5+ 60s
After _MAX_FAILURES, _available is set to False and all calls no-op
until _maybe_reconnect() fires after the cooldown elapses.
"""
self._failure_count += 1
backoff = min(5.0 * (2 ** (self._failure_count - 1)), _MAX_BACKOFF)
self._retry_at = time.monotonic() + backoff
if self._failure_count >= _MAX_FAILURES:
self._available = False
logger.warning(
"mnemo %r failed %d consecutive times (%s) — disabled, reconnect in %.0fs",
operation, self._failure_count, exc, backoff,
)
if self._strict:
raise MemoryUnavailableError(
f"mnemo {operation!r} failed {self._failure_count} consecutive times: {exc}"
)
else:
logger.warning(
"mnemo %r failed (%d/%d): %s — retry in %.0fs",
operation, self._failure_count, _MAX_FAILURES, exc, backoff,
)

View file

@ -0,0 +1,73 @@
"""Data models for the cf-core memory module.
MIT licensed.
"""
from __future__ import annotations
import os
from dataclasses import dataclass, field
from datetime import datetime
@dataclass(frozen=True)
class MemoryConfig:
"""Connection config for a mnemo sidecar."""
host: str = "localhost"
port: int = 8080
timeout: float = 10.0
@classmethod
def from_env(cls) -> MemoryConfig:
"""Read config from environment variables.
Variables:
MNEMO_HOST default: localhost
MNEMO_PORT default: 8080
MNEMO_TIMEOUT default: 10.0
"""
return cls(
host=os.environ.get("MNEMO_HOST", "localhost"),
port=int(os.environ.get("MNEMO_PORT", "8080")),
timeout=float(os.environ.get("MNEMO_TIMEOUT", "10.0")),
)
@property
def base_url(self) -> str:
return f"http://{self.host}:{self.port}"
@dataclass(frozen=True)
class MemoryEntity:
"""A named entity extracted and stored by the mnemo knowledge graph."""
entity_id: str
name: str
entity_type: str
aliases: list[str] = field(default_factory=list)
confidence: float = 1.0
source_count: int = 1
@classmethod
def from_mnemo(cls, obj) -> MemoryEntity:
"""Convert a mnemo-sdk Entity object to MemoryEntity."""
return cls(
entity_id=str(obj.id),
name=obj.name,
entity_type=obj.entity_type,
aliases=list(obj.aliases or []),
confidence=float(obj.confidence or 1.0),
source_count=int(obj.source_count or 1),
)
@dataclass(frozen=True)
class MemoryStats:
"""Snapshot of the mnemo knowledge graph state."""
entity_count: int
chunk_count: int
node_count: int
edge_count: int
uptime_seconds: float
available: bool

151
docs/modules/memory.md Normal file
View file

@ -0,0 +1,151 @@
# circuitforge_core.memory
Persistent knowledge graph for CF products, backed by the
[mnemo](https://github.com/zaydmulani09/mnemo) sidecar.
## What it does
mnemo runs as a sidecar process alongside a product's FastAPI backend. It:
- Extracts named entities and relationships from text you feed it
- Persists them in a local SQLite database with WAL mode
- Returns a formatted context block for prompt injection in under 5ms
`cf_core.memory` wraps mnemo's Python SDK with CF-standard config,
graceful degradation (no-ops when the sidecar is absent), and
exponential backoff with automatic reconnect after transient failures.
## Install
```bash
pip install circuitforge-core[memory]
```
## Docker Compose setup
Add the `mnemo` service to your product's `compose.yml` alongside `ollama`.
Peregrine is the reference implementation — copy the block from
`peregrine/compose.yml`:
```yaml
services:
mnemo:
image: ghcr.io/zaydmulani09/mnemo:latest
ports:
- "${MNEMO_PORT:-8080}:8080"
volumes:
- mnemo-data:/data
environment:
- MNEMO_DB_PATH=/data/mnemo.db
- MNEMO_LLM_PROVIDER=${MNEMO_LLM_PROVIDER:-ollama}
- MNEMO_LLM_BASE_URL=${MNEMO_LLM_BASE_URL:-http://ollama:11434/v1}
- MNEMO_LLM_API_KEY=${MNEMO_LLM_API_KEY:-ollama}
- MNEMO_LLM_MODEL=${MNEMO_LLM_MODEL:-llama3.2:3b}
depends_on:
- ollama
healthcheck:
test: ["CMD", "wget", "-q", "--spider", "http://localhost:8080/health"]
interval: 15s
timeout: 5s
retries: 3
profiles: [memory]
restart: unless-stopped
volumes:
mnemo-data:
```
Add these to the product's api service environment:
```yaml
environment:
- MNEMO_HOST=${MNEMO_HOST:-mnemo}
- MNEMO_PORT=${MNEMO_PORT:-8080}
```
Launch with:
```bash
docker compose --profile memory --profile cpu up -d
# or alongside a GPU profile:
docker compose --profile memory --profile single-gpu up -d
```
## Environment variables
| Variable | Default | Description |
|---|---|---|
| `MNEMO_HOST` | `localhost` | Sidecar hostname (use `mnemo` in Docker) |
| `MNEMO_PORT` | `8080` | Sidecar port |
| `MNEMO_TIMEOUT` | `10.0` | HTTP timeout in seconds |
The sidecar itself is configured via `MNEMO_LLM_*` env vars (see compose block above).
## FastAPI integration
```python
from contextlib import asynccontextmanager
from fastapi import FastAPI
from circuitforge_core.memory import MemoryClient, MemoryConfig
memory = MemoryClient(MemoryConfig.from_env())
@asynccontextmanager
async def lifespan(app: FastAPI):
await memory.connect() # no-op + warning if sidecar absent
yield
await memory.close()
app = FastAPI(lifespan=lifespan)
```
## API
```python
# Store a text fragment (conversation turn, fact, user preference, etc.)
await memory.remember("User avoids shellfish and prefers dark mode", source="settings")
# Retrieve a prompt-ready context block
context = await memory.recall("What are this user's dietary restrictions?")
system_prompt = f"You are a helpful assistant.\n\n{context}"
# List extracted entities
entities = await memory.entities(limit=20)
# Stats snapshot
stats = await memory.stats() # MemoryStats | None
# Wipe everything (irreversible)
await memory.wipe()
```
All methods return empty values (`False`, `""`, `[]`, `None`) when the
sidecar is not available — no try/except needed in product code.
## Resilience model
| Event | Behaviour |
|---|---|
| Sidecar absent at startup | `connect()` logs once, enters no-op mode |
| First call failure | Warning logged, 5s backoff scheduled |
| Nth consecutive failure | Backoff doubles each time (5→10→20→40→60s cap) |
| After `_MAX_FAILURES` (3) | Client marked unavailable; all calls no-op |
| Cooldown elapses | Next call silently attempts reconnect |
| Successful call | Failure counter and retry timer reset |
| `strict=True` | `MemoryUnavailableError` raised instead of no-op |
## Chunking note
mnemo stores each `remember()` call as a single chunk — it does **not**
automatically split large texts. For best retrieval quality, chunk on the
caller side before ingesting:
```python
# Good: one turn per ingest call
for turn in conversation_turns:
await memory.remember(turn, source="chat", session_id=session_id)
# Avoid: one giant blob
await memory.remember(entire_conversation_as_one_string)
```

View file

@ -14,6 +14,9 @@ dependencies = [
] ]
[project.optional-dependencies] [project.optional-dependencies]
memory = [
"mnemo-sdk>=0.1.0",
]
community = [ community = [
"psycopg2>=2.9", "psycopg2>=2.9",
] ]

281
tests/test_memory.py Normal file
View file

@ -0,0 +1,281 @@
"""Tests for circuitforge_core.memory.
These tests mock the mnemo SDK so no live sidecar is required.
"""
from __future__ import annotations
import sys
import time
from types import ModuleType
from unittest.mock import AsyncMock, MagicMock, patch
import pytest
from circuitforge_core.memory import MemoryClient, MemoryConfig, MemoryUnavailableError
from circuitforge_core.memory.client import _MAX_FAILURES
# ── Helpers ───────────────────────────────────────────────────────────────────
def _make_mock_mnemo(health_ok: bool = True):
"""Return a (mock_module, mock_inner_client) pair."""
mock_health = MagicMock(
status="ok" if health_ok else "error",
provider_type="ollama",
provider_model="llama3",
)
mock_client = AsyncMock()
mock_client.health = AsyncMock(return_value=mock_health)
mock_client.ingest = AsyncMock(return_value=MagicMock(chunk_id="abc", entities_extracted=2))
mock_client.get_context = AsyncMock(return_value="Relevant context: user prefers dark mode")
mock_client.list_entities = AsyncMock(return_value=[])
mock_client.stats = AsyncMock(return_value=MagicMock(
entity_count=5, chunk_count=10, node_count=5, edge_count=3, uptime_seconds=120.0
))
mock_client.wipe = AsyncMock(return_value=None)
mock_client.__aexit__ = AsyncMock(return_value=None)
mock_module = ModuleType("mnemo")
mock_module.AsyncMnemoClient = MagicMock(return_value=mock_client)
return mock_module, mock_client
async def _connected(health_ok: bool = True):
"""Return a connected MemoryClient with mock inner client attached."""
mock_module, mock_inner = _make_mock_mnemo(health_ok=health_ok)
client = MemoryClient(MemoryConfig())
with patch.dict(sys.modules, {"mnemo": mock_module}):
await client.connect()
client._mock_inner = mock_inner
return client
# ── Config ────────────────────────────────────────────────────────────────────
class TestMemoryConfig:
def test_defaults(self):
cfg = MemoryConfig()
assert cfg.host == "localhost"
assert cfg.port == 8080
assert cfg.base_url == "http://localhost:8080"
def test_from_env(self, monkeypatch):
monkeypatch.setenv("MNEMO_HOST", "mnemo-sidecar")
monkeypatch.setenv("MNEMO_PORT", "9090")
monkeypatch.setenv("MNEMO_TIMEOUT", "30.0")
cfg = MemoryConfig.from_env()
assert cfg.host == "mnemo-sidecar"
assert cfg.port == 9090
assert cfg.timeout == 30.0
def test_base_url(self):
cfg = MemoryConfig(host="10.1.10.5", port=8080)
assert cfg.base_url == "http://10.1.10.5:8080"
# ── connect() ─────────────────────────────────────────────────────────────────
class TestConnect:
@pytest.mark.asyncio
async def test_connect_success(self):
client = await _connected(health_ok=True)
assert client.available is True
assert client.failure_count == 0
@pytest.mark.asyncio
async def test_connect_bad_health_status(self):
client = await _connected(health_ok=False)
assert client.available is False
@pytest.mark.asyncio
async def test_connect_sidecar_unreachable(self):
mock_module, mock_client = _make_mock_mnemo()
mock_client.health.side_effect = ConnectionRefusedError("refused")
client = MemoryClient(MemoryConfig())
with patch.dict(sys.modules, {"mnemo": mock_module}):
await client.connect() # must not raise
assert client.available is False
@pytest.mark.asyncio
async def test_connect_strict_raises(self):
mock_module, mock_client = _make_mock_mnemo()
mock_client.health.side_effect = ConnectionRefusedError("refused")
client = MemoryClient(MemoryConfig(), strict=True)
with patch.dict(sys.modules, {"mnemo": mock_module}):
with pytest.raises(MemoryUnavailableError):
await client.connect()
@pytest.mark.asyncio
async def test_connect_missing_sdk(self):
client = MemoryClient(MemoryConfig())
with patch.dict(sys.modules, {"mnemo": None}):
await client.connect()
assert client.available is False
# ── No-op when unavailable ────────────────────────────────────────────────────
class TestNoopWhenUnavailable:
@pytest.fixture
def unavailable(self):
return MemoryClient(MemoryConfig())
@pytest.mark.asyncio
async def test_remember_noop(self, unavailable):
assert await unavailable.remember("text") is False
@pytest.mark.asyncio
async def test_recall_noop(self, unavailable):
assert await unavailable.recall("query") == ""
@pytest.mark.asyncio
async def test_entities_noop(self, unavailable):
assert await unavailable.entities() == []
@pytest.mark.asyncio
async def test_stats_noop(self, unavailable):
assert await unavailable.stats() is None
@pytest.mark.asyncio
async def test_wipe_noop(self, unavailable):
assert await unavailable.wipe() is False
# ── Live calls when connected ─────────────────────────────────────────────────
class TestLiveCalls:
@pytest.mark.asyncio
async def test_remember_calls_ingest(self):
client = await _connected()
result = await client.remember("hello world", source="test")
assert result is True
client._mock_inner.ingest.assert_awaited_once_with(
content="hello world", source="test", session_id=None
)
@pytest.mark.asyncio
async def test_remember_resets_failure_count(self):
client = await _connected()
client._failure_count = 2 # simulate prior failures
await client.remember("text")
assert client.failure_count == 0
@pytest.mark.asyncio
async def test_recall_returns_context(self):
client = await _connected()
ctx = await client.recall("dark mode preference")
assert "dark mode" in ctx
@pytest.mark.asyncio
async def test_recall_with_session(self):
client = await _connected()
await client.recall("query", session_id="user-123")
client._mock_inner.get_context.assert_awaited_once_with(
text="query", session_id="user-123"
)
@pytest.mark.asyncio
async def test_stats_returns_memory_stats(self):
from circuitforge_core.memory import MemoryStats
client = await _connected()
result = await client.stats()
assert isinstance(result, MemoryStats)
assert result.available is True
assert result.entity_count == 5
# ── Backoff and reconnect ─────────────────────────────────────────────────────
class TestBackoffAndReconnect:
@pytest.mark.asyncio
async def test_failure_count_increments(self):
client = await _connected()
client._mock_inner.ingest.side_effect = ConnectionResetError("reset")
await client.remember("text")
assert client.failure_count == 1
@pytest.mark.asyncio
async def test_client_disabled_after_max_failures(self):
client = await _connected()
client._mock_inner.ingest.side_effect = ConnectionResetError("reset")
# drive failures to the limit
for _ in range(_MAX_FAILURES):
await client.remember("text")
assert client.available is False
@pytest.mark.asyncio
async def test_retry_at_set_after_failure(self):
client = await _connected()
client._mock_inner.ingest.side_effect = ConnectionResetError("reset")
before = time.monotonic()
await client.remember("text")
assert client._retry_at is not None
assert client._retry_at > before
@pytest.mark.asyncio
async def test_backoff_increases_with_failures(self):
client = await _connected()
client._mock_inner.ingest.side_effect = ConnectionResetError("reset")
retry_times = []
t0 = time.monotonic()
for _ in range(3):
await client.remember("text")
retry_times.append(client._retry_at - t0)
# Each cooldown should be longer than the previous
assert retry_times[1] > retry_times[0]
assert retry_times[2] > retry_times[1]
@pytest.mark.asyncio
async def test_reconnect_attempted_after_cooldown(self):
"""Once the retry window elapses, the next call triggers a reconnect."""
client = await _connected()
# Force unavailable with an expired retry window
client._available = False
client._retry_at = time.monotonic() - 1.0 # already elapsed
mock_module, mock_inner = _make_mock_mnemo(health_ok=True)
with patch.dict(sys.modules, {"mnemo": mock_module}):
result = await client.remember("text after reconnect")
# Reconnect should have restored availability
assert client.available is True
assert result is True
@pytest.mark.asyncio
async def test_no_reconnect_during_cooldown(self):
"""Within the cooldown window, calls no-op without attempting reconnect."""
client = await _connected()
client._available = False
client._retry_at = time.monotonic() + 999.0 # far in the future
mock_module, _ = _make_mock_mnemo(health_ok=True)
with patch.dict(sys.modules, {"mnemo": mock_module}):
result = await client.remember("text during cooldown")
assert result is False
assert client.available is False # no reconnect fired
@pytest.mark.asyncio
async def test_success_resets_retry_state(self):
"""A successful call clears failure_count and retry_at."""
client = await _connected()
client._failure_count = 2
client._retry_at = time.monotonic() + 30.0
await client.remember("successful call")
assert client.failure_count == 0
assert client._retry_at is None
@pytest.mark.asyncio
async def test_strict_raises_after_max_failures(self):
"""strict=True raises MemoryUnavailableError once failure threshold is hit."""
client = await _connected()
client._strict = True
client._mock_inner.ingest.side_effect = ConnectionResetError("reset")
with pytest.raises(MemoryUnavailableError):
for _ in range(_MAX_FAILURES):
await client.remember("text")