# cf_voice/telephony.py — outbound telephony abstraction
#
# Protocol + mock backend: MIT licensed.
# SignalWireBackend, FreeSWITCHBackend: BSL 1.1 (real telephony, cloud credentials).
#
# Consumers (Osprey, Harrier, Ibis, Kestrel) depend only on TelephonyBackend
# and CallSession — both MIT. The concrete backends are selected by make_telephony()
# based on the tier and available credentials.
#
# Requires optional extras for real backends:
#   pip install cf-voice[signalwire]   — SignalWire (paid tier, CF-provisioned)
#   pip install cf-voice[freeswitch]   — FreeSWITCH ESL (free tier, self-hosted)
from __future__ import annotations

import asyncio
import logging
import os
from dataclasses import dataclass, field
from typing import Literal, Protocol, runtime_checkable
from xml.sax.saxutils import escape as _xml_escape
from xml.sax.saxutils import quoteattr as _xml_quoteattr

logger = logging.getLogger(__name__)

CallState = Literal[
    "dialing",
    "ringing",
    "in_progress",
    "hold",
    "bridged",
    "completed",
    "failed",
    "no_answer",
    "busy",
]


@dataclass
class CallSession:
    """
    Represents an active or completed outbound call.

    call_sid is the backend-assigned identifier — for SignalWire this is a
    Twilio-compatible SID string; for FreeSWITCH it is the UUID.

    state is updated by the backend as the call progresses. Consumers should
    poll via backend.get_state() or subscribe to webhook events.
    """

    call_sid: str
    to: str
    from_: str
    state: CallState = "dialing"
    duration_s: float = 0.0
    # AMD result: "human" | "machine" | "unknown"
    # Populated once the backend resolves answering machine detection.
    amd_result: str = "unknown"
    error: str | None = None


@runtime_checkable
class TelephonyBackend(Protocol):
    """
    Abstract telephony backend interface.

    All methods are async. Implementations must be safe to call from an
    asyncio event loop. Long-running network operations run in a thread pool
    (not the caller's responsibility).

    Field names are stable as of cf-voice v0.1.0.
    """

    async def dial(
        self,
        to: str,
        from_: str,
        webhook_url: str,
        *,
        amd: bool = False,
    ) -> CallSession:
        """
        Initiate an outbound call.

        to / from_    E.164 numbers ("+15551234567").
        webhook_url   URL the backend will POST call events to
                      (SignalWire/TwiML style).
        amd           If True, request answering machine detection. Result
                      lands in CallSession.amd_result once the backend
                      resolves it.

        Returns a CallSession in its initial state: "dialing" for the
        SignalWire and FreeSWITCH backends, "ringing" for the mock backend.
        """
        ...

    async def send_dtmf(self, call_sid: str, digits: str) -> None:
        """
        Send DTMF (dual-tone multi-frequency) tones mid-call.

        digits   String of 0-9, *, #, A-D. Each character is one tone.
                 Pauses may be represented as 'w' (0.5s) or 'W' (1s) if the
                 backend supports them.
        """
        ...

    async def bridge(self, call_sid: str, target: str) -> None:
        """
        Bridge the active call to a second E.164 number or SIP URI.

        Used to connect the user directly to a human agent after Osprey has
        navigated the IVR. The original call leg remains connected.
        """
        ...

    async def hangup(self, call_sid: str) -> None:
        """Terminate the call. Idempotent — safe to call on already-ended calls."""
        ...

    async def announce(
        self,
        call_sid: str,
        text: str,
        voice: str = "default",
    ) -> None:
        """
        Play synthesised speech into the call.

        Implements the adaptive service identification requirement
        (osprey#21): Osprey announces its identity before navigating an IVR
        so that the other party can consent to automated interaction.

        voice   Backend-specific voice identifier. "default" uses the
                backend's default TTS voice.
        """
        ...

    async def get_state(self, call_sid: str) -> CallState:
        """Fetch the current state of a call from the backend."""
        ...


# ── Mock backend (MIT) ────────────────────────────────────────────────────────


class MockTelephonyBackend:
    """
    Synthetic telephony backend for development and CI.

    No real calls are placed. Operations log to cf_voice.telephony and update
    in-memory CallSession objects. A dialled call moves from "ringing" to
    "in_progress" after ring_delay_s; when AMD was requested it then resolves
    to "human" after a further amd_delay_s.

    Usage:
        backend = MockTelephonyBackend()
        session = await backend.dial("+15551234567", "+18005550000", "https://...")
        await backend.send_dtmf(session.call_sid, "1")
        await backend.hangup(session.call_sid)
    """

    def __init__(self, amd_delay_s: float = 0.5, ring_delay_s: float = 0.05) -> None:
        """
        amd_delay_s    Simulated seconds between answer and AMD resolution.
        ring_delay_s   Simulated seconds between dial and answer.
        """
        self._sessions: dict[str, CallSession] = {}
        self._amd_delay_s = amd_delay_s
        self._ring_delay_s = ring_delay_s
        self._call_counter = 0
        # Strong references to in-flight progress tasks. The event loop only
        # keeps weak references, so an unreferenced task may be collected
        # before it finishes.
        self._tasks: set[asyncio.Task[None]] = set()

    def _next_sid(self) -> str:
        """Return the next sequential mock call identifier."""
        self._call_counter += 1
        return f"mock_sid_{self._call_counter:04d}"

    async def dial(
        self,
        to: str,
        from_: str,
        webhook_url: str,
        *,
        amd: bool = False,
    ) -> CallSession:
        """
        Register a synthetic call and schedule its simulated progress.

        Returns the new CallSession immediately with state="ringing"; the
        same object is mutated in the background as the call "progresses".
        webhook_url is accepted for interface parity and not used.
        """
        sid = self._next_sid()
        session = CallSession(call_sid=sid, to=to, from_=from_, state="ringing")
        self._sessions[sid] = session
        logger.debug("MockTelephony: dial %s → %s (sid=%s)", from_, to, sid)

        async def _progress() -> None:
            await asyncio.sleep(self._ring_delay_s)
            if session.state == "ringing":
                session.state = "in_progress"
            elif session.state == "completed":
                # Hung up before the simulated answer: leave the final state
                # alone and skip AMD, there is nothing to detect.
                return
            # Any other state (e.g. "bridged") was set explicitly by the
            # caller in the meantime and must not be overwritten.
            if amd:
                await asyncio.sleep(self._amd_delay_s)
                session.amd_result = "human"
                logger.debug("MockTelephony: AMD resolved human (sid=%s)", sid)

        task = asyncio.create_task(_progress())
        self._tasks.add(task)
        task.add_done_callback(self._tasks.discard)
        return session

    async def send_dtmf(self, call_sid: str, digits: str) -> None:
        """Log the DTMF digits. Raises KeyError for an unknown call_sid."""
        self._sessions[call_sid]  # KeyError if unknown — intentional
        logger.debug("MockTelephony: DTMF %r (sid=%s)", digits, call_sid)

    async def bridge(self, call_sid: str, target: str) -> None:
        """Mark the call as "bridged". Raises KeyError for an unknown call_sid."""
        session = self._sessions[call_sid]
        session.state = "bridged"
        logger.debug("MockTelephony: bridge → %s (sid=%s)", target, call_sid)

    async def hangup(self, call_sid: str) -> None:
        """Mark the call as "completed". Unknown call_sids are ignored."""
        session = self._sessions.get(call_sid)
        if session:
            session.state = "completed"
        logger.debug("MockTelephony: hangup (sid=%s)", call_sid)

    async def announce(
        self,
        call_sid: str,
        text: str,
        voice: str = "default",
    ) -> None:
        """Log the announcement. Raises KeyError for an unknown call_sid."""
        self._sessions[call_sid]  # KeyError if unknown — intentional
        logger.debug(
            "MockTelephony: announce voice=%s text=%r (sid=%s)", voice, text, call_sid
        )

    async def get_state(self, call_sid: str) -> CallState:
        """Return the in-memory state. Raises KeyError for an unknown call_sid."""
        return self._sessions[call_sid].state


# ── SignalWire backend (BSL 1.1) ──────────────────────────────────────────────

# SignalWire (Twilio-compatible) call status → CallState. Statuses missing
# from this table are reported as "failed" by SignalWireBackend.get_state().
_SIGNALWIRE_STATES: dict[str, CallState] = {
    "queued": "dialing",
    "ringing": "ringing",
    "in-progress": "in_progress",
    "completed": "completed",
    "failed": "failed",
    "busy": "busy",
    "no-answer": "no_answer",
    "canceled": "failed",
}


class SignalWireBackend:
    """
    SignalWire outbound telephony (Twilio-compatible REST API).

    BSL 1.1 — requires paid tier or self-hosted CF SignalWire project.

    Credentials sourced from environment:
        CF_SW_PROJECT_ID  — SignalWire project ID
        CF_SW_AUTH_TOKEN  — SignalWire auth token
        CF_SW_SPACE_URL   — space URL, e.g. "yourspace.signalwire.com"

    Requires: pip install cf-voice[signalwire]
    """

    def __init__(
        self,
        project_id: str | None = None,
        auth_token: str | None = None,
        space_url: str | None = None,
    ) -> None:
        """
        Build the REST client. Each argument falls back to its CF_SW_*
        environment variable when omitted or empty.

        Raises ImportError if the SignalWire SDK is not installed and
        KeyError if a credential is neither passed nor present in the
        environment.
        """
        try:
            from signalwire.rest import Client as SWClient  # type: ignore[import]
        except ImportError as exc:
            raise ImportError(
                "SignalWire SDK is required for SignalWireBackend. "
                "Install with: pip install cf-voice[signalwire]"
            ) from exc

        self._project_id = project_id or os.environ["CF_SW_PROJECT_ID"]
        self._auth_token = auth_token or os.environ["CF_SW_AUTH_TOKEN"]
        self._space_url = space_url or os.environ["CF_SW_SPACE_URL"]
        self._client = SWClient(
            self._project_id,
            self._auth_token,
            signalwire_space_url=self._space_url,
        )
        # No event loop is captured here: the constructor runs in synchronous
        # code (make_telephony), where there may be no loop at all. The async
        # methods below pick up the running loop themselves.

    async def _update_call(self, call_sid: str, **fields: str) -> None:
        """Apply a REST update to a live call without blocking the event loop."""
        await asyncio.to_thread(
            lambda: self._client.calls(call_sid).update(**fields)
        )

    async def dial(
        self,
        to: str,
        from_: str,
        webhook_url: str,
        *,
        amd: bool = False,
    ) -> CallSession:
        """
        Create an outbound call via the REST API.

        webhook_url is used both as the call-flow URL and as the status
        callback. With amd=True, asynchronous machine detection is requested.
        Returns a CallSession with state="dialing" carrying the SID assigned
        by SignalWire.
        """
        call_kwargs: dict[str, object] = {
            "to": to,
            "from_": from_,
            "url": webhook_url,
            "status_callback": webhook_url,
        }
        if amd:
            call_kwargs["machine_detection"] = "Enable"
            call_kwargs["async_amd"] = True

        call = await asyncio.to_thread(
            lambda: self._client.calls.create(**call_kwargs)
        )
        return CallSession(call_sid=call.sid, to=to, from_=from_, state="dialing")

    async def send_dtmf(self, call_sid: str, digits: str) -> None:
        """Play DTMF digits into the call by pushing a <Play digits=...> document."""
        await self._update_call(
            call_sid,
            twiml=f"<Response><Play digits={_xml_quoteattr(digits)}/></Response>",
        )

    async def bridge(self, call_sid: str, target: str) -> None:
        """
        Connect the call to target by pushing a <Dial> document.

        A target starting with "sip:" or "sips:" is dialled through a <Sip>
        noun; anything else is dialled as a plain number.
        """
        destination = _xml_escape(target)
        if target.lower().startswith(("sip:", "sips:")):
            destination = f"<Sip>{destination}</Sip>"
        await self._update_call(
            call_sid,
            twiml=f"<Response><Dial>{destination}</Dial></Response>",
        )

    async def hangup(self, call_sid: str) -> None:
        """End the call by setting its status to "completed"."""
        await self._update_call(call_sid, status="completed")

    async def announce(
        self,
        call_sid: str,
        text: str,
        voice: str = "alice",
    ) -> None:
        """
        Speak text into the call by pushing a <Say> document.

        voice="default" (the protocol-level default) omits the voice
        attribute so the platform's own default voice is used. The text is
        XML-escaped, so callers may pass arbitrary characters.
        """
        voice_attr = "" if voice == "default" else f" voice={_xml_quoteattr(voice)}"
        await self._update_call(
            call_sid,
            twiml=f"<Response><Say{voice_attr}>{_xml_escape(text)}</Say></Response>",
        )

    async def get_state(self, call_sid: str) -> CallState:
        """Fetch the call and translate its status; unknown statuses map to "failed"."""
        call = await asyncio.to_thread(
            lambda: self._client.calls(call_sid).fetch()
        )
        return _SIGNALWIRE_STATES.get(call.status, "failed")


# ── FreeSWITCH backend (BSL 1.1) ─────────────────────────────────────────────

# FreeSWITCH channel state name → CallState. Values missing from this table
# (including "-ERR ..." replies) are reported as "failed" by
# FreeSWITCHBackend.get_state().
# NOTE(review): assumes the `call_state` channel variable carries CS_* names,
# as the original mapping did — confirm against a live FreeSWITCH.
_FREESWITCH_STATES: dict[str, CallState] = {
    "CS_NEW": "dialing",
    "CS_INIT": "dialing",
    "CS_ROUTING": "ringing",
    "CS_SOFT_EXECUTE": "in_progress",
    "CS_EXECUTE": "in_progress",
    "CS_EXCHANGE_MEDIA": "bridged",
    "CS_PARK": "in_progress",
    "CS_CONSUME_MEDIA": "in_progress",
    "CS_HIBERNATE": "in_progress",
    "CS_RESET": "in_progress",
    "CS_HANGUP": "completed",
    "CS_REPORTING": "completed",
    "CS_DESTROY": "completed",
}


class FreeSWITCHBackend:
    """
    Self-hosted FreeSWITCH outbound telephony via ESL (event socket layer).

    BSL 1.1 — requires free tier + user-provisioned FreeSWITCH + VoIP.ms SIP trunk.

    Credentials sourced from environment:
        CF_ESL_HOST      — FreeSWITCH ESL host (default: 127.0.0.1)
        CF_ESL_PORT      — FreeSWITCH ESL port (default: 8021)
        CF_ESL_PASSWORD  — FreeSWITCH ESL password

    Requires: pip install cf-voice[freeswitch]

    Note: FreeSWITCH AMD (mod_vad + custom heuristic or Whisper pipe) is not
    yet implemented. The amd parameter is accepted but amd_result stays
    "unknown".
    """

    def __init__(
        self,
        host: str | None = None,
        port: int | None = None,
        password: str | None = None,
    ) -> None:
        """
        Record connection settings; no connection is opened here. Each
        argument falls back to its CF_ESL_* environment variable.

        Raises ImportError if the ESL bindings are not installed and KeyError
        if no password is passed or present in the environment.
        """
        try:
            import ESL  # type: ignore[import]
        except ImportError as exc:
            raise ImportError(
                "FreeSWITCH ESL bindings are required for FreeSWITCHBackend. "
                "Install with: pip install cf-voice[freeswitch]"
            ) from exc

        self._host = host or os.environ.get("CF_ESL_HOST", "127.0.0.1")
        self._port = int(port or os.environ.get("CF_ESL_PORT", 8021))
        self._password = password or os.environ["CF_ESL_PASSWORD"]
        self._esl = ESL

    def _connect(self):
        """Open a new ESL connection. Raises RuntimeError if it does not connect."""
        conn = self._esl.ESLconnection(self._host, str(self._port), self._password)
        if not conn.connected():
            raise RuntimeError(
                f"Could not connect to FreeSWITCH ESL at {self._host}:{self._port}"
            )
        return conn

    def _api(self, command: str, args: str = "") -> str:
        """
        Run one blocking ESL API command on a short-lived connection.

        Returns the stripped reply body ("" when FreeSWITCH sends no reply or
        an empty one). The connection is always disconnected afterwards so
        sockets do not pile up, one per operation.
        """
        conn = self._connect()
        try:
            reply = conn.api(command, args)
            body = reply.getBody() if reply is not None else None
            return (body or "").strip()
        finally:
            conn.disconnect()

    async def _run(self, command: str, args: str) -> str:
        """Run an API command off the event loop; log (don't raise) on -ERR replies."""
        body = await asyncio.to_thread(self._api, command, args)
        if body.startswith("-ERR"):
            logger.warning("FreeSWITCH %s failed: %s", command, body)
        return body

    async def dial(
        self,
        to: str,
        from_: str,
        webhook_url: str,
        *,
        amd: bool = False,
    ) -> CallSession:
        """
        Originate a call through the "voipms" gateway and park it.

        webhook_url and amd are accepted for interface parity and not used.
        Returns a CallSession with state="dialing" whose call_sid is the
        channel UUID. Raises RuntimeError if FreeSWITCH does not answer +OK.
        """
        # The command word is passed separately to api(); the arguments must
        # not repeat it. Channel variables in {…} are followed directly by the
        # dial string.
        args = (
            f"{{origination_caller_id_number={from_},"
            f"origination_caller_id_name=CircuitForge}}"
            f"sofia/gateway/voipms/{to.lstrip('+')} &park()"
        )
        body = await asyncio.to_thread(self._api, "originate", args)
        # FreeSWITCH returns "+OK <uuid>" on success
        if not body.startswith("+OK"):
            raise RuntimeError(f"FreeSWITCH originate failed: {body}")
        uuid = body.removeprefix("+OK").strip()
        return CallSession(call_sid=uuid, to=to, from_=from_, state="dialing")

    async def send_dtmf(self, call_sid: str, digits: str) -> None:
        """Send DTMF digits to the channel via uuid_send_dtmf."""
        await self._run("uuid_send_dtmf", f"{call_sid} {digits}")

    async def bridge(self, call_sid: str, target: str) -> None:
        """Bridge the channel towards target via the "voipms" gateway."""
        # NOTE(review): uuid_bridge is documented as taking two channel UUIDs;
        # passing a dial string here may need to become a uuid_transfer with
        # an inline bridge — confirm against a live FreeSWITCH.
        await self._run(
            "uuid_bridge",
            f"{call_sid} sofia/gateway/voipms/{target.lstrip('+')}",
        )

    async def hangup(self, call_sid: str) -> None:
        """Kill the channel. An -ERR reply (e.g. call already ended) is only debug-logged."""
        body = await asyncio.to_thread(self._api, "uuid_kill", call_sid)
        if body.startswith("-ERR"):
            logger.debug("FreeSWITCH uuid_kill (sid=%s): %s", call_sid, body)

    async def announce(
        self,
        call_sid: str,
        text: str,
        voice: str = "default",
    ) -> None:
        """Broadcast text into the channel. voice is currently not used."""
        # FreeSWITCH TTS via mod_tts_commandline or Piper pipe
        # NOTE(review): "say::en CHAT SPOKEN" is kept as-is; verify it matches
        # the say/speak application configured on the target FreeSWITCH.
        await self._run("uuid_broadcast", f"{call_sid} say::en CHAT SPOKEN {text}")

    async def get_state(self, call_sid: str) -> CallState:
        """Read the channel's call_state variable and translate it; unknown values map to "failed"."""
        raw = await asyncio.to_thread(
            self._api, "uuid_getvar", f"{call_sid} call_state"
        )
        return _FREESWITCH_STATES.get(raw, "failed")


# ── Factory ───────────────────────────────────────────────────────────────────


def make_telephony(
    mock: bool | None = None,
    backend: str | None = None,
) -> MockTelephonyBackend | SignalWireBackend | FreeSWITCHBackend:
    """
    Factory: return a TelephonyBackend appropriate for the current environment.

    Resolution order:
    1. mock=True or CF_VOICE_MOCK=1                          → MockTelephonyBackend
    2. backend="signalwire" or CF_SW_PROJECT_ID present      → SignalWireBackend
    3. backend="freeswitch" or CF_ESL_PASSWORD present       → FreeSWITCHBackend
    4. Raises RuntimeError — no usable backend configured, or an explicit
       backend name that is not recognised

    An explicit mock=False disables the CF_VOICE_MOCK check.

    In production, backend selection is driven by the tier system:
        Free tier  → FreeSWITCHBackend (BYOK VoIP)
        Paid tier  → SignalWireBackend (CF-provisioned)
    """
    if mock is None:
        mock = os.environ.get("CF_VOICE_MOCK", "") == "1"
    if mock:
        return MockTelephonyBackend()

    chosen = backend
    if not chosen:
        # No explicit choice: infer from whichever credentials are present,
        # SignalWire taking precedence.
        if os.environ.get("CF_SW_PROJECT_ID"):
            chosen = "signalwire"
        elif os.environ.get("CF_ESL_PASSWORD"):
            chosen = "freeswitch"

    if chosen == "signalwire":
        return SignalWireBackend()
    if chosen == "freeswitch":
        return FreeSWITCHBackend()
    if chosen:
        raise RuntimeError(
            f"Unknown telephony backend {chosen!r}. "
            "Expected 'signalwire' or 'freeswitch'."
        )
    raise RuntimeError(
        "No telephony backend configured. "
        "Set CF_VOICE_MOCK=1 for mock mode, or provide SignalWire / FreeSWITCH credentials."
    )