Streaming Speech Processing Pipeline
Build real-time speech processing pipelines that handle audio streams with minimal latency for live transcription and voice interfaces.
TL;DR
Streaming speech pipelines break continuous audio into chunks (50-200ms), process them through a feature extraction and ML inference pipeline, and return results in real-time over WebSockets. Critical techniques include overlap buffering for context continuity, prefetch queues to hide I/O latency, adaptive chunk sizing based on network conditions, model warm-up to avoid cold-start delays, and GPU batching across concurrent streams. End-to-end latency target is under 100ms. Related: streaming ASR architecture for the model layer and keyword spotting for always-on wake word detection.

Introduction
Streaming speech processing handles audio in real-time as it’s captured, without waiting for the entire recording.
Why streaming matters:
- Low latency: Start processing immediately (< 100ms)
- Live applications: Transcription, translation, voice assistants
- Memory efficiency: Process chunks, not entire recordings
- Better UX: Instant feedback to users
Challenges:
- Chunking audio correctly (see the chunk-size sketch after this list)
- Managing state across chunks
- Handling network delays
- Synchronization issues
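Most of these trade-offs follow from simple chunking arithmetic. A quick sketch (the 16 kHz sample rate and 100 ms chunk duration are illustrative defaults, not requirements):
SAMPLE_RATE = 16_000          # Hz, common for speech models
CHUNK_MS = 100                # chunk duration in milliseconds
samples_per_chunk = SAMPLE_RATE * CHUNK_MS // 1000   # 1,600 samples
bytes_per_chunk = samples_per_chunk * 2              # 16-bit PCM, mono
chunks_per_second = 1000 // CHUNK_MS                 # 10 model calls per second
print(f"{samples_per_chunk} samples/chunk, {bytes_per_chunk} bytes/chunk, "
      f"{chunks_per_second} chunks/s")
Smaller chunks mean more frequent (and more overhead-heavy) inference calls; larger chunks mean fewer calls but a longer wait before the first result.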
Streaming Pipeline Architecture
┌─────────────┐
│ Microphone │
└──────┬──────┘
│ Audio stream (PCM)
▼
┌──────────────────┐
│ Audio Chunker │ ← Split into chunks (e.g., 100ms)
└──────┬───────────┘
│ Chunks
▼
┌──────────────────┐
│ Preprocessor │ ← Normalize, filter
└──────┬───────────┘
│
▼
┌──────────────────┐
│ Feature Extract │ ← MFCC, Mel-spec
└──────┬───────────┘
│
▼
┌──────────────────┐
│ ML Model │ ← ASR, classification
└──────┬───────────┘
│
▼
┌──────────────────┐
│ Post-processing │ ← Smoothing, formatting
└──────┬───────────┘
│
▼
┌──────────────────┐
│ Output │ ← Transcription, action
└──────────────────┘
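Before diving into each stage, here is a minimal skeleton of how the stages above might be wired together. Everything in it is a placeholder: extract_features, smooth, and the model's predict method stand in for the real components described later in this post.
import numpy as np
def extract_features(audio: np.ndarray) -> np.ndarray:
    # Placeholder: a real pipeline would compute MFCCs or log-mel features here
    return audio.reshape(1, -1)
def smooth(hypothesis) -> str:
    # Placeholder post-processing: collapse extra whitespace in the hypothesis
    return " ".join(str(hypothesis).split())
def run_pipeline(chunks, model):
    """Push each audio chunk through the stages from the diagram above."""
    for chunk in chunks:                                   # Audio Chunker output
        peak = max(float(np.max(np.abs(chunk))), 1e-9)
        audio = chunk / peak                               # Preprocessor: normalize
        features = extract_features(audio)                 # Feature Extract
        hypothesis = model.predict(features)               # ML Model (ASR)
        yield smooth(hypothesis)                           # Post-processing → Output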
Audio Capture & Chunking
Real-time Audio Capture
import pyaudio
import numpy as np
from queue import Queue, Empty
import threading
class AudioStreamer:
"""
Capture audio from microphone in real-time
Buffers chunks for processing
"""
def __init__(self, sample_rate=16000, chunk_duration_ms=100):
self.sample_rate = sample_rate
self.chunk_duration_ms = chunk_duration_ms
self.chunk_size = int(sample_rate * chunk_duration_ms / 1000)
self.audio_queue = Queue()
self.stream = None
self.running = False
def _audio_callback(self, in_data, frame_count, time_info, status):
"""
Callback called by PyAudio for each audio chunk
Runs in separate thread
"""
if status:
print(f"Audio status: {status}")
# Convert bytes to numpy array
audio_data = np.frombuffer(in_data, dtype=np.int16)
# Normalize to [-1, 1]
audio_data = audio_data.astype(np.float32) / 32768.0
# Add to queue
self.audio_queue.put(audio_data)
return (in_data, pyaudio.paContinue)
def start(self):
"""Start capturing audio"""
self.running = True
p = pyaudio.PyAudio()
self.stream = p.open(
format=pyaudio.paInt16,
channels=1,
rate=self.sample_rate,
input=True,
frames_per_buffer=self.chunk_size,
stream_callback=self._audio_callback
)
self.stream.start_stream()
print(f"Started audio capture (chunk={self.chunk_duration_ms}ms)")
def stop(self):
"""Stop capturing audio"""
self.running = False
if self.stream:
self.stream.stop_stream()
self.stream.close()
print("Stopped audio capture")
def get_chunk(self, timeout=1.0):
"""
Get next audio chunk
Returns: numpy array of audio samples
"""
try:
return self.audio_queue.get(timeout=timeout)
        except Empty:
            return None
# Usage
streamer = AudioStreamer(sample_rate=16000, chunk_duration_ms=100)
streamer.start()
# Process chunks in real-time
while True:
chunk = streamer.get_chunk()
if chunk is not None:
# Process chunk
process_audio_chunk(chunk)
Chunk Buffering Strategy
class ChunkBuffer:
"""
Buffer audio chunks with overlap
Helps models that need context from previous chunks
"""
def __init__(self, buffer_size=3, overlap_size=1):
"""
Args:
buffer_size: Number of chunks to keep
overlap_size: Number of chunks to overlap
"""
self.buffer_size = buffer_size
self.overlap_size = overlap_size
self.chunks = []
def add_chunk(self, chunk):
"""Add new chunk to buffer"""
self.chunks.append(chunk)
# Keep only recent chunks
if len(self.chunks) > self.buffer_size:
self.chunks.pop(0)
def get_buffered_audio(self):
"""
Get concatenated audio with overlap
Returns: numpy array
"""
if not self.chunks:
return None
return np.concatenate(self.chunks)
def get_latest_with_context(self):
"""
Get latest chunk with context from previous chunks
Useful for models that need history
"""
if len(self.chunks) < 2:
return self.chunks[-1] if self.chunks else None
# Return last 'overlap_size + 1' chunks
context_chunks = self.chunks[-(self.overlap_size + 1):]
return np.concatenate(context_chunks)
# Usage
buffer = ChunkBuffer(buffer_size=3, overlap_size=1)
for chunk in audio_chunks:
buffer.add_chunk(chunk)
audio_with_context = buffer.get_latest_with_context()
# Process audio with context
WebSocket-Based Streaming
Server Side
import asyncio
import websockets
import json
import numpy as np
import time
class StreamingASRServer:
"""
WebSocket server for streaming ASR
Clients send audio chunks, server returns transcriptions
"""
def __init__(self, model, port=8765):
self.model = model
self.port = port
self.active_connections = set()
async def handle_client(self, websocket, path):
"""Handle single client connection"""
client_id = id(websocket)
self.active_connections.add(websocket)
print(f"Client {client_id} connected")
try:
async for message in websocket:
# Decode message
data = json.loads(message)
if data['type'] == 'audio':
# Process audio chunk
audio_bytes = bytes.fromhex(data['audio'])
audio_chunk = np.frombuffer(audio_bytes, dtype=np.float32)
# Run inference
transcription = await self.process_chunk(audio_chunk)
# Send result
response = {
'type': 'transcription',
'text': transcription,
'is_final': data.get('is_final', False)
}
await websocket.send(json.dumps(response))
elif data['type'] == 'end':
# Session ended
break
except websockets.exceptions.ConnectionClosed:
print(f"Client {client_id} disconnected")
finally:
self.active_connections.remove(websocket)
async def process_chunk(self, audio_chunk):
"""
Process audio chunk
Returns: Transcription text
"""
# Extract features
# Placeholder feature extractor (should match your model's expected input)
features = extract_features(audio_chunk)
# Run model inference
transcription = self.model.predict(features)
return transcription
def start(self):
"""Start WebSocket server"""
print(f"Starting ASR server on port {self.port}")
start_server = websockets.serve(
self.handle_client,
'localhost',
self.port
)
asyncio.get_event_loop().run_until_complete(start_server)
asyncio.get_event_loop().run_forever()
# Usage
server = StreamingASRServer(model=asr_model, port=8765)
server.start()
Client Side
import asyncio
import websockets
import json
import numpy as np
class StreamingASRClient:
"""
WebSocket client for streaming ASR
Sends audio chunks and receives transcriptions
"""
def __init__(self, server_url='ws://localhost:8765'):
self.server_url = server_url
self.websocket = None
async def connect(self):
"""Connect to server"""
self.websocket = await websockets.connect(self.server_url)
print(f"Connected to {self.server_url}")
async def send_audio_chunk(self, audio_chunk, is_final=False):
"""
Send audio chunk to server
Args:
audio_chunk: numpy array
is_final: Whether this is the last chunk
"""
        # Convert to bytes (hex is simple but doubles payload size;
        # binary WebSocket frames or base64 are more compact in production)
        audio_bytes = audio_chunk.astype(np.float32).tobytes()
        audio_hex = audio_bytes.hex()
# Create message
message = {
'type': 'audio',
'audio': audio_hex,
'is_final': is_final
}
# Send
await self.websocket.send(json.dumps(message))
async def receive_transcription(self):
"""
Receive transcription from server
Returns: Dict with transcription
"""
response = await self.websocket.recv()
return json.loads(response)
async def close(self):
"""Close connection"""
if self.websocket:
await self.websocket.send(json.dumps({'type': 'end'}))
await self.websocket.close()
# Usage
async def stream_audio():
client = StreamingASRClient()
await client.connect()
# Stream audio chunks
streamer = AudioStreamer()
streamer.start()
try:
while True:
chunk = streamer.get_chunk()
if chunk is None:
break
# Send chunk
await client.send_audio_chunk(chunk)
# Receive transcription
result = await client.receive_transcription()
print(f"Transcription: {result['text']}")
finally:
streamer.stop()
await client.close()
# Run
asyncio.run(stream_audio())
Latency Optimization
Latency Breakdown
Total Latency = Audio Capture + Network + Processing + Network + Display
Typical values:
- Audio capture: 10-50ms (chunk duration)
- Network (client → server): 10-30ms
- Feature extraction: 5-10ms
- Model inference: 20-100ms (depends on model)
- Network (server → client): 10-30ms
- Display: 1-5ms
Total: 56-225ms (aim for < 100ms)
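A tiny helper makes it easy to sanity-check a budget like this against the 100ms target; the per-stage numbers below are illustrative, not measurements.
def check_latency_budget(stage_latencies_ms, target_ms=100.0):
    """Sum per-stage latencies and report whether the budget is met."""
    total = sum(stage_latencies_ms.values())
    return total, total <= target_ms
budget = {
    'capture': 30,     # chunk duration dominates capture latency
    'uplink': 15,
    'features': 7,
    'inference': 30,
    'downlink': 15,
    'display': 2,
}
total_ms, within_target = check_latency_budget(budget)
print(f"Total: {total_ms}ms, within 100ms target: {within_target}")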
Optimization Strategies
import asyncio
import librosa
import numpy as np
class OptimizedStreamingPipeline:
"""
Optimized streaming pipeline
Techniques:
- Smaller chunks
- Model quantization
- Batch processing
- Prefetching
"""
def __init__(self, model, chunk_duration_ms=50):
"""
Args:
chunk_duration_ms: Smaller chunks = lower latency
"""
self.model = model
self.chunk_duration_ms = chunk_duration_ms
# Prefetch buffer
self.prefetch_buffer = asyncio.Queue(maxsize=3)
# Start prefetching thread
self.prefetch_task = None
async def start_prefetching(self, audio_source):
"""
Prefetch audio chunks
Reduces waiting time
"""
async for chunk in audio_source:
await self.prefetch_buffer.put(chunk)
async def process_stream(self, audio_source):
"""
Process audio stream with optimizations
"""
# Start prefetching
self.prefetch_task = asyncio.create_task(
self.start_prefetching(audio_source)
)
while True:
# Get prefetched chunk (zero wait!)
chunk = await self.prefetch_buffer.get()
if chunk is None:
break
# Process chunk
result = await self.process_chunk_optimized(chunk)
yield result
async def process_chunk_optimized(self, chunk):
"""
Optimized chunk processing
Uses quantized model for faster inference
"""
# Extract features (optimized)
features = self.extract_features_fast(chunk)
# Run inference (quantized model)
result = self.model.predict(features)
return result
def extract_features_fast(self, audio):
"""
Fast feature extraction
Uses caching and vectorization
"""
# Vectorized operations are faster
mfcc = librosa.feature.mfcc(
y=audio,
sr=16000,
n_mfcc=13,
            hop_length=160  # 10 ms hop at 16 kHz; smaller hop = more frames
)
return mfcc
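process_stream expects an async-iterable audio source. One way to bridge the threaded AudioStreamer from earlier into such a source is a small async wrapper (a sketch; the 10ms polling interval and 0.1s queue timeout are arbitrary choices):
import asyncio
async def async_chunks(streamer, poll_interval=0.01):
    """Wrap the queue-based AudioStreamer in an async generator."""
    loop = asyncio.get_running_loop()
    while streamer.running:
        # get_chunk blocks on the queue, so run it in the default thread pool
        chunk = await loop.run_in_executor(None, streamer.get_chunk, 0.1)
        if chunk is not None:
            yield chunk
        else:
            await asyncio.sleep(poll_interval)
# Usage (assuming pipeline = OptimizedStreamingPipeline(model)):
# async for result in pipeline.process_stream(async_chunks(streamer)):
#     print(result)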
State Management
Stateful Streaming
class StatefulStreamingProcessor:
"""
Maintain state across chunks
Important for context-dependent models
"""
def __init__(self, model):
self.model = model
self.state = None # Hidden state for RNN/LSTM models
self.previous_chunks = []
self.partial_results = []
def process_chunk(self, audio_chunk):
"""
Process chunk with state
Returns: (result, is_complete)
"""
# Extract features
features = extract_features(audio_chunk)
# Run model with state
if hasattr(self.model, 'predict_stateful'):
result, self.state = self.model.predict_stateful(
features,
previous_state=self.state
)
else:
# Fallback: concatenate with previous chunks
self.previous_chunks.append(audio_chunk)
if len(self.previous_chunks) > 5:
self.previous_chunks.pop(0)
combined_audio = np.concatenate(self.previous_chunks)
combined_features = extract_features(combined_audio)
result = self.model.predict(combined_features)
# Determine if result is complete
is_complete = self.check_completeness(result)
if is_complete:
self.partial_results.append(result)
return result, is_complete
def check_completeness(self, result):
"""
Check if result is a complete utterance
Uses heuristics:
- Pause detection
- Confidence threshold
- Length limits
"""
# Simple heuristic: check for pause
# (In practice, use more sophisticated methods)
if hasattr(result, 'confidence') and result.confidence > 0.9:
return True
return False
def reset_state(self):
"""Reset state (e.g., after complete utterance)"""
self.state = None
self.previous_chunks = []
self.partial_results = []
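check_completeness above is deliberately simplistic. A common alternative is energy-based endpointing: declare the utterance complete after a run of low-energy chunks. A minimal sketch (the 0.01 RMS threshold and 5-chunk hangover are values to tune, not recommendations):
import numpy as np
class SilenceEndpointer:
    """Mark an utterance complete after `hangover` consecutive silent chunks."""
    def __init__(self, energy_threshold=0.01, hangover=5):
        self.energy_threshold = energy_threshold
        self.hangover = hangover
        self.silent_run = 0
    def update(self, chunk: np.ndarray) -> bool:
        rms = float(np.sqrt(np.mean(chunk ** 2)))
        if rms < self.energy_threshold:
            self.silent_run += 1
        else:
            self.silent_run = 0
        return self.silent_run >= self.hangover   # True → end of utterance
# Usage: call endpointer.update(chunk) for each chunk; on True, record the
# final result and call processor.reset_state() before the next utterance.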
Error Handling & Recovery
Robust Streaming
class RobustStreamingPipeline:
"""
Streaming pipeline with error handling
Handles:
- Network failures
- Audio glitches
- Model errors
"""
def __init__(self, model):
self.model = model
self.error_count = 0
self.max_errors = 10
async def process_stream_robust(self, audio_source):
"""
Process stream with error recovery
"""
retry_count = 0
max_retries = 3
async for chunk in audio_source:
try:
# Process chunk
result = await self.process_chunk_safe(chunk)
# Reset retry count on success
retry_count = 0
yield result
except AudioGlitchError as e:
# Audio glitch: skip chunk
print(f"Audio glitch detected: {e}")
self.error_count += 1
continue
except ModelInferenceError as e:
# Model error: retry with fallback
print(f"Model inference failed: {e}")
if retry_count < max_retries:
retry_count += 1
# Use simpler fallback model
result = await self.fallback_inference(chunk)
yield result
else:
# Give up after max retries
print("Max retries exceeded, skipping chunk")
retry_count = 0
except Exception as e:
# Unexpected error
print(f"Unexpected error: {e}")
self.error_count += 1
if self.error_count > self.max_errors:
raise RuntimeError("Too many errors, stopping stream")
async def process_chunk_safe(self, chunk):
"""
Process chunk with validation
"""
# Validate chunk
if not self.validate_chunk(chunk):
raise AudioGlitchError("Invalid audio chunk")
# Process
try:
features = extract_features(chunk)
result = self.model.predict(features)
return result
except Exception as e:
raise ModelInferenceError(f"Inference failed: {e}")
def validate_chunk(self, chunk):
"""
Validate audio chunk
Checks for:
- Correct length
- Valid range
- No NaN values
"""
if chunk is None or len(chunk) == 0:
return False
if np.any(np.isnan(chunk)):
return False
if np.max(np.abs(chunk)) > 10: # Suspiciously large
return False
return True
async def fallback_inference(self, chunk):
"""
Fallback inference with simpler model
Trades accuracy for reliability
"""
# Use cached results or simple heuristics
return {"text": "[processing...]", "confidence": 0.5}
class AudioGlitchError(Exception):
pass
class ModelInferenceError(Exception):
pass
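The pipeline above recovers from bad chunks and model errors, but a dropped WebSocket connection also needs handling. A sketch of reconnecting with exponential backoff around the earlier StreamingASRClient (the retry cap and delays are arbitrary):
import asyncio
import websockets
async def connect_with_backoff(client, max_attempts=5, base_delay=0.5):
    """Retry client.connect() with exponential backoff on network failures."""
    for attempt in range(max_attempts):
        try:
            await client.connect()
            return True
        except (OSError, websockets.exceptions.WebSocketException) as e:
            delay = base_delay * (2 ** attempt)
            print(f"Connect failed ({e}), retrying in {delay:.1f}s")
            await asyncio.sleep(delay)
    return False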
Connection to Model Serving (ML)
Streaming speech pipelines use model serving patterns:
import time
import numpy as np
class StreamingSpeechServer:
"""
Streaming speech server with model serving best practices
Combines:
- Model serving (ML)
- Streaming audio (Speech)
- Validation (DSA)
"""
def __init__(self, model_path):
# Load model (model serving pattern)
self.model = self.load_model(model_path)
# Validation (BST-like range checking)
self.validator = AudioValidator()
# Monitoring
self.metrics = StreamingMetrics()
def load_model(self, model_path):
"""Load model with caching (from model serving)"""
import joblib
return joblib.load(model_path)
async def process_audio_stream(self, audio_chunks):
"""
Process streaming audio
Uses patterns from all topics
"""
for chunk in audio_chunks:
# Validate input (BST validation pattern)
is_valid, violations = self.validator.validate(chunk)
if not is_valid:
print(f"Invalid chunk: {violations}")
continue
# Process chunk (model serving)
start_time = time.time()
result = self.model.predict(chunk)
latency = time.time() - start_time
# Monitor (model serving)
self.metrics.record_prediction(latency)
yield result
class AudioValidator:
"""Validate audio chunks (similar to BST validation)"""
def __init__(self):
# Define valid ranges (like BST min/max)
self.amplitude_range = (-1.0, 1.0)
self.length_range = (100, 10000) # samples
def validate(self, chunk):
"""
Validate chunk falls within ranges
Like BST validation with [min, max] bounds
"""
violations = []
# Check amplitude range
if np.min(chunk) < self.amplitude_range[0]:
violations.append("Amplitude too low")
if np.max(chunk) > self.amplitude_range[1]:
violations.append("Amplitude too high")
# Check length range
if len(chunk) < self.length_range[0]:
violations.append("Chunk too short")
if len(chunk) > self.length_range[1]:
violations.append("Chunk too long")
return len(violations) == 0, violations
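StreamingMetrics is referenced in the server above but not defined there. A minimal version that just records prediction latencies might look like this (a sketch, not a particular monitoring library):
import numpy as np
class StreamingMetrics:
    """Collect per-chunk prediction latencies for lightweight monitoring."""
    def __init__(self):
        self.latencies_ms = []
    def record_prediction(self, latency_seconds: float):
        self.latencies_ms.append(latency_seconds * 1000)   # store milliseconds
    def summary(self):
        if not self.latencies_ms:
            return {}
        return {
            'count': len(self.latencies_ms),
            'p50_ms': float(np.percentile(self.latencies_ms, 50)),
            'p95_ms': float(np.percentile(self.latencies_ms, 95)),
        }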
Production Patterns
1. Multi-Channel Audio Streaming
import asyncio
class MultiChannelStreamingProcessor:
"""
Process multiple audio streams simultaneously
Use case: Conference calls, multi-mic arrays
"""
def __init__(self, num_channels=4):
self.num_channels = num_channels
self.channel_buffers = [ChunkBuffer() for _ in range(num_channels)]
self.processors = [StreamingProcessor() for _ in range(num_channels)]
async def process_multi_channel(self, channel_chunks: dict):
"""
Process multiple channels in parallel
Args:
channel_chunks: Dict {channel_id: audio_chunk}
Returns: Dict {channel_id: result}
"""
        # Run all channel coroutines concurrently; awaiting them one by one
        # in a loop would process the channels sequentially
        channel_ids = list(channel_chunks.keys())
        channel_results = await asyncio.gather(*[
            self.processors[channel_id].process_chunk_async(channel_chunks[channel_id])
            for channel_id in channel_ids
        ])
        return dict(zip(channel_ids, channel_results))
def merge_results(self, channel_results: dict):
"""
Merge results from multiple channels
E.g., speaker diarization, beam forming
"""
# Simple merging: concatenate transcriptions
merged_text = []
for channel_id in sorted(channel_results.keys()):
result = channel_results[channel_id]
if result:
merged_text.append(f"[Channel {channel_id}]: {result['text']}")
return '\n'.join(merged_text)
# Usage
multi_processor = MultiChannelStreamingProcessor(num_channels=4)
# Stream audio from 4 microphones
async def process_meeting():
while True:
# Get chunks from all channels
chunks = {
0: mic1.get_chunk(),
1: mic2.get_chunk(),
2: mic3.get_chunk(),
3: mic4.get_chunk()
}
# Process in parallel
results = await multi_processor.process_multi_channel(chunks)
# Merge and display
merged = multi_processor.merge_results(results)
print(merged)
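The per-channel StreamingProcessor used above is assumed rather than defined; the wrapper only needs it to expose an async process_chunk_async method. A minimal stand-in (extract_features is the same placeholder used throughout this post):
class StreamingProcessor:
    """Per-channel processor with the async interface the wrapper expects."""
    def __init__(self, model=None):
        self.model = model
    async def process_chunk_async(self, chunk):
        if chunk is None or self.model is None:
            return {"text": "", "confidence": 0.0}   # nothing to do yet
        features = extract_features(chunk)            # placeholder extractor
        return self.model.predict(features)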
2. Adaptive Chunk Size
class AdaptiveChunkingProcessor:
"""
Dynamically adjust chunk size based on network/compute conditions
Smaller chunks: Lower latency but higher overhead
Larger chunks: Higher latency but more efficient
"""
def __init__(self, min_chunk_ms=50, max_chunk_ms=200):
self.min_chunk_ms = min_chunk_ms
self.max_chunk_ms = max_chunk_ms
self.current_chunk_ms = 100 # Start with middle value
self.latency_history = []
def adjust_chunk_size(self, recent_latency_ms):
"""
Adjust chunk size based on latency
High latency → smaller chunks (more responsive)
Low latency → larger chunks (more efficient)
"""
self.latency_history.append(recent_latency_ms)
if len(self.latency_history) < 10:
return self.current_chunk_ms
# Calculate average latency
avg_latency = np.mean(self.latency_history[-10:])
# Adjust chunk size
if avg_latency > 150: # High latency
# Reduce chunk size for better responsiveness
self.current_chunk_ms = max(
self.min_chunk_ms,
self.current_chunk_ms - 10
)
print(f"↓ Reducing chunk size to {self.current_chunk_ms}ms")
elif avg_latency < 50: # Very low latency
# Increase chunk size for efficiency
self.current_chunk_ms = min(
self.max_chunk_ms,
self.current_chunk_ms + 10
)
print(f"↑ Increasing chunk size to {self.current_chunk_ms}ms")
return self.current_chunk_ms
async def process_with_adaptive_chunking(self, audio_stream):
"""Process stream with adaptive chunk sizing"""
for chunk in audio_stream:
start_time = time.time()
# Process chunk
result = await self.process_chunk(chunk)
# Calculate latency
latency_ms = (time.time() - start_time) * 1000
# Adjust chunk size for next iteration
next_chunk_ms = self.adjust_chunk_size(latency_ms)
yield result, next_chunk_ms
3. Buffering Strategy for Unreliable Networks
class NetworkAwareStreamingBuffer:
"""
Buffer audio to handle network issues
Maintains smooth playback despite packet loss
"""
def __init__(self, buffer_size_seconds=2.0, sample_rate=16000):
self.buffer_size = int(buffer_size_seconds * sample_rate)
self.buffer = np.zeros(self.buffer_size, dtype=np.float32)
self.write_pos = 0
self.read_pos = 0
self.underrun_count = 0
self.overrun_count = 0
def write_chunk(self, chunk):
"""
Write audio chunk to buffer
Returns: Success status
"""
chunk_size = len(chunk)
# Check for buffer overrun
available_space = self.buffer_size - (self.write_pos - self.read_pos)
if chunk_size > available_space:
self.overrun_count += 1
print("⚠️ Buffer overrun - dropping oldest data")
# Drop oldest data
self.read_pos = self.write_pos - self.buffer_size + chunk_size
# Write to circular buffer
for i, sample in enumerate(chunk):
pos = (self.write_pos + i) % self.buffer_size
self.buffer[pos] = sample
self.write_pos += chunk_size
return True
def read_chunk(self, chunk_size):
"""
Read audio chunk from buffer
Returns: Audio chunk or None if underrun
"""
# Check for buffer underrun
available_data = self.write_pos - self.read_pos
if available_data < chunk_size:
self.underrun_count += 1
print("⚠️ Buffer underrun - not enough data")
return None
# Read from circular buffer
chunk = np.zeros(chunk_size, dtype=np.float32)
for i in range(chunk_size):
pos = (self.read_pos + i) % self.buffer_size
chunk[i] = self.buffer[pos]
self.read_pos += chunk_size
return chunk
def get_buffer_level(self):
"""Get current buffer fill level (0-1)"""
available = self.write_pos - self.read_pos
return available / self.buffer_size
def get_stats(self):
"""Get buffer statistics"""
return {
'buffer_level': self.get_buffer_level(),
'underruns': self.underrun_count,
'overruns': self.overrun_count
}
# Usage
buffer = NetworkAwareStreamingBuffer(buffer_size_seconds=2.0)
# Writer thread (receiving from network)
async def receive_audio():
async for chunk in network_stream:
buffer.write_chunk(chunk)
# Adaptive buffering
level = buffer.get_buffer_level()
if level < 0.2:
print("⚠️ Low buffer, may need to increase")
# Reader thread (processing)
async def process_audio():
while True:
chunk = buffer.read_chunk(chunk_size=1600) # 100ms at 16kHz
if chunk is not None:
result = await process_chunk(chunk)
yield result
else:
await asyncio.sleep(0.01) # Wait for more data
Advanced Optimization Techniques
1. Model Warm-Up
class WarmUpStreamingProcessor:
"""
Pre-warm model for lower latency on first request
Cold start can add 100-500ms latency
"""
def __init__(self, model):
self.model = model
self.is_warm = False
def warm_up(self, sample_rate=16000):
"""
Warm up model with dummy input
Call during initialization
"""
print("Warming up model...")
# Create dummy audio chunk
dummy_chunk = np.random.randn(int(sample_rate * 0.1)) # 100ms
# Run inference to warm up
for _ in range(3):
_ = self.model.predict(dummy_chunk)
self.is_warm = True
print("Model warm-up complete")
def process_chunk(self, chunk):
"""Process with warm-up check"""
if not self.is_warm:
self.warm_up()
return self.model.predict(chunk)
# Usage
processor = WarmUpStreamingProcessor(model)
processor.warm_up() # Do this during server startup
2. GPU Batching for Throughput
class GPUBatchProcessor:
"""
Batch multiple streams for GPU efficiency
GPUs are most efficient with batch processing
"""
def __init__(self, model, max_batch_size=16, max_wait_ms=50):
self.model = model
self.max_batch_size = max_batch_size
self.max_wait_ms = max_wait_ms
self.pending_batches = []
async def process_chunk_batched(self, chunk, stream_id):
"""
Add chunk to batch and process when ready
Returns: Future that resolves with result
"""
future = asyncio.Future()
self.pending_batches.append((chunk, stream_id, future))
# Process batch if ready
if len(self.pending_batches) >= self.max_batch_size:
await self._process_batch()
else:
# Wait for more requests or timeout
asyncio.create_task(self._process_batch_after_delay())
return await future
async def _process_batch(self):
"""Process accumulated batch on GPU"""
if not self.pending_batches:
return
# Extract batch
chunks = [item[0] for item in self.pending_batches]
stream_ids = [item[1] for item in self.pending_batches]
futures = [item[2] for item in self.pending_batches]
# Pad to same length
max_len = max(len(c) for c in chunks)
padded_chunks = [
np.pad(c, (0, max_len - len(c)), mode='constant')
for c in chunks
]
# Stack into batch
batch = np.stack(padded_chunks)
# Run batch inference on GPU
results = self.model.predict_batch(batch)
# Distribute results
for result, future in zip(results, futures):
future.set_result(result)
# Clear batch
self.pending_batches = []
    async def _process_batch_after_delay(self):
        """Flush any pending requests after the max wait.
        Simplified: a timer task is spawned per request, and _process_batch
        is a no-op when the batch has already been flushed."""
        await asyncio.sleep(self.max_wait_ms / 1000.0)
        await self._process_batch()
# Usage
gpu_processor = GPUBatchProcessor(model, max_batch_size=16)
# Multiple concurrent streams
async def process_stream(stream_id):
async for chunk in audio_streams[stream_id]:
result = await gpu_processor.process_chunk_batched(chunk, stream_id)
yield result
# Run multiple streams in parallel
await asyncio.gather(*[
process_stream(i) for i in range(10)
])
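predict_batch is assumed to exist on the model. With a PyTorch module, a thin wrapper along these lines would provide it (a sketch; decode_fn is an assumed decoder such as greedy CTC):
import numpy as np
import torch
class BatchedModel:
    """Wrap a torch module so it accepts a stacked numpy batch."""
    def __init__(self, module, decode_fn):
        self.module = module.eval()
        self.decode_fn = decode_fn     # assumed: maps one output tensor to text
    def predict_batch(self, batch: np.ndarray):
        with torch.no_grad():
            inputs = torch.from_numpy(batch).float()   # shape: (batch, samples)
            outputs = self.module(inputs)
        return [self.decode_fn(output) for output in outputs]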
3. Quantized Models for Edge Devices
import torch
import torchaudio
class EdgeOptimizedStreamingASR:
"""
Streaming ASR optimized for edge devices
Uses INT8 quantization for faster inference
"""
    def __init__(self, model_path):
        # Load the eager nn.Module checkpoint and apply dynamic quantization
        # (quantize_dynamic swaps Linear/LSTM submodules, so it needs an eager
        # model; apply it before any torch.jit.script/trace step)
        self.model = torch.load(model_path)
        self.model = torch.quantization.quantize_dynamic(
            self.model,
            {torch.nn.Linear, torch.nn.LSTM},
            dtype=torch.qint8
        )
        self.model.eval()
def process_chunk_optimized(self, audio_chunk):
"""
Process chunk with optimizations
- INT8 quantization: 4x faster
- No gradient computation
- Minimal memory allocation
"""
with torch.no_grad():
# Convert to tensor
audio_tensor = torch.from_numpy(audio_chunk).float()
audio_tensor = audio_tensor.unsqueeze(0) # Add batch dim
# Extract features (optimized)
features = torchaudio.compliance.kaldi.mfcc(
audio_tensor,
sample_frequency=16000,
num_ceps=13
)
# Run inference
output = self.model(features)
# Decode
transcription = self.decode(output)
return transcription
def decode(self, output):
"""Simple greedy decoding"""
# Get most likely tokens
tokens = torch.argmax(output, dim=-1)
        # Convert token ids to text (tokens_to_text is model-specific and
        # assumed to be defined for your vocabulary)
        transcription = self.tokens_to_text(tokens)
return transcription
# Benchmark: Quantized vs Full Precision
def benchmark_models():
"""Compare quantized vs full precision"""
full_model = load_model('model_fp32.pt')
quant_model = EdgeOptimizedStreamingASR('model_int8.pt')
audio_chunk = np.random.randn(1600) # 100ms at 16kHz
# Full precision
start = time.time()
for _ in range(100):
_ = full_model.predict(audio_chunk)
fp32_time = time.time() - start
# Quantized
start = time.time()
for _ in range(100):
_ = quant_model.process_chunk_optimized(audio_chunk)
int8_time = time.time() - start
print(f"FP32: {fp32_time:.2f}s")
print(f"INT8: {int8_time:.2f}s")
print(f"Speedup: {fp32_time / int8_time:.1f}x")
Real-World Integration Examples
1. Zoom-like Meeting Transcription
class MeetingTranscriptionService:
"""
Real-time meeting transcription
Similar to Zoom's live transcription
"""
def __init__(self):
self.asr_model = load_asr_model()
self.active_sessions = {}
def start_session(self, meeting_id):
"""Start transcription session"""
self.active_sessions[meeting_id] = {
'participants': {},
'transcript': [],
'start_time': time.time()
}
async def process_participant_audio(self, meeting_id, participant_id, audio_stream):
"""
Process audio from single participant
Returns: Real-time transcription
"""
session = self.active_sessions[meeting_id]
# Initialize participant
if participant_id not in session['participants']:
session['participants'][participant_id] = {
'processor': StatefulStreamingProcessor(self.asr_model),
'transcript_buffer': []
}
participant = session['participants'][participant_id]
processor = participant['processor']
async for chunk in audio_stream:
# Process chunk
result, is_complete = processor.process_chunk(chunk)
if is_complete:
# Add to transcript
timestamp = time.time() - session['start_time']
transcript_entry = {
'participant_id': participant_id,
'text': result['text'],
'timestamp': timestamp,
'confidence': result.get('confidence', 1.0)
}
session['transcript'].append(transcript_entry)
yield transcript_entry
def get_full_transcript(self, meeting_id):
"""Get complete meeting transcript"""
if meeting_id not in self.active_sessions:
return []
transcript = self.active_sessions[meeting_id]['transcript']
# Format as readable text
formatted = []
for entry in transcript:
time_str = format_timestamp(entry['timestamp'])
formatted.append(
f"[{time_str}] Participant {entry['participant_id']}: {entry['text']}"
)
return '\n'.join(formatted)
def format_timestamp(seconds):
"""Format seconds as MM:SS"""
minutes = int(seconds // 60)
secs = int(seconds % 60)
return f"{minutes:02d}:{secs:02d}"
# Usage
service = MeetingTranscriptionService()
service.start_session('meeting-123')
# Process audio from multiple participants
async def transcribe_meeting():
participants = ['user1', 'user2', 'user3']
    # process_participant_audio is an async generator, so wrap each stream
    # in a consumer coroutine before running them in parallel
    async def consume(participant_id):
        async for entry in service.process_participant_audio(
            'meeting-123',
            participant_id,
            get_audio_stream(participant_id)
        ):
            print(f"[{entry['timestamp']:.1f}s] {entry['participant_id']}: {entry['text']}")
    # Run all participants concurrently and surface per-stream errors
    results = await asyncio.gather(
        *(consume(participant_id) for participant_id in participants),
        return_exceptions=True
    )
    for res in results:
        if isinstance(res, Exception):
            print(f"Stream error: {res}")
2. Voice Assistant Backend
This pattern combines streaming ASR with keyword spotting for wake word detection and TTS for audio responses.
class VoiceAssistantPipeline:
"""
Complete voice assistant pipeline
ASR → NLU → Action → TTS
"""
def __init__(self):
self.asr = StreamingASR()
self.nlu = IntentClassifier()
self.action_executor = ActionExecutor()
self.tts = TextToSpeech()
async def process_voice_command(self, audio_stream):
"""
Process voice command end-to-end
Returns: Audio response
"""
# 1. Speech Recognition
transcription = await self.asr.transcribe_stream(audio_stream)
print(f"User said: {transcription}")
# 2. Natural Language Understanding
intent = self.nlu.classify(transcription)
print(f"Intent: {intent['name']} (confidence: {intent['confidence']:.2f})")
# 3. Execute Action
if intent['confidence'] > 0.7:
response_text = await self.action_executor.execute(intent)
else:
response_text = "I'm not sure what you mean. Could you rephrase that?"
# 4. Text-to-Speech
response_audio = self.tts.synthesize(response_text)
return {
'transcription': transcription,
'intent': intent,
'response_text': response_text,
'response_audio': response_audio
}
async def continuous_listening(self, audio_source):
"""
Continuously listen for wake word + command
Efficient always-on listening
"""
wake_word_detector = WakeWordDetector('hey assistant')
async for chunk in audio_source:
# Check for wake word (lightweight model)
if wake_word_detector.detect(chunk):
print("🎤 Wake word detected!")
# Start full ASR
command_audio = await self.capture_command(audio_source, timeout=5.0)
# Process command
result = await self.process_voice_command(command_audio)
# Play response
play_audio(result['response_audio'])
async def capture_command(self, audio_source, timeout=5.0):
"""Capture audio command after wake word"""
command_chunks = []
start_time = time.time()
async for chunk in audio_source:
command_chunks.append(chunk)
# Check timeout
if time.time() - start_time > timeout:
break
# Check for end of speech (silence)
if self.is_silence(chunk):
break
return np.concatenate(command_chunks)
def is_silence(self, chunk, threshold=0.01):
"""Detect if chunk is silence"""
energy = np.sqrt(np.mean(chunk ** 2))
return energy < threshold
# Usage
assistant = VoiceAssistantPipeline()
# Continuous listening
await assistant.continuous_listening(microphone_stream)
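WakeWordDetector is assumed above; a real implementation would use a small keyword spotting model. As a stand-in, a cheap energy gate in front of an injected KWS scorer might look like this (the scorer callable and both thresholds are assumptions):
import numpy as np
class WakeWordDetector:
    """Cheap gate: skip silent chunks, then ask a small KWS scorer."""
    def __init__(self, wake_word, scorer=None, energy_threshold=0.01, score_threshold=0.8):
        self.wake_word = wake_word
        self.scorer = scorer                 # assumed: scorer(chunk) -> probability of wake word
        self.energy_threshold = energy_threshold
        self.score_threshold = score_threshold
    def detect(self, chunk: np.ndarray) -> bool:
        energy = float(np.sqrt(np.mean(chunk ** 2)))
        if energy < self.energy_threshold:
            return False                     # too quiet to contain the wake word
        if self.scorer is None:
            return False                     # no KWS model wired in yet
        return self.scorer(chunk) >= self.score_threshold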
Performance Metrics & SLAs
Latency Tracking
class StreamingLatencyTracker:
"""
Track end-to-end latency for streaming pipeline
Measures:
- Audio capture latency
- Network latency
- Processing latency
- Total latency
"""
def __init__(self):
self.metrics = {
'capture_latency': [],
'network_latency': [],
'processing_latency': [],
'total_latency': []
}
async def process_with_tracking(self, audio_chunk, capture_timestamp):
"""
Process chunk with latency tracking
Args:
audio_chunk: Audio data
capture_timestamp: When audio was captured
Returns: (result, latency_breakdown)
"""
# Network latency (time from capture to arrival)
network_start = time.time()
network_latency = (network_start - capture_timestamp) * 1000
self.metrics['network_latency'].append(network_latency)
# Processing latency
processing_start = time.time()
result = await self.process_chunk(audio_chunk)
processing_end = time.time()
processing_latency = (processing_end - processing_start) * 1000
self.metrics['processing_latency'].append(processing_latency)
# Total latency
total_latency = (processing_end - capture_timestamp) * 1000
self.metrics['total_latency'].append(total_latency)
latency_breakdown = {
'network_ms': network_latency,
'processing_ms': processing_latency,
'total_ms': total_latency
}
return result, latency_breakdown
def get_latency_stats(self):
"""Get latency statistics"""
stats = {}
for metric_name, values in self.metrics.items():
if values:
stats[metric_name] = {
'p50': np.percentile(values, 50),
'p95': np.percentile(values, 95),
'p99': np.percentile(values, 99),
'mean': np.mean(values),
'max': np.max(values)
}
return stats
def check_sla(self, sla_ms=100):
"""
Check if meeting SLA
Returns: (is_meeting_sla, violation_rate)
"""
if not self.metrics['total_latency']:
return True, 0.0
violations = sum(1 for lat in self.metrics['total_latency'] if lat > sla_ms)
violation_rate = violations / len(self.metrics['total_latency'])
is_meeting_sla = violation_rate < 0.01 # < 1% violations
return is_meeting_sla, violation_rate
# Usage
tracker = StreamingLatencyTracker()
# Process with tracking
result, latency = await tracker.process_with_tracking(chunk, capture_time)
# Check SLA
is_ok, violation_rate = tracker.check_sla(sla_ms=100)
if not is_ok:
print(f"⚠️ SLA violation rate: {violation_rate:.1%}")
# Get detailed stats
stats = tracker.get_latency_stats()
print(f"P95 latency: {stats['total_latency']['p95']:.1f}ms")
Key Takeaways
✅ Chunk audio correctly - Balance latency vs context
✅ Manage state - RNN/LSTM models need previous chunks
✅ Optimize latency - Smaller chunks, quantization, prefetching
✅ Handle errors gracefully - Network failures, audio glitches
✅ Validate inputs - Like BST range checking
✅ Monitor performance - Latency, error rate, throughput
✅ WebSocket for streaming - Bidirectional, low-latency
FAQ
Q: What chunk size should I use for streaming speech processing?
A: Start with 100ms chunks (1,600 samples at 16kHz) as a balanced default. Smaller chunks (50ms) reduce latency but increase overhead and may not provide enough context for the model. Larger chunks (200ms) are more efficient but add perceptible delay. Use adaptive chunk sizing to adjust dynamically based on network and compute conditions.
Q: How do I handle state across audio chunks in streaming ASR?
A: For RNN/LSTM models, pass the hidden state from each chunk to the next. For non-stateful models, maintain a rolling buffer of recent chunks and concatenate context from previous chunks before inference. Reset state at utterance boundaries detected by silence or end-of-speech signals.
Q: How do I reduce cold-start latency in a streaming speech server?
A: Run model warm-up during server initialization by passing 3-5 dummy audio chunks through the model. This pre-allocates memory and compiles any JIT operations. Cold start can add 100-500ms to the first request; warming up at startup moves that cost out of the request path.
Originally published at: arunbaby.com/speech-tech/0008-streaming-speech-pipeline