27 minute read

Build production multi-speaker ASR systems: Combine speech recognition, speaker diarization, and overlap handling for real-world conversations.

TL;DR

Multi-speaker ASR systems combine voice activity detection, speaker diarization, and per-speaker transcription into a pipeline that handles real-world conversations with 2-10 speakers. The key insight is running diarization before ASR to get clean speaker segments, then transcribing each segment separately. Speaker embeddings (fixed-size vectors, typically 192-512 dimensions, from models like ECAPA-TDNN) capture voice identity for clustering, while overlapping speech requires special handling through detection and source separation. For real-time applications, streaming architectures with WebSocket delivery can achieve under 300ms latency. See also speaker clustering and diarization and real-time audio segmentation for deeper dives into the individual components.

A circular conference microphone array with eight individual capsules arranged in a ring

Problem Statement

Design a multi-speaker ASR system that can:

  1. Recognize speech from multiple speakers in a conversation
  2. Identify who spoke each word/sentence (speaker diarization)
  3. Handle overlapping speech when multiple people speak simultaneously
  4. Work in real-time with < 300ms latency for live transcription
  5. Scale to meetings with 2-10 speakers

Why Is This Hard?

Single-speaker ASR assumes:

  • ✅ One speaker at a time
  • ✅ No speaker changes mid-sentence
  • ✅ No overlapping speech

Real-world conversations break all these assumptions:

Time: 0s 1s 2s 3s 4s
Speaker A: "So I think we should..."
Speaker B: "Wait, can I..."
Speaker C: "Actually..."
 ↑ Overlap! ↑ ↑ Overlap! ↑

Single-speaker ASR would produce: "So I wait can think actually should we..."
Multi-speaker ASR must produce:
 [A, 0.0-1.5s]: "So I think we should"
 [B, 1.2-2.3s]: "Wait, can I"
 [C, 2.8-4.0s]: "Actually"

The core challenges:

| Challenge | Why It’s Hard | Impact |
| --- | --- | --- |
| Speaker changes | Voice characteristics change suddenly | Acoustic model confused |
| Overlapping speech | Multiple audio sources mixed | Can’t separate cleanly |
| Speaker identification | Need to know who said what | Requires speaker embeddings |
| Real-time processing | Must process while speakers are still talking | Latency constraints |
| Unknown # of speakers | Don’t know speaker count in advance | Can’t pre-allocate resources |

Real-World Use Cases

| Application | Requirements | Challenges |
| --- | --- | --- |
| Meeting transcription (Zoom, Teams) | 2-10 speakers, real-time | Overlaps, background noise |
| Call center analytics | 2 speakers (agent + customer) | Quality monitoring, compliance |
| Podcast transcription | 2-5 hosts + guests | High accuracy needed |
| Courtroom transcription | Multiple speakers, legal record | 99%+ accuracy, speaker IDs |
| Medical consultations | Doctor + patient(s) | HIPAA compliance, accuracy |

Understanding Multi-Speaker ASR

The Full System Pipeline

┌─────────────────────────────────────────────────────────────┐
│ MULTI-SPEAKER ASR PIPELINE │
└─────────────────────────────────────────────────────────────┘

Step 1: AUDIO INPUT
┌────────────────────────────────────────────┐
│ Mixed audio (all speakers combined) │
│ [Speaker A + Speaker B + Speaker C + ...]│
└────────────────┬───────────────────────────┘
 ▼
Step 2: VOICE ACTIVITY DETECTION (VAD)
┌────────────────────────────────────────────┐
│ Find speech regions (vs silence) │
│ Output: [(0.0s, 3.2s), (3.5s, 7.1s), ...] │
└────────────────┬───────────────────────────┘
 ▼
Step 3: SPEAKER DIARIZATION
┌────────────────────────────────────────────┐
│ Cluster speech by speaker │
│ Output: [(A, 0-1.5s), (B, 1.2-2.3s), ...]│
└────────────────┬───────────────────────────┘
 ▼
Step 4: ASR (per speaker segment)
┌────────────────────────────────────────────┐
│ Transcribe each speaker segment │
│ Output: [(A, "Hello"), (B, "Hi"), ...] │
└────────────────┬───────────────────────────┘
 ▼
Step 5: POST-PROCESSING
┌────────────────────────────────────────────┐
│ • Merge overlaps │
│ • Add punctuation │
│ • Format output │
└────────────────────────────────────────────┘

Each step has challenges! Let’s dig into each.

Why This Pipeline?

Why not just run ASR on everything?

Imagine you have a 1-hour meeting:

  • Raw audio: 1 hour
  • Actual speech: ~30 minutes (50% silence/pauses)
  • Running ASR on silence: Waste of 30 minutes compute!

Why not run ASR first, then diarize?

Option A: ASR → Diarization (BAD)
Problem: ASR produces one continuous text blob
"Hello there hi how are you fine thanks"
↑ Can't tell where speakers change!

Option B: Diarization → ASR (GOOD)
Step 1: Find speaker segments
 [A: 0-2s], [B: 2-4s], [A: 4-6s]
Step 2: Transcribe each segment separately
 A: "Hello there"
 B: "Hi, how are you?"
 A: "Fine, thanks"
↑ Clean separation!

Why separate VAD from diarization?

  • VAD is fast (simple energy-based or small model)
  • Diarization is slow (needs embeddings + clustering)
  • Don’t waste diarization compute on silence!

Pipeline efficiency:

1 hour audio
 ↓ VAD (fast, eliminates silence)
30 min speech segments
 ↓ Diarization (slow, but only on speech)
30 min speaker-labeled segments
 ↓ ASR (slowest, but parallelizable)
Transcriptions

The Mathematics Behind Speaker Embeddings

Key question: How do we represent a voice mathematically?

Answer: Deep learning learns to compress voice characteristics into a fixed-size vector.

Training process (simplified):

Step 1: Collect data
 Speaker 1: 100 utterances
 Speaker 2: 100 utterances
 ...
 Speaker 10,000: 100 utterances

Step 2: Train neural network
 Input: Audio waveform or spectrogram
 Output: 512-dimensional embedding
 
 Goal: Minimize distance between embeddings of same speaker,
 maximize distance between different speakers

Step 3: Loss function (Triplet Loss)
 Anchor: Speaker A, utterance 1
 Positive: Speaker A, utterance 2 (same speaker)
 Negative: Speaker B, utterance 1 (different speaker)
 
 Loss = max(0, distance(anchor, positive) - distance(anchor, negative) + margin)
 
 This forces:
 - distance(A_utt1, A_utt2) < distance(A_utt1, B_utt1)
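
To make the loss concrete, here is a minimal numpy sketch of triplet loss over unit-length embeddings (the vectors are made-up toy values, not real embeddings):

import numpy as np

def triplet_loss(anchor, positive, negative, margin=0.2):
    """Triplet loss with cosine distance on unit vectors (illustrative)."""
    d_pos = 1.0 - np.dot(anchor, positive)   # distance(anchor, positive)
    d_neg = 1.0 - np.dot(anchor, negative)   # distance(anchor, negative)
    # Loss is zero once the positive is closer than the negative by `margin`
    return max(0.0, d_pos - d_neg + margin)

# Toy unit vectors: two utterances of speaker A, one of speaker B
a_utt1 = np.array([0.6, 0.8])
a_utt2 = np.array([0.55, 0.835]) / np.linalg.norm([0.55, 0.835])
b_utt1 = np.array([-0.8, 0.6])

print(triplet_loss(a_utt1, a_utt2, b_utt1))  # 0.0 - already well separated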

Visual intuition:

Before training (random embeddings):
Speaker A utterances: scattered everywhere
Speaker B utterances: scattered everywhere
No clustering!

After training:
Speaker A utterances: tight cluster in embedding space
Speaker B utterances: different tight cluster, far from A
Clear separation!

Why 512 dimensions?

  • Lower (e.g., 64): Not enough capacity to capture all voice variations
  • Higher (e.g., 2048): Overfitting, slow, unnecessary
  • 512: Sweet spot (empirically found by researchers)

What does the embedding capture?

  • Pitch/fundamental frequency
  • Formant structure (vocal tract resonances)
  • Speaking rate
  • Accent/dialect
  • Voice quality (breathy, creaky, etc.)

What it should NOT capture (ideally):

  • Spoken words (content)
  • Emotions (though it does somewhat)
  • Background noise

Comparison: Different Diarization Approaches

| Approach | How It Works | Pros | Cons | Use Case |
| --- | --- | --- | --- | --- |
| Clustering-based | Extract embeddings → cluster | Simple, interpretable | Needs good embeddings | General purpose |
| End-to-end neural | Single model: audio → labels | Best accuracy | Slow, black-box | High-accuracy needs |
| Online diarization | Process stream incrementally | Real-time capable | Lower accuracy | Live captions |
| Supervised (known speakers) | Match to registered voices | Very accurate for known speakers | Requires enrollment | Authentication, personalization |

Example scenario: Meeting with known participants

import numpy as np


class KnownSpeakerDiarization:
    """
    When you know who's in the meeting.

    Much more accurate than unsupervised clustering.
    """

    def __init__(self):
        self.speaker_profiles = {}  # speaker_name → mean embedding

    def enroll_speaker(self, speaker_name, audio_samples):
        """
        Register a speaker.

        Args:
            speaker_name: "Alice", "Bob", etc.
            audio_samples: List of audio clips of this speaker
        """
        # Extract embeddings from all samples
        embeddings = [self.extract_embedding(audio) for audio in audio_samples]

        # Compute mean embedding (speaker profile)
        mean_embedding = np.mean(embeddings, axis=0)

        # Store
        self.speaker_profiles[speaker_name] = mean_embedding
        print(f"✓ Enrolled {speaker_name}")

    def identify_speaker(self, audio_segment):
        """
        Identify which registered speaker this is.
        """
        # Extract embedding
        test_embedding = self.extract_embedding(audio_segment)

        # Compare with all registered speakers
        best_match = None
        best_similarity = -1

        for name, profile_embedding in self.speaker_profiles.items():
            similarity = self._cosine_similarity(test_embedding, profile_embedding)

            if similarity > best_similarity:
                best_similarity = similarity
                best_match = name

        # Threshold: reject weak matches as unknown speakers
        if best_similarity > 0.7:
            return best_match, best_similarity
        return "UNKNOWN", best_similarity

    def extract_embedding(self, audio):
        """Placeholder: replace with a real embedding extractor (e.g., ECAPA-TDNN)"""
        audio = np.asarray(audio)
        return audio[:512] if audio.size >= 512 else np.pad(audio, (0, 512 - audio.size))

    def _cosine_similarity(self, a, b):
        a = np.asarray(a)
        b = np.asarray(b)
        denom = (np.linalg.norm(a) * np.linalg.norm(b)) + 1e-10
        return float(np.dot(a, b) / denom)


# Usage
diarizer = KnownSpeakerDiarization()

# Enroll meeting participants
diarizer.enroll_speaker("Alice", alice_audio_samples)
diarizer.enroll_speaker("Bob", bob_audio_samples)
diarizer.enroll_speaker("Carol", carol_audio_samples)

# Now identify speakers in meeting
for segment in meeting_segments:
    speaker, confidence = diarizer.identify_speaker(segment)
    print(f"{speaker} ({confidence:.2f}): {transcribe(segment)}")

This is how Zoom/Teams could improve:

  • Ask users to speak their name when joining
  • Build speaker profile
  • Use it for accurate diarization

Component 1: Voice Activity Detection (VAD)

Goal: Find when any speaker is talking

Why needed: Don’t waste compute on silence

import numpy as np


class VoiceActivityDetector:
    """
    Detect speech vs non-speech.

    Uses energy + spectral features.
    """

    def __init__(self, sample_rate=16000, frame_ms=30):
        self.sample_rate = sample_rate
        self.frame_size = int(sample_rate * frame_ms / 1000)

    def detect(self, audio):
        """
        Detect speech regions.

        Args:
            audio: numpy array, shape (samples,)

        Returns:
            List of (start_time, end_time) tuples
        """
        # Split into frames
        frames = self._split_frames(audio)

        # Compute features for each frame
        is_speech = []
        for frame in frames:
            # Energy-based detection
            energy = np.mean(frame ** 2)

            # Spectral flatness (voice has low flatness)
            flatness = self._spectral_flatness(frame)

            # Simple threshold
            speech = (energy > 0.01) and (flatness < 0.5)
            is_speech.append(speech)

        # Convert frame-level decisions to time segments
        segments = self._merge_segments(is_speech)
        return segments

    def _split_frames(self, audio):
        """Split audio into overlapping frames"""
        frames = []
        hop_size = self.frame_size // 2  # 50% overlap

        for i in range(0, len(audio) - self.frame_size, hop_size):
            frames.append(audio[i:i + self.frame_size])

        return frames

    def _spectral_flatness(self, frame):
        """
        Compute spectral flatness.

        Low for voice (harmonic), high for noise (flat spectrum).
        """
        # Magnitude spectrum
        fft = np.abs(np.fft.rfft(frame))

        # Geometric mean / arithmetic mean
        geometric_mean = np.exp(np.mean(np.log(fft + 1e-10)))
        arithmetic_mean = np.mean(fft)

        return geometric_mean / (arithmetic_mean + 1e-10)

    def _merge_segments(self, is_speech):
        """
        Merge consecutive speech frames into segments.

        Args:
            is_speech: List of bools per frame

        Returns:
            List of (start_time, end_time)
        """
        segments = []
        in_segment = False
        start = 0

        # Frames advance by hop_size (half a frame), so each index covers that much time
        frame_duration = (self.frame_size // 2) / self.sample_rate

        for i, speech in enumerate(is_speech):
            if speech and not in_segment:
                # Start new segment
                start = i * frame_duration
                in_segment = True
            elif not speech and in_segment:
                # End segment
                segments.append((start, i * frame_duration))
                in_segment = False

        # Handle case where last frame is speech
        if in_segment:
            segments.append((start, len(is_speech) * frame_duration))

        return segments

Modern approach: Use pre-trained VAD models (more accurate)

def vad_pretrained(audio, sample_rate=16000):
    """
    Use pre-trained VAD model (Silero VAD)

    More accurate than energy-based
    """
    import torch

    # Load pre-trained model
    model, utils = torch.hub.load(
        repo_or_dir='snakers4/silero-vad',
        model='silero_vad',
        force_reload=False
    )

    (get_speech_timestamps, _, _, _, _) = utils

    # Detect speech
    speech_timestamps = get_speech_timestamps(
        audio,
        model,
        sampling_rate=sample_rate,
        threshold=0.5
    )

    # Convert to seconds
    segments = [
        (ts['start'] / sample_rate, ts['end'] / sample_rate)
        for ts in speech_timestamps
    ]

    return segments

Component 2: Speaker Diarization

Goal: Cluster speech segments by speaker

Key idea: Speakers have unique voice characteristics (embeddings)

Speaker Embeddings

Concept: Convert speech to fixed-size vector that captures speaker identity

Speaker A: "Hello" → [0.2, 0.8, -0.3, ...] (512-dim)
Speaker A: "How are you" → [0.21, 0.79, -0.31, ...] (similar!)
Speaker B: "Hi there" → [-0.5, 0.1, 0.9, ...] (different!)
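
A quick numeric check of what “similar” means here: cosine similarity between same-speaker embeddings is near 1, and much lower across speakers (using the toy 3-dim vectors from the example above, not real embeddings):

import numpy as np

def cosine_similarity(a, b):
    return np.dot(a, b) / (np.linalg.norm(a) * np.linalg.norm(b) + 1e-10)

a1 = np.array([0.2, 0.8, -0.3])     # Speaker A: "Hello"
a2 = np.array([0.21, 0.79, -0.31])  # Speaker A: "How are you"
b1 = np.array([-0.5, 0.1, 0.9])     # Speaker B: "Hi there"

print(cosine_similarity(a1, a2))  # ≈ 1.00 (same speaker)
print(cosine_similarity(a1, b1))  # ≈ -0.32 (different speaker)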

Models: x-vector, d-vector, ECAPA-TDNN

class SpeakerEmbeddingExtractor:
    """
    Extract speaker embeddings using ECAPA-TDNN.

    Embeddings capture speaker identity.
    """

    def __init__(self):
        from speechbrain.pretrained import EncoderClassifier

        # Load pre-trained model
        self.model = EncoderClassifier.from_hparams(
            source="speechbrain/spkrec-ecapa-voxceleb",
            savedir="pretrained_models/spkrec-ecapa"
        )

    def extract(self, audio, sample_rate=16000):
        """
        Extract speaker embedding.

        Args:
            audio: numpy array

        Returns:
            embedding: numpy array (192-dim for this ECAPA model)
        """
        import torch

        # Convert to tensor with a batch dimension
        audio_tensor = torch.FloatTensor(audio).unsqueeze(0)

        # Extract embedding
        with torch.no_grad():
            embedding = self.model.encode_batch(audio_tensor)

        # Convert to numpy
        return embedding.squeeze().cpu().numpy()


# Usage
extractor = SpeakerEmbeddingExtractor()

# Extract embeddings for each speech segment
segments = [(0.0, 1.5), (1.2, 2.3), (2.8, 4.0)]  # From VAD
embeddings = []

for start, end in segments:
    segment_audio = audio[int(start * sr):int(end * sr)]
    embeddings.append(extractor.extract(segment_audio))

# Now cluster embeddings to identify speakers

Clustering Embeddings

Goal: Group similar embeddings (same speaker)

class SpeakerClustering:
    """
    Cluster speaker embeddings.

    Same speaker → similar embeddings → same cluster.
    """

    def __init__(self, method='spectral'):
        self.method = method

    def cluster(self, embeddings, num_speakers=None):
        """
        Cluster embeddings into speakers.

        Args:
            embeddings: List of numpy arrays
            num_speakers: If known, specify; else auto-detect

        Returns:
            labels: Array of speaker IDs per segment
        """
        import numpy as np
        from sklearn.cluster import SpectralClustering, AgglomerativeClustering
        from sklearn.metrics.pairwise import cosine_similarity

        # Convert to matrix
        X = np.array(embeddings)

        # Auto-detect number of speakers if not given
        if num_speakers is None:
            num_speakers = self._estimate_num_speakers(X)

        # Cluster
        if self.method == 'spectral':
            # Use precomputed affinity for better control
            affinity = cosine_similarity(X)
            clusterer = SpectralClustering(
                n_clusters=num_speakers,
                affinity='precomputed'
            )
            labels = clusterer.fit_predict(affinity)
        else:
            clusterer = AgglomerativeClustering(
                n_clusters=num_speakers,
                linkage='average',
                metric='cosine'
            )
            labels = clusterer.fit_predict(X)

        return labels

    def _estimate_num_speakers(self, embeddings):
        """
        Estimate number of speakers.

        Uses silhouette score over candidate cluster counts
        (alternatives: eigengap heuristic, elbow method).
        """
        import numpy as np
        from sklearn.cluster import SpectralClustering
        from sklearn.metrics import silhouette_score
        from sklearn.metrics.pairwise import cosine_similarity

        # Try different numbers of clusters
        max_speakers = min(10, len(embeddings))
        affinity = cosine_similarity(embeddings)

        scores = []
        for k in range(2, max_speakers + 1):
            clusterer = SpectralClustering(n_clusters=k, affinity='precomputed')
            labels = clusterer.fit_predict(affinity)

            # Silhouette score: higher = better-separated clusters
            scores.append(silhouette_score(embeddings, labels, metric='cosine'))

        # Pick k with highest score (scores[0] corresponds to k=2)
        return int(np.argmax(scores)) + 2

Production library: Use pyannote.audio (state-of-the-art)

def diarize_with_pyannote(audio_path):
    """
    Speaker diarization using pyannote.audio.

    Production-ready, state-of-the-art.
    """
    from pyannote.audio import Pipeline

    # Load pre-trained pipeline
    pipeline = Pipeline.from_pretrained(
        "pyannote/speaker-diarization",
        use_auth_token="YOUR_HF_TOKEN"
    )

    # Run diarization
    diarization = pipeline(audio_path)

    # Extract speaker segments
    segments = []
    for turn, _, speaker in diarization.itertracks(yield_label=True):
        segments.append({
            'speaker': speaker,
            'start': turn.start,
            'end': turn.end
        })

    return segments


# Example output:
# [
#   {'speaker': 'SPEAKER_00', 'start': 0.0, 'end': 1.5},
#   {'speaker': 'SPEAKER_01', 'start': 1.2, 'end': 2.3},
#   {'speaker': 'SPEAKER_00', 'start': 2.8, 'end': 4.0},
# ]

Component 3: ASR Per Speaker

Goal: Transcribe each speaker segment

class MultiSpeakerASR:
    """
    Complete multi-speaker ASR system.

    Combines VAD + Diarization + ASR.
    """

    def __init__(self):
        # Load models
        self.vad = VoiceActivityDetector()
        self.embedding_extractor = SpeakerEmbeddingExtractor()
        self.clustering = SpeakerClustering()

        # ASR model (Whisper)
        import whisper
        self.asr_model = whisper.load_model("base")

    def transcribe(self, audio, sample_rate=16000):
        """
        Multi-speaker transcription.

        Returns:
            List of {speaker, start, end, text}
        """
        import numpy as np

        # Step 1: VAD
        speech_segments = self.vad.detect(audio)
        print(f"Found {len(speech_segments)} speech segments")

        # Step 2: Extract embeddings
        embeddings = []
        for start, end in speech_segments:
            segment = audio[int(start * sample_rate):int(end * sample_rate)]
            embeddings.append(self.embedding_extractor.extract(segment))

        # Step 3: Cluster by speaker
        speaker_labels = self.clustering.cluster(embeddings)
        print(f"Detected {len(set(speaker_labels))} speakers")

        # Step 4: Transcribe each segment
        results = []
        for i, (start, end) in enumerate(speech_segments):
            segment = audio[int(start * sample_rate):int(end * sample_rate)]

            # Transcribe (Whisper expects float32 numpy audio @ 16 kHz)
            result = self.asr_model.transcribe(segment.astype(np.float32), fp16=False)

            results.append({
                'speaker': f"SPEAKER_{speaker_labels[i]}",
                'start': start,
                'end': end,
                'text': result['text']
            })

        return results


# Usage
asr = MultiSpeakerASR()
results = asr.transcribe(audio)

# Output:
# [
#   {'speaker': 'SPEAKER_0', 'start': 0.0, 'end': 1.5, 'text': 'Hello everyone'},
#   {'speaker': 'SPEAKER_1', 'start': 1.2, 'end': 2.3, 'text': 'Hi there'},
#   {'speaker': 'SPEAKER_0', 'start': 2.8, 'end': 4.0, 'text': 'How are you'},
# ]

Handling Overlapping Speech

The hardest problem: Multiple speakers at once

Challenge

Time: 0s 1s 2s
Speaker A: "Hello there..."
Speaker B: "Hi..."
Audio: [A] [A+B] [A]
 ↑
 Overlapped!

Problem: With single-channel audio, the mixed speakers can’t be separated perfectly

Approach 1: Overlap Detection + Best Effort

class OverlapHandler:
    """
    Detect overlapping speech and handle gracefully.
    """

    def detect_overlaps(self, segments):
        """
        Find overlapping segments.

        Args:
            segments: List of {speaker, start, end}

        Returns:
            List of overlap regions
        """
        overlaps = []

        for i, seg1 in enumerate(segments):
            for seg2 in segments[i + 1:]:
                # Check if overlapping
                if seg1['end'] > seg2['start'] and seg1['start'] < seg2['end']:
                    # Compute overlap region
                    overlaps.append({
                        'start': max(seg1['start'], seg2['start']),
                        'end': min(seg1['end'], seg2['end']),
                        'speakers': [seg1['speaker'], seg2['speaker']]
                    })

        return overlaps

    def handle_overlap(self, audio, overlap, sr, asr_model):
        """
        Handle an overlapped region.

        Options:
            1. Transcribe mixed audio (less accurate)
            2. Mark as [OVERLAP] in transcript
            3. Use source separation (advanced)
        """
        # Option 1: Transcribe mixed audio
        segment = audio[int(overlap['start'] * sr):int(overlap['end'] * sr)]
        result = asr_model.transcribe(segment)

        return {
            'type': 'overlap',
            'speakers': overlap['speakers'],
            'start': overlap['start'],
            'end': overlap['end'],
            'text': result['text'],
            'confidence': 'low'  # Mark as uncertain
        }

Approach 2: Multi-Channel Source Separation

If you have multiple microphones, you can separate speakers!

class MultiChannelSeparation:
    """
    Use multiple microphones to separate speakers.

    Requires: Multiple audio channels (e.g., mic array).
    """

    def __init__(self):
        # Could hold beamforming weights or a learned separation model
        pass

    def separate(self, multi_channel_audio):
        """
        Separate speakers using spatial information.

        Args:
            multi_channel_audio: (channels, samples)

        Returns:
            beamformed: single enhanced channel (simplified)
        """
        import numpy as np

        # Simplest possible beamformer: delay-and-sum with zero delays,
        # i.e. average the channels. Real beamforming steers toward each
        # speaker using inter-channel delays (or uses neural separation
        # models like Conv-TasNet) - far more involved than this.
        beamformed = np.mean(multi_channel_audio, axis=0)

        # In production, use libraries like:
        # - pyroomacoustics (beamforming)
        # - asteroid (deep learning separation)
        return beamformed

Real-Time Streaming

Challenge: Process live audio with low latency

Streaming Architecture

User's mic
 ↓
[Capture] → [Buffer] → [VAD] → [Diarization] → [ASR] → [Display]
   20ms       100ms     10ms       50ms         100ms     20ms

Total: ~300ms latency

import asyncio
import json
import time

import numpy as np
import websockets
import whisper


class StreamingMultiSpeakerASR:
    """
    Real-time multi-speaker ASR.

    Processes audio chunks as they arrive.
    """

    def __init__(self, chunk_duration=0.5):
        self.chunk_duration = chunk_duration
        self.buffer = []
        self.speaker_history = {}  # Track speaker embeddings over time

        # Models
        self.vad = VoiceActivityDetector()
        self.embedding_extractor = SpeakerEmbeddingExtractor()
        self.asr_model = whisper.load_model("tiny")  # Faster for real-time

    async def process_stream(self, audio_stream):
        """
        Process audio stream in real-time.

        Args:
            audio_stream: Async iterator yielding audio chunks
        """
        async for chunk in audio_stream:
            # Add to buffer
            self.buffer.extend(chunk)

            # Process once the buffer is large enough
            if len(self.buffer) >= int(self.chunk_duration * 16000):
                result = await self._process_chunk()
                if result:
                    yield result

    async def _process_chunk(self):
        """Process buffered audio chunk"""
        # Get chunk
        chunk = np.array(self.buffer[:int(self.chunk_duration * 16000)], dtype=np.float32)

        # Remove from buffer (keep some overlap for continuity)
        overlap_samples = int(0.1 * 16000)  # 100ms overlap
        self.buffer = self.buffer[len(chunk) - overlap_samples:]

        # VAD
        if not self._is_speech(chunk):
            return None

        # Extract embedding
        embedding = self.embedding_extractor.extract(chunk)

        # Identify speaker (match with history)
        speaker_id = self._identify_speaker(embedding)

        # Transcribe in a worker thread so the event loop isn't blocked
        result = await asyncio.to_thread(self.asr_model.transcribe, chunk, fp16=False)

        return {
            'speaker': speaker_id,
            'text': result['text'],
            'timestamp': time.time()
        }

    def _is_speech(self, chunk):
        """Quick energy-based speech check"""
        return np.mean(chunk ** 2) > 0.01

    def _identify_speaker(self, embedding):
        """
        Match embedding to known speakers.

        If new speaker, assign new ID.
        """
        # Compare with known speakers
        best_match = None
        best_similarity = -1

        for speaker_id, known_embedding in self.speaker_history.items():
            # Cosine similarity
            similarity = np.dot(embedding, known_embedding) / (
                np.linalg.norm(embedding) * np.linalg.norm(known_embedding)
            )

            if similarity > best_similarity:
                best_similarity = similarity
                best_match = speaker_id

        # Threshold for same speaker
        if best_similarity > 0.75:
            return best_match

        # New speaker
        new_id = f"SPEAKER_{len(self.speaker_history)}"
        self.speaker_history[new_id] = embedding
        return new_id


# Usage with WebSocket
async def handle_client(websocket, path):
    """Handle incoming audio stream from client (path arg: older websockets API)"""
    asr = StreamingMultiSpeakerASR()

    async for result in asr.process_stream(websocket):
        # Send transcription back to client
        await websocket.send(json.dumps(result))


# Start server
start_server = websockets.serve(handle_client, "localhost", 8765)
asyncio.get_event_loop().run_until_complete(start_server)
asyncio.get_event_loop().run_forever()

Common Failure Modes & Debugging

Failure Mode 1: Speaker Confusion

Symptom: System assigns same utterance to multiple speakers or switches mid-sentence

Example:

Ground truth:
 [Alice, 0-5s]: "Hello, how are you today?"

System output (WRONG):
 [Alice, 0-2s]: "Hello, how"
 [Bob, 2-5s]: "are you today?"

Root causes:

  1. Insufficient speech for embedding
    • Embeddings need 2-3 seconds minimum
    • Short utterances (<1s) have unreliable embeddings
  2. Similar voices
    • Two speakers with similar pitch/timbre
    • System can’t distinguish
  3. Poor audio quality
    • Background noise corrupts embeddings
    • Low SNR (<10dB) confuses system

Solutions:

class RobustSpeakerIdentification:
    """
    Handle edge cases in speaker identification.
    """

    def __init__(self, min_segment_duration=2.0):
        self.min_segment_duration = min_segment_duration
        self.speaker_history = []  # Track recent speakers

    def identify_with_context(self, audio_segment, duration, prev_speaker=None):
        """
        Identify speaker with contextual hints.

        Args:
            audio_segment: Audio to identify
            duration: Segment duration in seconds
            prev_speaker: Who spoke last (context)

        Returns:
            speaker_id, confidence
        """
        # Check 1: Is segment long enough?
        if duration < self.min_segment_duration:
            # Too short for a reliable embedding.
            # Use speaker from previous segment (continuity assumption).
            if prev_speaker:
                return prev_speaker, 0.5  # Low confidence
            return "UNKNOWN", 0.0

        # Check 2: Extract embedding (extract_embedding / identify_speaker
        # as in KnownSpeakerDiarization above)
        embedding = self.extract_embedding(audio_segment)

        # Check 3: Identify with threshold
        speaker, similarity = self.identify_speaker(embedding)

        # Check 4: Apply contextual prior
        if prev_speaker and similarity < 0.75:
            # Ambiguous - bias toward previous speaker
            # (people usually finish their own sentences)
            return prev_speaker, 0.6

        return speaker, similarity

Failure Mode 2: Overlap Mis-attribution

Symptom: During overlaps, words from Speaker A attributed to Speaker B

Example:

Ground truth:
 [Alice, 0-3s]: "I think we should consider this option"
 [Bob, 2-4s]: "Wait, what about the other approach?"

System output (WRONG):
 [Alice, 0-2s]: "I think we should"
 [Bob, 2-4s]: "consider this option Wait, what about the other approach?"
 ↑ These words are Alice's, not Bob's!

Root cause: Diarization boundaries don’t align with actual speaker turns

Solution: Post-processing refinement

class OverlapRefiner:
    """
    Refine transcriptions in overlap regions.
    """

    def refine_overlaps(self, segments, asr_results):
        """
        Use ASR confidence to refine overlap boundaries.

        Idea: Low-confidence words might be from the other speaker.
        """
        refined = []

        for i, (seg, result) in enumerate(zip(segments, asr_results)):
            words = result['words']  # Word-level timestamps + confidence

            # Check if next segment overlaps
            if i < len(segments) - 1:
                next_seg = segments[i + 1]

                if self._is_overlapping(seg, next_seg):
                    # Refine boundary based on word confidence
                    words, _ = self._split_by_confidence(words, seg, next_seg)

            refined.append({
                'speaker': seg['speaker'],
                'start': seg['start'],
                'end': seg['end'],
                'words': words
            })

        return refined

    def _is_overlapping(self, seg1, seg2):
        """Do two segments overlap in time?"""
        return seg1['end'] > seg2['start'] and seg1['start'] < seg2['end']

    def _split_by_confidence(self, words, seg1, seg2):
        """
        Split words between two overlapping segments.

        High-confidence words stay; low-confidence might belong to the other speaker.
        """
        overlap_start = max(seg1['start'], seg2['start'])
        overlap_end = min(seg1['end'], seg2['end'])

        seg1_words = []
        seg2_words = []

        for word in words:
            # Check if word is in overlap region
            if overlap_start <= word['start'] <= overlap_end:
                # In overlap - check confidence
                if word['confidence'] > 0.8:
                    seg1_words.append(word)  # Keep in current segment
                else:
                    seg2_words.append(word)  # Might belong to other speaker
            else:
                seg1_words.append(word)

        return seg1_words, seg2_words

Failure Mode 3: Far-Field Audio Degradation

Symptom: Accuracy drops significantly when speaker is far from microphone

Example metrics:

Near-field (< 1m from mic):
 WER: 5%
 Diarization accuracy: 95%

Far-field (> 3m from mic):
 WER: 25% ← 5x worse!
 Diarization accuracy: 70%

Root cause:

  • Lower SNR (signal-to-noise ratio)
  • More reverberation
  • Acoustic reflections

Solutions:

  1. Beamforming (if mic array available)
  2. Speech enhancement pre-processing
  3. Specialized far-field models

class FarFieldPreprocessor:
    """
    Enhance far-field audio before ASR
    """

    def enhance(self, audio, sample_rate=16000):
        """
        Apply far-field enhancements

        1. Dereverb (reduce echo)
        2. Denoise
        3. Equalize (boost high frequencies)
        """
        # Step 1: Dereverberation (WPE algorithm)
        enhanced = self._dereverb_wpe(audio, sample_rate)

        # Step 2: Noise reduction (spectral subtraction)
        enhanced = self._denoise(enhanced, sample_rate)

        # Step 3: Equalization (boost consonants)
        enhanced = self._equalize(enhanced, sample_rate)

        return enhanced

    def _dereverb_wpe(self, audio, sr):
        """
        Weighted Prediction Error (WPE) dereverberation

        Removes room echo/reverberation
        """
        # Simplified - use library like `nara_wpe` in production
        from scipy import signal

        # High-pass filter to remove low-freq rumble
        sos = signal.butter(5, 100, 'highpass', fs=sr, output='sos')
        filtered = signal.sosfilt(sos, audio)

        return filtered

    def _denoise(self, audio, sr):
        """
        Spectral subtraction noise reduction
        """
        import noisereduce as nr

        # noisereduce estimates a noise profile from the signal itself
        reduced = nr.reduce_noise(
            y=audio,
            sr=sr,
            stationary=True,
            prop_decrease=0.8
        )

        return reduced

    def _equalize(self, audio, sr):
        """
        Boost high frequencies (consonants)

        Far-field audio loses high-freq content
        """
        from scipy import signal

        # Boost 2-8kHz (consonant region)
        sos = signal.butter(3, [2000, 8000], 'bandpass', fs=sr, output='sos')
        boosted = signal.sosfilt(sos, audio)

        # Mix with original (50-50)
        enhanced = 0.5 * audio + 0.5 * boosted

        return enhanced

Debugging Tools

class MultiSpeakerASRDebugger:
    """
    Tools for debugging multi-speaker ASR issues.
    """

    def visualize_diarization(self, segments, audio_duration):
        """
        Visual timeline of speakers.

        Helps spot issues like:
            - Too many speaker switches
            - Missing speakers
            - Wrong boundaries
        """
        import matplotlib.pyplot as plt

        fig, ax = plt.subplots(figsize=(15, 3))

        # Plot each segment as a horizontal bar on its speaker's row
        for seg in segments:
            speaker_id = int(seg['speaker'].split('_')[1])
            color = plt.cm.tab10(speaker_id)

            ax.barh(
                y=speaker_id,
                width=seg['end'] - seg['start'],
                left=seg['start'],
                height=0.8,
                color=color
            )

        ax.set_xlabel('Time (seconds)')
        ax.set_ylabel('Speaker')
        ax.set_title('Speaker Diarization Timeline')
        ax.set_xlim(0, audio_duration)

        plt.tight_layout()
        plt.savefig('diarization_debug.png')
        print("✓ Saved visualization to diarization_debug.png")

    def compute_metrics(self, predicted_segments, ground_truth_segments):
        """
        Compute diarization metrics.

        DER (Diarization Error Rate) =
            (False Alarm + Miss + Speaker Confusion) / Total reference speech
        """
        from pyannote.metrics.diarization import DiarizationErrorRate

        der = DiarizationErrorRate()

        # Convert to pyannote format
        pred_annotation = self._to_annotation(predicted_segments)
        gt_annotation = self._to_annotation(ground_truth_segments)

        # Compute DER with per-component breakdown
        details = der(gt_annotation, pred_annotation, detailed=True)

        return {
            'DER': details['diarization error rate'],
            'false_alarm': details['false alarm'],
            'missed_detection': details['missed detection'],
            'speaker_confusion': details['confusion']
        }

    def _to_annotation(self, segments):
        """Convert segments to pyannote Annotation format"""
        from pyannote.core import Annotation, Segment

        annotation = Annotation()

        for seg in segments:
            annotation[Segment(seg['start'], seg['end'])] = seg['speaker']

        return annotation
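
For intuition about the formula, a toy example with made-up numbers: on 100 seconds of reference speech, 2 s of false alarms (speech hypothesized during silence), 3 s of missed speech, and 5 s attributed to the wrong speaker give DER = (2 + 3 + 5) / 100 = 10%.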

Production Considerations

1. Latency Optimization

Target: < 300ms end-to-end for real-time feel

Breakdown:

Audio capture: 20ms
Buffering: 100ms
VAD: 10ms
Embedding: 50ms
ASR: 100ms
Network: 20ms
Total: 300ms
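
A quick way to verify you stay inside this budget is to time each stage as it runs. A minimal sketch (the stage functions and budget values passed in are placeholders):

import time

def timed(stage_name, fn, *args, budget_ms=None):
    """Run one pipeline stage and report its latency."""
    t0 = time.perf_counter()
    out = fn(*args)
    elapsed_ms = (time.perf_counter() - t0) * 1000
    note = "" if budget_ms is None or elapsed_ms <= budget_ms else "  ← over budget!"
    print(f"{stage_name}: {elapsed_ms:.1f}ms{note}")
    return out

# e.g., segments = timed("VAD", vad.detect, audio, budget_ms=10)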

Optimizations:

  • Use smaller ASR models (Whisper tiny/base)
  • Batch embedding extraction
  • Pre-compute speaker profiles
  • GPU acceleration
  • Reduce network round-trips

2. Accuracy vs Speed Trade-off

| Model | Latency | WER | Use Case |
| --- | --- | --- | --- |
| Whisper tiny | 50ms | 10% | Live captions |
| Whisper base | 100ms | 7% | Meetings |
| Whisper medium | 300ms | 5% | Post-processing |
| Whisper large | 1000ms | 3% | Archival transcription |
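
If you want to pick a model programmatically from a latency budget, a small helper along these lines works (a sketch; the latencies mirror the table above and will vary with hardware):

def pick_whisper_model(latency_budget_ms):
    """Largest Whisper model whose rough latency fits the budget (illustrative)."""
    options = [("tiny", 50), ("base", 100), ("medium", 300), ("large", 1000)]
    chosen = options[0][0]  # Fall back to the fastest model
    for model, latency_ms in options:
        if latency_ms <= latency_budget_ms:
            chosen = model  # Keep upgrading while we stay within budget
    return chosen

print(pick_whisper_model(150))  # → "base"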

3. Speaker Persistence

Challenge: Same speaker should have consistent ID across session

class SpeakerRegistry:
    """
    Maintain consistent speaker IDs.

    Matches new embeddings to registered speakers.
    """

    def __init__(self, similarity_threshold=0.75):
        self.speakers = {}  # id -> mean embedding
        self.threshold = similarity_threshold

    def register_or_identify(self, embedding):
        """
        Register new speaker or identify existing.
        """
        import numpy as np

        # Check against known speakers
        for speaker_id, known_emb in self.speakers.items():
            similarity = np.dot(embedding, known_emb) / (
                np.linalg.norm(embedding) * np.linalg.norm(known_emb) + 1e-10
            )

            if similarity > self.threshold:
                # Update running average (slowly adapt the profile)
                self.speakers[speaker_id] = 0.9 * known_emb + 0.1 * embedding
                return speaker_id

        # New speaker
        new_id = f"SPEAKER_{len(self.speakers) + 1}"
        self.speakers[new_id] = embedding
        return new_id

4. Monitoring & Debugging

class MultiSpeakerASRMetrics:
    """
    Track system performance
    """

    def __init__(self):
        self.metrics = {
            'latency_ms': [],
            'overlap_ratio': 0,
            'speaker_switches_per_minute': 0,
            'wer_per_speaker': {}
        }

    def log_latency(self, latency_ms):
        self.metrics['latency_ms'].append(latency_ms)

    def report(self):
        import numpy as np

        return {
            'p50_latency_ms': np.median(self.metrics['latency_ms']),
            'p95_latency_ms': np.percentile(self.metrics['latency_ms'], 95),
            'overlap_ratio': self.metrics['overlap_ratio'],
            'speaker_switches_per_minute': self.metrics['speaker_switches_per_minute']
        }

Key Takeaways

✅ Multi-speaker ASR = VAD + Diarization + ASR
✅ Speaker embeddings capture voice identity
✅ Clustering groups segments by speaker
✅ Overlaps are hard - detect and handle gracefully
✅ Real-time requires careful latency optimization
✅ State-of-the-art: Use pyannote.audio + Whisper

Production tips:

  • Start with pyannote + whisper (best quality)
  • Optimize latency with smaller models if needed
  • Handle overlaps explicitly (mark in transcript)
  • Maintain speaker consistency across session
  • Monitor latency and accuracy per speaker
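
Putting the first tip into practice, a minimal glue sketch: diarize with the diarize_with_pyannote helper from earlier, then transcribe each turn with Whisper (assumes audio is the full recording as a 16 kHz float32 numpy array):

import numpy as np
import whisper

def transcribe_with_speakers(audio_path, audio, sr=16000):
    """pyannote for who-spoke-when + Whisper for what-was-said (sketch)."""
    model = whisper.load_model("base")
    lines = []

    for seg in diarize_with_pyannote(audio_path):  # defined earlier in this post
        clip = audio[int(seg['start'] * sr):int(seg['end'] * sr)]
        result = model.transcribe(clip.astype(np.float32), fp16=False)
        lines.append(f"[{seg['speaker']}] {result['text'].strip()}")

    return "\n".join(lines)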

FAQ

What is multi-speaker ASR and how does it differ from single-speaker ASR?

Multi-speaker ASR recognizes speech from multiple speakers in a conversation, identifies who spoke each word, and handles overlapping speech. Unlike single-speaker ASR, it combines voice activity detection, speaker diarization, and per-speaker transcription to produce speaker-attributed transcripts. Single-speaker ASR assumes one voice at a time with no speaker changes, which breaks down in real-world meetings, calls, and conversations.

How do speaker embeddings work in diarization systems?

Speaker embeddings are fixed-size vectors (typically 512 dimensions) that capture voice identity characteristics like pitch, formant structure, and speaking rate. Models like ECAPA-TDNN are trained using triplet loss to cluster same-speaker utterances together while separating different speakers in embedding space. At inference time, embeddings are extracted from short audio segments and clustered to assign speaker labels.

What is the typical latency for real-time multi-speaker ASR?

Production real-time multi-speaker ASR systems target under 300ms end-to-end latency, broken down as roughly 20ms audio capture, 100ms buffering, 10ms VAD, 50ms embedding extraction, 100ms ASR, and 20ms network overhead. Smaller ASR models like Whisper tiny or base are used for streaming to meet these constraints, with larger models reserved for post-processing.

How do multi-speaker ASR systems handle overlapping speech?

Overlapping speech is handled through overlap detection followed by either transcribing the mixed audio with lower confidence, marking regions as overlapped in the transcript, or using multi-channel source separation with beamforming when multiple microphones are available. This remains the hardest unsolved challenge in multi-speaker ASR.


Originally published at: arunbaby.com/speech-tech/0012-multi-speaker-asr

Want to work together?

I take on projects, advisory roles, and fractional CTO engagements in AI/ML. I also help businesses go AI-native with agentic workflows and agent orchestration.

Get in touch