System Design for Trading
Difficulty: expert
Overview
A trading system encompasses everything from data ingestion to order execution. Institutional-grade systems require robust architecture, low latency, and fault tolerance.
System Architecture
┌────────────────────────────────────────────────────────────┐
│                       TRADING SYSTEM                       │
├──────────┬──────────┬──────────┬───────────┬───────────────┤
│ Data     │ Research │ Risk     │ Execution │ Monitoring    │
│ Layer    │ Engine   │ Engine   │ Engine    │ & Alerts      │
│          │          │          │           │               │
│ ┌──────┐ │ ┌──────┐ │ ┌──────┐ │ ┌───────┐ │ ┌───────────┐ │
│ │Market│ │ │Signal│ │ │Pos   │ │ │Order  │ │ │Dashboard  │ │
│ │Data  │ │ │Gen   │ │ │Mgmt  │ │ │Router │ │ │Metrics    │ │
│ └──────┘ │ └──────┘ │ └──────┘ │ └───────┘ │ └───────────┘ │
│ ┌──────┐ │ ┌──────┐ │ ┌──────┐ │ ┌───────┐ │ ┌───────────┐ │
│ │Ref   │ │ │Back  │ │ │Risk  │ │ │Smart  │ │ │Alerting   │ │
│ │Data  │ │ │test  │ │ │Limits│ │ │Routing│ │ │Logging    │ │
│ └──────┘ │ └──────┘ │ └──────┘ │ └───────┘ │ └───────────┘ │
└──────────┴──────────┴──────────┴───────────┴───────────────┘
Component Design
1. Data Layer
class MarketDataFeed:
    """Real-time market data feed handler."""

    def __init__(self, symbols, data_source):
        self.symbols = symbols
        self.source = data_source
        self.callbacks = {}  # symbol -> list of subscriber callbacks
        # OrderBook is assumed to be defined elsewhere and to expose update(tick).
        self.order_books = {s: OrderBook() for s in symbols}

    def on_tick(self, tick):
        """Process incoming tick data."""
        book = self.order_books.get(tick.symbol)
        if book is None:
            return  # ignore ticks for symbols this feed does not track
        book.update(tick)
        for callback in self.callbacks.get(tick.symbol, []):
            callback(tick)

    def subscribe(self, symbol, callback):
        """Register a callback invoked on every tick for symbol."""
        self.callbacks.setdefault(symbol, []).append(callback)
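
MarketDataFeed relies on an OrderBook class that this page does not define. The following is a minimal sketch of what such a class might look like, assuming each tick is a price-level update; the Tick fields (symbol, side, price, size) and the method names best_bid, best_ask, and mid are hypothetical and chosen only for illustration.

```python
from dataclasses import dataclass, field
from typing import Dict, Optional


@dataclass
class Tick:
    """Hypothetical tick: one price-level update for one side of the book."""
    symbol: str
    side: str      # "bid" or "ask"
    price: float
    size: float    # new total size at this price; 0 removes the level


@dataclass
class OrderBook:
    """Price-level book: maps price -> resting size for each side."""
    bids: Dict[float, float] = field(default_factory=dict)
    asks: Dict[float, float] = field(default_factory=dict)

    def update(self, tick: Tick) -> None:
        levels = self.bids if tick.side == "bid" else self.asks
        if tick.size <= 0:
            levels.pop(tick.price, None)   # level emptied
        else:
            levels[tick.price] = tick.size

    def best_bid(self) -> Optional[float]:
        return max(self.bids) if self.bids else None

    def best_ask(self) -> Optional[float]:
        return min(self.asks) if self.asks else None

    def mid(self) -> Optional[float]:
        bid, ask = self.best_bid(), self.best_ask()
        if bid is None or ask is None:
            return None
        return (bid + ask) / 2


book = OrderBook()
book.update(Tick("AAPL", "bid", 189.50, 300))
book.update(Tick("AAPL", "ask", 189.52, 200))
book.update(Tick("AAPL", "bid", 189.51, 100))
book.update(Tick("AAPL", "bid", 189.51, 0))   # cancels the 189.51 level
```

A dictionary per side keeps the sketch short, but finding the best price scans every level. A latency-sensitive book would more likely use a sorted structure or fixed-size arrays.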
2. Signal Generation Engine
class SignalEngine:
    """Generate trading signals from models."""

    def __init__(self, models, risk_engine):
        self.models = models  # name -> model exposing predict() and weight
        self.risk_engine = risk_engine
        self.positions = {}

    def process_data(self, market_data):
        """Process market data and return the combined signal, or None if rejected."""
        signals = {}
        for name, model in self.models.items():
            signals[name] = model.predict(market_data)
        # Aggregate signals
        combined = self.aggregate_signals(signals)
        # Risk check
        if self.risk_engine.check(combined, self.positions):
            return combined
        return None

    def aggregate_signals(self, signals):
        """Weight and combine signals from multiple models."""
        weights = {name: model.weight for name, model in self.models.items()}
        total_weight = sum(weights.values())
        if total_weight == 0:
            return 0.0  # no weighted models, so no combined view
        return sum(s * weights[n] for n, s in signals.items()) / total_weight
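
The aggregation step is a weighted average, which a small worked example makes concrete. This sketch assumes signals are plain floats. The function name aggregate and the model names are hypothetical, and it normalises by the weights of the signals actually supplied, which matches the class above whenever every model produces a signal.

```python
def aggregate(signals, weights):
    """Weighted average of model signals; weights need not sum to 1."""
    total = sum(weights[name] for name in signals)
    if total == 0:
        return 0.0
    return sum(value * weights[name] for name, value in signals.items()) / total


signals = {"momentum": 0.5, "mean_reversion": -0.25}
weights = {"momentum": 3.0, "mean_reversion": 1.0}

combined = aggregate(signals, weights)
# (0.5 * 3 + -0.25 * 1) / (3 + 1) = 1.25 / 4 = 0.3125
print(combined)
```

Because the weights are normalised, scaling all of them by the same factor leaves the combined signal unchanged.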
3. Risk Engine
class RiskEngine:
    """Pre-trade and post-trade risk checks."""

    def __init__(self, limits):
        self.limits = limits  # Position limits, VaR limits, etc.

    def check(self, signal, current_positions):
        """Pre-trade risk checks. Returns True only if every check passes."""
        # Position size limit
        if abs(signal) > self.limits['max_position_size']:
            return False
        # Portfolio risk limit
        # project_portfolio() is not shown here; it must return an object with a .risk attribute.
        projected_portfolio = self.project_portfolio(signal, current_positions)
        if projected_portfolio.risk > self.limits['max_portfolio_risk']:
            return False
        # Concentration limit
        # check_concentration() is not shown here; it returns True when the limit is breached.
        if self.check_concentration(projected_portfolio):
            return False
        return True
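
The three checks above (size, portfolio risk, concentration) can be illustrated with a self-contained sketch. It makes simplifying assumptions that are not in the class above: positions are share counts per symbol, portfolio risk is approximated by gross notional rather than VaR, and concentration is the traded symbol's share of gross notional. All function names and limit keys here are hypothetical.

```python
def project_positions(positions, symbol, order_qty):
    """Return a copy of positions with the proposed order applied."""
    projected = dict(positions)
    projected[symbol] = projected.get(symbol, 0) + order_qty
    return projected


def gross_notional(positions, prices):
    """Sum of absolute position values across all symbols."""
    return sum(abs(qty) * prices[sym] for sym, qty in positions.items())


def pre_trade_check(symbol, order_qty, positions, prices, limits):
    """Return (ok, reason). All limit names are hypothetical."""
    if abs(order_qty) > limits["max_order_qty"]:
        return False, "order size limit"

    projected = project_positions(positions, symbol, order_qty)
    gross = gross_notional(projected, prices)
    if gross > limits["max_gross_notional"]:
        return False, "gross exposure limit"

    # Concentration: share of gross notional held in the traded symbol.
    if gross > 0:
        share = abs(projected[symbol]) * prices[symbol] / gross
        if share > limits["max_concentration"]:
            return False, "concentration limit"
    return True, None


positions = {"AAPL": 100, "MSFT": 50}
prices = {"AAPL": 200.0, "MSFT": 400.0}
limits = {"max_order_qty": 500, "max_gross_notional": 100_000.0, "max_concentration": 0.6}

ok_small = pre_trade_check("AAPL", 20, positions, prices, limits)   # AAPL share 24000/44000
ok_big = pre_trade_check("AAPL", 100, positions, prices, limits)    # AAPL share 40000/60000
```

Returning a reason alongside the boolean costs little and makes rejected orders much easier to audit than a bare False.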
4. Execution Engine
class ExecutionEngine:
    """Route and manage order execution."""

    def __init__(self, broker, execution_algos, volatility_threshold):
        self.broker = broker
        self.algos = execution_algos  # expects keys 'vwap', 'twap', 'smart'
        # Volatility level above which TWAP is preferred; strategy-specific, so the caller supplies it.
        self.volatility_threshold = volatility_threshold

    def execute(self, signal, market_data):
        """Execute trading signal."""
        # Select execution algorithm
        algo = self.select_algo(signal, market_data)
        # Generate orders
        orders = algo.generate_orders(signal, market_data)
        # Send to broker
        for order in orders:
            self.broker.send_order(order)
        return orders

    def select_algo(self, signal, market_data):
        """Choose execution algorithm based on context."""
        # signal is expected to carry a size attribute (the quantity to trade).
        # Orders above 10% of average daily volume are treated as large.
        if signal.size > market_data.avg_daily_volume * 0.1:
            return self.algos['vwap']  # Large order
        elif market_data.volatility > self.volatility_threshold:
            return self.algos['twap']  # Volatile market
        else:
            return self.algos['smart']  # Normal conditions
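
The execution algorithms themselves are not shown on this page. As one illustration, a TWAP (time-weighted average price) schedule splits a parent order into equal child orders spread evenly over time. The sketch below covers only the quantity split, assuming integer share quantities; the function name twap_slices is hypothetical, and timing, pricing, and order submission are left out.

```python
from typing import List


def twap_slices(total_qty: int, n_slices: int) -> List[int]:
    """Split total_qty into n_slices child quantities that differ by at most 1."""
    if n_slices <= 0:
        raise ValueError("n_slices must be positive")
    if total_qty < 0:
        raise ValueError("total_qty must be non-negative")
    base, remainder = divmod(total_qty, n_slices)
    # The first `remainder` slices carry one extra unit so the sum is exact.
    return [base + 1 if i < remainder else base for i in range(n_slices)]


slices = twap_slices(1000, 6)
print(slices)        # [167, 167, 167, 167, 166, 166]
print(sum(slices))   # 1000
```

Distributing the remainder across the early slices avoids the rounding drift that a naive total_qty // n_slices would leave unfilled.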
Fault Tolerance
Redundancy
Primary System ──→ Backup System (hot standby)
      │                  │
      └── Health Check ──┘
If primary fails → automatic failover to backup
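
One common way to implement the health check is a heartbeat timeout: the primary reports in periodically, and the backup is promoted if no report arrives within a set interval. The sketch below assumes that approach. The class name FailoverMonitor, its methods, and the 3-second timeout are all hypothetical, and it deliberately ignores hard problems such as split-brain and state synchronisation.

```python
import time


class FailoverMonitor:
    """Promote the backup when the primary's heartbeat goes stale."""

    def __init__(self, timeout_s, clock=time.monotonic):
        self.timeout_s = timeout_s
        self.clock = clock                  # injectable for testing
        self.last_heartbeat = clock()
        self.active = "primary"

    def heartbeat(self):
        """Called whenever the primary reports that it is alive."""
        self.last_heartbeat = self.clock()

    def poll(self):
        """Return the node that should be trading right now."""
        stale = self.clock() - self.last_heartbeat > self.timeout_s
        if self.active == "primary" and stale:
            self.active = "backup"          # one-way: failback is manual
        return self.active


# Simulated clock so the example runs instantly and deterministically.
now = [0.0]
monitor = FailoverMonitor(timeout_s=3.0, clock=lambda: now[0])

now[0] = 2.0
monitor.heartbeat()
now[0] = 4.0
state_while_healthy = monitor.poll()   # 2 s since last heartbeat
now[0] = 6.5
state_after_silence = monitor.poll()   # 4.5 s since last heartbeat
```

Failover is one-way in this sketch: once the backup is active, a late heartbeat from the primary does not switch back, which avoids two systems sending orders at once.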
Circuit Breakers
class CircuitBreaker:
    """Stop trading if abnormal conditions detected."""

    def __init__(self, max_loss_per_day, max_orders_per_minute):
        self.max_loss = max_loss_per_day
        self.max_orders = max_orders_per_minute
        # The caller must update both counters, resetting daily_loss each day
        # and order_count each minute.
        self.daily_loss = 0
        self.order_count = 0

    def check(self):
        """Return a (status, reason) tuple; the loss check takes priority."""
        if self.daily_loss > self.max_loss:
            return 'STOP_TRADING', 'Daily loss limit exceeded'
        if self.order_count > self.max_orders:
            return 'SLOW_DOWN', 'Order rate limit exceeded'
        return 'OK', None
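
The class above leaves it to the caller to maintain daily_loss and order_count. A possible variant, sketched here under the assumption that orders and fills are reported with timestamps in seconds, tracks the order rate over a sliding window so that no per-minute reset is needed. The class name RollingCircuitBreaker and the methods record_order, record_pnl, and reset_day are hypothetical.

```python
from collections import deque


class RollingCircuitBreaker:
    """Circuit breaker with a sliding order-rate window and realised-loss tracking."""

    def __init__(self, max_loss_per_day, max_orders_per_minute, window_s=60.0):
        self.max_loss = max_loss_per_day
        self.max_orders = max_orders_per_minute
        self.window_s = window_s
        self.daily_loss = 0.0
        self.order_times = deque()

    def record_order(self, timestamp):
        self.order_times.append(timestamp)

    def record_pnl(self, pnl):
        # Losses are accumulated as positive numbers; gains offset them.
        self.daily_loss -= pnl

    def reset_day(self):
        self.daily_loss = 0.0

    def check(self, now):
        # Drop orders that have aged out of the window.
        while self.order_times and now - self.order_times[0] >= self.window_s:
            self.order_times.popleft()
        if self.daily_loss > self.max_loss:
            return 'STOP_TRADING', 'Daily loss limit exceeded'
        if len(self.order_times) > self.max_orders:
            return 'SLOW_DOWN', 'Order rate limit exceeded'
        return 'OK', None


breaker = RollingCircuitBreaker(max_loss_per_day=1000.0, max_orders_per_minute=3)
for t in (0.0, 1.0, 2.0, 3.0):
    breaker.record_order(t)
status_burst = breaker.check(now=4.0)      # 4 orders in the last 60 s
status_later = breaker.check(now=62.5)     # orders at 0, 1, 2 aged out; one remains
breaker.record_pnl(-1500.0)
status_loss = breaker.check(now=63.0)
```

Passing the current time into check, rather than reading a clock inside it, keeps the breaker deterministic under test and usable in a backtest.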
Data Storage
| Data Type | Storage | Typical access latency |
|---|---|---|
| Tick data | Time-series DB (kdb+, InfluxDB) | Microseconds (in-memory) to milliseconds |
| OHLCV | PostgreSQL/TimescaleDB | Milliseconds |
| Signals | Redis | Sub-millisecond |
| Orders/Trades | PostgreSQL | Milliseconds |
| Logs | Elasticsearch | Seconds |
Monitoring Stack
- Metrics: Prometheus + Grafana
- Logs: ELK Stack (Elasticsearch, Logstash, Kibana)
- Alerting: PagerDuty, Slack
- Tracing: Jaeger
Practical Guidelines
- Start Simple — Don't over-engineer initially
- Modular Design — Each component should be independent
- Test Everything — Unit tests, integration tests, load tests
- Logging — Log everything for debugging
- Latency Budget — Know where time is spent
- Disaster Recovery — Have a plan for system failure
- Compliance — Build in audit trails from day one
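
The latency budget guideline is easiest to act on with per-stage timings. The following is a rough sketch of one way to collect them, using only the standard library. The class name LatencyBudget and the stage names are hypothetical, and the loop bodies are stand-ins for real model and risk calls.

```python
import time
from collections import defaultdict
from contextlib import contextmanager


class LatencyBudget:
    """Accumulate wall-clock time per named pipeline stage."""

    def __init__(self):
        self.samples_ns = defaultdict(list)

    @contextmanager
    def stage(self, name):
        start = time.perf_counter_ns()
        try:
            yield
        finally:
            self.samples_ns[name].append(time.perf_counter_ns() - start)

    def report(self):
        """Return {stage: (count, mean_us, max_us)}."""
        return {
            name: (len(s), sum(s) / len(s) / 1_000, max(s) / 1_000)
            for name, s in self.samples_ns.items()
        }


budget = LatencyBudget()
for _ in range(100):
    with budget.stage("signal"):
        sum(i * i for i in range(1_000))      # stand-in for model.predict
    with budget.stage("risk_check"):
        abs(-1) > 0                           # stand-in for risk_engine.check

for name, (count, mean_us, max_us) in budget.report().items():
    print(f"{name:<12} n={count} mean={mean_us:.1f}us max={max_us:.1f}us")
```

Reporting the maximum alongside the mean matters because tail latency, not the average, usually decides whether an order reaches the market in time.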
Next Steps
- Data Pipelines — Data ingestion and processing
- Backtesting Engine — Strategy testing infrastructure
- Technology Stack — Technology choices