feat: add signal router, CRDT DCA schedules, and subscription/donation DCA (74 tests)

Three integration layers connecting the TWAP oracle and DCA executor
to the rest of the system:

- Signal Router: routes oracle volatility/deviation signals into
  adaptive params (flow threshold, PAMM curvature, surge fees)
- CRDT DCA Schedules: G-Set of chunks with monotone status lattice
  (pending→submitted→executed), composes with batch settlement
- Subscription/Donation DCA: recurring payments and one-time donations
  flow through the bonding curve via time-spread DCA chunks

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
Jeff Emmett 2026-04-01 12:28:18 -07:00
parent 7fd03463fd
commit e27d76d273
8 changed files with 1881 additions and 0 deletions

View File

@ -2,3 +2,16 @@ from src.composed.dca_executor import ( # noqa: F401
DCAParams, DCAOrder, DCAResult,
create_dca_order, compute_chunk_size, execute_chunk, simulate_dca,
)
from src.composed.subscription_dca import ( # noqa: F401
SubscriptionDCAConfig, DonationDCAConfig,
SubscriptionDCAOrder, DonationDCAOrder, SubscriptionDCAState,
create_subscription_dca_order, create_donation_dca_order,
execute_subscription_chunk, execute_donation_chunk,
pending_chunks, simulate_subscription_dca,
)
from src.crdt.dca_schedule import ( # noqa: F401
DCAChunk, DCASchedule, DCAScheduleRegistry,
create_dca_schedule, advance_chunk_status,
materialize_chunks, reconcile_settled_chunks,
merge_dca_schedules, schedule_completion_fraction,
)

View File

@ -38,6 +38,9 @@ from src.primitives.twap_oracle import (
TWAPOracleParams, TWAPOracleState,
create_oracle, record_observation, compute_twap,
)
from src.primitives.signal_router import (
SignalRouterConfig, AdaptiveParams, route_signals,
)
from src.primitives.n_dimensional_surface import spot_prices
from src.commitments.labor import (
LaborIssuanceSystem, ContributorState,
@ -415,6 +418,37 @@ class MycoSystem:
)
return float(prices[0])
def apply_signal_routing(
self,
router_config: SignalRouterConfig,
base_params: AdaptiveParams | None = None,
) -> AdaptiveParams:
"""Route oracle signals into adaptive system parameters.
Opt-in: returns base_params unchanged if oracle is None.
If base_params is None, builds defaults from current config.
"""
if base_params is None:
base_params = AdaptiveParams(
flow_threshold=self.config.flow_threshold,
pamm_alpha_bar=self.config.pamm_params.alpha_bar,
surge_fee_rate=self.config.surge_fee_rate,
oracle_multiplier_velocity=0.0,
)
spot = self.get_spot_price()
adapted = route_signals(
self.state.oracle_state, base_params, router_config,
current_spot=spot,
)
# Apply adapted params back to live config
self.state.flow_tracker.threshold = adapted.flow_threshold
self.config.pamm_params.alpha_bar = adapted.pamm_alpha_bar
self.config.surge_fee_rate = adapted.surge_fee_rate
return adapted
def get_metrics(self) -> dict:
"""Get current system metrics."""
state = self.state

View File

@ -0,0 +1,356 @@
"""Subscription/Donation DCA — recurring payments and donations via DCA.
Composes subscription tiers with the DCA executor so that recurring
payments and one-time donations flow through the bonding curve as
time-spread chunks instead of flat-rate commitment minting.
Subscription DCA:
Each subscription period, the payment is split into n_chunks and
executed over the period via the DCA executor. Loyalty multiplier
produces bonus tokens on top of curve-minted tokens.
Donation DCA:
Donors choose at donation time whether to DCA (spread over chunks)
or do a single instant deposit. DCA reduces price impact for
large donations.
"""
import numpy as np
from numpy.typing import NDArray
from dataclasses import dataclass, field
from src.composed.dca_executor import (
DCAParams,
DCAOrder,
create_dca_order,
execute_chunk,
)
from src.composed.myco_surface import MycoSystem, MycoSystemConfig
from src.primitives.twap_oracle import TWAPOracleState, TWAPOracleParams, create_oracle
from src.commitments.subscription import Subscription, SubscriptionTier
@dataclass
class SubscriptionDCAConfig:
"""Configuration for subscription-based DCA."""
n_chunks: int = 5 # Chunks per subscription period
spread_fraction: float = 0.8 # Fraction of period over which to spread
strategy: str = "fixed" # "fixed" or "twap_aware"
loyalty_as_bonus: bool = True # Mint loyalty bonus on top of curve tokens
@dataclass
class DonationDCAConfig:
"""Configuration for donation DCA — donor chooses at donation time."""
enable_dca: bool = True # False = single instant deposit
n_chunks: int = 5 # Number of DCA chunks
interval: float = 1.0 # Time between chunks
strategy: str = "fixed" # "fixed" or "twap_aware"
@dataclass
class SubscriptionDCAOrder:
"""A DCA order tied to a subscription period."""
order: DCAOrder
subscription_id: str
tier: str
loyalty_multiplier: float
loyalty_bonus_minted: float = 0.0
period_index: int = 0
@dataclass
class DonationDCAOrder:
"""A DCA order for a one-time donation."""
order: DCAOrder
donor: str
donation_amount: float
@dataclass
class SubscriptionDCAState:
"""Tracks all active and historical DCA orders."""
# subscriber -> list of subscription DCA orders (one per period)
subscription_orders: dict[str, list[SubscriptionDCAOrder]] = field(
default_factory=dict,
)
# All donation orders
donation_orders: list[DonationDCAOrder] = field(default_factory=list)
# --- Subscription DCA ---
def _compute_loyalty_multiplier(
tier: SubscriptionTier,
subscription: Subscription,
current_time: float,
) -> float:
"""Compute loyalty multiplier matching subscription.py's formula."""
duration = current_time - subscription.start_time
return 1.0 + (tier.loyalty_multiplier_max - 1.0) * (
1.0 - np.exp(-0.693 * duration / tier.loyalty_halflife)
)
def create_subscription_dca_order(
subscription: Subscription,
tier: SubscriptionTier,
config: SubscriptionDCAConfig,
current_time: float,
loyalty_multiplier: float,
period_index: int = 0,
) -> SubscriptionDCAOrder:
"""Create a DCA order for one subscription period.
Splits the tier's payment_per_period into n_chunks spread over
spread_fraction of the period.
"""
spread_duration = tier.period_length * config.spread_fraction
interval = spread_duration / max(config.n_chunks, 1)
dca_params = DCAParams(
total_amount=tier.payment_per_period,
n_chunks=config.n_chunks,
interval=interval,
asset_index=0,
strategy=config.strategy,
)
order = create_dca_order(dca_params, start_time=current_time)
return SubscriptionDCAOrder(
order=order,
subscription_id=subscription.subscriber,
tier=subscription.tier,
loyalty_multiplier=loyalty_multiplier,
period_index=period_index,
)
# --- Donation DCA ---
def create_donation_dca_order(
donor: str,
amount: float,
config: DonationDCAConfig,
current_time: float,
) -> DonationDCAOrder:
"""Create a DCA order for a one-time donation.
If enable_dca is False or n_chunks <= 1, creates a single-chunk
order (instant deposit).
"""
n = config.n_chunks if config.enable_dca and config.n_chunks > 1 else 1
interval = config.interval if n > 1 else 0.0
dca_params = DCAParams(
total_amount=amount,
n_chunks=n,
interval=interval,
asset_index=0,
strategy=config.strategy,
)
order = create_dca_order(dca_params, start_time=current_time)
return DonationDCAOrder(
order=order,
donor=donor,
donation_amount=amount,
)
# --- Chunk execution ---
def execute_subscription_chunk(
state: SubscriptionDCAState,
subscriber: str,
system: MycoSystem,
oracle_state: TWAPOracleState,
current_time: float,
) -> tuple[SubscriptionDCAState, MycoSystem, TWAPOracleState, float, float]:
"""Execute the next pending chunk for a subscriber's latest order.
Returns:
(state, system, oracle_state, curve_tokens, loyalty_bonus)
The loyalty bonus is: curve_tokens * (loyalty_multiplier - 1.0)
when loyalty_as_bonus is configured. This bonus is NOT minted through
the curve it's minted as commitment tokens on top.
"""
orders = state.subscription_orders.get(subscriber)
if not orders:
return state, system, oracle_state, 0.0, 0.0
# Find the latest non-complete order
sub_order = orders[-1]
if sub_order.order.is_complete:
return state, system, oracle_state, 0.0, 0.0
# Execute through DCA executor
system, sub_order.order, oracle_state, curve_tokens = execute_chunk(
system, sub_order.order, oracle_state, current_time,
)
# Compute loyalty bonus
loyalty_bonus = 0.0
if curve_tokens > 0 and sub_order.loyalty_multiplier > 1.0:
loyalty_bonus = curve_tokens * (sub_order.loyalty_multiplier - 1.0)
sub_order.loyalty_bonus_minted += loyalty_bonus
return state, system, oracle_state, curve_tokens, loyalty_bonus
def execute_donation_chunk(
state: SubscriptionDCAState,
order_index: int,
system: MycoSystem,
oracle_state: TWAPOracleState,
current_time: float,
) -> tuple[SubscriptionDCAState, MycoSystem, TWAPOracleState, float]:
"""Execute the next pending chunk for a donation order.
Returns:
(state, system, oracle_state, tokens_minted)
"""
if order_index >= len(state.donation_orders):
return state, system, oracle_state, 0.0
don_order = state.donation_orders[order_index]
if don_order.order.is_complete:
return state, system, oracle_state, 0.0
system, don_order.order, oracle_state, tokens = execute_chunk(
system, don_order.order, oracle_state, current_time,
)
return state, system, oracle_state, tokens
# --- Query ---
def pending_chunks(
state: SubscriptionDCAState,
subscriber: str,
current_time: float,
) -> list[tuple[int, int]]:
"""List pending (unexecuted) chunks for a subscriber.
Returns list of (order_index, chunk_number) tuples where
chunk_number is the next chunk to execute in that order.
"""
orders = state.subscription_orders.get(subscriber, [])
result = []
for i, sub_order in enumerate(orders):
order = sub_order.order
if order.is_complete:
continue
next_chunk = order.chunks_executed
next_time = order.start_time + next_chunk * order.params.interval
if current_time >= next_time:
result.append((i, next_chunk))
return result
# --- Simulation ---
def simulate_subscription_dca(
tiers: dict[str, SubscriptionTier],
config: SubscriptionDCAConfig,
system_config: MycoSystemConfig,
subscribers: list[tuple[str, str, float]],
duration: float,
dt: float = 1.0,
) -> dict[str, NDArray]:
"""Simulate subscription DCA over time.
Args:
tiers: Available subscription tiers.
config: DCA configuration.
system_config: MycoSystem configuration.
subscribers: List of (name, tier_name, start_time).
duration: Total simulation duration.
dt: Time step.
Returns:
Dict with arrays: times, total_curve_tokens, total_loyalty_bonus,
total_supply.
"""
system = MycoSystem(system_config)
n = system_config.n_reserve_assets
# Bootstrap the system
bootstrap = np.full(n, 10000.0 / n)
system.deposit(bootstrap, 0.0)
oracle_state = create_oracle(TWAPOracleParams(default_window=24.0))
state = SubscriptionDCAState()
subs: dict[str, Subscription] = {}
n_steps = int(duration / dt)
times = np.zeros(n_steps)
curve_tokens_arr = np.zeros(n_steps)
loyalty_bonus_arr = np.zeros(n_steps)
supply_arr = np.zeros(n_steps)
total_curve = 0.0
total_loyalty = 0.0
for step in range(n_steps):
t = step * dt
times[step] = t
# Create subscriptions as they start
for name, tier_name, start_time in subscribers:
if abs(t - start_time) < dt / 2 and name not in subs:
tier = tiers[tier_name]
sub = Subscription(
subscriber=name,
tier=tier_name,
start_time=t,
last_payment_time=t,
)
subs[name] = sub
lm = _compute_loyalty_multiplier(tier, sub, t)
order = create_subscription_dca_order(
sub, tier, config, t, lm, period_index=0,
)
state.subscription_orders[name] = [order]
# Execute pending chunks
for name in list(state.subscription_orders.keys()):
if name not in subs:
continue
pending = pending_chunks(state, name, t)
for _ in pending:
state, system, oracle_state, ct, lb = execute_subscription_chunk(
state, name, system, oracle_state, t,
)
total_curve += ct
total_loyalty += lb
# Check if order complete and new period due
orders = state.subscription_orders.get(name, [])
if orders and orders[-1].order.is_complete:
sub = subs[name]
tier = tiers[sub.tier]
period_idx = orders[-1].period_index + 1
next_period_start = sub.start_time + period_idx * tier.period_length
if t >= next_period_start:
lm = _compute_loyalty_multiplier(tier, sub, t)
new_order = create_subscription_dca_order(
sub, tier, config, t, lm, period_index=period_idx,
)
orders.append(new_order)
curve_tokens_arr[step] = total_curve
loyalty_bonus_arr[step] = total_loyalty
supply_arr[step] = system.state.supply
return {
"times": times,
"total_curve_tokens": curve_tokens_arr,
"total_loyalty_bonus": loyalty_bonus_arr,
"total_supply": supply_arr,
}

328
src/crdt/dca_schedule.py Normal file
View File

@ -0,0 +1,328 @@
"""CRDT-native DCA schedules — G-Set of chunks with monotone status.
Each DCA schedule is a set of chunks whose status follows a monotone
lattice: pending(0) submitted(1) executed(2). This mirrors the
_STATUS_RANK pattern from intent_matching.py.
Composition points:
materialize_chunks() before create_batch() due chunks become Intents
reconcile_settled_chunks() after solve_batch() settled intents mark
chunks as executed.
CRDT merge: union of schedules/chunks, max status wins for duplicates.
This is commutative, associative, and idempotent.
"""
from dataclasses import dataclass, field
from src.crdt.intent_matching import Intent, IntentSet, add_intent
from src.crdt.batch_settlement import Settlement
# --- Status lattice ---
_CHUNK_STATUS_RANK = {"pending": 0, "submitted": 1, "executed": 2}
def _higher_chunk_status(a: str, b: str) -> str:
"""Return the more advanced chunk status (monotone lattice)."""
return a if _CHUNK_STATUS_RANK.get(a, 0) >= _CHUNK_STATUS_RANK.get(b, 0) else b
# --- Data models ---
@dataclass(frozen=True)
class DCAChunk:
"""A single chunk within a DCA schedule.
Frozen so it can serve as a value in the G-Set.
Status transitions create new instances.
"""
chunk_id: str
schedule_id: str
maker: str
sell_token: str
sell_amount: float
buy_token: str
min_buy_amount: float
scheduled_time: float
status: str = "pending" # "pending" | "submitted" | "executed"
@dataclass
class DCASchedule:
"""A complete DCA schedule: a set of chunks for one maker."""
schedule_id: str
maker: str
chunks: dict[str, DCAChunk] = field(default_factory=dict)
total_amount: float = 0.0
@dataclass
class DCAScheduleRegistry:
"""Registry of all DCA schedules across all makers."""
schedules: dict[str, DCASchedule] = field(default_factory=dict)
# --- Schedule creation ---
def create_dca_schedule(
schedule_id: str,
maker: str,
total_amount: float,
n_chunks: int,
start_time: float,
interval: float,
sell_token: str = "USDC",
buy_token: str = "MYCO",
min_buy_ratio: float = 0.8,
oracle_state=None,
) -> DCASchedule:
"""Create a DCA schedule with n_chunks spread over time.
Args:
schedule_id: Unique schedule identifier.
maker: The entity placing the order.
total_amount: Total amount to deploy across all chunks.
n_chunks: Number of chunks.
start_time: Timestamp of first chunk.
interval: Time between chunks.
sell_token: Token being sold (deposited).
buy_token: Token being bought (minted).
min_buy_ratio: Minimum acceptable output as fraction of input.
oracle_state: Optional oracle for TWAP-aware sizing (unused
in this CRDT layer sizing happens at execution time).
Returns:
A DCASchedule with all chunks in 'pending' status.
"""
chunk_amount = total_amount / n_chunks
chunks: dict[str, DCAChunk] = {}
for i in range(n_chunks):
cid = f"{schedule_id}_chunk_{i}"
chunks[cid] = DCAChunk(
chunk_id=cid,
schedule_id=schedule_id,
maker=maker,
sell_token=sell_token,
sell_amount=chunk_amount,
buy_token=buy_token,
min_buy_amount=chunk_amount * min_buy_ratio,
scheduled_time=start_time + i * interval,
)
return DCASchedule(
schedule_id=schedule_id,
maker=maker,
chunks=chunks,
total_amount=total_amount,
)
# --- Status advancement ---
def advance_chunk_status(
schedule: DCASchedule,
chunk_id: str,
new_status: str,
) -> DCASchedule:
"""Advance a chunk's status monotonically.
Only allows forward transitions in the lattice:
pending submitted executed.
Returns schedule unchanged if transition is not forward.
"""
chunk = schedule.chunks.get(chunk_id)
if chunk is None:
return schedule
current_rank = _CHUNK_STATUS_RANK.get(chunk.status, 0)
new_rank = _CHUNK_STATUS_RANK.get(new_status, 0)
if new_rank <= current_rank:
return schedule
updated = DCAChunk(
chunk_id=chunk.chunk_id,
schedule_id=chunk.schedule_id,
maker=chunk.maker,
sell_token=chunk.sell_token,
sell_amount=chunk.sell_amount,
buy_token=chunk.buy_token,
min_buy_amount=chunk.min_buy_amount,
scheduled_time=chunk.scheduled_time,
status=new_status,
)
new_chunks = {**schedule.chunks, chunk_id: updated}
return DCASchedule(
schedule_id=schedule.schedule_id,
maker=schedule.maker,
chunks=new_chunks,
total_amount=schedule.total_amount,
)
# --- Materialization: pending chunks → Intents ---
def materialize_chunks(
registry: DCAScheduleRegistry,
intent_set: IntentSet,
current_time: float,
) -> tuple[DCAScheduleRegistry, IntentSet, list[str]]:
"""Convert due pending chunks into Intent objects.
A chunk is due when current_time >= chunk.scheduled_time and
chunk.status == 'pending'.
Returns:
(updated_registry, updated_intent_set, list_of_materialized_chunk_ids)
"""
materialized: list[str] = []
new_schedules = dict(registry.schedules)
for sid, schedule in registry.schedules.items():
updated_schedule = schedule
for cid, chunk in schedule.chunks.items():
if chunk.status != "pending":
continue
if current_time < chunk.scheduled_time:
continue
# Create an Intent from this chunk
intent = Intent(
intent_id=chunk.chunk_id,
maker=chunk.maker,
sell_token=chunk.sell_token,
sell_amount=chunk.sell_amount,
buy_token=chunk.buy_token,
min_buy_amount=chunk.min_buy_amount,
valid_until=chunk.scheduled_time + 1000.0,
)
intent_set = add_intent(intent_set, intent)
# Advance to submitted
updated_schedule = advance_chunk_status(
updated_schedule, cid, "submitted",
)
materialized.append(cid)
new_schedules[sid] = updated_schedule
return (
DCAScheduleRegistry(schedules=new_schedules),
intent_set,
materialized,
)
# --- Reconciliation: settled intents → executed chunks ---
def reconcile_settled_chunks(
registry: DCAScheduleRegistry,
settlement: Settlement,
intent_set: IntentSet,
) -> DCAScheduleRegistry:
"""Mark chunks as executed when their corresponding intents settled.
A chunk is settled if:
- It was part of a CoW match (intent filled), OR
- Its intent contributed to net_curve_buy/sell (unmatched but settled)
For simplicity, we mark all submitted chunks whose intent_id appears
in the settlement's cow_matches or in the intent_set as 'filled'.
"""
# Collect all settled intent IDs from CoW matches
settled_ids: set[str] = set()
for match in settlement.cow_matches:
settled_ids.add(match.intent_a_id)
settled_ids.add(match.intent_b_id)
# Also check intent_set for filled intents
for iid, intent in intent_set.intents.items():
if intent.status == "filled":
settled_ids.add(iid)
new_schedules = dict(registry.schedules)
for sid, schedule in registry.schedules.items():
updated = schedule
for cid, chunk in schedule.chunks.items():
if chunk.status == "submitted" and cid in settled_ids:
updated = advance_chunk_status(updated, cid, "executed")
new_schedules[sid] = updated
return DCAScheduleRegistry(schedules=new_schedules)
# --- CRDT merge ---
def merge_dca_schedules(
a: DCAScheduleRegistry,
b: DCAScheduleRegistry,
) -> DCAScheduleRegistry:
"""Merge two DCA schedule registries (CRDT merge).
Union of schedules. For duplicate schedule_ids, merge chunks:
union of chunks, max status for duplicates.
Commutative, associative, idempotent.
"""
all_sids = set(a.schedules) | set(b.schedules)
merged: dict[str, DCASchedule] = {}
for sid in all_sids:
sa = a.schedules.get(sid)
sb = b.schedules.get(sid)
if sa is None:
merged[sid] = sb # type: ignore[assignment]
elif sb is None:
merged[sid] = sa
else:
# Merge chunks
all_cids = set(sa.chunks) | set(sb.chunks)
merged_chunks: dict[str, DCAChunk] = {}
for cid in all_cids:
ca = sa.chunks.get(cid)
cb = sb.chunks.get(cid)
if ca is None:
merged_chunks[cid] = cb # type: ignore[assignment]
elif cb is None:
merged_chunks[cid] = ca
else:
# Max status wins
winner_status = _higher_chunk_status(ca.status, cb.status)
winner = ca if ca.status == winner_status else cb
merged_chunks[cid] = DCAChunk(
chunk_id=winner.chunk_id,
schedule_id=winner.schedule_id,
maker=winner.maker,
sell_token=winner.sell_token,
sell_amount=winner.sell_amount,
buy_token=winner.buy_token,
min_buy_amount=winner.min_buy_amount,
scheduled_time=winner.scheduled_time,
status=winner_status,
)
merged[sid] = DCASchedule(
schedule_id=sa.schedule_id,
maker=sa.maker,
chunks=merged_chunks,
total_amount=max(sa.total_amount, sb.total_amount),
)
return DCAScheduleRegistry(schedules=merged)
# --- Utilities ---
def schedule_completion_fraction(schedule: DCASchedule) -> float:
"""Fraction of chunks that have reached 'executed' status."""
if not schedule.chunks:
return 0.0
executed = sum(
1 for c in schedule.chunks.values() if c.status == "executed"
)
return executed / len(schedule.chunks)

View File

@ -0,0 +1,221 @@
"""Signal Router — oracle signals → adaptive system parameters.
Routes TWAP deviation and volatility signals from the oracle into
system parameters: flow threshold, PAMM curvature, surge fees, and
oracle weight multiplier velocity.
All transforms are stateless pure functions: oracle state in,
adaptive parameters out. The router is opt-in returns base params
unchanged when oracle is None or has insufficient data.
Mapping:
volatility tighter flow threshold (reduces max outflow)
|deviation| steeper PAMM discount (wider redemption haircut)
volatility higher surge fee (imbalance fee increases)
deviation oracle velocity signal (weight drift direction)
"""
import math
from dataclasses import dataclass
from src.primitives.twap_oracle import (
TWAPOracleState,
compute_twap,
get_volatility,
spot_vs_twap_deviation,
)
@dataclass
class SignalRouterConfig:
"""Coefficients and clamp bounds for signal routing."""
# Multiplicative coefficients
k_vol_flow: float = 1.0 # volatility → flow_threshold scaling
k_dev_alpha: float = 2.0 # |deviation| → pamm_alpha_bar scaling
k_vol_fee: float = 1.5 # volatility → surge_fee_rate scaling
k_oracle_vel: float = 0.01 # deviation → oracle_multiplier_velocity
# Safety clamp bounds
flow_threshold_min: float = 0.01
flow_threshold_max: float = 0.5
alpha_bar_min: float = 1.0
alpha_bar_max: float = 100.0
surge_fee_min: float = 0.0
surge_fee_max: float = 0.5
velocity_min: float = -0.1
velocity_max: float = 0.1
@dataclass
class AdaptiveParams:
"""Snapshot of adapted system parameters."""
flow_threshold: float
pamm_alpha_bar: float
surge_fee_rate: float
oracle_multiplier_velocity: float
@dataclass
class RouterSignals:
"""Intermediate signals extracted from oracle state."""
twap_deviation: float # (spot - twap) / twap
volatility: float # log-return std dev
spot_price: float
twap: float
is_valid: bool # False if oracle has insufficient data
def extract_signals(
oracle_state: TWAPOracleState,
current_spot: float,
window: float | None = None,
) -> RouterSignals:
"""Extract routing signals from oracle state and current spot price.
Returns RouterSignals with is_valid=False if oracle has
fewer than min_observations.
"""
twap = compute_twap(oracle_state, window)
if twap <= 0:
return RouterSignals(
twap_deviation=0.0,
volatility=0.0,
spot_price=current_spot,
twap=0.0,
is_valid=False,
)
deviation = spot_vs_twap_deviation(oracle_state, current_spot, window)
vol = get_volatility(oracle_state, window)
return RouterSignals(
twap_deviation=deviation,
volatility=vol,
spot_price=current_spot,
twap=twap,
is_valid=True,
)
def _clamp(value: float, lo: float, hi: float) -> float:
return max(lo, min(hi, value))
def apply_signals(
signals: RouterSignals,
base_params: AdaptiveParams,
config: SignalRouterConfig,
) -> AdaptiveParams:
"""Apply extracted signals to base parameters.
Multiplicative scaling with clamping:
flow_threshold *= (1 - k_vol_flow * volatility)
pamm_alpha_bar *= (1 + k_dev_alpha * |deviation|)
surge_fee_rate *= (1 + k_vol_fee * volatility)
oracle_multiplier_velocity = k_oracle_vel * deviation
Returns base_params unchanged if signals are invalid.
"""
if not signals.is_valid:
return base_params
vol = signals.volatility
dev = signals.twap_deviation
flow = base_params.flow_threshold * (1.0 - config.k_vol_flow * vol)
flow = _clamp(flow, config.flow_threshold_min, config.flow_threshold_max)
alpha = base_params.pamm_alpha_bar * (1.0 + config.k_dev_alpha * abs(dev))
alpha = _clamp(alpha, config.alpha_bar_min, config.alpha_bar_max)
fee = base_params.surge_fee_rate * (1.0 + config.k_vol_fee * vol)
fee = _clamp(fee, config.surge_fee_min, config.surge_fee_max)
velocity = config.k_oracle_vel * dev
velocity = _clamp(velocity, config.velocity_min, config.velocity_max)
return AdaptiveParams(
flow_threshold=flow,
pamm_alpha_bar=alpha,
surge_fee_rate=fee,
oracle_multiplier_velocity=velocity,
)
def route_signals(
oracle_state: TWAPOracleState | None,
base_params: AdaptiveParams,
config: SignalRouterConfig,
current_spot: float = 0.0,
window: float | None = None,
) -> AdaptiveParams:
"""Top-level convenience: extract + apply in one call.
No-op (returns base_params) when oracle_state is None.
"""
if oracle_state is None:
return base_params
if current_spot <= 0 and oracle_state.observations:
current_spot = oracle_state.observations[-1].price
signals = extract_signals(oracle_state, current_spot, window)
return apply_signals(signals, base_params, config)
def simulate_signal_routing(
base_params: AdaptiveParams,
config: SignalRouterConfig,
price_trajectory: list[float],
window: float | None = None,
) -> dict[str, list[float]]:
"""Simulate signal routing over a price trajectory.
Builds an oracle from the trajectory and computes adaptive
params at each step. Useful for visualizing parameter response
to different price patterns.
Args:
base_params: Starting parameter values.
config: Router coefficients.
price_trajectory: Sequence of spot prices (one per time step).
window: TWAP lookback window.
Returns:
Dict with keys: times, flow_threshold, pamm_alpha_bar,
surge_fee_rate, oracle_velocity, twap_deviation, volatility.
"""
from src.primitives.twap_oracle import (
TWAPOracleParams,
create_oracle,
record_observation,
)
oracle = create_oracle(TWAPOracleParams(
default_window=window if window is not None else 24.0,
))
out: dict[str, list[float]] = {
"times": [],
"flow_threshold": [],
"pamm_alpha_bar": [],
"surge_fee_rate": [],
"oracle_velocity": [],
"twap_deviation": [],
"volatility": [],
}
for t, price in enumerate(price_trajectory):
oracle = record_observation(oracle, price, float(t))
adapted = route_signals(oracle, base_params, config, price, window)
signals = extract_signals(oracle, price, window)
out["times"].append(float(t))
out["flow_threshold"].append(adapted.flow_threshold)
out["pamm_alpha_bar"].append(adapted.pamm_alpha_bar)
out["surge_fee_rate"].append(adapted.surge_fee_rate)
out["oracle_velocity"].append(adapted.oracle_multiplier_velocity)
out["twap_deviation"].append(signals.twap_deviation)
out["volatility"].append(signals.volatility)
return out

348
tests/test_dca_schedule.py Normal file
View File

@ -0,0 +1,348 @@
"""Tests for CRDT DCA schedules — G-Set of chunks with monotone status."""
import pytest
from src.crdt.dca_schedule import (
DCAChunk,
DCASchedule,
DCAScheduleRegistry,
create_dca_schedule,
advance_chunk_status,
materialize_chunks,
reconcile_settled_chunks,
merge_dca_schedules,
schedule_completion_fraction,
_CHUNK_STATUS_RANK,
)
from src.crdt.intent_matching import Intent, IntentSet, add_intent, fill_intent
from src.crdt.batch_settlement import Settlement, Match
# --- TestCreateSchedule ---
class TestCreateSchedule:
def test_creates_correct_number_of_chunks(self):
sched = create_dca_schedule("s1", "alice", 100.0, 5, 0.0, 1.0)
assert len(sched.chunks) == 5
assert sched.total_amount == 100.0
def test_chunks_have_correct_amounts(self):
sched = create_dca_schedule("s1", "alice", 100.0, 4, 0.0, 1.0)
for chunk in sched.chunks.values():
assert abs(chunk.sell_amount - 25.0) < 1e-10
def test_chunks_are_scheduled_at_intervals(self):
sched = create_dca_schedule("s1", "alice", 100.0, 3, 10.0, 5.0)
times = sorted(c.scheduled_time for c in sched.chunks.values())
assert times == [10.0, 15.0, 20.0]
def test_all_chunks_start_pending(self):
sched = create_dca_schedule("s1", "alice", 100.0, 3, 0.0, 1.0)
for chunk in sched.chunks.values():
assert chunk.status == "pending"
def test_chunk_ids_are_unique(self):
sched = create_dca_schedule("s1", "alice", 100.0, 10, 0.0, 1.0)
ids = [c.chunk_id for c in sched.chunks.values()]
assert len(ids) == len(set(ids))
def test_min_buy_ratio_applied(self):
sched = create_dca_schedule(
"s1", "alice", 100.0, 2, 0.0, 1.0, min_buy_ratio=0.9,
)
for chunk in sched.chunks.values():
assert abs(chunk.min_buy_amount - 50.0 * 0.9) < 1e-10
def test_custom_tokens(self):
sched = create_dca_schedule(
"s1", "alice", 100.0, 1, 0.0, 1.0,
sell_token="ETH", buy_token="DAI",
)
chunk = list(sched.chunks.values())[0]
assert chunk.sell_token == "ETH"
assert chunk.buy_token == "DAI"
# --- TestAdvanceStatus ---
class TestAdvanceStatus:
def test_pending_to_submitted(self):
sched = create_dca_schedule("s1", "alice", 100.0, 1, 0.0, 1.0)
cid = list(sched.chunks.keys())[0]
updated = advance_chunk_status(sched, cid, "submitted")
assert updated.chunks[cid].status == "submitted"
def test_submitted_to_executed(self):
sched = create_dca_schedule("s1", "alice", 100.0, 1, 0.0, 1.0)
cid = list(sched.chunks.keys())[0]
sched = advance_chunk_status(sched, cid, "submitted")
sched = advance_chunk_status(sched, cid, "executed")
assert sched.chunks[cid].status == "executed"
def test_backward_transition_rejected(self):
sched = create_dca_schedule("s1", "alice", 100.0, 1, 0.0, 1.0)
cid = list(sched.chunks.keys())[0]
sched = advance_chunk_status(sched, cid, "submitted")
sched_after = advance_chunk_status(sched, cid, "pending")
assert sched_after.chunks[cid].status == "submitted"
def test_same_status_transition_rejected(self):
sched = create_dca_schedule("s1", "alice", 100.0, 1, 0.0, 1.0)
cid = list(sched.chunks.keys())[0]
sched_after = advance_chunk_status(sched, cid, "pending")
assert sched_after.chunks[cid].status == "pending"
def test_nonexistent_chunk_returns_unchanged(self):
sched = create_dca_schedule("s1", "alice", 100.0, 1, 0.0, 1.0)
sched_after = advance_chunk_status(sched, "nonexistent", "submitted")
assert sched == sched_after
def test_skip_to_executed_allowed(self):
sched = create_dca_schedule("s1", "alice", 100.0, 1, 0.0, 1.0)
cid = list(sched.chunks.keys())[0]
sched = advance_chunk_status(sched, cid, "executed")
assert sched.chunks[cid].status == "executed"
# --- TestMaterializeChunks ---
class TestMaterializeChunks:
def test_due_chunks_become_intents(self):
sched = create_dca_schedule("s1", "alice", 100.0, 3, 0.0, 1.0)
registry = DCAScheduleRegistry(schedules={"s1": sched})
iset = IntentSet()
registry, iset, materialized = materialize_chunks(registry, iset, 1.5)
# Chunks at t=0.0 and t=1.0 are due; t=2.0 not yet
assert len(materialized) == 2
assert len(iset.intents) == 2
def test_materialized_chunks_become_submitted(self):
sched = create_dca_schedule("s1", "alice", 100.0, 2, 0.0, 1.0)
registry = DCAScheduleRegistry(schedules={"s1": sched})
iset = IntentSet()
registry, iset, materialized = materialize_chunks(registry, iset, 10.0)
for cid in materialized:
assert registry.schedules["s1"].chunks[cid].status == "submitted"
def test_already_submitted_not_rematerialized(self):
sched = create_dca_schedule("s1", "alice", 100.0, 2, 0.0, 1.0)
registry = DCAScheduleRegistry(schedules={"s1": sched})
iset = IntentSet()
registry, iset, mat1 = materialize_chunks(registry, iset, 10.0)
registry, iset, mat2 = materialize_chunks(registry, iset, 10.0)
assert len(mat2) == 0
def test_future_chunks_not_materialized(self):
sched = create_dca_schedule("s1", "alice", 100.0, 5, 10.0, 1.0)
registry = DCAScheduleRegistry(schedules={"s1": sched})
iset = IntentSet()
registry, iset, materialized = materialize_chunks(registry, iset, 5.0)
assert len(materialized) == 0
def test_intent_inherits_chunk_fields(self):
sched = create_dca_schedule(
"s1", "alice", 100.0, 1, 0.0, 1.0,
sell_token="ETH", buy_token="DAI", min_buy_ratio=0.9,
)
registry = DCAScheduleRegistry(schedules={"s1": sched})
iset = IntentSet()
registry, iset, materialized = materialize_chunks(registry, iset, 1.0)
intent = iset.intents[materialized[0]]
assert intent.maker == "alice"
assert intent.sell_token == "ETH"
assert intent.buy_token == "DAI"
assert abs(intent.min_buy_amount - 100.0 * 0.9) < 1e-10
# --- TestReconcileSettled ---
class TestReconcileSettled:
def test_cow_matched_chunks_become_executed(self):
sched = create_dca_schedule("s1", "alice", 100.0, 2, 0.0, 1.0)
cids = list(sched.chunks.keys())
# Submit both
sched = advance_chunk_status(sched, cids[0], "submitted")
sched = advance_chunk_status(sched, cids[1], "submitted")
registry = DCAScheduleRegistry(schedules={"s1": sched})
# Settle first chunk via CoW match
settlement = Settlement(
cow_matches=[
Match(
intent_a_id=cids[0],
intent_b_id="other_intent",
clearing_price=1.0,
amount_a_sold=50.0,
amount_b_sold=50.0,
)
],
)
iset = IntentSet()
registry = reconcile_settled_chunks(registry, settlement, iset)
assert registry.schedules["s1"].chunks[cids[0]].status == "executed"
assert registry.schedules["s1"].chunks[cids[1]].status == "submitted"
def test_filled_intents_mark_executed(self):
sched = create_dca_schedule("s1", "alice", 100.0, 1, 0.0, 1.0)
cid = list(sched.chunks.keys())[0]
sched = advance_chunk_status(sched, cid, "submitted")
registry = DCAScheduleRegistry(schedules={"s1": sched})
# Mark intent as filled
iset = IntentSet()
intent = Intent(
intent_id=cid, maker="alice",
sell_token="USDC", sell_amount=100.0,
buy_token="MYCO", min_buy_amount=80.0,
valid_until=1000.0, status="filled",
)
iset = add_intent(iset, intent)
settlement = Settlement()
registry = reconcile_settled_chunks(registry, settlement, iset)
assert registry.schedules["s1"].chunks[cid].status == "executed"
def test_pending_chunks_not_affected(self):
sched = create_dca_schedule("s1", "alice", 100.0, 1, 0.0, 1.0)
cid = list(sched.chunks.keys())[0]
# Leave as pending
registry = DCAScheduleRegistry(schedules={"s1": sched})
settlement = Settlement(
cow_matches=[
Match(cid, "other", 1.0, 50.0, 50.0),
],
)
iset = IntentSet()
registry = reconcile_settled_chunks(registry, settlement, iset)
# Still pending — reconciliation only affects submitted chunks
assert registry.schedules["s1"].chunks[cid].status == "pending"
# --- TestMergeCRDT ---
class TestMergeCRDT:
def test_union_of_disjoint_schedules(self):
s1 = create_dca_schedule("s1", "alice", 100.0, 2, 0.0, 1.0)
s2 = create_dca_schedule("s2", "bob", 200.0, 3, 0.0, 1.0)
reg_a = DCAScheduleRegistry(schedules={"s1": s1})
reg_b = DCAScheduleRegistry(schedules={"s2": s2})
merged = merge_dca_schedules(reg_a, reg_b)
assert "s1" in merged.schedules
assert "s2" in merged.schedules
def test_max_status_wins_for_duplicate_chunks(self):
s1 = create_dca_schedule("s1", "alice", 100.0, 1, 0.0, 1.0)
cid = list(s1.chunks.keys())[0]
# Replica A: submitted
s1a = advance_chunk_status(s1, cid, "submitted")
# Replica B: executed
s1b = advance_chunk_status(s1, cid, "executed")
reg_a = DCAScheduleRegistry(schedules={"s1": s1a})
reg_b = DCAScheduleRegistry(schedules={"s1": s1b})
merged = merge_dca_schedules(reg_a, reg_b)
assert merged.schedules["s1"].chunks[cid].status == "executed"
def test_merge_is_commutative(self):
s1 = create_dca_schedule("s1", "alice", 100.0, 2, 0.0, 1.0)
cid = list(s1.chunks.keys())[0]
s1a = advance_chunk_status(s1, cid, "submitted")
reg_a = DCAScheduleRegistry(schedules={"s1": s1a})
reg_b = DCAScheduleRegistry(schedules={"s1": s1})
ab = merge_dca_schedules(reg_a, reg_b)
ba = merge_dca_schedules(reg_b, reg_a)
for cid in ab.schedules["s1"].chunks:
assert ab.schedules["s1"].chunks[cid].status == ba.schedules["s1"].chunks[cid].status
def test_merge_is_idempotent(self):
s1 = create_dca_schedule("s1", "alice", 100.0, 3, 0.0, 1.0)
reg = DCAScheduleRegistry(schedules={"s1": s1})
merged = merge_dca_schedules(reg, reg)
for cid in merged.schedules["s1"].chunks:
assert merged.schedules["s1"].chunks[cid].status == s1.chunks[cid].status
def test_merge_union_of_chunks(self):
"""Different replicas have different chunks of same schedule."""
s1a = DCASchedule(
schedule_id="s1", maker="alice",
chunks={"c1": DCAChunk("c1", "s1", "alice", "USDC", 50.0, "MYCO", 40.0, 0.0)},
total_amount=100.0,
)
s1b = DCASchedule(
schedule_id="s1", maker="alice",
chunks={"c2": DCAChunk("c2", "s1", "alice", "USDC", 50.0, "MYCO", 40.0, 1.0)},
total_amount=100.0,
)
reg_a = DCAScheduleRegistry(schedules={"s1": s1a})
reg_b = DCAScheduleRegistry(schedules={"s1": s1b})
merged = merge_dca_schedules(reg_a, reg_b)
assert "c1" in merged.schedules["s1"].chunks
assert "c2" in merged.schedules["s1"].chunks
# --- TestPartitionReconnect ---
class TestPartitionReconnect:
"""Simulate a network partition + reconnect scenario."""
def test_partition_and_merge(self):
"""Two replicas diverge, then merge — highest status wins."""
sched = create_dca_schedule("s1", "alice", 100.0, 3, 0.0, 1.0)
reg = DCAScheduleRegistry(schedules={"s1": sched})
cids = sorted(sched.chunks.keys())
# Replica A processes chunk 0
reg_a = DCAScheduleRegistry(schedules={
"s1": advance_chunk_status(reg.schedules["s1"], cids[0], "submitted"),
})
reg_a = DCAScheduleRegistry(schedules={
"s1": advance_chunk_status(reg_a.schedules["s1"], cids[0], "executed"),
})
# Replica B processes chunk 1
reg_b = DCAScheduleRegistry(schedules={
"s1": advance_chunk_status(reg.schedules["s1"], cids[1], "submitted"),
})
merged = merge_dca_schedules(reg_a, reg_b)
assert merged.schedules["s1"].chunks[cids[0]].status == "executed"
assert merged.schedules["s1"].chunks[cids[1]].status == "submitted"
assert merged.schedules["s1"].chunks[cids[2]].status == "pending"
# --- TestScheduleCompletion ---
class TestScheduleCompletion:
def test_empty_schedule_zero(self):
sched = DCASchedule(schedule_id="s1", maker="alice")
assert schedule_completion_fraction(sched) == 0.0
def test_all_executed_one(self):
sched = create_dca_schedule("s1", "alice", 100.0, 3, 0.0, 1.0)
for cid in list(sched.chunks.keys()):
sched = advance_chunk_status(sched, cid, "executed")
assert abs(schedule_completion_fraction(sched) - 1.0) < 1e-10
def test_partial_completion(self):
sched = create_dca_schedule("s1", "alice", 100.0, 4, 0.0, 1.0)
cids = list(sched.chunks.keys())
sched = advance_chunk_status(sched, cids[0], "executed")
sched = advance_chunk_status(sched, cids[1], "executed")
assert abs(schedule_completion_fraction(sched) - 0.5) < 1e-10

270
tests/test_signal_router.py Normal file
View File

@ -0,0 +1,270 @@
"""Tests for signal router — oracle signals → adaptive parameters."""
import math
import pytest
from src.primitives.signal_router import (
SignalRouterConfig,
AdaptiveParams,
RouterSignals,
extract_signals,
apply_signals,
route_signals,
simulate_signal_routing,
)
from src.primitives.twap_oracle import (
TWAPOracleParams,
TWAPOracleState,
create_oracle,
record_observation,
)
# --- Helpers ---
def _build_oracle(prices: list[float], dt: float = 1.0) -> TWAPOracleState:
"""Build an oracle with observations from a price list."""
oracle = create_oracle(TWAPOracleParams(default_window=len(prices) * dt))
for i, p in enumerate(prices):
oracle = record_observation(oracle, p, float(i) * dt)
return oracle
def _base_params() -> AdaptiveParams:
return AdaptiveParams(
flow_threshold=0.1,
pamm_alpha_bar=10.0,
surge_fee_rate=0.05,
oracle_multiplier_velocity=0.0,
)
# --- TestExtractSignals ---
class TestExtractSignals:
def test_insufficient_data_returns_invalid(self):
oracle = create_oracle()
signals = extract_signals(oracle, 1.0)
assert signals.is_valid is False
assert signals.twap_deviation == 0.0
def test_single_observation_returns_invalid(self):
oracle = create_oracle()
oracle = record_observation(oracle, 1.0, 0.0)
signals = extract_signals(oracle, 1.0)
assert signals.is_valid is False
def test_stable_prices_zero_deviation(self):
oracle = _build_oracle([1.0, 1.0, 1.0, 1.0, 1.0])
signals = extract_signals(oracle, 1.0)
assert signals.is_valid is True
assert abs(signals.twap_deviation) < 1e-10
assert signals.volatility == 0.0
def test_spot_above_twap_positive_deviation(self):
oracle = _build_oracle([1.0, 1.0, 1.0, 1.0, 1.0])
signals = extract_signals(oracle, 1.5)
assert signals.is_valid is True
assert signals.twap_deviation > 0
def test_spot_below_twap_negative_deviation(self):
oracle = _build_oracle([1.0, 1.0, 1.0, 1.0, 1.0])
signals = extract_signals(oracle, 0.5)
assert signals.is_valid is True
assert signals.twap_deviation < 0
def test_volatile_prices_nonzero_volatility(self):
oracle = _build_oracle([1.0, 1.5, 0.8, 1.3, 0.9, 1.1])
signals = extract_signals(oracle, 1.1)
assert signals.is_valid is True
assert signals.volatility > 0
# --- TestApplySignals ---
class TestApplySignals:
def test_invalid_signals_return_base(self):
base = _base_params()
config = SignalRouterConfig()
invalid = RouterSignals(0.0, 0.0, 1.0, 0.0, is_valid=False)
result = apply_signals(invalid, base, config)
assert result == base
def test_volatility_tightens_flow_threshold(self):
base = _base_params()
config = SignalRouterConfig(k_vol_flow=1.0)
signals = RouterSignals(
twap_deviation=0.0, volatility=0.2,
spot_price=1.0, twap=1.0, is_valid=True,
)
result = apply_signals(signals, base, config)
# flow *= (1 - 1.0 * 0.2) = 0.8 * base
assert result.flow_threshold < base.flow_threshold
assert abs(result.flow_threshold - 0.08) < 1e-10
def test_deviation_steepens_alpha(self):
base = _base_params()
config = SignalRouterConfig(k_dev_alpha=2.0)
signals = RouterSignals(
twap_deviation=0.1, volatility=0.0,
spot_price=1.1, twap=1.0, is_valid=True,
)
result = apply_signals(signals, base, config)
# alpha *= (1 + 2.0 * 0.1) = 1.2 * base
assert result.pamm_alpha_bar > base.pamm_alpha_bar
assert abs(result.pamm_alpha_bar - 12.0) < 1e-10
def test_volatility_increases_surge_fee(self):
base = _base_params()
config = SignalRouterConfig(k_vol_fee=1.5)
signals = RouterSignals(
twap_deviation=0.0, volatility=0.3,
spot_price=1.0, twap=1.0, is_valid=True,
)
result = apply_signals(signals, base, config)
# fee *= (1 + 1.5 * 0.3) = 1.45 * base
expected = 0.05 * 1.45
assert abs(result.surge_fee_rate - expected) < 1e-10
def test_deviation_sets_velocity(self):
base = _base_params()
config = SignalRouterConfig(k_oracle_vel=0.01)
signals = RouterSignals(
twap_deviation=-0.2, volatility=0.0,
spot_price=0.8, twap=1.0, is_valid=True,
)
result = apply_signals(signals, base, config)
assert abs(result.oracle_multiplier_velocity - (-0.002)) < 1e-10
def test_clamping_respects_bounds(self):
base = AdaptiveParams(
flow_threshold=0.1, pamm_alpha_bar=10.0,
surge_fee_rate=0.4, oracle_multiplier_velocity=0.0,
)
config = SignalRouterConfig(
k_vol_flow=10.0, k_vol_fee=10.0,
surge_fee_max=0.5, flow_threshold_min=0.01,
)
signals = RouterSignals(
twap_deviation=0.0, volatility=0.5,
spot_price=1.0, twap=1.0, is_valid=True,
)
result = apply_signals(signals, base, config)
assert result.flow_threshold >= config.flow_threshold_min
assert result.surge_fee_rate <= config.surge_fee_max
# --- TestRouteSignals ---
class TestRouteSignals:
def test_none_oracle_returns_base(self):
base = _base_params()
config = SignalRouterConfig()
result = route_signals(None, base, config)
assert result == base
def test_with_oracle_modifies_params(self):
oracle = _build_oracle([1.0, 1.0, 1.0, 1.2, 1.3])
base = _base_params()
config = SignalRouterConfig()
result = route_signals(oracle, base, config, current_spot=1.5)
# Spot > TWAP, so deviation > 0 → alpha increases
assert result.pamm_alpha_bar > base.pamm_alpha_bar
def test_uses_last_obs_when_no_spot(self):
oracle = _build_oracle([1.0, 1.0, 1.0])
base = _base_params()
config = SignalRouterConfig()
# current_spot=0 → should use last observation price
result = route_signals(oracle, base, config, current_spot=0.0)
# With spot == twap, deviation ≈ 0, so params near base
assert abs(result.flow_threshold - base.flow_threshold) < base.flow_threshold * 0.5
# --- TestKZeroDisablesLink ---
class TestKZeroDisablesLink:
def test_zero_k_vol_flow_leaves_threshold_unchanged(self):
base = _base_params()
config = SignalRouterConfig(k_vol_flow=0.0)
signals = RouterSignals(
twap_deviation=0.0, volatility=0.5,
spot_price=1.0, twap=1.0, is_valid=True,
)
result = apply_signals(signals, base, config)
assert abs(result.flow_threshold - base.flow_threshold) < 1e-10
def test_zero_k_dev_alpha_leaves_alpha_unchanged(self):
base = _base_params()
config = SignalRouterConfig(k_dev_alpha=0.0)
signals = RouterSignals(
twap_deviation=0.5, volatility=0.0,
spot_price=1.5, twap=1.0, is_valid=True,
)
result = apply_signals(signals, base, config)
assert abs(result.pamm_alpha_bar - base.pamm_alpha_bar) < 1e-10
def test_zero_k_vol_fee_leaves_fee_unchanged(self):
base = _base_params()
config = SignalRouterConfig(k_vol_fee=0.0)
signals = RouterSignals(
twap_deviation=0.0, volatility=0.5,
spot_price=1.0, twap=1.0, is_valid=True,
)
result = apply_signals(signals, base, config)
assert abs(result.surge_fee_rate - base.surge_fee_rate) < 1e-10
def test_zero_k_oracle_vel_leaves_velocity_zero(self):
base = _base_params()
config = SignalRouterConfig(k_oracle_vel=0.0)
signals = RouterSignals(
twap_deviation=0.3, volatility=0.0,
spot_price=1.3, twap=1.0, is_valid=True,
)
result = apply_signals(signals, base, config)
assert result.oracle_multiplier_velocity == 0.0
# --- TestSimulation ---
class TestSimulation:
def test_simulation_returns_correct_keys(self):
base = _base_params()
config = SignalRouterConfig()
prices = [1.0] * 10
result = simulate_signal_routing(base, config, prices)
assert set(result.keys()) == {
"times", "flow_threshold", "pamm_alpha_bar",
"surge_fee_rate", "oracle_velocity",
"twap_deviation", "volatility",
}
assert len(result["times"]) == 10
def test_volatile_trajectory_tightens_params(self):
base = _base_params()
config = SignalRouterConfig(k_vol_flow=2.0, k_vol_fee=2.0)
# Volatile trajectory
prices = [1.0, 1.5, 0.8, 1.3, 0.7, 1.4, 0.9, 1.2, 0.85, 1.1]
result = simulate_signal_routing(base, config, prices)
# After volatile period, flow_threshold should be below base
# (after enough observations for volatility to register)
final_flow = result["flow_threshold"][-1]
assert final_flow <= base.flow_threshold
def test_stable_trajectory_preserves_params(self):
base = _base_params()
config = SignalRouterConfig()
prices = [1.0] * 20
result = simulate_signal_routing(base, config, prices)
# Stable prices → zero volatility, zero deviation
# Params should stay at base (after oracle warms up)
assert abs(result["flow_threshold"][-1] - base.flow_threshold) < 1e-6
assert abs(result["surge_fee_rate"][-1] - base.surge_fee_rate) < 1e-6
def test_simulation_length_matches_trajectory(self):
base = _base_params()
config = SignalRouterConfig()
prices = [1.0 + 0.01 * i for i in range(50)]
result = simulate_signal_routing(base, config, prices)
for key in result:
assert len(result[key]) == 50

View File

@ -0,0 +1,311 @@
"""Tests for subscription/donation DCA integration."""
import numpy as np
import pytest
from src.composed.subscription_dca import (
SubscriptionDCAConfig,
DonationDCAConfig,
SubscriptionDCAOrder,
DonationDCAOrder,
SubscriptionDCAState,
create_subscription_dca_order,
create_donation_dca_order,
execute_subscription_chunk,
execute_donation_chunk,
pending_chunks,
simulate_subscription_dca,
)
from src.composed.dca_executor import DCAOrder
from src.composed.myco_surface import MycoSystem, MycoSystemConfig
from src.commitments.subscription import (
Subscription,
SubscriptionTier,
create_default_tiers,
)
from src.primitives.twap_oracle import (
TWAPOracleParams,
TWAPOracleState,
create_oracle,
record_observation,
)
# --- Helpers ---
def _make_system() -> MycoSystem:
config = MycoSystemConfig(n_reserve_assets=3)
system = MycoSystem(config)
# Bootstrap
system.deposit(np.array([1000.0, 1000.0, 1000.0]), 0.0)
return system
def _make_oracle() -> TWAPOracleState:
oracle = create_oracle(TWAPOracleParams(default_window=24.0))
oracle = record_observation(oracle, 1.0, 0.0)
oracle = record_observation(oracle, 1.0, 1.0)
return oracle
def _make_subscription() -> tuple[Subscription, SubscriptionTier]:
tier = SubscriptionTier(
name="Sustainer",
payment_per_period=50.0,
period_length=30.0,
base_mint_rate=1.8,
loyalty_multiplier_max=1.5,
loyalty_halflife=120.0,
)
sub = Subscription(
subscriber="alice",
tier="sustainer",
start_time=0.0,
last_payment_time=0.0,
total_paid=0.0,
total_minted=0.0,
periods_paid=0,
)
return sub, tier
# --- TestSubscriptionDCAOrder ---
class TestSubscriptionDCAOrder:
def test_creates_correct_chunks(self):
sub, tier = _make_subscription()
config = SubscriptionDCAConfig(n_chunks=5, spread_fraction=0.8)
order = create_subscription_dca_order(sub, tier, config, 10.0, 1.2, 0)
assert order.order.params.n_chunks == 5
assert abs(order.order.params.total_amount - 50.0) < 1e-10
def test_interval_respects_spread(self):
sub, tier = _make_subscription()
config = SubscriptionDCAConfig(n_chunks=5, spread_fraction=0.8)
order = create_subscription_dca_order(sub, tier, config, 0.0, 1.0, 0)
expected_interval = (30.0 * 0.8) / 5
assert abs(order.order.params.interval - expected_interval) < 1e-10
def test_loyalty_multiplier_stored(self):
sub, tier = _make_subscription()
config = SubscriptionDCAConfig()
order = create_subscription_dca_order(sub, tier, config, 0.0, 1.35, 2)
assert abs(order.loyalty_multiplier - 1.35) < 1e-10
assert order.period_index == 2
def test_strategy_propagated(self):
sub, tier = _make_subscription()
config = SubscriptionDCAConfig(strategy="twap_aware")
order = create_subscription_dca_order(sub, tier, config, 0.0, 1.0, 0)
assert order.order.params.strategy == "twap_aware"
# --- TestDonationDCAOrder ---
class TestDonationDCAOrder:
def test_dca_enabled_multi_chunk(self):
config = DonationDCAConfig(enable_dca=True, n_chunks=5, interval=2.0)
order = create_donation_dca_order("bob", 500.0, config, 0.0)
assert order.order.params.n_chunks == 5
assert abs(order.donation_amount - 500.0) < 1e-10
def test_dca_disabled_single_chunk(self):
config = DonationDCAConfig(enable_dca=False, n_chunks=5)
order = create_donation_dca_order("bob", 500.0, config, 0.0)
assert order.order.params.n_chunks == 1
def test_single_chunk_when_n_chunks_one(self):
config = DonationDCAConfig(enable_dca=True, n_chunks=1)
order = create_donation_dca_order("bob", 100.0, config, 0.0)
assert order.order.params.n_chunks == 1
assert order.order.params.interval == 0.0
def test_donor_stored(self):
config = DonationDCAConfig()
order = create_donation_dca_order("carol", 200.0, config, 5.0)
assert order.donor == "carol"
assert abs(order.order.start_time - 5.0) < 1e-10
# --- TestChunkExecution ---
class TestChunkExecution:
def test_subscription_chunk_mints_tokens(self):
system = _make_system()
oracle = _make_oracle()
sub, tier = _make_subscription()
config = SubscriptionDCAConfig(n_chunks=3)
order = create_subscription_dca_order(sub, tier, config, 1.0, 1.0, 0)
state = SubscriptionDCAState(
subscription_orders={"alice": [order]},
)
state, system, oracle, tokens, bonus = execute_subscription_chunk(
state, "alice", system, oracle, 1.0,
)
assert tokens > 0
assert bonus == 0.0 # loyalty_multiplier == 1.0
def test_loyalty_bonus_computed(self):
system = _make_system()
oracle = _make_oracle()
sub, tier = _make_subscription()
config = SubscriptionDCAConfig(n_chunks=3)
order = create_subscription_dca_order(sub, tier, config, 1.0, 1.5, 0)
state = SubscriptionDCAState(
subscription_orders={"alice": [order]},
)
state, system, oracle, tokens, bonus = execute_subscription_chunk(
state, "alice", system, oracle, 1.0,
)
assert tokens > 0
# bonus = tokens * (1.5 - 1.0) = tokens * 0.5
assert abs(bonus - tokens * 0.5) < 1e-10
def test_donation_chunk_mints_tokens(self):
system = _make_system()
oracle = _make_oracle()
config = DonationDCAConfig(enable_dca=True, n_chunks=3, interval=1.0)
don_order = create_donation_dca_order("bob", 300.0, config, 1.0)
state = SubscriptionDCAState(donation_orders=[don_order])
state, system, oracle, tokens = execute_donation_chunk(
state, 0, system, oracle, 1.0,
)
assert tokens > 0
def test_complete_order_returns_zero(self):
system = _make_system()
oracle = _make_oracle()
sub, tier = _make_subscription()
config = SubscriptionDCAConfig(n_chunks=1)
order = create_subscription_dca_order(sub, tier, config, 1.0, 1.0, 0)
state = SubscriptionDCAState(
subscription_orders={"alice": [order]},
)
# Execute the single chunk
state, system, oracle, t1, b1 = execute_subscription_chunk(
state, "alice", system, oracle, 1.0,
)
assert t1 > 0
# Second execution should return zero
state, system, oracle, t2, b2 = execute_subscription_chunk(
state, "alice", system, oracle, 2.0,
)
assert t2 == 0.0
assert b2 == 0.0
def test_nonexistent_subscriber_returns_zero(self):
system = _make_system()
oracle = _make_oracle()
state = SubscriptionDCAState()
state, system, oracle, tokens, bonus = execute_subscription_chunk(
state, "nobody", system, oracle, 1.0,
)
assert tokens == 0.0
def test_out_of_range_donation_index_returns_zero(self):
system = _make_system()
oracle = _make_oracle()
state = SubscriptionDCAState()
state, system, oracle, tokens = execute_donation_chunk(
state, 99, system, oracle, 1.0,
)
assert tokens == 0.0
# --- TestPendingChunks ---
class TestPendingChunks:
def test_lists_due_chunks(self):
sub, tier = _make_subscription()
config = SubscriptionDCAConfig(n_chunks=5, spread_fraction=1.0)
order = create_subscription_dca_order(sub, tier, config, 0.0, 1.0, 0)
state = SubscriptionDCAState(
subscription_orders={"alice": [order]},
)
# interval = 30.0 / 5 = 6.0, so at t=7 chunk 0 and 1 are due
result = pending_chunks(state, "alice", 7.0)
assert len(result) >= 1
def test_no_pending_when_complete(self):
sub, tier = _make_subscription()
config = SubscriptionDCAConfig(n_chunks=1)
order = create_subscription_dca_order(sub, tier, config, 0.0, 1.0, 0)
order.order.is_complete = True
state = SubscriptionDCAState(
subscription_orders={"alice": [order]},
)
result = pending_chunks(state, "alice", 100.0)
assert len(result) == 0
def test_empty_for_unknown_subscriber(self):
state = SubscriptionDCAState()
result = pending_chunks(state, "nobody", 0.0)
assert result == []
# --- TestBackwardCompat ---
class TestBackwardCompat:
"""Ensure the new modules don't break existing DCA or subscription imports."""
def test_dca_executor_still_works(self):
from src.composed.dca_executor import DCAParams, simulate_dca
config = MycoSystemConfig(n_reserve_assets=3)
params = DCAParams(
total_amount=100.0, n_chunks=5,
interval=1.0, asset_index=0,
)
result = simulate_dca(config, params)
assert result.order.total_tokens_received > 0
def test_subscription_module_still_works(self):
from src.commitments.subscription import (
SubscriptionSystem, create_subscription,
process_payment, create_default_tiers,
)
tiers = create_default_tiers()
system = SubscriptionSystem(tiers=tiers)
system, sub = create_subscription(system, "alice", "supporter", 0.0)
system, tokens = process_payment(system, "alice", 31.0)
assert tokens > 0
def test_myco_system_signal_routing(self):
"""Test the apply_signal_routing integration on MycoSystem."""
from src.primitives.signal_router import SignalRouterConfig
config = MycoSystemConfig(
n_reserve_assets=3,
twap_oracle_params=TWAPOracleParams(),
)
system = MycoSystem(config)
system.deposit(np.array([1000.0, 1000.0, 1000.0]), 0.0)
# Do a few deposits to build oracle history
for i in range(1, 6):
system.deposit(np.array([10.0, 10.0, 10.0]), float(i))
router_config = SignalRouterConfig()
adapted = system.apply_signal_routing(router_config)
assert adapted.flow_threshold > 0
assert adapted.pamm_alpha_bar > 0
def test_simulation_runs(self):
tiers = create_default_tiers()
config = SubscriptionDCAConfig(n_chunks=3)
sys_config = MycoSystemConfig(n_reserve_assets=3)
subscribers = [("alice", "supporter", 0.0)]
result = simulate_subscription_dca(
tiers, config, sys_config, subscribers, duration=60.0, dt=5.0,
)
assert "times" in result
assert len(result["times"]) == 12