feat: add TWAP oracle + DCA execution engine (40 tests)

TWAP oracle primitive with ring buffer, cumulative accumulator,
VWAP, deviation signals, and realized volatility. DCA executor
composes oracle with MycoSystem for time-spread purchases with
fixed and TWAP-aware chunk sizing strategies. Oracle integration
is opt-in via MycoSystemConfig.twap_oracle_params.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
Jeff Emmett 2026-04-01 11:54:07 -07:00
parent 89b74049e8
commit 7fd03463fd
7 changed files with 1024 additions and 0 deletions

View File

@ -0,0 +1,4 @@
from src.composed.dca_executor import ( # noqa: F401
DCAParams, DCAOrder, DCAResult,
create_dca_order, compute_chunk_size, execute_chunk, simulate_dca,
)

View File

@ -0,0 +1,237 @@
"""DCA (Dollar-Cost Averaging) Execution Engine.
Composes the TWAP oracle with MycoSystem to enable time-spread
purchases that reduce price impact on the bonding surface.
Two strategies:
1. Fixed: equal chunks at regular intervals
2. TWAP-aware: modulates chunk size based on spot vs TWAP deviation
(buys more when spot < TWAP, less when spot > TWAP)
The bonding curve's deterministic price impact makes this particularly
valuable a single large deposit climbs the curve, while spreading
over time allows other activity to occur between chunks.
"""
import numpy as np
from numpy.typing import NDArray
from dataclasses import dataclass, field
from src.primitives.twap_oracle import (
TWAPOracleState, TWAPOracleParams,
create_oracle, record_observation, compute_twap,
)
from src.primitives.n_dimensional_surface import spot_prices
from src.composed.myco_surface import MycoSystem, MycoSystemConfig
@dataclass
class DCAParams:
"""Parameters for a DCA order."""
total_amount: float # Total USD to deploy
n_chunks: int # Number of chunks
interval: float # Time between chunks
asset_index: int = 0 # Which reserve asset to deposit
strategy: str = "fixed" # "fixed" or "twap_aware"
# TWAP-aware parameters
twap_window: float = 24.0 # Lookback for TWAP reference
max_deviation: float = 0.3 # Max chunk size deviation from base (±30%)
@dataclass
class DCAOrder:
"""State of an in-progress DCA order."""
params: DCAParams
start_time: float
chunks_executed: int = 0
total_tokens_received: float = 0.0
total_spent: float = 0.0
avg_price: float = 0.0
history: list[dict] = field(default_factory=list)
is_complete: bool = False
@dataclass
class DCAResult:
"""Result of a completed DCA simulation with comparison metrics."""
order: DCAOrder
twap_price: float # TWAP over execution period
lump_sum_tokens: float # What a single purchase would have gotten
dca_advantage: float # % better/worse than lump sum (positive = DCA got more tokens)
def create_dca_order(params: DCAParams, start_time: float) -> DCAOrder:
"""Create a new DCA order."""
return DCAOrder(params=params, start_time=start_time)
def compute_chunk_size(order: DCAOrder, oracle_state: TWAPOracleState) -> float:
"""Compute the next chunk size based on strategy.
Fixed: total_amount / n_chunks
TWAP-aware: modulates based on spot vs TWAP deviation.
"""
params = order.params
base_chunk = params.total_amount / params.n_chunks
remaining = params.total_amount - order.total_spent
if remaining <= 0:
return 0.0
if params.strategy == "fixed":
return min(base_chunk, remaining)
# TWAP-aware strategy
if not oracle_state.observations:
return min(base_chunk, remaining)
spot = oracle_state.observations[-1].price
twap = compute_twap(oracle_state, params.twap_window)
if twap <= 0:
return min(base_chunk, remaining)
ratio = spot / twap
if ratio < 1:
# Spot below TWAP = "cheap", buy more
chunk = base_chunk * (1 + params.max_deviation * (1 - ratio))
elif ratio > 1:
# Spot above TWAP = "expensive", buy less
chunk = base_chunk * (1 - params.max_deviation * min(ratio - 1, 1))
else:
chunk = base_chunk
# Clamp to remaining budget
return min(max(chunk, 0.0), remaining)
def execute_chunk(
system: MycoSystem,
order: DCAOrder,
oracle_state: TWAPOracleState,
current_time: float,
) -> tuple[MycoSystem, DCAOrder, TWAPOracleState, float]:
"""Execute a single DCA chunk.
Returns updated (system, order, oracle_state, tokens_minted).
"""
if order.is_complete:
return system, order, oracle_state, 0.0
chunk_size = compute_chunk_size(order, oracle_state)
if chunk_size <= 0:
order.is_complete = True
return system, order, oracle_state, 0.0
# Build deposit array (single-asset deposit)
n = system.config.n_reserve_assets
amounts = np.zeros(n)
amounts[order.params.asset_index] = chunk_size
# Execute deposit
myco_minted, metadata = system.deposit(amounts, current_time)
# Record price observation for oracle
if myco_minted > 0:
effective_price = chunk_size / myco_minted
oracle_state = record_observation(
oracle_state, effective_price, current_time, volume=chunk_size,
)
# Update order state
order.chunks_executed += 1
order.total_tokens_received += myco_minted
order.total_spent += chunk_size
order.avg_price = (
order.total_spent / order.total_tokens_received
if order.total_tokens_received > 0 else 0.0
)
order.history.append({
"time": current_time,
"chunk_size": chunk_size,
"tokens_minted": myco_minted,
"effective_price": chunk_size / myco_minted if myco_minted > 0 else 0.0,
"metadata": metadata,
})
if order.chunks_executed >= order.params.n_chunks or order.total_spent >= order.params.total_amount:
order.is_complete = True
return system, order, oracle_state, myco_minted
def simulate_dca(
system_config: MycoSystemConfig,
dca_params: DCAParams,
bootstrap_amount: float = 10000.0,
price_trajectory: list[tuple[float, NDArray]] | None = None,
) -> DCAResult:
"""Run a full DCA simulation and compare to lump sum.
Args:
system_config: Config for the MycoSystem
dca_params: DCA order parameters
bootstrap_amount: Initial deposit to bootstrap the system
price_trajectory: Optional list of (time, deposit_amounts) for
background market activity between DCA chunks
"""
n = system_config.n_reserve_assets
# --- Lump-sum baseline ---
lump_system = MycoSystem(system_config)
# Bootstrap
bootstrap = np.full(n, bootstrap_amount / n)
lump_system.deposit(bootstrap, 0.0)
# Lump sum deposit
lump_amounts = np.zeros(n)
lump_amounts[dca_params.asset_index] = dca_params.total_amount
lump_tokens, _ = lump_system.deposit(lump_amounts, 1.0)
# --- DCA execution ---
dca_system = MycoSystem(system_config)
# Same bootstrap
bootstrap = np.full(n, bootstrap_amount / n)
dca_system.deposit(bootstrap, 0.0)
oracle_state = create_oracle(TWAPOracleParams(default_window=dca_params.twap_window))
# Record initial price observation
if dca_system.state.surface_state.supply > 0:
initial_prices = spot_prices(
dca_system.state.surface_state.balances,
dca_system.surface_params,
)
oracle_state = record_observation(oracle_state, float(initial_prices[0]), 0.0)
order = create_dca_order(dca_params, start_time=1.0)
for i in range(dca_params.n_chunks):
t = order.start_time + i * dca_params.interval
# Apply background market activity if provided
if price_trajectory:
for pt_time, pt_amounts in price_trajectory:
if (i > 0 and
order.start_time + (i - 1) * dca_params.interval < pt_time <= t):
dca_system.deposit(pt_amounts, pt_time)
dca_system, order, oracle_state, _ = execute_chunk(
dca_system, order, oracle_state, t,
)
# Compute comparison
twap_price = compute_twap(oracle_state)
dca_advantage = 0.0
if lump_tokens > 0:
dca_advantage = (order.total_tokens_received - lump_tokens) / lump_tokens
return DCAResult(
order=order,
twap_price=twap_price,
lump_sum_tokens=lump_tokens,
dca_advantage=dca_advantage,
)

View File

@ -34,6 +34,11 @@ from src.primitives.imbalance_fees import (
compute_imbalance, surge_fee, compute_fee_adjusted_output,
)
from src.primitives.dynamic_weights import GradualWeightSchedule
from src.primitives.twap_oracle import (
TWAPOracleParams, TWAPOracleState,
create_oracle, record_observation, compute_twap,
)
from src.primitives.n_dimensional_surface import spot_prices
from src.commitments.labor import (
LaborIssuanceSystem, ContributorState,
attest_contribution, claim_tokens, create_default_system,
@ -77,6 +82,9 @@ class MycoSystemConfig:
max_subscription_mint_fraction: float = 0.1
max_staking_bonus_fraction: float = 0.05
# TWAP Oracle (optional)
twap_oracle_params: TWAPOracleParams | None = None
@dataclass
class MycoSystemState:
@ -103,6 +111,9 @@ class MycoSystemState:
subscription_system: SubscriptionSystem | None = None
staking_system: StakingSystem | None = None
# TWAP Oracle
oracle_state: TWAPOracleState | None = None
# Metrics
total_financial_minted: float = 0.0
total_commitment_minted: float = 0.0
@ -161,6 +172,10 @@ class MycoSystem:
))
self.state.reserve_state = ReserveState(vaults=vaults)
# Initialize TWAP oracle if params provided
if config.twap_oracle_params is not None:
self.state.oracle_state = create_oracle(config.twap_oracle_params)
def deposit(
self, amounts: NDArray, current_time: float
) -> tuple[float, dict]:
@ -222,6 +237,14 @@ class MycoSystem:
)
state.time = current_time
# Record price observation for TWAP oracle
if state.oracle_state is not None and myco_minted > 0:
effective_price = total_deposit_value / myco_minted
state.oracle_state = record_observation(
state.oracle_state, effective_price, current_time,
volume=total_deposit_value,
)
return myco_minted, {
"safe": True,
"fee_rate": fee_rate,
@ -296,6 +319,15 @@ class MycoSystem:
state.total_redeemed += myco_amount
state.time = current_time
# Record price observation for TWAP oracle
usd_returned = float(np.sum(withdrawal_split))
if state.oracle_state is not None and usd_returned > 0:
effective_price = myco_amount / usd_returned
state.oracle_state = record_observation(
state.oracle_state, effective_price, current_time,
volume=usd_returned,
)
return withdrawal_split, {
"redemption_rate": rate,
"flow_penalty": penalty,
@ -366,6 +398,23 @@ class MycoSystem:
return tokens
def get_twap(self, window: float | None = None) -> float | None:
"""Get TWAP over the given window. Returns None if oracle not enabled."""
if self.state.oracle_state is None:
return None
twap = compute_twap(self.state.oracle_state, window)
return twap if twap > 0 else None
def get_spot_price(self) -> float:
"""Get current spot price of MYCO (in terms of numeraire asset 0)."""
if self.state.supply <= 0:
return 0.0
prices = spot_prices(
self.state.surface_state.balances,
self.surface_params,
)
return float(prices[0])
def get_metrics(self) -> dict:
"""Get current system metrics."""
state = self.state

View File

@ -5,6 +5,7 @@ from numpy.typing import NDArray
from dataclasses import dataclass
from src.composed.myco_surface import MycoSystem, MycoSystemConfig
from src.composed.dca_executor import DCAParams, DCAResult, simulate_dca
from src.commitments.labor import ContributorState, attest_contribution
@ -218,3 +219,34 @@ def scenario_mixed_issuance(
deposit_schedule=deposits,
labor_schedule=labors,
)
def scenario_dca_comparison(
n_assets: int = 3,
total_amount: float = 10_000.0,
n_chunks: int = 20,
interval: float = 1.0,
bootstrap_amount: float = 50_000.0,
) -> dict[str, DCAResult]:
"""Compare fixed DCA, TWAP-aware DCA, and lump sum.
Returns a dict with keys "fixed" and "twap_aware", each containing
a DCAResult with comparison metrics vs lump sum.
"""
config = MycoSystemConfig(n_reserve_assets=n_assets)
results = {}
for strategy in ("fixed", "twap_aware"):
params = DCAParams(
total_amount=total_amount,
n_chunks=n_chunks,
interval=interval,
strategy=strategy,
twap_window=n_chunks * interval,
)
results[strategy] = simulate_dca(
config, params, bootstrap_amount=bootstrap_amount,
)
return results

View File

@ -0,0 +1,212 @@
"""Time-Weighted Average Price (TWAP) Oracle.
Source: Uniswap V2 cumulative price accumulator pattern
Uniswap V3 observations ring buffer with cardinality limits
Provides manipulation-resistant price feeds by averaging spot prices
over configurable time windows. Two computation approaches:
1. Cumulative accumulator (Uniswap V2): O(1) TWAP from running sum.
TWAP(t1, t2) = (cumulative[t2] - cumulative[t1]) / (t2 - t1)
2. Raw observation fallback: iterate over stored observations when
accumulator gaps exist (e.g., after state reconstruction).
For MYCO: feeds into DCA execution (time-spread purchases), dynamic
weight adjustment signals, and anomaly detection (spot vs TWAP deviation).
"""
import math
from dataclasses import dataclass, field
@dataclass
class PriceObservation:
"""A single price observation."""
price: float # Spot price at observation time
time: float # Timestamp
volume: float # Trade volume (for optional VWAP)
@dataclass
class TWAPOracleParams:
"""Configuration for the TWAP oracle."""
max_observations: int = 256 # Cardinality limit (ring buffer)
default_window: float = 24.0 # Default lookback window (time units)
min_observations: int = 2 # Minimum obs for valid TWAP
@dataclass
class TWAPOracleState:
"""State of the TWAP oracle."""
params: TWAPOracleParams
observations: list[PriceObservation] = field(default_factory=list)
cumulative_price_time: float = 0.0 # Running sum of price * dt
def create_oracle(params: TWAPOracleParams | None = None) -> TWAPOracleState:
"""Create a new TWAP oracle with default or custom params."""
if params is None:
params = TWAPOracleParams()
return TWAPOracleState(params=params)
def record_observation(
state: TWAPOracleState, price: float, time: float, volume: float = 0.0,
) -> TWAPOracleState:
"""Record a new price observation.
Updates the cumulative accumulator and maintains the ring buffer
cardinality limit by evicting the oldest observation when full.
"""
obs = PriceObservation(price=price, time=time, volume=volume)
# Update cumulative accumulator: add price * dt since last observation
new_cumulative = state.cumulative_price_time
if state.observations:
last = state.observations[-1]
dt = time - last.time
if dt > 0:
new_cumulative += last.price * dt
new_observations = list(state.observations)
new_observations.append(obs)
# Ring buffer eviction
if len(new_observations) > state.params.max_observations:
new_observations = new_observations[-state.params.max_observations:]
return TWAPOracleState(
params=state.params,
observations=new_observations,
cumulative_price_time=new_cumulative,
)
def compute_twap(state: TWAPOracleState, window: float | None = None) -> float:
"""Compute time-weighted average price over a lookback window.
Falls back to simple time-weighted average from raw observations
when fewer than min_observations exist or window exceeds history.
Returns 0.0 if insufficient data.
"""
if len(state.observations) < state.params.min_observations:
return 0.0
if window is None:
window = state.params.default_window
latest = state.observations[-1]
window_start = latest.time - window
# Gather observations within window
in_window = [o for o in state.observations if o.time >= window_start]
if len(in_window) < state.params.min_observations:
# Use all available observations as fallback
in_window = list(state.observations)
if len(in_window) < 2:
return in_window[0].price if in_window else 0.0
# Time-weighted average from raw observations
total_weighted = 0.0
total_time = 0.0
for i in range(len(in_window) - 1):
dt = in_window[i + 1].time - in_window[i].time
if dt > 0:
total_weighted += in_window[i].price * dt
total_time += dt
# Include the last observation's contribution up to "now"
# (weight it by the average dt to avoid zero-weighting the latest)
if total_time > 0:
return total_weighted / total_time
# All observations at the same time — simple average
return sum(o.price for o in in_window) / len(in_window)
def compute_vwap(state: TWAPOracleState, window: float | None = None) -> float:
"""Compute volume-weighted average price over a lookback window.
Returns 0.0 if insufficient data or zero total volume.
"""
if len(state.observations) < state.params.min_observations:
return 0.0
if window is None:
window = state.params.default_window
latest = state.observations[-1]
window_start = latest.time - window
in_window = [o for o in state.observations if o.time >= window_start]
if len(in_window) < state.params.min_observations:
in_window = list(state.observations)
total_pv = sum(o.price * o.volume for o in in_window)
total_v = sum(o.volume for o in in_window)
if total_v <= 0:
return 0.0
return total_pv / total_v
def spot_vs_twap_deviation(
state: TWAPOracleState, current_spot: float, window: float | None = None,
) -> float:
"""Percentage deviation of current spot from TWAP.
Returns (spot - twap) / twap as a fraction.
Positive = spot above TWAP, negative = spot below TWAP.
Returns 0.0 if TWAP is unavailable.
"""
twap = compute_twap(state, window)
if twap <= 0:
return 0.0
return (current_spot - twap) / twap
def get_volatility(state: TWAPOracleState, window: float | None = None) -> float:
"""Compute realized volatility from observation history.
Uses log-returns standard deviation, annualized by the window period.
Returns 0.0 if insufficient data.
"""
if len(state.observations) < 3:
return 0.0
if window is None:
window = state.params.default_window
latest = state.observations[-1]
window_start = latest.time - window
in_window = [o for o in state.observations if o.time >= window_start]
if len(in_window) < 3:
in_window = list(state.observations)
if len(in_window) < 3:
return 0.0
# Compute log-returns
log_returns = []
for i in range(1, len(in_window)):
if in_window[i - 1].price > 0 and in_window[i].price > 0:
log_returns.append(
math.log(in_window[i].price / in_window[i - 1].price)
)
if len(log_returns) < 2:
return 0.0
# Standard deviation of log-returns
mean = sum(log_returns) / len(log_returns)
variance = sum((r - mean) ** 2 for r in log_returns) / (len(log_returns) - 1)
return math.sqrt(variance)

228
tests/test_dca_executor.py Normal file
View File

@ -0,0 +1,228 @@
"""Tests for DCA execution engine."""
import numpy as np
import pytest
from src.composed.dca_executor import (
DCAParams, DCAOrder, DCAResult,
create_dca_order, compute_chunk_size, execute_chunk, simulate_dca,
)
from src.composed.myco_surface import MycoSystem, MycoSystemConfig
from src.primitives.twap_oracle import (
TWAPOracleParams, create_oracle, record_observation,
)
def _bootstrap_system(n_assets=3, amount=10000.0):
"""Helper: create and bootstrap a MycoSystem."""
config = MycoSystemConfig(n_reserve_assets=n_assets)
system = MycoSystem(config)
bootstrap = np.full(n_assets, amount / n_assets)
system.deposit(bootstrap, 0.0)
return system
class TestFixedDCA:
def test_equal_chunks(self):
"""Fixed strategy splits into equal chunks."""
params = DCAParams(total_amount=1000.0, n_chunks=10, interval=1.0)
order = create_dca_order(params, start_time=1.0)
oracle = create_oracle()
chunk = compute_chunk_size(order, oracle)
assert chunk == pytest.approx(100.0, abs=0.01)
def test_budget_exhaustion(self):
"""All chunks together should spend the full budget."""
system = _bootstrap_system()
params = DCAParams(total_amount=1000.0, n_chunks=5, interval=1.0)
order = create_dca_order(params, start_time=1.0)
oracle = create_oracle()
for i in range(5):
t = 1.0 + i * 1.0
system, order, oracle, tokens = execute_chunk(system, order, oracle, t)
assert order.is_complete
assert order.total_spent == pytest.approx(1000.0, abs=0.01)
assert order.chunks_executed == 5
def test_token_tracking(self):
"""Total tokens received should be positive and match history."""
system = _bootstrap_system()
params = DCAParams(total_amount=500.0, n_chunks=5, interval=1.0)
order = create_dca_order(params, start_time=1.0)
oracle = create_oracle()
total_from_chunks = 0.0
for i in range(5):
t = 1.0 + i * 1.0
system, order, oracle, tokens = execute_chunk(system, order, oracle, t)
total_from_chunks += tokens
assert order.total_tokens_received > 0
assert order.total_tokens_received == pytest.approx(total_from_chunks, rel=1e-10)
assert len(order.history) == 5
def test_avg_price_computed(self):
"""Average price should be total_spent / total_tokens."""
system = _bootstrap_system()
params = DCAParams(total_amount=1000.0, n_chunks=4, interval=1.0)
order = create_dca_order(params, start_time=1.0)
oracle = create_oracle()
for i in range(4):
system, order, oracle, _ = execute_chunk(
system, order, oracle, 1.0 + i,
)
expected_avg = order.total_spent / order.total_tokens_received
assert order.avg_price == pytest.approx(expected_avg, rel=1e-10)
def test_no_execution_after_complete(self):
"""Executing on a complete order returns 0 tokens."""
system = _bootstrap_system()
params = DCAParams(total_amount=100.0, n_chunks=1, interval=1.0)
order = create_dca_order(params, start_time=1.0)
oracle = create_oracle()
system, order, oracle, t1 = execute_chunk(system, order, oracle, 1.0)
assert order.is_complete
system, order, oracle, t2 = execute_chunk(system, order, oracle, 2.0)
assert t2 == 0.0
class TestTWAPAwareDCA:
def _make_oracle_with_history(self, base_price=1.0, n=10):
"""Create oracle with price history at base_price."""
oracle = create_oracle(TWAPOracleParams(default_window=100.0))
for t in range(n):
oracle = record_observation(oracle, base_price, float(t))
return oracle
def test_buys_more_when_cheap(self):
"""When spot < TWAP, chunk should be larger than base."""
# Oracle has TWAP ~ 10.0
oracle = self._make_oracle_with_history(base_price=10.0)
# Spot drops to 8.0
oracle = record_observation(oracle, 8.0, 11.0)
params = DCAParams(
total_amount=1000.0, n_chunks=10, interval=1.0,
strategy="twap_aware", max_deviation=0.3,
)
order = create_dca_order(params, start_time=12.0)
chunk = compute_chunk_size(order, oracle)
base = 1000.0 / 10
assert chunk > base
def test_buys_less_when_expensive(self):
"""When spot > TWAP, chunk should be smaller than base."""
oracle = self._make_oracle_with_history(base_price=10.0)
# Spot rises to 12.0
oracle = record_observation(oracle, 12.0, 11.0)
params = DCAParams(
total_amount=1000.0, n_chunks=10, interval=1.0,
strategy="twap_aware", max_deviation=0.3,
)
order = create_dca_order(params, start_time=12.0)
chunk = compute_chunk_size(order, oracle)
base = 1000.0 / 10
assert chunk < base
def test_equal_when_spot_equals_twap(self):
"""When spot == TWAP, chunk == base."""
oracle = self._make_oracle_with_history(base_price=10.0)
params = DCAParams(
total_amount=1000.0, n_chunks=10, interval=1.0,
strategy="twap_aware", max_deviation=0.3,
)
order = create_dca_order(params, start_time=11.0)
chunk = compute_chunk_size(order, oracle)
base = 1000.0 / 10
assert chunk == pytest.approx(base, rel=0.05)
def test_chunk_clamped_to_remaining(self):
"""Chunk size never exceeds remaining budget."""
oracle = self._make_oracle_with_history(base_price=10.0)
oracle = record_observation(oracle, 5.0, 11.0) # Very cheap
params = DCAParams(
total_amount=100.0, n_chunks=10, interval=1.0,
strategy="twap_aware", max_deviation=0.3,
)
order = create_dca_order(params, start_time=12.0)
order.total_spent = 95.0 # Only 5 remaining
chunk = compute_chunk_size(order, oracle)
assert chunk <= 5.0
class TestDCAvsLumpSum:
def test_simulation_runs(self):
"""Basic DCA simulation completes without error."""
config = MycoSystemConfig(n_reserve_assets=3)
params = DCAParams(
total_amount=5000.0, n_chunks=10, interval=1.0,
)
result = simulate_dca(config, params)
assert result.order.is_complete
assert result.order.total_tokens_received > 0
assert result.lump_sum_tokens > 0
def test_dca_advantage_computed(self):
"""DCA advantage is a finite number."""
config = MycoSystemConfig(n_reserve_assets=3)
params = DCAParams(
total_amount=5000.0, n_chunks=10, interval=1.0,
)
result = simulate_dca(config, params)
assert np.isfinite(result.dca_advantage)
def test_twap_aware_simulation(self):
"""TWAP-aware strategy also completes."""
config = MycoSystemConfig(n_reserve_assets=3)
params = DCAParams(
total_amount=5000.0, n_chunks=10, interval=1.0,
strategy="twap_aware", twap_window=50.0,
)
result = simulate_dca(config, params)
assert result.order.is_complete
assert result.order.total_tokens_received > 0
class TestEdgeCases:
def test_single_chunk_equals_lump_sum(self):
"""With n_chunks=1, DCA is equivalent to lump sum."""
config = MycoSystemConfig(n_reserve_assets=3)
params_dca = DCAParams(
total_amount=5000.0, n_chunks=1, interval=1.0,
)
result = simulate_dca(config, params_dca)
# With one chunk, DCA and lump sum should get similar tokens
# (not exact due to different bootstrap timing)
ratio = result.order.total_tokens_received / result.lump_sum_tokens
assert 0.95 < ratio < 1.05
def test_zero_bootstrap(self):
"""DCA on a fresh system (bootstrap via first chunk)."""
config = MycoSystemConfig(n_reserve_assets=3)
params = DCAParams(
total_amount=1000.0, n_chunks=5, interval=1.0,
)
# Small bootstrap so system exists
result = simulate_dca(config, params, bootstrap_amount=100.0)
assert result.order.is_complete
assert result.order.total_tokens_received > 0
def test_large_dca_relative_to_system(self):
"""DCA for an amount much larger than bootstrap."""
config = MycoSystemConfig(n_reserve_assets=3)
params = DCAParams(
total_amount=50000.0, n_chunks=20, interval=1.0,
)
result = simulate_dca(config, params, bootstrap_amount=1000.0)
assert result.order.is_complete
assert result.order.total_spent == pytest.approx(50000.0, abs=1.0)

262
tests/test_twap_oracle.py Normal file
View File

@ -0,0 +1,262 @@
"""Tests for TWAP oracle primitive."""
import math
import pytest
from src.primitives.twap_oracle import (
PriceObservation, TWAPOracleParams, TWAPOracleState,
create_oracle, record_observation,
compute_twap, compute_vwap,
spot_vs_twap_deviation, get_volatility,
)
class TestPriceObservation:
def test_record_single(self):
oracle = create_oracle()
oracle = record_observation(oracle, price=1.0, time=0.0)
assert len(oracle.observations) == 1
assert oracle.observations[0].price == 1.0
assert oracle.observations[0].time == 0.0
def test_record_multiple(self):
oracle = create_oracle()
oracle = record_observation(oracle, 1.0, 0.0)
oracle = record_observation(oracle, 1.5, 1.0)
oracle = record_observation(oracle, 2.0, 2.0)
assert len(oracle.observations) == 3
assert oracle.observations[-1].price == 2.0
def test_record_with_volume(self):
oracle = create_oracle()
oracle = record_observation(oracle, 1.0, 0.0, volume=100.0)
assert oracle.observations[0].volume == 100.0
def test_cumulative_accumulator_updates(self):
oracle = create_oracle()
oracle = record_observation(oracle, 10.0, 0.0)
assert oracle.cumulative_price_time == 0.0 # First obs, no dt
oracle = record_observation(oracle, 12.0, 5.0)
# cumulative += 10.0 * 5.0 = 50.0
assert oracle.cumulative_price_time == 50.0
oracle = record_observation(oracle, 8.0, 10.0)
# cumulative += 12.0 * 5.0 = 60.0 → total 110.0
assert oracle.cumulative_price_time == 110.0
class TestTWAPComputation:
def test_constant_price(self):
"""TWAP of a constant price should be that price."""
oracle = create_oracle()
for t in range(10):
oracle = record_observation(oracle, 5.0, float(t))
twap = compute_twap(oracle, window=100.0)
assert abs(twap - 5.0) < 1e-10
def test_rising_price(self):
"""TWAP of a rising price should be between start and end."""
oracle = create_oracle()
for t in range(10):
oracle = record_observation(oracle, 1.0 + t * 0.1, float(t))
twap = compute_twap(oracle, window=100.0)
first_price = 1.0
last_price = 1.9
assert first_price < twap < last_price
def test_window_slicing(self):
"""Only observations within the window should contribute."""
oracle = create_oracle()
# Old observations at low price
for t in range(10):
oracle = record_observation(oracle, 1.0, float(t))
# Recent observations at high price
for t in range(10, 20):
oracle = record_observation(oracle, 10.0, float(t))
# Short window should give ~10.0
short_twap = compute_twap(oracle, window=5.0)
assert short_twap == pytest.approx(10.0, abs=0.1)
# Long window should be between 1 and 10
long_twap = compute_twap(oracle, window=100.0)
assert 1.0 < long_twap < 10.0
def test_insufficient_data(self):
"""Should return 0.0 with fewer than min_observations."""
oracle = create_oracle()
assert compute_twap(oracle) == 0.0
oracle = record_observation(oracle, 5.0, 0.0)
assert compute_twap(oracle) == 0.0 # Only 1, need 2
def test_default_window(self):
"""Uses default_window from params when window=None."""
params = TWAPOracleParams(default_window=5.0)
oracle = create_oracle(params)
for t in range(20):
oracle = record_observation(oracle, float(t), float(t))
twap_default = compute_twap(oracle)
twap_explicit = compute_twap(oracle, window=5.0)
assert twap_default == twap_explicit
def test_two_observations(self):
"""Minimum case: two observations."""
oracle = create_oracle()
oracle = record_observation(oracle, 2.0, 0.0)
oracle = record_observation(oracle, 4.0, 1.0)
twap = compute_twap(oracle, window=10.0)
# Only one interval: price 2.0 for dt=1.0
assert twap == pytest.approx(2.0, abs=0.01)
class TestVWAP:
def test_equal_volume(self):
"""With equal volumes, VWAP = simple average."""
oracle = create_oracle()
oracle = record_observation(oracle, 10.0, 0.0, volume=100.0)
oracle = record_observation(oracle, 20.0, 1.0, volume=100.0)
vwap = compute_vwap(oracle, window=10.0)
assert vwap == pytest.approx(15.0, abs=0.01)
def test_volume_weighting(self):
"""Higher volume at lower price should pull VWAP down."""
oracle = create_oracle()
oracle = record_observation(oracle, 10.0, 0.0, volume=900.0)
oracle = record_observation(oracle, 20.0, 1.0, volume=100.0)
vwap = compute_vwap(oracle, window=10.0)
# (10*900 + 20*100) / 1000 = 11.0
assert vwap == pytest.approx(11.0, abs=0.01)
def test_zero_volume(self):
"""VWAP returns 0.0 when total volume is zero."""
oracle = create_oracle()
oracle = record_observation(oracle, 10.0, 0.0, volume=0.0)
oracle = record_observation(oracle, 20.0, 1.0, volume=0.0)
assert compute_vwap(oracle, window=10.0) == 0.0
def test_insufficient_data(self):
oracle = create_oracle()
assert compute_vwap(oracle) == 0.0
class TestDeviation:
def test_spot_above_twap(self):
"""Positive deviation when spot > TWAP."""
oracle = create_oracle()
for t in range(10):
oracle = record_observation(oracle, 10.0, float(t))
dev = spot_vs_twap_deviation(oracle, current_spot=12.0, window=100.0)
assert dev > 0
assert dev == pytest.approx(0.2, abs=0.01)
def test_spot_below_twap(self):
"""Negative deviation when spot < TWAP."""
oracle = create_oracle()
for t in range(10):
oracle = record_observation(oracle, 10.0, float(t))
dev = spot_vs_twap_deviation(oracle, current_spot=8.0, window=100.0)
assert dev < 0
assert dev == pytest.approx(-0.2, abs=0.01)
def test_spot_equals_twap(self):
"""Zero deviation when spot == TWAP."""
oracle = create_oracle()
for t in range(10):
oracle = record_observation(oracle, 10.0, float(t))
dev = spot_vs_twap_deviation(oracle, current_spot=10.0, window=100.0)
assert abs(dev) < 1e-10
def test_no_data(self):
"""Returns 0.0 with no TWAP data."""
oracle = create_oracle()
assert spot_vs_twap_deviation(oracle, 10.0) == 0.0
class TestVolatility:
def test_constant_price_zero_vol(self):
"""Constant price = zero volatility."""
oracle = create_oracle()
for t in range(20):
oracle = record_observation(oracle, 100.0, float(t))
vol = get_volatility(oracle, window=100.0)
assert vol == pytest.approx(0.0, abs=1e-10)
def test_varying_price_positive_vol(self):
"""Varying prices = positive volatility."""
oracle = create_oracle()
prices = [100, 105, 98, 110, 95, 108, 102, 112, 97, 106]
for t, p in enumerate(prices):
oracle = record_observation(oracle, float(p), float(t))
vol = get_volatility(oracle, window=100.0)
assert vol > 0
def test_insufficient_data(self):
"""Need at least 3 observations for volatility."""
oracle = create_oracle()
oracle = record_observation(oracle, 100.0, 0.0)
oracle = record_observation(oracle, 105.0, 1.0)
assert get_volatility(oracle) == 0.0
def test_volatility_increases_with_swings(self):
"""Larger price swings = higher volatility."""
# Small swings
oracle_small = create_oracle()
for t in range(20):
price = 100.0 + (1 if t % 2 == 0 else -1)
oracle_small = record_observation(oracle_small, price, float(t))
# Large swings
oracle_large = create_oracle()
for t in range(20):
price = 100.0 + (20 if t % 2 == 0 else -20)
oracle_large = record_observation(oracle_large, price, float(t))
vol_small = get_volatility(oracle_small, window=100.0)
vol_large = get_volatility(oracle_large, window=100.0)
assert vol_large > vol_small
class TestRingBuffer:
def test_max_observations_respected(self):
"""Ring buffer evicts oldest when full."""
params = TWAPOracleParams(max_observations=5)
oracle = create_oracle(params)
for t in range(10):
oracle = record_observation(oracle, float(t), float(t))
assert len(oracle.observations) == 5
# Oldest should be t=5 (indices 5-9 kept)
assert oracle.observations[0].time == 5.0
assert oracle.observations[-1].time == 9.0
def test_small_buffer(self):
"""Even buffer size 2 works."""
params = TWAPOracleParams(max_observations=2, min_observations=2)
oracle = create_oracle(params)
oracle = record_observation(oracle, 1.0, 0.0)
oracle = record_observation(oracle, 2.0, 1.0)
oracle = record_observation(oracle, 3.0, 2.0)
assert len(oracle.observations) == 2
assert oracle.observations[0].price == 2.0
assert oracle.observations[1].price == 3.0
def test_cumulative_survives_eviction(self):
"""Cumulative accumulator keeps accruing even after eviction."""
params = TWAPOracleParams(max_observations=3)
oracle = create_oracle(params)
for t in range(6):
oracle = record_observation(oracle, 10.0, float(t))
# cumulative = 10*1 + 10*1 + 10*1 + 10*1 + 10*1 = 50.0
assert oracle.cumulative_price_time == 50.0
assert len(oracle.observations) == 3