19 minute read

Build production speaker diarization systems that cluster audio segments by speaker using embedding-based similarity and hash-based grouping.

Problem Statement

Design a Speaker Diarization System that answers “who spoke when?” in multi-speaker audio recordings, clustering speech segments by speaker identity without prior knowledge of speaker identities or count.

Functional Requirements

  1. Speaker segmentation: Detect speaker change points
  2. Speaker clustering: Group segments by speaker identity
  3. Speaker count estimation: Automatically determine number of speakers
  4. Overlap handling: Detect and handle overlapping speech
  5. Real-time capability: Process audio with minimal latency (<1s per minute)
  6. Speaker labels: Assign consistent labels across recordings
  7. Quality metrics: Calculate Diarization Error Rate (DER)
  8. Multi-language support: Work across different languages

Non-Functional Requirements

  1. Accuracy: DER < 10% on benchmark datasets
  2. Latency: <1 second to process 1 minute of audio
  3. Throughput: 1000+ concurrent diarization sessions
  4. Scalability: Handle 10,000+ hours of audio daily
  5. Real-time: Support live streaming diarization
  6. Cost: <$0.01 per minute of audio
  7. Robustness: Handle noise, accents, channel variability

Understanding the Problem

Speaker diarization is critical for many applications:

Use Cases

| Company | Use Case | Approach | Scale |
|---|---|---|---|
| Zoom | Meeting transcription | Real-time online diarization | 300M+ meetings/day |
| Google Meet | Speaker identification | x-vector + clustering | Billions of minutes |
| Otter.ai | Note-taking | Offline batch diarization | 10M+ hours |
| Amazon Alexa | Multi-user recognition | Speaker ID + diarization | 100M+ devices |
| Microsoft Teams | Meeting analytics | Hybrid online/offline | Enterprise scale |
| Call centers | Quality assurance | Batch processing | Millions of calls |

Why Diarization Matters

  1. Meeting transcripts: Attribute speech to correct speaker
  2. Call analytics: Separate agent vs customer
  3. Podcast production: Automatic speaker labeling
  4. Surveillance: Track multiple speakers
  5. Accessibility: Better subtitles with speaker info
  6. Content search: “Find all segments where Person A spoke”

The Hash-Based Grouping Connection

Just like Group Anagrams and Clustering Systems:

| Group Anagrams | Clustering Systems | Speaker Diarization |
|---|---|---|
| Group strings by chars | Group points by features | Group segments by speaker |
| Hash: sorted string | Hash: quantized vector | Hash: voice embedding |
| Exact matching | Similarity matching | Similarity matching |
| O(NK log K) | O(NK) with LSH | O(N log N) with clustering |

All three use hash-based or similarity-based grouping to organize items efficiently.
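
To make that concrete, here is a toy sketch of both ends of the spectrum. The helper names (group_exact, group_approximate) and the 0.7 threshold are illustrative choices, not from any particular library:

from collections import defaultdict
import numpy as np

def group_exact(items, signature):
    """Anagram-style grouping: identical signatures share one dict key."""
    groups = defaultdict(list)
    for item in items:
        groups[signature(item)].append(item)
    return list(groups.values())

def group_approximate(embeddings, threshold=0.7):
    """Diarization-style grouping: join the closest group if cosine
    similarity to its centroid clears the threshold, else start a new group."""
    centroids, groups = [], []
    for i, emb in enumerate(embeddings):
        emb = emb / (np.linalg.norm(emb) + 1e-8)
        sims = [float(np.dot(emb, c)) for c in centroids]
        if sims and max(sims) >= threshold:
            best = int(np.argmax(sims))
            groups[best].append(i)
            new_c = centroids[best] + emb
            centroids[best] = new_c / (np.linalg.norm(new_c) + 1e-8)
        else:
            centroids.append(emb)
            groups.append([i])
    return groups

print(group_exact(["eat", "tea", "tan", "ate"], signature=lambda s: "".join(sorted(s))))
# [['eat', 'tea', 'ate'], ['tan']]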

High-Level Architecture

┌─────────────────────────────────────────────────────────────────┐
│                  Speaker Diarization System                      │
└─────────────────────────────────────────────────────────────────┘

                    Audio Input
                    (Multi-speaker)
                         ↓
            ┌────────────────────────┐
            │  Voice Activity        │
            │  Detection (VAD)       │
            │  - Remove silence      │
            └───────────┬────────────┘
                        │
            ┌───────────▼────────────┐
            │  Audio Segmentation    │
            │  - Fixed windows       │
            │  - Change detection    │
            └───────────┬────────────┘
                        │
            ┌───────────▼────────────┐
            │  Embedding Extraction  │
            │  - x-vectors           │
            │  - d-vectors           │
            │  - ECAPA-TDNN          │
            └───────────┬────────────┘
                        │
        ┌───────────────┼───────────────┐
        │               │               │
┌───────▼──────┐ ┌─────▼─────┐ ┌──────▼──────┐
│ Clustering   │ │ Refinement│ │ Overlap     │
│ - AHC        │ │ - VB      │ │ Detection   │
│ - Spectral   │ │ - PLDA    │ │             │
└───────┬──────┘ └─────┬─────┘ └──────┬──────┘
        │               │               │
        └───────────────┼───────────────┘
                        │
            ┌───────────▼────────────┐
            │  Diarization Output    │
            │                        │
            │  [0-10s]:  Speaker A   │
            │  [10-25s]: Speaker B   │
            │  [25-40s]: Speaker A   │
            │  [40-55s]: Speaker C   │
            └────────────────────────┘

Key Components

  1. VAD: Remove silence and non-speech
  2. Segmentation: Split audio into segments
  3. Embedding Extraction: Convert segments to vectors
  4. Clustering: Group segments by speaker (like anagram grouping!)
  5. Refinement: Improve boundaries and assignments
  6. Overlap Detection: Handle simultaneous speech

Component Deep-Dives

1. Voice Activity Detection (VAD)

Remove silence to focus on speech segments:

import numpy as np
import librosa
from typing import List, Tuple

class VoiceActivityDetector:
    """
    Voice Activity Detection using energy-based approach.
    
    Filters out silence before diarization.
    """
    
    def __init__(
        self,
        sample_rate: int = 16000,
        frame_length: int = 512,
        hop_length: int = 160,
        energy_threshold: float = 0.03
    ):
        self.sample_rate = sample_rate
        self.frame_length = frame_length
        self.hop_length = hop_length
        self.energy_threshold = energy_threshold
    
    def detect(self, audio: np.ndarray) -> List[Tuple[float, float]]:
        """
        Detect speech segments.
        
        Args:
            audio: Audio waveform
            
        Returns:
            List of (start_time, end_time) tuples in seconds
        """
        # Calculate energy for each frame
        energy = librosa.feature.rms(
            y=audio,
            frame_length=self.frame_length,
            hop_length=self.hop_length
        )[0]
        
        # Normalize energy
        energy = energy / (energy.max() + 1e-8)
        
        # Threshold to get speech frames
        speech_frames = energy > self.energy_threshold
        
        # Convert frames to time segments
        segments = self._frames_to_segments(speech_frames)
        
        return segments
    
    def _frames_to_segments(
        self,
        speech_frames: np.ndarray
    ) -> List[Tuple[float, float]]:
        """Convert binary frame sequence to time segments."""
        segments = []
        
        in_speech = False
        start_frame = 0
        
        for i, is_speech in enumerate(speech_frames):
            if is_speech and not in_speech:
                # Speech started
                start_frame = i
                in_speech = True
            elif not is_speech and in_speech:
                # Speech ended
                start_time = start_frame * self.hop_length / self.sample_rate
                end_time = i * self.hop_length / self.sample_rate
                segments.append((start_time, end_time))
                in_speech = False
        
        # Handle case where speech continues to end
        if in_speech:
            start_time = start_frame * self.hop_length / self.sample_rate
            end_time = len(speech_frames) * self.hop_length / self.sample_rate
            segments.append((start_time, end_time))
        
        return segments
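
As a quick usage sketch, the detector runs on any mono 16 kHz waveform; the file name below is just a placeholder:

import librosa

# Load any mono 16 kHz recording (path is a placeholder)
audio, sr = librosa.load("meeting.wav", sr=16000, mono=True)

vad = VoiceActivityDetector(sample_rate=sr, energy_threshold=0.03)
for start, end in vad.detect(audio):
    print(f"speech: {start:.2f}s - {end:.2f}s")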

2. Speaker Embedding Extraction

Extract voice embeddings (x-vectors) for each segment:

import torch
import torch.nn as nn

class SpeakerEmbeddingExtractor:
    """
    Extract speaker embeddings from audio.
    
    Similar to Group Anagrams:
    - Anagrams: sorted string = signature
    - Diarization: embedding vector = signature
    
    Embeddings encode speaker identity in fixed-size vector.
    """
    
    def __init__(self, model_path: str = "pretrained_xvector.pt"):
        """
        Initialize embedding extractor.
        
        In production, use pre-trained models:
        - x-vectors (Kaldi)
        - d-vectors (Google)
        - ECAPA-TDNN (SpeechBrain)
        """
        # Load pre-trained model
        # self.model = torch.load(model_path)
        
        # For demo: use dummy model
        self.model = self._create_dummy_model()
        self.model.eval()
        
        self.embedding_dim = 512
    
    def _create_dummy_model(self) -> nn.Module:
        """Create dummy embedding model for demo."""
        class DummyEmbeddingModel(nn.Module):
            def __init__(self):
                super().__init__()
                self.conv = nn.Conv1d(40, 512, kernel_size=5)
                self.pool = nn.AdaptiveAvgPool1d(1)
            
            def forward(self, x):
                # x: (batch, features, time)
                x = self.conv(x)
                x = self.pool(x)
                return x.squeeze(-1)
        
        return DummyEmbeddingModel()
    
    def extract(
        self,
        audio: np.ndarray,
        sample_rate: int = 16000
    ) -> np.ndarray:
        """
        Extract embedding from audio segment.
        
        Args:
            audio: Audio waveform
            sample_rate: Sample rate
            
        Returns:
            Embedding vector of shape (embedding_dim,)
        """
        # Extract mel spectrogram features
        mel_spec = librosa.feature.melspectrogram(
            y=audio,
            sr=sample_rate,
            n_mels=40,
            n_fft=512,
            hop_length=160
        )
        
        # Log mel spectrogram
        log_mel = librosa.power_to_db(mel_spec)
        
        # Convert to tensor
        features = torch.FloatTensor(log_mel).unsqueeze(0)
        
        # Extract embedding
        with torch.no_grad():
            embedding = self.model(features)
        
        # Normalize embedding
        embedding = embedding.squeeze().numpy()
        embedding = embedding / (np.linalg.norm(embedding) + 1e-8)
        
        return embedding
    
    def extract_batch(
        self,
        audio_segments: List[np.ndarray],
        sample_rate: int = 16000
    ) -> np.ndarray:
        """
        Extract embeddings for multiple segments.
        
        Args:
            audio_segments: List of audio waveforms
            
        Returns:
            Embedding matrix of shape (n_segments, embedding_dim)
        """
        embeddings = []
        
        for audio in audio_segments:
            emb = self.extract(audio, sample_rate)
            embeddings.append(emb)
        
        return np.array(embeddings)
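
To see the "embedding as signature" idea in action, the sketch below compares two segments by cosine similarity. The noise inputs and the 0.7 decision threshold are placeholders; with the random-weight dummy model the score itself is not meaningful, but the decision logic mirrors what production systems do:

import numpy as np

# Two 2-second segments; in practice these come from the VAD windows above.
# Random noise is used only so the snippet runs end-to-end.
segment_a = np.random.randn(2 * 16000).astype(np.float32)
segment_b = np.random.randn(2 * 16000).astype(np.float32)

extractor = SpeakerEmbeddingExtractor()
emb_a = extractor.extract(segment_a)
emb_b = extractor.extract(segment_b)

# Embeddings are L2-normalized, so the dot product is the cosine similarity.
similarity = float(np.dot(emb_a, emb_b))
print(f"cosine similarity: {similarity:.3f} -> same speaker? {similarity > 0.7}")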

3. Agglomerative Hierarchical Clustering

Cluster embeddings by speaker using AHC:

from scipy.cluster.hierarchy import linkage, fcluster
from scipy.spatial.distance import cosine
from sklearn.metrics import silhouette_score

class SpeakerClustering:
    """
    Cluster speaker embeddings using Agglomerative Hierarchical Clustering.
    
    Similar to Group Anagrams:
    - Anagrams: group by sorted string
    - Diarization: group by embedding similarity
    
    Both group similar items, but diarization uses approximate similarity.
    """
    
    def __init__(
        self,
        metric: str = "cosine",
        linkage_method: str = "average",
        threshold: float = 0.5
    ):
        """
        Initialize speaker clustering.
        
        Args:
            metric: Distance metric ("cosine", "euclidean")
            linkage_method: "average", "complete", "ward"
            threshold: Clustering threshold
        """
        self.metric = metric
        self.linkage_method = linkage_method
        self.threshold = threshold
        
        self.linkage_matrix = None
        self.labels = None
    
    def fit_predict(self, embeddings: np.ndarray) -> np.ndarray:
        """
        Cluster embeddings into speakers.
        
        Args:
            embeddings: Embedding matrix (n_segments, embedding_dim)
            
        Returns:
            Cluster labels (n_segments,)
        """
        n_segments = len(embeddings)
        
        if n_segments < 2:
            # Zero or one segment: nothing to cluster
            return np.zeros(n_segments, dtype=int)
        
        # Calculate pairwise distances
        if self.metric == "cosine":
            # Cosine distance
            from sklearn.metrics.pairwise import cosine_similarity
            similarity = cosine_similarity(embeddings)
            distances = 1 - similarity
            
            # Convert to condensed distance matrix
            from scipy.spatial.distance import squareform
            distances = squareform(distances, checks=False)
        else:
            # Use scipy's pdist
            from scipy.spatial.distance import pdist
            distances = pdist(embeddings, metric=self.metric)
        
        # Perform hierarchical clustering on the precomputed condensed
        # distance matrix (the metric was already applied above)
        self.linkage_matrix = linkage(
            distances,
            method=self.linkage_method
        )
        
        # Cut dendrogram to get clusters
        self.labels = fcluster(
            self.linkage_matrix,
            self.threshold,
            criterion='distance'
        ) - 1  # Convert to 0-indexed
        
        return self.labels
    
    def auto_tune_threshold(
        self,
        embeddings: np.ndarray,
        min_speakers: int = 2,
        max_speakers: int = 10
    ) -> float:
        """
        Automatically tune clustering threshold.
        
        Uses silhouette score to find optimal threshold.
        
        Args:
            embeddings: Embedding matrix
            min_speakers: Minimum number of speakers
            max_speakers: Maximum number of speakers
            
        Returns:
            Optimal threshold
        """
        best_threshold = self.threshold
        best_score = -1.0
        
        # Try different thresholds
        for threshold in np.linspace(0.1, 1.0, 20):
            self.threshold = threshold
            labels = self.fit_predict(embeddings)
            
            n_clusters = len(np.unique(labels))
            
            # Check if within valid range
            if n_clusters < min_speakers or n_clusters > max_speakers:
                continue
            
            # Calculate silhouette score
            if n_clusters > 1 and n_clusters < len(embeddings):
                score = silhouette_score(embeddings, labels)
                
                if score > best_score:
                    best_score = score
                    best_threshold = threshold
        
        self.threshold = best_threshold
        return best_threshold
    
    def estimate_num_speakers(self, embeddings: np.ndarray) -> int:
        """
        Estimate the number of speakers.
        
        Simplified heuristic: cut the dendrogram at a range of thresholds
        and take the median cluster count. (A true elbow method would look
        for the largest jump in merge distances, similar to choosing k
        in K-means.)
        """
        if self.linkage_matrix is None:
            self.fit_predict(embeddings)
        
        # Get cluster counts at different thresholds
        thresholds = np.linspace(0.1, 1.0, 20)
        cluster_counts = []
        
        for threshold in thresholds:
            labels = fcluster(
                self.linkage_matrix,
                threshold,
                criterion='distance'
            )
            cluster_counts.append(len(np.unique(labels)))
        
        # Find elbow point
        # Simplified: use median
        return int(np.median(cluster_counts))
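
A quick sanity check on synthetic embeddings shows the expected behavior; the three Gaussian "speakers" and the 0.5 threshold below are arbitrary test values:

import numpy as np

rng = np.random.default_rng(0)
speaker_means = rng.normal(size=(3, 512))      # three synthetic "speakers"
embeddings = np.vstack([
    mean + 0.05 * rng.normal(size=(10, 512))   # ten noisy segments per speaker
    for mean in speaker_means
])
embeddings /= np.linalg.norm(embeddings, axis=1, keepdims=True)

clustering = SpeakerClustering(threshold=0.5)
labels = clustering.fit_predict(embeddings)
print("labels:", labels)
print("estimated speakers:", len(np.unique(labels)))  # expected: 3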

4. Complete Diarization Pipeline

from dataclasses import dataclass
from typing import List, Tuple, Optional
import logging

@dataclass
class DiarizationSegment:
    """A speech segment with speaker label."""
    start_time: float
    end_time: float
    speaker_id: int
    confidence: float = 1.0
    
    @property
    def duration(self) -> float:
        return self.end_time - self.start_time

class SpeakerDiarization:
    """
    Complete speaker diarization system.
    
    Pipeline:
    1. VAD: Remove silence
    2. Segmentation: Split into windows
    3. Embedding extraction: Get x-vectors
    4. Clustering: Group by speaker (like anagram grouping!)
    5. Smoothing: Refine boundaries
    
    Similar to Group Anagrams:
    - Input: List of audio segments
    - Process: Extract embeddings (like sorting strings)
    - Output: Grouped segments (like grouped anagrams)
    """
    
    def __init__(
        self,
        vad_threshold: float = 0.03,
        segment_duration: float = 1.5,
        overlap: float = 0.75,
        clustering_threshold: float = 0.5
    ):
        """
        Initialize diarization system.
        
        Args:
            vad_threshold: Voice activity threshold
            segment_duration: Duration of segments (seconds)
            overlap: Overlap between segments (seconds)
            clustering_threshold: Speaker clustering threshold
        """
        self.vad = VoiceActivityDetector(energy_threshold=vad_threshold)
        self.embedding_extractor = SpeakerEmbeddingExtractor()
        self.clustering = SpeakerClustering(threshold=clustering_threshold)
        
        self.segment_duration = segment_duration
        self.overlap = overlap
        
        self.logger = logging.getLogger(__name__)
    
    def diarize(
        self,
        audio: np.ndarray,
        sample_rate: int = 16000,
        num_speakers: Optional[int] = None
    ) -> List[DiarizationSegment]:
        """
        Perform speaker diarization.
        
        Args:
            audio: Audio waveform
            sample_rate: Sample rate
            num_speakers: Optional number of speakers (auto-detect if None)
            
        Returns:
            List of diarization segments
        """
        self.logger.info("Starting diarization...")
        
        # Step 1: Voice Activity Detection
        speech_segments = self.vad.detect(audio)
        self.logger.info(f"Found {len(speech_segments)} speech segments")
        
        if not speech_segments:
            return []
        
        # Step 2: Create overlapping windows
        windows = self._create_windows(audio, sample_rate, speech_segments)
        self.logger.info(f"Created {len(windows)} windows")
        
        if not windows:
            return []
        
        # Step 3: Extract embeddings
        embeddings = self._extract_embeddings(audio, windows, sample_rate)
        self.logger.info(f"Extracted embeddings of shape {embeddings.shape}")
        
        # Step 4: Cluster by speaker
        if num_speakers is not None:
            # If num_speakers provided, use it
            labels = self._cluster_fixed_speakers(embeddings, num_speakers)
        else:
            # Auto-detect number of speakers
            labels = self.clustering.fit_predict(embeddings)
        
        n_speakers = len(np.unique(labels))
        self.logger.info(f"Detected {n_speakers} speakers")
        
        # Step 5: Convert to segments
        segments = self._windows_to_segments(windows, labels)
        
        # Step 6: Smooth boundaries
        segments = self._smooth_segments(segments)
        
        return segments
    
    def _create_windows(
        self,
        audio: np.ndarray,
        sample_rate: int,
        speech_segments: List[Tuple[float, float]]
    ) -> List[Tuple[float, float]]:
        """
        Create overlapping windows for embedding extraction.
        
        Args:
            audio: Audio waveform
            sample_rate: Sample rate
            speech_segments: Speech segments from VAD
            
        Returns:
            List of (start_time, end_time) windows
        """
        windows = []
        
        hop_duration = self.segment_duration - self.overlap
        
        for seg_start, seg_end in speech_segments:
            current_time = seg_start
            
            while current_time + self.segment_duration <= seg_end:
                windows.append((
                    current_time,
                    current_time + self.segment_duration
                ))
                current_time += hop_duration
            
            # Add last window if remaining duration > 50% of segment_duration
            if seg_end - current_time > self.segment_duration * 0.5:
                windows.append((current_time, seg_end))
        
        return windows
    
    def _extract_embeddings(
        self,
        audio: np.ndarray,
        windows: List[Tuple[float, float]],
        sample_rate: int
    ) -> np.ndarray:
        """Extract embeddings for all windows."""
        audio_segments = []
        
        for start, end in windows:
            start_sample = int(start * sample_rate)
            end_sample = int(end * sample_rate)
            
            segment_audio = audio[start_sample:end_sample]
            audio_segments.append(segment_audio)
        
        # Extract embeddings in batch
        embeddings = self.embedding_extractor.extract_batch(
            audio_segments,
            sample_rate
        )
        
        return embeddings
    
    def _cluster_fixed_speakers(
        self,
        embeddings: np.ndarray,
        num_speakers: int
    ) -> np.ndarray:
        """Cluster with fixed number of speakers."""
        from sklearn.cluster import KMeans
        
        kmeans = KMeans(n_clusters=num_speakers, random_state=42)
        labels = kmeans.fit_predict(embeddings)
        
        return labels
    
    def _windows_to_segments(
        self,
        windows: List[Tuple[float, float]],
        labels: np.ndarray
    ) -> List[DiarizationSegment]:
        """Convert windows with labels to segments."""
        segments = []
        
        for (start, end), label in zip(windows, labels):
            segments.append(DiarizationSegment(
                start_time=start,
                end_time=end,
                speaker_id=int(label)
            ))
        
        return segments
    
    def _smooth_segments(
        self,
        segments: List[DiarizationSegment],
        min_duration: float = 0.5
    ) -> List[DiarizationSegment]:
        """
        Smooth segment boundaries.
        
        Steps:
        1. Merge consecutive segments from same speaker
        2. Remove very short segments
        3. Fill gaps between segments
        """
        if not segments:
            return []
        
        # Sort by start time
        segments = sorted(segments, key=lambda s: s.start_time)
        
        # Merge consecutive segments from same speaker
        merged = []
        current = segments[0]
        
        for segment in segments[1:]:
            if (segment.speaker_id == current.speaker_id and
                segment.start_time - current.end_time < 0.3):
                # Merge
                current = DiarizationSegment(
                    start_time=current.start_time,
                    end_time=segment.end_time,
                    speaker_id=current.speaker_id
                )
            else:
                # Save current and start new
                if current.duration >= min_duration:
                    merged.append(current)
                current = segment
        
        # Add last segment
        if current.duration >= min_duration:
            merged.append(current)
        
        return merged
    
    def format_output(
        self,
        segments: List[DiarizationSegment],
        format: str = "rttm"
    ) -> str:
        """
        Format diarization output.
        
        Args:
            segments: Diarization segments
            format: Output format ("rttm", "json", "text")
            
        Returns:
            Formatted string
        """
        if format == "rttm":
            # RTTM format (standard for diarization evaluation)
            lines = []
            for seg in segments:
                line = (
                    f"SPEAKER file 1 {seg.start_time:.2f} "
                    f"{seg.duration:.2f} <NA> <NA> speaker_{seg.speaker_id} <NA> <NA>"
                )
                lines.append(line)
            return '\n'.join(lines)
        
        elif format == "json":
            import json
            output = [
                {
                    "start": seg.start_time,
                    "end": seg.end_time,
                    "speaker": f"speaker_{seg.speaker_id}",
                    "duration": seg.duration
                }
                for seg in segments
            ]
            return json.dumps(output, indent=2)
        
        else:  # text format
            lines = []
            for seg in segments:
                line = (
                    f"[{seg.start_time:.1f}s - {seg.end_time:.1f}s] "
                    f"Speaker {seg.speaker_id}"
                )
                lines.append(line)
            return '\n'.join(lines)


# Example usage
if __name__ == "__main__":
    logging.basicConfig(level=logging.INFO)
    
    # Generate sample audio (multi-speaker conversation)
    # In practice, load real audio
    sample_rate = 16000
    duration = 60  # 60 seconds
    audio = np.random.randn(sample_rate * duration) * 0.1
    
    # Create diarization system
    diarizer = SpeakerDiarization(
        segment_duration=1.5,
        overlap=0.75,
        clustering_threshold=0.5
    )
    
    # Perform diarization
    segments = diarizer.diarize(audio, sample_rate, num_speakers=None)
    
    print(f"\nDiarization Results:")
    print(f"Found {len(segments)} segments")
    print(f"Speakers: {len(set(s.speaker_id for s in segments))}")
    
    # Format output
    print("\n" + diarizer.format_output(segments, format="text"))

Production Deployment

Real-Time Streaming Diarization

from queue import Queue

class StreamingDiarization:
    """
    Online speaker diarization for live audio.
    
    Challenges:
    - Need to assign speakers before seeing full audio
    - No future context for boundary refinement
    - Must be fast (<100ms latency)
    """
    
    def __init__(self, chunk_duration: float = 2.0):
        self.chunk_duration = chunk_duration
        self.embedding_extractor = SpeakerEmbeddingExtractor()
        
        # Running state
        self.speaker_embeddings = {}  # speaker_id -> list of embeddings
        self.next_speaker_id = 0
        
        # Buffer
        self.audio_buffer = Queue()
        self.result_queue = Queue()
    
    def process_chunk(
        self,
        audio_chunk: np.ndarray,
        sample_rate: int = 16000
    ) -> Optional[DiarizationSegment]:
        """
        Process audio chunk and return diarization.
        
        Args:
            audio_chunk: Audio chunk
            sample_rate: Sample rate
            
        Returns:
            Diarization segment or None
        """
        # Extract embedding
        embedding = self.embedding_extractor.extract(audio_chunk, sample_rate)
        
        # Find nearest speaker
        speaker_id, similarity = self._find_nearest_speaker(embedding)
        
        # If no similar speaker found, create a new speaker
        if speaker_id is None or similarity < 0.7:
            speaker_id = self.next_speaker_id
            self.speaker_embeddings[speaker_id] = []
            self.next_speaker_id += 1
            similarity = 0.0  # New speaker: no prior profile to match against
        
        # Add embedding to speaker profile
        self.speaker_embeddings[speaker_id].append(embedding)
        
        # Return segment (times are relative to this chunk)
        return DiarizationSegment(
            start_time=0.0,
            end_time=self.chunk_duration,
            speaker_id=speaker_id,
            confidence=similarity
        )
    
    def _find_nearest_speaker(
        self,
        embedding: np.ndarray
    ) -> Tuple[Optional[int], float]:
        """Find nearest known speaker."""
        if not self.speaker_embeddings:
            return None, 0.0
        
        best_speaker = None
        best_similarity = -1.0
        
        for speaker_id, embeddings in self.speaker_embeddings.items():
            # Average speaker embedding
            speaker_emb = np.mean(embeddings, axis=0)
            
            # Cosine similarity
            similarity = np.dot(embedding, speaker_emb) / (
                np.linalg.norm(embedding) * np.linalg.norm(speaker_emb) + 1e-8
            )
            
            if similarity > best_similarity:
                best_similarity = similarity
                best_speaker = speaker_id
        
        return best_speaker, best_similarity
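
A minimal driver loop looks like this; the synthetic stream and fixed non-overlapping 2-second chunks are simplifications of a real capture pipeline:

import numpy as np

sample_rate = 16000
streamer = StreamingDiarization(chunk_duration=2.0)

# Synthetic 10-second "stream"; in practice chunks arrive from the mic/RTC stack
audio_stream = np.random.randn(sample_rate * 10).astype(np.float32)
chunk_samples = int(streamer.chunk_duration * sample_rate)

for offset in range(0, len(audio_stream) - chunk_samples + 1, chunk_samples):
    chunk = audio_stream[offset:offset + chunk_samples]
    segment = streamer.process_chunk(chunk, sample_rate)
    start = offset / sample_rate
    print(f"[{start:.1f}s - {start + streamer.chunk_duration:.1f}s] "
          f"speaker_{segment.speaker_id} (confidence={segment.confidence:.2f})")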

Evaluation Metrics

Diarization Error Rate (DER)

from typing import Dict

def calculate_der(
    reference: List[DiarizationSegment],
    hypothesis: List[DiarizationSegment],
    collar: float = 0.25
) -> Dict[str, float]:
    """
    Calculate Diarization Error Rate.
    
    DER = (False Alarm + Missed Detection + Speaker Error) / Total Speech Time
    
    Args:
        reference: Ground truth segments
        hypothesis: Predicted segments
        collar: Forgiveness collar around boundaries (seconds)
        
    Returns:
        Dictionary with DER components
    """
    # Convert segments to frame-level labels
    # Simplified implementation
    
    total_speech_time = sum(seg.duration for seg in reference)
    
    # Calculate overlap with collar
    false_alarm = 0.0
    missed_detection = 0.0
    speaker_error = 0.0
    
    # ... detailed calculation ...
    
    der = (false_alarm + missed_detection + speaker_error) / max(total_speech_time, 1e-8)
    
    return {
        "der": der,
        "false_alarm": false_alarm / total_speech_time,
        "missed_detection": missed_detection / total_speech_time,
        "speaker_error": speaker_error / total_speech_time
    }
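
The stub above leaves the error terms elided. As a concrete reference, here is a minimal frame-level approximation (my own sketch, not an official scorer): it ignores the collar and overlapping speech, discretizes both annotations into 10 ms frames, and finds the best one-to-one speaker mapping with the Hungarian algorithm. The helper name frame_level_der and the frame size are illustrative; for published numbers, use a standard tool such as pyannote.metrics or dscore.

import numpy as np
from scipy.optimize import linear_sum_assignment

def frame_level_der(
    reference: List[DiarizationSegment],
    hypothesis: List[DiarizationSegment],
    frame_size: float = 0.01
) -> float:
    """Approximate DER on 10 ms frames (single-speaker labels, no collar)."""
    all_segments = reference + hypothesis
    if not all_segments:
        return 0.0
    
    n_frames = int(np.ceil(max(seg.end_time for seg in all_segments) / frame_size))
    
    def to_frames(segments):
        labels = np.full(n_frames, -1, dtype=int)  # -1 marks non-speech
        for seg in segments:
            start = int(round(seg.start_time / frame_size))
            stop = int(round(seg.end_time / frame_size))
            labels[start:stop] = seg.speaker_id
        return labels
    
    ref, hyp = to_frames(reference), to_frames(hypothesis)
    
    # Overlap (in frames) between every reference/hypothesis speaker pair
    ref_ids = sorted(set(ref[ref >= 0]))
    hyp_ids = sorted(set(hyp[hyp >= 0]))
    overlap = np.zeros((len(ref_ids), len(hyp_ids)))
    for i, r in enumerate(ref_ids):
        for j, h in enumerate(hyp_ids):
            overlap[i, j] = np.sum((ref == r) & (hyp == h))
    
    # Optimal one-to-one speaker mapping (Hungarian algorithm)
    matched = 0.0
    if overlap.size:
        rows, cols = linear_sum_assignment(-overlap)
        matched = overlap[rows, cols].sum()
    
    missed = np.sum((ref >= 0) & (hyp < 0))
    false_alarm = np.sum((ref < 0) & (hyp >= 0))
    speaker_error = np.sum((ref >= 0) & (hyp >= 0)) - matched
    
    return float((missed + false_alarm + speaker_error) / max(np.sum(ref >= 0), 1))

# Example: 5 of 20 reference seconds attributed to the wrong speaker -> DER ~0.25
ref = [DiarizationSegment(0, 10, 0), DiarizationSegment(10, 20, 1)]
hyp = [DiarizationSegment(0, 15, 0), DiarizationSegment(15, 20, 1)]
print(frame_level_der(ref, hyp))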

Real-World Case Study: Zoom’s Diarization

Zoom’s Approach

Zoom processes 300M+ meetings daily with speaker diarization:

Architecture:

  1. Real-time VAD:
    • WebRTC VAD for low latency
    • Runs on client side
    • Filters silence before sending to server
  2. Embedding extraction:
    • Lightweight TDNN model
    • 128-dim embeddings
    • <10ms per segment
  3. Online clustering:
    • Incremental spectral clustering
    • Updates speaker profiles in real-time
    • Handles participants joining/leaving
  4. Post-processing:
    • Offline refinement after meeting
    • Improves boundary accuracy
    • Corrects speaker switches

Results:

  • DER: 8-12% (depending on audio quality)
  • Latency: <500ms for real-time
  • Throughput: 300M+ meetings/day
  • Cost: <$0.005 per meeting hour

Key Lessons

  1. Hybrid online/offline: Real-time + post-processing
  2. Lightweight models: Fast embeddings critical
  3. Incremental clustering: Can’t wait for full audio
  4. Client-side VAD: Reduces bandwidth and cost
  5. Quality adaptation: Adjust based on audio conditions

Cost Analysis

Cost Breakdown (1000 hours audio/day)

| Component | On-premise | Cloud | Serverless |
|---|---|---|---|
| VAD | $10/day | $20/day | $5/day |
| Embedding extraction | $200/day | $500/day | $300/day |
| Clustering | $50/day | $100/day | $50/day |
| Storage | $20/day | $30/day | $30/day |
| Total | $280/day | $650/day | $385/day |
| Per hour | $0.28 | $0.65 | $0.39 |

Optimization strategies:

  1. Batch processing:
    • Process in larger batches
    • Amortize overhead
    • Savings: 40%
  2. Model optimization:
    • Quantization (INT8; see the sketch after this list)
    • Distillation
    • Savings: 50% compute
  3. Caching:
    • Cache speaker profiles
    • Reuse across sessions
    • Savings: 20%
  4. Smart sampling:
    • Variable segment duration
    • Skip easy segments
    • Savings: 30%
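
For the quantization item above, a minimal PyTorch sketch of dynamic INT8 quantization is shown below. The toy Linear-only head is illustrative; dynamic quantization does not convert Conv1d layers, so conv-heavy embedding networks usually go through static quantization or an optimized inference runtime instead:

import torch
import torch.nn as nn

# Toy embedding head with Linear layers only
embedding_head = nn.Sequential(
    nn.Linear(40 * 100, 512),
    nn.ReLU(),
    nn.Linear(512, 512),
)

# Convert weights to INT8; activations are quantized dynamically at runtime
quantized_head = torch.quantization.quantize_dynamic(
    embedding_head, {nn.Linear}, dtype=torch.qint8
)

x = torch.randn(1, 40 * 100)
with torch.no_grad():
    print(quantized_head(x).shape)  # torch.Size([1, 512])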

Key Takeaways

Diarization = clustering audio by speaker using embedding similarity

x-vectors are standard for speaker embeddings (512-dim)

AHC works well for offline diarization with auto speaker count

Online diarization is harder - no future context, must be fast

VAD is critical - removes 50-80% of audio (silence)

Same pattern as anagrams/clustering - group by similarity signature

DER < 10% is good for production systems

Embedding quality matters most - better embeddings > better clustering

Real-time requires streaming - process chunks, incremental updates

Hybrid approach best - online for speed, offline for accuracy

All three topics share the same grouping pattern:

DSA (Group Anagrams):

  • Items: strings
  • Signature: sorted characters
  • Grouping: exact hash match
  • Result: anagram groups

ML System Design (Clustering Systems):

  • Items: data points
  • Signature: quantized vector or nearest centroid
  • Grouping: approximate similarity
  • Result: data clusters

Speech Tech (Speaker Diarization):

  • Items: audio segments
  • Signature: voice embedding (x-vector)
  • Grouping: cosine similarity threshold
  • Result: speaker-labeled segments

Universal Pattern

# Generic grouping pattern
def group_by_similarity(items, embed_function, similarity_function, similarity_threshold):
    """
    Universal pattern for grouping similar items.
    
    Used in:
    - Anagrams: embed = sort, similarity = exact match (1 or 0)
    - Clustering: embed = features, similarity = negative distance
    - Diarization: embed = x-vector, similarity = cosine similarity
    """
    embeddings = [embed_function(item) for item in items]
    
    # Cluster by similarity
    groups = []
    assigned = set()
    
    for i, emb_i in enumerate(embeddings):
        if i in assigned:
            continue
        
        group = [i]
        assigned.add(i)
        
        for j, emb_j in enumerate(embeddings[i+1:], start=i+1):
            if j in assigned:
                continue
            
            # Check similarity using the caller-supplied function
            similarity = similarity_function(emb_i, emb_j)
            if similarity > similarity_threshold:
                group.append(j)
                assigned.add(j)
        
        groups.append(group)
    
    return groups
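
Plugging in an exact-match similarity reproduces anagram grouping with the same loop (the word list is the classic example input):

words = ["eat", "tea", "tan", "ate", "nat", "bat"]

groups = group_by_similarity(
    words,
    embed_function=lambda w: "".join(sorted(w)),
    similarity_function=lambda a, b: 1.0 if a == b else 0.0,
    similarity_threshold=0.5,
)
print([[words[i] for i in group] for group in groups])
# [['eat', 'tea', 'ate'], ['tan', 'nat'], ['bat']]

The greedy single pass commits to the first match it finds; the hierarchical clustering used earlier instead considers all pairwise distances before cutting the dendrogram, which is why it is preferred for offline diarization.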

This pattern is universal across:

  • String algorithms (anagrams)
  • Machine learning (clustering)
  • Speech processing (diarization)
  • Computer vision (object tracking)
  • Natural language processing (document clustering)

Originally published at: arunbaby.com/speech-tech/0015-speaker-clustering-diarization

If you found this helpful, consider sharing it with others who might benefit.