#!/usr/bin/env python3
"""
Enhanced Voice Server with Multiple Wake Words and Speaker Identification
Path: /home/alan/voice-assistant/voice_server_enhanced.py

This enhanced version adds:
- Multiple wake word support
- Speaker identification using pyannote.audio
- Per-user customization
- Wake word-specific responses

Usage:
    python3 voice_server_enhanced.py \
        --enable-precise \
        --multi-wake-word \
        --enable-speaker-id
"""

import os
import sys
import json
import argparse
import tempfile
import wave
import io
import re
import threading
import queue
import time
from pathlib import Path
from typing import Optional, Dict, Any, Tuple, List

import whisper
import requests
from flask import Flask, request, jsonify, send_file
from werkzeug.exceptions import BadRequest

# .env support is optional; silently skip if python-dotenv is absent.
try:
    from dotenv import load_dotenv
    load_dotenv()
except ImportError:
    pass

# Mycroft Precise (wake word engine) -- optional dependency.
PRECISE_AVAILABLE = False
try:
    from precise_runner import PreciseEngine, PreciseRunner
    import pyaudio
    PRECISE_AVAILABLE = True
except ImportError:
    print("Warning: Mycroft Precise not installed")

# Speaker identification (pyannote.audio + scipy) -- optional dependency.
SPEAKER_ID_AVAILABLE = False
try:
    from pyannote.audio import Inference
    from scipy.spatial.distance import cosine
    import numpy as np
    SPEAKER_ID_AVAILABLE = True
except ImportError:
    print("Warning: Speaker ID not available. Install: pip install pyannote.audio scipy")

# --------------------------------------------------------------------------
# Configuration
# --------------------------------------------------------------------------
DEFAULT_HOST = "0.0.0.0"
DEFAULT_PORT = 5000
DEFAULT_WHISPER_MODEL = "medium"
DEFAULT_HA_URL = os.getenv("HA_URL", "http://homeassistant.local:8123")
DEFAULT_HA_TOKEN = os.getenv("HA_TOKEN", "")
DEFAULT_PRECISE_ENGINE = "/usr/local/bin/precise-engine"
DEFAULT_HF_TOKEN = os.getenv("HF_TOKEN", "")

# Wake word configurations.  Each entry maps a wake word name to its Precise
# model path, detection sensitivity, spoken response, and context tag.
WAKE_WORD_CONFIGS = {
    'hey_mycroft': {
        'model': os.path.expanduser('~/precise-models/pretrained/hey-mycroft.net'),
        'sensitivity': 0.5,
        'response': 'Yes?',
        'enabled': True,
        'context': 'general'
    },
    'hey_computer': {
        'model': os.path.expanduser('~/precise-models/hey-computer/hey-computer.net'),
        'sensitivity': 0.5,
        'response': 'I\'m listening',
        'enabled': False,  # Disabled by default (requires training)
        'context': 'general'
    },
    'jarvis': {
        'model': os.path.expanduser('~/precise-models/jarvis/jarvis.net'),
        'sensitivity': 0.6,
        'response': 'At your service',
        'enabled': False,
        'context': 'personal'
    },
}

# Speaker profiles (stored in JSON file)
SPEAKER_PROFILES_FILE = os.path.expanduser('~/voice-assistant/config/speaker_profiles.json')

# Flask app
app = Flask(__name__)
app.config['MAX_CONTENT_LENGTH'] = 16 * 1024 * 1024  # 16 MB upload cap

# Global state shared between the Flask routes and the wake word threads.
whisper_model = None
ha_client = None
precise_runners = {}
precise_enabled = False
speaker_id_enabled = False
speaker_inference = None
speaker_profiles = {}
wake_word_queue = queue.Queue()


class HomeAssistantClient:
    """Thin client for the Home Assistant REST API.

    Uses a persistent :class:`requests.Session` carrying the bearer token.
    All methods swallow :class:`requests.RequestException` and report the
    failure via a printed message and a falsy return value.
    """

    def __init__(self, base_url: str, token: str):
        self.base_url = base_url.rstrip('/')
        self.token = token
        self.session = requests.Session()
        self.session.headers.update({
            'Authorization': f'Bearer {token}',
            'Content-Type': 'application/json'
        })

    def get_state(self, entity_id: str) -> Optional[Dict[str, Any]]:
        """Return the state dict for ``entity_id``, or None on any HTTP error."""
        try:
            response = self.session.get(f'{self.base_url}/api/states/{entity_id}')
            response.raise_for_status()
            return response.json()
        except requests.RequestException as e:
            print(f"Error getting state for {entity_id}: {e}")
            return None

    def call_service(self, domain: str, service: str, entity_id: str, **kwargs) -> bool:
        """Call ``domain.service`` on ``entity_id``; extra kwargs become service data."""
        try:
            data = {'entity_id': entity_id}
            data.update(kwargs)
            response = self.session.post(
                f'{self.base_url}/api/services/{domain}/{service}',
                json=data
            )
            response.raise_for_status()
            return True
        except requests.RequestException as e:
            print(f"Error calling service {domain}.{service}: {e}")
            return False

    def turn_on(self, entity_id: str, **kwargs) -> bool:
        """Convenience wrapper: derive the domain from the entity id and turn it on."""
        domain = entity_id.split('.')[0]
        return self.call_service(domain, 'turn_on', entity_id, **kwargs)

    def turn_off(self, entity_id: str, **kwargs) -> bool:
        """Convenience wrapper: derive the domain from the entity id and turn it off."""
        domain = entity_id.split('.')[0]
        return self.call_service(domain, 'turn_off', entity_id, **kwargs)


class SpeakerIdentification:
    """Speaker identification using pyannote.audio embeddings.

    Profiles are kept as ``{name: {'embedding': [...], 'enrolled': ts}}`` so
    they round-trip through JSON.

    Raises:
        ImportError: if pyannote.audio / scipy were not importable.
    """

    def __init__(self, hf_token: str):
        if not SPEAKER_ID_AVAILABLE:
            raise ImportError("Speaker ID dependencies not available")
        self.inference = Inference(
            "pyannote/embedding",
            use_auth_token=hf_token
        )
        self.profiles = {}

    def enroll_speaker(self, name: str, audio_file: str):
        """Enroll a speaker from audio file."""
        # NOTE(review): assumes Inference returns an array-like with
        # .tolist() (whole-file embedding) -- confirm against the installed
        # pyannote.audio version.
        embedding = self.inference(audio_file)
        self.profiles[name] = {
            'embedding': embedding.tolist(),  # Convert to list for JSON
            'enrolled': time.time()
        }
        print(f"Enrolled speaker: {name}")

    def identify_speaker(self, audio_file: str, threshold: float = 0.7) -> Optional[str]:
        """Identify speaker from audio file.

        Returns the best-matching enrolled name when its cosine similarity
        reaches ``threshold``, the string 'unknown' otherwise, or None when
        no profiles are enrolled at all.
        """
        if not self.profiles:
            return None
        unknown_embedding = self.inference(audio_file)
        best_match = None
        best_similarity = 0.0
        for name, profile in self.profiles.items():
            known_embedding = np.array(profile['embedding'])
            # cosine() is a distance; 1 - distance gives similarity.
            similarity = 1 - cosine(unknown_embedding, known_embedding)
            if similarity > best_similarity:
                best_similarity = similarity
                best_match = name
        if best_similarity >= threshold:
            return best_match
        return 'unknown'

    def load_profiles(self, filepath: str):
        """Load speaker profiles from JSON (no-op if the file does not exist)."""
        if os.path.exists(filepath):
            with open(filepath, 'r') as f:
                self.profiles = json.load(f)
            print(f"Loaded {len(self.profiles)} speaker profiles")

    def save_profiles(self, filepath: str):
        """Save speaker profiles to JSON, creating parent directories as needed."""
        os.makedirs(os.path.dirname(filepath), exist_ok=True)
        with open(filepath, 'w') as f:
            json.dump(self.profiles, f, indent=2)
        print(f"Saved {len(self.profiles)} speaker profiles")


def load_whisper_model(model_name: str = DEFAULT_WHISPER_MODEL):
    """Load the Whisper model once and cache it in the module global."""
    global whisper_model
    if whisper_model is None:
        print(f"Loading Whisper model: {model_name}")
        whisper_model = whisper.load_model(model_name)
        print("Whisper model loaded")
    return whisper_model


def transcribe_audio(audio_file_path: str) -> Optional[str]:
    """Transcribe an audio file; return the stripped text or None on failure."""
    try:
        model = load_whisper_model()
        result = model.transcribe(audio_file_path)
        return result['text'].strip()
    except Exception as e:
        print(f"Error transcribing: {e}")
        return None


def on_wake_word_detected(wake_word_name: str):
    """Callback factory for wake word detection.

    Returns a zero-argument callback (the PreciseRunner on_activation hook)
    that pushes a detection event onto ``wake_word_queue``.
    """
    def callback():
        config = WAKE_WORD_CONFIGS.get(wake_word_name, {})
        print(f"Wake word detected: {wake_word_name}")
        wake_word_queue.put({
            'timestamp': time.time(),
            'wake_word': wake_word_name,
            'response': config.get('response', 'Yes?'),
            'context': config.get('context', 'general')
        })
    return callback


def start_multiple_wake_words(configs: Dict[str, Dict], engine_path: str):
    """Start a Precise listener for every enabled config with an existing model.

    Returns True when at least one listener started (and sets the
    ``precise_enabled`` flag), False otherwise.
    """
    global precise_runners, precise_enabled
    if not PRECISE_AVAILABLE:
        print("Error: Precise not available")
        return False
    active_count = 0
    for name, config in configs.items():
        if not config.get('enabled', False):
            continue
        model_path = config['model']
        if not os.path.exists(model_path):
            print(f"Warning: Model not found: {model_path} (skipping {name})")
            continue
        try:
            engine = PreciseEngine(engine_path, model_path)
            runner = PreciseRunner(
                engine,
                sensitivity=config.get('sensitivity', 0.5),
                on_activation=on_wake_word_detected(name)
            )
            runner.start()
            precise_runners[name] = runner
            active_count += 1
            print(f"✓ Started wake word: {name}")
            print(f"  Model: {model_path}")
            print(f"  Sensitivity: {config.get('sensitivity', 0.5)}")
        except Exception as e:
            print(f"✗ Failed to start {name}: {e}")
    if active_count > 0:
        precise_enabled = True
        print(f"\nTotal active wake words: {active_count}")
        return True
    return False


def stop_all_wake_words():
    """Stop every running wake word listener and reset the global state."""
    global precise_runners, precise_enabled
    for name, runner in precise_runners.items():
        try:
            runner.stop()
            print(f"Stopped wake word: {name}")
        except Exception as e:
            print(f"Error stopping {name}: {e}")
    precise_runners = {}
    precise_enabled = False


def init_speaker_identification(hf_token: str) -> Optional[SpeakerIdentification]:
    """Initialize speaker identification and load any saved profiles."""
    global speaker_inference, speaker_id_enabled
    if not SPEAKER_ID_AVAILABLE:
        print("Speaker ID not available")
        return None
    try:
        speaker_inference = SpeakerIdentification(hf_token)
        # Load existing profiles
        if os.path.exists(SPEAKER_PROFILES_FILE):
            speaker_inference.load_profiles(SPEAKER_PROFILES_FILE)
        speaker_id_enabled = True
        print("Speaker identification initialized")
        return speaker_inference
    except Exception as e:
        print(f"Error initializing speaker ID: {e}")
        return None


# --------------------------------------------------------------------------
# Flask routes
# --------------------------------------------------------------------------

@app.route('/health', methods=['GET'])
def health():
    """Health check: report which subsystems are loaded/enabled."""
    return jsonify({
        'status': 'healthy',
        'whisper_loaded': whisper_model is not None,
        'ha_connected': ha_client is not None,
        'precise_enabled': precise_enabled,
        'active_wake_words': list(precise_runners.keys()),
        'speaker_id_enabled': speaker_id_enabled,
        'enrolled_speakers': list(speaker_inference.profiles.keys()) if speaker_inference else []
    })


@app.route('/wake-words', methods=['GET'])
def list_wake_words():
    """List all configured wake words with their enabled/active status."""
    wake_words = []
    for name, config in WAKE_WORD_CONFIGS.items():
        wake_words.append({
            'name': name,
            'enabled': config.get('enabled', False),
            'active': name in precise_runners,
            'model': config['model'],
            'sensitivity': config.get('sensitivity', 0.5),
            'response': config.get('response', ''),
            'context': config.get('context', 'general')
        })
    return jsonify({
        'wake_words': wake_words,
        'total': len(wake_words),
        'active': len(precise_runners)
    })


# BUG FIX: the rule was '/wake-words//enable' with no URL variable, so Flask
# never passed `name` and every request raised TypeError.  '<name>' restores
# the intended parameterized route.
@app.route('/wake-words/<name>/enable', methods=['POST'])
def enable_wake_word(name):
    """Enable a wake word (takes effect after a server restart)."""
    if name not in WAKE_WORD_CONFIGS:
        return jsonify({'error': 'Wake word not found'}), 404
    config = WAKE_WORD_CONFIGS[name]
    config['enabled'] = True
    # Start the wake word if not already running
    if name not in precise_runners:
        # Restart all wake words to pick up changes
        # (simpler than starting individual ones)
        return jsonify({
            'message': f'Enabled {name}. Restart server to activate.'
        })
    return jsonify({'message': f'Wake word {name} enabled'})


@app.route('/speakers/enroll', methods=['POST'])
def enroll_speaker():
    """Enroll a new speaker from an uploaded audio file (form fields: name, audio)."""
    if not speaker_id_enabled or not speaker_inference:
        return jsonify({'error': 'Speaker ID not enabled'}), 400
    if 'audio' not in request.files:
        return jsonify({'error': 'No audio file'}), 400
    name = request.form.get('name')
    if not name:
        return jsonify({'error': 'No speaker name provided'}), 400
    audio_file = request.files['audio']
    # Save temporarily; delete=False so the path survives the with-block.
    with tempfile.NamedTemporaryFile(suffix='.wav', delete=False) as temp:
        audio_file.save(temp.name)
        temp_path = temp.name
    try:
        speaker_inference.enroll_speaker(name, temp_path)
        speaker_inference.save_profiles(SPEAKER_PROFILES_FILE)
        return jsonify({
            'message': f'Enrolled speaker: {name}',
            'total_speakers': len(speaker_inference.profiles)
        })
    except Exception as e:
        return jsonify({'error': str(e)}), 500
    finally:
        if os.path.exists(temp_path):
            os.remove(temp_path)


@app.route('/speakers', methods=['GET'])
def list_speakers():
    """List enrolled speakers with their enrollment timestamps."""
    if not speaker_id_enabled or not speaker_inference:
        return jsonify({'error': 'Speaker ID not enabled'}), 400
    speakers = []
    for name, profile in speaker_inference.profiles.items():
        speakers.append({
            'name': name,
            'enrolled': profile.get('enrolled', 0)
        })
    return jsonify({
        'speakers': speakers,
        'total': len(speakers)
    })


@app.route('/process-enhanced', methods=['POST'])
def process_enhanced():
    """
    Enhanced processing with speaker ID and wake word context.

    Accepts an 'audio' file upload and an optional 'wake_word' form field;
    returns transcription, identified speaker, and the wake word context.
    """
    if 'audio' not in request.files:
        return jsonify({'error': 'No audio file'}), 400
    wake_word = request.form.get('wake_word', 'unknown')
    audio_file = request.files['audio']
    with tempfile.NamedTemporaryFile(suffix='.wav', delete=False) as temp:
        audio_file.save(temp.name)
        temp_path = temp.name
    try:
        # Identify speaker (if enabled).  identify_speaker() returns None when
        # no profiles are enrolled; normalize that to 'unknown' so the response
        # schema stays consistent.
        speaker = 'unknown'
        if speaker_id_enabled and speaker_inference:
            speaker = speaker_inference.identify_speaker(temp_path) or 'unknown'
            print(f"Identified speaker: {speaker}")
        # Transcribe
        text = transcribe_audio(temp_path)
        if not text:
            return jsonify({'error': 'Transcription failed'}), 500
        print(f"[{speaker}] via [{wake_word}]: {text}")
        # Get wake word config
        config = WAKE_WORD_CONFIGS.get(wake_word, {})
        context = config.get('context', 'general')
        # Process based on context and speaker
        response = f"Heard via {wake_word}: {text}"
        return jsonify({
            'success': True,
            'transcription': text,
            'speaker': speaker,
            'wake_word': wake_word,
            'context': context,
            'response': response
        })
    finally:
        if os.path.exists(temp_path):
            os.remove(temp_path)


def main():
    """Parse CLI flags, initialize subsystems, and run the Flask server."""
    parser = argparse.ArgumentParser(
        description="Enhanced Voice Server with Multi-Wake-Word and Speaker ID"
    )
    parser.add_argument('--host', default=DEFAULT_HOST)
    parser.add_argument('--port', type=int, default=DEFAULT_PORT)
    parser.add_argument('--whisper-model', default=DEFAULT_WHISPER_MODEL)
    parser.add_argument('--ha-url', default=DEFAULT_HA_URL)
    parser.add_argument('--ha-token', default=DEFAULT_HA_TOKEN)
    parser.add_argument('--enable-precise', action='store_true',
                        help='Enable wake word detection')
    parser.add_argument('--multi-wake-word', action='store_true',
                        help='Enable multiple wake words')
    parser.add_argument('--precise-engine', default=DEFAULT_PRECISE_ENGINE)
    parser.add_argument('--enable-speaker-id', action='store_true',
                        help='Enable speaker identification')
    parser.add_argument('--hf-token', default=DEFAULT_HF_TOKEN,
                        help='HuggingFace token for speaker ID')
    args = parser.parse_args()

    # Initialize HA client
    global ha_client
    ha_client = HomeAssistantClient(args.ha_url, args.ha_token)

    # Load Whisper
    print(f"Starting enhanced voice server on {args.host}:{args.port}")
    load_whisper_model(args.whisper_model)

    # Start Precise (multiple wake words)
    if args.enable_precise:
        if not PRECISE_AVAILABLE:
            print("Error: Precise not available")
            sys.exit(1)
        # Enable all or just first wake word
        if args.multi_wake_word:
            # Enable all configured wake words
            enabled_count = sum(1 for c in WAKE_WORD_CONFIGS.values() if c.get('enabled'))
            print(f"\nStarting {enabled_count} wake words...")
        else:
            # Enable only first wake word
            first_key = list(WAKE_WORD_CONFIGS.keys())[0]
            WAKE_WORD_CONFIGS[first_key]['enabled'] = True
            for key in list(WAKE_WORD_CONFIGS.keys())[1:]:
                WAKE_WORD_CONFIGS[key]['enabled'] = False
        if not start_multiple_wake_words(WAKE_WORD_CONFIGS, args.precise_engine):
            print("Error: No wake words started")
            sys.exit(1)

    # Initialize speaker ID
    if args.enable_speaker_id:
        if not args.hf_token:
            print("Error: --hf-token required for speaker ID")
            sys.exit(1)
        if not init_speaker_identification(args.hf_token):
            print("Warning: Speaker ID initialization failed")

    # Start server
    try:
        print("\n" + "="*50)
        print("Server ready!")
        print("="*50 + "\n")
        app.run(host=args.host, port=args.port, debug=False)
    except KeyboardInterrupt:
        print("\nShutting down...")
        stop_all_wake_words()
        sys.exit(0)


if __name__ == '__main__':
    main()