Build real-time speech processing pipelines that handle audio streams with minimal latency for live transcription and voice interfaces.

TL;DR

Streaming speech pipelines break continuous audio into chunks (50-200ms), process them through a feature extraction and ML inference pipeline, and return results in real-time over WebSockets. Critical techniques include overlap buffering for context continuity, prefetch queues to hide I/O latency, adaptive chunk sizing based on network conditions, model warm-up to avoid cold-start delays, and GPU batching across concurrent streams. End-to-end latency target is under 100ms. Related: streaming ASR architecture for the model layer and keyword spotting for always-on wake word detection.

Introduction

Streaming speech processing handles audio in real-time as it’s captured, without waiting for the entire recording.

Why streaming matters:

  • Low latency: Start processing immediately (< 100ms)
  • Live applications: Transcription, translation, voice assistants
  • Memory efficiency: Process chunks, not entire recordings
  • Better UX: Instant feedback to users

Challenges:

  • Chunking audio correctly (see the sketch after this list)
  • Managing state across chunks
  • Handling network delays
  • Synchronization issues
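
Getting the chunking right is mostly arithmetic: the chunk duration fixes both the minimum latency and how many samples the model sees at once. A minimal sketch of that conversion, assuming the 16kHz sample rate used throughout this post (chunk_params is a hypothetical helper, not part of any library):

SAMPLE_RATE = 16000  # Hz, matching the examples in this post

def chunk_params(chunk_ms, overlap_ms=0, sample_rate=SAMPLE_RATE):
    """Convert chunk/overlap durations (ms) into sample counts."""
    chunk_samples = int(sample_rate * chunk_ms / 1000)
    overlap_samples = int(sample_rate * overlap_ms / 1000)
    hop_samples = chunk_samples - overlap_samples  # new samples per chunk
    return chunk_samples, overlap_samples, hop_samples

# 100ms chunks with 20ms overlap at 16kHz
print(chunk_params(100, 20))  # (1600, 320, 1280)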

Streaming Pipeline Architecture

┌─────────────────┐
│   Microphone    │
└────────┬────────┘
         │ Audio stream (PCM)
         ▼
┌─────────────────┐
│  Audio Chunker  │ ← Split into chunks (e.g., 100ms)
└────────┬────────┘
         │ Chunks
         ▼
┌─────────────────┐
│  Preprocessor   │ ← Normalize, filter
└────────┬────────┘
         │
         ▼
┌─────────────────┐
│ Feature Extract │ ← MFCC, Mel-spec
└────────┬────────┘
         │
         ▼
┌─────────────────┐
│    ML Model     │ ← ASR, classification
└────────┬────────┘
         │
         ▼
┌─────────────────┐
│ Post-processing │ ← Smoothing, formatting
└────────┬────────┘
         │
         ▼
┌─────────────────┐
│     Output      │ ← Transcription, action
└─────────────────┘
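
In code, the diagram collapses into a chain of per-chunk stage functions. A minimal sketch of that skeleton, where preprocess, extract_features, postprocess, audio_chunker, and emit are hypothetical placeholders for the implementations developed below:

def run_pipeline(chunk, model):
    """One pass through the pipeline stages for a single audio chunk."""
    audio = preprocess(chunk)             # normalize, filter
    features = extract_features(audio)    # MFCC / mel-spectrogram
    raw_output = model.predict(features)  # ASR, classification
    return postprocess(raw_output)        # smoothing, formatting

# Driven by the chunker: one result per chunk, emitted as audio arrives
for chunk in audio_chunker(microphone_stream, chunk_ms=100):
    emit(run_pipeline(chunk, model))  # e.g., push over a WebSocket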

Audio Capture & Chunking

Real-time Audio Capture

import pyaudio
import numpy as np
from queue import Queue, Empty

class AudioStreamer:
    """
    Capture audio from the microphone in real time

    Buffers chunks for processing
    """

    def __init__(self, sample_rate=16000, chunk_duration_ms=100):
        self.sample_rate = sample_rate
        self.chunk_duration_ms = chunk_duration_ms
        self.chunk_size = int(sample_rate * chunk_duration_ms / 1000)

        self.audio_queue = Queue()
        self.pyaudio_instance = None
        self.stream = None
        self.running = False

    def _audio_callback(self, in_data, frame_count, time_info, status):
        """
        Callback invoked by PyAudio for each audio chunk

        Runs in a separate thread
        """
        if status:
            print(f"Audio status: {status}")

        # Convert bytes to numpy array
        audio_data = np.frombuffer(in_data, dtype=np.int16)

        # Normalize to [-1, 1]
        audio_data = audio_data.astype(np.float32) / 32768.0

        # Add to queue
        self.audio_queue.put(audio_data)

        return (in_data, pyaudio.paContinue)

    def start(self):
        """Start capturing audio"""
        self.running = True

        self.pyaudio_instance = pyaudio.PyAudio()

        self.stream = self.pyaudio_instance.open(
            format=pyaudio.paInt16,
            channels=1,
            rate=self.sample_rate,
            input=True,
            frames_per_buffer=self.chunk_size,
            stream_callback=self._audio_callback
        )

        self.stream.start_stream()
        print(f"Started audio capture (chunk={self.chunk_duration_ms}ms)")

    def stop(self):
        """Stop capturing audio"""
        self.running = False
        if self.stream:
            self.stream.stop_stream()
            self.stream.close()
        if self.pyaudio_instance:
            self.pyaudio_instance.terminate()
        print("Stopped audio capture")

    def get_chunk(self, timeout=1.0):
        """
        Get the next audio chunk

        Returns: numpy array of audio samples, or None on timeout
        """
        try:
            return self.audio_queue.get(timeout=timeout)
        except Empty:
            return None


# Usage
streamer = AudioStreamer(sample_rate=16000, chunk_duration_ms=100)
streamer.start()

# Process chunks in real-time
while True:
    chunk = streamer.get_chunk()
    if chunk is not None:
        # Process chunk
        process_audio_chunk(chunk)

Chunk Buffering Strategy

class ChunkBuffer:
    """
    Buffer audio chunks with overlap

    Helps models that need context from previous chunks
    """

    def __init__(self, buffer_size=3, overlap_size=1):
        """
        Args:
            buffer_size: Number of chunks to keep
            overlap_size: Number of chunks to overlap
        """
        self.buffer_size = buffer_size
        self.overlap_size = overlap_size
        self.chunks = []

    def add_chunk(self, chunk):
        """Add new chunk to buffer"""
        self.chunks.append(chunk)

        # Keep only recent chunks
        if len(self.chunks) > self.buffer_size:
            self.chunks.pop(0)

    def get_buffered_audio(self):
        """
        Get concatenated buffered audio

        Returns: numpy array
        """
        if not self.chunks:
            return None

        return np.concatenate(self.chunks)

    def get_latest_with_context(self):
        """
        Get the latest chunk with context from previous chunks

        Useful for models that need history
        """
        if len(self.chunks) < 2:
            return self.chunks[-1] if self.chunks else None

        # Return the last 'overlap_size + 1' chunks
        context_chunks = self.chunks[-(self.overlap_size + 1):]
        return np.concatenate(context_chunks)


# Usage
buffer = ChunkBuffer(buffer_size=3, overlap_size=1)

for chunk in audio_chunks:
    buffer.add_chunk(chunk)
    audio_with_context = buffer.get_latest_with_context()
    # Process audio with context

WebSocket-Based Streaming

Server Side

import asyncio
import websockets
import json
import numpy as np
import time

class StreamingASRServer:
    """
    WebSocket server for streaming ASR

    Clients send audio chunks, server returns transcriptions
    """

    def __init__(self, model, port=8765):
        self.model = model
        self.port = port
        self.active_connections = set()

    async def handle_client(self, websocket, path):
        """Handle a single client connection"""
        client_id = id(websocket)
        self.active_connections.add(websocket)
        print(f"Client {client_id} connected")

        try:
            async for message in websocket:
                # Decode message
                data = json.loads(message)

                if data['type'] == 'audio':
                    # Process audio chunk
                    audio_bytes = bytes.fromhex(data['audio'])
                    audio_chunk = np.frombuffer(audio_bytes, dtype=np.float32)

                    # Run inference
                    transcription = await self.process_chunk(audio_chunk)

                    # Send result
                    response = {
                        'type': 'transcription',
                        'text': transcription,
                        'is_final': data.get('is_final', False)
                    }
                    await websocket.send(json.dumps(response))

                elif data['type'] == 'end':
                    # Session ended
                    break

        except websockets.exceptions.ConnectionClosed:
            print(f"Client {client_id} disconnected")

        finally:
            self.active_connections.remove(websocket)

    async def process_chunk(self, audio_chunk):
        """
        Process an audio chunk

        Returns: Transcription text
        """
        # Placeholder feature extractor (should match your model's expected input)
        features = extract_features(audio_chunk)

        # Run model inference
        transcription = self.model.predict(features)

        return transcription

    def start(self):
        """Start WebSocket server"""
        print(f"Starting ASR server on port {self.port}")

        start_server = websockets.serve(
            self.handle_client,
            'localhost',
            self.port
        )

        asyncio.get_event_loop().run_until_complete(start_server)
        asyncio.get_event_loop().run_forever()


# Usage
server = StreamingASRServer(model=asr_model, port=8765)
server.start()

Client Side

import asyncio
import websockets
import json
import numpy as np

class StreamingASRClient:
    """
    WebSocket client for streaming ASR

    Sends audio chunks and receives transcriptions
    """

    def __init__(self, server_url='ws://localhost:8765'):
        self.server_url = server_url
        self.websocket = None

    async def connect(self):
        """Connect to server"""
        self.websocket = await websockets.connect(self.server_url)
        print(f"Connected to {self.server_url}")

    async def send_audio_chunk(self, audio_chunk, is_final=False):
        """
        Send an audio chunk to the server

        Args:
            audio_chunk: numpy array
            is_final: Whether this is the last chunk
        """
        # Convert to bytes
        audio_bytes = audio_chunk.astype(np.float32).tobytes()
        audio_hex = audio_bytes.hex()

        # Create message
        message = {
            'type': 'audio',
            'audio': audio_hex,
            'is_final': is_final
        }

        # Send
        await self.websocket.send(json.dumps(message))

    async def receive_transcription(self):
        """
        Receive a transcription from the server

        Returns: Dict with transcription
        """
        response = await self.websocket.recv()
        return json.loads(response)

    async def close(self):
        """Close connection"""
        if self.websocket:
            await self.websocket.send(json.dumps({'type': 'end'}))
            await self.websocket.close()


# Usage
async def stream_audio():
    client = StreamingASRClient()
    await client.connect()

    # Stream audio chunks
    streamer = AudioStreamer()
    streamer.start()

    try:
        while True:
            chunk = streamer.get_chunk()
            if chunk is None:
                break

            # Send chunk
            await client.send_audio_chunk(chunk)

            # Receive transcription
            result = await client.receive_transcription()
            print(f"Transcription: {result['text']}")

    finally:
        streamer.stop()
        await client.close()

# Run
asyncio.run(stream_audio())

Latency Optimization

Latency Breakdown

Total Latency = Audio Capture + Network + Processing + Network + Display

Typical values:
- Audio capture: 10-50ms (chunk duration)
- Network (client → server): 10-30ms
- Feature extraction: 5-10ms
- Model inference: 20-100ms (depends on model)
- Network (server → client): 10-30ms
- Display: 1-5ms

Total: 56-225ms (aim for < 100ms)
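
To check whether a given configuration can hit the target, it helps to sum the component budgets explicitly. A small sketch (the ranges mirror the typical values above; treat them as illustrative assumptions, not measurements):

# Component latency budgets in ms: (best case, worst case)
BUDGET_MS = {
    'audio_capture': (10, 50),       # chunk duration
    'network_uplink': (10, 30),
    'feature_extraction': (5, 10),
    'model_inference': (20, 100),
    'network_downlink': (10, 30),
    'display': (1, 5),
}

best = sum(lo for lo, _ in BUDGET_MS.values())
worst = sum(hi for _, hi in BUDGET_MS.values())
print(f"End-to-end: {best}-{worst}ms")  # 56-225ms

# Compare the worst case against the 100ms target
TARGET_MS = 100
if worst > TARGET_MS:
    print(f"Over budget by {worst - TARGET_MS}ms - shrink chunks or speed up inference")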

Optimization Strategies

import asyncio

import librosa
import numpy as np

class OptimizedStreamingPipeline:
    """
    Optimized streaming pipeline

    Techniques:
        - Smaller chunks
        - Model quantization
        - Batch processing
        - Prefetching
    """

    def __init__(self, model, chunk_duration_ms=50):
        """
        Args:
            chunk_duration_ms: Smaller chunks = lower latency
        """
        self.model = model
        self.chunk_duration_ms = chunk_duration_ms

        # Prefetch buffer
        self.prefetch_buffer = asyncio.Queue(maxsize=3)

        # Prefetching task
        self.prefetch_task = None

    async def start_prefetching(self, audio_source):
        """
        Prefetch audio chunks

        Reduces waiting time
        """
        async for chunk in audio_source:
            await self.prefetch_buffer.put(chunk)

        # Signal end of stream to the consumer
        await self.prefetch_buffer.put(None)

    async def process_stream(self, audio_source):
        """
        Process audio stream with optimizations
        """
        # Start prefetching
        self.prefetch_task = asyncio.create_task(
            self.start_prefetching(audio_source)
        )

        while True:
            # Get prefetched chunk (zero wait!)
            chunk = await self.prefetch_buffer.get()

            if chunk is None:
                break

            # Process chunk
            result = await self.process_chunk_optimized(chunk)

            yield result

    async def process_chunk_optimized(self, chunk):
        """
        Optimized chunk processing

        Uses a quantized model for faster inference
        """
        # Extract features (optimized)
        features = self.extract_features_fast(chunk)

        # Run inference (quantized model)
        result = self.model.predict(features)

        return result

    def extract_features_fast(self, audio):
        """
        Fast feature extraction

        Uses caching and vectorization
        """
        # Vectorized operations are faster
        mfcc = librosa.feature.mfcc(
            y=audio,
            sr=16000,
            n_mfcc=13,
            hop_length=160  # 10ms hop at 16kHz
        )

        return mfcc

State Management

Stateful Streaming

class StatefulStreamingProcessor:
    """
    Maintain state across chunks

    Important for context-dependent models
    """

    def __init__(self, model):
        self.model = model
        self.state = None  # Hidden state for RNN/LSTM models
        self.previous_chunks = []
        self.partial_results = []

    def process_chunk(self, audio_chunk):
        """
        Process a chunk with state

        Returns: (result, is_complete)
        """
        # Extract features
        features = extract_features(audio_chunk)

        # Run model with state
        if hasattr(self.model, 'predict_stateful'):
            result, self.state = self.model.predict_stateful(
                features,
                previous_state=self.state
            )
        else:
            # Fallback: concatenate with previous chunks
            self.previous_chunks.append(audio_chunk)
            if len(self.previous_chunks) > 5:
                self.previous_chunks.pop(0)

            combined_audio = np.concatenate(self.previous_chunks)
            combined_features = extract_features(combined_audio)
            result = self.model.predict(combined_features)

        # Determine if result is complete
        is_complete = self.check_completeness(result)

        if is_complete:
            self.partial_results.append(result)

        return result, is_complete

    def check_completeness(self, result):
        """
        Check if result is a complete utterance

        Uses heuristics:
            - Pause detection
            - Confidence threshold
            - Length limits
        """
        # Simple heuristic: confidence threshold
        # (In practice, combine with pause detection and length limits)
        if hasattr(result, 'confidence') and result.confidence > 0.9:
            return True

        return False

    def reset_state(self):
        """Reset state (e.g., after a complete utterance)"""
        self.state = None
        self.previous_chunks = []
        self.partial_results = []

Error Handling & Recovery

Robust Streaming

class RobustStreamingPipeline:
    """
    Streaming pipeline with error handling

    Handles:
        - Network failures
        - Audio glitches
        - Model errors
    """

    def __init__(self, model):
        self.model = model
        self.error_count = 0
        self.max_errors = 10

    async def process_stream_robust(self, audio_source):
        """
        Process stream with error recovery
        """
        retry_count = 0
        max_retries = 3

        async for chunk in audio_source:
            try:
                # Process chunk
                result = await self.process_chunk_safe(chunk)

                # Reset retry count on success
                retry_count = 0

                yield result

            except AudioGlitchError as e:
                # Audio glitch: skip chunk
                print(f"Audio glitch detected: {e}")
                self.error_count += 1
                continue

            except ModelInferenceError as e:
                # Model error: retry with fallback
                print(f"Model inference failed: {e}")

                if retry_count < max_retries:
                    retry_count += 1
                    # Use simpler fallback model
                    result = await self.fallback_inference(chunk)
                    yield result
                else:
                    # Give up after max retries
                    print("Max retries exceeded, skipping chunk")
                    retry_count = 0

            except Exception as e:
                # Unexpected error
                print(f"Unexpected error: {e}")
                self.error_count += 1

                if self.error_count > self.max_errors:
                    raise RuntimeError("Too many errors, stopping stream")

    async def process_chunk_safe(self, chunk):
        """
        Process a chunk with validation
        """
        # Validate chunk
        if not self.validate_chunk(chunk):
            raise AudioGlitchError("Invalid audio chunk")

        # Process
        try:
            features = extract_features(chunk)
            result = self.model.predict(features)
            return result
        except Exception as e:
            raise ModelInferenceError(f"Inference failed: {e}")

    def validate_chunk(self, chunk):
        """
        Validate an audio chunk

        Checks for:
            - Correct length
            - Valid range
            - No NaN values
        """
        if chunk is None or len(chunk) == 0:
            return False

        if np.any(np.isnan(chunk)):
            return False

        if np.max(np.abs(chunk)) > 10:  # Suspiciously large
            return False

        return True

    async def fallback_inference(self, chunk):
        """
        Fallback inference with a simpler model

        Trades accuracy for reliability
        """
        # Use cached results or simple heuristics
        return {"text": "[processing...]", "confidence": 0.5}


class AudioGlitchError(Exception):
    pass


class ModelInferenceError(Exception):
    pass

Connection to Model Serving (ML)

Streaming speech pipelines use model serving patterns:

import time

class StreamingSpeechServer:
    """
    Streaming speech server with model serving best practices

    Combines:
        - Model serving (ML)
        - Streaming audio (Speech)
        - Validation (DSA)
    """

    def __init__(self, model_path):
        # Load model (model serving pattern)
        self.model = self.load_model(model_path)

        # Validation (BST-like range checking)
        self.validator = AudioValidator()

        # Monitoring
        self.metrics = StreamingMetrics()

    def load_model(self, model_path):
        """Load model with caching (from model serving)"""
        import joblib
        return joblib.load(model_path)

    async def process_audio_stream(self, audio_chunks):
        """
        Process streaming audio

        Uses patterns from all topics
        """
        for chunk in audio_chunks:
            # Validate input (BST validation pattern)
            is_valid, violations = self.validator.validate(chunk)

            if not is_valid:
                print(f"Invalid chunk: {violations}")
                continue

            # Process chunk (model serving)
            start_time = time.time()
            result = self.model.predict(chunk)
            latency = time.time() - start_time

            # Monitor (model serving)
            self.metrics.record_prediction(latency)

            yield result


class AudioValidator:
    """Validate audio chunks (similar to BST validation)"""

    def __init__(self):
        # Define valid ranges (like BST min/max)
        self.amplitude_range = (-1.0, 1.0)
        self.length_range = (100, 10000)  # samples

    def validate(self, chunk):
        """
        Validate that a chunk falls within ranges

        Like BST validation with [min, max] bounds
        """
        violations = []

        # Check amplitude range
        if np.min(chunk) < self.amplitude_range[0]:
            violations.append("Amplitude too low")
        if np.max(chunk) > self.amplitude_range[1]:
            violations.append("Amplitude too high")

        # Check length range
        if len(chunk) < self.length_range[0]:
            violations.append("Chunk too short")
        if len(chunk) > self.length_range[1]:
            violations.append("Chunk too long")

        return len(violations) == 0, violations

Production Patterns

1. Multi-Channel Audio Streaming

class MultiChannelStreamingProcessor:
    """
    Process multiple audio streams simultaneously

    Use case: Conference calls, multi-mic arrays
    """

    def __init__(self, num_channels=4):
        self.num_channels = num_channels
        self.channel_buffers = [ChunkBuffer() for _ in range(num_channels)]
        self.processors = [StreamingProcessor() for _ in range(num_channels)]

    async def process_multi_channel(self, channel_chunks: dict):
        """
        Process multiple channels in parallel

        Args:
            channel_chunks: Dict {channel_id: audio_chunk}

        Returns: Dict {channel_id: result}
        """
        # Launch processing for all channels
        tasks = []
        for channel_id, chunk in channel_chunks.items():
            task = self.processors[channel_id].process_chunk_async(chunk)
            tasks.append((channel_id, task))

        # Wait for all results
        results = {}
        for channel_id, task in tasks:
            result = await task
            results[channel_id] = result

        return results

    def merge_results(self, channel_results: dict):
        """
        Merge results from multiple channels

        E.g., speaker diarization, beamforming
        """
        # Simple merging: concatenate transcriptions
        merged_text = []

        for channel_id in sorted(channel_results.keys()):
            result = channel_results[channel_id]
            if result:
                merged_text.append(f"[Channel {channel_id}]: {result['text']}")

        return '\n'.join(merged_text)


# Usage
multi_processor = MultiChannelStreamingProcessor(num_channels=4)

# Stream audio from 4 microphones
async def process_meeting():
    while True:
        # Get chunks from all channels
        chunks = {
            0: mic1.get_chunk(),
            1: mic2.get_chunk(),
            2: mic3.get_chunk(),
            3: mic4.get_chunk()
        }

        # Process in parallel
        results = await multi_processor.process_multi_channel(chunks)

        # Merge and display
        merged = multi_processor.merge_results(results)
        print(merged)

2. Adaptive Chunk Size

class AdaptiveChunkingProcessor:
    """
    Dynamically adjust chunk size based on network/compute conditions

    Smaller chunks: Lower latency but higher overhead
    Larger chunks: Higher latency but more efficient
    """

    def __init__(self, min_chunk_ms=50, max_chunk_ms=200):
        self.min_chunk_ms = min_chunk_ms
        self.max_chunk_ms = max_chunk_ms
        self.current_chunk_ms = 100  # Start with a middle value
        self.latency_history = []

    def adjust_chunk_size(self, recent_latency_ms):
        """
        Adjust chunk size based on latency

        High latency → smaller chunks (more responsive)
        Low latency → larger chunks (more efficient)
        """
        self.latency_history.append(recent_latency_ms)

        if len(self.latency_history) < 10:
            return self.current_chunk_ms

        # Calculate average latency
        avg_latency = np.mean(self.latency_history[-10:])

        # Adjust chunk size
        if avg_latency > 150:  # High latency
            # Reduce chunk size for better responsiveness
            self.current_chunk_ms = max(
                self.min_chunk_ms,
                self.current_chunk_ms - 10
            )
            print(f"↓ Reducing chunk size to {self.current_chunk_ms}ms")

        elif avg_latency < 50:  # Very low latency
            # Increase chunk size for efficiency
            self.current_chunk_ms = min(
                self.max_chunk_ms,
                self.current_chunk_ms + 10
            )
            print(f"↑ Increasing chunk size to {self.current_chunk_ms}ms")

        return self.current_chunk_ms

    async def process_with_adaptive_chunking(self, audio_stream):
        """Process stream with adaptive chunk sizing"""
        for chunk in audio_stream:
            start_time = time.time()

            # Process chunk
            result = await self.process_chunk(chunk)

            # Calculate latency
            latency_ms = (time.time() - start_time) * 1000

            # Adjust chunk size for the next iteration
            next_chunk_ms = self.adjust_chunk_size(latency_ms)

            yield result, next_chunk_ms

3. Buffering Strategy for Unreliable Networks

class NetworkAwareStreamingBuffer:
    """
    Buffer audio to handle network issues

    Maintains smooth playback despite packet loss
    """

    def __init__(self, buffer_size_seconds=2.0, sample_rate=16000):
        self.buffer_size = int(buffer_size_seconds * sample_rate)
        self.buffer = np.zeros(self.buffer_size, dtype=np.float32)
        self.write_pos = 0
        self.read_pos = 0
        self.underrun_count = 0
        self.overrun_count = 0

    def write_chunk(self, chunk):
        """
        Write an audio chunk to the buffer

        Returns: Success status
        """
        chunk_size = len(chunk)

        # Check for buffer overrun
        available_space = self.buffer_size - (self.write_pos - self.read_pos)
        if chunk_size > available_space:
            self.overrun_count += 1
            print("⚠️ Buffer overrun - dropping oldest data")
            # Drop oldest data
            self.read_pos = self.write_pos - self.buffer_size + chunk_size

        # Write to circular buffer
        for i, sample in enumerate(chunk):
            pos = (self.write_pos + i) % self.buffer_size
            self.buffer[pos] = sample

        self.write_pos += chunk_size
        return True

    def read_chunk(self, chunk_size):
        """
        Read an audio chunk from the buffer

        Returns: Audio chunk, or None on underrun
        """
        # Check for buffer underrun
        available_data = self.write_pos - self.read_pos
        if available_data < chunk_size:
            self.underrun_count += 1
            print("⚠️ Buffer underrun - not enough data")
            return None

        # Read from circular buffer
        chunk = np.zeros(chunk_size, dtype=np.float32)
        for i in range(chunk_size):
            pos = (self.read_pos + i) % self.buffer_size
            chunk[i] = self.buffer[pos]

        self.read_pos += chunk_size
        return chunk

    def get_buffer_level(self):
        """Get current buffer fill level (0-1)"""
        available = self.write_pos - self.read_pos
        return available / self.buffer_size

    def get_stats(self):
        """Get buffer statistics"""
        return {
            'buffer_level': self.get_buffer_level(),
            'underruns': self.underrun_count,
            'overruns': self.overrun_count
        }


# Usage
buffer = NetworkAwareStreamingBuffer(buffer_size_seconds=2.0)

# Writer task (receiving from network)
async def receive_audio():
    async for chunk in network_stream:
        buffer.write_chunk(chunk)

        # Adaptive buffering
        level = buffer.get_buffer_level()
        if level < 0.2:
            print("⚠️ Low buffer, may need to increase")

# Reader task (processing)
async def process_audio():
    while True:
        chunk = buffer.read_chunk(chunk_size=1600)  # 100ms at 16kHz
        if chunk is not None:
            result = await process_chunk(chunk)
            yield result
        else:
            await asyncio.sleep(0.01)  # Wait for more data

Advanced Optimization Techniques

1. Model Warm-Up

class WarmUpStreamingProcessor:
    """
    Pre-warm the model for lower latency on the first request

    Cold start can add 100-500ms latency
    """

    def __init__(self, model):
        self.model = model
        self.is_warm = False

    def warm_up(self, sample_rate=16000):
        """
        Warm up the model with dummy input

        Call during initialization
        """
        print("Warming up model...")

        # Create a dummy audio chunk
        dummy_chunk = np.random.randn(int(sample_rate * 0.1))  # 100ms

        # Run inference to warm up
        for _ in range(3):
            _ = self.model.predict(dummy_chunk)

        self.is_warm = True
        print("Model warm-up complete")

    def process_chunk(self, chunk):
        """Process with warm-up check"""
        if not self.is_warm:
            self.warm_up()

        return self.model.predict(chunk)


# Usage
processor = WarmUpStreamingProcessor(model)
processor.warm_up()  # Do this during server startup

2. GPU Batching for Throughput

class GPUBatchProcessor:
    """
    Batch multiple streams for GPU efficiency

    GPUs are most efficient with batch processing
    """

    def __init__(self, model, max_batch_size=16, max_wait_ms=50):
        self.model = model
        self.max_batch_size = max_batch_size
        self.max_wait_ms = max_wait_ms
        self.pending_batches = []

    async def process_chunk_batched(self, chunk, stream_id):
        """
        Add a chunk to the batch and process when ready

        Returns: Result once the batch containing this chunk is processed
        """
        future = asyncio.Future()
        self.pending_batches.append((chunk, stream_id, future))

        # Process batch if ready
        if len(self.pending_batches) >= self.max_batch_size:
            await self._process_batch()
        else:
            # Wait for more requests or timeout
            asyncio.create_task(self._process_batch_after_delay())

        return await future

    async def _process_batch(self):
        """Process the accumulated batch on GPU"""
        if not self.pending_batches:
            return

        # Extract batch
        chunks = [item[0] for item in self.pending_batches]
        stream_ids = [item[1] for item in self.pending_batches]
        futures = [item[2] for item in self.pending_batches]

        # Clear the batch before inference so new requests start a fresh batch
        self.pending_batches = []

        # Pad to the same length
        max_len = max(len(c) for c in chunks)
        padded_chunks = [
            np.pad(c, (0, max_len - len(c)), mode='constant')
            for c in chunks
        ]

        # Stack into a batch
        batch = np.stack(padded_chunks)

        # Run batch inference on GPU
        results = self.model.predict_batch(batch)

        # Distribute results
        for result, future in zip(results, futures):
            future.set_result(result)

    async def _process_batch_after_delay(self):
        """Process the batch after a timeout"""
        await asyncio.sleep(self.max_wait_ms / 1000.0)
        await self._process_batch()


# Usage
gpu_processor = GPUBatchProcessor(model, max_batch_size=16)

# Multiple concurrent streams
async def process_stream(stream_id):
    async for chunk in audio_streams[stream_id]:
        result = await gpu_processor.process_chunk_batched(chunk, stream_id)
        yield result

# Run multiple streams in parallel (drain each async generator)
async def run_stream(stream_id):
    async for result in process_stream(stream_id):
        print(f"[stream {stream_id}] {result}")

await asyncio.gather(*[run_stream(i) for i in range(10)])

3. Quantized Models for Edge Devices

import time

import numpy as np
import torch
import torchaudio

class EdgeOptimizedStreamingASR:
    """
    Streaming ASR optimized for edge devices

    Uses INT8 quantization for faster inference
    """

    def __init__(self, model_path):
        # Load and quantize model
        self.model = torch.jit.load(model_path)
        self.model = torch.quantization.quantize_dynamic(
            self.model,
            {torch.nn.Linear, torch.nn.LSTM},
            dtype=torch.qint8
        )
        self.model.eval()

    def process_chunk_optimized(self, audio_chunk):
        """
        Process a chunk with optimizations

        - INT8 quantization: up to ~4x faster
        - No gradient computation
        - Minimal memory allocation
        """
        with torch.no_grad():
            # Convert to tensor
            audio_tensor = torch.from_numpy(audio_chunk).float()
            audio_tensor = audio_tensor.unsqueeze(0)  # Add batch dim

            # Extract features (optimized)
            features = torchaudio.compliance.kaldi.mfcc(
                audio_tensor,
                sample_frequency=16000,
                num_ceps=13
            )

            # Run inference
            output = self.model(features)

            # Decode
            transcription = self.decode(output)

            return transcription

    def decode(self, output):
        """Simple greedy decoding"""
        # Get the most likely tokens
        tokens = torch.argmax(output, dim=-1)

        # Convert to text (simplified)
        transcription = self.tokens_to_text(tokens)

        return transcription


# Benchmark: Quantized vs Full Precision
def benchmark_models():
    """Compare quantized vs full precision"""
    full_model = load_model('model_fp32.pt')
    quant_model = EdgeOptimizedStreamingASR('model_int8.pt')

    audio_chunk = np.random.randn(1600)  # 100ms at 16kHz

    # Full precision
    start = time.time()
    for _ in range(100):
        _ = full_model.predict(audio_chunk)
    fp32_time = time.time() - start

    # Quantized
    start = time.time()
    for _ in range(100):
        _ = quant_model.process_chunk_optimized(audio_chunk)
    int8_time = time.time() - start

    print(f"FP32: {fp32_time:.2f}s")
    print(f"INT8: {int8_time:.2f}s")
    print(f"Speedup: {fp32_time / int8_time:.1f}x")

Real-World Integration Examples

1. Zoom-like Meeting Transcription

class MeetingTranscriptionService:
    """
    Real-time meeting transcription

    Similar to Zoom's live transcription
    """

    def __init__(self):
        self.asr_model = load_asr_model()
        self.active_sessions = {}

    def start_session(self, meeting_id):
        """Start a transcription session"""
        self.active_sessions[meeting_id] = {
            'participants': {},
            'transcript': [],
            'start_time': time.time()
        }

    async def process_participant_audio(self, meeting_id, participant_id, audio_stream):
        """
        Process audio from a single participant

        Yields: Real-time transcript entries
        """
        session = self.active_sessions[meeting_id]

        # Initialize participant
        if participant_id not in session['participants']:
            session['participants'][participant_id] = {
                'processor': StatefulStreamingProcessor(self.asr_model),
                'transcript_buffer': []
            }

        participant = session['participants'][participant_id]
        processor = participant['processor']

        async for chunk in audio_stream:
            # Process chunk
            result, is_complete = processor.process_chunk(chunk)

            if is_complete:
                # Add to transcript
                timestamp = time.time() - session['start_time']
                transcript_entry = {
                    'participant_id': participant_id,
                    'text': result['text'],
                    'timestamp': timestamp,
                    'confidence': result.get('confidence', 1.0)
                }

                session['transcript'].append(transcript_entry)

                yield transcript_entry

    def get_full_transcript(self, meeting_id):
        """Get the complete meeting transcript"""
        if meeting_id not in self.active_sessions:
            return []

        transcript = self.active_sessions[meeting_id]['transcript']

        # Format as readable text
        formatted = []
        for entry in transcript:
            time_str = format_timestamp(entry['timestamp'])
            formatted.append(
                f"[{time_str}] Participant {entry['participant_id']}: {entry['text']}"
            )

        return '\n'.join(formatted)


def format_timestamp(seconds):
    """Format seconds as MM:SS"""
    minutes = int(seconds // 60)
    secs = int(seconds % 60)
    return f"{minutes:02d}:{secs:02d}"


# Usage
service = MeetingTranscriptionService()
service.start_session('meeting-123')

# Process audio from multiple participants
async def transcribe_meeting():
    participants = ['user1', 'user2', 'user3']

    async def consume(participant_id):
        # Drain one participant's transcript stream, printing entries live
        async for entry in service.process_participant_audio(
            'meeting-123',
            participant_id,
            get_audio_stream(participant_id)
        ):
            print(f"[{entry['timestamp']:.1f}s] {entry['participant_id']}: {entry['text']}")

    # Process all participants concurrently
    results = await asyncio.gather(
        *[consume(p) for p in participants],
        return_exceptions=True
    )
    for res in results:
        if isinstance(res, Exception):
            print(f"Stream error: {res}")

2. Voice Assistant Backend

This pattern combines streaming ASR with keyword spotting for wake word detection and TTS for audio responses.

class VoiceAssistantPipeline:
    """
    Complete voice assistant pipeline

    ASR → NLU → Action → TTS
    """

    def __init__(self):
        self.asr = StreamingASR()
        self.nlu = IntentClassifier()
        self.action_executor = ActionExecutor()
        self.tts = TextToSpeech()

    async def process_voice_command(self, audio_stream):
        """
        Process a voice command end-to-end

        Returns: Audio response
        """
        # 1. Speech Recognition
        transcription = await self.asr.transcribe_stream(audio_stream)
        print(f"User said: {transcription}")

        # 2. Natural Language Understanding
        intent = self.nlu.classify(transcription)
        print(f"Intent: {intent['name']} (confidence: {intent['confidence']:.2f})")

        # 3. Execute Action
        if intent['confidence'] > 0.7:
            response_text = await self.action_executor.execute(intent)
        else:
            response_text = "I'm not sure what you mean. Could you rephrase that?"

        # 4. Text-to-Speech
        response_audio = self.tts.synthesize(response_text)

        return {
            'transcription': transcription,
            'intent': intent,
            'response_text': response_text,
            'response_audio': response_audio
        }

    async def continuous_listening(self, audio_source):
        """
        Continuously listen for wake word + command

        Efficient always-on listening
        """
        wake_word_detector = WakeWordDetector('hey assistant')

        async for chunk in audio_source:
            # Check for wake word (lightweight model)
            if wake_word_detector.detect(chunk):
                print("🎤 Wake word detected!")

                # Start full ASR
                command_audio = await self.capture_command(audio_source, timeout=5.0)

                # Process command
                result = await self.process_voice_command(command_audio)

                # Play response
                play_audio(result['response_audio'])

    async def capture_command(self, audio_source, timeout=5.0):
        """Capture the audio command after the wake word"""
        command_chunks = []
        start_time = time.time()

        async for chunk in audio_source:
            command_chunks.append(chunk)

            # Check timeout
            if time.time() - start_time > timeout:
                break

            # Check for end of speech (silence)
            if self.is_silence(chunk):
                break

        return np.concatenate(command_chunks)

    def is_silence(self, chunk, threshold=0.01):
        """Detect if a chunk is silence"""
        energy = np.sqrt(np.mean(chunk ** 2))
        return energy < threshold


# Usage
assistant = VoiceAssistantPipeline()

# Continuous listening
asyncio.run(assistant.continuous_listening(microphone_stream))

Performance Metrics & SLAs

Latency Tracking

class StreamingLatencyTracker:
    """
    Track end-to-end latency for a streaming pipeline

    Measures:
        - Audio capture latency
        - Network latency
        - Processing latency
        - Total latency
    """

    def __init__(self):
        self.metrics = {
            'capture_latency': [],
            'network_latency': [],
            'processing_latency': [],
            'total_latency': []
        }

    async def process_with_tracking(self, audio_chunk, capture_timestamp):
        """
        Process a chunk with latency tracking

        Args:
            audio_chunk: Audio data
            capture_timestamp: When the audio was captured

        Returns: (result, latency_breakdown)
        """
        # Network latency (time from capture to arrival)
        network_start = time.time()
        network_latency = (network_start - capture_timestamp) * 1000
        self.metrics['network_latency'].append(network_latency)

        # Processing latency
        processing_start = time.time()
        result = await self.process_chunk(audio_chunk)
        processing_end = time.time()
        processing_latency = (processing_end - processing_start) * 1000
        self.metrics['processing_latency'].append(processing_latency)

        # Total latency
        total_latency = (processing_end - capture_timestamp) * 1000
        self.metrics['total_latency'].append(total_latency)

        latency_breakdown = {
            'network_ms': network_latency,
            'processing_ms': processing_latency,
            'total_ms': total_latency
        }

        return result, latency_breakdown

    def get_latency_stats(self):
        """Get latency statistics"""
        stats = {}

        for metric_name, values in self.metrics.items():
            if values:
                stats[metric_name] = {
                    'p50': np.percentile(values, 50),
                    'p95': np.percentile(values, 95),
                    'p99': np.percentile(values, 99),
                    'mean': np.mean(values),
                    'max': np.max(values)
                }

        return stats

    def check_sla(self, sla_ms=100):
        """
        Check if the pipeline is meeting its SLA

        Returns: (is_meeting_sla, violation_rate)
        """
        if not self.metrics['total_latency']:
            return True, 0.0

        violations = sum(1 for lat in self.metrics['total_latency'] if lat > sla_ms)
        violation_rate = violations / len(self.metrics['total_latency'])

        is_meeting_sla = violation_rate < 0.01  # < 1% violations

        return is_meeting_sla, violation_rate


# Usage
tracker = StreamingLatencyTracker()

# Process with tracking
result, latency = await tracker.process_with_tracking(chunk, capture_time)

# Check SLA
is_ok, violation_rate = tracker.check_sla(sla_ms=100)
if not is_ok:
    print(f"⚠️ SLA violation rate: {violation_rate:.1%}")

# Get detailed stats
stats = tracker.get_latency_stats()
print(f"P95 latency: {stats['total_latency']['p95']:.1f}ms")

Key Takeaways

✅ Chunk audio correctly - balance latency vs context
✅ Manage state - RNN/LSTM models need previous chunks
✅ Optimize latency - smaller chunks, quantization, prefetching
✅ Handle errors gracefully - network failures, audio glitches
✅ Validate inputs - like BST range checking
✅ Monitor performance - latency, error rate, throughput
✅ WebSocket for streaming - bidirectional, low-latency


FAQ

Q: What chunk size should I use for streaming speech processing?
A: Start with 100ms chunks (1,600 samples at 16kHz) as a balanced default. Smaller chunks (50ms) reduce latency but increase overhead and may not provide enough context for the model. Larger chunks (200ms) are more efficient but add perceptible delay. Use adaptive chunk sizing to adjust dynamically based on network and compute conditions.

Q: How do I handle state across audio chunks in streaming ASR?
A: For RNN/LSTM models, pass the hidden state from each chunk to the next. For non-stateful models, maintain a rolling buffer of recent chunks and concatenate context from previous chunks before inference. Reset state at utterance boundaries detected by silence or end-of-speech signals.

Q: How do I reduce cold-start latency in a streaming speech server?
A: Run model warm-up during server initialization by passing 3-5 dummy audio chunks through the model. This pre-allocates memory and compiles any JIT operations. Cold start can add 100-500ms to the first request; warming up at startup moves that cost out of the request path.


Originally published at: arunbaby.com/speech-tech/0008-streaming-speech-pipeline

If you found this helpful, consider sharing it with others who might benefit.

Want to work together?

I take on projects, advisory roles, and fractional CTO engagements in AI/ML. I also help businesses go AI-native with agentic workflows and agent orchestration.

Get in touch