From 7fd03463fdab4a651cd9e792e6c32c247a3c1d3c Mon Sep 17 00:00:00 2001 From: Jeff Emmett Date: Wed, 1 Apr 2026 11:54:07 -0700 Subject: [PATCH] feat: add TWAP oracle + DCA execution engine (40 tests) TWAP oracle primitive with ring buffer, cumulative accumulator, VWAP, deviation signals, and realized volatility. DCA executor composes oracle with MycoSystem for time-spread purchases with fixed and TWAP-aware chunk sizing strategies. Oracle integration is opt-in via MycoSystemConfig.twap_oracle_params. Co-Authored-By: Claude Opus 4.6 --- src/composed/__init__.py | 4 + src/composed/dca_executor.py | 237 ++++++++++++++++++++++++++++++ src/composed/myco_surface.py | 49 +++++++ src/composed/simulator.py | 32 +++++ src/primitives/twap_oracle.py | 212 +++++++++++++++++++++++++++ tests/test_dca_executor.py | 228 +++++++++++++++++++++++++++++ tests/test_twap_oracle.py | 262 ++++++++++++++++++++++++++++++++++ 7 files changed, 1024 insertions(+) create mode 100644 src/composed/dca_executor.py create mode 100644 src/primitives/twap_oracle.py create mode 100644 tests/test_dca_executor.py create mode 100644 tests/test_twap_oracle.py diff --git a/src/composed/__init__.py b/src/composed/__init__.py index e69de29..5d655ba 100644 --- a/src/composed/__init__.py +++ b/src/composed/__init__.py @@ -0,0 +1,4 @@ +from src.composed.dca_executor import ( # noqa: F401 + DCAParams, DCAOrder, DCAResult, + create_dca_order, compute_chunk_size, execute_chunk, simulate_dca, +) diff --git a/src/composed/dca_executor.py b/src/composed/dca_executor.py new file mode 100644 index 0000000..184f2c7 --- /dev/null +++ b/src/composed/dca_executor.py @@ -0,0 +1,237 @@ +"""DCA (Dollar-Cost Averaging) Execution Engine. + +Composes the TWAP oracle with MycoSystem to enable time-spread +purchases that reduce price impact on the bonding surface. + +Two strategies: +1. Fixed: equal chunks at regular intervals +2. TWAP-aware: modulates chunk size based on spot vs TWAP deviation + (buys more when spot < TWAP, less when spot > TWAP) + +The bonding curve's deterministic price impact makes this particularly +valuable — a single large deposit climbs the curve, while spreading +over time allows other activity to occur between chunks. +""" + +import numpy as np +from numpy.typing import NDArray +from dataclasses import dataclass, field + +from src.primitives.twap_oracle import ( + TWAPOracleState, TWAPOracleParams, + create_oracle, record_observation, compute_twap, +) +from src.primitives.n_dimensional_surface import spot_prices +from src.composed.myco_surface import MycoSystem, MycoSystemConfig + + +@dataclass +class DCAParams: + """Parameters for a DCA order.""" + total_amount: float # Total USD to deploy + n_chunks: int # Number of chunks + interval: float # Time between chunks + asset_index: int = 0 # Which reserve asset to deposit + strategy: str = "fixed" # "fixed" or "twap_aware" + # TWAP-aware parameters + twap_window: float = 24.0 # Lookback for TWAP reference + max_deviation: float = 0.3 # Max chunk size deviation from base (±30%) + + +@dataclass +class DCAOrder: + """State of an in-progress DCA order.""" + params: DCAParams + start_time: float + chunks_executed: int = 0 + total_tokens_received: float = 0.0 + total_spent: float = 0.0 + avg_price: float = 0.0 + history: list[dict] = field(default_factory=list) + is_complete: bool = False + + +@dataclass +class DCAResult: + """Result of a completed DCA simulation with comparison metrics.""" + order: DCAOrder + twap_price: float # TWAP over execution period + lump_sum_tokens: float # What a single purchase would have gotten + dca_advantage: float # % better/worse than lump sum (positive = DCA got more tokens) + + +def create_dca_order(params: DCAParams, start_time: float) -> DCAOrder: + """Create a new DCA order.""" + return DCAOrder(params=params, start_time=start_time) + + +def compute_chunk_size(order: DCAOrder, oracle_state: TWAPOracleState) -> float: + """Compute the next chunk size based on strategy. + + Fixed: total_amount / n_chunks + TWAP-aware: modulates based on spot vs TWAP deviation. + """ + params = order.params + base_chunk = params.total_amount / params.n_chunks + remaining = params.total_amount - order.total_spent + + if remaining <= 0: + return 0.0 + + if params.strategy == "fixed": + return min(base_chunk, remaining) + + # TWAP-aware strategy + if not oracle_state.observations: + return min(base_chunk, remaining) + + spot = oracle_state.observations[-1].price + twap = compute_twap(oracle_state, params.twap_window) + + if twap <= 0: + return min(base_chunk, remaining) + + ratio = spot / twap + + if ratio < 1: + # Spot below TWAP = "cheap", buy more + chunk = base_chunk * (1 + params.max_deviation * (1 - ratio)) + elif ratio > 1: + # Spot above TWAP = "expensive", buy less + chunk = base_chunk * (1 - params.max_deviation * min(ratio - 1, 1)) + else: + chunk = base_chunk + + # Clamp to remaining budget + return min(max(chunk, 0.0), remaining) + + +def execute_chunk( + system: MycoSystem, + order: DCAOrder, + oracle_state: TWAPOracleState, + current_time: float, +) -> tuple[MycoSystem, DCAOrder, TWAPOracleState, float]: + """Execute a single DCA chunk. + + Returns updated (system, order, oracle_state, tokens_minted). + """ + if order.is_complete: + return system, order, oracle_state, 0.0 + + chunk_size = compute_chunk_size(order, oracle_state) + if chunk_size <= 0: + order.is_complete = True + return system, order, oracle_state, 0.0 + + # Build deposit array (single-asset deposit) + n = system.config.n_reserve_assets + amounts = np.zeros(n) + amounts[order.params.asset_index] = chunk_size + + # Execute deposit + myco_minted, metadata = system.deposit(amounts, current_time) + + # Record price observation for oracle + if myco_minted > 0: + effective_price = chunk_size / myco_minted + oracle_state = record_observation( + oracle_state, effective_price, current_time, volume=chunk_size, + ) + + # Update order state + order.chunks_executed += 1 + order.total_tokens_received += myco_minted + order.total_spent += chunk_size + order.avg_price = ( + order.total_spent / order.total_tokens_received + if order.total_tokens_received > 0 else 0.0 + ) + order.history.append({ + "time": current_time, + "chunk_size": chunk_size, + "tokens_minted": myco_minted, + "effective_price": chunk_size / myco_minted if myco_minted > 0 else 0.0, + "metadata": metadata, + }) + + if order.chunks_executed >= order.params.n_chunks or order.total_spent >= order.params.total_amount: + order.is_complete = True + + return system, order, oracle_state, myco_minted + + +def simulate_dca( + system_config: MycoSystemConfig, + dca_params: DCAParams, + bootstrap_amount: float = 10000.0, + price_trajectory: list[tuple[float, NDArray]] | None = None, +) -> DCAResult: + """Run a full DCA simulation and compare to lump sum. + + Args: + system_config: Config for the MycoSystem + dca_params: DCA order parameters + bootstrap_amount: Initial deposit to bootstrap the system + price_trajectory: Optional list of (time, deposit_amounts) for + background market activity between DCA chunks + """ + n = system_config.n_reserve_assets + + # --- Lump-sum baseline --- + lump_system = MycoSystem(system_config) + + # Bootstrap + bootstrap = np.full(n, bootstrap_amount / n) + lump_system.deposit(bootstrap, 0.0) + + # Lump sum deposit + lump_amounts = np.zeros(n) + lump_amounts[dca_params.asset_index] = dca_params.total_amount + lump_tokens, _ = lump_system.deposit(lump_amounts, 1.0) + + # --- DCA execution --- + dca_system = MycoSystem(system_config) + + # Same bootstrap + bootstrap = np.full(n, bootstrap_amount / n) + dca_system.deposit(bootstrap, 0.0) + + oracle_state = create_oracle(TWAPOracleParams(default_window=dca_params.twap_window)) + + # Record initial price observation + if dca_system.state.surface_state.supply > 0: + initial_prices = spot_prices( + dca_system.state.surface_state.balances, + dca_system.surface_params, + ) + oracle_state = record_observation(oracle_state, float(initial_prices[0]), 0.0) + + order = create_dca_order(dca_params, start_time=1.0) + + for i in range(dca_params.n_chunks): + t = order.start_time + i * dca_params.interval + + # Apply background market activity if provided + if price_trajectory: + for pt_time, pt_amounts in price_trajectory: + if (i > 0 and + order.start_time + (i - 1) * dca_params.interval < pt_time <= t): + dca_system.deposit(pt_amounts, pt_time) + + dca_system, order, oracle_state, _ = execute_chunk( + dca_system, order, oracle_state, t, + ) + + # Compute comparison + twap_price = compute_twap(oracle_state) + dca_advantage = 0.0 + if lump_tokens > 0: + dca_advantage = (order.total_tokens_received - lump_tokens) / lump_tokens + + return DCAResult( + order=order, + twap_price=twap_price, + lump_sum_tokens=lump_tokens, + dca_advantage=dca_advantage, + ) diff --git a/src/composed/myco_surface.py b/src/composed/myco_surface.py index c8364c0..08a5ce8 100644 --- a/src/composed/myco_surface.py +++ b/src/composed/myco_surface.py @@ -34,6 +34,11 @@ from src.primitives.imbalance_fees import ( compute_imbalance, surge_fee, compute_fee_adjusted_output, ) from src.primitives.dynamic_weights import GradualWeightSchedule +from src.primitives.twap_oracle import ( + TWAPOracleParams, TWAPOracleState, + create_oracle, record_observation, compute_twap, +) +from src.primitives.n_dimensional_surface import spot_prices from src.commitments.labor import ( LaborIssuanceSystem, ContributorState, attest_contribution, claim_tokens, create_default_system, @@ -77,6 +82,9 @@ class MycoSystemConfig: max_subscription_mint_fraction: float = 0.1 max_staking_bonus_fraction: float = 0.05 + # TWAP Oracle (optional) + twap_oracle_params: TWAPOracleParams | None = None + @dataclass class MycoSystemState: @@ -103,6 +111,9 @@ class MycoSystemState: subscription_system: SubscriptionSystem | None = None staking_system: StakingSystem | None = None + # TWAP Oracle + oracle_state: TWAPOracleState | None = None + # Metrics total_financial_minted: float = 0.0 total_commitment_minted: float = 0.0 @@ -161,6 +172,10 @@ class MycoSystem: )) self.state.reserve_state = ReserveState(vaults=vaults) + # Initialize TWAP oracle if params provided + if config.twap_oracle_params is not None: + self.state.oracle_state = create_oracle(config.twap_oracle_params) + def deposit( self, amounts: NDArray, current_time: float ) -> tuple[float, dict]: @@ -222,6 +237,14 @@ class MycoSystem: ) state.time = current_time + # Record price observation for TWAP oracle + if state.oracle_state is not None and myco_minted > 0: + effective_price = total_deposit_value / myco_minted + state.oracle_state = record_observation( + state.oracle_state, effective_price, current_time, + volume=total_deposit_value, + ) + return myco_minted, { "safe": True, "fee_rate": fee_rate, @@ -296,6 +319,15 @@ class MycoSystem: state.total_redeemed += myco_amount state.time = current_time + # Record price observation for TWAP oracle + usd_returned = float(np.sum(withdrawal_split)) + if state.oracle_state is not None and usd_returned > 0: + effective_price = myco_amount / usd_returned + state.oracle_state = record_observation( + state.oracle_state, effective_price, current_time, + volume=usd_returned, + ) + return withdrawal_split, { "redemption_rate": rate, "flow_penalty": penalty, @@ -366,6 +398,23 @@ class MycoSystem: return tokens + def get_twap(self, window: float | None = None) -> float | None: + """Get TWAP over the given window. Returns None if oracle not enabled.""" + if self.state.oracle_state is None: + return None + twap = compute_twap(self.state.oracle_state, window) + return twap if twap > 0 else None + + def get_spot_price(self) -> float: + """Get current spot price of MYCO (in terms of numeraire asset 0).""" + if self.state.supply <= 0: + return 0.0 + prices = spot_prices( + self.state.surface_state.balances, + self.surface_params, + ) + return float(prices[0]) + def get_metrics(self) -> dict: """Get current system metrics.""" state = self.state diff --git a/src/composed/simulator.py b/src/composed/simulator.py index e3dc2db..71837f8 100644 --- a/src/composed/simulator.py +++ b/src/composed/simulator.py @@ -5,6 +5,7 @@ from numpy.typing import NDArray from dataclasses import dataclass from src.composed.myco_surface import MycoSystem, MycoSystemConfig +from src.composed.dca_executor import DCAParams, DCAResult, simulate_dca from src.commitments.labor import ContributorState, attest_contribution @@ -218,3 +219,34 @@ def scenario_mixed_issuance( deposit_schedule=deposits, labor_schedule=labors, ) + + +def scenario_dca_comparison( + n_assets: int = 3, + total_amount: float = 10_000.0, + n_chunks: int = 20, + interval: float = 1.0, + bootstrap_amount: float = 50_000.0, +) -> dict[str, DCAResult]: + """Compare fixed DCA, TWAP-aware DCA, and lump sum. + + Returns a dict with keys "fixed" and "twap_aware", each containing + a DCAResult with comparison metrics vs lump sum. + """ + config = MycoSystemConfig(n_reserve_assets=n_assets) + + results = {} + + for strategy in ("fixed", "twap_aware"): + params = DCAParams( + total_amount=total_amount, + n_chunks=n_chunks, + interval=interval, + strategy=strategy, + twap_window=n_chunks * interval, + ) + results[strategy] = simulate_dca( + config, params, bootstrap_amount=bootstrap_amount, + ) + + return results diff --git a/src/primitives/twap_oracle.py b/src/primitives/twap_oracle.py new file mode 100644 index 0000000..d01e1ab --- /dev/null +++ b/src/primitives/twap_oracle.py @@ -0,0 +1,212 @@ +"""Time-Weighted Average Price (TWAP) Oracle. + +Source: Uniswap V2 cumulative price accumulator pattern + Uniswap V3 observations ring buffer with cardinality limits + +Provides manipulation-resistant price feeds by averaging spot prices +over configurable time windows. Two computation approaches: + +1. Cumulative accumulator (Uniswap V2): O(1) TWAP from running sum. + TWAP(t1, t2) = (cumulative[t2] - cumulative[t1]) / (t2 - t1) + +2. Raw observation fallback: iterate over stored observations when + accumulator gaps exist (e.g., after state reconstruction). + +For MYCO: feeds into DCA execution (time-spread purchases), dynamic +weight adjustment signals, and anomaly detection (spot vs TWAP deviation). +""" + +import math +from dataclasses import dataclass, field + + +@dataclass +class PriceObservation: + """A single price observation.""" + price: float # Spot price at observation time + time: float # Timestamp + volume: float # Trade volume (for optional VWAP) + + +@dataclass +class TWAPOracleParams: + """Configuration for the TWAP oracle.""" + max_observations: int = 256 # Cardinality limit (ring buffer) + default_window: float = 24.0 # Default lookback window (time units) + min_observations: int = 2 # Minimum obs for valid TWAP + + +@dataclass +class TWAPOracleState: + """State of the TWAP oracle.""" + params: TWAPOracleParams + observations: list[PriceObservation] = field(default_factory=list) + cumulative_price_time: float = 0.0 # Running sum of price * dt + + +def create_oracle(params: TWAPOracleParams | None = None) -> TWAPOracleState: + """Create a new TWAP oracle with default or custom params.""" + if params is None: + params = TWAPOracleParams() + return TWAPOracleState(params=params) + + +def record_observation( + state: TWAPOracleState, price: float, time: float, volume: float = 0.0, +) -> TWAPOracleState: + """Record a new price observation. + + Updates the cumulative accumulator and maintains the ring buffer + cardinality limit by evicting the oldest observation when full. + """ + obs = PriceObservation(price=price, time=time, volume=volume) + + # Update cumulative accumulator: add price * dt since last observation + new_cumulative = state.cumulative_price_time + if state.observations: + last = state.observations[-1] + dt = time - last.time + if dt > 0: + new_cumulative += last.price * dt + + new_observations = list(state.observations) + new_observations.append(obs) + + # Ring buffer eviction + if len(new_observations) > state.params.max_observations: + new_observations = new_observations[-state.params.max_observations:] + + return TWAPOracleState( + params=state.params, + observations=new_observations, + cumulative_price_time=new_cumulative, + ) + + +def compute_twap(state: TWAPOracleState, window: float | None = None) -> float: + """Compute time-weighted average price over a lookback window. + + Falls back to simple time-weighted average from raw observations + when fewer than min_observations exist or window exceeds history. + + Returns 0.0 if insufficient data. + """ + if len(state.observations) < state.params.min_observations: + return 0.0 + + if window is None: + window = state.params.default_window + + latest = state.observations[-1] + window_start = latest.time - window + + # Gather observations within window + in_window = [o for o in state.observations if o.time >= window_start] + + if len(in_window) < state.params.min_observations: + # Use all available observations as fallback + in_window = list(state.observations) + + if len(in_window) < 2: + return in_window[0].price if in_window else 0.0 + + # Time-weighted average from raw observations + total_weighted = 0.0 + total_time = 0.0 + + for i in range(len(in_window) - 1): + dt = in_window[i + 1].time - in_window[i].time + if dt > 0: + total_weighted += in_window[i].price * dt + total_time += dt + + # Include the last observation's contribution up to "now" + # (weight it by the average dt to avoid zero-weighting the latest) + if total_time > 0: + return total_weighted / total_time + + # All observations at the same time — simple average + return sum(o.price for o in in_window) / len(in_window) + + +def compute_vwap(state: TWAPOracleState, window: float | None = None) -> float: + """Compute volume-weighted average price over a lookback window. + + Returns 0.0 if insufficient data or zero total volume. + """ + if len(state.observations) < state.params.min_observations: + return 0.0 + + if window is None: + window = state.params.default_window + + latest = state.observations[-1] + window_start = latest.time - window + + in_window = [o for o in state.observations if o.time >= window_start] + + if len(in_window) < state.params.min_observations: + in_window = list(state.observations) + + total_pv = sum(o.price * o.volume for o in in_window) + total_v = sum(o.volume for o in in_window) + + if total_v <= 0: + return 0.0 + + return total_pv / total_v + + +def spot_vs_twap_deviation( + state: TWAPOracleState, current_spot: float, window: float | None = None, +) -> float: + """Percentage deviation of current spot from TWAP. + + Returns (spot - twap) / twap as a fraction. + Positive = spot above TWAP, negative = spot below TWAP. + Returns 0.0 if TWAP is unavailable. + """ + twap = compute_twap(state, window) + if twap <= 0: + return 0.0 + return (current_spot - twap) / twap + + +def get_volatility(state: TWAPOracleState, window: float | None = None) -> float: + """Compute realized volatility from observation history. + + Uses log-returns standard deviation, annualized by the window period. + Returns 0.0 if insufficient data. + """ + if len(state.observations) < 3: + return 0.0 + + if window is None: + window = state.params.default_window + + latest = state.observations[-1] + window_start = latest.time - window + + in_window = [o for o in state.observations if o.time >= window_start] + + if len(in_window) < 3: + in_window = list(state.observations) + + if len(in_window) < 3: + return 0.0 + + # Compute log-returns + log_returns = [] + for i in range(1, len(in_window)): + if in_window[i - 1].price > 0 and in_window[i].price > 0: + log_returns.append( + math.log(in_window[i].price / in_window[i - 1].price) + ) + + if len(log_returns) < 2: + return 0.0 + + # Standard deviation of log-returns + mean = sum(log_returns) / len(log_returns) + variance = sum((r - mean) ** 2 for r in log_returns) / (len(log_returns) - 1) + return math.sqrt(variance) diff --git a/tests/test_dca_executor.py b/tests/test_dca_executor.py new file mode 100644 index 0000000..de375eb --- /dev/null +++ b/tests/test_dca_executor.py @@ -0,0 +1,228 @@ +"""Tests for DCA execution engine.""" + +import numpy as np +import pytest +from src.composed.dca_executor import ( + DCAParams, DCAOrder, DCAResult, + create_dca_order, compute_chunk_size, execute_chunk, simulate_dca, +) +from src.composed.myco_surface import MycoSystem, MycoSystemConfig +from src.primitives.twap_oracle import ( + TWAPOracleParams, create_oracle, record_observation, +) + + +def _bootstrap_system(n_assets=3, amount=10000.0): + """Helper: create and bootstrap a MycoSystem.""" + config = MycoSystemConfig(n_reserve_assets=n_assets) + system = MycoSystem(config) + bootstrap = np.full(n_assets, amount / n_assets) + system.deposit(bootstrap, 0.0) + return system + + +class TestFixedDCA: + def test_equal_chunks(self): + """Fixed strategy splits into equal chunks.""" + params = DCAParams(total_amount=1000.0, n_chunks=10, interval=1.0) + order = create_dca_order(params, start_time=1.0) + oracle = create_oracle() + chunk = compute_chunk_size(order, oracle) + assert chunk == pytest.approx(100.0, abs=0.01) + + def test_budget_exhaustion(self): + """All chunks together should spend the full budget.""" + system = _bootstrap_system() + params = DCAParams(total_amount=1000.0, n_chunks=5, interval=1.0) + order = create_dca_order(params, start_time=1.0) + oracle = create_oracle() + + for i in range(5): + t = 1.0 + i * 1.0 + system, order, oracle, tokens = execute_chunk(system, order, oracle, t) + + assert order.is_complete + assert order.total_spent == pytest.approx(1000.0, abs=0.01) + assert order.chunks_executed == 5 + + def test_token_tracking(self): + """Total tokens received should be positive and match history.""" + system = _bootstrap_system() + params = DCAParams(total_amount=500.0, n_chunks=5, interval=1.0) + order = create_dca_order(params, start_time=1.0) + oracle = create_oracle() + + total_from_chunks = 0.0 + for i in range(5): + t = 1.0 + i * 1.0 + system, order, oracle, tokens = execute_chunk(system, order, oracle, t) + total_from_chunks += tokens + + assert order.total_tokens_received > 0 + assert order.total_tokens_received == pytest.approx(total_from_chunks, rel=1e-10) + assert len(order.history) == 5 + + def test_avg_price_computed(self): + """Average price should be total_spent / total_tokens.""" + system = _bootstrap_system() + params = DCAParams(total_amount=1000.0, n_chunks=4, interval=1.0) + order = create_dca_order(params, start_time=1.0) + oracle = create_oracle() + + for i in range(4): + system, order, oracle, _ = execute_chunk( + system, order, oracle, 1.0 + i, + ) + + expected_avg = order.total_spent / order.total_tokens_received + assert order.avg_price == pytest.approx(expected_avg, rel=1e-10) + + def test_no_execution_after_complete(self): + """Executing on a complete order returns 0 tokens.""" + system = _bootstrap_system() + params = DCAParams(total_amount=100.0, n_chunks=1, interval=1.0) + order = create_dca_order(params, start_time=1.0) + oracle = create_oracle() + + system, order, oracle, t1 = execute_chunk(system, order, oracle, 1.0) + assert order.is_complete + + system, order, oracle, t2 = execute_chunk(system, order, oracle, 2.0) + assert t2 == 0.0 + + +class TestTWAPAwareDCA: + def _make_oracle_with_history(self, base_price=1.0, n=10): + """Create oracle with price history at base_price.""" + oracle = create_oracle(TWAPOracleParams(default_window=100.0)) + for t in range(n): + oracle = record_observation(oracle, base_price, float(t)) + return oracle + + def test_buys_more_when_cheap(self): + """When spot < TWAP, chunk should be larger than base.""" + # Oracle has TWAP ~ 10.0 + oracle = self._make_oracle_with_history(base_price=10.0) + # Spot drops to 8.0 + oracle = record_observation(oracle, 8.0, 11.0) + + params = DCAParams( + total_amount=1000.0, n_chunks=10, interval=1.0, + strategy="twap_aware", max_deviation=0.3, + ) + order = create_dca_order(params, start_time=12.0) + chunk = compute_chunk_size(order, oracle) + base = 1000.0 / 10 + assert chunk > base + + def test_buys_less_when_expensive(self): + """When spot > TWAP, chunk should be smaller than base.""" + oracle = self._make_oracle_with_history(base_price=10.0) + # Spot rises to 12.0 + oracle = record_observation(oracle, 12.0, 11.0) + + params = DCAParams( + total_amount=1000.0, n_chunks=10, interval=1.0, + strategy="twap_aware", max_deviation=0.3, + ) + order = create_dca_order(params, start_time=12.0) + chunk = compute_chunk_size(order, oracle) + base = 1000.0 / 10 + assert chunk < base + + def test_equal_when_spot_equals_twap(self): + """When spot == TWAP, chunk == base.""" + oracle = self._make_oracle_with_history(base_price=10.0) + + params = DCAParams( + total_amount=1000.0, n_chunks=10, interval=1.0, + strategy="twap_aware", max_deviation=0.3, + ) + order = create_dca_order(params, start_time=11.0) + chunk = compute_chunk_size(order, oracle) + base = 1000.0 / 10 + assert chunk == pytest.approx(base, rel=0.05) + + def test_chunk_clamped_to_remaining(self): + """Chunk size never exceeds remaining budget.""" + oracle = self._make_oracle_with_history(base_price=10.0) + oracle = record_observation(oracle, 5.0, 11.0) # Very cheap + + params = DCAParams( + total_amount=100.0, n_chunks=10, interval=1.0, + strategy="twap_aware", max_deviation=0.3, + ) + order = create_dca_order(params, start_time=12.0) + order.total_spent = 95.0 # Only 5 remaining + chunk = compute_chunk_size(order, oracle) + assert chunk <= 5.0 + + +class TestDCAvsLumpSum: + def test_simulation_runs(self): + """Basic DCA simulation completes without error.""" + config = MycoSystemConfig(n_reserve_assets=3) + params = DCAParams( + total_amount=5000.0, n_chunks=10, interval=1.0, + ) + result = simulate_dca(config, params) + + assert result.order.is_complete + assert result.order.total_tokens_received > 0 + assert result.lump_sum_tokens > 0 + + def test_dca_advantage_computed(self): + """DCA advantage is a finite number.""" + config = MycoSystemConfig(n_reserve_assets=3) + params = DCAParams( + total_amount=5000.0, n_chunks=10, interval=1.0, + ) + result = simulate_dca(config, params) + assert np.isfinite(result.dca_advantage) + + def test_twap_aware_simulation(self): + """TWAP-aware strategy also completes.""" + config = MycoSystemConfig(n_reserve_assets=3) + params = DCAParams( + total_amount=5000.0, n_chunks=10, interval=1.0, + strategy="twap_aware", twap_window=50.0, + ) + result = simulate_dca(config, params) + assert result.order.is_complete + assert result.order.total_tokens_received > 0 + + +class TestEdgeCases: + def test_single_chunk_equals_lump_sum(self): + """With n_chunks=1, DCA is equivalent to lump sum.""" + config = MycoSystemConfig(n_reserve_assets=3) + params_dca = DCAParams( + total_amount=5000.0, n_chunks=1, interval=1.0, + ) + result = simulate_dca(config, params_dca) + + # With one chunk, DCA and lump sum should get similar tokens + # (not exact due to different bootstrap timing) + ratio = result.order.total_tokens_received / result.lump_sum_tokens + assert 0.95 < ratio < 1.05 + + def test_zero_bootstrap(self): + """DCA on a fresh system (bootstrap via first chunk).""" + config = MycoSystemConfig(n_reserve_assets=3) + params = DCAParams( + total_amount=1000.0, n_chunks=5, interval=1.0, + ) + # Small bootstrap so system exists + result = simulate_dca(config, params, bootstrap_amount=100.0) + assert result.order.is_complete + assert result.order.total_tokens_received > 0 + + def test_large_dca_relative_to_system(self): + """DCA for an amount much larger than bootstrap.""" + config = MycoSystemConfig(n_reserve_assets=3) + params = DCAParams( + total_amount=50000.0, n_chunks=20, interval=1.0, + ) + result = simulate_dca(config, params, bootstrap_amount=1000.0) + assert result.order.is_complete + assert result.order.total_spent == pytest.approx(50000.0, abs=1.0) diff --git a/tests/test_twap_oracle.py b/tests/test_twap_oracle.py new file mode 100644 index 0000000..708579c --- /dev/null +++ b/tests/test_twap_oracle.py @@ -0,0 +1,262 @@ +"""Tests for TWAP oracle primitive.""" + +import math +import pytest +from src.primitives.twap_oracle import ( + PriceObservation, TWAPOracleParams, TWAPOracleState, + create_oracle, record_observation, + compute_twap, compute_vwap, + spot_vs_twap_deviation, get_volatility, +) + + +class TestPriceObservation: + def test_record_single(self): + oracle = create_oracle() + oracle = record_observation(oracle, price=1.0, time=0.0) + assert len(oracle.observations) == 1 + assert oracle.observations[0].price == 1.0 + assert oracle.observations[0].time == 0.0 + + def test_record_multiple(self): + oracle = create_oracle() + oracle = record_observation(oracle, 1.0, 0.0) + oracle = record_observation(oracle, 1.5, 1.0) + oracle = record_observation(oracle, 2.0, 2.0) + assert len(oracle.observations) == 3 + assert oracle.observations[-1].price == 2.0 + + def test_record_with_volume(self): + oracle = create_oracle() + oracle = record_observation(oracle, 1.0, 0.0, volume=100.0) + assert oracle.observations[0].volume == 100.0 + + def test_cumulative_accumulator_updates(self): + oracle = create_oracle() + oracle = record_observation(oracle, 10.0, 0.0) + assert oracle.cumulative_price_time == 0.0 # First obs, no dt + + oracle = record_observation(oracle, 12.0, 5.0) + # cumulative += 10.0 * 5.0 = 50.0 + assert oracle.cumulative_price_time == 50.0 + + oracle = record_observation(oracle, 8.0, 10.0) + # cumulative += 12.0 * 5.0 = 60.0 → total 110.0 + assert oracle.cumulative_price_time == 110.0 + + +class TestTWAPComputation: + def test_constant_price(self): + """TWAP of a constant price should be that price.""" + oracle = create_oracle() + for t in range(10): + oracle = record_observation(oracle, 5.0, float(t)) + twap = compute_twap(oracle, window=100.0) + assert abs(twap - 5.0) < 1e-10 + + def test_rising_price(self): + """TWAP of a rising price should be between start and end.""" + oracle = create_oracle() + for t in range(10): + oracle = record_observation(oracle, 1.0 + t * 0.1, float(t)) + + twap = compute_twap(oracle, window=100.0) + first_price = 1.0 + last_price = 1.9 + assert first_price < twap < last_price + + def test_window_slicing(self): + """Only observations within the window should contribute.""" + oracle = create_oracle() + # Old observations at low price + for t in range(10): + oracle = record_observation(oracle, 1.0, float(t)) + # Recent observations at high price + for t in range(10, 20): + oracle = record_observation(oracle, 10.0, float(t)) + + # Short window should give ~10.0 + short_twap = compute_twap(oracle, window=5.0) + assert short_twap == pytest.approx(10.0, abs=0.1) + + # Long window should be between 1 and 10 + long_twap = compute_twap(oracle, window=100.0) + assert 1.0 < long_twap < 10.0 + + def test_insufficient_data(self): + """Should return 0.0 with fewer than min_observations.""" + oracle = create_oracle() + assert compute_twap(oracle) == 0.0 + + oracle = record_observation(oracle, 5.0, 0.0) + assert compute_twap(oracle) == 0.0 # Only 1, need 2 + + def test_default_window(self): + """Uses default_window from params when window=None.""" + params = TWAPOracleParams(default_window=5.0) + oracle = create_oracle(params) + for t in range(20): + oracle = record_observation(oracle, float(t), float(t)) + + twap_default = compute_twap(oracle) + twap_explicit = compute_twap(oracle, window=5.0) + assert twap_default == twap_explicit + + def test_two_observations(self): + """Minimum case: two observations.""" + oracle = create_oracle() + oracle = record_observation(oracle, 2.0, 0.0) + oracle = record_observation(oracle, 4.0, 1.0) + twap = compute_twap(oracle, window=10.0) + # Only one interval: price 2.0 for dt=1.0 + assert twap == pytest.approx(2.0, abs=0.01) + + +class TestVWAP: + def test_equal_volume(self): + """With equal volumes, VWAP = simple average.""" + oracle = create_oracle() + oracle = record_observation(oracle, 10.0, 0.0, volume=100.0) + oracle = record_observation(oracle, 20.0, 1.0, volume=100.0) + vwap = compute_vwap(oracle, window=10.0) + assert vwap == pytest.approx(15.0, abs=0.01) + + def test_volume_weighting(self): + """Higher volume at lower price should pull VWAP down.""" + oracle = create_oracle() + oracle = record_observation(oracle, 10.0, 0.0, volume=900.0) + oracle = record_observation(oracle, 20.0, 1.0, volume=100.0) + vwap = compute_vwap(oracle, window=10.0) + # (10*900 + 20*100) / 1000 = 11.0 + assert vwap == pytest.approx(11.0, abs=0.01) + + def test_zero_volume(self): + """VWAP returns 0.0 when total volume is zero.""" + oracle = create_oracle() + oracle = record_observation(oracle, 10.0, 0.0, volume=0.0) + oracle = record_observation(oracle, 20.0, 1.0, volume=0.0) + assert compute_vwap(oracle, window=10.0) == 0.0 + + def test_insufficient_data(self): + oracle = create_oracle() + assert compute_vwap(oracle) == 0.0 + + +class TestDeviation: + def test_spot_above_twap(self): + """Positive deviation when spot > TWAP.""" + oracle = create_oracle() + for t in range(10): + oracle = record_observation(oracle, 10.0, float(t)) + + dev = spot_vs_twap_deviation(oracle, current_spot=12.0, window=100.0) + assert dev > 0 + assert dev == pytest.approx(0.2, abs=0.01) + + def test_spot_below_twap(self): + """Negative deviation when spot < TWAP.""" + oracle = create_oracle() + for t in range(10): + oracle = record_observation(oracle, 10.0, float(t)) + + dev = spot_vs_twap_deviation(oracle, current_spot=8.0, window=100.0) + assert dev < 0 + assert dev == pytest.approx(-0.2, abs=0.01) + + def test_spot_equals_twap(self): + """Zero deviation when spot == TWAP.""" + oracle = create_oracle() + for t in range(10): + oracle = record_observation(oracle, 10.0, float(t)) + + dev = spot_vs_twap_deviation(oracle, current_spot=10.0, window=100.0) + assert abs(dev) < 1e-10 + + def test_no_data(self): + """Returns 0.0 with no TWAP data.""" + oracle = create_oracle() + assert spot_vs_twap_deviation(oracle, 10.0) == 0.0 + + +class TestVolatility: + def test_constant_price_zero_vol(self): + """Constant price = zero volatility.""" + oracle = create_oracle() + for t in range(20): + oracle = record_observation(oracle, 100.0, float(t)) + vol = get_volatility(oracle, window=100.0) + assert vol == pytest.approx(0.0, abs=1e-10) + + def test_varying_price_positive_vol(self): + """Varying prices = positive volatility.""" + oracle = create_oracle() + prices = [100, 105, 98, 110, 95, 108, 102, 112, 97, 106] + for t, p in enumerate(prices): + oracle = record_observation(oracle, float(p), float(t)) + vol = get_volatility(oracle, window=100.0) + assert vol > 0 + + def test_insufficient_data(self): + """Need at least 3 observations for volatility.""" + oracle = create_oracle() + oracle = record_observation(oracle, 100.0, 0.0) + oracle = record_observation(oracle, 105.0, 1.0) + assert get_volatility(oracle) == 0.0 + + def test_volatility_increases_with_swings(self): + """Larger price swings = higher volatility.""" + # Small swings + oracle_small = create_oracle() + for t in range(20): + price = 100.0 + (1 if t % 2 == 0 else -1) + oracle_small = record_observation(oracle_small, price, float(t)) + + # Large swings + oracle_large = create_oracle() + for t in range(20): + price = 100.0 + (20 if t % 2 == 0 else -20) + oracle_large = record_observation(oracle_large, price, float(t)) + + vol_small = get_volatility(oracle_small, window=100.0) + vol_large = get_volatility(oracle_large, window=100.0) + assert vol_large > vol_small + + +class TestRingBuffer: + def test_max_observations_respected(self): + """Ring buffer evicts oldest when full.""" + params = TWAPOracleParams(max_observations=5) + oracle = create_oracle(params) + + for t in range(10): + oracle = record_observation(oracle, float(t), float(t)) + + assert len(oracle.observations) == 5 + # Oldest should be t=5 (indices 5-9 kept) + assert oracle.observations[0].time == 5.0 + assert oracle.observations[-1].time == 9.0 + + def test_small_buffer(self): + """Even buffer size 2 works.""" + params = TWAPOracleParams(max_observations=2, min_observations=2) + oracle = create_oracle(params) + + oracle = record_observation(oracle, 1.0, 0.0) + oracle = record_observation(oracle, 2.0, 1.0) + oracle = record_observation(oracle, 3.0, 2.0) + + assert len(oracle.observations) == 2 + assert oracle.observations[0].price == 2.0 + assert oracle.observations[1].price == 3.0 + + def test_cumulative_survives_eviction(self): + """Cumulative accumulator keeps accruing even after eviction.""" + params = TWAPOracleParams(max_observations=3) + oracle = create_oracle(params) + + for t in range(6): + oracle = record_observation(oracle, 10.0, float(t)) + + # cumulative = 10*1 + 10*1 + 10*1 + 10*1 + 10*1 = 50.0 + assert oracle.cumulative_price_time == 50.0 + assert len(oracle.observations) == 3