24 minute read

Clean audio is the foundation of robust speech systems: master preprocessing pipelines that handle real-world noise and variability.

Introduction

Audio preprocessing transforms raw audio into clean, standardized representations suitable for ML models.

Why it matters:

  • Garbage in, garbage out: Poor audio quality destroys model performance
  • Real-world audio is messy: Background noise, varying volumes, different devices
  • Standardization: Models expect consistent input formats
  • Data augmentation: Increase training data diversity

Pipeline overview:

Raw Audio (microphone)
   ↓
[Loading & Format Conversion]
   ↓
[Resampling]
   ↓
[Normalization]
   ↓
[Noise Reduction]
   ↓
[Voice Activity Detection]
   ↓
[Segmentation]
   ↓
[Feature Extraction]
   ↓
Clean Features → Model

Audio Fundamentals

Digital Audio Representation

Analog Sound Wave:
  ∿∿∿∿∿∿∿∿∿∿∿

Sampling (digitization):
  ●─●─●─●─●─●─●─●  (sample points)

Key parameters:
- Sample Rate: Samples per second (Hz)
  - CD quality: 44,100 Hz
  - Speech: 16,000 Hz or 22,050 Hz
  - Telephone: 8,000 Hz

- Bit Depth: Bits per sample
  - 16-bit: 65,536 possible values
  - 24-bit: 16,777,216 values
  - 32-bit float: Highest precision

- Channels:
  - Mono: 1 channel
  - Stereo: 2 channels (left, right)
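
Together, these parameters set the raw data rate of a recording. A minimal sizing sketch (illustrative values only):

def raw_audio_size_bytes(duration_s, sample_rate=16000, bit_depth=16, channels=1):
    """Bytes needed to store uncompressed PCM audio."""
    bytes_per_sample = bit_depth // 8
    return int(duration_s * sample_rate * bytes_per_sample * channels)

# One minute of 16 kHz, 16-bit mono speech ≈ 1.9 MB
print(raw_audio_size_bytes(60))                  # 1,920,000 bytes
# One minute of CD-quality stereo ≈ 10.6 MB
print(raw_audio_size_bytes(60, 44100, 16, 2))    # 10,584,000 bytes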

Nyquist-Shannon Sampling Theorem

Rule: To capture frequency f, sample rate must be ≥ 2f

Human hearing: 20 Hz - 20 kHz
→ Need ≥40 kHz sample rate
→ CD uses 44.1 kHz (margin above 40 kHz)

Speech frequencies: ~300 Hz - 8 kHz
→ 16 kHz sample rate is sufficient
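
To see the theorem in action, sample a tone above the Nyquist limit and watch it fold back to a lower frequency. A minimal sketch (the 10 kHz tone and 16 kHz rate are illustrative choices):

import numpy as np

sr = 16000                              # Nyquist limit = 8 kHz
t = np.arange(sr) / sr                  # one second of sample times
tone = np.sin(2 * np.pi * 10000 * t)    # 10 kHz tone violates Nyquist

# The spectrum peaks at the aliased frequency |16000 - 10000| = 6 kHz, not 10 kHz
spectrum = np.abs(np.fft.rfft(tone))
freqs = np.fft.rfftfreq(len(tone), d=1 / sr)
print(f"Dominant frequency: {freqs[np.argmax(spectrum)]:.0f} Hz")  # ~6000 Hz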

Loading & Format Conversion

Loading Audio

import librosa
import soundfile as sf
import numpy as np

def load_audio(file_path, sr=None):
    """
    Load audio file
    
    Args:
        file_path: Path to audio file
        sr: Target sample rate (None = keep original)
    
    Returns:
        audio: np.array of samples
        sr: Sample rate
    """
    # Librosa (resamples automatically)
    audio, sample_rate = librosa.load(file_path, sr=sr)
    
    return audio, sample_rate

# Example
audio, sr = load_audio('speech.wav', sr=16000)
print(f"Shape: {audio.shape}, Sample Rate: {sr} Hz")
print(f"Duration: {len(audio) / sr:.2f} seconds")

Format Conversion

from pydub import AudioSegment

def convert_audio_format(input_path, output_path, output_format='wav'):
    """
    Convert between audio formats
    
    Supports: mp3, wav, ogg, flac, m4a, etc.
    """
    audio = AudioSegment.from_file(input_path)
    
    # Export in new format
    audio.export(output_path, format=output_format)

# Convert MP3 to WAV
convert_audio_format('input.mp3', 'output.wav', 'wav')

Mono/Stereo Conversion

def stereo_to_mono(audio_stereo):
    """
    Convert stereo to mono by averaging channels
    
    Args:
        audio_stereo: Shape (2, n_samples) or (n_samples, 2)
    
    Returns:
        audio_mono: Shape (n_samples,)
    """
    if audio_stereo.ndim == 1:
        # Already mono
        return audio_stereo
    
    # Average across channels
    if audio_stereo.shape[0] == 2:
        # Shape: (2, n_samples)
        return np.mean(audio_stereo, axis=0)
    else:
        # Shape: (n_samples, 2)
        return np.mean(audio_stereo, axis=1)

# Example
audio_stereo, sr = librosa.load('stereo.wav', sr=None, mono=False)
audio_mono = stereo_to_mono(audio_stereo)

Resampling

Purpose: Convert sample rate to match model requirements

High-Quality Resampling

import librosa

def resample_audio(audio, orig_sr, target_sr):
    """
    Resample audio using high-quality algorithm
    
    Args:
        audio: Audio samples
        orig_sr: Original sample rate
        target_sr: Target sample rate
    
    Returns:
        resampled_audio
    """
    if orig_sr == target_sr:
        return audio
    
    # Librosa uses high-quality resampling (Kaiser window)
    resampled = librosa.resample(
        audio,
        orig_sr=orig_sr,
        target_sr=target_sr,
        res_type='kaiser_best'  # Highest quality
    )
    
    return resampled

# Example: Downsample 44.1 kHz to 16 kHz
audio_44k, _ = librosa.load('audio.wav', sr=44100)
audio_16k = resample_audio(audio_44k, orig_sr=44100, target_sr=16000)

print(f"Original length: {len(audio_44k)}")
print(f"Resampled length: {len(audio_16k)}")
print(f"Ratio: {len(audio_44k) / len(audio_16k):.2f}")  # ~2.76

Resampling visualization:

Original (44.1 kHz):
●●●●●●●●●●●●●●●●●●●●●●●●●●●●  (44,100 samples/second)

Downsampled (16 kHz):
●───●───●───●───●───●───●───●  (16,000 samples/second)

The resampler applies a low-pass (anti-aliasing) filter before decimation, so content above the new Nyquist frequency (8 kHz here) is removed rather than folded back into the signal.
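
Quality costs compute: librosa exposes several res_type presets, with 'kaiser_best' suited to offline preprocessing and 'kaiser_fast' to latency-sensitive paths. A rough timing sketch (numbers depend on your machine and librosa version):

import time
import numpy as np
import librosa

# 10 seconds of synthetic 44.1 kHz audio, purely for timing
audio = np.random.randn(44100 * 10).astype(np.float32)

for res_type in ['kaiser_best', 'kaiser_fast']:
    start = time.perf_counter()
    librosa.resample(audio, orig_sr=44100, target_sr=16000, res_type=res_type)
    print(f"{res_type}: {time.perf_counter() - start:.3f} s")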

Normalization

Amplitude Normalization

def normalize_audio(audio, target_level=-20.0):
    """
    Normalize audio to target level (dB)
    
    Args:
        audio: Audio samples
        target_level: Target RMS level in dB
    
    Returns:
        normalized_audio
    """
    # Calculate current RMS
    rms = np.sqrt(np.mean(audio ** 2))
    
    if rms == 0:
        return audio
    
    # Convert target level from dB to linear
    target_rms = 10 ** (target_level / 20.0)
    
    # Scale audio
    scaling_factor = target_rms / rms
    normalized = audio * scaling_factor
    
    # Clip to prevent overflow
    normalized = np.clip(normalized, -1.0, 1.0)
    
    return normalized

# Example
audio, sr = librosa.load('speech.wav', sr=16000)
normalized_audio = normalize_audio(audio, target_level=-20.0)

Peak Normalization

def peak_normalize(audio):
    """
    Normalize to peak amplitude = 1.0
    
    Simple but can be problematic if audio has spikes
    """
    peak = np.max(np.abs(audio))
    
    if peak == 0:
        return audio
    
    return audio / peak
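
The two strategies answer different questions: RMS normalization targets consistent loudness, while peak normalization only guarantees headroom. A quick comparison on the same clip (file path is a placeholder):

audio, sr = librosa.load('speech.wav', sr=16000)

rms_normalized = normalize_audio(audio, target_level=-20.0)
peak_normalized = peak_normalize(audio)

for name, x in [('RMS-normalized', rms_normalized), ('Peak-normalized', peak_normalized)]:
    rms_db = 20 * np.log10(np.sqrt(np.mean(x ** 2)) + 1e-8)
    print(f"{name}: peak={np.max(np.abs(x)):.2f}, rms={rms_db:.1f} dB")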

DC Offset Removal

def remove_dc_offset(audio):
    """
    Remove DC bias (mean offset)
    
    DC offset can cause clicking sounds
    """
    return audio - np.mean(audio)

# Example
audio_clean = remove_dc_offset(audio)

Noise Reduction

1. Spectral Subtraction

import scipy.signal as signal

def spectral_subtraction(audio, sr, noise_duration=0.5):
    """
    Reduce noise using spectral subtraction
    
    Assumes first noise_duration seconds are noise only
    
    Args:
        audio: Audio signal
        sr: Sample rate
        noise_duration: Duration of noise-only segment (seconds)
    
    Returns:
        denoised_audio
    """
    # Extract noise profile from beginning
    noise_samples = int(noise_duration * sr)
    noise_segment = audio[:noise_samples]
    
    # Compute the noise magnitude spectrum with the same STFT parameters,
    # averaged over time to get a per-frequency estimate
    _, _, noise_stft = signal.stft(noise_segment, fs=sr, nperseg=1024)
    noise_magnitude = np.mean(np.abs(noise_stft), axis=1, keepdims=True)
    
    # STFT of full audio
    f, t, Zxx = signal.stft(audio, fs=sr, nperseg=1024)
    
    # Subtract noise spectrum
    magnitude = np.abs(Zxx)
    phase = np.angle(Zxx)
    
    # Spectral subtraction: remove the noise floor, clamp at zero
    magnitude_denoised = np.maximum(magnitude - noise_magnitude, 0.0)
    
    # Reconstruct
    Zxx_denoised = magnitude_denoised * np.exp(1j * phase)
    _, audio_denoised = signal.istft(Zxx_denoised, fs=sr)
    
    return audio_denoised[:len(audio)]

# Example
audio, sr = librosa.load('noisy_speech.wav', sr=16000)
denoised = spectral_subtraction(audio, sr, noise_duration=0.5)

2. Wiener Filtering

def wiener_filter(audio, noise_power=None):
    """
    Apply Wiener filter for noise reduction
    
    More sophisticated than spectral subtraction
    
    Args:
        audio: Audio signal
        noise_power: Noise power estimate; None lets scipy estimate it
                     from the local variance of the input
    """
    from scipy.signal import wiener
    
    # Sliding-window Wiener filter (mysize = window length in samples)
    filtered = wiener(audio, mysize=5, noise=noise_power)
    
    return filtered

3. High-Pass Filter (Remove Low-Frequency Noise)

def high_pass_filter(audio, sr, cutoff_freq=80):
    """
    Remove low-frequency noise (e.g., rumble, hum)
    
    Args:
        audio: Audio signal
        sr: Sample rate
        cutoff_freq: Cutoff frequency in Hz
    
    Returns:
        filtered_audio
    """
    from scipy.signal import butter, filtfilt
    
    # Design high-pass filter
    nyquist = sr / 2
    normalized_cutoff = cutoff_freq / nyquist
    b, a = butter(N=5, Wn=normalized_cutoff, btype='high')
    
    # Apply filter (zero-phase filtering)
    filtered = filtfilt(b, a, audio)
    
    return filtered

# Example: Remove rumble below 80 Hz
audio_filtered = high_pass_filter(audio, sr=16000, cutoff_freq=80)

Voice Activity Detection (VAD)

Purpose: Identify speech segments, remove silence

import librosa

def voice_activity_detection(audio, sr, frame_length=2048, hop_length=512, energy_threshold=0.02):
    """
    Simple energy-based VAD
    
    Args:
        audio: Audio signal
        sr: Sample rate
        energy_threshold: Threshold for voice activity
    
    Returns:
        speech_segments: List of (start_sample, end_sample) tuples
    """
    # Compute frame energy
    energy = librosa.feature.rms(
        y=audio,
        frame_length=frame_length,
        hop_length=hop_length
    )[0]
    
    # Normalize energy
    energy_normalized = energy / (np.max(energy) + 1e-8)
    
    # Threshold to get voice activity
    voice_activity = energy_normalized > energy_threshold
    
    # Convert to sample indices
    def frame_to_sample(frame_idx):
        start = frame_idx * hop_length
        end = min(start + frame_length, len(audio))
        return start, end
    
    # Find continuous speech segments
    segments = []
    in_speech = False
    start_frame = 0
    
    for i, is_voice in enumerate(voice_activity):
        if is_voice and not in_speech:
            # Start of speech
            start_frame = i
            in_speech = True
        elif not is_voice and in_speech:
            # End of speech
            end_frame = i
            start_sample, _ = frame_to_sample(start_frame)
            end_sample, _ = frame_to_sample(end_frame)
            segments.append((start_sample, end_sample))
            in_speech = False
    
    # Handle case where speech goes to end
    if in_speech:
        start_sample, _ = frame_to_sample(start_frame)
        end_sample = len(audio)
        segments.append((start_sample, end_sample))
    
    return segments

# Example
audio, sr = librosa.load('speech_with_pauses.wav', sr=16000)
segments = voice_activity_detection(audio, sr)

print(f"Found {len(segments)} speech segments:")
for i, (start, end) in enumerate(segments):
    duration = (end - start) / sr
    print(f"  Segment {i+1}: {start/sr:.2f}s - {end/sr:.2f}s ({duration:.2f}s)")

VAD visualization:

Audio waveform:
     ___           ___       ___
    /   \         /   \     /   \
___/     \______/     \___/     \___

Energy:
    ████          ████      ████
    ████          ████      ████
────████──────────████──────████────  ← threshold

VAD output:
    SSSS          SSSS      SSSS
    (S = Speech,  spaces = Silence)
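
Energy thresholds are easy to mis-tune when noise levels vary. A sturdier drop-in is the webrtcvad package (pip install webrtcvad), which classifies 10/20/30 ms frames of 16-bit PCM at 8/16/32/48 kHz. The helper below is a sketch; its name and frame loop are assumptions, not part of the pipeline above:

import numpy as np
import webrtcvad

def webrtc_vad_flags(audio, sr=16000, frame_ms=30, aggressiveness=2):
    """Per-frame speech flags from WebRTC VAD (True = speech)."""
    vad = webrtcvad.Vad(aggressiveness)            # 0 (lenient) to 3 (aggressive)
    frame_len = int(sr * frame_ms / 1000)
    pcm16 = (np.clip(audio, -1.0, 1.0) * 32767).astype(np.int16)
    
    flags = []
    for start in range(0, len(pcm16) - frame_len + 1, frame_len):
        frame = pcm16[start:start + frame_len].tobytes()
        flags.append(vad.is_speech(frame, sr))
    return flags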

Segmentation

Fixed-Length Segmentation

def segment_audio_fixed_length(audio, sr, segment_duration=3.0, hop_duration=1.0):
    """
    Segment audio into fixed-length chunks with overlap
    
    Args:
        audio: Audio signal
        sr: Sample rate
        segment_duration: Segment length in seconds
        hop_duration: Hop between segments in seconds
    
    Returns:
        segments: List of audio segments
    """
    segment_samples = int(segment_duration * sr)
    hop_samples = int(hop_duration * sr)
    
    segments = []
    start = 0
    
    while start + segment_samples <= len(audio):
        segment = audio[start:start + segment_samples]
        segments.append(segment)
        start += hop_samples
    
    return segments

# Example: 3-second segments with 1-second hop (2-second overlap)
segments = segment_audio_fixed_length(audio, sr=16000, segment_duration=3.0, hop_duration=1.0)
print(f"Created {len(segments)} segments")

Adaptive Segmentation (Based on Pauses)

def segment_by_pauses(audio, sr, min_segment_duration=0.3, silence_threshold=0.02):
    """
    Segment audio at silence/pause points
    
    Better than fixed-length for natural speech
    """
    # Detect voice activity
    speech_segments = voice_activity_detection(
        audio, sr,
        energy_threshold=silence_threshold
    )
    
    # Filter out segments shorter than min_segment_duration
    min_segment_samples = int(min_segment_duration * sr)
    filtered_segments = [
        (start, end) for start, end in speech_segments
        if end - start >= min_segment_samples
    ]
    
    # Extract audio segments
    audio_segments = []
    for start, end in filtered_segments:
        segment = audio[start:end]
        audio_segments.append(segment)
    
    return audio_segments, filtered_segments

# Example
audio_segments, timestamps = segment_by_pauses(audio, sr=16000)

Data Augmentation

Purpose: Increase training data diversity, improve model robustness

1. Time Stretching

def time_stretch(audio, rate=1.0):
    """
    Speed up or slow down audio without changing pitch
    
    Args:
        audio: Audio signal
        rate: Stretch factor
              > 1.0: speed up
              < 1.0: slow down
    
    Returns:
        stretched_audio
    """
    return librosa.effects.time_stretch(audio, rate=rate)

# Example: Speed up by 20%
audio_fast = time_stretch(audio, rate=1.2)

# Slow down by 20%
audio_slow = time_stretch(audio, rate=0.8)

2. Pitch Shifting

def pitch_shift(audio, sr, n_steps=2):
    """
    Shift pitch without changing speed
    
    Args:
        audio: Audio signal
        sr: Sample rate
        n_steps: Semitones to shift (positive = higher, negative = lower)
    
    Returns:
        pitch_shifted_audio
    """
    return librosa.effects.pitch_shift(audio, sr=sr, n_steps=n_steps)

# Example: Shift up 2 semitones
audio_high = pitch_shift(audio, sr=16000, n_steps=2)

# Shift down 2 semitones
audio_low = pitch_shift(audio, sr=16000, n_steps=-2)

3. Adding Noise

def add_noise(audio, noise_factor=0.005):
    """
    Add random Gaussian noise
    
    Args:
        audio: Audio signal
        noise_factor: Standard deviation of noise
    
    Returns:
        noisy_audio
    """
    noise = np.random.randn(len(audio)) * noise_factor
    return audio + noise

# Example
audio_noisy = add_noise(audio, noise_factor=0.01)

4. Background Noise Mixing

def mix_background_noise(speech_audio, noise_audio, snr_db=10):
    """
    Mix speech with background noise at specified SNR
    
    Args:
        speech_audio: Clean speech
        noise_audio: Background noise
        snr_db: Signal-to-noise ratio in dB
    
    Returns:
        mixed_audio
    """
    # Match lengths
    if len(noise_audio) < len(speech_audio):
        # Repeat noise to match speech length
        repeats = int(np.ceil(len(speech_audio) / len(noise_audio)))
        noise_audio = np.tile(noise_audio, repeats)[:len(speech_audio)]
    else:
        # Trim noise
        noise_audio = noise_audio[:len(speech_audio)]
    
    # Calculate signal and noise power
    speech_power = np.mean(speech_audio ** 2)
    noise_power = np.mean(noise_audio ** 2)
    
    # Calculate scaling factor for noise
    snr_linear = 10 ** (snr_db / 10)
    noise_scaling = np.sqrt(speech_power / (snr_linear * noise_power))
    
    # Mix
    mixed = speech_audio + noise_scaling * noise_audio
    
    # Normalize to prevent clipping
    mixed = mixed / (np.max(np.abs(mixed)) + 1e-8)
    
    return mixed

# Example: Mix with café noise at SNR=15dB
cafe_noise, _ = librosa.load('cafe_background.wav', sr=16000)
noisy_speech = mix_background_noise(audio, cafe_noise, snr_db=15)

5. SpecAugment (For Spectrograms)

def spec_augment(mel_spectrogram, num_mask=2, freq_mask_param=20, time_mask_param=30):
    """
    SpecAugment: mask random time-frequency patches
    
    Popular augmentation for speech recognition
    
    Args:
        mel_spectrogram: Shape (n_mels, time)
        num_mask: Number of masks to apply
        freq_mask_param: Max width of frequency mask
        time_mask_param: Max width of time mask
    
    Returns:
        augmented_spectrogram
    """
    aug_spec = mel_spectrogram.copy()
    n_mels, n_frames = aug_spec.shape
    
    # Frequency masking (mask width clamped so short inputs don't break randint)
    for _ in range(num_mask):
        f = np.random.randint(0, min(freq_mask_param, n_mels))
        f0 = np.random.randint(0, n_mels - f + 1)
        aug_spec[f0:f0+f, :] = 0
    
    # Time masking
    for _ in range(num_mask):
        t = np.random.randint(0, min(time_mask_param, n_frames))
        t0 = np.random.randint(0, n_frames - t + 1)
        aug_spec[:, t0:t0+t] = 0
    
    return aug_spec

# Example
mel_spec = librosa.feature.melspectrogram(y=audio, sr=16000)
aug_mel_spec = spec_augment(mel_spec, num_mask=2)

Connection to Feature Engineering (Day 7)

Audio preprocessing is feature engineering for speech:

class AudioFeatureEngineeringPipeline:
    """
    Complete pipeline: raw audio → features
    
    Similar to general ML feature engineering
    """
    
    def __init__(self, sr=16000):
        self.sr = sr
    
    def process(self, audio_path):
        """
        Full preprocessing pipeline
        
        Analogous to feature engineering pipeline in ML
        """
        # 1. Load (like data loading)
        audio, sr = librosa.load(audio_path, sr=self.sr)
        
        # 2. Normalize (like feature scaling)
        audio = normalize_audio(audio)
        
        # 3. Noise reduction (like outlier removal)
        audio = high_pass_filter(audio, sr)
        
        # 4. VAD (like removing null values)
        segments = voice_activity_detection(audio, sr)
        if segments:
            audio = np.concatenate([audio[start:end] for start, end in segments])
        
        # 5. Feature extraction (like creating derived features)
        features = self.extract_features(audio, sr)
        
        return features
    
    def extract_features(self, audio, sr):
        """
        Extract multiple feature types
        
        Like creating feature crosses and aggregations
        """
        features = {}
        
        # Spectral features (numerical features)
        features['mfcc'] = librosa.feature.mfcc(y=audio, sr=sr, n_mfcc=13)
        features['spectral_centroid'] = librosa.feature.spectral_centroid(y=audio, sr=sr)
        features['zero_crossing_rate'] = librosa.feature.zero_crossing_rate(audio)
        
        # Temporal features (time-based features)
        features['rms_energy'] = librosa.feature.rms(y=audio)
        
        # Aggregations (like SQL GROUP BY)
        features['mfcc_mean'] = np.mean(features['mfcc'], axis=1)
        features['mfcc_std'] = np.std(features['mfcc'], axis=1)
        
        return features

Production Pipeline

class ProductionAudioPreprocessor:
    """
    Production-ready audio preprocessing
    
    Handles errors, logging, monitoring
    """
    
    def __init__(self, config):
        self.sr = config.get('sample_rate', 16000)
        self.normalize_level = config.get('normalize_level', -20.0)
        self.enable_vad = config.get('enable_vad', True)
    
    def preprocess(self, audio_bytes):
        """
        Preprocess audio from bytes
        
        Returns: (processed_audio, metadata, success)
        """
        metadata = {}
        
        try:
            # Load from bytes
            audio = self._load_from_bytes(audio_bytes)
            metadata['original_length'] = len(audio)
            
            # Resampling is handled in _load_from_bytes (librosa loads at self.sr)
            
            # Normalize
            audio = normalize_audio(audio, self.normalize_level)
            metadata['normalized'] = True
            
            # VAD
            if self.enable_vad:
                segments = voice_activity_detection(audio, self.sr)
                if segments:
                    # Keep only speech
                    speech_audio = np.concatenate([
                        audio[start:end] for start, end in segments
                    ])
                    audio = speech_audio
                    metadata['vad_segments'] = len(segments)
            
            metadata['final_length'] = len(audio)
            metadata['duration_seconds'] = len(audio) / self.sr
            
            return audio, metadata, True
        
        except Exception as e:
            return None, {'error': str(e)}, False
    
    def _load_from_bytes(self, audio_bytes):
        """Load audio from bytes"""
        import io
        audio, _ = librosa.load(io.BytesIO(audio_bytes), sr=self.sr)
        return audio

Real-World Challenges & Solutions

Challenge 1: Codec Artifacts

Problem: Different audio codecs introduce artifacts

def detect_codec_artifacts(audio, sr):
    """
    Detect codec artifacts (e.g., from MP3 compression)
    
    Returns: artifact_score (higher = more artifacts)
    """
    import scipy.signal as signal
    
    # Compute spectrogram
    f, t, Sxx = signal.spectrogram(audio, fs=sr)
    
    # MP3 artifacts often appear as:
    # 1. High-frequency cutoff (lossy codecs roll off energy near Nyquist)
    cutoff_freq = 0.9 * (sr / 2)  # top 10% of the representable band
    high_freq_mask = f > cutoff_freq
    high_freq_energy = np.mean(Sxx[high_freq_mask, :]) if np.any(high_freq_mask) else 0.0
    
    # 2. Pre-echo artifacts
    # Sudden changes in energy
    energy = np.sum(Sxx, axis=0)
    energy_diff = np.diff(energy)
    pre_echo_score = np.std(energy_diff)
    
    artifact_score = {
        'high_freq_loss': high_freq_energy,
        'pre_echo': pre_echo_score,
        'overall': 1.0 - high_freq_energy + pre_echo_score
    }
    
    return artifact_score

# Example
audio_mp3, sr = librosa.load('compressed.mp3', sr=16000)
audio_wav, sr = librosa.load('lossless.wav', sr=16000)

artifacts_mp3 = detect_codec_artifacts(audio_mp3, sr)
artifacts_wav = detect_codec_artifacts(audio_wav, sr)

print(f"MP3 artifacts: {artifacts_mp3['overall']:.3f}")
print(f"WAV artifacts: {artifacts_wav['overall']:.3f}")

Challenge 2: Variable Sample Rates

class AdaptiveResampler:
    """
    Handle audio from various sources with different sample rates
    
    Production systems receive audio from:
    - Phone calls: 8 kHz
    - Bluetooth: 16 kHz  
    - Studio mics: 44.1 kHz / 48 kHz
    """
    
    def __init__(self, target_sr=16000):
        self.target_sr = target_sr
        self.cache = {}  # Cache resampling filters
    
    def resample(self, audio, orig_sr):
        """
        Efficiently resample with caching
        """
        if orig_sr == self.target_sr:
            return audio
        
        # Check cache
        cache_key = (orig_sr, self.target_sr)
        if cache_key not in self.cache:
            # Compute resampling filter once
            self.cache[cache_key] = self._compute_filter(orig_sr, self.target_sr)
        
        # Resample (the cache above is a placeholder; librosa manages its own filters)
        return librosa.resample(
            audio,
            orig_sr=orig_sr,
            target_sr=self.target_sr,
            res_type='kaiser_fast'  # Good balance of quality/speed
        )
    
    def _compute_filter(self, orig_sr, target_sr):
        """Compute and cache resampling filter"""
        # In real implementation, would compute filter coefficients
        return None

# Usage
resampler = AdaptiveResampler(target_sr=16000)

# Handle various sources
phone_audio = resampler.resample(phone_audio, orig_sr=8000)
bluetooth_audio = resampler.resample(bluetooth_audio, orig_sr=16000)
studio_audio = resampler.resample(studio_audio, orig_sr=48000)

Challenge 3: Clipping & Distortion

def detect_and_fix_clipping(audio, threshold=0.99):
    """
    Detect clipped samples and attempt interpolation
    
    Args:
        audio: Audio signal
        threshold: Clipping threshold (absolute value)
    
    Returns:
        fixed_audio, was_clipped
    """
    # Detect clipping
    clipped_mask = np.abs(audio) >= threshold
    num_clipped = np.sum(clipped_mask)
    
    if num_clipped == 0:
        return audio, False
    
    print(f"⚠️ Detected {num_clipped} clipped samples ({100*num_clipped/len(audio):.2f}%)")
    
    # Simple interpolation for clipped regions
    fixed_audio = audio.copy()
    
    # Find clipped regions
    clipped_indices = np.where(clipped_mask)[0]
    
    for idx in clipped_indices:
        # Skip edges
        if idx == 0 or idx == len(audio) - 1:
            continue
        
        # Interpolate from neighbors
        if not clipped_mask[idx-1] and not clipped_mask[idx+1]:
            fixed_audio[idx] = (audio[idx-1] + audio[idx+1]) / 2
    
    return fixed_audio, True

# Example
audio_with_clipping, sr = librosa.load('clipped_audio.wav', sr=16000)
fixed_audio, was_clipped = detect_and_fix_clipping(audio_with_clipping)

if was_clipped:
    print("Applied clipping repair")

Challenge 4: Background Babble Noise

def reduce_babble_noise(audio, sr, noise_profile_duration=1.0):
    """
    Reduce background babble (multiple speakers)
    
    More challenging than stationary noise
    """
    import noisereduce as nr
    
    # Estimate noise profile from segments with lowest energy
    frame_length = int(0.1 * sr)  # 100ms frames
    hop_length = frame_length // 2
    
    # Compute frame energy
    energy = librosa.feature.rms(
        y=audio,
        frame_length=frame_length,
        hop_length=hop_length
    )[0]
    
    # Select low-energy frames as noise
    noise_threshold = np.percentile(energy, 20)
    noise_frames = np.where(energy < noise_threshold)[0]
    
    # Extract noise samples
    noise_samples = []
    for frame_idx in noise_frames:
        start = frame_idx * hop_length
        end = start + frame_length
        if end <= len(audio):
            noise_samples.extend(audio[start:end])
    
    noise_profile = np.array(noise_samples)
    
    # Apply noise reduction
    if len(noise_profile) > sr * noise_profile_duration:
        reduced_noise = nr.reduce_noise(
            y=audio,
            y_noise=noise_profile[:int(sr * noise_profile_duration)],
            sr=sr,
            stationary=False,  # Non-stationary noise
            prop_decrease=0.8
        )
        return reduced_noise
    else:
        print("⚠️ Insufficient noise profile, returning original")
        return audio

# Example
audio_with_babble, sr = librosa.load('meeting_audio.wav', sr=16000)
clean_audio = reduce_babble_noise(audio_with_babble, sr)

Audio Quality Metrics

Signal-to-Noise Ratio (SNR)

def calculate_snr(clean_signal, noisy_signal):
    """
    Calculate SNR in dB
    
    Args:
        clean_signal: Ground truth clean signal
        noisy_signal: Signal with noise
    
    Returns:
        SNR in dB
    """
    # Ensure same length
    min_len = min(len(clean_signal), len(noisy_signal))
    clean = clean_signal[:min_len]
    noisy = noisy_signal[:min_len]
    
    # Compute noise
    noise = noisy - clean
    
    # Power
    signal_power = np.mean(clean ** 2)
    noise_power = np.mean(noise ** 2)
    
    # SNR in dB
    if noise_power == 0:
        return float('inf')
    
    snr = 10 * np.log10(signal_power / noise_power)
    
    return snr

# Example
clean, sr = librosa.load('clean_speech.wav', sr=16000)
noisy, sr = librosa.load('noisy_speech.wav', sr=16000)

snr = calculate_snr(clean, noisy)
print(f"SNR: {snr:.2f} dB")

# Typical SNRs:
# > 40 dB: Excellent
# 25-40 dB: Good
# 10-25 dB: Fair
# < 10 dB: Poor

Perceptual Evaluation of Speech Quality (PESQ)

# PESQ is a standard metric for speech quality
# Requires pesq library: pip install pesq

from pesq import pesq

def evaluate_speech_quality(reference_audio, degraded_audio, sr=16000):
    """
    Evaluate speech quality using PESQ
    
    Args:
        reference_audio: Clean reference
        degraded_audio: Processed/degraded audio
        sr: Sample rate (8000 or 16000)
    
    Returns:
        PESQ score (1.0 to 4.5, higher is better)
    """
    # PESQ requires 8kHz or 16kHz
    if sr not in [8000, 16000]:
        raise ValueError("PESQ requires sr=8000 or sr=16000")
    
    # Ensure same length
    min_len = min(len(reference_audio), len(degraded_audio))
    ref = reference_audio[:min_len]
    deg = degraded_audio[:min_len]
    
    # Compute PESQ
    if sr == 8000:
        mode = 'nb'  # Narrowband
    else:
        mode = 'wb'  # Wideband
    
    score = pesq(sr, ref, deg, mode)
    
    return score

# Example
reference, sr = librosa.load('clean.wav', sr=16000)
processed, sr = librosa.load('processed.wav', sr=16000)

quality_score = evaluate_speech_quality(reference, processed, sr)
print(f"PESQ Score: {quality_score:.2f}")

# PESQ interpretation:
# 4.0+: Excellent
# 3.0-4.0: Good
# 2.0-3.0: Fair
# < 2.0: Poor

Advanced Augmentation Strategies

Room Impulse Response (RIR) Convolution

def apply_room_impulse_response(speech, rir):
    """
    Simulate room acoustics using RIR
    
    Makes model robust to reverberation
    
    Args:
        speech: Clean speech signal
        rir: Room impulse response
    
    Returns:
        Reverberant speech
    """
    from scipy.signal import fftconvolve
    
    # Convolve speech with RIR
    reverb_speech = fftconvolve(speech, rir, mode='same')
    
    # Normalize
    reverb_speech = reverb_speech / (np.max(np.abs(reverb_speech)) + 1e-8)
    
    return reverb_speech

# Example: Generate synthetic RIR
def generate_synthetic_rir(sr=16000, room_size='medium', rt60=0.5):
    """
    Generate synthetic room impulse response
    
    Args:
        sr: Sample rate
        room_size: 'small', 'medium', 'large'
        rt60: Reverberation time (seconds)
    
    Returns:
        RIR signal
    """
    # room_size is illustrative only; this simple model is driven by rt60
    # Duration based on RT60
    duration = int(rt60 * sr)
    
    # Exponential decay
    t = np.arange(duration) / sr
    decay = np.exp(-6.91 * t / rt60)  # -60 dB decay
    
    # Add random reflections
    rir = decay * np.random.randn(duration)
    
    # Initial spike (direct path)
    rir[0] = 1.0
    
    # Normalize
    rir = rir / np.max(np.abs(rir))
    
    return rir

# Usage
clean_speech, sr = librosa.load('speech.wav', sr=16000)

# Simulate different rooms
small_room_rir = generate_synthetic_rir(sr, 'small', rt60=0.3)
large_room_rir = generate_synthetic_rir(sr, 'large', rt60=1.2)

speech_small_room = apply_room_impulse_response(clean_speech, small_room_rir)
speech_large_room = apply_room_impulse_response(clean_speech, large_room_rir)

Codec Simulation

def simulate_codec(audio, sr, codec='mp3', bitrate=32):
    """
    Simulate lossy codec compression
    
    Makes model robust to codec artifacts
    
    Args:
        audio: Clean audio
        sr: Sample rate
        codec: 'mp3', 'aac', 'opus'
        bitrate: Bitrate in kbps
    
    Returns:
        Codec-compressed audio
    """
    import subprocess
    import tempfile
    import os
    import soundfile as sf
    
    # Save to temp file
    with tempfile.NamedTemporaryFile(suffix='.wav', delete=False) as tmp_in:
        sf.write(tmp_in.name, audio, sr)
        input_path = tmp_in.name
    
    # Output suffix must match the codec so ffmpeg picks the right container
    out_suffix = {'mp3': '.mp3', 'opus': '.ogg', 'aac': '.m4a'}.get(codec, '.' + codec)
    with tempfile.NamedTemporaryFile(suffix=out_suffix, delete=False) as tmp_out:
        output_path = tmp_out.name
    
    try:
        # Compress with ffmpeg
        if codec == 'mp3':
            subprocess.run([
                'ffmpeg', '-i', input_path,
                '-codec:a', 'libmp3lame',
                '-b:a', f'{bitrate}k',
                '-y', output_path
            ], capture_output=True, check=True)
        elif codec == 'opus':
            subprocess.run([
                'ffmpeg', '-i', input_path,
                '-codec:a', 'libopus',
                '-b:a', f'{bitrate}k',
                '-y', output_path
            ], capture_output=True, check=True)
        else:
            raise ValueError(f"Unsupported codec: {codec}")
        
        # Load compressed audio
        compressed_audio, _ = librosa.load(output_path, sr=sr)
        
        return compressed_audio
    
    finally:
        # Cleanup
        os.unlink(input_path)
        if os.path.exists(output_path):
            os.unlink(output_path)

# Usage
audio, sr = librosa.load('clean.wav', sr=16000)

# Simulate low-bitrate compression
audio_32kbps = simulate_codec(audio, sr, codec='mp3', bitrate=32)
audio_64kbps = simulate_codec(audio, sr, codec='mp3', bitrate=64)

Dynamic Range Compression

def dynamic_range_compression(audio, threshold=-20, ratio=4, attack=0.005, release=0.1, sr=16000):
    """
    Apply dynamic range compression (like audio compressors)
    
    Reduces loudness variation, simulating broadcast audio
    
    Args:
        audio: Input audio
        threshold: Threshold in dB
        ratio: Compression ratio (4:1 means 4dB input → 1dB output above threshold)
        attack: Attack time in seconds
        release: Release time in seconds
        sr: Sample rate
    
    Returns:
        Compressed audio
    """
    # Convert to dB
    audio_db = 20 * np.log10(np.abs(audio) + 1e-8)
    
    # Compute gain reduction
    gain_db = np.zeros_like(audio_db)
    
    for i in range(len(audio_db)):
        if audio_db[i] > threshold:
            # Above threshold: apply compression
            excess_db = audio_db[i] - threshold
            gain_db[i] = -excess_db * (1 - 1/ratio)
        else:
            gain_db[i] = 0
    
    # Smooth gain reduction (attack/release)
    attack_samples = int(attack * sr)
    release_samples = int(release * sr)
    
    smoothed_gain = np.zeros_like(gain_db)
    for i in range(1, len(gain_db)):
        if gain_db[i] < smoothed_gain[i-1]:
            # Attack
            alpha = 1 - np.exp(-1 / attack_samples)
        else:
            # Release
            alpha = 1 - np.exp(-1 / release_samples)
        
        smoothed_gain[i] = alpha * gain_db[i] + (1 - alpha) * smoothed_gain[i-1]
    
    # Apply gain
    gain_linear = 10 ** (smoothed_gain / 20)
    compressed = audio * gain_linear
    
    return compressed

# Example
audio, sr = librosa.load('speech.wav', sr=16000)
compressed = dynamic_range_compression(audio, threshold=-20, ratio=4, sr=sr)

End-to-End Preprocessing Pipeline

class ProductionAudioPipeline:
    """
    Complete production-ready preprocessing pipeline
    
    Handles all edge cases and monitors quality
    """
    
    def __init__(self, config):
        self.target_sr = config.get('sample_rate', 16000)
        self.target_duration = config.get('target_duration', None)
        self.enable_noise_reduction = config.get('noise_reduction', True)
        self.enable_vad = config.get('vad', True)
        self.augmentation_enabled = config.get('augmentation', False)
        
        self.stats = {
            'processed': 0,
            'failed': 0,
            'clipped': 0,
            'too_short': 0,
            'avg_snr': []
        }
    
    def process(self, audio_path):
        """
        Process single audio file
        
        Returns: (processed_audio, metadata, success)
        """
        metadata = {'original_path': audio_path}
        
        try:
            # 1. Load
            audio, orig_sr = librosa.load(audio_path, sr=None)
            metadata['original_sr'] = orig_sr
            metadata['original_duration'] = len(audio) / orig_sr
            
            # 2. Detect issues
            clipped = np.max(np.abs(audio)) >= 0.99
            if clipped:
                audio, _ = detect_and_fix_clipping(audio)
                self.stats['clipped'] += 1
                metadata['had_clipping'] = True
            
            # 3. Resample
            if orig_sr != self.target_sr:
                audio = resample_audio(audio, orig_sr, self.target_sr)
                metadata['resampled'] = True
            
            # 4. Normalize
            audio = normalize_audio(audio, target_level=-20.0)
            metadata['normalized'] = True
            
            # 5. Noise reduction
            if self.enable_noise_reduction:
                audio = high_pass_filter(audio, self.target_sr, cutoff_freq=80)
                metadata['noise_reduction'] = True
            
            # 6. Voice Activity Detection
            if self.enable_vad:
                segments = voice_activity_detection(audio, self.target_sr)
                if segments:
                    speech_audio = np.concatenate([
                        audio[start:end] for start, end in segments
                    ])
                    audio = speech_audio
                    metadata['vad_segments'] = len(segments)
                else:
                    # No speech detected
                    return None, {'error': 'No speech detected'}, False
            
            # 7. Duration handling
            current_duration = len(audio) / self.target_sr
            
            if self.target_duration:
                target_samples = int(self.target_duration * self.target_sr)
                
                if len(audio) < target_samples:
                    # Pad
                    audio = np.pad(audio, (0, target_samples - len(audio)), mode='constant')
                    metadata['padded'] = True
                elif len(audio) > target_samples:
                    # Trim
                    audio = audio[:target_samples]
                    metadata['trimmed'] = True
            
            # 8. Quality checks
            if len(audio) < 0.5 * self.target_sr:  # Less than 0.5 seconds
                self.stats['too_short'] += 1
                return None, {'error': 'Too short after VAD'}, False
            
            # 9. Augmentation (training only)
            if self.augmentation_enabled:
                audio = self._augment(audio)
                metadata['augmented'] = True
            
            # 10. Final normalization
            audio = audio / (np.max(np.abs(audio)) + 1e-8) * 0.95
            
            metadata['final_duration'] = len(audio) / self.target_sr
            metadata['final_samples'] = len(audio)
            
            self.stats['processed'] += 1
            
            return audio, metadata, True
        
        except Exception as e:
            self.stats['failed'] += 1
            return None, {'error': str(e)}, False
    
    def _augment(self, audio):
        """Apply random augmentation"""
        import random
        
        aug_type = random.choice(['noise', 'pitch', 'speed', 'none'])
        
        if aug_type == 'noise':
            audio = add_noise(audio, noise_factor=random.uniform(0.001, 0.01))
        elif aug_type == 'pitch':
            steps = random.choice([-2, -1, 1, 2])
            audio = pitch_shift(audio, self.target_sr, n_steps=steps)
        elif aug_type == 'speed':
            rate = random.uniform(0.9, 1.1)
            audio = time_stretch(audio, rate=rate)
        
        return audio
    
    def get_stats(self):
        """Get processing statistics"""
        return self.stats

# Usage
config = {
    'sample_rate': 16000,
    'target_duration': 3.0,
    'noise_reduction': True,
    'vad': True,
    'augmentation': False  # True for training
}

pipeline = ProductionAudioPipeline(config)

# Process single file
audio, metadata, success = pipeline.process('input.wav')

if success:
    print(f"✓ Processed successfully")
    print(f"Duration: {metadata['final_duration']:.2f}s")
    # Save
    sf.write('output.wav', audio, pipeline.target_sr)
else:
    print(f"✗ Failed: {metadata.get('error')}")

# Process batch
for audio_file in audio_files:
    audio, metadata, success = pipeline.process(audio_file)
    if success:
        save_processed(audio, metadata)

# Get statistics
stats = pipeline.get_stats()
print(f"Processed: {stats['processed']}")
print(f"Failed: {stats['failed']}")
print(f"Clipped: {stats['clipped']}")

Key Takeaways

  • Clean audio is critical: preprocessing can make or break model performance
  • Standardize formats: consistent sample rate, bit depth, and mono/stereo layout
  • Remove noise: spectral subtraction and filtering reduce artifacts
  • VAD improves efficiency: removing silence saves compute
  • Augmentation boosts robustness: time stretching, pitch shifting, noise mixing
  • Like feature engineering: transform raw data into useful representations
  • Pipeline thinking: chain transformations like a tree traversal


Originally published at: arunbaby.com/speech-tech/0007-audio-preprocessing

If you found this helpful, consider sharing it with others who might benefit.