Voice Enhancement & Noise Reduction
Build systems that enhance voice quality by removing noise, improving intelligibility, and optimizing audio for speech applications.
TL;DR
Voice enhancement removes background noise while preserving speech naturalness. Classical methods (spectral subtraction, Wiener filtering) work well for stationary noise. Deep learning approaches – mask-based networks (LSTM) and end-to-end waveform models (Conv-TasNet) – handle non-stationary noise better. Production deployment requires sub-50ms streaming with overlap-add, ONNX runtime optimization, and automated quality monitoring via PESQ/STOI/SNR metrics. Multi-channel beamforming adds spatial noise suppression for microphone arrays. Related: audio preprocessing covers upstream signal conditioning and speech separation addresses multi-speaker scenarios.

Introduction
Voice enhancement improves speech quality by:
- Removing background noise (traffic, wind, keyboard)
- Suppressing reverberation
- Normalizing volume levels
- Enhancing speech intelligibility
- Removing artifacts and distortion
Critical for:
- Video conferencing (Zoom, Teams, Meet)
- Voice assistants (Alexa, Siri, Google Assistant)
- Podcast/content creation
- Hearing aids
- Telecommunication
- Speech recognition systems
Key challenges:
- Real-time processing (< 50ms latency; see the latency sketch after this list)
- Preserving speech quality
- Handling diverse noise types
- Low computational cost
- Avoiding artifacts
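A quick sanity check on the first challenge: at 16 kHz, the chunk size alone sets a floor on latency before any model compute runs. A minimal sketch (the chunk sizes are illustrative):

def algorithmic_latency_ms(chunk_samples, sr=16000):
    """Latency contributed purely by buffering one chunk of audio."""
    return 1000.0 * chunk_samples / sr

for chunk in (256, 512, 1024):
    print(f"{chunk} samples @ 16 kHz -> {algorithmic_latency_ms(chunk):.1f} ms of buffering")
# 512 samples cost 32 ms, leaving roughly 18 ms of a 50 ms budget for compute and I/O.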
Problem Formulation
Input/Output
Input: Noisy speech signal
y(t) = s(t) + n(t)
where:
s(t) = clean speech
n(t) = noise
Output: Enhanced speech signal
ŝ(t) ≈ s(t)
Goal: Minimize ‖ŝ(t) - s(t)‖ while maintaining naturalness
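To experiment with this formulation, it helps to synthesize y(t) at a controlled SNR. A minimal helper, assuming the noise clip is at least as long as the speech (the name add_noise_at_snr is our own; it is reused in the impact-analysis demo later):

import numpy as np

def add_noise_at_snr(clean, noise, snr_db):
    """Mix noise into clean speech at a target SNR: y(t) = s(t) + g * n(t)."""
    noise = noise[:len(clean)]  # trim noise to the speech length
    p_signal = np.mean(clean ** 2)
    p_noise = np.mean(noise ** 2) + 1e-12
    # Solve 10*log10(p_signal / (g^2 * p_noise)) = snr_db for the gain g
    gain = np.sqrt(p_signal / (p_noise * 10 ** (snr_db / 10)))
    return clean + gain * noise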
Quality Metrics
import numpy as np
def calculate_snr(clean_speech, noisy_speech):
"""
Calculate Signal-to-Noise Ratio
SNR = 10 * log10(P_signal / P_noise)
Higher is better (typically 10-30 dB)
"""
signal_power = np.mean(clean_speech ** 2)
noise = noisy_speech - clean_speech
noise_power = np.mean(noise ** 2)
if noise_power == 0:
return float('inf')
snr = 10 * np.log10(signal_power / noise_power)
return snr
def calculate_pesq(reference, degraded, sr=16000):
"""
Calculate PESQ (Perceptual Evaluation of Speech Quality)
Range: -0.5 to 4.5 (higher is better)
Industry standard for speech quality
"""
from pesq import pesq
# PESQ requires 8kHz or 16kHz
if sr not in [8000, 16000]:
raise ValueError("PESQ requires sr=8000 or sr=16000")
mode = 'nb' if sr == 8000 else 'wb'
score = pesq(sr, reference, degraded, mode)
return score
def calculate_stoi(clean, enhanced, sr=16000):
"""
Calculate STOI (Short-Time Objective Intelligibility)
Range: 0 to 1 (higher is better)
Correlates well with speech intelligibility
"""
from pystoi import stoi
score = stoi(clean, enhanced, sr, extended=False)
return score
# Usage
clean = np.random.randn(16000) # 1 second at 16kHz
noisy = clean + 0.1 * np.random.randn(16000)
snr = calculate_snr(clean, noisy)
print(f"SNR: {snr:.2f} dB")
# pesq_score = calculate_pesq(clean, noisy, sr=16000)
# print(f"PESQ: {pesq_score:.2f}")
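SI-SNR (scale-invariant SNR) is a fourth metric worth having; it ignores overall gain differences and reappears later as the Conv-TasNet training loss. A NumPy sketch under the same conventions as above:

def calculate_si_snr(clean, estimate):
    """
    Scale-Invariant SNR in dB (higher is better)
    Invariant to the overall gain of the estimate
    """
    clean = clean - np.mean(clean)
    estimate = estimate - np.mean(estimate)
    # Project the estimate onto the clean signal
    projection = np.dot(estimate, clean) * clean / (np.dot(clean, clean) + 1e-12)
    noise = estimate - projection
    return 10 * np.log10(np.sum(projection ** 2) / (np.sum(noise ** 2) + 1e-12))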
Classical Methods
1. Spectral Subtraction
Subtract noise spectrum from noisy spectrum
import librosa
import numpy as np
class SpectralSubtraction:
"""
Classic spectral subtraction for noise reduction
Steps:
1. Estimate noise spectrum (from silence periods)
2. Subtract from noisy spectrum
3. Half-wave rectification
4. Reconstruct signal
"""
def __init__(self, n_fft=512, hop_length=128):
self.n_fft = n_fft
self.hop_length = hop_length
self.noise_profile = None
def estimate_noise(self, noise_audio, sr=16000):
"""
Estimate noise spectrum from noise-only segment
Args:
noise_audio: Audio containing only noise
"""
# STFT of noise
noise_stft = librosa.stft(
noise_audio,
n_fft=self.n_fft,
hop_length=self.hop_length
)
# Average magnitude spectrum
self.noise_profile = np.mean(np.abs(noise_stft), axis=1, keepdims=True)
def enhance(self, noisy_audio, alpha=2.0, beta=0.002):
"""
Apply spectral subtraction
Args:
noisy_audio: Noisy speech signal
alpha: Over-subtraction factor (higher = more aggressive)
beta: Spectral floor (prevents negative values)
Returns:
Enhanced audio
"""
if self.noise_profile is None:
raise ValueError("Must estimate noise first")
# STFT of noisy signal
noisy_stft = librosa.stft(
noisy_audio,
n_fft=self.n_fft,
hop_length=self.hop_length
)
# Magnitude and phase
mag = np.abs(noisy_stft)
phase = np.angle(noisy_stft)
# Spectral subtraction
enhanced_mag = mag - alpha * self.noise_profile
# Half-wave rectification with spectral floor
enhanced_mag = np.maximum(enhanced_mag, beta * mag)
# Reconstruct with original phase
enhanced_stft = enhanced_mag * np.exp(1j * phase)
# Inverse STFT
enhanced_audio = librosa.istft(
enhanced_stft,
hop_length=self.hop_length
)
return enhanced_audio
# Usage
sr = 16000
# Load noisy speech
noisy_speech, _ = librosa.load('noisy_speech.wav', sr=sr)
# Estimate noise from the first 0.5 seconds (assumed to be speech-free, i.e. noise only)
noise_segment = noisy_speech[:int(0.5 * sr)]
enhancer = SpectralSubtraction()
enhancer.estimate_noise(noise_segment)
# Enhance full audio
enhanced = enhancer.enhance(noisy_speech, alpha=2.0)
# Save result
import soundfile as sf
sf.write('enhanced_speech.wav', enhanced, sr)
2. Wiener Filtering
Optimal filter in MMSE sense
class WienerFilter:
"""
Wiener filtering for speech enhancement
Minimizes mean squared error between clean and enhanced speech
"""
def __init__(self, n_fft=512, hop_length=128):
self.n_fft = n_fft
self.hop_length = hop_length
self.noise_psd = None
def estimate_noise_psd(self, noise_audio):
"""Estimate noise power spectral density"""
noise_stft = librosa.stft(
noise_audio,
n_fft=self.n_fft,
hop_length=self.hop_length
)
# Power spectral density
self.noise_psd = np.mean(np.abs(noise_stft) ** 2, axis=1, keepdims=True)
    def enhance(self, noisy_audio):
"""
Apply Wiener filtering
Wiener gain: H = S / (S + N)
where S = signal PSD, N = noise PSD
"""
if self.noise_psd is None:
raise ValueError("Must estimate noise PSD first")
# STFT
noisy_stft = librosa.stft(
noisy_audio,
n_fft=self.n_fft,
hop_length=self.hop_length
)
# Noisy PSD
noisy_psd = np.abs(noisy_stft) ** 2
# Estimate clean speech PSD
speech_psd = np.maximum(noisy_psd - self.noise_psd, 0)
# Wiener gain
wiener_gain = speech_psd / (speech_psd + self.noise_psd + 1e-10)
# Apply gain
enhanced_stft = wiener_gain * noisy_stft
# Inverse STFT
enhanced_audio = librosa.istft(
enhanced_stft,
hop_length=self.hop_length
)
return enhanced_audio
# Usage
wiener = WienerFilter()
wiener.estimate_noise_psd(noise_segment)
enhanced = wiener.enhance(noisy_speech)
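A common refinement, not implemented above, is the decision-directed estimate of the a priori SNR (in the style of Ephraim and Malah), which smooths the gain across frames and suppresses musical noise. A per-frame sketch with an assumed smoothing factor of 0.98:

def decision_directed_gain(noisy_psd, noise_psd, prev_speech_psd, alpha=0.98):
    """One frame of decision-directed Wiener gain."""
    # A posteriori SNR (floored at zero)
    post_snr = np.maximum(noisy_psd / (noise_psd + 1e-10) - 1, 0)
    # A priori SNR: blend the previous frame's speech estimate with current evidence
    prio_snr = alpha * prev_speech_psd / (noise_psd + 1e-10) + (1 - alpha) * post_snr
    # Wiener gain H = xi / (1 + xi)
    return prio_snr / (1 + prio_snr)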
Deep Learning Approaches
1. Mask-Based Enhancement
Learn ideal ratio mask (IRM) or ideal binary mask (IBM)
import torch
import torch.nn as nn
class MaskEstimationNet(nn.Module):
"""
Neural network for mask estimation
Predicts time-frequency mask to apply to noisy spectrogram
"""
def __init__(self, n_fft=512, hidden_dim=128):
super().__init__()
self.n_freq = n_fft // 2 + 1
# Bidirectional LSTM
self.lstm = nn.LSTM(
input_size=self.n_freq,
hidden_size=hidden_dim,
num_layers=2,
batch_first=True,
bidirectional=True
)
# Mask prediction
self.mask_fc = nn.Sequential(
nn.Linear(hidden_dim * 2, hidden_dim),
nn.ReLU(),
nn.Dropout(0.2),
nn.Linear(hidden_dim, self.n_freq),
nn.Sigmoid() # Mask values in [0, 1]
)
def forward(self, noisy_mag):
"""
Args:
noisy_mag: Noisy magnitude spectrogram [batch, time, freq]
Returns:
mask: Predicted mask [batch, time, freq]
"""
# LSTM
lstm_out, _ = self.lstm(noisy_mag)
# Predict mask
mask = self.mask_fc(lstm_out)
return mask
class MaskBasedEnhancer:
"""
Speech enhancement using learned mask
"""
def __init__(self, model, n_fft=512, hop_length=128):
self.model = model
self.model.eval()
self.n_fft = n_fft
self.hop_length = hop_length
def enhance(self, noisy_audio):
"""
Enhance audio using learned mask
Steps:
1. Compute noisy spectrogram
2. Predict mask with neural network
3. Apply mask
4. Reconstruct audio
"""
# STFT
noisy_stft = librosa.stft(
noisy_audio,
n_fft=self.n_fft,
hop_length=self.hop_length
)
# Magnitude and phase
noisy_mag = np.abs(noisy_stft)
phase = np.angle(noisy_stft)
# Normalize magnitude
mag_mean = np.mean(noisy_mag)
mag_std = np.std(noisy_mag)
noisy_mag_norm = (noisy_mag - mag_mean) / (mag_std + 1e-8)
# Predict mask
with torch.no_grad():
# Transpose to [1, time, freq]
mag_tensor = torch.FloatTensor(noisy_mag_norm.T).unsqueeze(0)
mask = self.model(mag_tensor)
# Back to numpy
mask = mask.squeeze(0).numpy().T
# Apply mask
enhanced_mag = noisy_mag * mask
# Reconstruct
enhanced_stft = enhanced_mag * np.exp(1j * phase)
enhanced_audio = librosa.istft(
enhanced_stft,
hop_length=self.hop_length
)
return enhanced_audio
# Usage
model = MaskEstimationNet(n_fft=512)
enhancer = MaskBasedEnhancer(model)
# Enhance
enhanced = enhancer.enhance(noisy_speech)
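Training the mask network requires ideal targets computed from paired clean/noise recordings. A sketch of the two classic targets mentioned above (IBM and IRM), assuming time-aligned clean and noise signals:

def compute_ideal_masks(clean_audio, noise_audio, n_fft=512, hop_length=128):
    """Compute IBM and IRM training targets from an aligned clean/noise pair."""
    S = np.abs(librosa.stft(clean_audio, n_fft=n_fft, hop_length=hop_length))
    N = np.abs(librosa.stft(noise_audio, n_fft=n_fft, hop_length=hop_length))
    # Ideal Binary Mask: 1 where speech dominates (0 dB local SNR threshold)
    ibm = (S > N).astype(np.float32)
    # Ideal Ratio Mask: soft speech-to-total energy ratio in [0, 1]
    irm = np.sqrt(S ** 2 / (S ** 2 + N ** 2 + 1e-10))
    return ibm, irm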
2. End-to-End Waveform Enhancement
Direct waveform→waveform mapping
class ConvTasNet(nn.Module):
"""
Conv-TasNet for speech enhancement
End-to-end time-domain speech separation
Based on: "Conv-TasNet: Surpassing Ideal Time-Frequency Masking"
"""
def __init__(self, N=256, L=20, B=256, H=512, P=3, X=8, R=3):
"""
Args:
N: Number of filters in autoencoder
        L: Length of filters (in samples)
B: Number of channels in bottleneck
H: Number of channels in conv blocks
P: Kernel size in conv blocks
X: Number of conv blocks in each repeat
R: Number of repeats
"""
super().__init__()
# Encoder (waveform → features)
self.encoder = nn.Conv1d(1, N, L, stride=L//2, padding=L//2)
# Separator (TCN blocks)
self.separator = self._build_separator(N, B, H, P, X, R)
# Decoder (features → waveform)
self.decoder = nn.ConvTranspose1d(N, 1, L, stride=L//2, padding=L//2)
def _build_separator(self, N, B, H, P, X, R):
"""Build temporal convolutional network"""
layers = []
        # Channel-wise normalization over [batch, N, T]
        # (nn.LayerNorm(N) would normalize along the time axis here, so use GroupNorm)
        layers.append(nn.GroupNorm(1, N))
# Bottleneck
layers.append(nn.Conv1d(N, B, 1))
# TCN blocks
for r in range(R):
for x in range(X):
dilation = 2 ** x
layers.append(
TemporalConvBlock(B, H, P, dilation)
)
        # Output projection with a sigmoid so mask values stay in [0, 1]
        layers.append(nn.PReLU())
        layers.append(nn.Conv1d(B, N, 1))
        layers.append(nn.Sigmoid())
return nn.Sequential(*layers)
def forward(self, mixture):
"""
Args:
mixture: Noisy waveform [batch, 1, samples]
Returns:
estimated_clean: Enhanced waveform [batch, 1, samples]
"""
# Encode
encoded = self.encoder(mixture) # [batch, N, T]
# Separate
mask = self.separator(encoded) # [batch, N, T]
# Apply mask
separated = encoded * mask
# Decode
estimated = self.decoder(separated) # [batch, 1, samples]
return estimated
class TemporalConvBlock(nn.Module):
"""
Temporal convolutional block with dilated convolutions
"""
def __init__(self, in_channels, hidden_channels, kernel_size, dilation):
super().__init__()
self.conv1 = nn.Conv1d(
in_channels, hidden_channels,
kernel_size, dilation=dilation,
padding=dilation * (kernel_size - 1) // 2
)
self.prelu1 = nn.PReLU()
self.norm1 = nn.GroupNorm(1, hidden_channels)
self.conv2 = nn.Conv1d(
hidden_channels, in_channels,
1
)
self.prelu2 = nn.PReLU()
self.norm2 = nn.GroupNorm(1, in_channels)
def forward(self, x):
"""
Args:
x: [batch, channels, time]
"""
residual = x
out = self.conv1(x)
out = self.prelu1(out)
out = self.norm1(out)
out = self.conv2(out)
out = self.prelu2(out)
out = self.norm2(out)
return out + residual
# Usage
model = ConvTasNet()
noisy_tensor = torch.randn(1, 1, 16000) # 1 second
enhanced_tensor = model(noisy_tensor)
Real-Time Enhancement
For the broader streaming architecture pattern (WebSockets, buffering, adaptive chunking), see Streaming Speech Processing Pipeline.
Streaming Enhancement System
import numpy as np
from collections import deque
class StreamingEnhancer:
"""
Real-time streaming speech enhancement
Requirements:
- Low latency (< 50ms)
- Causal processing
- Minimal buffering
"""
def __init__(self, model, chunk_size=512, overlap=256, sr=16000):
"""
Args:
chunk_size: Samples per chunk
overlap: Overlap between chunks (for smooth transitions)
"""
self.model = model
self.chunk_size = chunk_size
self.overlap = overlap
self.sr = sr
# Circular buffer for overlap-add
self.buffer = deque(maxlen=overlap)
self.output_buffer = deque(maxlen=overlap)
self.processed_chunks = 0
def process_chunk(self, audio_chunk):
"""
Process single audio chunk
Args:
audio_chunk: Audio samples [chunk_size]
Returns:
Enhanced audio chunk
"""
# Add previous overlap
if len(self.buffer) > 0:
input_chunk = np.concatenate([
np.array(self.buffer),
audio_chunk
])
else:
input_chunk = audio_chunk
# Enhance
enhanced = self._enhance_chunk(input_chunk)
# Overlap-add with linear cross-fade
if len(self.output_buffer) > 0:
# Smooth transition
overlap_region = min(len(self.output_buffer), self.overlap)
for i in range(overlap_region):
weight = i / overlap_region
enhanced[i] = (1 - weight) * self.output_buffer[i] + weight * enhanced[i]
# Save overlap for next chunk
self.buffer.clear()
self.buffer.extend(audio_chunk[-self.overlap:])
self.output_buffer.clear()
self.output_buffer.extend(enhanced[-self.overlap:])
self.processed_chunks += 1
# Return non-overlap part
return enhanced[:-self.overlap] if len(enhanced) > self.overlap else enhanced
def _enhance_chunk(self, audio_chunk):
"""Enhance using model"""
# Convert to tensor
audio_tensor = torch.FloatTensor(audio_chunk).unsqueeze(0).unsqueeze(0)
# Enhance
with torch.no_grad():
enhanced_tensor = self.model(audio_tensor)
# Back to numpy
enhanced = enhanced_tensor.squeeze().numpy()
return enhanced
    def get_latency_ms(self):
        """Algorithmic latency from chunk buffering (excludes model compute time)"""
        return (self.chunk_size / self.sr) * 1000
# Usage for real-time processing
model = ConvTasNet()
enhancer = StreamingEnhancer(model, chunk_size=512, overlap=256, sr=16000)
print(f"Latency: {enhancer.get_latency_ms():.2f} ms")
# Process audio stream
import sounddevice as sd
def audio_callback(indata, outdata, frames, time, status):
"""Real-time audio callback"""
# Get input chunk
input_chunk = indata[:, 0]
# Enhance
enhanced_chunk = enhancer.process_chunk(input_chunk)
    # Output (zero-fill any samples the enhancer has not produced yet)
    outdata[:] = 0
    outdata[:len(enhanced_chunk), 0] = enhanced_chunk
if status:
print(f"Status: {status}")
# Start real-time processing
with sd.Stream(
samplerate=16000,
channels=1,
callback=audio_callback,
blocksize=512
):
print("Processing audio in real-time... Press Ctrl+C to stop")
sd.sleep(10000)
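The latency printed above covers only chunk buffering; model compute adds to it. A quick check of the real-time factor (RTF), using the StreamingEnhancer defined earlier:

import time

def measure_rtf(enhancer, sr=16000, n_chunks=50):
    """Real-time factor = compute time / audio time (must stay below 1)."""
    chunk = np.random.randn(enhancer.chunk_size).astype(np.float32)
    start = time.perf_counter()
    for _ in range(n_chunks):
        enhancer.process_chunk(chunk)
    elapsed = time.perf_counter() - start
    return elapsed / (n_chunks * enhancer.chunk_size / sr)

rtf = measure_rtf(enhancer)
print(f"RTF: {rtf:.2f} ({'real-time capable' if rtf < 1 else 'too slow'})")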
Multi-Channel Enhancement
Beamforming
class BeamformerEnhancer:
"""
Beamforming for multi-microphone enhancement
Uses spatial information to enhance target speech
"""
def __init__(self, n_mics=4, sr=16000):
self.n_mics = n_mics
self.sr = sr
def delay_and_sum(self, multi_channel_audio, target_direction=0):
"""
Delay-and-sum beamforming
Args:
multi_channel_audio: [n_mics, n_samples]
target_direction: Target angle in degrees (0 = front)
Returns:
Enhanced single-channel audio
"""
n_samples = multi_channel_audio.shape[1]
# Calculate delays for each microphone
# (Simplified: assumes linear array)
mic_spacing = 0.05 # 5cm between mics
speed_of_sound = 343 # m/s
delays = []
for i in range(self.n_mics):
distance_diff = i * mic_spacing * np.sin(np.deg2rad(target_direction))
delay_samples = int(distance_diff / speed_of_sound * self.sr)
delays.append(delay_samples)
# Align and sum
aligned_signals = []
for i, delay in enumerate(delays):
sig = multi_channel_audio[i]
if delay > 0:
# Delay by pre-pending zeros
padded = np.concatenate([np.zeros(delay, dtype=sig.dtype), sig])
aligned = padded[:n_samples]
elif delay < 0:
# Advance by removing first samples
aligned = sig[-delay:]
if aligned.shape[0] < n_samples:
aligned = np.pad(aligned, (0, n_samples - aligned.shape[0]), mode='constant')
else:
aligned = sig
aligned_signals.append(aligned)
        # Average the aligned signals (a sum with 1/n_mics normalization)
        enhanced = np.mean(aligned_signals, axis=0)
return enhanced
def mvdr_beamformer(self, multi_channel_audio, noise_segment):
"""
MVDR (Minimum Variance Distortionless Response) beamformer
Optimal beamformer for known noise covariance
"""
# Compute noise covariance matrix
noise_cov = self._compute_covariance(noise_segment)
# Compute signal+noise covariance
signal_noise_cov = self._compute_covariance(multi_channel_audio)
# MVDR weights
# w = R_n^{-1} * a / (a^H * R_n^{-1} * a)
# where a is steering vector
# Simplified: assume steering vector points to channel 0
steering_vector = np.zeros((self.n_mics, 1))
steering_vector[0] = 1
# Compute weights
inv_noise_cov = np.linalg.pinv(noise_cov)
numerator = inv_noise_cov @ steering_vector
denominator = steering_vector.T @ inv_noise_cov @ steering_vector
weights = numerator / (denominator + 1e-10)
# Apply weights
enhanced = weights.T @ multi_channel_audio
return enhanced.squeeze()
def _compute_covariance(self, signal):
"""Compute covariance matrix"""
# [n_mics, n_samples] → [n_mics, n_mics]
cov = signal @ signal.T / signal.shape[1]
return cov
# Usage
beamformer = BeamformerEnhancer(n_mics=4, sr=16000)
# Multi-channel recording
multi_ch_audio = np.random.randn(4, 16000) # 4 mics, 1 second
# Enhance using delay-and-sum
enhanced_ds = beamformer.delay_and_sum(multi_ch_audio, target_direction=0)
# Or using MVDR
noise_segment = multi_ch_audio[:, :8000] # First 0.5 seconds
enhanced_mvdr = beamformer.mvdr_beamformer(multi_ch_audio, noise_segment)
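The MVDR sketch above hard-codes a trivial steering vector. For a uniform linear array, a far-field steering vector follows from the same geometry the delay-and-sum code uses; a per-frequency sketch (our own helper, not part of the class above):

def steering_vector(angle_deg, freq_hz, n_mics=4, mic_spacing=0.05, c=343.0):
    """Far-field steering vector for a uniform linear array at one frequency."""
    # Per-mic propagation delays toward the target direction
    delays = np.arange(n_mics) * mic_spacing * np.sin(np.deg2rad(angle_deg)) / c
    return np.exp(-2j * np.pi * freq_hz * delays).reshape(-1, 1)

# e.g. steer toward 30 degrees at 1 kHz
a = steering_vector(30, 1000)

In practice MVDR is applied per frequency bin in the STFT domain, with one steering vector and one noise covariance per bin.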
Connection to Caching
Voice enhancement benefits from caching strategies:
class EnhancementCache:
"""
Cache enhanced audio segments
Connection to ML:
- Cache expensive enhancement operations
- LRU for frequently accessed segments
- TTL for time-sensitive applications
"""
def __init__(self, capacity=1000):
from collections import OrderedDict
self.cache = OrderedDict()
self.capacity = capacity
self.hits = 0
self.misses = 0
def get_enhanced(self, audio_segment, model):
"""
Get enhanced audio with caching
Args:
audio_segment: Raw audio
model: Enhancement model
Returns:
Enhanced audio
"""
# Create cache key (hash of audio)
cache_key = hash(audio_segment.tobytes())
# Check cache
if cache_key in self.cache:
self.hits += 1
self.cache.move_to_end(cache_key) # Mark as recently used
return self.cache[cache_key]
# Compute enhancement
self.misses += 1
enhanced = model.enhance(audio_segment)
# Cache result
self.cache[cache_key] = enhanced
# Evict if over capacity
if len(self.cache) > self.capacity:
self.cache.popitem(last=False)
return enhanced
def get_hit_rate(self):
"""Calculate cache hit rate"""
total = self.hits + self.misses
return self.hits / total if total > 0 else 0
# Usage (assumes `model` exposes an .enhance(audio) method, e.g. a thin
# wrapper around ConvTasNet, and `audio_stream` yields numpy audio segments)
cache = EnhancementCache(capacity=1000)
for audio_segment in audio_stream:
    enhanced = cache.get_enhanced(audio_segment, model)
print(f"Cache hit rate: {cache.get_hit_rate():.2%}")
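One caveat on the key above: floating-point audio rarely repeats bit-for-bit, so exact-hash hits mostly occur when the same buffer is re-requested. A slightly more forgiving key quantizes samples first (the 16-bit level is a heuristic):

def quantized_cache_key(audio_segment, bits=16):
    """Quantize to fixed-point before hashing so near-identical buffers collide."""
    scale = 2 ** (bits - 1) - 1
    quantized = np.round(np.clip(audio_segment, -1, 1) * scale).astype(np.int16)
    return hash(quantized.tobytes())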
Understanding Audio Enhancement Fundamentals
Why Enhancement is Critical
Voice enhancement is the foundation of any production speech system. Poor audio quality cascades through the entire pipeline:
class AudioQualityImpactAnalyzer:
"""
Analyze impact of audio quality on downstream tasks
Demonstrates how SNR affects ASR accuracy, speaker recognition, etc.
"""
def __init__(self, asr_model, speaker_model):
self.asr_model = asr_model
self.speaker_model = speaker_model
def evaluate_quality_impact(self, clean_audio, noisy_audio, transcript):
"""
Compare performance on clean vs noisy audio
Returns:
Dictionary with metrics for both conditions
"""
# ASR on clean audio
clean_prediction = self.asr_model.transcribe(clean_audio)
clean_wer = self._calculate_wer(transcript, clean_prediction)
# ASR on noisy audio
noisy_prediction = self.asr_model.transcribe(noisy_audio)
noisy_wer = self._calculate_wer(transcript, noisy_prediction)
# Speaker embedding quality
clean_embedding = self.speaker_model.extract_embedding(clean_audio)
noisy_embedding = self.speaker_model.extract_embedding(noisy_audio)
# Embedding similarity (should be close for same speaker)
similarity = np.dot(clean_embedding, noisy_embedding) / (
np.linalg.norm(clean_embedding) * np.linalg.norm(noisy_embedding)
)
# Calculate SNR
snr_db = self._calculate_snr(clean_audio, noisy_audio)
return {
'snr_db': snr_db,
'clean_wer': clean_wer,
'noisy_wer': noisy_wer,
'wer_degradation': noisy_wer - clean_wer,
'embedding_similarity': similarity,
'relative_performance': clean_wer / noisy_wer if noisy_wer > 0 else 1.0
}
def _calculate_wer(self, reference, hypothesis):
"""Calculate Word Error Rate"""
import editdistance
ref_words = reference.lower().split()
hyp_words = hypothesis.lower().split()
distance = editdistance.eval(ref_words, hyp_words)
wer = distance / len(ref_words) if len(ref_words) > 0 else 0
return wer
def _calculate_snr(self, clean, noisy):
"""Calculate Signal-to-Noise Ratio"""
noise = noisy - clean
signal_power = np.mean(clean ** 2)
noise_power = np.mean(noise ** 2)
if noise_power == 0:
return float('inf')
snr = 10 * np.log10(signal_power / noise_power)
return snr
# Demo impact analysis
print("="*60)
print("AUDIO QUALITY IMPACT ANALYSIS")
print("="*60)
# Simulate different SNR levels
# (assumes an `analyzer = AudioQualityImpactAnalyzer(asr_model, speaker_model)`
# plus `clean_audio`, `noise`, and `transcript` are available; reuses the
# add_noise_at_snr helper from the Problem Formulation section)
snr_levels = [-5, 0, 5, 10, 15, 20]
for snr_target in snr_levels:
    # Add noise at a specific SNR
    noisy = add_noise_at_snr(clean_audio, noise, snr_target)
# Evaluate
results = analyzer.evaluate_quality_impact(clean_audio, noisy, transcript)
print(f"\nSNR: {snr_target} dB")
print(f" WER (clean): {results['clean_wer']:.2%}")
print(f" WER (noisy): {results['noisy_wer']:.2%}")
print(f" Degradation: {results['wer_degradation']:.2%}")
print(f" Speaker Sim: {results['embedding_similarity']:.3f}")
Frequency Domain Analysis
Understanding audio in frequency domain is crucial for enhancement:
class FrequencyDomainAnalyzer:
"""
Analyze and visualize audio in frequency domain
Essential for understanding what noise reduction does
"""
def __init__(self, sr=16000):
self.sr = sr
def analyze_spectrum(self, audio):
"""
Compute and visualize spectrum
Returns:
frequencies, magnitudes, phases
"""
# Compute FFT
n_fft = 2048
fft = np.fft.rfft(audio, n=n_fft)
# Magnitude and phase
magnitude = np.abs(fft)
phase = np.angle(fft)
# Frequency bins
frequencies = np.fft.rfftfreq(n_fft, 1/self.sr)
return frequencies, magnitude, phase
def compare_spectra(self, clean, noisy, enhanced):
"""
Compare spectra before and after enhancement
"""
import matplotlib.pyplot as plt
# Compute spectra
freq_clean, mag_clean, _ = self.analyze_spectrum(clean)
freq_noisy, mag_noisy, _ = self.analyze_spectrum(noisy)
freq_enhanced, mag_enhanced, _ = self.analyze_spectrum(enhanced)
# Plot
fig, axes = plt.subplots(3, 1, figsize=(12, 10))
# Clean
axes[0].plot(freq_clean, 20 * np.log10(mag_clean + 1e-10))
axes[0].set_title('Clean Audio Spectrum')
axes[0].set_ylabel('Magnitude (dB)')
axes[0].grid(True)
# Noisy
axes[1].plot(freq_noisy, 20 * np.log10(mag_noisy + 1e-10), color='red')
axes[1].set_title('Noisy Audio Spectrum')
axes[1].set_ylabel('Magnitude (dB)')
axes[1].grid(True)
# Enhanced
axes[2].plot(freq_enhanced, 20 * np.log10(mag_enhanced + 1e-10), color='green')
axes[2].set_title('Enhanced Audio Spectrum')
axes[2].set_xlabel('Frequency (Hz)')
axes[2].set_ylabel('Magnitude (dB)')
axes[2].grid(True)
plt.tight_layout()
plt.savefig('spectrum_comparison.png')
plt.close()
def compute_spectral_features(self, audio):
"""
Compute spectral features for quality assessment
"""
freq, mag, _ = self.analyze_spectrum(audio)
# Spectral centroid
centroid = np.sum(freq * mag) / np.sum(mag)
# Spectral bandwidth
bandwidth = np.sqrt(np.sum(((freq - centroid) ** 2) * mag) / np.sum(mag))
# Spectral flatness (Wiener entropy)
geometric_mean = np.exp(np.mean(np.log(mag + 1e-10)))
arithmetic_mean = np.mean(mag)
flatness = geometric_mean / arithmetic_mean
# Spectral rolloff (95% of energy)
cumsum = np.cumsum(mag)
rolloff_idx = np.where(cumsum >= 0.95 * cumsum[-1])[0][0]
rolloff = freq[rolloff_idx]
return {
'centroid_hz': centroid,
'bandwidth_hz': bandwidth,
'flatness': flatness,
'rolloff_hz': rolloff
}
# Usage (assumes `audio`, and the clean/noisy/enhanced signals used below,
# have already been loaded)
analyzer = FrequencyDomainAnalyzer(sr=16000)
# Analyze audio
features = analyzer.compute_spectral_features(audio)
print("Spectral Features:")
print(f" Centroid: {features['centroid_hz']:.1f} Hz")
print(f" Bandwidth: {features['bandwidth_hz']:.1f} Hz")
print(f" Flatness: {features['flatness']:.3f}")
print(f" Rolloff: {features['rolloff_hz']:.1f} Hz")
# Compare before/after
analyzer.compare_spectra(clean_audio, noisy_audio, enhanced_audio)
Advanced Deep Learning Enhancement
State-of-the-Art Architectures
class ConvTasNetEnhancer(nn.Module):
"""
Conv-TasNet for speech enhancement
Architecture:
1. Encoder: Waveform → Feature representation
2. Separator: Mask estimation using temporal convolutions
3. Decoder: Masked features → Enhanced waveform
Advantages over STFT-based methods:
- Operates on raw waveform
- Learnable basis functions
- Better phase reconstruction
"""
def __init__(
self,
n_src=1,
n_filters=512,
kernel_size=16,
stride=8,
n_blocks=8,
n_repeats=3,
bn_chan=128,
hid_chan=512,
skip_chan=128
):
super().__init__()
# Encoder: 1D conv
self.encoder = nn.Conv1d(
1,
n_filters,
kernel_size=kernel_size,
stride=stride,
padding=kernel_size // 2
)
# Separator: TCN blocks
self.separator = TemporalConvNet(
n_filters,
n_src,
n_blocks=n_blocks,
n_repeats=n_repeats,
bn_chan=bn_chan,
hid_chan=hid_chan,
skip_chan=skip_chan
)
# Decoder: 1D transposed conv
self.decoder = nn.ConvTranspose1d(
n_filters,
1,
kernel_size=kernel_size,
stride=stride,
padding=kernel_size // 2
)
def forward(self, waveform):
"""
Enhance waveform
Args:
waveform: [batch, time]
Returns:
enhanced: [batch, time]
"""
# Add channel dimension
x = waveform.unsqueeze(1) # [batch, 1, time]
# Encode
encoded = self.encoder(x) # [batch, n_filters, time']
# Separate (estimate mask)
masks = self.separator(encoded) # [batch, n_src, n_filters, time']
# Apply mask
masked = encoded.unsqueeze(1) * masks # [batch, n_src, n_filters, time']
# Decode
enhanced = self.decoder(masked.squeeze(1)) # [batch, 1, time]
# Remove channel dimension
enhanced = enhanced.squeeze(1) # [batch, time]
# Trim to original length
if enhanced.shape[-1] != waveform.shape[-1]:
enhanced = enhanced[..., :waveform.shape[-1]]
return enhanced
class TemporalConvNet(nn.Module):
"""
Temporal Convolutional Network for Conv-TasNet
Stack of dilated conv blocks with skip connections
"""
def __init__(
self,
n_filters,
n_src,
n_blocks=8,
n_repeats=3,
bn_chan=128,
hid_chan=512,
skip_chan=128
):
super().__init__()
# Layer norm
self.layer_norm = nn.GroupNorm(1, n_filters)
# Bottleneck
self.bottleneck = nn.Conv1d(n_filters, bn_chan, 1)
# TCN blocks
self.blocks = nn.ModuleList()
for r in range(n_repeats):
for b in range(n_blocks):
dilation = 2 ** b
self.blocks.append(
TCNBlock(
bn_chan,
hid_chan,
skip_chan,
kernel_size=3,
dilation=dilation
)
)
# Output
self.output = nn.Sequential(
nn.PReLU(),
nn.Conv1d(skip_chan, n_filters, 1),
nn.Sigmoid() # Mask should be [0, 1]
)
def forward(self, x):
"""
Args:
x: [batch, n_filters, time]
Returns:
masks: [batch, n_src, n_filters, time]
"""
# Normalize
x = self.layer_norm(x)
# Bottleneck
x = self.bottleneck(x) # [batch, bn_chan, time]
# Accumulate skip connections
skip_sum = 0
for block in self.blocks:
x, skip = block(x)
skip_sum = skip_sum + skip
# Output mask
masks = self.output(skip_sum)
# Unsqueeze for n_src dimension
masks = masks.unsqueeze(1) # [batch, 1, n_filters, time]
return masks
class TCNBlock(nn.Module):
"""Single TCN block with dilated convolution"""
def __init__(self, in_chan, hid_chan, skip_chan, kernel_size=3, dilation=1):
super().__init__()
self.conv1 = nn.Conv1d(
in_chan,
hid_chan,
1
)
self.prelu1 = nn.PReLU()
self.norm1 = nn.GroupNorm(1, hid_chan)
self.depthwise_conv = nn.Conv1d(
hid_chan,
hid_chan,
kernel_size,
padding=(kernel_size - 1) * dilation // 2,
dilation=dilation,
groups=hid_chan
)
self.prelu2 = nn.PReLU()
self.norm2 = nn.GroupNorm(1, hid_chan)
self.conv2 = nn.Conv1d(hid_chan, in_chan, 1)
self.skip_conv = nn.Conv1d(hid_chan, skip_chan, 1)
def forward(self, x):
"""
Args:
x: [batch, in_chan, time]
Returns:
output: [batch, in_chan, time]
skip: [batch, skip_chan, time]
"""
residual = x
# 1x1 conv
x = self.conv1(x)
x = self.prelu1(x)
x = self.norm1(x)
# Depthwise conv
x = self.depthwise_conv(x)
x = self.prelu2(x)
x = self.norm2(x)
# Skip connection
skip = self.skip_conv(x)
# Output
x = self.conv2(x)
# Residual
output = x + residual
return output, skip
# Training Conv-TasNet
class ConvTasNetTrainer:
"""
Train Conv-TasNet for speech enhancement
"""
def __init__(self, model, device='cuda'):
self.model = model.to(device)
self.device = device
# Optimizer
self.optimizer = torch.optim.Adam(
self.model.parameters(),
lr=1e-3
)
# Learning rate scheduler
self.scheduler = torch.optim.lr_scheduler.ReduceLROnPlateau(
self.optimizer,
mode='min',
factor=0.5,
patience=3
)
def train_epoch(self, train_loader):
"""Train one epoch"""
self.model.train()
total_loss = 0
for batch_idx, (noisy, clean) in enumerate(train_loader):
noisy = noisy.to(self.device)
clean = clean.to(self.device)
# Forward
enhanced = self.model(noisy)
# Loss: SI-SNR (Scale-Invariant SNR)
loss = self._si_snr_loss(enhanced, clean)
# Backward
self.optimizer.zero_grad()
loss.backward()
# Gradient clipping
torch.nn.utils.clip_grad_norm_(self.model.parameters(), max_norm=5.0)
self.optimizer.step()
total_loss += loss.item()
if batch_idx % 100 == 0:
print(f"Batch {batch_idx}, Loss: {loss.item():.4f}")
return total_loss / len(train_loader)
def _si_snr_loss(self, estimate, target):
"""
Scale-Invariant Signal-to-Noise Ratio loss
Better than MSE for speech enhancement
"""
# Zero-mean
estimate_zm = estimate - estimate.mean(dim=-1, keepdim=True)
target_zm = target - target.mean(dim=-1, keepdim=True)
# <s', s>s / ||s||^2
dot = (estimate_zm * target_zm).sum(dim=-1, keepdim=True)
target_energy = (target_zm ** 2).sum(dim=-1, keepdim=True)
projection = dot * target_zm / (target_energy + 1e-8)
# Noise
noise = estimate_zm - projection
# SI-SNR
si_snr = 10 * torch.log10(
(projection ** 2).sum(dim=-1) / (noise ** 2).sum(dim=-1) + 1e-8
)
# Negative for loss (we want to maximize SI-SNR)
return -si_snr.mean()
# Usage
model = ConvTasNetEnhancer()
trainer = ConvTasNetTrainer(model, device='cuda')
# Train (assumes num_epochs, train_loader, and val_loader are defined;
# validate() is sketched below)
for epoch in range(num_epochs):
train_loss = trainer.train_epoch(train_loader)
val_loss = trainer.validate(val_loader)
print(f"Epoch {epoch}: Train Loss: {train_loss:.4f}, Val Loss: {val_loss:.4f}")
trainer.scheduler.step(val_loss)
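The loop above calls trainer.validate, which the trainer sketch does not define; a minimal version mirroring train_epoch without gradient updates:

def validate(self, val_loader):
    """Average SI-SNR loss over the validation set (no gradient updates)."""
    self.model.eval()
    total_loss = 0
    with torch.no_grad():
        for noisy, clean in val_loader:
            noisy, clean = noisy.to(self.device), clean.to(self.device)
            enhanced = self.model(noisy)
            total_loss += self._si_snr_loss(enhanced, clean).item()
    return total_loss / len(val_loader)

# Attach to the trainer class defined above
ConvTasNetTrainer.validate = validate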
Real-Time Enhancement with ONNX
class RealTimeONNXEnhancer:
"""
Real-time enhancement using ONNX Runtime
Optimized for production deployment
"""
def __init__(self, onnx_model_path, chunk_size=4800):
"""
Args:
onnx_model_path: Path to exported ONNX model
chunk_size: Audio chunk size (samples)
"""
import onnxruntime as ort
self.chunk_size = chunk_size
# Load ONNX model
self.session = ort.InferenceSession(
onnx_model_path,
providers=['CUDAExecutionProvider', 'CPUExecutionProvider']
)
# Get input/output names
self.input_name = self.session.get_inputs()[0].name
self.output_name = self.session.get_outputs()[0].name
# State for streaming
self.reset_state()
def reset_state(self):
"""Reset streaming state"""
self.overlap_buffer = np.zeros(self.chunk_size // 2, dtype=np.float32)
def enhance_chunk(self, audio_chunk):
"""
Enhance single audio chunk with overlap-add
Args:
audio_chunk: [chunk_size] numpy array
Returns:
enhanced_chunk: [chunk_size] numpy array
"""
# Prepare input (batch dimension)
input_data = audio_chunk.astype(np.float32)[np.newaxis, :]
# Run inference
enhanced = self.session.run(
[self.output_name],
{self.input_name: input_data}
)[0][0]
        # Overlap-add (simplified: assumes successive input chunks overlap by
        # overlap_size samples; windowed OLA would further reduce seams)
overlap_size = len(self.overlap_buffer)
enhanced[:overlap_size] += self.overlap_buffer
# Save overlap for next chunk
self.overlap_buffer = enhanced[-overlap_size:].copy()
# Return without overlap region
return enhanced[:-overlap_size]
def enhance_stream(self, audio_stream):
"""
Enhance audio stream in real-time
Generator that yields enhanced chunks
"""
for chunk in audio_stream:
# Ensure correct size
            if len(chunk) != self.chunk_size:
                # For simplicity, drop malformed chunks; production code
                # should pad the final short chunk instead
                continue
# Enhance
enhanced = self.enhance_chunk(chunk)
yield enhanced
# Export PyTorch model to ONNX
def export_to_onnx(pytorch_model, onnx_path, chunk_size=4800):
"""
Export trained PyTorch model to ONNX
"""
pytorch_model.eval()
# Dummy input
dummy_input = torch.randn(1, chunk_size)
# Export
torch.onnx.export(
pytorch_model,
dummy_input,
onnx_path,
input_names=['audio_input'],
output_names=['audio_output'],
dynamic_axes={
'audio_input': {1: 'time'},
'audio_output': {1: 'time'}
},
opset_version=14
)
print(f"Model exported to {onnx_path}")
# Usage
# Export a trained model (assumes `trained_model` is the ConvTasNetEnhancer above)
export_to_onnx(trained_model, 'convtasnet_enhancer.onnx')
# Create real-time enhancer
enhancer = RealTimeONNXEnhancer('convtasnet_enhancer.onnx', chunk_size=4800)
# Stream audio
def audio_stream_generator():
"""Generate audio chunks from microphone/file"""
# Implementation depends on audio source
pass
# Enhance stream
for enhanced_chunk in enhancer.enhance_stream(audio_stream_generator()):
# Play or save enhanced audio
pass
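audio_stream_generator is left as a stub above; one minimal file-based implementation using soundfile (the path and chunk size are placeholders):

import soundfile as sf

def file_chunk_generator(path, chunk_size=4800):
    """Yield fixed-size float32 chunks from an audio file."""
    with sf.SoundFile(path) as f:
        while True:
            chunk = f.read(chunk_size, dtype='float32')
            if len(chunk) < chunk_size:
                break  # drop the short tail for simplicity
            # Down-mix to mono if the file is multi-channel
            yield chunk if chunk.ndim == 1 else chunk.mean(axis=1)

Passing file_chunk_generator('input.wav') to enhancer.enhance_stream drives the same loop shown above.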
Production Quality Assurance
Automated Quality Metrics
class EnhancementQualityAssurance:
"""
Automated quality assurance for enhancement pipeline
Monitors:
- SNR improvement
- Speech intelligibility
- Artifacts
- Latency
"""
def __init__(self):
self.metrics_history = []
def assess_quality(self, original, enhanced, reference=None):
"""
Comprehensive quality assessment
Args:
original: Noisy input
enhanced: Enhanced output
reference: Clean reference (if available)
Returns:
Quality metrics dictionary
"""
metrics = {}
# SNR improvement (requires reference)
if reference is not None:
original_snr = self._compute_snr(original, reference)
enhanced_snr = self._compute_snr(enhanced, reference)
metrics['snr_improvement_db'] = enhanced_snr - original_snr
# PESQ (Perceptual Evaluation of Speech Quality)
from pesq import pesq
metrics['pesq_original'] = pesq(16000, reference, original, 'wb')
metrics['pesq_enhanced'] = pesq(16000, reference, enhanced, 'wb')
metrics['pesq_improvement'] = (
metrics['pesq_enhanced'] - metrics['pesq_original']
)
# STOI (Short-Time Objective Intelligibility)
from pystoi import stoi
metrics['stoi_original'] = stoi(reference, original, 16000)
metrics['stoi_enhanced'] = stoi(reference, enhanced, 16000)
metrics['stoi_improvement'] = (
metrics['stoi_enhanced'] - metrics['stoi_original']
)
# Artifact detection (no reference needed)
metrics['artifact_score'] = self._detect_artifacts(enhanced)
# Spectral distortion
metrics['spectral_distortion'] = self._compute_spectral_distortion(
original, enhanced
)
# Dynamic range
metrics['dynamic_range_db'] = 20 * np.log10(
np.max(np.abs(enhanced)) / (np.mean(np.abs(enhanced)) + 1e-8)
)
# Clipping detection
metrics['clipping_ratio'] = np.mean(np.abs(enhanced) > 0.99)
# Overall quality score
metrics['quality_score'] = self._compute_overall_score(metrics)
self.metrics_history.append(metrics)
return metrics
def _compute_snr(self, signal, reference):
"""Compute SNR"""
noise = signal - reference
signal_power = np.mean(reference ** 2)
noise_power = np.mean(noise ** 2)
if noise_power == 0:
return float('inf')
snr_db = 10 * np.log10(signal_power / noise_power)
return snr_db
def _detect_artifacts(self, audio):
"""
Detect musical noise and other artifacts
Returns:
Artifact score (0-1, lower is better)
"""
# Compute spectrogram
S = librosa.stft(audio)
magnitude = np.abs(S)
# Temporal variation
temporal_diff = np.diff(magnitude, axis=1)
temporal_variance = np.var(temporal_diff)
# Spectral variation
spectral_diff = np.diff(magnitude, axis=0)
spectral_variance = np.var(spectral_diff)
# High variance indicates artifacts
artifact_score = (temporal_variance + spectral_variance) / 2
        # Normalize to [0, 1] (heuristic scale; tune the divisor for your data)
        artifact_score = np.clip(artifact_score / 100, 0, 1)
return artifact_score
def _compute_spectral_distortion(self, original, enhanced):
"""
Compute spectral distortion
Measures how much the spectrum changed
"""
# Compute spectrograms
S_orig = np.abs(librosa.stft(original))
S_enh = np.abs(librosa.stft(enhanced))
# Log magnitude
S_orig_db = librosa.amplitude_to_db(S_orig + 1e-10)
S_enh_db = librosa.amplitude_to_db(S_enh + 1e-10)
# MSE in log domain
distortion = np.mean((S_orig_db - S_enh_db) ** 2)
return distortion
def _compute_overall_score(self, metrics):
"""
Compute overall quality score
Weighted combination of metrics
"""
score = 0.0
# PESQ improvement (if available)
if 'pesq_improvement' in metrics:
score += 0.4 * np.clip(metrics['pesq_improvement'] / 2, 0, 1)
# STOI improvement (if available)
if 'stoi_improvement' in metrics:
score += 0.4 * np.clip(metrics['stoi_improvement'], 0, 1)
# Artifact penalty
score -= 0.2 * metrics['artifact_score']
# Normalize to [0, 1]
score = np.clip(score, 0, 1)
return score
def generate_report(self):
"""Generate quality assurance report"""
if not self.metrics_history:
print("No metrics recorded")
return
# Aggregate metrics
avg_metrics = {}
for key in self.metrics_history[0].keys():
values = [m[key] for m in self.metrics_history if key in m]
avg_metrics[key] = np.mean(values)
print("\n" + "="*60)
print("ENHANCEMENT QUALITY ASSURANCE REPORT")
print("="*60)
print(f"Samples Evaluated: {len(self.metrics_history)}")
print(f"\nAverage Metrics:")
for key, value in avg_metrics.items():
print(f" {key:30s}: {value:.4f}")
# Pass/fail criteria
print(f"\n{'Criterion':<30s} {'Status':>10s}")
print("-" * 42)
checks = [
('SNR Improvement', avg_metrics.get('snr_improvement_db', 0) > 3, '>3 dB'),
('PESQ Improvement', avg_metrics.get('pesq_improvement', 0) > 0.5, '>0.5'),
('STOI Improvement', avg_metrics.get('stoi_improvement', 0) > 0.1, '>0.1'),
('Artifact Score', avg_metrics.get('artifact_score', 1) < 0.3, '<0.3'),
('Clipping Ratio', avg_metrics.get('clipping_ratio', 1) < 0.01, '<1%'),
]
all_passed = True
for name, passed, threshold in checks:
status = "✓ PASS" if passed else "✗ FAIL"
all_passed = all_passed and passed
print(f" {name:<30s} {status:>10s} ({threshold})")
print("-" * 42)
print(f" {'Overall Result':<30s} {'✓ PASS' if all_passed else '✗ FAIL':>10s}")
print("="*60)
# Usage (assumes `test_pairs` is a list of (noisy_file, clean_file) paths and
# `enhancer` is any enhancer exposing .enhance())
qa = EnhancementQualityAssurance()
# Evaluate multiple files
for noisy_file, clean_file in test_pairs:
noisy_audio, _ = librosa.load(noisy_file, sr=16000)
clean_audio, _ = librosa.load(clean_file, sr=16000)
# Enhance
enhanced_audio = enhancer.enhance(noisy_audio)
# Assess quality
metrics = qa.assess_quality(noisy_audio, enhanced_audio, clean_audio)
# Generate report
qa.generate_report()
Key Takeaways
- ✅ Multiple approaches - Classical (spectral subtraction, Wiener) and deep learning
- ✅ Quality metrics - PESQ, STOI, SNR for evaluation
- ✅ Real-time processing - Streaming with low latency (< 50 ms)
- ✅ Multi-channel - Beamforming for spatial enhancement
- ✅ Caching benefits - Reduce computational cost for repeated segments
- ✅ Trade-offs - Quality vs latency vs computational cost
- ✅ Production considerations - Monitoring, fallback, quality control
FAQ
Q: What is the difference between spectral subtraction and Wiener filtering for noise reduction?
A: Spectral subtraction directly subtracts the estimated noise spectrum magnitude from the noisy signal, which is simpler but can introduce musical noise artifacts. Wiener filtering computes an optimal gain based on the ratio of estimated signal power to total power (signal + noise), producing smoother results with fewer artifacts. Both require a noise profile estimated from silence segments.
Q: How does Conv-TasNet work for speech enhancement?
A: Conv-TasNet operates directly on raw waveforms using three components: an encoder (1D convolution) that transforms the waveform into a learned feature representation, a separator (temporal convolutional network with dilated convolutions and skip connections) that estimates a mask, and a decoder (transposed convolution) that reconstructs the enhanced waveform. It avoids STFT phase reconstruction issues by learning end-to-end.
Q: What metrics should I use to evaluate speech enhancement quality?
A: Use PESQ (Perceptual Evaluation of Speech Quality, range -0.5 to 4.5) for overall perceptual quality, STOI (Short-Time Objective Intelligibility, range 0-1) for speech intelligibility, and SNR improvement in dB for noise reduction effectiveness. Also monitor for artifacts (musical noise), clipping ratio, and spectral distortion. A comprehensive QA pipeline checks all of these against pass/fail thresholds.
Originally published at: arunbaby.com/speech-tech/0010-voice-enhancement