Feature Engineering at Scale
Feature engineering makes or breaks ML models. Learn how to build scalable, production-ready feature pipelines that power real-world systems.
Introduction
Feature engineering is the process of transforming raw data into features that better represent the underlying problem to ML models.
Why it matters:
- Makes models better: Good features > complex models with bad features
- Domain knowledge encoding: Capture expert insights in features
- Data quality: Garbage in = garbage out
- Production complexity: a large share of ML engineering time (often cited as ~80%) goes into data and feature work
As Andrew Ng puts it: "Applied ML is basically feature engineering"
Feature Engineering Pipeline Architecture
High-Level Architecture
┌──────────────┐
│ Raw Data │ (Logs, DB, Streams)
└──────┬───────┘
│
▼
┌──────────────────────────────┐
│ Feature Engineering Layer │
│ ┌─────────┐ ┌─────────┐ │
│ │Transform│ │ Compute │ │
│ │ Logic │ │ Engines │ │
│ └─────────┘ └─────────┘ │
└──────────┬───────────────────┘
│
▼
┌──────────────────────────────┐
│ Feature Store │
│ ┌────────┐ ┌──────────┐ │
│ │ Online │ │ Offline │ │
│ │Features│ │ Features │ │
│ │(low ms)│ │ (batch) │ │
│ └────────┘ └──────────┘ │
└──────────┬───────────────────┘
│
▼
┌──────────────────────────────┐
│ ML Models │
│ ┌─────────┐ ┌──────────┐ │
│ │Training │ │ Serving │ │
│ └─────────┘ └──────────┘ │
└──────────────────────────────┘
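To make the diagram concrete, here is a minimal sketch of how the three layers might map to code. The names (transform_layer, SimpleFeatureStore, the entity_id column) are illustrative assumptions, not a specific library's API:
from typing import Callable, Dict

import pandas as pd


def transform_layer(raw: pd.DataFrame, transforms: Dict[str, Callable]) -> pd.DataFrame:
    """Feature engineering layer: apply named transforms to raw data."""
    features = raw[['entity_id']].copy()  # assumes the raw data carries an entity_id column
    for name, fn in transforms.items():
        features[name] = fn(raw)
    return features


class SimpleFeatureStore:
    """Feature store layer: offline view for training, online view for serving."""

    def __init__(self):
        self.offline = pd.DataFrame()       # historical features (batch / training)
        self.online: Dict[str, dict] = {}   # latest features keyed by entity (serving)

    def write(self, features: pd.DataFrame):
        self.offline = pd.concat([self.offline, features], ignore_index=True)
        for _, row in features.iterrows():
            self.online[str(row['entity_id'])] = row.drop('entity_id').to_dict()


# ML models layer: training reads store.offline, serving reads store.online[entity_id]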
Types of Features
1. Numerical Features
Raw numerical values
import pandas as pd
import numpy as np
# Example dataset
df = pd.DataFrame({
'age': [25, 30, 35, 40],
'income': [50000, 75000, 100000, 125000],
'num_purchases': [5, 12, 20, 15]
})
# Common transformations
df['age_squared'] = df['age'] ** 2
df['log_income'] = np.log(df['income'])
df['income_per_purchase'] = df['income'] / (df['num_purchases'] + 1) # +1 to avoid division by zero
2. Categorical Features
Discrete values that represent categories
One-Hot Encoding
# Simple one-hot encoding
df_categorical = pd.DataFrame({
'city': ['NYC', 'SF', 'LA', 'NYC', 'SF'],
'device': ['mobile', 'desktop', 'mobile', 'tablet', 'desktop']
})
# One-hot encode
df_encoded = pd.get_dummies(df_categorical, columns=['city', 'device'])
print(df_encoded)
# city_LA city_NYC city_SF device_desktop device_mobile device_tablet
# 0 0 1 0 0 1 0
# 1 0 0 1 1 0 0
# ...
Label Encoding (for ordinal features)
from sklearn.preprocessing import LabelEncoder
df = pd.DataFrame({
'size': ['small', 'medium', 'large', 'small', 'large']
})
le = LabelEncoder()
df['size_encoded'] = le.fit_transform(df['size'])
# Careful: LabelEncoder assigns codes alphabetically (large→0, medium→1, small→2),
# so it does NOT preserve the natural order. For true ordinal features, map explicitly:
size_order = {'small': 0, 'medium': 1, 'large': 2}
df['size_ordinal'] = df['size'].map(size_order)
Target Encoding (Mean Encoding)
def target_encode(df, column, target):
"""
Replace category with mean of target variable
Good for high-cardinality categoricals
"""
means = df.groupby(column)[target].mean()
return df[column].map(means)
# Example
df = pd.DataFrame({
'city': ['NYC', 'SF', 'LA', 'NYC', 'SF', 'LA'],
'conversion': [1, 0, 1, 1, 0, 0]
})
df['city_encoded'] = target_encode(df, 'city', 'conversion')
# NYC → 1.0 (2/2), SF → 0.0 (0/2), LA → 0.5 (1/2)
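One caveat: computing the mean over the same rows you train on leaks the target into the feature. A common mitigation, sketched below (not part of the original example), is out-of-fold target encoding with smoothing:
import numpy as np
import pandas as pd
from sklearn.model_selection import KFold

def target_encode_oof(df, column, target, n_splits=5, smoothing=10.0):
    """Out-of-fold target encoding with smoothing to reduce target leakage."""
    global_mean = df[target].mean()
    encoded = pd.Series(np.nan, index=df.index, dtype=float)
    kf = KFold(n_splits=n_splits, shuffle=True, random_state=42)
    for train_idx, val_idx in kf.split(df):
        train = df.iloc[train_idx]
        stats = train.groupby(column)[target].agg(['mean', 'count'])
        # Shrink each category's mean toward the global mean when counts are small
        smoothed = (stats['mean'] * stats['count'] + global_mean * smoothing) / (stats['count'] + smoothing)
        encoded.iloc[val_idx] = df.iloc[val_idx][column].map(smoothed).fillna(global_mean).values
    return encoded

# df['city_encoded'] = target_encode_oof(df, 'city', 'conversion')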
3. Text Features
Transform text into numerical representations
TF-IDF
from sklearn.feature_extraction.text import TfidfVectorizer
documents = [
"machine learning is awesome",
"deep learning is a subset of machine learning",
"natural language processing is fun"
]
vectorizer = TfidfVectorizer(max_features=10)
tfidf_matrix = vectorizer.fit_transform(documents)
print(f"Shape: {tfidf_matrix.shape}")
print(f"Features: {vectorizer.get_feature_names_out()}")
Word Embeddings
# Using pre-trained embeddings (e.g., Word2Vec, GloVe)
import gensim.downloader as api
# Load pre-trained model
word_vectors = api.load("glove-wiki-gigaword-100")
def text_to_embedding(text, word_vectors):
"""
Average word vectors for text embedding
"""
words = text.lower().split()
vectors = [word_vectors[word] for word in words if word in word_vectors]
if not vectors:
return np.zeros(100)
return np.mean(vectors, axis=0)
# Example
text = "machine learning"
embedding = text_to_embedding(text, word_vectors)
print(f"Embedding shape: {embedding.shape}") # (100,)
4. Time-Based Features
Extract temporal patterns
import pandas as pd
df = pd.DataFrame({
'timestamp': pd.date_range('2024-01-01', periods=100, freq='H')
})
# Extract time features
df['hour'] = df['timestamp'].dt.hour
df['day_of_week'] = df['timestamp'].dt.dayofweek
df['day_of_month'] = df['timestamp'].dt.day
df['month'] = df['timestamp'].dt.month
df['is_weekend'] = df['day_of_week'].isin([5, 6]).astype(int)
# holiday_dates is assumed to be a predefined list or DatetimeIndex of holiday dates
df['is_holiday'] = df['timestamp'].isin(holiday_dates).astype(int)
# Cyclical encoding (hour wraps around)
df['hour_sin'] = np.sin(2 * np.pi * df['hour'] / 24)
df['hour_cos'] = np.cos(2 * np.pi * df['hour'] / 24)
Cyclical encoding visualization:

Hour encoding (linear):
0 ─ 6 ─ 12 ─ 18 ─ 24
Problem: 0 and 24 are far apart numerically, even though they represent the same time of day!

Hour encoding (cyclical, like a clock face):
       0/24
    21      3
  18          6
    15      9
       12

Using sin/cos captures the cyclical nature:
hour_sin = sin(2π × hour / 24)
hour_cos = cos(2π × hour / 24)
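A quick numeric check of the idea (a small sketch): with sin/cos encoding, hour 23 and hour 0 land close together, while their raw values are 23 apart:
import numpy as np

def hour_to_cyclic(hour):
    angle = 2 * np.pi * hour / 24
    return np.array([np.sin(angle), np.cos(angle)])

# Euclidean distance in the encoded space
print(np.linalg.norm(hour_to_cyclic(23) - hour_to_cyclic(0)))   # ~0.26 (adjacent hours are close)
print(np.linalg.norm(hour_to_cyclic(23) - hour_to_cyclic(12)))  # ~1.98 (opposite hours are far)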
5. Aggregation Features
Statistics over groups
# Example: user behavior features
user_sessions = pd.DataFrame({
'user_id': [1, 1, 1, 2, 2, 3],
'session_duration': [120, 300, 180, 450, 200, 350],
'pages_viewed': [5, 12, 8, 20, 10, 15],
'timestamp': pd.date_range('2024-01-01', periods=6, freq='D')
})
# Aggregate by user
user_features = user_sessions.groupby('user_id').agg({
'session_duration': ['mean', 'std', 'min', 'max', 'sum'],
'pages_viewed': ['mean', 'sum', 'count'],
'timestamp': ['min', 'max'] # First/last session
}).reset_index()
# Flatten column names
user_features.columns = ['_'.join(col).strip('_') for col in user_features.columns.values]
# Time-windowed aggregations
user_sessions['date'] = user_sessions['timestamp'].dt.date
# Last 7 days features
last_7_days = user_sessions[
user_sessions['timestamp'] >= (user_sessions['timestamp'].max() - pd.Timedelta(days=7))
]
user_features_7d = last_7_days.groupby('user_id').agg({
'session_duration': 'mean',
'pages_viewed': 'sum'
}).add_suffix('_7d')
Advanced Feature Engineering Techniques
1. Interaction Features
Capture relationships between features
from sklearn.preprocessing import PolynomialFeatures
# Simple example
df = pd.DataFrame({
'feature_a': [1, 2, 3],
'feature_b': [4, 5, 6]
})
# Polynomial features (includes interactions)
poly = PolynomialFeatures(degree=2, include_bias=False)
poly_features = poly.fit_transform(df[['feature_a', 'feature_b']])
# Creates: [a, b, a², ab, b²]
print(poly.get_feature_names_out())
# ['feature_a', 'feature_b', 'feature_a^2', 'feature_a feature_b', 'feature_b^2']
# Manual domain-specific interactions (assumes a housing DataFrame with
# price, sqft, bedrooms, and bathrooms columns)
df['price_per_sqft'] = df['price'] / df['sqft']
df['bedrooms_bathrooms_ratio'] = df['bedrooms'] / (df['bathrooms'] + 1)
2. Binning/Discretization
Convert continuous to categorical
# Equal-width binning
df['age_bin'] = pd.cut(df['age'], bins=[0, 18, 35, 50, 100],
labels=['child', 'young_adult', 'adult', 'senior'])
# Equal-frequency binning (quantiles)
df['income_quartile'] = pd.qcut(df['income'], q=4, labels=['Q1', 'Q2', 'Q3', 'Q4'])
# Custom bins based on domain knowledge
def categorize_temperature(temp):
if temp < 32:
return 'freezing'
elif temp < 60:
return 'cold'
elif temp < 80:
return 'mild'
else:
return 'hot'
df['temp_category'] = df['temperature'].apply(categorize_temperature)
3. Feature Crosses
Combine multiple categorical features
# Simple feature cross
df['city_device'] = df['city'] + '_' + df['device']
# Creates: 'NYC_mobile', 'SF_desktop', etc.
# Multiple feature crosses (assumes an 'hour_bin' column, e.g. created by binning the hour feature)
df['city_device_hour'] = df['city'] + '_' + df['device'] + '_' + df['hour_bin']
# Then one-hot encode the crosses
df_crossed = pd.get_dummies(df['city_device'], prefix='city_device')
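Crosses blow up cardinality quickly (|city| × |device| distinct values, and more with each extra feature). One way to keep the dimensionality bounded, sketched here, is the hashing trick:
from sklearn.feature_extraction import FeatureHasher

# Hash each crossed value into a fixed number of columns, capping dimensionality
hasher = FeatureHasher(n_features=32, input_type='string')
hashed = hasher.transform([[cross] for cross in df['city_device']])
print(hashed.shape)  # (num_rows, 32), no matter how many distinct crosses appear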
4. Embedding Features
Learn dense representations
import tensorflow as tf
def create_embedding_layer(vocab_size, embedding_dim):
"""
Create embedding layer for categorical feature
Useful for high-cardinality categoricals (e.g., user_id, item_id)
"""
return tf.keras.layers.Embedding(
input_dim=vocab_size,
output_dim=embedding_dim,
embeddings_regularizer=tf.keras.regularizers.l2(1e-6)
)
# Example: User embeddings
num_users = 10000
user_embedding_dim = 32
user_input = tf.keras.layers.Input(shape=(1,), name='user_id')
user_embedding = create_embedding_layer(num_users, user_embedding_dim)(user_input)
user_vec = tf.keras.layers.Flatten()(user_embedding)
Feature Store Architecture
Problem: Features computed differently in training vs serving → prediction skew
Solution: Centralized feature store with unified computation
Feature Store Components
from dataclasses import dataclass
from typing import Callable, List
import numpy as np
import pandas as pd
@dataclass
class Feature:
"""Feature definition"""
name: str
transform_fn: Callable
dependencies: List[str]
batch_source: str # Where to get data for batch computation
stream_source: str # Where to get data for real-time
class FeatureStore:
"""
Simplified feature store
Real systems: Feast, Tecton, AWS SageMaker Feature Store
"""
def __init__(self):
self.features = {}
self.offline_store = {} # Batch features (historical)
self.online_store = {} # Real-time features (low latency)
def register_feature(self, feature: Feature):
"""Register feature definition"""
self.features[feature.name] = feature
def compute_batch_features(self, entity_ids: List[str], features: List[str]):
"""
Compute features for training (batch)
Returns: DataFrame with features
"""
result = pd.DataFrame({'entity_id': entity_ids})
for feature_name in features:
feature = self.features[feature_name]
# Load batch data
data = self._load_batch_data(feature.batch_source, entity_id=None)
# Compute feature
result[feature_name] = feature.transform_fn(data)
return result
def get_online_features(self, entity_id: str, features: List[str]):
"""
Get features for serving (real-time)
Returns: Dict of feature values
"""
result = {}
for feature_name in features:
# Check online store
key = f"{entity_id}:{feature_name}"
if key in self.online_store:
result[feature_name] = self.online_store[key]
else:
# Compute on-the-fly (fallback)
feature = self.features[feature_name]
data = self._load_stream_data(feature.stream_source, entity_id)
result[feature_name] = feature.transform_fn(data)
return result
def materialize_features(self, features: List[str]):
"""
Pre-compute features and store in online store
Batch job that runs periodically
"""
for feature_name in features:
feature = self.features[feature_name]
# Compute for all entities
all_entities = self._get_all_entities()
for entity_id in all_entities:
data = self._load_batch_data(feature.batch_source, entity_id)
value = feature.transform_fn(data)
# Store in online store
key = f"{entity_id}:{feature_name}"
self.online_store[key] = value
def _load_batch_data(self, source, entity_id=None):
# Load from data warehouse (e.g., BigQuery, Snowflake)
pass
def _load_stream_data(self, source, entity_id):
# Load from stream (e.g., Kafka, Kinesis)
pass
def _get_all_entities(self):
# Get all entity IDs
pass
# Example usage
feature_store = FeatureStore()
# Register features
feature_store.register_feature(Feature(
name='user_avg_purchase_amount_30d',
transform_fn=lambda data: data['purchase_amount'].mean(),
dependencies=['purchase_amount'],
batch_source='dwh.purchases',
stream_source='kafka.purchases'
))
# Training: Get batch features
training_features = feature_store.compute_batch_features(
entity_ids=['user_1', 'user_2'],
features=['user_avg_purchase_amount_30d']
)
# Serving: Get online features (< 10ms)
serving_features = feature_store.get_online_features(
entity_id='user_1',
features=['user_avg_purchase_amount_30d']
)
Feature Store Benefits
Training-Serving Consistency:
Without Feature Store:
Training: Compute features in Python/Spark
Serving: Reimplement in Java/Go
Result: Different implementations → prediction skew!
With Feature Store:
Training: feature_store.compute_batch_features()
Serving: feature_store.get_online_features()
Result: Same computation logic → consistent!
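The key mechanic is that both paths share one transform definition. A minimal sketch (the function and column names are illustrative):
import pandas as pd

def avg_purchase_amount_30d(purchases: pd.DataFrame, now: pd.Timestamp) -> float:
    """Single source of truth for the feature logic."""
    recent = purchases[purchases['timestamp'] >= now - pd.Timedelta(days=30)]
    return float(recent['purchase_amount'].mean())

# Training path: run the same function over historical snapshots (backfill/batch job).
# Serving path: run it on the user's recent events, or read the value the batch job
# materialized into the online store. Either way, the logic is defined exactly once.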
Feature Engineering for Tree Traversal
Connecting to DSA Day 7 (tree traversal):
Hierarchical Features
class CategoryTree:
"""
Category hierarchy (like tree traversal)
Example:
Electronics
/ \
Computers Phones
/ \ |
Laptops Desktops Smartphones
"""
def __init__(self):
self.tree = {
'Electronics': {
'Computers': {
'Laptops': {},
'Desktops': {}
},
'Phones': {
'Smartphones': {}
}
}
}
def get_category_path(self, category: str) -> List[str]:
"""
Get path from root to category
Uses DFS (similar to tree traversal)
"""
def dfs(node, target, path):
# Check each child; if it matches, the path ends here, otherwise recurse into its subtree
for child, subtree in node.items():
if child == target:
return path + [child]
result = dfs(subtree, target, path + [child])
if result:
return result
return None
for root, subtree in self.tree.items():
if root == category:
return [root]
path = dfs(subtree, category, [root])
if path:
return path
return []
def category_level_features(self, category: str):
"""
Create features from category hierarchy
level_1: Electronics
level_2: Computers
level_3: Laptops
"""
path = self.get_category_path(category)
features = {}
for i, cat in enumerate(path):
features[f'category_level_{i+1}'] = cat
return features
# Example
cat_tree = CategoryTree()
features = cat_tree.category_level_features('Laptops')
print(features)
# {'category_level_1': 'Electronics',
# 'category_level_2': 'Computers',
# 'category_level_3': 'Laptops'}
Connection to Speech Processing (Day 7)
Feature engineering is critical in speech ML:
Audio Feature Extraction Pipeline
class AudioFeatureExtractor:
"""
Extract features from audio (similar to general feature engineering)
"""
def extract_spectral_features(self, audio):
"""
Extract spectral features
Similar to numerical feature engineering
"""
import librosa
# Mel-frequency cepstral coefficients
mfccs = librosa.feature.mfcc(y=audio, sr=22050, n_mfcc=13)
# Spectral features
spectral_centroid = librosa.feature.spectral_centroid(y=audio, sr=22050)
spectral_rolloff = librosa.feature.spectral_rolloff(y=audio, sr=22050)
# Aggregate over time (similar to aggregation features)
features = {
'mfcc_mean': np.mean(mfccs, axis=1),
'mfcc_std': np.std(mfccs, axis=1),
'spectral_centroid_mean': np.mean(spectral_centroid),
'spectral_rolloff_mean': np.mean(spectral_rolloff)
}
return features
def extract_prosodic_features(self, audio):
"""
Extract prosody features (pitch, energy, duration)
Domain-specific feature engineering
"""
import librosa
# Pitch (F0)
f0, voiced_flag, voiced_probs = librosa.pyin(
audio,
fmin=librosa.note_to_hz('C2'),
fmax=librosa.note_to_hz('C7')
)
# Energy
energy = librosa.feature.rms(y=audio)
# Duration features
zero_crossings = librosa.feature.zero_crossing_rate(audio)
features = {
'pitch_mean': np.nanmean(f0),
'pitch_std': np.nanstd(f0),
'pitch_range': np.nanmax(f0) - np.nanmin(f0),
'energy_mean': np.mean(energy),
'energy_std': np.std(energy),
'zcr_mean': np.mean(zero_crossings)
}
return features
Production Best Practices
1. Feature Versioning
from datetime import datetime

class VersionedFeature:
"""Track feature versions"""
def __init__(self, name, version, transform_fn):
self.name = name
self.version = version
self.transform_fn = transform_fn
self.created_at = datetime.now()
def get_full_name(self):
return f"{self.name}_v{self.version}"
# Example
user_age_v1 = VersionedFeature(
name='user_age',
version=1,
transform_fn=lambda df: df['birth_year'].apply(lambda x: 2024 - x)
)
user_age_v2 = VersionedFeature(
name='user_age',
version=2,
transform_fn=lambda df: (datetime.now().year - df['birth_year']).clip(0, 120)
)
# Models can specify feature version
model_features = {
'user_age_v2', # Use version 2
'income_v1'
}
2. Feature Monitoring
class FeatureMonitor:
"""Monitor feature distributions"""
def __init__(self):
self.baseline_stats = {}
def compute_stats(self, feature_name, values):
"""Compute feature statistics"""
return {
'mean': np.mean(values),
'std': np.std(values),
'min': np.min(values),
'max': np.max(values),
'null_rate': np.isnan(values).mean(),  # fraction of missing values
'unique_count': len(np.unique(values))
}
def set_baseline(self, feature_name, values):
"""Set baseline statistics"""
self.baseline_stats[feature_name] = self.compute_stats(feature_name, values)
def check_drift(self, feature_name, values, threshold=0.1):
"""
Check if feature distribution has drifted
Returns: (has_drifted, drift_metrics)
"""
if feature_name not in self.baseline_stats:
return False, {}
current_stats = self.compute_stats(feature_name, values)
baseline_stats = self.baseline_stats[feature_name]
# Check mean drift
mean_drift = abs(current_stats['mean'] - baseline_stats['mean']) / (baseline_stats['std'] + 1e-8)
# Check std drift
std_ratio = current_stats['std'] / (baseline_stats['std'] + 1e-8)
drift_metrics = {
'mean_drift': mean_drift,
'std_ratio': std_ratio,
'null_rate_change': current_stats['null_rate'] - baseline_stats['null_rate']
}
has_drifted = mean_drift > threshold or std_ratio < 0.5 or std_ratio > 2.0
return has_drifted, drift_metrics
# Usage
monitor = FeatureMonitor()
# Set baseline during training
monitor.set_baseline('user_age', training_df['user_age'].values)
# Check for drift in production
has_drifted, metrics = monitor.check_drift('user_age', production_df['user_age'].values)
if has_drifted:
print(f"⚠️ Feature drift detected: {metrics}")
3. Feature Documentation
@dataclass
class FeatureDocumentation:
"""Document features for team collaboration"""
name: str
description: str
owner: str
creation_date: str
dependencies: List[str]
update_frequency: str # 'realtime', 'hourly', 'daily'
sla_ms: int # SLA for feature computation
example_values: List
def to_markdown(self):
"""Generate markdown documentation"""
return f"""
# Feature: {self.name}
**Description:** {self.description}
**Owner:** {self.owner}
**Created:** {self.creation_date}
**Update Frequency:** {self.update_frequency}
**SLA:** {self.sla_ms}ms
**Dependencies:** {', '.join(self.dependencies)}
**Example Values:** {self.example_values[:5]}
"""
# Example
feature_doc = FeatureDocumentation(
name='user_purchase_frequency_30d',
description='Number of purchases by user in last 30 days',
owner='ml-team@company.com',
creation_date='2024-01-15',
dependencies=['purchase_events'],
update_frequency='hourly',
sla_ms=100,
example_values=[0, 2, 5, 1, 3, 0, 7]
)
print(feature_doc.to_markdown())
Feature Selection Techniques
Problem: Too many features can lead to:
- Overfitting
- Increased computation
- Reduced interpretability
1. Filter Methods
from sklearn.feature_selection import SelectKBest, f_classif, mutual_info_classif
from sklearn.datasets import make_classification
import pandas as pd
# Generate sample data
X, y = make_classification(n_samples=1000, n_features=20, n_informative=10, random_state=42)
# ANOVA F-test
selector_f = SelectKBest(f_classif, k=10)
X_selected_f = selector_f.fit_transform(X, y)
# Get selected feature indices
selected_features_f = selector_f.get_support(indices=True)
print(f"Selected features (F-test): {selected_features_f}")
# Mutual Information
selector_mi = SelectKBest(mutual_info_classif, k=10)
X_selected_mi = selector_mi.fit_transform(X, y)
print(f"Original features: {X.shape[1]}")
print(f"Selected features: {X_selected_f.shape[1]}")
2. Wrapper Methods (Forward/Backward Selection)
from sklearn.feature_selection import SequentialFeatureSelector
from sklearn.ensemble import RandomForestClassifier
# Forward selection
sfs = SequentialFeatureSelector(
RandomForestClassifier(n_estimators=100),
n_features_to_select=10,
direction='forward',
cv=5
)
sfs.fit(X, y)
selected_features = sfs.get_support(indices=True)
print(f"Forward selection features: {selected_features}")
3. Embedded Methods (L1 Regularization)
from sklearn.linear_model import LassoCV
import numpy as np
# Lasso for feature selection
# (Note: y here is a binary classification target; an L1-penalized LogisticRegression
# is usually the better fit for classification. LassoCV is used for illustration.)
lasso = LassoCV(cv=5, random_state=42)
lasso.fit(X, y)
# Features with non-zero coefficients
importance = np.abs(lasso.coef_)
selected_features = np.where(importance > 0.01)[0]
print(f"Lasso selected {len(selected_features)} features")
print(f"Feature importance: {importance}")
4. Feature Importance from Models
from sklearn.ensemble import RandomForestClassifier
import matplotlib.pyplot as plt
# Train model
rf = RandomForestClassifier(n_estimators=100, random_state=42)
rf.fit(X, y)
# Get feature importance
importance = rf.feature_importances_
indices = np.argsort(importance)[::-1]
# Plot
plt.figure(figsize=(10, 6))
plt.title('Feature Importance')
plt.bar(range(X.shape[1]), importance[indices])
plt.xlabel('Feature Index')
plt.ylabel('Importance')
plt.show()
# Select top k features
k = 10
top_features = indices[:k]
print(f"Top {k} features: {top_features}")
Automated Feature Engineering
AutoFeat with Featuretools
# Featuretools for automated feature engineering
import featuretools as ft
import pandas as pd
# Example: E-commerce transactions
customers = pd.DataFrame({
'customer_id': [1, 2, 3],
'age': [25, 35, 45],
'city': ['NYC', 'SF', 'LA']
})
transactions = pd.DataFrame({
'transaction_id': [1, 2, 3, 4, 5],
'customer_id': [1, 1, 2, 2, 3],
'amount': [100, 150, 200, 50, 300],
'timestamp': pd.date_range('2024-01-01', periods=5, freq='D')
})
# Create entity set
es = ft.EntitySet(id='customer_transactions')
# Add entities
es = es.add_dataframe(
dataframe_name='customers',
dataframe=customers,
index='customer_id'
)
es = es.add_dataframe(
dataframe_name='transactions',
dataframe=transactions,
index='transaction_id',
time_index='timestamp'
)
# Add relationship
es = es.add_relationship('customers', 'customer_id', 'transactions', 'customer_id')
# Generate features automatically
feature_matrix, feature_defs = ft.dfs(
entityset=es,
target_dataframe_name='customers',
max_depth=2,
verbose=True
)
print(f"Generated {len(feature_defs)} features automatically")
print(feature_matrix.head())
# Features like:
# - SUM(transactions.amount)
# - MEAN(transactions.amount)
# - COUNT(transactions)
# - MAX(transactions.timestamp)
Custom Feature Generation
class AutoFeatureGenerator:
"""
Automatically generate mathematical transformations
"""
def __init__(self, operations=['square', 'sqrt', 'log', 'reciprocal']):
self.operations = operations
def generate(self, df, numerical_columns):
"""
Generate features by applying operations
Args:
df: DataFrame
numerical_columns: Columns to transform
Returns:
DataFrame with original + generated features
"""
result = df.copy()
for col in numerical_columns:
if 'square' in self.operations:
result[f'{col}_squared'] = df[col] ** 2
if 'sqrt' in self.operations:
# Only for non-negative
if (df[col] >= 0).all():
result[f'{col}_sqrt'] = np.sqrt(df[col])
if 'log' in self.operations:
# Only for positive
if (df[col] > 0).all():
result[f'{col}_log'] = np.log(df[col])
if 'reciprocal' in self.operations:
# Avoid division by zero
result[f'{col}_reciprocal'] = 1 / (df[col] + 1e-8)
# Generate interactions
for i, col1 in enumerate(numerical_columns):
for col2 in numerical_columns[i+1:]:
result[f'{col1}_times_{col2}'] = df[col1] * df[col2]
result[f'{col1}_div_{col2}'] = df[col1] / (df[col2] + 1e-8)
return result
# Usage
df = pd.DataFrame({
'feature_a': [1, 2, 3, 4, 5],
'feature_b': [10, 20, 30, 40, 50]
})
generator = AutoFeatureGenerator()
df_with_features = generator.generate(df, ['feature_a', 'feature_b'])
print(f"Original features: {df.shape[1]}")
print(f"After generation: {df_with_features.shape[1]}")
print(df_with_features.columns.tolist())
Real-World Case Studies
Case Study 1: Netflix Recommendation Features
class NetflixFeatureEngine:
"""
Feature engineering for content recommendation
Based on public Netflix research papers
"""
def engineer_user_features(self, user_history):
"""
User behavioral features
Args:
user_history: DataFrame with user viewing history
Returns:
User features
"""
features = {}
# Viewing patterns
features['total_watch_time'] = user_history['watch_duration'].sum()
features['avg_watch_time'] = user_history['watch_duration'].mean()
features['num_titles_watched'] = user_history['title_id'].nunique()
# Time-based patterns
user_history['timestamp'] = pd.to_datetime(user_history['timestamp'])
user_history['hour'] = user_history['timestamp'].dt.hour
features['favorite_hour'] = user_history.groupby('hour').size().idxmax()
features['weekend_ratio'] = (user_history['is_weekend'].sum() / len(user_history))
# Genre preferences
genre_counts = user_history['genre'].value_counts()
features['favorite_genre'] = genre_counts.index[0] if len(genre_counts) > 0 else 'unknown'
features['genre_diversity'] = user_history['genre'].nunique()
# Completion rate
features['completion_rate'] = (
user_history['watch_duration'] / user_history['total_duration']
).mean()
# Binge-watching behavior
features['avg_sessions_per_day'] = user_history.groupby(
user_history['timestamp'].dt.date
).size().mean()
# Recency features
last_watch = user_history['timestamp'].max()
features['days_since_last_watch'] = (pd.Timestamp.now() - last_watch).days
return features
def engineer_content_features(self, content_metadata, user_interactions):
"""
Content-based features
Combine metadata + user engagement
"""
features = {}
# Popularity features
features['view_count'] = len(user_interactions)
features['unique_viewers'] = user_interactions['user_id'].nunique()
features['avg_rating'] = user_interactions['rating'].mean()
# Engagement features
features['avg_completion_rate'] = (
user_interactions['watch_duration'] / content_metadata['duration']
).mean()
# Temporal features
features['days_since_release'] = (
pd.Timestamp.now() - pd.to_datetime(content_metadata['release_date'])
).days
# Freshness score (decaying popularity)
features['freshness_score'] = (
features['view_count'] / (1 + np.log(1 + features['days_since_release']))
)
return features
Case Study 2: Uber Demand Prediction Features
class UberDemandFeatures:
"""
Feature engineering for ride demand prediction
Inspired by Uber's blog posts on ML
"""
def engineer_spatial_features(self, location_data):
"""
Spatial features for demand prediction
"""
features = {}
# Grid-based features
features['grid_id'] = self.lat_lon_to_grid(
location_data['lat'],
location_data['lon']
)
# Distance to key locations (airport_lat/lon and downtown_lat/lon are assumed city constants)
features['dist_to_airport'] = self.haversine_distance(
location_data['lat'], location_data['lon'],
airport_lat, airport_lon
)
features['dist_to_downtown'] = self.haversine_distance(
location_data['lat'], location_data['lon'],
downtown_lat, downtown_lon
)
# Neighborhood features
features['is_business_district'] = self.check_business_district(
location_data['lat'], location_data['lon']
)
return features
def engineer_temporal_features(self, timestamp, location=None):
"""
Time-based features for demand
"""
ts = pd.to_datetime(timestamp)
features = {}
# Basic time features
features['hour'] = ts.hour
features['day_of_week'] = ts.dayofweek
features['is_weekend'] = ts.dayofweek in [5, 6]
# Peak hours
features['is_morning_rush'] = (7 <= ts.hour <= 9)
features['is_evening_rush'] = (17 <= ts.hour <= 19)
features['is_late_night'] = (23 <= ts.hour or ts.hour <= 5)
# Special events
features['is_holiday'] = self.check_holiday(ts)
features['is_major_event_day'] = self.check_events(ts, location)
# Weather features (if available)
features['is_raining'] = self.get_weather(ts, 'rain')
features['temperature'] = self.get_weather(ts, 'temp')
return features
def engineer_historical_features(self, location, timestamp, lookback_days=7):
"""
Historical demand features
"""
features = {}
# Same hour, previous days
for days_ago in [1, 7, 14]:
past_timestamp = timestamp - pd.Timedelta(days=days_ago)
features[f'demand_{days_ago}d_ago'] = self.get_historical_demand(
location, past_timestamp
)
# Moving averages
features['demand_7d_avg'] = self.get_avg_demand(
location, timestamp, lookback_days=7
)
features['demand_7d_std'] = self.get_std_demand(
location, timestamp, lookback_days=7
)
# Trend
recent_demand = self.get_demand_series(location, timestamp, days=7)
features['demand_trend'] = self.compute_trend(recent_demand)
return features
@staticmethod
def haversine_distance(lat1, lon1, lat2, lon2):
"""Calculate distance between two points on Earth"""
from math import radians, cos, sin, asin, sqrt
# Convert to radians
lat1, lon1, lat2, lon2 = map(radians, [lat1, lon1, lat2, lon2])
# Haversine formula
dlat = lat2 - lat1
dlon = lon2 - lon1
a = sin(dlat/2)**2 + cos(lat1) * cos(lat2) * sin(dlon/2)**2
c = 2 * asin(sqrt(a))
# Radius of Earth in kilometers
r = 6371
return c * r
Feature Engineering at Scale with Spark
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.window import Window
class SparkFeatureEngine:
"""
Scalable feature engineering with Apache Spark
For datasets too large for pandas
"""
def __init__(self):
self.spark = SparkSession.builder \
.appName("FeatureEngineering") \
.getOrCreate()
def aggregate_features(self, df, group_by_col, agg_col):
"""
Compute aggregations at scale
Args:
df: Spark DataFrame
group_by_col: Column to group by (e.g., 'user_id')
agg_col: Column to aggregate (e.g., 'purchase_amount')
Returns:
DataFrame with aggregated features
"""
agg_df = df.groupBy(group_by_col).agg(
F.count(agg_col).alias(f'{agg_col}_count'),
F.sum(agg_col).alias(f'{agg_col}_sum'),
F.mean(agg_col).alias(f'{agg_col}_mean'),
F.stddev(agg_col).alias(f'{agg_col}_std'),
F.min(agg_col).alias(f'{agg_col}_min'),
F.max(agg_col).alias(f'{agg_col}_max')
)
return agg_df
def window_features(self, df, partition_col, order_col, value_col):
"""
Compute window features (rolling aggregations)
Example: 7-day rolling average
"""
# Define window
days_7 = 7 * 86400 # 7 days in seconds
window_spec = Window \
.partitionBy(partition_col) \
.orderBy(F.col(order_col).cast('long')) \
.rangeBetween(-days_7, 0)
# Compute rolling features
df_with_window = df.withColumn(
f'{value_col}_7d_avg',
F.avg(value_col).over(window_spec)
).withColumn(
f'{value_col}_7d_sum',
F.sum(value_col).over(window_spec)
).withColumn(
f'{value_col}_7d_count',
F.count(value_col).over(window_spec)
)
return df_with_window
def lag_features(self, df, partition_col, order_col, value_col, lags=[1, 7, 30]):
"""
Create lag features (previous values)
"""
window_spec = Window \
.partitionBy(partition_col) \
.orderBy(order_col)
for lag in lags:
df = df.withColumn(
f'{value_col}_lag_{lag}',
F.lag(value_col, lag).over(window_spec)
)
return df
# Usage example
# spark_fe = SparkFeatureEngine()
#
# # Load large dataset
# df = spark_fe.spark.read.parquet('s3://bucket/data/')
#
# # Compute features at scale
# df_features = spark_fe.aggregate_features(df, 'user_id', 'purchase_amount')
# df_features = spark_fe.window_features(df, 'user_id', 'timestamp', 'purchase_amount')
Cost Analysis & Optimization
Feature Computation Cost
class FeatureCostAnalyzer:
"""
Analyze cost of feature computation
Important for production systems
"""
def __init__(self):
self.feature_costs = {}
def measure_cost(self, feature_name, compute_fn, data, iterations=100):
"""
Measure computation cost
Returns: (time_ms, memory_mb)
"""
import time
import tracemalloc
# Measure time
times = []
for _ in range(iterations):
start = time.perf_counter()
compute_fn(data)
end = time.perf_counter()
times.append((end - start) * 1000) # ms
avg_time = np.mean(times)
# Measure memory
tracemalloc.start()
compute_fn(data)
current, peak = tracemalloc.get_traced_memory()
tracemalloc.stop()
memory_mb = peak / 1024 / 1024
self.feature_costs[feature_name] = {
'time_ms': avg_time,
'memory_mb': memory_mb,
'cost_score': avg_time * memory_mb # Simple cost metric
}
return avg_time, memory_mb
def recommend_features(self, feature_importance, cost_threshold=100):
"""
Recommend features based on importance vs cost trade-off
Args:
feature_importance: Dict {feature_name: importance_score}
cost_threshold: Maximum acceptable cost
Returns:
List of recommended features
"""
recommendations = []
for feature_name, importance in feature_importance.items():
if feature_name not in self.feature_costs:
continue
cost = self.feature_costs[feature_name]['cost_score']
# Value/cost ratio
value_ratio = importance / (cost + 1e-8)
if cost <= cost_threshold:
recommendations.append({
'feature': feature_name,
'importance': importance,
'cost': cost,
'value_ratio': value_ratio
})
# Sort by value ratio
recommendations.sort(key=lambda x: x['value_ratio'], reverse=True)
return recommendations
# Example usage
analyzer = FeatureCostAnalyzer()
# Measure costs ('data' stands in for a pandas DataFrame with the referenced columns)
analyzer.measure_cost('simple_sum', lambda df: df['col1'] + df['col2'], data)
analyzer.measure_cost('complex_agg', lambda df: df.groupby('id').agg({'col': ['mean', 'std', 'max']}), data)
# Get recommendations
feature_importance = {'simple_sum': 0.8, 'complex_agg': 0.3}
recommended = analyzer.recommend_features(feature_importance)
Key Takeaways
✅ Feature engineering is critical - Often more impactful than model choice
✅ Feature stores solve consistency - Same code for training and serving
✅ Domain knowledge matters - Best features come from understanding the problem
✅ Monitor features in production - Detect drift and data quality issues
✅ Version features - Track changes, enable rollback
✅ Document everything - Features are long-lived assets
✅ Like tree traversal - Hierarchical features need DFS/BFS logic
Originally published at: arunbaby.com/ml-system-design/0007-feature-engineering