Speaker Clustering (Diarization)
Build production speaker diarization systems that cluster audio segments by speaker using embedding-based similarity, the speech-domain counterpart of hash-based grouping.
Problem Statement
Design a Speaker Diarization System that answers “who spoke when?” in multi-speaker audio recordings, clustering speech segments by speaker identity without prior knowledge of speaker identities or count.
Functional Requirements
- Speaker segmentation: Detect speaker change points
- Speaker clustering: Group segments by speaker identity
- Speaker count estimation: Automatically determine number of speakers
- Overlap handling: Detect and handle overlapping speech
- Real-time capability: Process audio with minimal latency (<1s per minute)
- Speaker labels: Assign consistent labels across recordings
- Quality metrics: Calculate Diarization Error Rate (DER)
- Multi-language support: Work across different languages
Non-Functional Requirements
- Accuracy: DER < 10% on benchmark datasets
- Latency: <1 second to process 1 minute of audio
- Throughput: 1000+ concurrent diarization sessions
- Scalability: Handle 10,000+ hours of audio daily
- Real-time: Support live streaming diarization
- Cost: <$0.01 per minute of audio
- Robustness: Handle noise, accents, channel variability
Understanding the Problem
Speaker diarization is critical for many applications:
Use Cases
| Company | Use Case | Approach | Scale |
|---|---|---|---|
| Zoom | Meeting transcription | Real-time online diarization | 300M+ daily meeting participants |
| Google Meet | Speaker identification | x-vector + clustering | Billions of minutes |
| Otter.ai | Note-taking | Offline batch diarization | 10M+ hours |
| Amazon Alexa | Multi-user recognition | Speaker ID + diarization | 100M+ devices |
| Microsoft Teams | Meeting analytics | Hybrid online/offline | Enterprise scale |
| Call centers | Quality assurance | Batch processing | Millions of calls |
Why Diarization Matters
- Meeting transcripts: Attribute speech to correct speaker
- Call analytics: Separate agent vs customer
- Podcast production: Automatic speaker labeling
- Surveillance: Track multiple speakers
- Accessibility: Better subtitles with speaker info
- Content search: “Find all segments where Person A spoke”
The Hash-Based Grouping Connection
Just like Group Anagrams and Clustering Systems:
| Group Anagrams | Clustering Systems | Speaker Diarization |
|---|---|---|
| Group strings by chars | Group points by features | Group segments by speaker |
| Hash: sorted string | Hash: quantized vector | Hash: voice embedding |
| Exact matching | Similarity matching | Similarity matching |
| O(NK log K) | O(NK) with LSH | O(N²) with AHC |
All three use hash-based or similarity-based grouping to organize items efficiently.
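To make the contrast concrete, here is a minimal sketch of the exact-hash end of this spectrum, the Group Anagrams pattern, where the signature is a dictionary key. Diarization replaces the exact key with a similarity threshold over embeddings, as the rest of this post shows.

from collections import defaultdict

def group_anagrams(words):
    """Exact-hash grouping: the sorted string is the signature (hash key)."""
    groups = defaultdict(list)
    for word in words:
        key = "".join(sorted(word))  # signature: sorted characters
        groups[key].append(word)
    return list(groups.values())

print(group_anagrams(["eat", "tea", "tan", "ate", "nat", "bat"]))
# [['eat', 'tea', 'ate'], ['tan', 'nat'], ['bat']]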
High-Level Architecture
┌─────────────────────────────────────────────────────────────────┐
│ Speaker Diarization System │
└─────────────────────────────────────────────────────────────────┘
Audio Input
(Multi-speaker)
↓
┌────────────────────────┐
│ Voice Activity │
│ Detection (VAD) │
│ - Remove silence │
└───────────┬────────────┘
│
┌───────────▼────────────┐
│ Audio Segmentation │
│ - Fixed windows │
│ - Change detection │
└───────────┬────────────┘
│
┌───────────▼────────────┐
│ Embedding Extraction │
│ - x-vectors │
│ - d-vectors │
│ - ECAPA-TDNN │
└───────────┬────────────┘
│
┌───────────────┼───────────────┐
│ │ │
┌───────▼──────┐ ┌─────▼─────┐ ┌──────▼──────┐
│ Clustering │ │ Refinement│ │ Overlap │
│ - AHC │ │ - VB │ │ Detection │
│ - Spectral │ │ - PLDA │ │ │
└───────┬──────┘ └─────┬─────┘ └──────┬──────┘
│ │ │
└───────────────┼───────────────┘
│
┌───────────▼────────────┐
│ Diarization Output │
│ │
│ [0-10s]: Speaker A │
│ [10-25s]: Speaker B │
│ [25-40s]: Speaker A │
│ [40-55s]: Speaker C │
└────────────────────────┘
Key Components
- VAD: Remove silence and non-speech
- Segmentation: Split audio into segments
- Embedding Extraction: Convert segments to vectors
- Clustering: Group segments by speaker (like anagram grouping!)
- Refinement: Improve boundaries and assignments
- Overlap Detection: Handle simultaneous speech
Component Deep-Dives
1. Voice Activity Detection (VAD)
Remove silence to focus on speech segments:
import numpy as np
import librosa
from typing import List, Tuple
class VoiceActivityDetector:
"""
Voice Activity Detection using energy-based approach.
Filters out silence before diarization.
"""
def __init__(
self,
sample_rate: int = 16000,
frame_length: int = 512,
hop_length: int = 160,
energy_threshold: float = 0.03
):
self.sample_rate = sample_rate
self.frame_length = frame_length
self.hop_length = hop_length
self.energy_threshold = energy_threshold
def detect(self, audio: np.ndarray) -> List[Tuple[float, float]]:
"""
Detect speech segments.
Args:
audio: Audio waveform
Returns:
List of (start_time, end_time) tuples in seconds
"""
# Calculate energy for each frame
energy = librosa.feature.rms(
y=audio,
frame_length=self.frame_length,
hop_length=self.hop_length
)[0]
# Normalize energy
energy = energy / (energy.max() + 1e-8)
# Threshold to get speech frames
speech_frames = energy > self.energy_threshold
# Convert frames to time segments
segments = self._frames_to_segments(speech_frames)
return segments
def _frames_to_segments(
self,
speech_frames: np.ndarray
) -> List[Tuple[float, float]]:
"""Convert binary frame sequence to time segments."""
segments = []
in_speech = False
start_frame = 0
for i, is_speech in enumerate(speech_frames):
if is_speech and not in_speech:
# Speech started
start_frame = i
in_speech = True
elif not is_speech and in_speech:
# Speech ended
start_time = start_frame * self.hop_length / self.sample_rate
end_time = i * self.hop_length / self.sample_rate
segments.append((start_time, end_time))
in_speech = False
# Handle case where speech continues to end
if in_speech:
start_time = start_frame * self.hop_length / self.sample_rate
end_time = len(speech_frames) * self.hop_length / self.sample_rate
segments.append((start_time, end_time))
return segments
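A quick usage sketch of the detector above on synthetic audio (a louder tone stands in for speech; the threshold here is illustrative):

import numpy as np

# 1 s silence, 1 s "speech" (tone), 1 s silence
sr = 16000
t = np.linspace(0, 1, sr, endpoint=False)
audio = np.concatenate([
    np.zeros(sr),
    0.5 * np.sin(2 * np.pi * 220 * t),
    np.zeros(sr),
])

vad = VoiceActivityDetector(energy_threshold=0.1)
print(vad.detect(audio))  # roughly [(1.0, 2.0)]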
2. Speaker Embedding Extraction
Extract voice embeddings (x-vectors) for each segment:
import torch
import torch.nn as nn
class SpeakerEmbeddingExtractor:
"""
Extract speaker embeddings from audio.
Similar to Group Anagrams:
- Anagrams: sorted string = signature
- Diarization: embedding vector = signature
Embeddings encode speaker identity in fixed-size vector.
"""
def __init__(self, model_path: str = "pretrained_xvector.pt"):
"""
Initialize embedding extractor.
In production, use pre-trained models:
- x-vectors (Kaldi)
- d-vectors (Google)
- ECAPA-TDNN (SpeechBrain)
"""
# Load pre-trained model
# self.model = torch.load(model_path)
# For demo: use dummy model
self.model = self._create_dummy_model()
self.model.eval()
self.embedding_dim = 512
def _create_dummy_model(self) -> nn.Module:
"""Create dummy embedding model for demo."""
class DummyEmbeddingModel(nn.Module):
def __init__(self):
super().__init__()
self.conv = nn.Conv1d(40, 512, kernel_size=5)
self.pool = nn.AdaptiveAvgPool1d(1)
def forward(self, x):
# x: (batch, features, time)
x = self.conv(x)
x = self.pool(x)
return x.squeeze(-1)
return DummyEmbeddingModel()
def extract(
self,
audio: np.ndarray,
sample_rate: int = 16000
) -> np.ndarray:
"""
Extract embedding from audio segment.
Args:
audio: Audio waveform
sample_rate: Sample rate
Returns:
Embedding vector of shape (embedding_dim,)
"""
# Extract mel spectrogram features
mel_spec = librosa.feature.melspectrogram(
y=audio,
sr=sample_rate,
n_mels=40,
n_fft=512,
hop_length=160
)
# Log mel spectrogram
log_mel = librosa.power_to_db(mel_spec)
# Convert to tensor
features = torch.FloatTensor(log_mel).unsqueeze(0)
# Extract embedding
with torch.no_grad():
embedding = self.model(features)
# Normalize embedding
embedding = embedding.squeeze().numpy()
embedding = embedding / (np.linalg.norm(embedding) + 1e-8)
return embedding
def extract_batch(
self,
audio_segments: List[np.ndarray],
sample_rate: int = 16000
) -> np.ndarray:
"""
Extract embeddings for multiple segments.
Args:
audio_segments: List of audio waveforms
Returns:
Embedding matrix of shape (n_segments, embedding_dim)
"""
embeddings = []
for audio in audio_segments:
emb = self.extract(audio, sample_rate)
embeddings.append(emb)
return np.array(embeddings)
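The dummy model above only illustrates the interface. To get useful embeddings you would swap in a pre-trained model such as the SpeechBrain ECAPA-TDNN mentioned in the docstring. Below is a rough sketch of that swap, assuming the speechbrain/spkrec-ecapa-voxceleb model card and 16 kHz input; the import path and API names reflect recent SpeechBrain releases and may differ in your version, so check the current docs.

# Sketch: real speaker embeddings via SpeechBrain's ECAPA-TDNN (192-dim).
# Assumes `pip install speechbrain`; import path may vary by version.
import torch
from speechbrain.pretrained import EncoderClassifier

classifier = EncoderClassifier.from_hparams(
    source="speechbrain/spkrec-ecapa-voxceleb",
    savedir="pretrained_models/spkrec-ecapa-voxceleb",
)

def extract_ecapa_embedding(audio, sample_rate=16000):
    """Return an L2-normalized ECAPA-TDNN speaker embedding."""
    wav = torch.from_numpy(audio).float().unsqueeze(0)  # (1, time), 16 kHz expected
    with torch.no_grad():
        emb = classifier.encode_batch(wav).squeeze()
    return (emb / (emb.norm() + 1e-8)).numpy()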
3. Agglomerative Hierarchical Clustering
Cluster embeddings by speaker using AHC:
from scipy.cluster.hierarchy import linkage, fcluster
from sklearn.metrics import silhouette_score
class SpeakerClustering:
"""
Cluster speaker embeddings using Agglomerative Hierarchical Clustering.
Similar to Group Anagrams:
- Anagrams: group by sorted string
- Diarization: group by embedding similarity
Both group similar items, but diarization uses approximate similarity.
"""
def __init__(
self,
metric: str = "cosine",
linkage_method: str = "average",
threshold: float = 0.5
):
"""
Initialize speaker clustering.
Args:
metric: Distance metric ("cosine", "euclidean")
linkage_method: "average", "complete", "ward"
threshold: Clustering threshold
"""
self.metric = metric
self.linkage_method = linkage_method
self.threshold = threshold
self.linkage_matrix = None
self.labels = None
def fit_predict(self, embeddings: np.ndarray) -> np.ndarray:
"""
Cluster embeddings into speakers.
Args:
embeddings: Embedding matrix (n_segments, embedding_dim)
Returns:
Cluster labels (n_segments,)
"""
n_segments = len(embeddings)
        if n_segments < 2:
            # Nothing to cluster: zero or one segment
            self.labels = np.zeros(n_segments, dtype=int)
            return self.labels
# Calculate pairwise distances
if self.metric == "cosine":
# Cosine distance
from sklearn.metrics.pairwise import cosine_similarity
similarity = cosine_similarity(embeddings)
distances = 1 - similarity
# Convert to condensed distance matrix
from scipy.spatial.distance import squareform
distances = squareform(distances, checks=False)
else:
# Use scipy's pdist
from scipy.spatial.distance import pdist
distances = pdist(embeddings, metric=self.metric)
# Perform hierarchical clustering
self.linkage_matrix = linkage(
distances,
method=self.linkage_method,
metric=self.metric
)
# Cut dendrogram to get clusters
self.labels = fcluster(
self.linkage_matrix,
self.threshold,
criterion='distance'
) - 1 # Convert to 0-indexed
return self.labels
def auto_tune_threshold(
self,
embeddings: np.ndarray,
min_speakers: int = 2,
max_speakers: int = 10
) -> float:
"""
Automatically tune clustering threshold.
Uses silhouette score to find optimal threshold.
Args:
embeddings: Embedding matrix
min_speakers: Minimum number of speakers
max_speakers: Maximum number of speakers
Returns:
Optimal threshold
"""
best_threshold = self.threshold
best_score = -1.0
# Try different thresholds
for threshold in np.linspace(0.1, 1.0, 20):
self.threshold = threshold
labels = self.fit_predict(embeddings)
n_clusters = len(np.unique(labels))
# Check if within valid range
if n_clusters < min_speakers or n_clusters > max_speakers:
continue
# Calculate silhouette score
if n_clusters > 1 and n_clusters < len(embeddings):
score = silhouette_score(embeddings, labels)
if score > best_score:
best_score = score
best_threshold = threshold
self.threshold = best_threshold
return best_threshold
    def estimate_num_speakers(self, embeddings: np.ndarray) -> int:
        """
        Estimate the number of speakers from the clustering dendrogram.

        A full solution looks for the largest gap ("elbow") in merge
        distances, similar to finding an optimal k in K-means; the
        version below uses a simplified heuristic over thresholds.
        """
        if self.linkage_matrix is None:
            self.fit_predict(embeddings)

        if self.linkage_matrix is None:
            # Fewer than 2 segments: at most one speaker
            return 1

        # Count clusters produced at a range of distance thresholds
        thresholds = np.linspace(0.1, 1.0, 20)
        cluster_counts = []

        for threshold in thresholds:
            labels = fcluster(
                self.linkage_matrix,
                threshold,
                criterion='distance'
            )
            cluster_counts.append(len(np.unique(labels)))

        # Simplified heuristic: take the median cluster count
        return int(np.median(cluster_counts))
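A small usage sketch of the clustering class on synthetic embeddings, with two well-separated "speakers" built as noisy copies of two random base vectors:

import numpy as np

rng = np.random.default_rng(0)
base_a, base_b = rng.normal(size=512), rng.normal(size=512)
embeddings = np.stack(
    [base_a + 0.05 * rng.normal(size=512) for _ in range(5)] +
    [base_b + 0.05 * rng.normal(size=512) for _ in range(5)]
)
embeddings /= np.linalg.norm(embeddings, axis=1, keepdims=True)  # unit-normalize

clustering = SpeakerClustering(threshold=0.5)
labels = clustering.fit_predict(embeddings)
print(labels)  # two clusters expected, e.g. [0 0 0 0 0 1 1 1 1 1] (label order may differ)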
4. Complete Diarization Pipeline
from dataclasses import dataclass
from typing import List, Tuple, Optional
import logging
@dataclass
class DiarizationSegment:
"""A speech segment with speaker label."""
start_time: float
end_time: float
speaker_id: int
confidence: float = 1.0
@property
def duration(self) -> float:
return self.end_time - self.start_time
class SpeakerDiarization:
"""
Complete speaker diarization system.
Pipeline:
1. VAD: Remove silence
2. Segmentation: Split into windows
3. Embedding extraction: Get x-vectors
4. Clustering: Group by speaker (like anagram grouping!)
5. Smoothing: Refine boundaries
Similar to Group Anagrams:
- Input: List of audio segments
- Process: Extract embeddings (like sorting strings)
- Output: Grouped segments (like grouped anagrams)
"""
def __init__(
self,
vad_threshold: float = 0.03,
segment_duration: float = 1.5,
overlap: float = 0.75,
clustering_threshold: float = 0.5
):
"""
Initialize diarization system.
Args:
vad_threshold: Voice activity threshold
segment_duration: Duration of segments (seconds)
overlap: Overlap between segments (seconds)
clustering_threshold: Speaker clustering threshold
"""
self.vad = VoiceActivityDetector(energy_threshold=vad_threshold)
self.embedding_extractor = SpeakerEmbeddingExtractor()
self.clustering = SpeakerClustering(threshold=clustering_threshold)
self.segment_duration = segment_duration
self.overlap = overlap
self.logger = logging.getLogger(__name__)
def diarize(
self,
audio: np.ndarray,
sample_rate: int = 16000,
num_speakers: Optional[int] = None
) -> List[DiarizationSegment]:
"""
Perform speaker diarization.
Args:
audio: Audio waveform
sample_rate: Sample rate
num_speakers: Optional number of speakers (auto-detect if None)
Returns:
List of diarization segments
"""
self.logger.info("Starting diarization...")
# Step 1: Voice Activity Detection
speech_segments = self.vad.detect(audio)
self.logger.info(f"Found {len(speech_segments)} speech segments")
if not speech_segments:
return []
# Step 2: Create overlapping windows
windows = self._create_windows(audio, sample_rate, speech_segments)
self.logger.info(f"Created {len(windows)} windows")
if not windows:
return []
# Step 3: Extract embeddings
embeddings = self._extract_embeddings(audio, windows, sample_rate)
self.logger.info(f"Extracted embeddings of shape {embeddings.shape}")
# Step 4: Cluster by speaker
if num_speakers is not None:
# If num_speakers provided, use it
labels = self._cluster_fixed_speakers(embeddings, num_speakers)
else:
# Auto-detect number of speakers
labels = self.clustering.fit_predict(embeddings)
n_speakers = len(np.unique(labels))
self.logger.info(f"Detected {n_speakers} speakers")
# Step 5: Convert to segments
segments = self._windows_to_segments(windows, labels)
# Step 6: Smooth boundaries
segments = self._smooth_segments(segments)
return segments
def _create_windows(
self,
audio: np.ndarray,
sample_rate: int,
speech_segments: List[Tuple[float, float]]
) -> List[Tuple[float, float]]:
"""
Create overlapping windows for embedding extraction.
Args:
audio: Audio waveform
sample_rate: Sample rate
speech_segments: Speech segments from VAD
Returns:
List of (start_time, end_time) windows
"""
windows = []
hop_duration = self.segment_duration - self.overlap
for seg_start, seg_end in speech_segments:
current_time = seg_start
while current_time + self.segment_duration <= seg_end:
windows.append((
current_time,
current_time + self.segment_duration
))
current_time += hop_duration
# Add last window if remaining duration > 50% of segment_duration
if seg_end - current_time > self.segment_duration * 0.5:
windows.append((current_time, seg_end))
return windows
def _extract_embeddings(
self,
audio: np.ndarray,
windows: List[Tuple[float, float]],
sample_rate: int
) -> np.ndarray:
"""Extract embeddings for all windows."""
audio_segments = []
for start, end in windows:
start_sample = int(start * sample_rate)
end_sample = int(end * sample_rate)
segment_audio = audio[start_sample:end_sample]
audio_segments.append(segment_audio)
# Extract embeddings in batch
embeddings = self.embedding_extractor.extract_batch(
audio_segments,
sample_rate
)
return embeddings
def _cluster_fixed_speakers(
self,
embeddings: np.ndarray,
num_speakers: int
) -> np.ndarray:
"""Cluster with fixed number of speakers."""
from sklearn.cluster import KMeans
kmeans = KMeans(n_clusters=num_speakers, random_state=42)
labels = kmeans.fit_predict(embeddings)
return labels
def _windows_to_segments(
self,
windows: List[Tuple[float, float]],
labels: np.ndarray
) -> List[DiarizationSegment]:
"""Convert windows with labels to segments."""
segments = []
for (start, end), label in zip(windows, labels):
segments.append(DiarizationSegment(
start_time=start,
end_time=end,
speaker_id=int(label)
))
return segments
def _smooth_segments(
self,
segments: List[DiarizationSegment],
min_duration: float = 0.5
) -> List[DiarizationSegment]:
"""
Smooth segment boundaries.
Steps:
1. Merge consecutive segments from same speaker
2. Remove very short segments
        (Filling gaps between segments is a possible extension, not implemented here.)
"""
if not segments:
return []
# Sort by start time
segments = sorted(segments, key=lambda s: s.start_time)
# Merge consecutive segments from same speaker
merged = []
current = segments[0]
for segment in segments[1:]:
if (segment.speaker_id == current.speaker_id and
segment.start_time - current.end_time < 0.3):
# Merge
current = DiarizationSegment(
start_time=current.start_time,
end_time=segment.end_time,
speaker_id=current.speaker_id
)
else:
# Save current and start new
if current.duration >= min_duration:
merged.append(current)
current = segment
# Add last segment
if current.duration >= min_duration:
merged.append(current)
return merged
def format_output(
self,
segments: List[DiarizationSegment],
format: str = "rttm"
) -> str:
"""
Format diarization output.
Args:
segments: Diarization segments
format: Output format ("rttm", "json", "text")
Returns:
Formatted string
"""
if format == "rttm":
# RTTM format (standard for diarization evaluation)
lines = []
for seg in segments:
line = (
f"SPEAKER file 1 {seg.start_time:.2f} "
f"{seg.duration:.2f} <NA> <NA> speaker_{seg.speaker_id} <NA> <NA>"
)
lines.append(line)
return '\n'.join(lines)
elif format == "json":
import json
output = [
{
"start": seg.start_time,
"end": seg.end_time,
"speaker": f"speaker_{seg.speaker_id}",
"duration": seg.duration
}
for seg in segments
]
return json.dumps(output, indent=2)
else: # text format
lines = []
for seg in segments:
line = (
f"[{seg.start_time:.1f}s - {seg.end_time:.1f}s] "
f"Speaker {seg.speaker_id}"
)
lines.append(line)
return '\n'.join(lines)
# Example usage
if __name__ == "__main__":
logging.basicConfig(level=logging.INFO)
# Generate sample audio (multi-speaker conversation)
# In practice, load real audio
sample_rate = 16000
duration = 60 # 60 seconds
audio = np.random.randn(sample_rate * duration) * 0.1
# Create diarization system
diarizer = SpeakerDiarization(
segment_duration=1.5,
overlap=0.75,
clustering_threshold=0.5
)
# Perform diarization
segments = diarizer.diarize(audio, sample_rate, num_speakers=None)
print(f"\nDiarization Results:")
print(f"Found {len(segments)} segments")
print(f"Speakers: {len(set(s.speaker_id for s in segments))}")
# Format output
print("\n" + diarizer.format_output(segments, format="text"))
Production Deployment
Real-Time Streaming Diarization
from queue import Queue
from threading import Thread
class StreamingDiarization:
"""
Online speaker diarization for live audio.
Challenges:
- Need to assign speakers before seeing full audio
- No future context for boundary refinement
- Must be fast (<100ms latency)
"""
def __init__(self, chunk_duration: float = 2.0):
self.chunk_duration = chunk_duration
self.embedding_extractor = SpeakerEmbeddingExtractor()
# Running state
self.speaker_embeddings = {} # speaker_id -> list of embeddings
self.next_speaker_id = 0
# Buffer
self.audio_buffer = Queue()
self.result_queue = Queue()
def process_chunk(
self,
audio_chunk: np.ndarray,
sample_rate: int = 16000
) -> Optional[DiarizationSegment]:
"""
Process audio chunk and return diarization.
Args:
audio_chunk: Audio chunk
sample_rate: Sample rate
Returns:
Diarization segment or None
"""
# Extract embedding
embedding = self.embedding_extractor.extract(audio_chunk, sample_rate)
# Find nearest speaker
speaker_id, similarity = self._find_nearest_speaker(embedding)
# If no similar speaker found, create new speaker
if speaker_id is None or similarity < 0.7:
speaker_id = self.next_speaker_id
self.speaker_embeddings[speaker_id] = []
self.next_speaker_id += 1
# Add embedding to speaker profile
self.speaker_embeddings[speaker_id].append(embedding)
# Return segment
return DiarizationSegment(
start_time=0.0, # Relative time
end_time=self.chunk_duration,
speaker_id=speaker_id,
            confidence=max(similarity, 0.0)
)
def _find_nearest_speaker(
self,
embedding: np.ndarray
) -> Tuple[Optional[int], float]:
"""Find nearest known speaker."""
if not self.speaker_embeddings:
return None, 0.0
best_speaker = None
best_similarity = -1.0
for speaker_id, embeddings in self.speaker_embeddings.items():
# Average speaker embedding
speaker_emb = np.mean(embeddings, axis=0)
# Cosine similarity
similarity = np.dot(embedding, speaker_emb) / (
np.linalg.norm(embedding) * np.linalg.norm(speaker_emb) + 1e-8
)
if similarity > best_similarity:
best_similarity = similarity
best_speaker = speaker_id
return best_speaker, best_similarity
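A usage sketch feeding fixed-size chunks to the streaming class above; with the dummy embedding model the speaker assignments are arbitrary, so treat the output as illustrative only:

import numpy as np

sr = 16000
streamer = StreamingDiarization(chunk_duration=2.0)

long_audio = np.random.randn(sr * 10) * 0.1   # stand-in for a live feed
chunk_samples = int(streamer.chunk_duration * sr)

for i in range(0, len(long_audio), chunk_samples):
    chunk = long_audio[i:i + chunk_samples]
    if len(chunk) < chunk_samples:
        break
    segment = streamer.process_chunk(chunk, sr)
    offset = i / sr
    print(f"[{offset:.1f}s - {offset + segment.duration:.1f}s] "
          f"speaker_{segment.speaker_id} (conf {segment.confidence:.2f})")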
Evaluation Metrics
Diarization Error Rate (DER)
from typing import Dict, List

def calculate_der(
reference: List[DiarizationSegment],
hypothesis: List[DiarizationSegment],
collar: float = 0.25
) -> Dict[str, float]:
"""
Calculate Diarization Error Rate.
DER = (False Alarm + Missed Detection + Speaker Error) / Total Speech Time
Args:
reference: Ground truth segments
hypothesis: Predicted segments
collar: Forgiveness collar around boundaries (seconds)
Returns:
Dictionary with DER components
"""
# Convert segments to frame-level labels
# Simplified implementation
total_speech_time = sum(seg.duration for seg in reference)
# Calculate overlap with collar
false_alarm = 0.0
missed_detection = 0.0
speaker_error = 0.0
# ... detailed calculation ...
der = (false_alarm + missed_detection + speaker_error) / total_speech_time
return {
"der": der,
"false_alarm": false_alarm / total_speech_time,
"missed_detection": missed_detection / total_speech_time,
"speaker_error": speaker_error / total_speech_time
}
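A worked example of the DER formula with hypothetical numbers (the skeleton above leaves the alignment details out, but the arithmetic is the important part):

# Hypothetical numbers: 600 s of reference speech in total.
total_speech_time = 600.0
false_alarm = 18.0        # non-speech labeled as speech
missed_detection = 24.0   # reference speech the system missed
speaker_error = 30.0      # speech attributed to the wrong speaker

der = (false_alarm + missed_detection + speaker_error) / total_speech_time
print(f"DER = {der:.1%}")  # DER = 12.0%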
Real-World Case Study: Zoom’s Diarization
Zoom’s Approach
Zoom serves 300M+ daily meeting participants and applies speaker diarization at that scale:
Architecture:
- Real-time VAD:
- WebRTC VAD for low latency
- Runs on client side
- Filters silence before sending to server
- Embedding extraction:
- Lightweight TDNN model
- 128-dim embeddings
- <10ms per segment
- Online clustering:
- Incremental spectral clustering
- Updates speaker profiles in real-time
- Handles participants joining/leaving
- Post-processing:
- Offline refinement after meeting
- Improves boundary accuracy
- Corrects speaker switches
Results:
- DER: 8-12% (depending on audio quality)
- Latency: <500ms for real-time
- Throughput: 300M+ daily meeting participants
- Cost: <$0.005 per meeting hour
Key Lessons
- Hybrid online/offline: Real-time + post-processing
- Lightweight models: Fast embeddings critical
- Incremental clustering: Can’t wait for full audio (see the running-centroid sketch below)
- Client-side VAD: Reduces bandwidth and cost
- Quality adaptation: Adjust based on audio conditions
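On the incremental-clustering point above: the StreamingDiarization class earlier keeps every embedding per speaker and re-averages them, which grows without bound in long meetings. A minimal, memory-bounded alternative is a running centroid per speaker; the sketch below is an assumption-level illustration, not Zoom's actual method.

import numpy as np

class RunningCentroid:
    """Memory-bounded speaker profile: an incrementally updated mean embedding."""

    def __init__(self, dim: int):
        self.centroid = np.zeros(dim)
        self.count = 0

    def update(self, embedding: np.ndarray) -> None:
        # Incremental mean: new_mean = old_mean + (x - old_mean) / n
        self.count += 1
        self.centroid += (embedding - self.centroid) / self.count

    def similarity(self, embedding: np.ndarray) -> float:
        denom = np.linalg.norm(self.centroid) * np.linalg.norm(embedding) + 1e-8
        return float(np.dot(self.centroid, embedding) / denom)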
Cost Analysis
Cost Breakdown (1000 hours audio/day)
| Component | On-premise | Cloud | Serverless |
|---|---|---|---|
| VAD | $10/day | $20/day | $5/day |
| Embedding extraction | $200/day | $500/day | $300/day |
| Clustering | $50/day | $100/day | $50/day |
| Storage | $20/day | $30/day | $30/day |
| Total | $280/day | $650/day | $385/day |
| Per hour | $0.28 | $0.65 | $0.39 |
Optimization strategies:
- Batch processing:
- Process in larger batches
- Amortize overhead
- Savings: 40%
- Model optimization (see the quantization sketch after this list):
- Quantization (INT8)
- Distillation
- Savings: 50% compute
- Caching:
- Cache speaker profiles
- Reuse across sessions
- Savings: 20%
- Smart sampling:
- Variable segment duration
- Skip easy segments
- Savings: 30%
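For the model-optimization item above, here is a minimal sketch of dynamic INT8 quantization in PyTorch. It targets nn.Linear (and LSTM) layers, so it assumes an embedding model built from linear layers rather than the Conv1d-only dummy model used earlier; actual savings depend heavily on the model and hardware, so the 50% figure is not guaranteed.

# Sketch: dynamic INT8 quantization of a linear-layer embedding model.
import torch
import torch.nn as nn

embedding_model = nn.Sequential(      # stand-in for a real x-vector-style model
    nn.Linear(40, 512), nn.ReLU(),
    nn.Linear(512, 512), nn.ReLU(),
    nn.Linear(512, 192),
)

quantized_model = torch.quantization.quantize_dynamic(
    embedding_model,
    {nn.Linear},          # layer types to quantize
    dtype=torch.qint8,
)

with torch.no_grad():
    print(quantized_model(torch.randn(1, 40)).shape)  # torch.Size([1, 192])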
Key Takeaways
✅ Diarization = clustering audio by speaker using embedding similarity
✅ x-vectors are standard for speaker embeddings (512-dim)
✅ AHC works well for offline diarization with auto speaker count
✅ Online diarization is harder - no future context, must be fast
✅ VAD is critical - removes 50-80% of audio (silence)
✅ Same pattern as anagrams/clustering - group by similarity signature
✅ DER < 10% is good for production systems
✅ Embedding quality matters most - better embeddings > better clustering
✅ Real-time requires streaming - process chunks, incremental updates
✅ Hybrid approach best - online for speed, offline for accuracy
Connection to Thematic Link: Grouping Similar Items with Hash-Based Approaches
All three topics share the same grouping pattern:
DSA (Group Anagrams):
- Items: strings
- Signature: sorted characters
- Grouping: exact hash match
- Result: anagram groups
ML System Design (Clustering Systems):
- Items: data points
- Signature: quantized vector or nearest centroid
- Grouping: approximate similarity
- Result: data clusters
Speech Tech (Speaker Diarization):
- Items: audio segments
- Signature: voice embedding (x-vector)
- Grouping: cosine similarity threshold
- Result: speaker-labeled segments
Universal Pattern
# Generic grouping pattern
def group_by_similarity(items, embed_function, similarity_fn, similarity_threshold):
    """
    Universal pattern for grouping similar items.

    Used in:
    - Anagrams: embed = sort, similarity = exact match
    - Clustering: embed = features, similarity = closeness in feature space
    - Diarization: embed = x-vector, similarity = cosine similarity
"""
embeddings = [embed_function(item) for item in items]
# Cluster by similarity
groups = []
assigned = set()
for i, emb_i in enumerate(embeddings):
if i in assigned:
continue
group = [i]
assigned.add(i)
for j, emb_j in enumerate(embeddings[i+1:], start=i+1):
if j in assigned:
continue
# Check similarity
            similarity = similarity_fn(emb_i, emb_j)
if similarity > similarity_threshold:
group.append(j)
assigned.add(j)
groups.append(group)
return groups
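A quick usage sketch of the pattern with toy 2-D "embeddings" and cosine similarity:

import numpy as np

def cosine_sim(a, b):
    a, b = np.asarray(a, dtype=float), np.asarray(b, dtype=float)
    return float(np.dot(a, b) / (np.linalg.norm(a) * np.linalg.norm(b) + 1e-8))

items = ["seg1", "seg2", "seg3", "seg4"]
toy_embeddings = {
    "seg1": [1.0, 0.0], "seg2": [0.95, 0.05],   # one "speaker"
    "seg3": [0.0, 1.0], "seg4": [0.1, 0.98],    # another "speaker"
}

groups = group_by_similarity(items, toy_embeddings.get, cosine_sim, 0.9)
print(groups)  # [[0, 1], [2, 3]] -> item indices grouped by "speaker"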
This pattern is universal across:
- String algorithms (anagrams)
- Machine learning (clustering)
- Speech processing (diarization)
- Computer vision (object tracking)
- Natural language processing (document clustering)
Originally published at: arunbaby.com/speech-tech/0015-speaker-clustering-diarization