Data Pipelines¶
Difficulty: Expert
Overview¶
Data pipelines ingest, process, store, and serve market data to trading systems. Reliable data infrastructure is the foundation of any trading operation.
Pipeline Architecture¶
┌──────────┐     ┌──────────┐     ┌──────────┐     ┌──────────┐
│ Data     │────→│ Process  │────→│ Store    │────→│ Serve    │
│ Source   │     │ &        │     │          │     │          │
│          │     │ Transform│     │          │     │          │
│ Exchanges│     │ Clean,   │     │ Time-    │     │ API,     │
│ Vendors  │     │ Normalize│     │ Series   │     │ Stream   │
│ APIs     │     │ Enrich   │     │ DB       │     │ Feed     │
└──────────┘     └──────────┘     └──────────┘     └──────────┘
Data Sources¶
Real-Time Data¶
class MarketDataHandler:
    """Handle real-time market data feeds.

    Each incoming message is applied to the handler's order book and then
    fanned out to every registered subscriber callback.
    """

    def __init__(self, symbol, data_source):
        """Create a handler for one symbol.

        Args:
            symbol: Instrument identifier; also passed to ``OrderBook``.
            data_source: Feed object, stored as ``self.source``. The handler
                itself never calls it.
        """
        self.symbol = symbol
        self.source = data_source
        self.subscribers = []
        self.order_book = OrderBook(symbol)
        self.last_trade = None

    def on_message(self, message):
        """Process incoming market data message.

        Dispatches on ``message.type``:

        * ``'trade'`` - remember the message as ``last_trade``, update the
          order book, notify subscribers.
        * ``'quote'`` - update the order book, notify subscribers.
        * ``'order_book'`` - rebuild the order book, notify subscribers.

        Messages of any other type are ignored.
        """
        if message.type == 'trade':
            self.last_trade = message
            self.order_book.update(message)
            self.notify_subscribers(message)
        elif message.type == 'quote':
            self.order_book.update(message)
            self.notify_subscribers(message)
        elif message.type == 'order_book':
            self.order_book.rebuild(message)
            self.notify_subscribers(message)

    def subscribe(self, callback):
        """Register subscriber for data updates.

        Args:
            callback: Callable invoked with each dispatched message.
        """
        self.subscribers.append(callback)

    def notify_subscribers(self, data):
        """Send data to all subscribers registered when dispatch starts.

        Iterates over a snapshot so that a callback which subscribes (or
        otherwise edits ``self.subscribers``) while being notified cannot
        alter the ongoing dispatch; newly added callbacks first see the
        next message.
        """
        for callback in tuple(self.subscribers):
            callback(data)
Historical Data¶
class HistoricalDataLoader:
    """Load and validate historical market data."""

    def __init__(self, storage):
        """Store the backend used for all queries.

        Args:
            storage: Database or file system wrapper. It must provide
                ``query(symbol, start_date, end_date, timeframe)`` and
                ``query_ticks(symbol, date)``.
        """
        self.storage = storage  # Database or file system

    def load_ohlcv(self, symbol, start_date, end_date, timeframe='1D'):
        """Load OHLCV data with validation.

        Args:
            symbol: Instrument identifier forwarded to the storage backend.
            start_date: Start of the requested range (forwarded as-is).
            end_date: End of the requested range (forwarded as-is).
            timeframe: Bar size forwarded to the storage backend.

        Returns:
            The object returned by ``storage.query``. It is assumed to be
            DataFrame-like with ``high``, ``low`` and ``volume`` columns.

        Raises:
            AssertionError: If the data contains missing values, a bar
                whose high is below its low, or a negative volume.
        """
        data = self.storage.query(symbol, start_date, end_date, timeframe)
        # Validate with explicit raises: ``assert`` statements are removed
        # when Python runs with -O, which would silently disable the checks.
        # AssertionError is kept so existing handlers keep working.
        self._require(not data.isnull().any().any(), "Missing values detected")
        self._require((data['high'] >= data['low']).all(), "High < Low")
        self._require((data['volume'] >= 0).all(), "Negative volume")
        return data

    def load_tick_data(self, symbol, date):
        """Load tick-by-tick data.

        Returns whatever ``storage.query_ticks(symbol, date)`` returns; no
        validation is applied.
        """
        return self.storage.query_ticks(symbol, date)

    @staticmethod
    def _require(condition, message):
        """Raise ``AssertionError(message)`` unless ``condition`` is truthy."""
        if not condition:
            raise AssertionError(message)
Data Processing¶
Cleaning¶
def clean_market_data(data, z_threshold=5):
    """Clean and validate market data.

    Steps, in order: sort by index, drop duplicate rows, forward-fill
    missing values, then drop rows whose ``close`` is an outlier.

    Args:
        data: DataFrame with a ``close`` column. The index is assumed to be
            the bar timestamp, since it defines chronological order.
        z_threshold: Rows whose absolute z-score of ``close`` is not below
            this value are removed. Defaults to 5.

    Returns:
        A new, index-sorted DataFrame; the input is not modified.
    """
    # Sort first so forward-filling only copies values from earlier rows.
    # Filling before sorting could copy data from a later timestamp.
    # mergesort is stable, keeping the input order of rows that share an
    # index value.
    data = data.sort_index(kind='mergesort')

    # Remove duplicates.
    # NOTE(review): duplicates are detected on column values only, so two
    # bars at different timestamps with identical values collapse into one.
    # Confirm that this is intended.
    data = data.drop_duplicates()

    # Handle missing values. ``fillna(method='ffill')`` is deprecated in
    # pandas 2.x; ``ffill()`` is the supported spelling.
    data = data.ffill()

    # Remove outliers. With a constant series (std == 0) or fewer than two
    # rows (std is NaN) every z-score would be NaN and the comparison would
    # drop all rows, so the filter is skipped in those cases.
    close = data['close']
    spread = close.std()
    if spread > 0:
        z_scores = ((close - close.mean()) / spread).abs()
        data = data[z_scores < z_threshold]

    return data
Normalization¶
def normalize_data(data, z_window=252, vol_window=20):
    """Normalize data for cross-asset comparison.

    Adds three columns to ``data`` in place and returns the same object:

    * ``returns`` - percentage change of ``close``.
    * ``returns_z`` - returns minus their rolling mean, divided by their
      rolling standard deviation, both over ``z_window`` rows.
    * ``vol_adjusted`` - returns divided by their rolling standard deviation
      over ``vol_window`` rows.

    Args:
        data: DataFrame with a ``close`` column; modified in place.
        z_window: Rolling window length for the z-score. Defaults to 252.
        vol_window: Rolling window length for the volatility estimate.
            Defaults to 20.

    Returns:
        The same DataFrame, with the new columns added.
    """
    # Returns
    returns = data['close'].pct_change()
    data['returns'] = returns

    # Z-score. A zero rolling std (flat returns) is masked to NaN so the
    # division yields NaN rather than +/-inf.
    trailing = returns.rolling(z_window)
    z_std = trailing.std()
    data['returns_z'] = (returns - trailing.mean()) / z_std.where(z_std != 0)

    # Volatility-adjusted, with the same zero-std guard.
    vol = returns.rolling(vol_window).std()
    data['vol_adjusted'] = returns / vol.where(vol != 0)

    return data
Feature Engineering¶
def engineer_features(data):
    """Create derived features from raw data.

    Adds these columns to ``data`` in place and returns the same object:
    ``sma_20``, ``sma_50``, ``rsi``, ``atr``, ``volume_ma`` and
    ``volume_ratio``.

    Args:
        data: DataFrame with ``close``, ``high``, ``low`` and ``volume``
            columns.

    Returns:
        The same DataFrame with the feature columns added.
    """
    close = data['close']
    volume = data['volume']

    # Technical indicators: simple moving averages, then the helper-based
    # indicators.
    for window in (20, 50):
        data[f'sma_{window}'] = close.rolling(window).mean()
    data['rsi'] = calculate_rsi(close)
    data['atr'] = calculate_atr(data['high'], data['low'], close)

    # Volume features: 20-row average and the ratio of volume to it.
    average_volume = volume.rolling(20).mean()
    data['volume_ma'] = average_volume
    data['volume_ratio'] = volume / average_volume

    return data
Data Storage¶
Time-Series Database¶
class TimeSeriesDB:
    """Store and query time-series data."""

    # Statement used by store_tick; parameters are bound positionally.
    _INSERT_TICK_SQL = (
        "\n"
        "INSERT INTO ticks (symbol, timestamp, price, volume, side)\n"
        "VALUES ($1, $2, $3, $4, $5)\n"
    )

    # Statement used by query_ohlcv: buckets ticks by period and derives
    # open/high/low/close/volume for each bucket.
    _OHLCV_SQL = (
        "\n"
        "SELECT time_bucket($1, timestamp) AS period,\n"
        "FIRST(price, timestamp) AS open,\n"
        "MAX(price) AS high,\n"
        "MIN(price) AS low,\n"
        "LAST(price, timestamp) AS close,\n"
        "SUM(volume) AS volume\n"
        "FROM ticks\n"
        "WHERE symbol = $2 AND timestamp BETWEEN $3 AND $4\n"
        "GROUP BY period\n"
        "ORDER BY period\n"
    )

    def __init__(self, connection_string):
        """Keep the handle that later calls use to run statements.

        NOTE(review): despite its name, ``connection_string`` is stored
        unchanged and later used as an object with an ``execute(query,
        params)`` method. Confirm whether a connection should be opened
        here.
        """
        self.conn = connection_string

    def store_tick(self, symbol, tick):
        """Store individual tick data.

        Args:
            symbol: Instrument identifier for the row.
            tick: Object exposing ``timestamp``, ``price``, ``volume`` and
                ``side`` attributes.
        """
        self.conn.execute(
            self._INSERT_TICK_SQL,
            (symbol, tick.timestamp, tick.price, tick.volume, tick.side),
        )

    def query_ohlcv(self, symbol, start, end, timeframe):
        """Query aggregated OHLCV data.

        Args:
            symbol: Instrument identifier to filter on.
            start: Lower bound of the timestamp range (inclusive, BETWEEN).
            end: Upper bound of the timestamp range (inclusive, BETWEEN).
            timeframe: Bucket width passed to ``time_bucket``.

        Returns:
            Whatever ``self.conn.execute`` returns for the query.
        """
        bindings = (timeframe, symbol, start, end)
        return self.conn.execute(self._OHLCV_SQL, bindings)
Data Quality¶
Monitoring¶
class DataQualityMonitor:
    """Monitor data quality in real-time.

    Each ``check_*`` method compares one measurement against a configured
    threshold and appends a human-readable message to ``self.alerts`` when
    the threshold is breached.
    """

    def __init__(self, thresholds):
        """Create a monitor.

        Args:
            thresholds: Mapping with the keys ``'max_latency'``,
                ``'min_completeness'`` and ``'max_staleness'``.
        """
        self.thresholds = thresholds
        self.alerts = []

    def check_latency(self, timestamp):
        """Check data feed latency.

        Args:
            timestamp: Time the data was produced, compared against
                ``time.time()``. Assumes Unix epoch seconds - TODO confirm.
        """
        latency = time.time() - timestamp
        if latency > self.thresholds['max_latency']:
            self.alerts.append(f"High latency: {latency:.3f}s")

    def check_completeness(self, expected_count, actual_count):
        """Check for missing data.

        Args:
            expected_count: Number of records that should have arrived.
            actual_count: Number of records that did arrive.

        When nothing was expected (``expected_count`` is zero or negative)
        nothing can be missing, so the data is treated as fully complete
        rather than dividing by zero.
        """
        if expected_count <= 0:
            completeness = 1.0
        else:
            completeness = actual_count / expected_count
        if completeness < self.thresholds['min_completeness']:
            self.alerts.append(f"Low completeness: {completeness:.1%}")

    def check_staleness(self, last_update):
        """Check if data is stale.

        Args:
            last_update: Time of the most recent update, compared against
                ``time.time()``. Assumes Unix epoch seconds - TODO confirm.
        """
        age = time.time() - last_update
        if age > self.thresholds['max_staleness']:
            self.alerts.append(f"Stale data: {age:.1f}s old")
Practical Guidelines¶
- Validate Everything — Garbage in, garbage out
- Redundancy — Multiple data sources for critical feeds
- Monitoring — Alert on data quality issues
- Backup — Never lose market data
- Documentation — Document data sources and transformations
- Testing — Test pipeline changes before production
- Scalability — Design for growth in data volume
Next Steps¶
- System Design — Full system architecture
- Backtesting Engine — Using data for backtesting
- Technology Stack — Technology choices