Streaming Speech Processing Pipeline
Build real-time speech processing pipelines that handle audio streams with minimal latency for live transcription and voice interfaces.
Introduction
Streaming speech processing handles audio in real-time as it’s captured, without waiting for the entire recording.
Why streaming matters:
- Low latency: Start processing immediately (< 100ms)
- Live applications: Transcription, translation, voice assistants
- Memory efficiency: Process chunks, not entire recordings
- Better UX: Instant feedback to users
Challenges:
- Chunking audio correctly
- Managing state across chunks
- Handling network delays
- Synchronization issues
Streaming Pipeline Architecture
┌─────────────┐
│ Microphone │
└──────┬──────┘
│ Audio stream (PCM)
▼
┌──────────────────┐
│ Audio Chunker │ ← Split into chunks (e.g., 100ms)
└──────┬───────────┘
│ Chunks
▼
┌──────────────────┐
│ Preprocessor │ ← Normalize, filter
└──────┬───────────┘
│
▼
┌──────────────────┐
│ Feature Extract │ ← MFCC, Mel-spec
└──────┬───────────┘
│
▼
┌──────────────────┐
│ ML Model │ ← ASR, classification
└──────┬───────────┘
│
▼
┌──────────────────┐
│ Post-processing │ ← Smoothing, formatting
└──────┬───────────┘
│
▼
┌──────────────────┐
│ Output │ ← Transcription, action
└──────────────────┘
Audio Capture & Chunking
Real-time Audio Capture
import pyaudio
import numpy as np
from queue import Empty, Queue
import threading
class AudioStreamer:
"""
Capture audio from microphone in real-time
Buffers chunks for processing
"""
def __init__(self, sample_rate=16000, chunk_duration_ms=100):
self.sample_rate = sample_rate
self.chunk_duration_ms = chunk_duration_ms
self.chunk_size = int(sample_rate * chunk_duration_ms / 1000)
self.audio_queue = Queue()
self.stream = None
self.running = False
def _audio_callback(self, in_data, frame_count, time_info, status):
"""
Callback called by PyAudio for each audio chunk
Runs in separate thread
"""
if status:
print(f"Audio status: {status}")
# Convert bytes to numpy array
audio_data = np.frombuffer(in_data, dtype=np.int16)
# Normalize to [-1, 1]
audio_data = audio_data.astype(np.float32) / 32768.0
# Add to queue
self.audio_queue.put(audio_data)
return (in_data, pyaudio.paContinue)
def start(self):
"""Start capturing audio"""
self.running = True
p = pyaudio.PyAudio()
self.stream = p.open(
format=pyaudio.paInt16,
channels=1,
rate=self.sample_rate,
input=True,
frames_per_buffer=self.chunk_size,
stream_callback=self._audio_callback
)
self.stream.start_stream()
print(f"Started audio capture (chunk={self.chunk_duration_ms}ms)")
def stop(self):
"""Stop capturing audio"""
self.running = False
if self.stream:
self.stream.stop_stream()
self.stream.close()
print("Stopped audio capture")
def get_chunk(self, timeout=1.0):
"""
Get next audio chunk
Returns: numpy array of audio samples
"""
try:
return self.audio_queue.get(timeout=timeout)
except Empty:
return None
# Usage
streamer = AudioStreamer(sample_rate=16000, chunk_duration_ms=100)
streamer.start()
# Process chunks in real-time
while True:
chunk = streamer.get_chunk()
if chunk is not None:
# Process chunk
process_audio_chunk(chunk)
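`process_audio_chunk` above is a placeholder. A minimal stand-in that just measures chunk energy and flags likely speech might look like this (the 0.01 RMS threshold is an arbitrary assumption):
import numpy as np

def process_audio_chunk(chunk):
    """Placeholder chunk handler: compute RMS energy and flag speech-like chunks."""
    rms = np.sqrt(np.mean(chunk ** 2))
    if rms > 0.01:  # arbitrary energy threshold for this sketch
        print(f"Speech-like chunk (RMS={rms:.3f})")
    return rms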
Chunk Buffering Strategy
class ChunkBuffer:
"""
Buffer audio chunks with overlap
Helps models that need context from previous chunks
"""
def __init__(self, buffer_size=3, overlap_size=1):
"""
Args:
buffer_size: Number of chunks to keep
overlap_size: Number of chunks to overlap
"""
self.buffer_size = buffer_size
self.overlap_size = overlap_size
self.chunks = []
def add_chunk(self, chunk):
"""Add new chunk to buffer"""
self.chunks.append(chunk)
# Keep only recent chunks
if len(self.chunks) > self.buffer_size:
self.chunks.pop(0)
def get_buffered_audio(self):
"""
Get concatenated audio with overlap
Returns: numpy array
"""
if not self.chunks:
return None
return np.concatenate(self.chunks)
def get_latest_with_context(self):
"""
Get latest chunk with context from previous chunks
Useful for models that need history
"""
if len(self.chunks) < 2:
return self.chunks[-1] if self.chunks else None
# Return last 'overlap_size + 1' chunks
context_chunks = self.chunks[-(self.overlap_size + 1):]
return np.concatenate(context_chunks)
# Usage
buffer = ChunkBuffer(buffer_size=3, overlap_size=1)
for chunk in audio_chunks:
buffer.add_chunk(chunk)
audio_with_context = buffer.get_latest_with_context()
# Process audio with context
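To make the overlap arithmetic concrete, a quick check with synthetic 100ms chunks (1,600 samples at 16kHz, matching the AudioStreamer defaults) behaves like this:
import numpy as np

demo_buffer = ChunkBuffer(buffer_size=3, overlap_size=1)
for i in range(3):
    demo_buffer.add_chunk(np.full(1600, float(i)))  # three synthetic 100ms chunks

context_audio = demo_buffer.get_latest_with_context()
print(len(context_audio))  # 3200 samples: the latest chunk plus 1 chunk of context (200ms)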
WebSocket-Based Streaming
Server Side
import asyncio
import websockets
import json
import numpy as np
import time
class StreamingASRServer:
"""
WebSocket server for streaming ASR
Clients send audio chunks, server returns transcriptions
"""
def __init__(self, model, port=8765):
self.model = model
self.port = port
self.active_connections = set()
async def handle_client(self, websocket, path):
"""Handle single client connection"""
client_id = id(websocket)
self.active_connections.add(websocket)
print(f"Client {client_id} connected")
try:
async for message in websocket:
# Decode message
data = json.loads(message)
if data['type'] == 'audio':
# Process audio chunk
audio_bytes = bytes.fromhex(data['audio'])
audio_chunk = np.frombuffer(audio_bytes, dtype=np.float32)
# Run inference
transcription = await self.process_chunk(audio_chunk)
# Send result
response = {
'type': 'transcription',
'text': transcription,
'is_final': data.get('is_final', False)
}
await websocket.send(json.dumps(response))
elif data['type'] == 'end':
# Session ended
break
except websockets.exceptions.ConnectionClosed:
print(f"Client {client_id} disconnected")
finally:
self.active_connections.remove(websocket)
async def process_chunk(self, audio_chunk):
"""
Process audio chunk
Returns: Transcription text
"""
# Extract features
# Placeholder feature extractor (a librosa-based sketch follows this server example; must match your model's expected input)
features = extract_features(audio_chunk)
# Run model inference
transcription = self.model.predict(features)
return transcription
def start(self):
"""Start WebSocket server"""
print(f"Starting ASR server on port {self.port}")
start_server = websockets.serve(
self.handle_client,
'localhost',
self.port
)
asyncio.get_event_loop().run_until_complete(start_server)
asyncio.get_event_loop().run_forever()
# Usage
server = StreamingASRServer(model=asr_model, port=8765)
server.start()
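The `extract_features` call inside `process_chunk` is left undefined here. If your model consumes MFCCs from 16kHz audio, one reasonable stand-in (an assumption, not part of the original server) is a thin librosa wrapper:
import librosa
import numpy as np

def extract_features(audio_chunk, sample_rate=16000, n_mfcc=13):
    """Hypothetical feature extractor: returns MFCCs shaped (n_mfcc, frames)."""
    return librosa.feature.mfcc(y=audio_chunk.astype(np.float32),
                                sr=sample_rate, n_mfcc=n_mfcc)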
Client Side
import asyncio
import websockets
import json
import numpy as np
class StreamingASRClient:
"""
WebSocket client for streaming ASR
Sends audio chunks and receives transcriptions
"""
def __init__(self, server_url='ws://localhost:8765'):
self.server_url = server_url
self.websocket = None
async def connect(self):
"""Connect to server"""
self.websocket = await websockets.connect(self.server_url)
print(f"Connected to {self.server_url}")
async def send_audio_chunk(self, audio_chunk, is_final=False):
"""
Send audio chunk to server
Args:
audio_chunk: numpy array
is_final: Whether this is the last chunk
"""
# Convert to bytes
audio_bytes = audio_chunk.astype(np.float32).tobytes()
audio_hex = audio_bytes.hex()
# Create message
message = {
'type': 'audio',
'audio': audio_hex,
'is_final': is_final
}
# Send
await self.websocket.send(json.dumps(message))
async def receive_transcription(self):
"""
Receive transcription from server
Returns: Dict with transcription
"""
response = await self.websocket.recv()
return json.loads(response)
async def close(self):
"""Close connection"""
if self.websocket:
await self.websocket.send(json.dumps({'type': 'end'}))
await self.websocket.close()
# Usage
async def stream_audio():
client = StreamingASRClient()
await client.connect()
# Stream audio chunks
streamer = AudioStreamer()
streamer.start()
try:
while True:
chunk = streamer.get_chunk()
if chunk is None:
break
# Send chunk
await client.send_audio_chunk(chunk)
# Receive transcription
result = await client.receive_transcription()
print(f"Transcription: {result['text']}")
finally:
streamer.stop()
await client.close()
# Run
asyncio.run(stream_audio())
Latency Optimization
Latency Breakdown
Total Latency = Audio Capture + Network + Processing + Network + Display
Typical values:
- Audio capture: 10-50ms (chunk duration)
- Network (client → server): 10-30ms
- Feature extraction: 5-10ms
- Model inference: 20-100ms (depends on model)
- Network (server → client): 10-30ms
- Display: 1-5ms
Total: 56-225ms (aim for < 100ms)
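A small helper makes it easy to sanity-check a latency budget against a target before building anything. Plugging in rough midpoints of the ranges above lands well over 100ms, which is exactly why the optimizations below matter:
def latency_budget(components_ms, target_ms=100.0):
    """Sum per-stage latencies (in ms) and compare against a target."""
    total = sum(components_ms.values())
    status = "OK" if total <= target_ms else "OVER BUDGET"
    print(f"Total: {total:.0f}ms vs target {target_ms:.0f}ms -> {status}")
    return total

# Rough midpoints of the typical ranges above
latency_budget({
    'capture': 30, 'uplink': 20, 'features': 7,
    'inference': 60, 'downlink': 20, 'display': 3
})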
Optimization Strategies
import asyncio

import librosa

class OptimizedStreamingPipeline:
"""
Optimized streaming pipeline
Techniques:
- Smaller chunks
- Model quantization
- Batch processing
- Prefetching
"""
def __init__(self, model, chunk_duration_ms=50):
"""
Args:
chunk_duration_ms: Smaller chunks = lower latency
"""
self.model = model
self.chunk_duration_ms = chunk_duration_ms
# Prefetch buffer
self.prefetch_buffer = asyncio.Queue(maxsize=3)
# Start prefetching thread
self.prefetch_task = None
async def start_prefetching(self, audio_source):
"""
Prefetch audio chunks
Reduces waiting time
"""
async for chunk in audio_source:
    await self.prefetch_buffer.put(chunk)
# Signal end of stream so process_stream can exit its consumer loop
await self.prefetch_buffer.put(None)
async def process_stream(self, audio_source):
"""
Process audio stream with optimizations
"""
# Start prefetching
self.prefetch_task = asyncio.create_task(
self.start_prefetching(audio_source)
)
while True:
# Get prefetched chunk (usually already buffered, so little or no waiting)
chunk = await self.prefetch_buffer.get()
if chunk is None:
break
# Process chunk
result = await self.process_chunk_optimized(chunk)
yield result
async def process_chunk_optimized(self, chunk):
"""
Optimized chunk processing
Uses quantized model for faster inference
"""
# Extract features (optimized)
features = self.extract_features_fast(chunk)
# Run inference (quantized model)
result = self.model.predict(features)
return result
def extract_features_fast(self, audio):
"""
Fast feature extraction
Uses caching and vectorization
"""
# Vectorized operations are faster
mfcc = librosa.feature.mfcc(
y=audio,
sr=16000,
n_mfcc=13,
hop_length=160 # Smaller hop = more features
)
return mfcc
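A sketch of driving `process_stream` end to end with a dummy async audio source (random 50ms chunks, purely for illustration; it reuses the hypothetical `asr_model` and assumes it accepts the MFCC features produced by `extract_features_fast`):
import asyncio
import numpy as np

async def dummy_audio_source(num_chunks=20, sample_rate=16000, chunk_ms=50):
    """Yield random chunks to stand in for a real microphone stream."""
    for _ in range(num_chunks):
        yield np.random.randn(int(sample_rate * chunk_ms / 1000)).astype(np.float32)
        await asyncio.sleep(chunk_ms / 1000.0)  # simulate real-time pacing

async def main():
    pipeline = OptimizedStreamingPipeline(model=asr_model, chunk_duration_ms=50)
    async for result in pipeline.process_stream(dummy_audio_source()):
        print(result)

asyncio.run(main())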
State Management
Stateful Streaming
class StatefulStreamingProcessor:
"""
Maintain state across chunks
Important for context-dependent models
"""
def __init__(self, model):
self.model = model
self.state = None # Hidden state for RNN/LSTM models
self.previous_chunks = []
self.partial_results = []
def process_chunk(self, audio_chunk):
"""
Process chunk with state
Returns: (result, is_complete)
"""
# Extract features
features = extract_features(audio_chunk)
# Run model with state
if hasattr(self.model, 'predict_stateful'):
result, self.state = self.model.predict_stateful(
features,
previous_state=self.state
)
else:
# Fallback: concatenate with previous chunks
self.previous_chunks.append(audio_chunk)
if len(self.previous_chunks) > 5:
self.previous_chunks.pop(0)
combined_audio = np.concatenate(self.previous_chunks)
combined_features = extract_features(combined_audio)
result = self.model.predict(combined_features)
# Determine if result is complete
is_complete = self.check_completeness(result)
if is_complete:
self.partial_results.append(result)
return result, is_complete
def check_completeness(self, result):
"""
Check if result is a complete utterance
Uses heuristics:
- Pause detection
- Confidence threshold
- Length limits
"""
# Simple heuristic: treat a high-confidence result as a complete utterance
# (in practice, combine with pause/VAD detection and length limits)
if hasattr(result, 'confidence') and result.confidence > 0.9:
return True
return False
def reset_state(self):
"""Reset state (e.g., after complete utterance)"""
self.state = None
self.previous_chunks = []
self.partial_results = []
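In practice, endpointing usually relies on silence rather than model confidence alone. A minimal energy-based endpointer (the threshold and chunk count below are assumptions) could back up `check_completeness`:
import numpy as np

class SilenceEndpointer:
    """Declare end-of-utterance after N consecutive low-energy chunks (sketch)."""
    def __init__(self, energy_threshold=0.01, silence_chunks_needed=5):
        self.energy_threshold = energy_threshold          # assumed RMS threshold
        self.silence_chunks_needed = silence_chunks_needed
        self.silent_run = 0
    def update(self, audio_chunk):
        """Returns True once enough consecutive silent chunks have been seen."""
        rms = np.sqrt(np.mean(audio_chunk ** 2))
        self.silent_run = self.silent_run + 1 if rms < self.energy_threshold else 0
        return self.silent_run >= self.silence_chunks_needed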
Error Handling & Recovery
Robust Streaming
class RobustStreamingPipeline:
"""
Streaming pipeline with error handling
Handles:
- Network failures
- Audio glitches
- Model errors
"""
def __init__(self, model):
self.model = model
self.error_count = 0
self.max_errors = 10
async def process_stream_robust(self, audio_source):
"""
Process stream with error recovery
"""
retry_count = 0
max_retries = 3
async for chunk in audio_source:
try:
# Process chunk
result = await self.process_chunk_safe(chunk)
# Reset retry count on success
retry_count = 0
yield result
except AudioGlitchError as e:
# Audio glitch: skip chunk
print(f"Audio glitch detected: {e}")
self.error_count += 1
continue
except ModelInferenceError as e:
# Model error: retry with fallback
print(f"Model inference failed: {e}")
if retry_count < max_retries:
retry_count += 1
# Use simpler fallback model
result = await self.fallback_inference(chunk)
yield result
else:
# Give up after max retries
print("Max retries exceeded, skipping chunk")
retry_count = 0
except Exception as e:
# Unexpected error
print(f"Unexpected error: {e}")
self.error_count += 1
if self.error_count > self.max_errors:
raise RuntimeError("Too many errors, stopping stream")
async def process_chunk_safe(self, chunk):
"""
Process chunk with validation
"""
# Validate chunk
if not self.validate_chunk(chunk):
raise AudioGlitchError("Invalid audio chunk")
# Process
try:
features = extract_features(chunk)
result = self.model.predict(features)
return result
except Exception as e:
raise ModelInferenceError(f"Inference failed: {e}")
def validate_chunk(self, chunk):
"""
Validate audio chunk
Checks for:
- Correct length
- Valid range
- No NaN values
"""
if chunk is None or len(chunk) == 0:
return False
if np.any(np.isnan(chunk)):
return False
if np.max(np.abs(chunk)) > 10: # Suspiciously large
return False
return True
async def fallback_inference(self, chunk):
"""
Fallback inference with simpler model
Trades accuracy for reliability
"""
# Use cached results or simple heuristics
return {"text": "[processing...]", "confidence": 0.5}
class AudioGlitchError(Exception):
pass
class ModelInferenceError(Exception):
pass
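Consuming the robust pipeline is then just an async iteration; a brief usage sketch, assuming an async audio generator like `dummy_audio_source` from earlier:
async def run_robust(audio_source, model):
    pipeline = RobustStreamingPipeline(model)
    async for result in pipeline.process_stream_robust(audio_source):
        print(result)

# asyncio.run(run_robust(dummy_audio_source(), asr_model))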
Connection to Model Serving (Day 8 ML)
Streaming speech pipelines use model serving patterns:
class StreamingSpeechServer:
"""
Streaming speech server with model serving best practices
Combines:
- Model serving (Day 8 ML)
- Streaming audio (Day 8 Speech)
- Validation (Day 8 DSA)
"""
def __init__(self, model_path):
# Load model (model serving pattern)
self.model = self.load_model(model_path)
# Validation (BST-like range checking)
self.validator = AudioValidator()
# Monitoring
self.metrics = StreamingMetrics()
def load_model(self, model_path):
"""Load model with caching (from model serving)"""
import joblib
return joblib.load(model_path)
async def process_audio_stream(self, audio_chunks):
"""
Process streaming audio
Uses patterns from all Day 8 topics
"""
for chunk in audio_chunks:
# Validate input (BST validation pattern)
is_valid, violations = self.validator.validate(chunk)
if not is_valid:
print(f"Invalid chunk: {violations}")
continue
# Process chunk (model serving)
start_time = time.time()
result = self.model.predict(chunk)
latency = time.time() - start_time
# Monitor (model serving)
self.metrics.record_prediction(latency)
yield result
class AudioValidator:
"""Validate audio chunks (similar to BST validation)"""
def __init__(self):
# Define valid ranges (like BST min/max)
self.amplitude_range = (-1.0, 1.0)
self.length_range = (100, 10000) # samples
def validate(self, chunk):
"""
Validate chunk falls within ranges
Like BST validation with [min, max] bounds
"""
violations = []
# Check amplitude range
if np.min(chunk) < self.amplitude_range[0]:
    violations.append("Amplitude below valid range")
if np.max(chunk) > self.amplitude_range[1]:
    violations.append("Amplitude above valid range")
# Check length range
if len(chunk) < self.length_range[0]:
violations.append("Chunk too short")
if len(chunk) > self.length_range[1]:
violations.append("Chunk too long")
return len(violations) == 0, violations
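`StreamingMetrics` is referenced above but not defined in this post; a minimal version that records per-prediction latency and reports percentiles (an assumed implementation) could be:
import numpy as np

class StreamingMetrics:
    """Minimal latency recorder for the monitoring hook above (sketch)."""
    def __init__(self):
        self.latencies_ms = []
    def record_prediction(self, latency_seconds):
        self.latencies_ms.append(latency_seconds * 1000.0)
    def summary(self):
        if not self.latencies_ms:
            return {}
        return {
            'count': len(self.latencies_ms),
            'p50_ms': float(np.percentile(self.latencies_ms, 50)),
            'p95_ms': float(np.percentile(self.latencies_ms, 95)),
        }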
Production Patterns
1. Multi-Channel Audio Streaming
class MultiChannelStreamingProcessor:
"""
Process multiple audio streams simultaneously
Use case: Conference calls, multi-mic arrays
"""
def __init__(self, num_channels=4):
self.num_channels = num_channels
self.channel_buffers = [ChunkBuffer() for _ in range(num_channels)]
self.processors = [StreamingProcessor() for _ in range(num_channels)]
async def process_multi_channel(self, channel_chunks: dict):
"""
Process multiple channels in parallel
Args:
channel_chunks: Dict {channel_id: audio_chunk}
Returns: Dict {channel_id: result}
"""
import asyncio
# Process channels in parallel
tasks = []
for channel_id, chunk in channel_chunks.items():
task = self.processors[channel_id].process_chunk_async(chunk)
tasks.append((channel_id, task))
# Wait for all results
results = {}
for channel_id, task in tasks:
result = await task
results[channel_id] = result
return results
def merge_results(self, channel_results: dict):
"""
Merge results from multiple channels
E.g., speaker diarization, beam forming
"""
# Simple merging: concatenate transcriptions
merged_text = []
for channel_id in sorted(channel_results.keys()):
result = channel_results[channel_id]
if result:
merged_text.append(f"[Channel {channel_id}]: {result['text']}")
return '\n'.join(merged_text)
# Usage
multi_processor = MultiChannelStreamingProcessor(num_channels=4)
# Stream audio from 4 microphones
async def process_meeting():
while True:
# Get chunks from all channels
chunks = {
0: mic1.get_chunk(),
1: mic2.get_chunk(),
2: mic3.get_chunk(),
3: mic4.get_chunk()
}
# Process in parallel
results = await multi_processor.process_multi_channel(chunks)
# Merge and display
merged = multi_processor.merge_results(results)
print(merged)
2. Adaptive Chunk Size
class AdaptiveChunkingProcessor:
"""
Dynamically adjust chunk size based on network/compute conditions
Smaller chunks: Lower latency but higher overhead
Larger chunks: Higher latency but more efficient
"""
def __init__(self, min_chunk_ms=50, max_chunk_ms=200):
self.min_chunk_ms = min_chunk_ms
self.max_chunk_ms = max_chunk_ms
self.current_chunk_ms = 100 # Start with middle value
self.latency_history = []
def adjust_chunk_size(self, recent_latency_ms):
"""
Adjust chunk size based on latency
High latency → smaller chunks (more responsive)
Low latency → larger chunks (more efficient)
"""
self.latency_history.append(recent_latency_ms)
if len(self.latency_history) < 10:
return self.current_chunk_ms
# Calculate average latency
avg_latency = np.mean(self.latency_history[-10:])
# Adjust chunk size
if avg_latency > 150: # High latency
# Reduce chunk size for better responsiveness
self.current_chunk_ms = max(
self.min_chunk_ms,
self.current_chunk_ms - 10
)
print(f"↓ Reducing chunk size to {self.current_chunk_ms}ms")
elif avg_latency < 50: # Very low latency
# Increase chunk size for efficiency
self.current_chunk_ms = min(
self.max_chunk_ms,
self.current_chunk_ms + 10
)
print(f"↑ Increasing chunk size to {self.current_chunk_ms}ms")
return self.current_chunk_ms
async def process_with_adaptive_chunking(self, audio_stream):
"""Process stream with adaptive chunk sizing"""
for chunk in audio_stream:
start_time = time.time()
# Process chunk
result = await self.process_chunk(chunk)
# Calculate latency
latency_ms = (time.time() - start_time) * 1000
# Adjust chunk size for next iteration
next_chunk_ms = self.adjust_chunk_size(latency_ms)
yield result, next_chunk_ms
3. Buffering Strategy for Unreliable Networks
class NetworkAwareStreamingBuffer:
"""
Buffer audio to handle network issues
Maintains smooth playback despite packet loss
"""
def __init__(self, buffer_size_seconds=2.0, sample_rate=16000):
self.buffer_size = int(buffer_size_seconds * sample_rate)
self.buffer = np.zeros(self.buffer_size, dtype=np.float32)
self.write_pos = 0
self.read_pos = 0
self.underrun_count = 0
self.overrun_count = 0
def write_chunk(self, chunk):
"""
Write audio chunk to buffer
Returns: Success status
"""
chunk_size = len(chunk)
# Check for buffer overrun
available_space = self.buffer_size - (self.write_pos - self.read_pos)
if chunk_size > available_space:
self.overrun_count += 1
print("⚠️ Buffer overrun - dropping oldest data")
# Drop oldest data
self.read_pos = self.write_pos - self.buffer_size + chunk_size
# Write to circular buffer
for i, sample in enumerate(chunk):
pos = (self.write_pos + i) % self.buffer_size
self.buffer[pos] = sample
self.write_pos += chunk_size
return True
def read_chunk(self, chunk_size):
"""
Read audio chunk from buffer
Returns: Audio chunk or None if underrun
"""
# Check for buffer underrun
available_data = self.write_pos - self.read_pos
if available_data < chunk_size:
self.underrun_count += 1
print("⚠️ Buffer underrun - not enough data")
return None
# Read from circular buffer
chunk = np.zeros(chunk_size, dtype=np.float32)
for i in range(chunk_size):
pos = (self.read_pos + i) % self.buffer_size
chunk[i] = self.buffer[pos]
self.read_pos += chunk_size
return chunk
def get_buffer_level(self):
"""Get current buffer fill level (0-1)"""
available = self.write_pos - self.read_pos
return available / self.buffer_size
def get_stats(self):
"""Get buffer statistics"""
return {
'buffer_level': self.get_buffer_level(),
'underruns': self.underrun_count,
'overruns': self.overrun_count
}
# Usage
buffer = NetworkAwareStreamingBuffer(buffer_size_seconds=2.0)
# Writer thread (receiving from network)
async def receive_audio():
async for chunk in network_stream:
buffer.write_chunk(chunk)
# Adaptive buffering
level = buffer.get_buffer_level()
if level < 0.2:
print("⚠️ Low buffer, may need to increase")
# Reader thread (processing)
async def process_audio():
while True:
chunk = buffer.read_chunk(chunk_size=1600) # 100ms at 16kHz
if chunk is not None:
result = await process_chunk(chunk)
yield result
else:
await asyncio.sleep(0.01) # Wait for more data
Advanced Optimization Techniques
1. Model Warm-Up
class WarmUpStreamingProcessor:
"""
Pre-warm model for lower latency on first request
Cold start can add 100-500ms latency
"""
def __init__(self, model):
self.model = model
self.is_warm = False
def warm_up(self, sample_rate=16000):
"""
Warm up model with dummy input
Call during initialization
"""
print("Warming up model...")
# Create dummy audio chunk
dummy_chunk = np.random.randn(int(sample_rate * 0.1)) # 100ms
# Run inference to warm up
for _ in range(3):
_ = self.model.predict(dummy_chunk)
self.is_warm = True
print("Model warm-up complete")
def process_chunk(self, chunk):
"""Process with warm-up check"""
if not self.is_warm:
self.warm_up()
return self.model.predict(chunk)
# Usage
processor = WarmUpStreamingProcessor(model)
processor.warm_up() # Do this during server startup
2. GPU Batching for Throughput
class GPUBatchProcessor:
"""
Batch multiple streams for GPU efficiency
GPUs are most efficient with batch processing
"""
def __init__(self, model, max_batch_size=16, max_wait_ms=50):
self.model = model
self.max_batch_size = max_batch_size
self.max_wait_ms = max_wait_ms
self.pending_batches = []
async def process_chunk_batched(self, chunk, stream_id):
"""
Add chunk to batch and process when ready
Returns: Future that resolves with result
"""
future = asyncio.Future()
self.pending_batches.append((chunk, stream_id, future))
# Process batch if ready
if len(self.pending_batches) >= self.max_batch_size:
await self._process_batch()
else:
# Wait for more requests or timeout
asyncio.create_task(self._process_batch_after_delay())
return await future
async def _process_batch(self):
"""Process accumulated batch on GPU"""
if not self.pending_batches:
return
# Extract batch
chunks = [item[0] for item in self.pending_batches]
stream_ids = [item[1] for item in self.pending_batches]
futures = [item[2] for item in self.pending_batches]
# Pad to same length
max_len = max(len(c) for c in chunks)
padded_chunks = [
np.pad(c, (0, max_len - len(c)), mode='constant')
for c in chunks
]
# Stack into batch
batch = np.stack(padded_chunks)
# Run batch inference on GPU
results = self.model.predict_batch(batch)
# Distribute results
for result, future in zip(results, futures):
future.set_result(result)
# Clear batch
self.pending_batches = []
async def _process_batch_after_delay(self):
"""Process batch after timeout"""
await asyncio.sleep(self.max_wait_ms / 1000.0)
await self._process_batch()
# Usage
gpu_processor = GPUBatchProcessor(model, max_batch_size=16)
# Multiple concurrent streams: one consumer coroutine drains each stream
async def consume_stream(stream_id):
    async for chunk in audio_streams[stream_id]:
        result = await gpu_processor.process_chunk_batched(chunk, stream_id)
        print(f"Stream {stream_id}: {result}")
# Run multiple streams in parallel
await asyncio.gather(*[
    consume_stream(i) for i in range(10)
])
3. Quantized Models for Edge Devices
import torch
import torchaudio
class EdgeOptimizedStreamingASR:
"""
Streaming ASR optimized for edge devices
Uses INT8 quantization for faster inference
"""
def __init__(self, model_path):
# Load and quantize model
self.model = torch.jit.load(model_path)
self.model = torch.quantization.quantize_dynamic(
self.model,
{torch.nn.Linear, torch.nn.LSTM},
dtype=torch.qint8
)
self.model.eval()
def process_chunk_optimized(self, audio_chunk):
"""
Process chunk with optimizations
- INT8 quantization: 4x faster
- No gradient computation
- Minimal memory allocation
"""
with torch.no_grad():
# Convert to tensor
audio_tensor = torch.from_numpy(audio_chunk).float()
audio_tensor = audio_tensor.unsqueeze(0) # Add batch dim
# Extract features (optimized)
features = torchaudio.compliance.kaldi.mfcc(
audio_tensor,
sample_frequency=16000,
num_ceps=13
)
# Run inference
output = self.model(features)
# Decode
transcription = self.decode(output)
return transcription
def decode(self, output):
"""Simple greedy decoding"""
# Get most likely tokens
tokens = torch.argmax(output, dim=-1)
# Convert to text (simplified)
transcription = self.tokens_to_text(tokens)
return transcription
# Benchmark: Quantized vs Full Precision
def benchmark_models():
"""Compare quantized vs full precision"""
full_model = load_model('model_fp32.pt')
quant_model = EdgeOptimizedStreamingASR('model_int8.pt')
audio_chunk = np.random.randn(1600) # 100ms at 16kHz
# Full precision
start = time.time()
for _ in range(100):
_ = full_model.predict(audio_chunk)
fp32_time = time.time() - start
# Quantized
start = time.time()
for _ in range(100):
_ = quant_model.process_chunk_optimized(audio_chunk)
int8_time = time.time() - start
print(f"FP32: {fp32_time:.2f}s")
print(f"INT8: {int8_time:.2f}s")
print(f"Speedup: {fp32_time / int8_time:.1f}x")
Real-World Integration Examples
1. Zoom-like Meeting Transcription
class MeetingTranscriptionService:
"""
Real-time meeting transcription
Similar to Zoom's live transcription
"""
def __init__(self):
self.asr_model = load_asr_model()
self.active_sessions = {}
def start_session(self, meeting_id):
"""Start transcription session"""
self.active_sessions[meeting_id] = {
'participants': {},
'transcript': [],
'start_time': time.time()
}
async def process_participant_audio(self, meeting_id, participant_id, audio_stream):
"""
Process audio from single participant
Returns: Real-time transcription
"""
session = self.active_sessions[meeting_id]
# Initialize participant
if participant_id not in session['participants']:
session['participants'][participant_id] = {
'processor': StatefulStreamingProcessor(self.asr_model),
'transcript_buffer': []
}
participant = session['participants'][participant_id]
processor = participant['processor']
async for chunk in audio_stream:
# Process chunk
result, is_complete = processor.process_chunk(chunk)
if is_complete:
# Add to transcript
timestamp = time.time() - session['start_time']
transcript_entry = {
'participant_id': participant_id,
'text': result['text'],
'timestamp': timestamp,
'confidence': result.get('confidence', 1.0)
}
session['transcript'].append(transcript_entry)
yield transcript_entry
def get_full_transcript(self, meeting_id):
"""Get complete meeting transcript"""
if meeting_id not in self.active_sessions:
return []
transcript = self.active_sessions[meeting_id]['transcript']
# Format as readable text
formatted = []
for entry in transcript:
time_str = format_timestamp(entry['timestamp'])
formatted.append(
f"[{time_str}] Participant {entry['participant_id']}: {entry['text']}"
)
return '\n'.join(formatted)
def format_timestamp(seconds):
"""Format seconds as MM:SS"""
minutes = int(seconds // 60)
secs = int(seconds % 60)
return f"{minutes:02d}:{secs:02d}"
# Usage
service = MeetingTranscriptionService()
service.start_session('meeting-123')
# Process audio from multiple participants
async def transcribe_meeting():
    participants = ['user1', 'user2', 'user3']

    async def consume(participant_id):
        # Drain one participant's async generator of transcript entries
        async for entry in service.process_participant_audio(
            'meeting-123',
            participant_id,
            get_audio_stream(participant_id)
        ):
            print(f"[{entry['timestamp']:.1f}s] {entry['participant_id']}: {entry['text']}")

    # Process all participants in parallel; surface per-stream errors
    results = await asyncio.gather(
        *(consume(p) for p in participants),
        return_exceptions=True
    )
    for res in results:
        if isinstance(res, Exception):
            print(f"Stream error: {res}")
2. Voice Assistant Backend
class VoiceAssistantPipeline:
"""
Complete voice assistant pipeline
ASR → NLU → Action → TTS
"""
def __init__(self):
self.asr = StreamingASR()
self.nlu = IntentClassifier()
self.action_executor = ActionExecutor()
self.tts = TextToSpeech()
async def process_voice_command(self, audio_stream):
"""
Process voice command end-to-end
Returns: Audio response
"""
# 1. Speech Recognition
transcription = await self.asr.transcribe_stream(audio_stream)
print(f"User said: {transcription}")
# 2. Natural Language Understanding
intent = self.nlu.classify(transcription)
print(f"Intent: {intent['name']} (confidence: {intent['confidence']:.2f})")
# 3. Execute Action
if intent['confidence'] > 0.7:
response_text = await self.action_executor.execute(intent)
else:
response_text = "I'm not sure what you mean. Could you rephrase that?"
# 4. Text-to-Speech
response_audio = self.tts.synthesize(response_text)
return {
'transcription': transcription,
'intent': intent,
'response_text': response_text,
'response_audio': response_audio
}
async def continuous_listening(self, audio_source):
"""
Continuously listen for wake word + command
Efficient always-on listening
"""
wake_word_detector = WakeWordDetector('hey assistant')
async for chunk in audio_source:
# Check for wake word (lightweight model)
if wake_word_detector.detect(chunk):
print("🎤 Wake word detected!")
# Start full ASR
command_audio = await self.capture_command(audio_source, timeout=5.0)
# Process command
result = await self.process_voice_command(command_audio)
# Play response
play_audio(result['response_audio'])
async def capture_command(self, audio_source, timeout=5.0):
"""Capture audio command after wake word"""
command_chunks = []
start_time = time.time()
async for chunk in audio_source:
command_chunks.append(chunk)
# Check timeout
if time.time() - start_time > timeout:
break
# Check for end of speech (silence)
if self.is_silence(chunk):
break
return np.concatenate(command_chunks)
def is_silence(self, chunk, threshold=0.01):
"""Detect if chunk is silence"""
energy = np.sqrt(np.mean(chunk ** 2))
return energy < threshold
# Usage
assistant = VoiceAssistantPipeline()
# Continuous listening
await assistant.continuous_listening(microphone_stream)
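`WakeWordDetector` is assumed above; real deployments use a lightweight keyword-spotting model, but an energy-gated stub is enough to make the example self-contained:
import numpy as np

class WakeWordDetector:
    """Stub detector: swap in a real keyword-spotting model for production."""
    def __init__(self, wake_word):
        self.wake_word = wake_word  # kept for API parity; unused in this stub
    def detect(self, chunk, energy_threshold=0.05):
        # Placeholder: treat any sufficiently loud chunk as a possible wake word
        return float(np.sqrt(np.mean(chunk ** 2))) > energy_threshold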
Performance Metrics & SLAs
Latency Tracking
class StreamingLatencyTracker:
"""
Track end-to-end latency for streaming pipeline
Measures:
- Audio capture latency
- Network latency
- Processing latency
- Total latency
"""
def __init__(self):
self.metrics = {
'capture_latency': [],
'network_latency': [],
'processing_latency': [],
'total_latency': []
}
async def process_with_tracking(self, audio_chunk, capture_timestamp):
"""
Process chunk with latency tracking
Args:
audio_chunk: Audio data
capture_timestamp: When audio was captured
Returns: (result, latency_breakdown)
"""
# Network latency (time from capture to arrival)
network_start = time.time()
network_latency = (network_start - capture_timestamp) * 1000
self.metrics['network_latency'].append(network_latency)
# Processing latency
processing_start = time.time()
result = await self.process_chunk(audio_chunk)
processing_end = time.time()
processing_latency = (processing_end - processing_start) * 1000
self.metrics['processing_latency'].append(processing_latency)
# Total latency
total_latency = (processing_end - capture_timestamp) * 1000
self.metrics['total_latency'].append(total_latency)
latency_breakdown = {
'network_ms': network_latency,
'processing_ms': processing_latency,
'total_ms': total_latency
}
return result, latency_breakdown
def get_latency_stats(self):
"""Get latency statistics"""
stats = {}
for metric_name, values in self.metrics.items():
if values:
stats[metric_name] = {
'p50': np.percentile(values, 50),
'p95': np.percentile(values, 95),
'p99': np.percentile(values, 99),
'mean': np.mean(values),
'max': np.max(values)
}
return stats
def check_sla(self, sla_ms=100):
"""
Check if meeting SLA
Returns: (is_meeting_sla, violation_rate)
"""
if not self.metrics['total_latency']:
return True, 0.0
violations = sum(1 for lat in self.metrics['total_latency'] if lat > sla_ms)
violation_rate = violations / len(self.metrics['total_latency'])
is_meeting_sla = violation_rate < 0.01 # < 1% violations
return is_meeting_sla, violation_rate
# Usage
tracker = StreamingLatencyTracker()
# Process with tracking
result, latency = await tracker.process_with_tracking(chunk, capture_time)
# Check SLA
is_ok, violation_rate = tracker.check_sla(sla_ms=100)
if not is_ok:
print(f"⚠️ SLA violation rate: {violation_rate:.1%}")
# Get detailed stats
stats = tracker.get_latency_stats()
print(f"P95 latency: {stats['total_latency']['p95']:.1f}ms")
Key Takeaways
✅ Chunk audio correctly - Balance latency vs context
✅ Manage state - RNN/LSTM models need previous chunks
✅ Optimize latency - Smaller chunks, quantization, prefetching
✅ Handle errors gracefully - Network failures, audio glitches
✅ Validate inputs - Like BST range checking
✅ Monitor performance - Latency, error rate, throughput
✅ WebSocket for streaming - Bidirectional, low-latency
Originally published at: arunbaby.com/speech-tech/0008-streaming-speech-pipeline