Skip to content

Data Pipelines

Difficulty: Expert

Overview

Data pipelines ingest, process, store, and serve market data to trading systems. Reliable data infrastructure is the foundation of any trading operation.

Pipeline Architecture

┌──────────┐     ┌──────────┐     ┌──────────┐     ┌──────────┐
│  Data    │────→│ Process  │────→│  Store   │────→│  Serve   │
│  Source  │     │   &      │     │          │     │          │
│          │     │ Transform│     │          │     │          │
│ Exchanges│     │ Clean,   │     │ Time-    │     │ API,     │
│ Vendors  │     │ Normalize│     │ Series   │     │ Stream   │
│ APIs     │     │ Enrich   │     │ DB       │     │ Feed     │
└──────────┘     └──────────┘     └──────────┘     └──────────┘

Data Sources

Real-Time Data

class MarketDataHandler:
    """Real-time feed handler for a single symbol.

    Keeps an order book up to date from incoming messages, remembers the
    most recent trade message, and forwards every handled message to the
    registered subscriber callbacks.
    """

    def __init__(self, symbol, data_source):
        self.symbol = symbol
        self.source = data_source
        self.subscribers = []
        self.order_book = OrderBook(symbol)
        self.last_trade = None

    def on_message(self, message):
        """Apply one feed message to the book and fan it out.

        'trade' and 'quote' messages update the book incrementally (a trade
        is also stored as ``last_trade``); 'order_book' messages rebuild the
        book. Any other message type is ignored without notifying anyone.
        """
        kind = message.type
        is_trade = kind == 'trade'

        if is_trade or kind == 'quote':
            if is_trade:
                # Record the trade before the book sees it.
                self.last_trade = message
            self.order_book.update(message)
        elif kind == 'order_book':
            self.order_book.rebuild(message)
        else:
            return

        self.notify_subscribers(message)

    def subscribe(self, callback):
        """Add a callable that will be invoked with each handled message."""
        self.subscribers.append(callback)

    def notify_subscribers(self, data):
        """Invoke every subscriber, in registration order, with ``data``."""
        for listener in self.subscribers:
            listener(data)

Historical Data

class DataValidationError(ValueError, AssertionError):
    """Raised when loaded market data fails a sanity check.

    Also derives from ``AssertionError`` so that callers written against the
    earlier ``assert``-based checks keep catching these failures.
    """


class HistoricalDataLoader:
    """Load and validate historical market data."""

    def __init__(self, storage):
        self.storage = storage  # Database or file system

    def load_ohlcv(self, symbol, start_date, end_date, timeframe='1D'):
        """Load OHLCV data and reject frames that fail basic sanity checks.

        The query result is expected to behave like a pandas DataFrame with
        at least 'high', 'low' and 'volume' columns.

        Returns:
            The object returned by ``storage.query``, unchanged.

        Raises:
            DataValidationError: if any value is missing, any bar has
                high < low, or any volume is negative.
        """
        data = self.storage.query(symbol, start_date, end_date, timeframe)

        # Explicit raises instead of ``assert``: assertions are removed when
        # Python runs with -O, which would silently disable validation.
        if data.isnull().any().any():
            raise DataValidationError("Missing values detected")
        if not (data['high'] >= data['low']).all():
            raise DataValidationError("High < Low")
        if not (data['volume'] >= 0).all():
            raise DataValidationError("Negative volume")

        return data

    def load_tick_data(self, symbol, date):
        """Load tick-by-tick data for ``symbol`` on ``date`` (no validation)."""
        return self.storage.query_ticks(symbol, date)

Data Processing

Cleaning

def clean_market_data(data, z_threshold=5):
    """Clean and validate market data.

    Steps, in order: drop duplicate rows, sort by index, forward-fill
    missing values, then drop rows whose 'close' lies ``z_threshold`` or more
    standard deviations from the mean close.

    Args:
        data: pandas DataFrame with a 'close' column, indexed by time.
        z_threshold: rows are kept only when their absolute close z-score is
            strictly below this value.

    Returns:
        A new, chronologically ordered DataFrame; the input is not modified.
    """
    # NOTE: drop_duplicates compares column values only, not the index, so
    # rows with identical values at different timestamps collapse to one.
    data = data.drop_duplicates()

    # Sort before filling so each gap is filled from the chronologically
    # previous row rather than from whatever row happened to precede it.
    # A stable sort keeps the relative order of rows sharing a timestamp.
    data = data.sort_index(kind="mergesort")

    # DataFrame.ffill replaces the deprecated fillna(method='ffill').
    data = data.ffill()

    close = data['close']
    spread = close.std()
    # A zero or undefined (NaN) spread means constant, single-row or empty
    # input: z-scores would all be NaN and every row would be discarded, so
    # the outlier filter is skipped in that case.
    if spread > 0:
        z_scores = ((close - close.mean()) / spread).abs()
        # Rows whose close is still NaN after filling compare False here
        # and are dropped as well.
        data = data[z_scores < z_threshold]

    return data

Normalization

def normalize_data(data, z_window=252, vol_window=20):
    """Normalize data for cross-asset comparison.

    Adds three columns to ``data`` in place and returns the same object:

    * 'returns': period-over-period percentage change of 'close'.
    * 'returns_z': returns minus their rolling mean, divided by their
      rolling standard deviation, both over ``z_window`` rows.
    * 'vol_adjusted': returns divided by their rolling standard deviation
      over ``vol_window`` rows.

    Args:
        data: pandas DataFrame with a 'close' column. It is mutated.
        z_window: rolling window length for the z-score (default 252).
        vol_window: rolling window length for the volatility scaling
            (default 20).

    Returns:
        The same DataFrame that was passed in. Rows without a full window
        of returns hold NaN in the rolling columns.
    """
    returns = data['close'].pct_change()
    data['returns'] = returns

    # One rolling object serves both the mean and the standard deviation.
    z_roll = returns.rolling(z_window)
    data['returns_z'] = (returns - z_roll.mean()) / z_roll.std()

    data['vol_adjusted'] = returns / returns.rolling(vol_window).std()

    return data

Feature Engineering

def engineer_features(data):
    """Add derived indicator columns to ``data`` and return it.

    Columns written, in order: 'sma_20', 'sma_50', 'rsi', 'atr',
    'volume_ma' and 'volume_ratio'. The frame is modified in place.
    """
    close = data['close']

    # Price-based indicators
    data['sma_20'] = close.rolling(20).mean()
    data['sma_50'] = close.rolling(50).mean()
    data['rsi'] = calculate_rsi(close)
    data['atr'] = calculate_atr(data['high'], data['low'], close)

    # Volume relative to its own 20-row average
    volume = data['volume']
    average_volume = volume.rolling(20).mean()
    data['volume_ma'] = average_volume
    data['volume_ratio'] = volume / average_volume

    return data

Data Storage

Time-Series Database

class TimeSeriesDB:
    """Thin wrapper for writing ticks and reading OHLCV aggregates."""

    def __init__(self, connection_string):
        # NOTE(review): despite the parameter name, the stored value is used
        # as an object exposing ``execute(query, params)`` - confirm whether
        # callers pass a live connection or whether a connect step is missing.
        self.conn = connection_string

    def store_tick(self, symbol, tick):
        """Insert one tick row (symbol, timestamp, price, volume, side)."""
        row = (symbol, tick.timestamp, tick.price, tick.volume, tick.side)
        statement = """
        INSERT INTO ticks (symbol, timestamp, price, volume, side)
        VALUES ($1, $2, $3, $4, $5)
        """
        self.conn.execute(statement, row)

    def query_ohlcv(self, symbol, start, end, timeframe):
        """Return whatever ``execute`` yields for the OHLCV aggregation.

        Ticks for ``symbol`` with timestamps between ``start`` and ``end``
        are grouped into ``timeframe`` buckets, ordered by bucket.
        """
        # Positional placeholders: $1=timeframe, $2=symbol, $3=start, $4=end.
        bindings = (timeframe, symbol, start, end)
        statement = """
        SELECT time_bucket($1, timestamp) AS period,
               FIRST(price, timestamp) AS open,
               MAX(price) AS high,
               MIN(price) AS low,
               LAST(price, timestamp) AS close,
               SUM(volume) AS volume
        FROM ticks
        WHERE symbol = $2 AND timestamp BETWEEN $3 AND $4
        GROUP BY period
        ORDER BY period
        """
        return self.conn.execute(statement, bindings)

Data Quality

Monitoring

class DataQualityMonitor:
    """Monitor data quality in real-time.

    Each ``check_*`` method compares one measurement against the matching
    entry in ``thresholds`` and appends a human-readable message to
    ``alerts`` when the limit is breached. The checks return nothing.

    ``thresholds`` must provide the keys 'max_latency', 'min_completeness'
    and 'max_staleness' for the corresponding checks.
    """

    def __init__(self, thresholds):
        self.thresholds = thresholds
        self.alerts = []

    def check_latency(self, timestamp):
        """Alert when the gap between now and ``timestamp`` exceeds 'max_latency'.

        ``timestamp`` is compared against ``time.time()``, so it must be on
        the same scale (seconds since the epoch).
        """
        latency = time.time() - timestamp
        if latency > self.thresholds['max_latency']:
            self.alerts.append(f"High latency: {latency:.3f}s")

    def check_completeness(self, expected_count, actual_count):
        """Alert when actual/expected falls below 'min_completeness'.

        When ``expected_count`` is 0 nothing was expected, so nothing can be
        missing: the data is treated as fully complete instead of raising
        ``ZeroDivisionError``.
        """
        if expected_count == 0:
            completeness = 1.0
        else:
            completeness = actual_count / expected_count
        if completeness < self.thresholds['min_completeness']:
            self.alerts.append(f"Low completeness: {completeness:.1%}")

    def check_staleness(self, last_update):
        """Alert when the time since ``last_update`` exceeds 'max_staleness'.

        ``last_update`` is compared against ``time.time()`` (epoch seconds).
        """
        age = time.time() - last_update
        if age > self.thresholds['max_staleness']:
            self.alerts.append(f"Stale data: {age:.1f}s old")

Practical Guidelines

  1. Validate Everything — Garbage in, garbage out
  2. Redundancy — Multiple data sources for critical feeds
  3. Monitoring — Alert on data quality issues
  4. Backup — Never lose market data
  5. Documentation — Document data sources and transformations
  6. Testing — Test pipeline changes before production
  7. Scalability — Design for growth in data volume

Next Steps