Audio Preprocessing & Signal Processing
Clean audio is the foundation of robust speech systems – master preprocessing pipelines that handle real-world noise and variability.
TL;DR
Audio preprocessing is the feature engineering step for speech ML. The standard pipeline runs: load audio, resample to 16kHz, normalize amplitude, apply noise reduction (high-pass filter + spectral subtraction), run voice activity detection to strip silence, segment into chunks, then extract features (MFCCs, mel spectrograms). Data augmentation techniques like time stretching, pitch shifting, background noise mixing, SpecAugment, and room impulse response convolution dramatically improve model robustness. Related: audio feature extraction for downstream feature types and voice enhancement for advanced noise reduction with deep learning.

Introduction
Audio preprocessing transforms raw audio into clean, standardized representations suitable for ML models.
Why it matters:
- Garbage in, garbage out: Poor audio quality destroys model performance
- Real-world audio is messy: Background noise, varying volumes, different devices
- Standardization: Models expect consistent input formats
- Data augmentation: Increase training data diversity
Pipeline overview:
Raw Audio (microphone)
↓
[Loading & Format Conversion]
↓
[Resampling]
↓
[Normalization]
↓
[Noise Reduction]
↓
[Voice Activity Detection]
↓
[Segmentation]
↓
[Feature Extraction]
↓
Clean Features → Model
Audio Fundamentals
Digital Audio Representation
Analog Sound Wave:
∿∿∿∿∿∿∿∿∿∿∿
Sampling (digitization):
●─●─●─●─●─●─●─● (sample points)
Key parameters (see the storage sketch after this list):
- Sample rate: samples per second (Hz)
  - CD quality: 44,100 Hz
  - Speech: 16,000 Hz or 22,050 Hz
  - Telephone: 8,000 Hz
- Bit depth: bits per sample
  - 16-bit: 65,536 possible values
  - 24-bit: 16,777,216 values
  - 32-bit float: highest precision
- Channels:
  - Mono: 1 channel
  - Stereo: 2 channels (left, right)
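Together these parameters fix the raw (uncompressed) size of a recording. A minimal sketch — the helper name and example values are mine, for illustration:
def raw_audio_bytes(duration_s, sample_rate, bit_depth, channels):
    """Uncompressed PCM size = seconds x samples/s x bytes/sample x channels."""
    return int(duration_s * sample_rate * (bit_depth // 8) * channels)

# One minute of speech at 16 kHz / 16-bit / mono vs. CD-quality stereo
print(raw_audio_bytes(60, 16000, 16, 1))   # 1,920,000 bytes (~1.9 MB)
print(raw_audio_bytes(60, 44100, 16, 2))   # 10,584,000 bytes (~10.6 MB)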
Nyquist-Shannon Sampling Theorem
Rule: to capture a frequency f without aliasing, the sample rate must be ≥ 2f.
Human hearing: 20 Hz - 20 kHz
→ need a sample rate ≥ 40 kHz
→ CD uses 44.1 kHz (a safety margin above 40 kHz)
Speech: most of the useful information sits below 8 kHz
→ a 16 kHz sample rate is sufficient
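A quick way to see the theorem in action (a minimal sketch; the 5 kHz tone and 8 kHz rate are illustrative): a tone above the Nyquist limit reappears at a lower, aliased frequency.
import numpy as np

sr = 8000  # sample rate below 2 x 5000 Hz
t = np.arange(0, 1.0, 1 / sr)
tone = np.sin(2 * np.pi * 5000 * t)  # 5 kHz tone, undersampled

# The dominant frequency of the sampled signal is the alias |5000 - 8000| = 3000 Hz
freqs = np.fft.rfftfreq(len(tone), d=1 / sr)
print(f"Dominant frequency: {freqs[np.argmax(np.abs(np.fft.rfft(tone)))]:.0f} Hz")  # ~3000 Hz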
Loading & Format Conversion
Loading Audio
import librosa
import soundfile as sf
import numpy as np

def load_audio(file_path, sr=None):
    """
    Load an audio file.

    Args:
        file_path: Path to audio file
        sr: Target sample rate (None = keep original)

    Returns:
        audio: np.array of samples
        sr: Sample rate
    """
    # librosa resamples automatically when sr is given
    audio, sample_rate = librosa.load(file_path, sr=sr)
    return audio, sample_rate

# Example
audio, sr = load_audio('speech.wav', sr=16000)
print(f"Shape: {audio.shape}, Sample Rate: {sr} Hz")
print(f"Duration: {len(audio) / sr:.2f} seconds")
Format Conversion
from pydub import AudioSegment

def convert_audio_format(input_path, output_path, output_format='wav'):
    """
    Convert between audio formats.
    Supports: mp3, wav, ogg, flac, m4a, etc.
    """
    audio = AudioSegment.from_file(input_path)
    # Export in the new format
    audio.export(output_path, format=output_format)

# Convert MP3 to WAV
convert_audio_format('input.mp3', 'output.wav', 'wav')
Mono/Stereo Conversion
def stereo_to_mono(audio_stereo):
    """
    Convert stereo to mono by averaging channels.

    Args:
        audio_stereo: Shape (2, n_samples) or (n_samples, 2)

    Returns:
        audio_mono: Shape (n_samples,)
    """
    if audio_stereo.ndim == 1:
        # Already mono
        return audio_stereo
    # Average across channels
    if audio_stereo.shape[0] == 2:
        # Shape: (2, n_samples)
        return np.mean(audio_stereo, axis=0)
    else:
        # Shape: (n_samples, 2)
        return np.mean(audio_stereo, axis=1)

# Example
audio_stereo, sr = librosa.load('stereo.wav', sr=None, mono=False)
audio_mono = stereo_to_mono(audio_stereo)
Resampling
Purpose: Convert sample rate to match model requirements
High-Quality Resampling
def resample_audio(audio, orig_sr, target_sr):
    """
    Resample audio using a high-quality algorithm.

    Args:
        audio: Audio samples
        orig_sr: Original sample rate
        target_sr: Target sample rate

    Returns:
        resampled_audio
    """
    if orig_sr == target_sr:
        return audio
    # librosa uses a Kaiser-windowed sinc filter for high-quality resampling
    resampled = librosa.resample(
        audio,
        orig_sr=orig_sr,
        target_sr=target_sr,
        res_type='kaiser_best'  # highest quality, slowest
    )
    return resampled

# Example: downsample 44.1 kHz to 16 kHz
audio_44k, _ = librosa.load('audio.wav', sr=44100)
audio_16k = resample_audio(audio_44k, orig_sr=44100, target_sr=16000)
print(f"Original length: {len(audio_44k)}")
print(f"Resampled length: {len(audio_16k)}")
print(f"Ratio: {len(audio_44k) / len(audio_16k):.2f}")  # ~2.76
Resampling visualization:
Original (44.1 kHz):
●●●●●●●●●●●●●●●●●●●●●●●●●●●● (44,100 samples/second)
Downsampled (16 kHz):
●───●───●───●───●───●───●───● (16,000 samples/second)
The resampler low-pass filters before decimating, so content above the new Nyquist limit (8 kHz here) is removed rather than aliased, as the sketch below shows.
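To see why that low-pass step matters, here is a minimal sketch (the 10 kHz tone and the 48 kHz → 16 kHz rates are illustrative, not from the article): naive decimation folds the tone down to a spurious 6 kHz alias, while librosa.resample filters it out almost entirely.
import numpy as np
import librosa

sr_in, sr_out = 48000, 16000
t = np.arange(0, 1.0, 1 / sr_in)
tone = np.sin(2 * np.pi * 10000 * t)  # 10 kHz: above the new Nyquist (8 kHz)

naive = tone[::3]                                                 # no anti-alias filter
proper = librosa.resample(tone, orig_sr=sr_in, target_sr=sr_out)  # filtered

freqs = np.fft.rfftfreq(len(naive), d=1 / sr_out)
alias = freqs[np.argmax(np.abs(np.fft.rfft(naive)))]
print(f"Naive decimation: spurious peak at {alias:.0f} Hz")  # ~6000 Hz
print(f"RMS naive={np.sqrt(np.mean(naive ** 2)):.3f}, proper={np.sqrt(np.mean(proper ** 2)):.3f}")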
Normalization
Amplitude Normalization
def normalize_audio(audio, target_level=-20.0):
    """
    Normalize audio to a target RMS level (dB).

    Args:
        audio: Audio samples
        target_level: Target RMS level in dB

    Returns:
        normalized_audio
    """
    # Current RMS
    rms = np.sqrt(np.mean(audio ** 2))
    if rms == 0:
        return audio
    # Convert target level from dB to linear
    target_rms = 10 ** (target_level / 20.0)
    # Scale audio
    scaling_factor = target_rms / rms
    normalized = audio * scaling_factor
    # Clip to prevent overflow
    normalized = np.clip(normalized, -1.0, 1.0)
    return normalized

# Example
audio, sr = librosa.load('speech.wav', sr=16000)
normalized_audio = normalize_audio(audio, target_level=-20.0)
Peak Normalization
def peak_normalize(audio):
    """
    Normalize so the peak amplitude is 1.0.
    Simple, but a single spike dominates the scaling.
    """
    peak = np.max(np.abs(audio))
    if peak == 0:
        return audio
    return audio / peak
DC Offset Removal
def remove_dc_offset(audio):
    """
    Remove DC bias (mean offset).
    A DC offset wastes headroom and can cause clicks at segment boundaries.
    """
    return audio - np.mean(audio)

# Example
audio_clean = remove_dc_offset(audio)
Noise Reduction
1. Spectral Subtraction
import scipy.signal as signal

def spectral_subtraction(audio, sr, noise_duration=0.5):
    """
    Reduce noise using spectral subtraction.
    Assumes the first noise_duration seconds contain noise only.

    Args:
        audio: Audio signal
        sr: Sample rate
        noise_duration: Duration of the noise-only segment (seconds)

    Returns:
        denoised_audio
    """
    # Extract the noise-only segment from the beginning
    noise_samples = int(noise_duration * sr)
    noise_segment = audio[:noise_samples]
    # Estimate the noise magnitude spectrum with the SAME STFT settings as
    # the full signal, averaged over time (one value per frequency bin)
    _, _, noise_stft = signal.stft(noise_segment, fs=sr, nperseg=1024)
    noise_magnitude = np.mean(np.abs(noise_stft), axis=1, keepdims=True)
    # STFT of the full audio
    f, t, Zxx = signal.stft(audio, fs=sr, nperseg=1024)
    magnitude = np.abs(Zxx)
    phase = np.angle(Zxx)
    # Subtract the per-frequency noise estimate, flooring at zero
    magnitude_denoised = np.maximum(magnitude - noise_magnitude, 0.0)
    # Reconstruct using the noisy phase
    Zxx_denoised = magnitude_denoised * np.exp(1j * phase)
    _, audio_denoised = signal.istft(Zxx_denoised, fs=sr)
    return audio_denoised[:len(audio)]

# Example
audio, sr = librosa.load('noisy_speech.wav', sr=16000)
denoised = spectral_subtraction(audio, sr, noise_duration=0.5)
2. Wiener Filtering
from scipy.signal import wiener

def wiener_filter(audio, noise_power=None):
    """
    Apply a Wiener filter for noise reduction.
    More principled than plain spectral subtraction.

    Note: scipy's `noise` argument is an estimate of the noise POWER,
    not a reduction strength; if None, scipy estimates it from the signal.
    """
    return wiener(audio, mysize=5, noise=noise_power)
3. High-Pass Filter (Remove Low-Frequency Noise)
from scipy.signal import butter, filtfilt

def high_pass_filter(audio, sr, cutoff_freq=80):
    """
    Remove low-frequency noise (e.g., rumble, hum).

    Args:
        audio: Audio signal
        sr: Sample rate
        cutoff_freq: Cutoff frequency in Hz

    Returns:
        filtered_audio
    """
    # Design a 5th-order Butterworth high-pass filter
    nyquist = sr / 2
    normalized_cutoff = cutoff_freq / nyquist
    b, a = butter(N=5, Wn=normalized_cutoff, btype='high')
    # Apply forwards and backwards (zero-phase filtering)
    filtered = filtfilt(b, a, audio)
    return filtered

# Example: remove rumble below 80 Hz
audio_filtered = high_pass_filter(audio, sr=16000, cutoff_freq=80)
Voice Activity Detection (VAD)
Purpose: Identify speech segments, remove silence. For a dedicated deep dive, see Voice Activity Detection.
def voice_activity_detection(audio, sr, frame_length=2048, hop_length=512,
                             energy_threshold=0.02):
    """
    Simple energy-based VAD.

    Args:
        audio: Audio signal
        sr: Sample rate
        energy_threshold: Threshold on normalized frame energy

    Returns:
        speech_segments: List of (start_sample, end_sample) tuples
    """
    # Per-frame RMS energy
    energy = librosa.feature.rms(
        y=audio,
        frame_length=frame_length,
        hop_length=hop_length
    )[0]
    # Normalize energy to [0, 1]
    energy_normalized = energy / (np.max(energy) + 1e-8)
    # Threshold to get per-frame voice activity
    voice_activity = energy_normalized > energy_threshold

    def frame_to_sample(frame_idx):
        # Map a frame index back to a sample index
        return frame_idx * hop_length

    # Find continuous speech segments
    segments = []
    in_speech = False
    start_frame = 0
    for i, is_voice in enumerate(voice_activity):
        if is_voice and not in_speech:
            # Start of speech
            start_frame = i
            in_speech = True
        elif not is_voice and in_speech:
            # End of speech
            segments.append((frame_to_sample(start_frame), frame_to_sample(i)))
            in_speech = False
    # Handle speech running to the end of the signal
    if in_speech:
        segments.append((frame_to_sample(start_frame), len(audio)))
    return segments

# Example
audio, sr = librosa.load('speech_with_pauses.wav', sr=16000)
segments = voice_activity_detection(audio, sr)
print(f"Found {len(segments)} speech segments:")
for i, (start, end) in enumerate(segments):
    duration = (end - start) / sr
    print(f"  Segment {i+1}: {start/sr:.2f}s - {end/sr:.2f}s ({duration:.2f}s)")
VAD visualization:
Audio waveform:
___ ___ ___
/ \ / \ / \
___/ \______/ \___/ \___
Energy:
████ ████ ████
████ ████ ████
────████──────────████──────████──── ← threshold
VAD output:
SSSS SSSS SSSS
(S = Speech, spaces = Silence)
Segmentation
Fixed-Length Segmentation
def segment_audio_fixed_length(audio, sr, segment_duration=3.0, hop_duration=1.0):
    """
    Segment audio into fixed-length chunks with overlap.

    Args:
        audio: Audio signal
        sr: Sample rate
        segment_duration: Segment length in seconds
        hop_duration: Hop between segments in seconds

    Returns:
        segments: List of audio segments
    """
    segment_samples = int(segment_duration * sr)
    hop_samples = int(hop_duration * sr)
    segments = []
    start = 0
    while start + segment_samples <= len(audio):
        segments.append(audio[start:start + segment_samples])
        start += hop_samples
    return segments

# Example: 3-second segments with a 1-second hop (2-second overlap)
segments = segment_audio_fixed_length(audio, sr=16000, segment_duration=3.0, hop_duration=1.0)
print(f"Created {len(segments)} segments")
Adaptive Segmentation (Based on Pauses)
def segment_by_pauses(audio, sr, min_segment_duration=0.3, silence_threshold=0.02):
    """
    Segment audio at silence/pause points.
    More natural than fixed-length chunks for speech.
    """
    # Detect voice activity
    speech_segments = voice_activity_detection(
        audio, sr,
        energy_threshold=silence_threshold
    )
    # Filter out very short speech segments
    min_segment_samples = int(min_segment_duration * sr)
    filtered_segments = [
        (start, end) for start, end in speech_segments
        if end - start >= min_segment_samples
    ]
    # Extract the audio for each segment
    audio_segments = [audio[start:end] for start, end in filtered_segments]
    return audio_segments, filtered_segments

# Example
audio_segments, timestamps = segment_by_pauses(audio, sr=16000)
Data Augmentation
Purpose: Increase training data diversity, improve model robustness
1. Time Stretching
def time_stretch(audio, rate=1.0):
    """
    Speed up or slow down audio without changing pitch.

    Args:
        audio: Audio signal
        rate: Stretch factor (> 1.0 speeds up, < 1.0 slows down)

    Returns:
        stretched_audio
    """
    return librosa.effects.time_stretch(audio, rate=rate)

# Example: speed up by 20%
audio_fast = time_stretch(audio, rate=1.2)
# Slow down by 20%
audio_slow = time_stretch(audio, rate=0.8)
2. Pitch Shifting
def pitch_shift(audio, sr, n_steps=2):
    """
    Shift pitch without changing speed.

    Args:
        audio: Audio signal
        sr: Sample rate
        n_steps: Semitones to shift (positive = higher, negative = lower)

    Returns:
        pitch_shifted_audio
    """
    return librosa.effects.pitch_shift(audio, sr=sr, n_steps=n_steps)

# Example: shift up 2 semitones
audio_high = pitch_shift(audio, sr=16000, n_steps=2)
# Shift down 2 semitones
audio_low = pitch_shift(audio, sr=16000, n_steps=-2)
3. Adding Noise
def add_noise(audio, noise_factor=0.005):
    """
    Add random Gaussian noise.

    Args:
        audio: Audio signal
        noise_factor: Standard deviation of the noise

    Returns:
        noisy_audio
    """
    noise = np.random.randn(len(audio)) * noise_factor
    return audio + noise

# Example
audio_noisy = add_noise(audio, noise_factor=0.01)
4. Background Noise Mixing
def mix_background_noise(speech_audio, noise_audio, snr_db=10):
    """
    Mix speech with background noise at a specified SNR.

    Args:
        speech_audio: Clean speech
        noise_audio: Background noise
        snr_db: Signal-to-noise ratio in dB

    Returns:
        mixed_audio
    """
    # Match lengths
    if len(noise_audio) < len(speech_audio):
        # Repeat noise to cover the speech
        repeats = int(np.ceil(len(speech_audio) / len(noise_audio)))
        noise_audio = np.tile(noise_audio, repeats)[:len(speech_audio)]
    else:
        # Trim noise
        noise_audio = noise_audio[:len(speech_audio)]
    # Signal and noise power
    speech_power = np.mean(speech_audio ** 2)
    noise_power = np.mean(noise_audio ** 2) + 1e-12  # guard against silent noise
    # Scaling factor that achieves the target SNR
    snr_linear = 10 ** (snr_db / 10)
    noise_scaling = np.sqrt(speech_power / (snr_linear * noise_power))
    # Mix
    mixed = speech_audio + noise_scaling * noise_audio
    # Normalize to prevent clipping
    mixed = mixed / (np.max(np.abs(mixed)) + 1e-8)
    return mixed

# Example: mix with café noise at SNR = 15 dB
cafe_noise, _ = librosa.load('cafe_background.wav', sr=16000)
noisy_speech = mix_background_noise(audio, cafe_noise, snr_db=15)
5. SpecAugment (For Spectrograms)
def spec_augment(mel_spectrogram, num_mask=2, freq_mask_param=20, time_mask_param=30):
    """
    SpecAugment: mask random time-frequency patches.
    A popular augmentation for speech recognition.

    Args:
        mel_spectrogram: Shape (n_mels, time)
        num_mask: Number of masks to apply
        freq_mask_param: Max width of a frequency mask
        time_mask_param: Max width of a time mask

    Returns:
        augmented_spectrogram
    """
    aug_spec = mel_spectrogram.copy()
    n_mels, n_frames = aug_spec.shape
    # Frequency masking
    for _ in range(num_mask):
        f = np.random.randint(0, freq_mask_param)
        f0 = np.random.randint(0, n_mels - f)
        aug_spec[f0:f0 + f, :] = 0
    # Time masking
    for _ in range(num_mask):
        t = np.random.randint(0, time_mask_param)
        t0 = np.random.randint(0, n_frames - t)
        aug_spec[:, t0:t0 + t] = 0
    return aug_spec

# Example
mel_spec = librosa.feature.melspectrogram(y=audio, sr=16000)
aug_mel_spec = spec_augment(mel_spec, num_mask=2)
Connection to Feature Engineering
Audio preprocessing is feature engineering for speech. For a detailed guide on specific feature types (MFCCs, spectrograms, pitch), see Audio Feature Extraction for Speech ML.
class AudioFeatureEngineeringPipeline:
    """
    Complete pipeline: raw audio → features.
    Analogous to general ML feature engineering.
    """
    def __init__(self, sr=16000):
        self.sr = sr

    def process(self, audio_path):
        """Full preprocessing pipeline, step by step."""
        # 1. Load (like data loading)
        audio, sr = librosa.load(audio_path, sr=self.sr)
        # 2. Normalize (like feature scaling)
        audio = normalize_audio(audio)
        # 3. Noise reduction (like outlier removal)
        audio = high_pass_filter(audio, sr)
        # 4. VAD (like removing null values); the segments could be used
        #    to drop silence before feature extraction
        segments = voice_activity_detection(audio, sr)
        # 5. Feature extraction (like creating derived features)
        features = self.extract_features(audio, sr)
        return features

    def extract_features(self, audio, sr):
        """
        Extract multiple feature types,
        like creating feature crosses and aggregations.
        """
        features = {}
        # Spectral features (numerical features)
        features['mfcc'] = librosa.feature.mfcc(y=audio, sr=sr, n_mfcc=13)
        features['spectral_centroid'] = librosa.feature.spectral_centroid(y=audio, sr=sr)
        features['zero_crossing_rate'] = librosa.feature.zero_crossing_rate(audio)
        # Temporal features (time-based features)
        features['rms_energy'] = librosa.feature.rms(y=audio)
        # Aggregations (like SQL GROUP BY)
        features['mfcc_mean'] = np.mean(features['mfcc'], axis=1)
        features['mfcc_std'] = np.std(features['mfcc'], axis=1)
        return features
Production Pipeline
class ProductionAudioPreprocessor:
    """
    Production-ready audio preprocessing.
    Handles errors, logging, monitoring.
    """
    def __init__(self, config):
        self.sr = config.get('sample_rate', 16000)
        self.normalize_level = config.get('normalize_level', -20.0)
        self.enable_vad = config.get('enable_vad', True)

    def preprocess(self, audio_bytes):
        """
        Preprocess audio from raw bytes.
        Returns: (processed_audio, metadata, success)
        """
        metadata = {}
        try:
            # Load from bytes (librosa resamples to self.sr on load,
            # so no separate resampling step is needed here)
            audio = self._load_from_bytes(audio_bytes)
            metadata['original_length'] = len(audio)
            # Normalize
            audio = normalize_audio(audio, self.normalize_level)
            metadata['normalized'] = True
            # VAD
            if self.enable_vad:
                segments = voice_activity_detection(audio, self.sr)
                if segments:
                    # Keep only speech
                    audio = np.concatenate([
                        audio[start:end] for start, end in segments
                    ])
                    metadata['vad_segments'] = len(segments)
            metadata['final_length'] = len(audio)
            metadata['duration_seconds'] = len(audio) / self.sr
            return audio, metadata, True
        except Exception as e:
            return None, {'error': str(e)}, False

    def _load_from_bytes(self, audio_bytes):
        """Load audio from a bytes buffer."""
        import io
        audio, _ = librosa.load(io.BytesIO(audio_bytes), sr=self.sr)
        return audio
Real-World Challenges & Solutions
Challenge 1: Codec Artifacts
Problem: Different audio codecs introduce artifacts
def detect_codec_artifacts(audio, sr):
    """
    Heuristic detection of codec artifacts (e.g., from MP3 compression).
    Returns a dict of artifact scores (higher = more artifacts).
    """
    # Compute spectrogram
    f, t, Sxx = signal.spectrogram(audio, fs=sr)
    # MP3 artifacts often appear as:
    # 1. High-frequency cutoff: lossy codecs remove energy near the top
    #    of the band, so inspect the region just below Nyquist
    cutoff_freq = 0.9 * (sr / 2)
    high_freq_mask = f > cutoff_freq
    high_freq_energy = np.mean(Sxx[high_freq_mask, :])
    # 2. Pre-echo artifacts: sudden changes in frame energy
    energy = np.sum(Sxx, axis=0)
    energy_diff = np.diff(energy)
    pre_echo_score = np.std(energy_diff)
    # Crude combined heuristic
    artifact_score = {
        'high_freq_loss': high_freq_energy,
        'pre_echo': pre_echo_score,
        'overall': 1.0 - high_freq_energy + pre_echo_score
    }
    return artifact_score

# Example
audio_mp3, sr = librosa.load('compressed.mp3', sr=16000)
audio_wav, sr = librosa.load('lossless.wav', sr=16000)
artifacts_mp3 = detect_codec_artifacts(audio_mp3, sr)
artifacts_wav = detect_codec_artifacts(audio_wav, sr)
print(f"MP3 artifacts: {artifacts_mp3['overall']:.3f}")
print(f"WAV artifacts: {artifacts_wav['overall']:.3f}")
Challenge 2: Variable Sample Rates
class AdaptiveResampler:
    """
    Handle audio from sources with different sample rates.
    Production systems receive audio from:
    - Phone calls: 8 kHz
    - Bluetooth: 16 kHz
    - Studio mics: 44.1 kHz / 48 kHz
    """
    def __init__(self, target_sr=16000):
        self.target_sr = target_sr

    def resample(self, audio, orig_sr):
        """Resample to the target rate, skipping no-ops."""
        if orig_sr == self.target_sr:
            return audio
        return librosa.resample(
            audio,
            orig_sr=orig_sr,
            target_sr=self.target_sr,
            res_type='kaiser_fast'  # good quality/speed balance for serving
        )

# Usage
resampler = AdaptiveResampler(target_sr=16000)
# Handle various sources
phone_audio = resampler.resample(phone_audio, orig_sr=8000)
bluetooth_audio = resampler.resample(bluetooth_audio, orig_sr=16000)
studio_audio = resampler.resample(studio_audio, orig_sr=48000)
Challenge 3: Clipping & Distortion
def detect_and_fix_clipping(audio, threshold=0.99):
    """
    Detect clipped samples and attempt simple interpolation.

    Args:
        audio: Audio signal
        threshold: Clipping threshold (absolute value)

    Returns:
        fixed_audio, was_clipped
    """
    # Detect clipping
    clipped_mask = np.abs(audio) >= threshold
    num_clipped = np.sum(clipped_mask)
    if num_clipped == 0:
        return audio, False
    print(f"⚠️ Detected {num_clipped} clipped samples "
          f"({100 * num_clipped / len(audio):.2f}%)")
    # Simple interpolation for isolated clipped samples
    fixed_audio = audio.copy()
    clipped_indices = np.where(clipped_mask)[0]
    for idx in clipped_indices:
        # Skip edges
        if idx == 0 or idx == len(audio) - 1:
            continue
        # Interpolate from unclipped neighbors
        if not clipped_mask[idx - 1] and not clipped_mask[idx + 1]:
            fixed_audio[idx] = (audio[idx - 1] + audio[idx + 1]) / 2
    return fixed_audio, True

# Example
audio_with_clipping, sr = librosa.load('clipped_audio.wav', sr=16000)
fixed_audio, was_clipped = detect_and_fix_clipping(audio_with_clipping)
if was_clipped:
    print("Applied clipping repair")
Challenge 4: Background Babble Noise
import noisereduce as nr

def reduce_babble_noise(audio, sr, noise_profile_duration=1.0):
    """
    Reduce background babble (multiple overlapping speakers).
    Harder than stationary noise because the spectrum keeps changing.
    """
    # Estimate a noise profile from the lowest-energy frames
    frame_length = int(0.1 * sr)  # 100 ms frames
    hop_length = frame_length // 2
    # Per-frame energy
    energy = librosa.feature.rms(
        y=audio,
        frame_length=frame_length,
        hop_length=hop_length
    )[0]
    # Treat the quietest 20% of frames as noise
    noise_threshold = np.percentile(energy, 20)
    noise_frames = np.where(energy < noise_threshold)[0]
    # Collect noise samples
    noise_samples = []
    for frame_idx in noise_frames:
        start = frame_idx * hop_length
        end = start + frame_length
        if end <= len(audio):
            noise_samples.extend(audio[start:end])
    noise_profile = np.array(noise_samples)
    # Apply noise reduction if we gathered enough noise
    if len(noise_profile) > sr * noise_profile_duration:
        reduced_noise = nr.reduce_noise(
            y=audio,
            y_noise=noise_profile[:int(sr * noise_profile_duration)],
            sr=sr,
            stationary=False,  # non-stationary noise
            prop_decrease=0.8
        )
        return reduced_noise
    else:
        print("⚠️ Insufficient noise profile, returning original")
        return audio

# Example
audio_with_babble, sr = librosa.load('meeting_audio.wav', sr=16000)
clean_audio = reduce_babble_noise(audio_with_babble, sr)
Audio Quality Metrics
Signal-to-Noise Ratio (SNR)
def calculate_snr(clean_signal, noisy_signal):
    """
    Calculate SNR in dB.

    Args:
        clean_signal: Ground-truth clean signal
        noisy_signal: Signal with noise

    Returns:
        SNR in dB
    """
    # Ensure the same length
    min_len = min(len(clean_signal), len(noisy_signal))
    clean = clean_signal[:min_len]
    noisy = noisy_signal[:min_len]
    # The noise is whatever was added to the clean signal
    noise = noisy - clean
    # Power
    signal_power = np.mean(clean ** 2)
    noise_power = np.mean(noise ** 2)
    # SNR in dB
    if noise_power == 0:
        return float('inf')
    return 10 * np.log10(signal_power / noise_power)

# Example
clean, sr = librosa.load('clean_speech.wav', sr=16000)
noisy, sr = librosa.load('noisy_speech.wav', sr=16000)
snr = calculate_snr(clean, noisy)
print(f"SNR: {snr:.2f} dB")

# Typical SNRs:
# > 40 dB: excellent
# 25-40 dB: good
# 10-25 dB: fair
# < 10 dB: poor
Perceptual Evaluation of Speech Quality (PESQ)
# PESQ is an ITU-standard metric for speech quality
# Requires the pesq library: pip install pesq
from pesq import pesq

def evaluate_speech_quality(reference_audio, degraded_audio, sr=16000):
    """
    Evaluate speech quality using PESQ.

    Args:
        reference_audio: Clean reference
        degraded_audio: Processed/degraded audio
        sr: Sample rate (8000 or 16000)

    Returns:
        PESQ score (roughly 1.0 to 4.5, higher is better)
    """
    # PESQ is only defined for 8 kHz and 16 kHz
    if sr not in [8000, 16000]:
        raise ValueError("PESQ requires sr=8000 or sr=16000")
    # Ensure the same length
    min_len = min(len(reference_audio), len(degraded_audio))
    ref = reference_audio[:min_len]
    deg = degraded_audio[:min_len]
    # Narrowband for 8 kHz, wideband for 16 kHz
    mode = 'nb' if sr == 8000 else 'wb'
    return pesq(sr, ref, deg, mode)

# Example
reference, sr = librosa.load('clean.wav', sr=16000)
processed, sr = librosa.load('processed.wav', sr=16000)
quality_score = evaluate_speech_quality(reference, processed, sr)
print(f"PESQ Score: {quality_score:.2f}")

# PESQ interpretation:
# 4.0+: excellent
# 3.0-4.0: good
# 2.0-3.0: fair
# < 2.0: poor
Advanced Augmentation Strategies
Room Impulse Response (RIR) Convolution
from scipy.signal import fftconvolve

def apply_room_impulse_response(speech, rir):
    """
    Simulate room acoustics by convolving speech with an RIR.
    Makes models robust to reverberation.

    Args:
        speech: Clean speech signal
        rir: Room impulse response

    Returns:
        Reverberant speech
    """
    # Convolve speech with the RIR
    reverb_speech = fftconvolve(speech, rir, mode='same')
    # Normalize
    reverb_speech = reverb_speech / (np.max(np.abs(reverb_speech)) + 1e-8)
    return reverb_speech

# Example: generate a crude synthetic RIR
def generate_synthetic_rir(sr=16000, room_size='medium', rt60=0.5):
    """
    Generate a synthetic room impulse response.

    Args:
        sr: Sample rate
        room_size: 'small', 'medium', 'large' (informational only here)
        rt60: Reverberation time in seconds (time to decay by 60 dB)

    Returns:
        RIR signal
    """
    # Duration based on RT60
    duration = int(rt60 * sr)
    t = np.arange(duration) / sr
    # Exponential decay reaching -60 dB at t = rt60 (ln(1000) ≈ 6.91)
    decay = np.exp(-6.91 * t / rt60)
    # Random reflections shaped by the decay envelope
    rir = decay * np.random.randn(duration)
    # Direct path (initial spike)
    rir[0] = 1.0
    # Normalize
    return rir / np.max(np.abs(rir))

# Usage
clean_speech, sr = librosa.load('speech.wav', sr=16000)
# Simulate different rooms
small_room_rir = generate_synthetic_rir(sr, 'small', rt60=0.3)
large_room_rir = generate_synthetic_rir(sr, 'large', rt60=1.2)
speech_small_room = apply_room_impulse_response(clean_speech, small_room_rir)
speech_large_room = apply_room_impulse_response(clean_speech, large_room_rir)
Codec Simulation
import subprocess
import tempfile
import os

def simulate_codec(audio, sr, codec='mp3', bitrate=32):
    """
    Simulate lossy codec compression (requires ffmpeg on PATH).
    Makes models robust to codec artifacts.

    Args:
        audio: Clean audio
        sr: Sample rate
        codec: 'mp3' or 'opus'
        bitrate: Bitrate in kbps

    Returns:
        Codec-compressed audio
    """
    # Map codec name to ffmpeg encoder and container extension
    encoders = {'mp3': ('libmp3lame', '.mp3'), 'opus': ('libopus', '.opus')}
    if codec not in encoders:
        raise ValueError(f"Unsupported codec: {codec}")
    encoder, ext = encoders[codec]
    # Write input to a temp WAV file
    with tempfile.NamedTemporaryFile(suffix='.wav', delete=False) as tmp_in:
        sf.write(tmp_in.name, audio, sr)
        input_path = tmp_in.name
    # Compressed output gets the codec's own extension
    with tempfile.NamedTemporaryFile(suffix=ext, delete=False) as tmp_out:
        output_path = tmp_out.name
    try:
        # Compress with ffmpeg
        subprocess.run([
            'ffmpeg', '-i', input_path,
            '-codec:a', encoder,
            '-b:a', f'{bitrate}k',
            '-y', output_path
        ], capture_output=True, check=True)
        # Decode the compressed audio back
        compressed_audio, _ = librosa.load(output_path, sr=sr)
        return compressed_audio
    finally:
        # Cleanup
        os.unlink(input_path)
        if os.path.exists(output_path):
            os.unlink(output_path)

# Usage
audio, sr = librosa.load('clean.wav', sr=16000)
# Simulate low-bitrate compression
audio_32kbps = simulate_codec(audio, sr, codec='mp3', bitrate=32)
audio_64kbps = simulate_codec(audio, sr, codec='mp3', bitrate=64)
Dynamic Range Compression
def dynamic_range_compression(audio, threshold=-20, ratio=4, attack=0.005,
                              release=0.1, sr=16000):
    """
    Apply dynamic range compression (like a studio compressor).
    Reduces loudness variation, simulating broadcast-style audio.

    Args:
        audio: Input audio
        threshold: Threshold in dB
        ratio: Compression ratio (4:1 means 4 dB of input above the
               threshold becomes 1 dB of output)
        attack: Attack time in seconds
        release: Release time in seconds
        sr: Sample rate

    Returns:
        Compressed audio
    """
    # Instantaneous level in dB
    audio_db = 20 * np.log10(np.abs(audio) + 1e-8)
    # Static gain computer: attenuation for samples above the threshold
    gain_db = np.zeros_like(audio_db)
    above = audio_db > threshold
    gain_db[above] = -(audio_db[above] - threshold) * (1 - 1 / ratio)
    # Smooth the gain with attack/release time constants
    attack_samples = max(1, int(attack * sr))
    release_samples = max(1, int(release * sr))
    smoothed_gain = np.zeros_like(gain_db)
    for i in range(1, len(gain_db)):
        if gain_db[i] < smoothed_gain[i - 1]:
            # Gain is dropping: attack
            alpha = 1 - np.exp(-1 / attack_samples)
        else:
            # Gain is recovering: release
            alpha = 1 - np.exp(-1 / release_samples)
        smoothed_gain[i] = alpha * gain_db[i] + (1 - alpha) * smoothed_gain[i - 1]
    # Apply the gain
    gain_linear = 10 ** (smoothed_gain / 20)
    return audio * gain_linear

# Example
audio, sr = librosa.load('speech.wav', sr=16000)
compressed = dynamic_range_compression(audio, threshold=-20, ratio=4, sr=sr)
End-to-End Preprocessing Pipeline
class ProductionAudioPipeline:
    """
    Complete production-ready preprocessing pipeline.
    Handles edge cases and tracks basic quality statistics.
    """
    def __init__(self, config):
        self.target_sr = config.get('sample_rate', 16000)
        self.target_duration = config.get('target_duration', None)
        self.enable_noise_reduction = config.get('noise_reduction', True)
        self.enable_vad = config.get('vad', True)
        self.augmentation_enabled = config.get('augmentation', False)
        self.stats = {
            'processed': 0,
            'failed': 0,
            'clipped': 0,
            'too_short': 0
        }

    def process(self, audio_path):
        """
        Process a single audio file.
        Returns: (processed_audio, metadata, success)
        """
        metadata = {'original_path': audio_path}
        try:
            # 1. Load at the native sample rate
            audio, orig_sr = librosa.load(audio_path, sr=None)
            metadata['original_sr'] = orig_sr
            metadata['original_duration'] = len(audio) / orig_sr
            # 2. Detect and repair clipping
            if np.max(np.abs(audio)) >= 0.99:
                audio, _ = detect_and_fix_clipping(audio)
                self.stats['clipped'] += 1
                metadata['had_clipping'] = True
            # 3. Resample
            if orig_sr != self.target_sr:
                audio = resample_audio(audio, orig_sr, self.target_sr)
                metadata['resampled'] = True
            # 4. Normalize
            audio = normalize_audio(audio, target_level=-20.0)
            metadata['normalized'] = True
            # 5. Noise reduction
            if self.enable_noise_reduction:
                audio = high_pass_filter(audio, self.target_sr, cutoff_freq=80)
                metadata['noise_reduction'] = True
            # 6. Voice activity detection
            if self.enable_vad:
                segments = voice_activity_detection(audio, self.target_sr)
                if segments:
                    audio = np.concatenate([
                        audio[start:end] for start, end in segments
                    ])
                    metadata['vad_segments'] = len(segments)
                else:
                    # No speech detected
                    return None, {'error': 'No speech detected'}, False
            # 7. Duration handling: pad or trim to a fixed length
            if self.target_duration:
                target_samples = int(self.target_duration * self.target_sr)
                if len(audio) < target_samples:
                    audio = np.pad(audio, (0, target_samples - len(audio)),
                                   mode='constant')
                    metadata['padded'] = True
                elif len(audio) > target_samples:
                    audio = audio[:target_samples]
                    metadata['trimmed'] = True
            # 8. Quality check: reject clips shorter than 0.5 seconds
            if len(audio) < 0.5 * self.target_sr:
                self.stats['too_short'] += 1
                return None, {'error': 'Too short after VAD'}, False
            # 9. Augmentation (training only)
            if self.augmentation_enabled:
                audio = self._augment(audio)
                metadata['augmented'] = True
            # 10. Final peak normalization with headroom
            audio = audio / (np.max(np.abs(audio)) + 1e-8) * 0.95
            metadata['final_duration'] = len(audio) / self.target_sr
            metadata['final_samples'] = len(audio)
            self.stats['processed'] += 1
            return audio, metadata, True
        except Exception as e:
            self.stats['failed'] += 1
            return None, {'error': str(e)}, False

    def _augment(self, audio):
        """Apply one randomly chosen augmentation."""
        import random
        aug_type = random.choice(['noise', 'pitch', 'speed', 'none'])
        if aug_type == 'noise':
            audio = add_noise(audio, noise_factor=random.uniform(0.001, 0.01))
        elif aug_type == 'pitch':
            steps = random.choice([-2, -1, 1, 2])
            audio = pitch_shift(audio, self.target_sr, n_steps=steps)
        elif aug_type == 'speed':
            rate = random.uniform(0.9, 1.1)
            audio = time_stretch(audio, rate=rate)
        return audio

    def get_stats(self):
        """Return processing statistics."""
        return self.stats
# Usage
config = {
    'sample_rate': 16000,
    'target_duration': 3.0,
    'noise_reduction': True,
    'vad': True,
    'augmentation': False  # True for training
}
pipeline = ProductionAudioPipeline(config)

# Process a single file
audio, metadata, success = pipeline.process('input.wav')
if success:
    print("✓ Processed successfully")
    print(f"Duration: {metadata['final_duration']:.2f}s")
    # Save
    sf.write('output.wav', audio, pipeline.target_sr)
else:
    print(f"✗ Failed: {metadata.get('error')}")

# Process a batch (audio_files and save_processed are assumed to exist)
for audio_file in audio_files:
    audio, metadata, success = pipeline.process(audio_file)
    if success:
        save_processed(audio, metadata)

# Statistics
stats = pipeline.get_stats()
print(f"Processed: {stats['processed']}")
print(f"Failed: {stats['failed']}")
print(f"Clipped: {stats['clipped']}")
Key Takeaways
✅ Clean audio is critical - preprocessing can make or break model performance
✅ Standardize formats - consistent sample rate, bit depth, mono/stereo
✅ Remove noise - high-pass filtering and spectral subtraction reduce artifacts
✅ VAD improves efficiency - removing silence saves compute
✅ Augmentation boosts robustness - time stretching, pitch shifting, noise mixing
✅ Like feature engineering - transform raw data into useful representations
✅ Pipeline thinking - chain simple, ordered transformations from raw audio to features
FAQ
Q: What sample rate should I use for speech ML models? A: 16kHz is the standard sample rate for speech processing. It captures frequencies up to 8kHz (per the Nyquist theorem), which covers the full range of speech. CD-quality 44.1kHz is unnecessary for speech and wastes compute. Telephone audio at 8kHz is too low for high-quality models.
Q: What is SpecAugment and why is it useful? A: SpecAugment is a data augmentation technique that randomly masks time and frequency patches in mel spectrograms during training. It improves model robustness without requiring additional data by forcing the model to learn from incomplete information, similar to dropout for audio features.
Q: How do I remove background noise from speech recordings? A: Start with a high-pass filter at 80Hz to remove low-frequency rumble. For stationary noise, use spectral subtraction with a noise profile estimated from silence segments. For non-stationary noise like babble, use the noisereduce library with stationary=False. For best results, combine multiple approaches in a pipeline.
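As a rough illustration of that last answer, the functions defined earlier in this article chain directly (a sketch, assuming the recording starts with ~0.5 s of noise-only audio for the spectral-subtraction profile):
def denoise_pipeline(audio, sr):
    """Sketch: high-pass filter, then spectral subtraction, as described above."""
    audio = high_pass_filter(audio, sr, cutoff_freq=80)          # remove rumble
    audio = spectral_subtraction(audio, sr, noise_duration=0.5)  # stationary noise
    return audio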
Originally published at: arunbaby.com/speech-tech/0007-audio-preprocessing
If you found this helpful, consider sharing it with others who might benefit.
Want to work together?
I take on projects, advisory roles, and fractional CTO engagements in AI/ML. I also help businesses go AI-native with agentic workflows and agent orchestration.
Get in touch