23 minute read

Feature engineering makes or breaks ML models. Learn how to build scalable, production-ready feature pipelines that power real-world systems.

TL;DR

Feature engineering is often more impactful than model choice. This post covers numerical, categorical, text, temporal, and aggregation features, along with advanced techniques like interaction features, embeddings, and feature crosses. It also addresses feature store architecture for training-serving consistency, feature selection methods (filter, wrapper, embedded), automated feature generation with Featuretools, production monitoring for drift, and large-scale processing with Spark. For the broader ML system design context, feature pipelines feed every downstream model. See also how data preprocessing handles the cleaning stage before feature engineering begins.

An industrial distillation column with multiple collection ports at different heights

Introduction

Feature engineering is the process of transforming raw data into features that better represent the underlying problem to ML models.

Why it matters:

  • Model quality: good features with simple models often beat complex models with bad features
  • Domain knowledge encoding: capture expert insights as features
  • Data quality: garbage in = garbage out
  • Production complexity: roughly 80% of ML engineering time is data/feature work

As Andrew Ng puts it: “Applied ML is basically feature engineering”


Feature Engineering Pipeline Architecture

High-Level Architecture

┌──────────────┐
│ Raw Data │ (Logs, DB, Streams)
└──────┬───────┘
 │
 ▼
┌──────────────────────────────┐
│ Feature Engineering Layer │
│ ┌─────────┐ ┌─────────┐ │
│ │Transform│ │ Compute │ │
│ │ Logic │ │ Engines │ │
│ └─────────┘ └─────────┘ │
└──────────┬───────────────────┘
 │
 ▼
┌──────────────────────────────┐
│ Feature Store │
│ ┌────────┐ ┌──────────┐ │
│ │ Online │ │ Offline │ │
│ │Features│ │ Features │ │
│ │(low ms)│ │ (batch) │ │
│ └────────┘ └──────────┘ │
└──────────┬───────────────────┘
 │
 ▼
┌──────────────────────────────┐
│ ML Models │
│ ┌─────────┐ ┌──────────┐ │
│ │Training │ │ Serving │ │
│ └─────────┘ └──────────┘ │
└──────────────────────────────┘

Types of Features

1. Numerical Features

Raw numerical values

import pandas as pd
import numpy as np

# Example dataset
df = pd.DataFrame({
    'age': [25, 30, 35, 40],
    'income': [50000, 75000, 100000, 125000],
    'num_purchases': [5, 12, 20, 15]
})

# Common transformations
df['age_squared'] = df['age'] ** 2
df['log_income'] = np.log(df['income'])
df['income_per_purchase'] = df['income'] / (df['num_purchases'] + 1) # +1 to avoid division by zero
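
Beyond these transformations, numerical features usually need scaling before they reach scale-sensitive models (linear models, neural networks, k-NN). A minimal sketch with scikit-learn's StandardScaler; the `_scaled` column names are just illustrative:

from sklearn.preprocessing import StandardScaler

# Standardize to zero mean and unit variance.
# Fit on training data only, then reuse the fitted scaler at serving time.
scaler = StandardScaler()
df[['age_scaled', 'income_scaled']] = scaler.fit_transform(df[['age', 'income']])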

2. Categorical Features

Discrete values that represent categories

One-Hot Encoding

# Simple one-hot encoding
df_categorical = pd.DataFrame({
    'city': ['NYC', 'SF', 'LA', 'NYC', 'SF'],
    'device': ['mobile', 'desktop', 'mobile', 'tablet', 'desktop']
})

# One-hot encode
df_encoded = pd.get_dummies(df_categorical, columns=['city', 'device'])
print(df_encoded)
#    city_LA  city_NYC  city_SF  device_desktop  device_mobile  device_tablet
# 0        0         1        0               0              1              0
# 1        0         0        1               1              0              0
# ...

Label Encoding (for ordinal features)

Note that sklearn's LabelEncoder assigns codes alphabetically (large→0, medium→1, small→2), so it does not respect the natural order small < medium < large. For truly ordinal features, define the order explicitly:

df = pd.DataFrame({
    'size': ['small', 'medium', 'large', 'small', 'large']
})

size_order = {'small': 0, 'medium': 1, 'large': 2}
df['size_encoded'] = df['size'].map(size_order)
# small→0, medium→1, large→2

Target Encoding (Mean Encoding)

def target_encode(df, column, target):
    """
    Replace each category with the mean of the target variable.

    Good for high-cardinality categoricals, but beware of target
    leakage: in practice, compute the means out-of-fold (see below).
    """
    means = df.groupby(column)[target].mean()
    return df[column].map(means)

# Example
df = pd.DataFrame({
    'city': ['NYC', 'SF', 'LA', 'NYC', 'SF', 'LA'],
    'conversion': [1, 0, 1, 1, 0, 0]
})

df['city_encoded'] = target_encode(df, 'city', 'conversion')
# NYC → 1.0 (2/2), SF → 0.0 (0/2), LA → 0.5 (1/2)
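
The naive version above leaks the target into the feature: each row's own label contributes to its encoded value. A common remedy is out-of-fold encoding; here is a hedged sketch using KFold (the helper name target_encode_oof is ours):

from sklearn.model_selection import KFold

def target_encode_oof(df, column, target, n_splits=5, seed=42):
    """
    Encode each row using target means computed on the other folds,
    so a row's own label never leaks into its feature value.
    """
    encoded = pd.Series(np.nan, index=df.index)
    kf = KFold(n_splits=n_splits, shuffle=True, random_state=seed)
    for train_idx, val_idx in kf.split(df):
        fold_means = df.iloc[train_idx].groupby(column)[target].mean()
        encoded.iloc[val_idx] = df.iloc[val_idx][column].map(fold_means).values
    # Unseen categories fall back to the global target mean
    return encoded.fillna(df[target].mean())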

3. Text Features

Transform text into numerical representations

TF-IDF

from sklearn.feature_extraction.text import TfidfVectorizer

documents = [
    "machine learning is awesome",
    "deep learning is a subset of machine learning",
    "natural language processing is fun"
]

vectorizer = TfidfVectorizer(max_features=10)
tfidf_matrix = vectorizer.fit_transform(documents)

print(f"Shape: {tfidf_matrix.shape}")
print(f"Features: {vectorizer.get_feature_names_out()}")

Word Embeddings

# Using pre-trained embeddings (e.g., Word2Vec, GloVe)
import gensim.downloader as api

# Load pre-trained model
word_vectors = api.load("glove-wiki-gigaword-100")

def text_to_embedding(text, word_vectors):
    """
    Average word vectors to get a single text embedding
    """
    words = text.lower().split()
    vectors = [word_vectors[word] for word in words if word in word_vectors]

    if not vectors:
        return np.zeros(100)

    return np.mean(vectors, axis=0)

# Example
text = "machine learning"
embedding = text_to_embedding(text, word_vectors)
print(f"Embedding shape: {embedding.shape}")  # (100,)

4. Time-Based Features

Extract temporal patterns

import pandas as pd
import numpy as np

df = pd.DataFrame({
    'timestamp': pd.date_range('2024-01-01', periods=100, freq='h')
})

# Extract time features
df['hour'] = df['timestamp'].dt.hour
df['day_of_week'] = df['timestamp'].dt.dayofweek
df['day_of_month'] = df['timestamp'].dt.day
df['month'] = df['timestamp'].dt.month
df['is_weekend'] = df['day_of_week'].isin([5, 6]).astype(int)
# holiday_dates: a pre-built list/DatetimeIndex of holidays (e.g., from the `holidays` package)
df['is_holiday'] = df['timestamp'].isin(holiday_dates).astype(int)

# Cyclical encoding (hour wraps around)
df['hour_sin'] = np.sin(2 * np.pi * df['hour'] / 24)
df['hour_cos'] = np.cos(2 * np.pi * df['hour'] / 24)

Cyclical encoding visualization:

Hour encoding (linear):
0 ── 6 ── 12 ── 18 ── 24
└─> Problem: 0 and 24 are far apart numerically!

Hour encoding (cyclical):
         0/24
    21    │    3
 18 ──────┼────── 6
    15    │    9
         12

Using sin/cos captures the cyclical nature:
hour_sin = sin(2π × hour / 24)
hour_cos = cos(2π × hour / 24)
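
A quick sanity check that the sin/cos representation behaves as the diagram suggests (distances are Euclidean in the 2-D encoded space):

def hour_to_xy(hour):
    angle = 2 * np.pi * hour / 24
    return np.array([np.sin(angle), np.cos(angle)])

print(np.linalg.norm(hour_to_xy(23) - hour_to_xy(0)))   # ~0.26: adjacent hours stay close
print(np.linalg.norm(hour_to_xy(0) - hour_to_xy(12)))   # 2.0: opposite hours are maximally far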

5. Aggregation Features

Statistics over groups

# Example: user behavior features
user_sessions = pd.DataFrame({
    'user_id': [1, 1, 1, 2, 2, 3],
    'session_duration': [120, 300, 180, 450, 200, 350],
    'pages_viewed': [5, 12, 8, 20, 10, 15],
    'timestamp': pd.date_range('2024-01-01', periods=6, freq='D')
})

# Aggregate by user
user_features = user_sessions.groupby('user_id').agg({
    'session_duration': ['mean', 'std', 'min', 'max', 'sum'],
    'pages_viewed': ['mean', 'sum', 'count'],
    'timestamp': ['min', 'max']  # First/last session
}).reset_index()

# Flatten column names
user_features.columns = ['_'.join(col).strip('_') for col in user_features.columns.values]

# Time-windowed aggregations
user_sessions['date'] = user_sessions['timestamp'].dt.date

# Last 7 days features
last_7_days = user_sessions[
    user_sessions['timestamp'] >= (user_sessions['timestamp'].max() - pd.Timedelta(days=7))
]

user_features_7d = last_7_days.groupby('user_id').agg({
    'session_duration': 'mean',
    'pages_viewed': 'sum'
}).add_suffix('_7d')
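
For per-user rolling windows (rather than the single global cutoff above), pandas' time-based rolling windows are a handy alternative; a sketch assuming timestamps are sorted:

rolling_7d = (
    user_sessions
    .sort_values('timestamp')
    .set_index('timestamp')
    .groupby('user_id')['pages_viewed']
    .rolling('7D')     # trailing 7-day window per user
    .sum()
)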

Advanced Feature Engineering Techniques

1. Interaction Features

Capture relationships between features

from sklearn.preprocessing import PolynomialFeatures

# Simple example
df = pd.DataFrame({
    'feature_a': [1, 2, 3],
    'feature_b': [4, 5, 6]
})

# Polynomial features (includes interactions)
poly = PolynomialFeatures(degree=2, include_bias=False)
poly_features = poly.fit_transform(df[['feature_a', 'feature_b']])

# Creates: [a, b, a², ab, b²]
print(poly.get_feature_names_out())
# ['feature_a', 'feature_b', 'feature_a^2', 'feature_a feature_b', 'feature_b^2']

# Manual domain-specific interactions (assumes a housing dataset with
# 'price', 'sqft', 'bedrooms', and 'bathrooms' columns)
df['price_per_sqft'] = df['price'] / df['sqft']
df['bedrooms_bathrooms_ratio'] = df['bedrooms'] / (df['bathrooms'] + 1)

2. Binning/Discretization

Convert continuous to categorical

# Equal-width binning
df['age_bin'] = pd.cut(df['age'], bins=[0, 18, 35, 50, 100],
                       labels=['child', 'young_adult', 'adult', 'senior'])

# Equal-frequency binning (quantiles)
df['income_quartile'] = pd.qcut(df['income'], q=4, labels=['Q1', 'Q2', 'Q3', 'Q4'])

# Custom bins based on domain knowledge
def categorize_temperature(temp):
    if temp < 32:
        return 'freezing'
    elif temp < 60:
        return 'cold'
    elif temp < 80:
        return 'mild'
    else:
        return 'hot'

df['temp_category'] = df['temperature'].apply(categorize_temperature)

3. Feature Crosses

Combine multiple categorical features

# Simple feature cross
df['city_device'] = df['city'] + '_' + df['device']
# Creates: 'NYC_mobile', 'SF_desktop', etc.

# Multiple feature crosses (assumes an 'hour_bin' column, e.g., from binning the hour)
df['city_device_hour'] = df['city'] + '_' + df['device'] + '_' + df['hour_bin']

# Then one-hot encode the crosses
df_crossed = pd.get_dummies(df['city_device'], prefix='city_device')
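
Crosses multiply cardinality quickly (cities × devices × hour bins). One mitigation is to hash each crossed value into a fixed number of buckets; a sketch using hashlib (Python's built-in hash() is salted per process, so it is not stable across runs):

import hashlib

def hash_cross(values, n_buckets=100):
    # Join the crossed values and hash into a bounded bucket id
    key = '_'.join(str(v) for v in values)
    return int(hashlib.md5(key.encode()).hexdigest(), 16) % n_buckets

df['city_device_bucket'] = df.apply(
    lambda row: hash_cross([row['city'], row['device']]), axis=1
)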

4. Embedding Features

Learn dense representations

import tensorflow as tf

def create_embedding_layer(vocab_size, embedding_dim):
    """
    Create an embedding layer for a categorical feature.

    Useful for high-cardinality categoricals (e.g., user_id, item_id)
    """
    return tf.keras.layers.Embedding(
        input_dim=vocab_size,
        output_dim=embedding_dim,
        embeddings_regularizer=tf.keras.regularizers.l2(1e-6)
    )

# Example: User embeddings
num_users = 10000
user_embedding_dim = 32

user_input = tf.keras.layers.Input(shape=(1,), name='user_id')
user_embedding = create_embedding_layer(num_users, user_embedding_dim)(user_input)
user_vec = tf.keras.layers.Flatten()(user_embedding)

Feature Store Architecture

Problem: Features computed differently in training vs serving → prediction skew

Solution: Centralized feature store with unified computation

Feature Store Components

from dataclasses import dataclass
from typing import Callable, List
import numpy as np
import pandas as pd

@dataclass
class Feature:
    """Feature definition"""
    name: str
    transform_fn: Callable
    dependencies: List[str]
    batch_source: str   # Where to get data for batch computation
    stream_source: str  # Where to get data for real-time computation

class FeatureStore:
    """
    Simplified feature store

    Real systems: Feast, Tecton, AWS SageMaker Feature Store
    """

    def __init__(self):
        self.features = {}
        self.offline_store = {}  # Batch features (historical)
        self.online_store = {}   # Real-time features (low latency)

    def register_feature(self, feature: Feature):
        """Register feature definition"""
        self.features[feature.name] = feature

    def compute_batch_features(self, entity_ids: List[str], features: List[str]):
        """
        Compute features for training (batch)

        Returns: DataFrame with features
        """
        result = pd.DataFrame({'entity_id': entity_ids})

        for feature_name in features:
            feature = self.features[feature_name]

            # Load batch data
            data = self._load_batch_data(feature.batch_source, entity_id=None)

            # Compute feature
            result[feature_name] = feature.transform_fn(data)

        return result

    def get_online_features(self, entity_id: str, features: List[str]):
        """
        Get features for serving (real-time)

        Returns: Dict of feature values
        """
        result = {}

        for feature_name in features:
            # Check online store first
            key = f"{entity_id}:{feature_name}"
            if key in self.online_store:
                result[feature_name] = self.online_store[key]
            else:
                # Compute on-the-fly (fallback)
                feature = self.features[feature_name]
                data = self._load_stream_data(feature.stream_source, entity_id)
                result[feature_name] = feature.transform_fn(data)

        return result

    def materialize_features(self, features: List[str]):
        """
        Pre-compute features and store them in the online store

        Batch job that runs periodically
        """
        for feature_name in features:
            feature = self.features[feature_name]

            # Compute for all entities
            all_entities = self._get_all_entities()

            for entity_id in all_entities:
                data = self._load_batch_data(feature.batch_source, entity_id)
                value = feature.transform_fn(data)

                # Store in online store
                key = f"{entity_id}:{feature_name}"
                self.online_store[key] = value

    def _load_batch_data(self, source, entity_id=None):
        # Load from data warehouse (e.g., BigQuery, Snowflake)
        pass

    def _load_stream_data(self, source, entity_id):
        # Load from stream (e.g., Kafka, Kinesis)
        pass

    def _get_all_entities(self):
        # Get all entity IDs
        pass

# Example usage
feature_store = FeatureStore()

# Register features
feature_store.register_feature(Feature(
    name='user_avg_purchase_amount_30d',
    transform_fn=lambda data: data['purchase_amount'].mean(),
    dependencies=['purchase_amount'],
    batch_source='dwh.purchases',
    stream_source='kafka.purchases'
))

# Training: Get batch features
training_features = feature_store.compute_batch_features(
    entity_ids=['user_1', 'user_2'],
    features=['user_avg_purchase_amount_30d']
)

# Serving: Get online features (< 10ms)
serving_features = feature_store.get_online_features(
    entity_id='user_1',
    features=['user_avg_purchase_amount_30d']
)

Feature Store Benefits

Training-Serving Consistency:

Without Feature Store:
 Training: Compute features in Python/Spark
 Serving: Reimplement in Java/Go
 Result: Different implementations → prediction skew!

With Feature Store:
 Training: feature_store.compute_batch_features()
 Serving: feature_store.get_online_features()
 Result: Same computation logic → consistent!

Feature Engineering for Hierarchical Data

Connecting to DSA (tree traversal):

Hierarchical Features

class CategoryTree:
    """
    Category hierarchy (like tree traversal)

    Example:
                 Electronics
                 /         \
         Computers          Phones
          /      \             |
      Laptops  Desktops   Smartphones
    """

    def __init__(self):
        self.tree = {
            'Electronics': {
                'Computers': {
                    'Laptops': {},
                    'Desktops': {}
                },
                'Phones': {
                    'Smartphones': {}
                }
            }
        }

    def get_category_path(self, category: str) -> List[str]:
        """
        Get the path from root to category

        Uses DFS (similar to tree traversal)
        """
        def dfs(subtree, target, path):
            for child, child_subtree in subtree.items():
                if child == target:
                    return path + [child]
                result = dfs(child_subtree, target, path + [child])
                if result:
                    return result
            return None

        for root, subtree in self.tree.items():
            if root == category:
                return [root]
            path = dfs(subtree, category, [root])
            if path:
                return path

        return []

    def category_level_features(self, category: str):
        """
        Create features from the category hierarchy

        level_1: Electronics
        level_2: Computers
        level_3: Laptops
        """
        path = self.get_category_path(category)

        features = {}
        for i, cat in enumerate(path):
            features[f'category_level_{i+1}'] = cat

        return features

# Example
cat_tree = CategoryTree()
features = cat_tree.category_level_features('Laptops')
print(features)
# {'category_level_1': 'Electronics',
#  'category_level_2': 'Computers',
#  'category_level_3': 'Laptops'}

Connection to Speech Processing

Feature engineering is critical in speech ML:

Audio Feature Extraction Pipeline

class AudioFeatureExtractor:
    """
    Extract features from audio (similar to general feature engineering)
    """

    def extract_spectral_features(self, audio):
        """
        Extract spectral features

        Similar to numerical feature engineering
        """
        import librosa

        # Mel-frequency cepstral coefficients
        mfccs = librosa.feature.mfcc(y=audio, sr=22050, n_mfcc=13)

        # Spectral features
        spectral_centroid = librosa.feature.spectral_centroid(y=audio, sr=22050)
        spectral_rolloff = librosa.feature.spectral_rolloff(y=audio, sr=22050)

        # Aggregate over time (similar to aggregation features)
        features = {
            'mfcc_mean': np.mean(mfccs, axis=1),
            'mfcc_std': np.std(mfccs, axis=1),
            'spectral_centroid_mean': np.mean(spectral_centroid),
            'spectral_rolloff_mean': np.mean(spectral_rolloff)
        }

        return features

    def extract_prosodic_features(self, audio):
        """
        Extract prosody features (pitch, energy, duration)

        Domain-specific feature engineering
        """
        import librosa

        # Pitch (F0)
        f0, voiced_flag, voiced_probs = librosa.pyin(
            audio,
            fmin=librosa.note_to_hz('C2'),
            fmax=librosa.note_to_hz('C7')
        )

        # Energy
        energy = librosa.feature.rms(y=audio)

        # Duration features
        zero_crossings = librosa.feature.zero_crossing_rate(audio)

        features = {
            'pitch_mean': np.nanmean(f0),
            'pitch_std': np.nanstd(f0),
            'pitch_range': np.nanmax(f0) - np.nanmin(f0),
            'energy_mean': np.mean(energy),
            'energy_std': np.std(energy),
            'zcr_mean': np.mean(zero_crossings)
        }

        return features

Production Best Practices

1. Feature Versioning

from datetime import datetime

class VersionedFeature:
    """Track feature versions"""

    def __init__(self, name, version, transform_fn):
        self.name = name
        self.version = version
        self.transform_fn = transform_fn
        self.created_at = datetime.now()

    def get_full_name(self):
        return f"{self.name}_v{self.version}"

# Example
user_age_v1 = VersionedFeature(
    name='user_age',
    version=1,
    transform_fn=lambda df: df['birth_year'].apply(lambda x: 2024 - x)
)

user_age_v2 = VersionedFeature(
    name='user_age',
    version=2,
    transform_fn=lambda df: (datetime.now().year - df['birth_year']).clip(0, 120)
)

# Models can specify feature versions
model_features = {
    'user_age_v2',  # Use version 2
    'income_v1'
}

2. Feature Monitoring

class FeatureMonitor:
    """Monitor feature distributions"""

    def __init__(self):
        self.baseline_stats = {}

    def compute_stats(self, feature_name, values):
        """Compute feature statistics"""
        return {
        'mean': np.mean(values),
        'std': np.std(values),
        'min': np.min(values),
        'max': np.max(values),
        'nulls': np.isnan(values).sum(),
        'unique_count': len(np.unique(values))
        }

    def set_baseline(self, feature_name, values):
        """Set baseline statistics"""
        self.baseline_stats[feature_name] = self.compute_stats(feature_name, values)

    def check_drift(self, feature_name, values, threshold=0.1):
        """
        Check whether the feature distribution has drifted

        Returns: (has_drifted, drift_metrics)
        """
        if feature_name not in self.baseline_stats:
            return False, {}

        current_stats = self.compute_stats(feature_name, values)
        baseline_stats = self.baseline_stats[feature_name]

        # Check mean drift (in units of baseline standard deviations)
        mean_drift = abs(current_stats['mean'] - baseline_stats['mean']) / (baseline_stats['std'] + 1e-8)

        # Check std drift
        std_ratio = current_stats['std'] / (baseline_stats['std'] + 1e-8)

        drift_metrics = {
            'mean_drift': mean_drift,
            'std_ratio': std_ratio,
            'null_rate_change': (current_stats['nulls'] - baseline_stats['nulls']) / len(values)
        }

        has_drifted = mean_drift > threshold or std_ratio < 0.5 or std_ratio > 2.0

        return has_drifted, drift_metrics

# Usage
monitor = FeatureMonitor()

# Set baseline during training
monitor.set_baseline('user_age', training_df['user_age'].values)

# Check for drift in production
has_drifted, metrics = monitor.check_drift('user_age', production_df['user_age'].values)
if has_drifted:
    print(f"⚠️ Feature drift detected: {metrics}")

3. Feature Documentation

@dataclass
class FeatureDocumentation:
    """Document features for team collaboration"""
    name: str
    description: str
    owner: str
    creation_date: str
    dependencies: List[str]
    update_frequency: str # 'realtime', 'hourly', 'daily'
    sla_ms: int # SLA for feature computation
    example_values: List

    def to_markdown(self):
        """Generate markdown documentation"""
        return f"""# Feature: {self.name}

**Description:** {self.description}

**Owner:** {self.owner}

**Created:** {self.creation_date}

**Update Frequency:** {self.update_frequency}

**SLA:** {self.sla_ms}ms

**Dependencies:** {', '.join(self.dependencies)}

**Example Values:** {self.example_values[:5]}
"""

# Example
feature_doc = FeatureDocumentation(
    name='user_purchase_frequency_30d',
    description='Number of purchases by user in last 30 days',
    owner='ml-team@company.com',
    creation_date='2024-01-15',
    dependencies=['purchase_events'],
    update_frequency='hourly',
    sla_ms=100,
    example_values=[0, 2, 5, 1, 3, 0, 7]
)

print(feature_doc.to_markdown())

Feature Selection Techniques

Problem: Too many features can lead to:

  • Overfitting
  • Increased computation
  • Reduced interpretability

1. Filter Methods

from sklearn.feature_selection import SelectKBest, f_classif, mutual_info_classif
from sklearn.datasets import make_classification
import pandas as pd

# Generate sample data
X, y = make_classification(n_samples=1000, n_features=20, n_informative=10, random_state=42)

# ANOVA F-test
selector_f = SelectKBest(f_classif, k=10)
X_selected_f = selector_f.fit_transform(X, y)

# Get selected feature indices
selected_features_f = selector_f.get_support(indices=True)
print(f"Selected features (F-test): {selected_features_f}")

# Mutual Information
selector_mi = SelectKBest(mutual_info_classif, k=10)
X_selected_mi = selector_mi.fit_transform(X, y)

print(f"Original features: {X.shape[1]}")
print(f"Selected features: {X_selected_f.shape[1]}")

2. Wrapper Methods (Forward/Backward Selection)

from sklearn.feature_selection import SequentialFeatureSelector
from sklearn.ensemble import RandomForestClassifier

# Forward selection
sfs = SequentialFeatureSelector(
    RandomForestClassifier(n_estimators=100),
    n_features_to_select=10,
    direction='forward',
    cv=5
)

sfs.fit(X, y)
selected_features = sfs.get_support(indices=True)
print(f"Forward selection features: {selected_features}")

3. Embedded Methods (L1 Regularization)

from sklearn.linear_model import LassoCV
import numpy as np

# Lasso for feature selection
lasso = LassoCV(cv=5, random_state=42)
lasso.fit(X, y)

# Features with non-zero coefficients
importance = np.abs(lasso.coef_)
selected_features = np.where(importance > 0.01)[0]

print(f"Lasso selected {len(selected_features)} features")
print(f"Feature importance: {importance}")

4. Feature Importance from Models

from sklearn.ensemble import RandomForestClassifier
import matplotlib.pyplot as plt

# Train model
rf = RandomForestClassifier(n_estimators=100, random_state=42)
rf.fit(X, y)

# Get feature importance
importance = rf.feature_importances_
indices = np.argsort(importance)[::-1]

# Plot
plt.figure(figsize=(10, 6))
plt.title('Feature Importance')
plt.bar(range(X.shape[1]), importance[indices])
plt.xlabel('Feature Index')
plt.ylabel('Importance')
plt.show()

# Select top k features
k = 10
top_features = indices[:k]
print(f"Top {k} features: {top_features}")

Automated Feature Engineering

Automated Feature Synthesis with Featuretools

# Featuretools for automated feature engineering
import featuretools as ft
import pandas as pd

# Example: E-commerce transactions
customers = pd.DataFrame({
    'customer_id': [1, 2, 3],
    'age': [25, 35, 45],
    'city': ['NYC', 'SF', 'LA']
})

transactions = pd.DataFrame({
    'transaction_id': [1, 2, 3, 4, 5],
    'customer_id': [1, 1, 2, 2, 3],
    'amount': [100, 150, 200, 50, 300],
    'timestamp': pd.date_range('2024-01-01', periods=5, freq='D')
})

# Create entity set
es = ft.EntitySet(id='customer_transactions')

# Add entities
es = es.add_dataframe(
    dataframe_name='customers',
    dataframe=customers,
    index='customer_id'
)

es = es.add_dataframe(
    dataframe_name='transactions',
    dataframe=transactions,
    index='transaction_id',
    time_index='timestamp'
)

# Add relationship
es = es.add_relationship('customers', 'customer_id', 'transactions', 'customer_id')

# Generate features automatically
feature_matrix, feature_defs = ft.dfs(
    entityset=es,
    target_dataframe_name='customers',
    max_depth=2,
    verbose=True
)

print(f"Generated {len(feature_defs)} features automatically")
print(feature_matrix.head())

# Features like:
# - SUM(transactions.amount)
# - MEAN(transactions.amount)
# - COUNT(transactions)
# - MAX(transactions.timestamp)

Custom Feature Generation

class AutoFeatureGenerator:
    """
    Automatically generate mathematical transformations
    """

    def __init__(self, operations=('square', 'sqrt', 'log', 'reciprocal')):
        self.operations = operations

    def generate(self, df, numerical_columns):
        """
        Generate features by applying operations

        Args:
            df: DataFrame
            numerical_columns: Columns to transform

        Returns:
            DataFrame with original + generated features
        """
        result = df.copy()

        for col in numerical_columns:
            if 'square' in self.operations:
                result[f'{col}_squared'] = df[col] ** 2

            if 'sqrt' in self.operations:
                # Only for non-negative values
                if (df[col] >= 0).all():
                    result[f'{col}_sqrt'] = np.sqrt(df[col])

            if 'log' in self.operations:
                # Only for positive values
                if (df[col] > 0).all():
                    result[f'{col}_log'] = np.log(df[col])

            if 'reciprocal' in self.operations:
                # Avoid division by zero
                result[f'{col}_reciprocal'] = 1 / (df[col] + 1e-8)

        # Generate pairwise interactions
        for i, col1 in enumerate(numerical_columns):
            for col2 in numerical_columns[i+1:]:
                result[f'{col1}_times_{col2}'] = df[col1] * df[col2]
                result[f'{col1}_div_{col2}'] = df[col1] / (df[col2] + 1e-8)

        return result

# Usage
df = pd.DataFrame({
    'feature_a': [1, 2, 3, 4, 5],
    'feature_b': [10, 20, 30, 40, 50]
})

generator = AutoFeatureGenerator()
df_with_features = generator.generate(df, ['feature_a', 'feature_b'])

print(f"Original features: {df.shape[1]}")
print(f"After generation: {df_with_features.shape[1]}")
print(df_with_features.columns.tolist())

Real-World Case Studies

Case Study 1: Netflix Recommendation Features

class NetflixFeatureEngine:
    """
    Feature engineering for content recommendation

    Based on public Netflix research papers
    """

    def engineer_user_features(self, user_history):
        """
        User behavioral features

        Args:
            user_history: DataFrame with user viewing history

        Returns:
            Dict of user features
        """
        features = {}

        # Viewing patterns
        features['total_watch_time'] = user_history['watch_duration'].sum()
        features['avg_watch_time'] = user_history['watch_duration'].mean()
        features['num_titles_watched'] = user_history['title_id'].nunique()

        # Time-based patterns
        user_history['timestamp'] = pd.to_datetime(user_history['timestamp'])
        user_history['hour'] = user_history['timestamp'].dt.hour
        features['favorite_hour'] = user_history.groupby('hour').size().idxmax()
        features['weekend_ratio'] = user_history['is_weekend'].sum() / len(user_history)

        # Genre preferences
        genre_counts = user_history['genre'].value_counts()
        features['favorite_genre'] = genre_counts.index[0] if len(genre_counts) > 0 else 'unknown'
        features['genre_diversity'] = user_history['genre'].nunique()

        # Completion rate
        features['completion_rate'] = (
            user_history['watch_duration'] / user_history['total_duration']
        ).mean()

        # Binge-watching behavior
        features['avg_sessions_per_day'] = user_history.groupby(
            user_history['timestamp'].dt.date
        ).size().mean()

        # Recency features
        last_watch = user_history['timestamp'].max()
        features['days_since_last_watch'] = (pd.Timestamp.now() - last_watch).days

        return features

    def engineer_content_features(self, content_metadata, user_interactions):
        """
        Content-based features

        Combine metadata + user engagement
        """
        features = {}

        # Popularity features
        features['view_count'] = len(user_interactions)
        features['unique_viewers'] = user_interactions['user_id'].nunique()
        features['avg_rating'] = user_interactions['rating'].mean()

        # Engagement features
        features['avg_completion_rate'] = (
            user_interactions['watch_duration'] / content_metadata['duration']
        ).mean()

        # Temporal features
        features['days_since_release'] = (
            pd.Timestamp.now() - pd.to_datetime(content_metadata['release_date'])
        ).days

        # Freshness score (decaying popularity)
        features['freshness_score'] = (
            features['view_count'] / (1 + np.log(1 + features['days_since_release']))
        )

        return features

Case Study 2: Uber Demand Prediction Features

class UberDemandFeatures:
    """
    Feature engineering for ride demand prediction

    Inspired by Uber's blog posts on ML
    """

    def engineer_spatial_features(self, location_data):
        """
        Spatial features for demand prediction
        """
        features = {}

        # Grid-based features
        features['grid_id'] = self.lat_lon_to_grid(
            location_data['lat'], location_data['lon']
        )

        # Distance to key locations (airport_lat, downtown_lat, etc.
        # are assumed to be city-level constants)
        features['dist_to_airport'] = self.haversine_distance(
            location_data['lat'], location_data['lon'],
            airport_lat, airport_lon
        )

        features['dist_to_downtown'] = self.haversine_distance(
            location_data['lat'], location_data['lon'],
            downtown_lat, downtown_lon
        )

        # Neighborhood features
        features['is_business_district'] = self.check_business_district(
            location_data['lat'], location_data['lon']
        )

        return features

    def engineer_temporal_features(self, timestamp, location=None):
        """
        Time-based features for demand
        """
        ts = pd.to_datetime(timestamp)

        features = {}

        # Basic time features
        features['hour'] = ts.hour
        features['day_of_week'] = ts.dayofweek
        features['is_weekend'] = ts.dayofweek in [5, 6]

        # Peak hours
        features['is_morning_rush'] = (7 <= ts.hour <= 9)
        features['is_evening_rush'] = (17 <= ts.hour <= 19)
        features['is_late_night'] = (23 <= ts.hour or ts.hour <= 5)

        # Special events
        features['is_holiday'] = self.check_holiday(ts)
        features['is_major_event_day'] = self.check_events(ts, location)

        # Weather features (if available)
        features['is_raining'] = self.get_weather(ts, 'rain')
        features['temperature'] = self.get_weather(ts, 'temp')

        return features

    def engineer_historical_features(self, location, timestamp, lookback_days=7):
        """
        Historical demand features
        """
        features = {}

        # Same hour, previous days
        for days_ago in [1, 7, 14]:
            past_timestamp = timestamp - pd.Timedelta(days=days_ago)
            features[f'demand_{days_ago}d_ago'] = self.get_historical_demand(
                location, past_timestamp
            )

        # Moving averages
        features['demand_7d_avg'] = self.get_avg_demand(
            location, timestamp, lookback_days=lookback_days
        )

        features['demand_7d_std'] = self.get_std_demand(
            location, timestamp, lookback_days=lookback_days
        )

        # Trend
        recent_demand = self.get_demand_series(location, timestamp, days=7)
        features['demand_trend'] = self.compute_trend(recent_demand)

        return features

    @staticmethod
    def haversine_distance(lat1, lon1, lat2, lon2):
        """Calculate distance between two points on Earth"""
        from math import radians, cos, sin, asin, sqrt

        # Convert to radians
        lat1, lon1, lat2, lon2 = map(radians, [lat1, lon1, lat2, lon2])

        # Haversine formula
        dlat = lat2 - lat1
        dlon = lon2 - lon1
        a = sin(dlat/2)**2 + cos(lat1) * cos(lat2) * sin(dlon/2)**2
        c = 2 * asin(sqrt(a))

        # Radius of Earth in kilometers
        r = 6371

        return c * r

Feature Engineering at Scale with Spark

from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.window import Window

class SparkFeatureEngine:
    """
    Scalable feature engineering with Apache Spark

    For datasets too large for pandas
    """

    def __init__(self):
        self.spark = SparkSession.builder \
            .appName("FeatureEngineering") \
            .getOrCreate()

    def aggregate_features(self, df, group_by_col, agg_col):
        """
        Compute aggregations at scale

        Args:
            df: Spark DataFrame
            group_by_col: Column to group by (e.g., 'user_id')
            agg_col: Column to aggregate (e.g., 'purchase_amount')

        Returns:
            DataFrame with aggregated features
        """
        agg_df = df.groupBy(group_by_col).agg(
            F.count(agg_col).alias(f'{agg_col}_count'),
            F.sum(agg_col).alias(f'{agg_col}_sum'),
            F.mean(agg_col).alias(f'{agg_col}_mean'),
            F.stddev(agg_col).alias(f'{agg_col}_std'),
            F.min(agg_col).alias(f'{agg_col}_min'),
            F.max(agg_col).alias(f'{agg_col}_max')
        )

        return agg_df

    def window_features(self, df, partition_col, order_col, value_col):
        """
        Compute window features (rolling aggregations)

        Example: 7-day rolling average
        """
        # Define window: last 7 days (in seconds) up to the current row
        days_7 = 7 * 86400

        window_spec = Window \
            .partitionBy(partition_col) \
            .orderBy(F.col(order_col).cast('long')) \
            .rangeBetween(-days_7, 0)

        # Compute rolling features
        df_with_window = df.withColumn(
            f'{value_col}_7d_avg', F.avg(value_col).over(window_spec)
        ).withColumn(
            f'{value_col}_7d_sum', F.sum(value_col).over(window_spec)
        ).withColumn(
            f'{value_col}_7d_count', F.count(value_col).over(window_spec)
        )

        return df_with_window

    def lag_features(self, df, partition_col, order_col, value_col, lags=(1, 7, 30)):
        """
        Create lag features (previous values)
        """
        window_spec = Window \
            .partitionBy(partition_col) \
            .orderBy(order_col)

        for lag in lags:
            df = df.withColumn(
                f'{value_col}_lag_{lag}',
                F.lag(value_col, lag).over(window_spec)
            )

        return df

# Usage example
# spark_fe = SparkFeatureEngine()
#
# # Load large dataset
# df = spark_fe.spark.read.parquet('s3://bucket/data/')
#
# # Compute features at scale
# df_agg = spark_fe.aggregate_features(df, 'user_id', 'purchase_amount')
# df_windowed = spark_fe.window_features(df, 'user_id', 'timestamp', 'purchase_amount')
Cost Analysis & Optimization

Feature Computation Cost

class FeatureCostAnalyzer:
    """
    Analyze cost of feature computation

    Important for production systems
    """

    def __init__(self):
        self.feature_costs = {}

    def measure_cost(self, feature_name, compute_fn, data, iterations=100):
        """
        Measure computation cost

        Returns: (time_ms, memory_mb)
        """
        import time
        import tracemalloc

        # Measure time
        times = []
        for _ in range(iterations):
            start = time.perf_counter()
            compute_fn(data)
            end = time.perf_counter()
            times.append((end - start) * 1000)  # ms

        avg_time = np.mean(times)

        # Measure memory
        tracemalloc.start()
        compute_fn(data)
        current, peak = tracemalloc.get_traced_memory()
        tracemalloc.stop()

        memory_mb = peak / 1024 / 1024

        self.feature_costs[feature_name] = {
            'time_ms': avg_time,
            'memory_mb': memory_mb,
            'cost_score': avg_time * memory_mb  # Simple cost metric
        }

        return avg_time, memory_mb

    def recommend_features(self, feature_importance, cost_threshold=100):
        """
        Recommend features based on the importance vs cost trade-off

        Args:
            feature_importance: Dict {feature_name: importance_score}
            cost_threshold: Maximum acceptable cost

        Returns:
            List of recommended features
        """
        recommendations = []

        for feature_name, importance in feature_importance.items():
            if feature_name not in self.feature_costs:
                continue

            cost = self.feature_costs[feature_name]['cost_score']

            # Value/cost ratio
            value_ratio = importance / (cost + 1e-8)

            if cost <= cost_threshold:
                recommendations.append({
                    'feature': feature_name,
                    'importance': importance,
                    'cost': cost,
                    'value_ratio': value_ratio
                })

        # Sort by value ratio
        recommendations.sort(key=lambda x: x['value_ratio'], reverse=True)

        return recommendations

# Example usage
analyzer = FeatureCostAnalyzer()

# Measure costs (assumes `data` is a DataFrame with the referenced columns)
analyzer.measure_cost('simple_sum', lambda df: df['col1'] + df['col2'], data)
analyzer.measure_cost('complex_agg', lambda df: df.groupby('id').agg({'col': ['mean', 'std', 'max']}), data)

# Get recommendations
feature_importance = {'simple_sum': 0.8, 'complex_agg': 0.3}
recommended = analyzer.recommend_features(feature_importance)

Key Takeaways

✅ Feature engineering is critical - often more impactful than model choice
✅ Feature stores solve consistency - same code for training and serving
✅ Domain knowledge matters - the best features come from understanding the problem
✅ Monitor features in production - detect drift and data quality issues
✅ Version features - track changes, enable rollback
✅ Document everything - features are long-lived assets
✅ Like tree traversal - hierarchical features need DFS/BFS logic

FAQ

Why is a feature store important for ML systems?

A feature store ensures the same feature computation logic is used in both training and serving, preventing prediction skew. It also enables feature reuse across teams, versioning, and point-in-time correct joins for training data.
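
Point-in-time correctness deserves a concrete picture: when building training data, each label row must only see feature values computed before its event time. A pandas sketch using merge_asof (the column names are illustrative):

import pandas as pd

labels = pd.DataFrame({
    'user_id': [1, 1, 2],
    'event_time': pd.to_datetime(['2024-01-05', '2024-01-20', '2024-01-10']),
    'label': [1, 0, 1]
})
features = pd.DataFrame({
    'user_id': [1, 1, 2],
    'feature_time': pd.to_datetime(['2024-01-01', '2024-01-15', '2024-01-08']),
    'avg_purchase_30d': [12.0, 18.5, 40.0]
})

# For each label row, take the latest feature value computed BEFORE the event
training = pd.merge_asof(
    labels.sort_values('event_time'),
    features.sort_values('feature_time'),
    left_on='event_time', right_on='feature_time',
    by='user_id', direction='backward'
)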

What is cyclical encoding and when should you use it?

Cyclical encoding uses sine and cosine transformations to represent periodic features like hour of day or day of week. Without it, a model treats hour 0 and hour 23 as maximally distant when they are actually adjacent.

How do you handle high-cardinality categorical features?

Use target encoding (replace each category with the mean of the target), frequency encoding, the hashing trick for a fixed dimensionality (sketched below), or learned embeddings via neural networks. Avoid plain one-hot encoding, which creates one sparse column per category and becomes unmanageable at high cardinality.
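
A minimal sketch of the hashing trick with scikit-learn's FeatureHasher, which maps arbitrarily many categories into a fixed-width vector:

from sklearn.feature_extraction import FeatureHasher

hasher = FeatureHasher(n_features=64, input_type='string')
X_hashed = hasher.transform([['user=12345'], ['user=99999']])
print(X_hashed.shape)  # (2, 64) regardless of how many distinct users exist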

What are the main feature selection techniques?

Filter methods like mutual information and ANOVA rank features statistically. Wrapper methods like forward selection evaluate subsets with cross-validation. Embedded methods like L1 regularization perform selection during training. Tree-based feature importance provides a practical shortcut.


Originally published at: arunbaby.com/ml-system-design/0007-feature-engineering

Want to work together?

I take on projects, advisory roles, and fractional CTO engagements in AI/ML. I also help businesses go AI-native with agentic workflows and agent orchestration.

Get in touch