Skip to content

TWAP Execution Algorithm

Overview

TWAP (Time-Weighted Average Price) execution distributes orders evenly over a specified time period. It is the simplest execution algorithm and works well for liquid securities with stable volume profiles.

Difficulty: Intermediate

TWAP Calculation

TWAP = (P1 + P2 + ... + Pn) / n

Where:
Pi = Price at time interval i
n = Number of time intervals

For execution:
Target shares per interval = Total Order Size / Number of Intervals

Where: Pi is the price at interval i, and n is the number of equal-length intervals over the execution horizon. What it does: TWAP is both the benchmark (the average price over the horizon) and the algorithm (split the order into n equal slices and trade one per interval). Choose TWAP when the order is small relative to ADV (average daily volume), urgency is low, and the volume profile is roughly flat. When in doubt, it is also a reasonable default if you want a simple, defensible schedule that minimizes signaling.

Algorithm Design

Basic TWAP

import numpy as np
import pandas as pd
from typing import Dict, List, Optional

class TWAPExecution:
    """Time-Weighted Average Price execution algorithm.

    Splits a parent order of ``target_shares`` into one slice per one-minute
    interval and records fills, cost and per-trade details as the caller
    drives the schedule through :meth:`execute_interval`.
    """

    def __init__(self, target_shares: int, duration_minutes: int,
                 start_time: str = "09:30", end_time: Optional[str] = None,
                 max_participation: float = 0.10):
        """Set up the schedule.

        Args:
            target_shares: Total number of shares to execute.
            duration_minutes: Execution horizon in minutes. Ignored when
                ``end_time`` is given.
            start_time: Start of the horizon as ``"HH:MM"``.
            end_time: Optional end of the horizon as ``"HH:MM"``; when set,
                the horizon is ``end_time - start_time``.
            max_participation: Default cap on the fraction of an interval's
                market volume that may be traded.

        Raises:
            ValueError: If the resulting horizon is not at least one minute
                (for example ``end_time`` earlier than ``start_time``).
        """
        self.target = target_shares
        self.remaining = target_shares
        self.duration = duration_minutes
        self.max_participation = max_participation

        # An explicit end time takes precedence over duration_minutes.
        if end_time:
            self.duration = self._time_diff(start_time, end_time)

        # A zero horizon would divide by zero below and a negative one would
        # produce a meaningless schedule, so reject both up front.
        if self.duration <= 0:
            raise ValueError(
                f"execution horizon must be positive, got {self.duration} minutes"
            )

        self.interval_seconds = 60  # 1-minute intervals
        self.n_intervals = self.duration
        self.interval_size = target_shares / self.n_intervals

        self.filled = 0
        self.total_cost = 0.0
        self.trades = []
        self.current_interval = 0

    def _time_diff(self, start: str, end: str) -> int:
        """Return the number of minutes from ``start`` to ``end``.

        Both arguments are ``"HH:MM"`` strings on the same day; the result is
        negative when ``end`` is earlier than ``start``.
        """
        h1, m1 = map(int, start.split(':'))
        h2, m2 = map(int, end.split(':'))
        return (h2 * 60 + m2) - (h1 * 60 + m1)

    def _average_price(self) -> float:
        """Return the share-weighted average fill price, or 0 with no fills."""
        return self.total_cost / self.filled if self.filled > 0 else 0

    def get_target_for_interval(self) -> int:
        """Calculate target shares for the current interval.

        Returns:
            0 when nothing is left to trade; everything that remains once the
            schedule has run out of intervals; otherwise the remaining shares
            spread evenly over the remaining intervals, rounded up and never
            less than 1.
        """
        if self.remaining <= 0:
            return 0

        intervals_remaining = self.n_intervals - self.current_interval
        if intervals_remaining <= 0:
            return self.remaining

        # Re-deriving the slice from what is still outstanding lets later
        # intervals absorb any shortfall from earlier, capped intervals.
        target = self.remaining / intervals_remaining
        return max(1, int(np.ceil(target)))

    def execute_interval(self, market_volume: int, current_price: float,
                         participation_limit: Optional[float] = None) -> Dict:
        """Execute the slice for the current interval and advance the clock.

        Args:
            market_volume: Market volume traded in this interval.
            current_price: Price at which this interval's slice is filled.
            participation_limit: Cap on the fraction of ``market_volume`` that
                may be traded. ``None`` uses ``max_participation``; an explicit
                ``0`` means "do not trade in this interval".

        Returns:
            Dict with ``executed`` (shares filled now), ``filled``,
            ``remaining``, ``avg_price`` and ``done``.
        """
        target = self.get_target_for_interval()

        # Compare against None so that an explicit 0.0 cap is honoured rather
        # than silently replaced by the default.
        limit = (self.max_participation if participation_limit is None
                 else participation_limit)
        max_shares = int(market_volume * limit)
        target = min(target, max_shares)

        executed = 0
        if target > 0 and self.remaining > 0:
            executed = min(target, self.remaining)
            self.filled += executed
            self.remaining -= executed
            self.total_cost += executed * current_price

            self.trades.append({
                'interval': self.current_interval,
                'price': current_price,
                'shares': executed,
                'cumulative': self.filled,
                'remaining': self.remaining
            })

        # The interval is consumed whether or not anything was filled.
        self.current_interval += 1

        return {
            'executed': executed,
            'filled': self.filled,
            'remaining': self.remaining,
            'avg_price': self._average_price(),
            'done': self.remaining <= 0
        }

    def get_results(self) -> Dict:
        """Return a summary of the execution so far, including every trade."""
        avg_price = self._average_price()

        return {
            'target_shares': self.target,
            'filled_shares': self.filled,
            'remaining_shares': self.remaining,
            'fill_rate': self.filled / self.target if self.target > 0 else 0,
            'avg_execution_price': round(avg_price, 4),
            'total_cost': round(self.total_cost, 2),
            'n_trades': len(self.trades),
            'trade_details': self.trades
        }

    def compare_to_vwap(self, market_vwap: float) -> Dict:
        """Compare execution quality to a VWAP benchmark.

        Slippage is the average fill price minus ``market_vwap``, also
        expressed in basis points of the benchmark. ``better_than_vwap`` is
        true when the slippage is negative, i.e. a lower average price counts
        as better (the buy-side view).

        Raises:
            ZeroDivisionError: If ``market_vwap`` is 0.
        """
        avg_price = self._average_price()
        slippage = avg_price - market_vwap
        slippage_bps = (slippage / market_vwap) * 10000

        return {
            'execution_price': round(avg_price, 4),
            'vwap': round(market_vwap, 4),
            'slippage': round(slippage, 4),
            'slippage_bps': round(slippage_bps, 2),
            'better_than_vwap': slippage < 0
        }

    def compare_to_twap(self, market_twap: float) -> Dict:
        """Compare execution quality to a TWAP benchmark.

        Slippage is the average fill price minus ``market_twap``, also
        expressed in basis points of the benchmark.

        Raises:
            ZeroDivisionError: If ``market_twap`` is 0.
        """
        avg_price = self._average_price()
        slippage = avg_price - market_twap

        return {
            'execution_price': round(avg_price, 4),
            'twap': round(market_twap, 4),
            'slippage': round(slippage, 4),
            'slippage_bps': round((slippage / market_twap) * 10000, 2)
        }


def randomized_twap(target_shares: int, duration: int,
                     jitter_pct: float = 0.3) -> List[int]:
    """Build a TWAP schedule with randomized slice sizes to avoid detection.

    Each of the ``duration`` slices starts from the even split
    ``target_shares / duration`` and is perturbed by a uniform jitter of up to
    ``±jitter_pct`` of that base size. The slices are then corrected one share
    at a time so that they sum exactly to ``target_shares``.

    Args:
        target_shares: Total number of shares to schedule; must be >= 0.
        duration: Number of slices (intervals); must be >= 1.
        jitter_pct: Maximum jitter as a fraction of the even slice size.

    Returns:
        A list of ``duration`` non-negative integers summing to
        ``target_shares``.

    Raises:
        ValueError: If ``duration`` is not positive or ``target_shares`` is
            negative.
    """
    if duration <= 0:
        raise ValueError(f"duration must be positive, got {duration}")
    if target_shares < 0:
        raise ValueError(f"target_shares must be non-negative, got {target_shares}")

    base = target_shares / duration
    sizes = []

    for _ in range(duration):
        jitter = np.random.uniform(-jitter_pct, jitter_pct) * base
        sizes.append(max(0, int(base + jitter)))

    # Adjust to hit exact target
    diff = target_shares - sum(sizes)
    if diff > 0:
        # Undershoot: hand out the missing shares to random slices.
        for idx in np.random.choice(duration, diff, replace=True):
            sizes[idx] += 1
    else:
        # Overshoot: take shares back only from slices that still hold some,
        # so no slice is ever driven below zero. Because the sum exceeds a
        # non-negative target here, at least one slice is always positive.
        for _ in range(-diff):
            candidates = [i for i, size in enumerate(sizes) if size > 0]
            sizes[np.random.choice(candidates)] -= 1

    return sizes

TWAP vs. Volume Profile

When TWAP Works Best

1. Flat volume profile throughout the day
2. Highly liquid securities
3. No urgency in execution
4. Small order relative to ADV (< 5%)
5. Market impact is primary concern

When TWAP is Suboptimal

1. U-shaped volume profile (high open/close, low midday)
   → VWAP is better
2. Urgent execution needed
   → Implementation Shortfall is better
3. Illiquid security
   → POV (Percentage of Volume) is better
4. Large order (> 10% ADV)
   → Complex adaptive algo needed

Adaptive TWAP

class AdaptiveTWAP(TWAPExecution):
    """TWAP that adapts its slice size to recent volume and price trend."""

    def __init__(self, *args, urgency: float = 0.5, **kwargs):
        """Create the schedule and the history buffers used for adaptation.

        Args:
            *args: Positional arguments forwarded to ``TWAPExecution``.
            urgency: Strength of the adaptation; 0 = patient (plain TWAP
                sizing), 1 = aggressive.
            **kwargs: Keyword arguments forwarded to ``TWAPExecution``.
        """
        super().__init__(*args, **kwargs)
        self.urgency = urgency  # 0 = patient, 1 = aggressive
        self.volume_history = []
        self.price_history = []
        # Slice size that execute_adaptive wants the next execution to use;
        # None means "use the plain TWAP target".
        self._target_override = None

    def get_target_for_interval(self) -> int:
        """Return the adaptive slice size if one is pending, else the TWAP target."""
        if self._target_override is not None:
            return self._target_override
        return super().get_target_for_interval()

    def execute_adaptive(self, market_volume: int, current_price: float,
                          participation_limit: Optional[float] = None) -> Dict:
        """Execute the current interval with volume- and trend-adjusted sizing.

        The plain TWAP slice is scaled up when the mean of the last 5 volumes
        is above the running mean volume and down when below it (once more
        than 5 observations exist), and scaled with the relative price change
        over the last 10 observations (once more than 10 exist). Both
        adjustments are weighted by ``urgency``. The participation cap and
        bookkeeping are handled by ``execute_interval``.

        Args:
            market_volume: Market volume traded in this interval.
            current_price: Price at which this interval's slice is filled.
            participation_limit: Optional cap on the fraction of
                ``market_volume`` to trade, passed through to
                ``execute_interval``.

        Returns:
            The result dict produced by ``execute_interval``.
        """
        self.volume_history.append(market_volume)
        self.price_history.append(current_price)

        # Base TWAP size
        base_size = self.get_target_for_interval()

        # Adjust based on volume profile
        if len(self.volume_history) > 5:
            recent_vol = np.mean(self.volume_history[-5:])
            avg_vol = np.mean(self.volume_history)
            # +1 keeps the ratio defined when no volume has been seen yet.
            vol_ratio = recent_vol / (avg_vol + 1)

            # Trade more when volume is high
            volume_adj = 1 + (vol_ratio - 1) * self.urgency
            base_size = int(base_size * volume_adj)

        # Adjust based on price trend; skipped when the window's first price
        # is 0 because the relative change is undefined there.
        if len(self.price_history) > 10 and self.price_history[-10] != 0:
            recent_prices = self.price_history[-10:]
            trend = (recent_prices[-1] - recent_prices[0]) / recent_prices[0]

            # If buying and price is falling, slow down
            # If buying and price is rising, speed up
            trend_adj = 1 + trend * self.urgency * 5
            base_size = int(base_size * trend_adj)

        # Ensure non-negative
        base_size = max(0, base_size)

        # execute_interval asks get_target_for_interval for its slice size, so
        # publish the adapted size for the duration of that call. Without this
        # the adjustments above would be computed and then thrown away.
        self._target_override = base_size
        try:
            return super().execute_interval(market_volume, current_price,
                                            participation_limit)
        finally:
            self._target_override = None

Implementation Checklist

  • [ ] Duration appropriate for order size
  • [ ] Interval size reasonable (not too small/large)
  • [ ] Participation limit set to avoid signaling
  • [ ] Randomization added to avoid pattern detection
  • [ ] Start/end times appropriate (avoid open/close if needed)
  • [ ] Urgency parameter set based on market conditions
  • [ ] Benchmark selected (TWAP, VWAP, arrival price)
  • [ ] Post-trade analysis performed
  • [ ] Implementation shortfall calculated
  • [ ] Slippage attributed to market impact vs. timing

References

  1. Kissell, R. (2013). The Science of Algorithmic Trading and Portfolio Management. Academic Press.
  2. Almgren, R. & Chriss, N. (2000). "Optimal Execution of Portfolio Transactions." Journal of Risk, 3, 5-39.
  3. Laruelle, S., Lehalle, C.A., & Pagès, G. (2013). "Optimal Split of Orders Across Liquidity Pools." SIAM Journal on Financial Mathematics, 4(1), 367-396.