Full FastAPI backend for the AI music continuation editor:
Services
- chain.py: chain + node CRUD, commit/discard, recursive CTE spine query
- musicgen.py: MusicGenClient with cf-orch allocation + mock mode (CF_MUSICGEN_MOCK=1)
- stems.py: Demucs 4-stem separation subprocess wrapper + mock mode
- export.py: ffmpeg concat demuxer to stitch committed spine into WAV/MP3
API endpoints
- chains: CRUD, multipart audio upload (WAV/MP3/FLAC/OGG/M4A/AIFF)
- nodes: branch creation (202 + BackgroundTasks), commit, discard, audio stream
- gpu: cf-orch capacity status; session allocation stubbed pending cf-orch#43
- stems: Paid-tier stem separation (Demucs, gated via tiers.py)
- export: POST /{chain_id}/export → FileResponse download
- events: SSE stream (node-status events) per chain via asyncio Queue pub/sub
Infrastructure
- lifespan: reads SPARROW_DB_PATH/DATA_DIR at startup (not import time)
- events_store: subscribe/unsubscribe/broadcast pattern for SSE
- CORS: open in dev, SPARROW_CORS_ORIGINS in production
- Background generation opens its own DB connection (WAL-safe)
Tests: 30/30 passing across service units and API integration
65 lines
1.5 KiB
Python
65 lines
1.5 KiB
Python
# tests/conftest.py — shared fixtures for Sparrow test suite
|
|
from __future__ import annotations
|
|
|
|
import os
|
|
import tempfile
|
|
|
|
import pytest
|
|
import pytest_asyncio
|
|
from fastapi.testclient import TestClient
|
|
from httpx import ASGITransport, AsyncClient
|
|
|
|
# Force the mock backends for every test so that no GPU or ffmpeg is required.
# setdefault leaves any value already present in the environment untouched.
os.environ.setdefault("CF_STEMS_MOCK", "1")
os.environ.setdefault("CF_MUSICGEN_MOCK", "1")
|
|
|
|
|
|
@pytest.fixture
def tmp_db(tmp_path):
    """Return, as a string, the path of a ``test.db`` file under ``tmp_path``.

    Only the path is built here; this fixture does not create the file.
    """
    db_file = tmp_path.joinpath("test.db")
    return str(db_file)
|
|
|
|
|
|
@pytest.fixture
def tmp_data(tmp_path):
    """Create a ``data`` directory under ``tmp_path`` and return its path as a string."""
    data_dir = tmp_path.joinpath("data")
    data_dir.mkdir()
    return str(data_dir)
|
|
|
|
|
|
@pytest.fixture
def conn(tmp_db):
    """Yield a SQLite connection to the temporary DB with migrations applied.

    The connection is closed on teardown and also when ``run_migrations``
    raises, so a failed setup does not leave the database file held open.
    """
    # Imported inside the fixture rather than at module top, so the app
    # package is only loaded when a test actually requests this fixture.
    from app.db.store import get_connection, run_migrations

    c = get_connection(tmp_db)
    try:
        run_migrations(c)
        yield c
    finally:
        # Runs after the test finishes and on a migration failure alike.
        c.close()
|
|
|
|
|
|
@pytest.fixture
def app(tmp_db, tmp_data, monkeypatch):
    """Build a FastAPI test app with an isolated DB and data dir.

    The ``SPARROW_*`` variables are set through ``monkeypatch`` so that they
    are restored to their previous state when the test finishes. Writing to
    ``os.environ`` directly would leave later tests with paths into a
    temporary directory that no longer belongs to them.
    """
    monkeypatch.setenv("SPARROW_DB_PATH", tmp_db)
    monkeypatch.setenv("SPARROW_DATA_DIR", tmp_data)
    monkeypatch.setenv("SPARROW_ENV", "development")

    # Imported after the environment is prepared, as in the original fixture.
    from app.main import create_app

    return create_app()
|
|
|
|
|
|
@pytest.fixture
def client(app):
    """Yield a synchronous ``TestClient`` for ``app``, entered as a context manager.

    The client is created with ``raise_server_exceptions=True``.
    """
    test_client = TestClient(app, raise_server_exceptions=True)
    with test_client as c:
        yield c
|
|
|
|
|
|
@pytest_asyncio.fixture
async def async_client(app):
    """Yield an ``AsyncClient`` that reaches ``app`` through an ``ASGITransport``."""
    transport = ASGITransport(app=app)
    async with AsyncClient(transport=transport, base_url="http://test") as c:
        yield c
|