Ports prior voice assistant research and prototypes from devl/Devops into the Minerva repo. Includes: - docs/: architecture, wake word guides, ESP32-S3 spec, hardware buying guide - scripts/: voice_server.py, voice_server_enhanced.py, setup scripts - hardware/maixduino/: edge device scripts with WiFi credentials scrubbed (replaced hardcoded password with secrets.py pattern) - config/.env.example: server config template - .gitignore: excludes .env, secrets.py, model blobs, ELF firmware - CLAUDE.md: Minerva product context and connection to cf-voice roadmap
465 lines
13 KiB
Python
Executable file
465 lines
13 KiB
Python
Executable file
# Maix Duino Voice Assistant Client
|
|
# Path: maix_voice_client.py (upload to Maix Duino SD card)
|
|
#
|
|
# Purpose and usage:
|
|
# This script runs on the Maix Duino board and handles:
|
|
# - Wake word detection using KPU
|
|
# - Audio capture from I2S microphone
|
|
# - Streaming audio to voice processing server
|
|
# - Playing back TTS responses
|
|
# - LED feedback for user interaction
|
|
#
|
|
# Requirements:
|
|
# - MaixPy firmware (latest version)
|
|
# - I2S microphone connected
|
|
# - Speaker or audio output connected
|
|
# - WiFi configured (see config below)
|
|
#
|
|
# Upload to board:
|
|
# 1. Copy this file to SD card as boot.py or main.py
|
|
# 2. Update WiFi credentials below
|
|
# 3. Update server URL to your Heimdall IP
|
|
# 4. Power cycle the board
|
|
|
|
import time
|
|
import audio
|
|
import image
|
|
from Maix import GPIO
|
|
from fpioa_manager import fm
|
|
from machine import I2S
|
|
import KPU as kpu
|
|
import sensor
|
|
import lcd
|
|
import gc
|
|
|
|
# ----- Configuration -----

# WiFi Settings
# Real credentials live in an untracked secrets.py on the SD card (kept out
# of version control — see .gitignore). The placeholders below are only a
# first-run fallback so the script still imports without secrets.py.
try:
    from secrets import WIFI_SSID, WIFI_PASSWORD
except ImportError:
    WIFI_SSID = "YourSSID"
    WIFI_PASSWORD = "YourPassword"

# Server Settings
VOICE_SERVER_URL = "http://10.1.10.71:5000"  # Heimdall voice server base URL
PROCESS_ENDPOINT = "/process"                # audio-processing endpoint

# Audio Settings
SAMPLE_RATE = 16000  # 16kHz for Whisper
CHANNELS = 1         # Mono
SAMPLE_WIDTH = 2     # 16-bit
CHUNK_SIZE = 1024    # size passed to each i2s_dev.record() call

# Wake Word Settings
WAKE_WORD_THRESHOLD = 0.7  # Confidence threshold (0.0-1.0)
WAKE_WORD_MODEL = "/sd/models/wake_word.kmodel"  # Path to wake word model

# LED Pin for feedback
LED_PIN = 13  # Onboard LED (adjust if needed)

# Recording Settings
MAX_RECORD_TIME = 10     # Maximum seconds to record after wake word
SILENCE_THRESHOLD = 500  # Amplitude threshold for silence detection
SILENCE_DURATION = 2     # Seconds of silence before stopping recording

# ----- Color definitions for LCD -----
COLOR_RED = (255, 0, 0)
COLOR_GREEN = (0, 255, 0)
COLOR_BLUE = (0, 0, 255)
COLOR_YELLOW = (255, 255, 0)
COLOR_BLACK = (0, 0, 0)
COLOR_WHITE = (255, 255, 255)

# ----- Global Variables -----
led = None         # GPIO handle for the feedback LED, set by init_hardware()
i2s_dev = None     # I2S microphone device, set by init_hardware()
kpu_task = None    # reserved for a loaded KPU wake word model
listening = False  # True while actively capturing a command
|
|
def init_hardware():
    """Initialize hardware components.

    Sets up, in order:
      - the feedback LED on LED_PIN (mapped through fpioa GPIO0)
      - the LCD, showing an "Initializing..." splash screen
      - I2S device 0 as a 16-bit receiver for the microphone

    Mutates the module-level ``led`` and ``i2s_dev`` handles.
    """
    global led, i2s_dev

    # Initialize LED
    fm.register(LED_PIN, fm.fpioa.GPIO0)
    led = GPIO(GPIO.GPIO0, GPIO.OUT)
    led.value(0)  # Turn off initially

    # Initialize LCD
    lcd.init()
    lcd.clear(COLOR_BLACK)
    # NOTE(review): this call passes lcd.WHITE/lcd.BLACK while every other
    # draw_string in the file passes the COLOR_* RGB tuples — confirm both
    # color forms are accepted by lcd.draw_string on this firmware.
    lcd.draw_string(lcd.width()//2 - 50, lcd.height()//2,
                    "Initializing...",
                    lcd.WHITE, lcd.BLACK)

    # Initialize I2S for audio (microphone)
    # Note: Pin configuration may vary based on your specific hardware
    fm.register(20, fm.fpioa.I2S0_IN_D0)
    fm.register(19, fm.fpioa.I2S0_WS)
    fm.register(18, fm.fpioa.I2S0_SCLK)

    i2s_dev = I2S(I2S.DEVICE_0)
    i2s_dev.channel_config(I2S.CHANNEL_0, I2S.RECEIVER,
                           align_mode=I2S.STANDARD_MODE,
                           data_width=I2S.RESOLUTION_16_BIT)
    i2s_dev.set_sample_rate(SAMPLE_RATE)

    print("Hardware initialized")
|
|
|
|
|
|
def init_network():
    """Bring up the WiFi station interface and wait for a connection.

    Shows connection progress on the LCD and serial console.

    Returns:
        bool: True once connected (IP shown on LCD), False if the
        ~20-second timeout expires.
    """
    import network

    lcd.clear(COLOR_BLACK)
    lcd.draw_string(10, 50, "Connecting to WiFi...", COLOR_WHITE, COLOR_BLACK)

    wlan = network.WLAN(network.STA_IF)
    wlan.active(True)

    if not wlan.isconnected():
        print(f"Connecting to {WIFI_SSID}...")
        wlan.connect(WIFI_SSID, WIFI_PASSWORD)

        # Poll once per second, counting down from 20 seconds.
        for remaining in range(19, -1, -1):
            if wlan.isconnected():
                break
            time.sleep(1)
            print(f"Waiting for connection... {remaining}s")

    if not wlan.isconnected():
        print("Failed to connect to WiFi")
        lcd.clear(COLOR_BLACK)
        lcd.draw_string(10, 50, "WiFi Failed!", COLOR_RED, COLOR_BLACK)
        return False

    print("Network connected:", wlan.ifconfig())
    lcd.clear(COLOR_BLACK)
    lcd.draw_string(10, 50, "WiFi Connected", COLOR_GREEN, COLOR_BLACK)
    lcd.draw_string(10, 70, f"IP: {wlan.ifconfig()[0]}", COLOR_WHITE, COLOR_BLACK)
    time.sleep(2)

    return True
|
|
|
|
|
|
def load_wake_word_model():
    """Load the KPU wake word model (currently a stub).

    Returns:
        bool: True on success or when loading is intentionally skipped,
        False if loading raised an exception.
    """
    global kpu_task

    try:
        # Placeholder until a trained wake word model is converted to
        # .kmodel; until then detect_wake_word() uses a simpler trigger.
        print("Wake word model loading skipped (implement after model training)")
    except Exception as exc:
        print(f"Failed to load wake word model: {exc}")
        return False
    return True
|
|
|
|
|
|
def mean_abs_amplitude(pcm):
    """Mean absolute amplitude of signed 16-bit little-endian PCM bytes.

    Args:
        pcm: bytes/bytearray of interleaved 16-bit samples.

    Returns:
        Average of abs(sample) over the buffer; 0 for an empty buffer.
    """
    n_samples = len(pcm) // 2
    if n_samples == 0:
        return 0
    total = 0
    for i in range(0, n_samples * 2, 2):
        # BUGFIX: the original int.from_bytes(..., 'little', True) is
        # invalid — 'signed' is keyword-only in CPython and unsupported in
        # MicroPython — so decode unsigned and sign-extend manually.
        sample = int.from_bytes(pcm[i:i + 2], 'little')
        if sample >= 0x8000:
            sample -= 0x10000
        total += abs(sample)
    return total / n_samples


def detect_wake_word():
    """
    Detect wake word in audio stream

    Returns:
        True if wake word detected, False otherwise

    Note: This is a simplified version. For production, you should:
    1. Train a wake word model using Mycroft Precise or similar
    2. Convert the model to .kmodel format for K210
    3. Load and run inference using KPU

    For now, we'll use a simple amplitude-based trigger
    """
    # Simple amplitude-based detection (placeholder)
    # Replace with actual KPU inference

    audio_data = i2s_dev.record(CHUNK_SIZE)

    if audio_data:
        # Simple threshold detection (replace with KPU inference)
        if mean_abs_amplitude(audio_data) > 3000:  # Adjust threshold based on your microphone
            return True

    return False
|
|
|
|
|
|
def record_audio(max_duration=MAX_RECORD_TIME):
    """
    Record audio until silence or max duration.

    Captures CHUNK_SIZE chunks from the I2S microphone, appending them to
    a buffer until either ``max_duration`` seconds elapse or the input
    stays below SILENCE_THRESHOLD for SILENCE_DURATION consecutive
    seconds. Recording progress is drawn on the LCD once per second.

    Args:
        max_duration: Maximum recording length in seconds.

    Returns:
        bytes: Recorded audio data in WAV format (see create_wav).
    """
    print(f"Recording audio (max {max_duration}s)...")

    audio_buffer = bytearray()
    start_time = time.time()
    silence_start = None
    last_shown = -1  # last whole second rendered on the LCD

    # Record in chunks
    while True:
        elapsed = time.time() - start_time

        # Check max duration
        if elapsed > max_duration:
            print("Max recording duration reached")
            break

        # Record chunk
        chunk = i2s_dev.record(CHUNK_SIZE)

        if chunk:
            audio_buffer.extend(chunk)

            # Mean absolute amplitude of the chunk, decoding byte pairs as
            # signed 16-bit little-endian PCM.
            # BUGFIX: the original int.from_bytes(..., 'little', True) is
            # invalid — 'signed' is keyword-only in CPython and unsupported
            # in MicroPython — so decode unsigned and sign-extend manually.
            total = 0
            n_samples = len(chunk) // 2
            for i in range(0, n_samples * 2, 2):
                sample = int.from_bytes(chunk[i:i + 2], 'little')
                if sample >= 0x8000:
                    sample -= 0x10000
                total += abs(sample)
            amplitude = total / n_samples if n_samples else 0

            # Silence detection: stop once the level stays below the
            # threshold for SILENCE_DURATION consecutive seconds.
            if amplitude < SILENCE_THRESHOLD:
                if silence_start is None:
                    silence_start = time.time()
                elif time.time() - silence_start > SILENCE_DURATION:
                    print("Silence detected, stopping recording")
                    break
            else:
                silence_start = None

        # Update LCD with recording time, once per second.
        # BUGFIX: the original gate `int(elapsed) % 1 == 0` was always
        # true, redrawing the LCD on every single chunk.
        if int(elapsed) != last_shown:
            last_shown = int(elapsed)
            lcd.clear(COLOR_BLACK)
            lcd.draw_string(10, 50, f"Recording... {int(elapsed)}s",
                            COLOR_RED, COLOR_BLACK)

    print(f"Recorded {len(audio_buffer)} bytes")

    # Convert to WAV format
    return create_wav(audio_buffer)
|
|
|
|
|
|
def create_wav(audio_data):
|
|
"""Create WAV file header and combine with audio data"""
|
|
import struct
|
|
|
|
# WAV header
|
|
sample_rate = SAMPLE_RATE
|
|
channels = CHANNELS
|
|
sample_width = SAMPLE_WIDTH
|
|
data_size = len(audio_data)
|
|
|
|
# RIFF header
|
|
wav = bytearray(b'RIFF')
|
|
wav.extend(struct.pack('<I', 36 + data_size)) # File size - 8
|
|
wav.extend(b'WAVE')
|
|
|
|
# fmt chunk
|
|
wav.extend(b'fmt ')
|
|
wav.extend(struct.pack('<I', 16)) # fmt chunk size
|
|
wav.extend(struct.pack('<H', 1)) # PCM format
|
|
wav.extend(struct.pack('<H', channels))
|
|
wav.extend(struct.pack('<I', sample_rate))
|
|
wav.extend(struct.pack('<I', sample_rate * channels * sample_width))
|
|
wav.extend(struct.pack('<H', channels * sample_width))
|
|
wav.extend(struct.pack('<H', sample_width * 8))
|
|
|
|
# data chunk
|
|
wav.extend(b'data')
|
|
wav.extend(struct.pack('<I', data_size))
|
|
wav.extend(audio_data)
|
|
|
|
return bytes(wav)
|
|
|
|
|
|
def send_audio_to_server(audio_data):
    """
    POST recorded WAV audio to the voice processing server.

    Args:
        audio_data: Complete WAV file bytes (see create_wav).

    Returns:
        dict: Parsed JSON response from the server, or None on any
        network or server failure.
    """
    import urequests

    try:
        url = f"{VOICE_SERVER_URL}{PROCESS_ENDPOINT}"

        print(f"Sending audio to {url}...")
        lcd.clear(COLOR_BLACK)
        lcd.draw_string(10, 50, "Processing...", COLOR_YELLOW, COLOR_BLACK)

        # MaixPy's urequests lacks multipart support, so the WAV bytes go
        # up as the raw request body with an audio/wav content type.
        resp = urequests.post(
            url,
            data=audio_data,
            headers={'Content-Type': 'audio/wav'},
        )

        if resp.status_code != 200:
            print(f"Server error: {resp.status_code}")
            resp.close()
            return None

        payload = resp.json()
        resp.close()
        return payload

    except Exception as e:
        print(f"Error sending audio: {e}")
        return None
|
|
|
|
|
|
def display_response(response_text):
    """Word-wrap *response_text* and draw up to five lines on the LCD."""
    lcd.clear(COLOR_BLACK)

    # Greedy word wrap sized by a rough 8px-per-character estimate.
    limit = lcd.width() - 20
    wrapped = []
    buf = ""

    for token in response_text.split():
        candidate = buf + token + " "
        if len(candidate) * 8 <= limit:
            buf = candidate
        else:
            if buf:
                wrapped.append(buf.strip())
            buf = token + " "

    if buf:
        wrapped.append(buf.strip())

    # Render at most five lines, 20px apart starting at y=30.
    for idx, row in enumerate(wrapped[:5]):
        lcd.draw_string(10, 30 + idx * 20, row, COLOR_GREEN, COLOR_BLACK)
|
|
|
|
|
|
def set_led(state):
    """Drive the feedback LED: truthy state -> on, falsy -> off."""
    if led:
        led.value(int(bool(state)))
|
|
|
|
|
|
def main_loop():
    """Main voice assistant loop.

    Repeatedly: wait for the wake word, record a command, send it to the
    voice server, and show the server's reply on the LCD. Runs until
    KeyboardInterrupt; every other exception is logged and the loop
    resumes after a 1-second pause.
    """
    global listening

    # Show ready status
    lcd.clear(COLOR_BLACK)
    lcd.draw_string(10, lcd.height()//2 - 10, "Say wake word...",
                    COLOR_BLUE, COLOR_BLACK)

    print("Voice assistant ready. Listening for wake word...")

    while True:
        try:
            # Listen for wake word
            if detect_wake_word():
                print("Wake word detected!")

                # Visual feedback: LED on + "Listening..." screen
                set_led(True)
                lcd.clear(COLOR_BLACK)
                lcd.draw_string(10, 50, "Listening...", COLOR_RED, COLOR_BLACK)

                # Small delay to skip the wake word itself
                time.sleep(0.5)

                # Record command
                audio_data = record_audio()

                # Send to server
                response = send_audio_to_server(audio_data)

                # Server JSON is expected to carry 'success',
                # 'transcription' and 'response' keys (all read via .get).
                if response and response.get('success'):
                    transcription = response.get('transcription', '')
                    response_text = response.get('response', 'No response')

                    print(f"You said: {transcription}")
                    print(f"Response: {response_text}")

                    # Display response
                    display_response(response_text)

                    # TODO: Play TTS audio response

                else:
                    lcd.clear(COLOR_BLACK)
                    lcd.draw_string(10, 50, "Error processing",
                                    COLOR_RED, COLOR_BLACK)

                # Turn off LED
                set_led(False)

                # Pause before listening again
                time.sleep(2)

                # Reset display back to the idle prompt
                lcd.clear(COLOR_BLACK)
                lcd.draw_string(10, lcd.height()//2 - 10, "Say wake word...",
                                COLOR_BLUE, COLOR_BLACK)

            # Small delay to prevent tight loop
            time.sleep(0.1)

            # Garbage collection — only when the heap runs low, to avoid
            # paying gc.collect() latency on every iteration
            if gc.mem_free() < 100000:  # If free memory < 100KB
                gc.collect()

        except KeyboardInterrupt:
            print("Exiting...")
            break
        except Exception as e:
            # Log and keep running: a single failed interaction should
            # not take down the assistant
            print(f"Error in main loop: {e}")
            time.sleep(1)
|
|
|
|
|
|
def main():
    """Main entry point.

    Initializes hardware and WiFi, loads the (optional) wake word model,
    then runs the voice assistant loop. Always leaves the LED off and a
    "Stopped" screen on exit, even after a fatal error.
    """
    print("=" * 40)
    print("Maix Duino Voice Assistant")
    print("=" * 40)

    # Initialize hardware
    init_hardware()

    # Connect to network — without WiFi the board cannot reach the voice
    # server, so there is nothing useful to do
    if not init_network():
        print("Failed to initialize network. Exiting.")
        return

    # Load wake word model (optional); failure is tolerated since
    # detection falls back to a simpler trigger
    load_wake_word_model()

    # Start main loop
    try:
        main_loop()
    except Exception as e:
        print(f"Fatal error: {e}")
    finally:
        # Cleanup: LED off and a visible "Stopped" indicator
        set_led(False)
        lcd.clear(COLOR_BLACK)
        lcd.draw_string(10, lcd.height()//2, "Stopped",
                        COLOR_RED, COLOR_BLACK)
|
|
|
|
|
|
# Run main program
# (Intended to run at boot as boot.py/main.py on the SD card; the guard
# also lets the module be imported without starting the assistant.)
if __name__ == "__main__":
    main()
|