diff --git a/src/composed/__init__.py b/src/composed/__init__.py index 5d655ba..46c0d73 100644 --- a/src/composed/__init__.py +++ b/src/composed/__init__.py @@ -2,3 +2,16 @@ from src.composed.dca_executor import ( # noqa: F401 DCAParams, DCAOrder, DCAResult, create_dca_order, compute_chunk_size, execute_chunk, simulate_dca, ) +from src.composed.subscription_dca import ( # noqa: F401 + SubscriptionDCAConfig, DonationDCAConfig, + SubscriptionDCAOrder, DonationDCAOrder, SubscriptionDCAState, + create_subscription_dca_order, create_donation_dca_order, + execute_subscription_chunk, execute_donation_chunk, + pending_chunks, simulate_subscription_dca, +) +from src.crdt.dca_schedule import ( # noqa: F401 + DCAChunk, DCASchedule, DCAScheduleRegistry, + create_dca_schedule, advance_chunk_status, + materialize_chunks, reconcile_settled_chunks, + merge_dca_schedules, schedule_completion_fraction, +) diff --git a/src/composed/myco_surface.py b/src/composed/myco_surface.py index 08a5ce8..901e43e 100644 --- a/src/composed/myco_surface.py +++ b/src/composed/myco_surface.py @@ -38,6 +38,9 @@ from src.primitives.twap_oracle import ( TWAPOracleParams, TWAPOracleState, create_oracle, record_observation, compute_twap, ) +from src.primitives.signal_router import ( + SignalRouterConfig, AdaptiveParams, route_signals, +) from src.primitives.n_dimensional_surface import spot_prices from src.commitments.labor import ( LaborIssuanceSystem, ContributorState, @@ -415,6 +418,37 @@ class MycoSystem: ) return float(prices[0]) + def apply_signal_routing( + self, + router_config: SignalRouterConfig, + base_params: AdaptiveParams | None = None, + ) -> AdaptiveParams: + """Route oracle signals into adaptive system parameters. + + Opt-in: returns base_params unchanged if oracle is None. + If base_params is None, builds defaults from current config. + """ + if base_params is None: + base_params = AdaptiveParams( + flow_threshold=self.config.flow_threshold, + pamm_alpha_bar=self.config.pamm_params.alpha_bar, + surge_fee_rate=self.config.surge_fee_rate, + oracle_multiplier_velocity=0.0, + ) + + spot = self.get_spot_price() + adapted = route_signals( + self.state.oracle_state, base_params, router_config, + current_spot=spot, + ) + + # Apply adapted params back to live config + self.state.flow_tracker.threshold = adapted.flow_threshold + self.config.pamm_params.alpha_bar = adapted.pamm_alpha_bar + self.config.surge_fee_rate = adapted.surge_fee_rate + + return adapted + def get_metrics(self) -> dict: """Get current system metrics.""" state = self.state diff --git a/src/composed/subscription_dca.py b/src/composed/subscription_dca.py new file mode 100644 index 0000000..689576f --- /dev/null +++ b/src/composed/subscription_dca.py @@ -0,0 +1,356 @@ +"""Subscription/Donation DCA — recurring payments and donations via DCA. + +Composes subscription tiers with the DCA executor so that recurring +payments and one-time donations flow through the bonding curve as +time-spread chunks instead of flat-rate commitment minting. + +Subscription DCA: + Each subscription period, the payment is split into n_chunks and + executed over the period via the DCA executor. Loyalty multiplier + produces bonus tokens on top of curve-minted tokens. + +Donation DCA: + Donors choose at donation time whether to DCA (spread over chunks) + or do a single instant deposit. DCA reduces price impact for + large donations. +""" + +import numpy as np +from numpy.typing import NDArray +from dataclasses import dataclass, field + +from src.composed.dca_executor import ( + DCAParams, + DCAOrder, + create_dca_order, + execute_chunk, +) +from src.composed.myco_surface import MycoSystem, MycoSystemConfig +from src.primitives.twap_oracle import TWAPOracleState, TWAPOracleParams, create_oracle +from src.commitments.subscription import Subscription, SubscriptionTier + + +@dataclass +class SubscriptionDCAConfig: + """Configuration for subscription-based DCA.""" + n_chunks: int = 5 # Chunks per subscription period + spread_fraction: float = 0.8 # Fraction of period over which to spread + strategy: str = "fixed" # "fixed" or "twap_aware" + loyalty_as_bonus: bool = True # Mint loyalty bonus on top of curve tokens + + +@dataclass +class DonationDCAConfig: + """Configuration for donation DCA — donor chooses at donation time.""" + enable_dca: bool = True # False = single instant deposit + n_chunks: int = 5 # Number of DCA chunks + interval: float = 1.0 # Time between chunks + strategy: str = "fixed" # "fixed" or "twap_aware" + + +@dataclass +class SubscriptionDCAOrder: + """A DCA order tied to a subscription period.""" + order: DCAOrder + subscription_id: str + tier: str + loyalty_multiplier: float + loyalty_bonus_minted: float = 0.0 + period_index: int = 0 + + +@dataclass +class DonationDCAOrder: + """A DCA order for a one-time donation.""" + order: DCAOrder + donor: str + donation_amount: float + + +@dataclass +class SubscriptionDCAState: + """Tracks all active and historical DCA orders.""" + # subscriber -> list of subscription DCA orders (one per period) + subscription_orders: dict[str, list[SubscriptionDCAOrder]] = field( + default_factory=dict, + ) + # All donation orders + donation_orders: list[DonationDCAOrder] = field(default_factory=list) + + +# --- Subscription DCA --- + +def _compute_loyalty_multiplier( + tier: SubscriptionTier, + subscription: Subscription, + current_time: float, +) -> float: + """Compute loyalty multiplier matching subscription.py's formula.""" + duration = current_time - subscription.start_time + return 1.0 + (tier.loyalty_multiplier_max - 1.0) * ( + 1.0 - np.exp(-0.693 * duration / tier.loyalty_halflife) + ) + + +def create_subscription_dca_order( + subscription: Subscription, + tier: SubscriptionTier, + config: SubscriptionDCAConfig, + current_time: float, + loyalty_multiplier: float, + period_index: int = 0, +) -> SubscriptionDCAOrder: + """Create a DCA order for one subscription period. + + Splits the tier's payment_per_period into n_chunks spread over + spread_fraction of the period. + """ + spread_duration = tier.period_length * config.spread_fraction + interval = spread_duration / max(config.n_chunks, 1) + + dca_params = DCAParams( + total_amount=tier.payment_per_period, + n_chunks=config.n_chunks, + interval=interval, + asset_index=0, + strategy=config.strategy, + ) + + order = create_dca_order(dca_params, start_time=current_time) + + return SubscriptionDCAOrder( + order=order, + subscription_id=subscription.subscriber, + tier=subscription.tier, + loyalty_multiplier=loyalty_multiplier, + period_index=period_index, + ) + + +# --- Donation DCA --- + +def create_donation_dca_order( + donor: str, + amount: float, + config: DonationDCAConfig, + current_time: float, +) -> DonationDCAOrder: + """Create a DCA order for a one-time donation. + + If enable_dca is False or n_chunks <= 1, creates a single-chunk + order (instant deposit). + """ + n = config.n_chunks if config.enable_dca and config.n_chunks > 1 else 1 + interval = config.interval if n > 1 else 0.0 + + dca_params = DCAParams( + total_amount=amount, + n_chunks=n, + interval=interval, + asset_index=0, + strategy=config.strategy, + ) + + order = create_dca_order(dca_params, start_time=current_time) + + return DonationDCAOrder( + order=order, + donor=donor, + donation_amount=amount, + ) + + +# --- Chunk execution --- + +def execute_subscription_chunk( + state: SubscriptionDCAState, + subscriber: str, + system: MycoSystem, + oracle_state: TWAPOracleState, + current_time: float, +) -> tuple[SubscriptionDCAState, MycoSystem, TWAPOracleState, float, float]: + """Execute the next pending chunk for a subscriber's latest order. + + Returns: + (state, system, oracle_state, curve_tokens, loyalty_bonus) + + The loyalty bonus is: curve_tokens * (loyalty_multiplier - 1.0) + when loyalty_as_bonus is configured. This bonus is NOT minted through + the curve — it's minted as commitment tokens on top. + """ + orders = state.subscription_orders.get(subscriber) + if not orders: + return state, system, oracle_state, 0.0, 0.0 + + # Find the latest non-complete order + sub_order = orders[-1] + if sub_order.order.is_complete: + return state, system, oracle_state, 0.0, 0.0 + + # Execute through DCA executor + system, sub_order.order, oracle_state, curve_tokens = execute_chunk( + system, sub_order.order, oracle_state, current_time, + ) + + # Compute loyalty bonus + loyalty_bonus = 0.0 + if curve_tokens > 0 and sub_order.loyalty_multiplier > 1.0: + loyalty_bonus = curve_tokens * (sub_order.loyalty_multiplier - 1.0) + sub_order.loyalty_bonus_minted += loyalty_bonus + + return state, system, oracle_state, curve_tokens, loyalty_bonus + + +def execute_donation_chunk( + state: SubscriptionDCAState, + order_index: int, + system: MycoSystem, + oracle_state: TWAPOracleState, + current_time: float, +) -> tuple[SubscriptionDCAState, MycoSystem, TWAPOracleState, float]: + """Execute the next pending chunk for a donation order. + + Returns: + (state, system, oracle_state, tokens_minted) + """ + if order_index >= len(state.donation_orders): + return state, system, oracle_state, 0.0 + + don_order = state.donation_orders[order_index] + if don_order.order.is_complete: + return state, system, oracle_state, 0.0 + + system, don_order.order, oracle_state, tokens = execute_chunk( + system, don_order.order, oracle_state, current_time, + ) + + return state, system, oracle_state, tokens + + +# --- Query --- + +def pending_chunks( + state: SubscriptionDCAState, + subscriber: str, + current_time: float, +) -> list[tuple[int, int]]: + """List pending (unexecuted) chunks for a subscriber. + + Returns list of (order_index, chunk_number) tuples where + chunk_number is the next chunk to execute in that order. + """ + orders = state.subscription_orders.get(subscriber, []) + result = [] + for i, sub_order in enumerate(orders): + order = sub_order.order + if order.is_complete: + continue + next_chunk = order.chunks_executed + next_time = order.start_time + next_chunk * order.params.interval + if current_time >= next_time: + result.append((i, next_chunk)) + return result + + +# --- Simulation --- + +def simulate_subscription_dca( + tiers: dict[str, SubscriptionTier], + config: SubscriptionDCAConfig, + system_config: MycoSystemConfig, + subscribers: list[tuple[str, str, float]], + duration: float, + dt: float = 1.0, +) -> dict[str, NDArray]: + """Simulate subscription DCA over time. + + Args: + tiers: Available subscription tiers. + config: DCA configuration. + system_config: MycoSystem configuration. + subscribers: List of (name, tier_name, start_time). + duration: Total simulation duration. + dt: Time step. + + Returns: + Dict with arrays: times, total_curve_tokens, total_loyalty_bonus, + total_supply. + """ + system = MycoSystem(system_config) + n = system_config.n_reserve_assets + + # Bootstrap the system + bootstrap = np.full(n, 10000.0 / n) + system.deposit(bootstrap, 0.0) + + oracle_state = create_oracle(TWAPOracleParams(default_window=24.0)) + + state = SubscriptionDCAState() + subs: dict[str, Subscription] = {} + + n_steps = int(duration / dt) + times = np.zeros(n_steps) + curve_tokens_arr = np.zeros(n_steps) + loyalty_bonus_arr = np.zeros(n_steps) + supply_arr = np.zeros(n_steps) + + total_curve = 0.0 + total_loyalty = 0.0 + + for step in range(n_steps): + t = step * dt + times[step] = t + + # Create subscriptions as they start + for name, tier_name, start_time in subscribers: + if abs(t - start_time) < dt / 2 and name not in subs: + tier = tiers[tier_name] + sub = Subscription( + subscriber=name, + tier=tier_name, + start_time=t, + last_payment_time=t, + ) + subs[name] = sub + lm = _compute_loyalty_multiplier(tier, sub, t) + order = create_subscription_dca_order( + sub, tier, config, t, lm, period_index=0, + ) + state.subscription_orders[name] = [order] + + # Execute pending chunks + for name in list(state.subscription_orders.keys()): + if name not in subs: + continue + pending = pending_chunks(state, name, t) + for _ in pending: + state, system, oracle_state, ct, lb = execute_subscription_chunk( + state, name, system, oracle_state, t, + ) + total_curve += ct + total_loyalty += lb + + # Check if order complete and new period due + orders = state.subscription_orders.get(name, []) + if orders and orders[-1].order.is_complete: + sub = subs[name] + tier = tiers[sub.tier] + period_idx = orders[-1].period_index + 1 + next_period_start = sub.start_time + period_idx * tier.period_length + if t >= next_period_start: + lm = _compute_loyalty_multiplier(tier, sub, t) + new_order = create_subscription_dca_order( + sub, tier, config, t, lm, period_index=period_idx, + ) + orders.append(new_order) + + curve_tokens_arr[step] = total_curve + loyalty_bonus_arr[step] = total_loyalty + supply_arr[step] = system.state.supply + + return { + "times": times, + "total_curve_tokens": curve_tokens_arr, + "total_loyalty_bonus": loyalty_bonus_arr, + "total_supply": supply_arr, + } diff --git a/src/crdt/dca_schedule.py b/src/crdt/dca_schedule.py new file mode 100644 index 0000000..dc12563 --- /dev/null +++ b/src/crdt/dca_schedule.py @@ -0,0 +1,328 @@ +"""CRDT-native DCA schedules — G-Set of chunks with monotone status. + +Each DCA schedule is a set of chunks whose status follows a monotone +lattice: pending(0) → submitted(1) → executed(2). This mirrors the +_STATUS_RANK pattern from intent_matching.py. + +Composition points: + materialize_chunks() before create_batch() — due chunks become Intents + reconcile_settled_chunks() after solve_batch() — settled intents mark + chunks as executed. + +CRDT merge: union of schedules/chunks, max status wins for duplicates. +This is commutative, associative, and idempotent. +""" + +from dataclasses import dataclass, field + +from src.crdt.intent_matching import Intent, IntentSet, add_intent +from src.crdt.batch_settlement import Settlement + + +# --- Status lattice --- + +_CHUNK_STATUS_RANK = {"pending": 0, "submitted": 1, "executed": 2} + + +def _higher_chunk_status(a: str, b: str) -> str: + """Return the more advanced chunk status (monotone lattice).""" + return a if _CHUNK_STATUS_RANK.get(a, 0) >= _CHUNK_STATUS_RANK.get(b, 0) else b + + +# --- Data models --- + +@dataclass(frozen=True) +class DCAChunk: + """A single chunk within a DCA schedule. + + Frozen so it can serve as a value in the G-Set. + Status transitions create new instances. + """ + chunk_id: str + schedule_id: str + maker: str + sell_token: str + sell_amount: float + buy_token: str + min_buy_amount: float + scheduled_time: float + status: str = "pending" # "pending" | "submitted" | "executed" + + +@dataclass +class DCASchedule: + """A complete DCA schedule: a set of chunks for one maker.""" + schedule_id: str + maker: str + chunks: dict[str, DCAChunk] = field(default_factory=dict) + total_amount: float = 0.0 + + +@dataclass +class DCAScheduleRegistry: + """Registry of all DCA schedules across all makers.""" + schedules: dict[str, DCASchedule] = field(default_factory=dict) + + +# --- Schedule creation --- + +def create_dca_schedule( + schedule_id: str, + maker: str, + total_amount: float, + n_chunks: int, + start_time: float, + interval: float, + sell_token: str = "USDC", + buy_token: str = "MYCO", + min_buy_ratio: float = 0.8, + oracle_state=None, +) -> DCASchedule: + """Create a DCA schedule with n_chunks spread over time. + + Args: + schedule_id: Unique schedule identifier. + maker: The entity placing the order. + total_amount: Total amount to deploy across all chunks. + n_chunks: Number of chunks. + start_time: Timestamp of first chunk. + interval: Time between chunks. + sell_token: Token being sold (deposited). + buy_token: Token being bought (minted). + min_buy_ratio: Minimum acceptable output as fraction of input. + oracle_state: Optional oracle for TWAP-aware sizing (unused + in this CRDT layer — sizing happens at execution time). + + Returns: + A DCASchedule with all chunks in 'pending' status. + """ + chunk_amount = total_amount / n_chunks + chunks: dict[str, DCAChunk] = {} + + for i in range(n_chunks): + cid = f"{schedule_id}_chunk_{i}" + chunks[cid] = DCAChunk( + chunk_id=cid, + schedule_id=schedule_id, + maker=maker, + sell_token=sell_token, + sell_amount=chunk_amount, + buy_token=buy_token, + min_buy_amount=chunk_amount * min_buy_ratio, + scheduled_time=start_time + i * interval, + ) + + return DCASchedule( + schedule_id=schedule_id, + maker=maker, + chunks=chunks, + total_amount=total_amount, + ) + + +# --- Status advancement --- + +def advance_chunk_status( + schedule: DCASchedule, + chunk_id: str, + new_status: str, +) -> DCASchedule: + """Advance a chunk's status monotonically. + + Only allows forward transitions in the lattice: + pending → submitted → executed. + + Returns schedule unchanged if transition is not forward. + """ + chunk = schedule.chunks.get(chunk_id) + if chunk is None: + return schedule + + current_rank = _CHUNK_STATUS_RANK.get(chunk.status, 0) + new_rank = _CHUNK_STATUS_RANK.get(new_status, 0) + + if new_rank <= current_rank: + return schedule + + updated = DCAChunk( + chunk_id=chunk.chunk_id, + schedule_id=chunk.schedule_id, + maker=chunk.maker, + sell_token=chunk.sell_token, + sell_amount=chunk.sell_amount, + buy_token=chunk.buy_token, + min_buy_amount=chunk.min_buy_amount, + scheduled_time=chunk.scheduled_time, + status=new_status, + ) + + new_chunks = {**schedule.chunks, chunk_id: updated} + return DCASchedule( + schedule_id=schedule.schedule_id, + maker=schedule.maker, + chunks=new_chunks, + total_amount=schedule.total_amount, + ) + + +# --- Materialization: pending chunks → Intents --- + +def materialize_chunks( + registry: DCAScheduleRegistry, + intent_set: IntentSet, + current_time: float, +) -> tuple[DCAScheduleRegistry, IntentSet, list[str]]: + """Convert due pending chunks into Intent objects. + + A chunk is due when current_time >= chunk.scheduled_time and + chunk.status == 'pending'. + + Returns: + (updated_registry, updated_intent_set, list_of_materialized_chunk_ids) + """ + materialized: list[str] = [] + new_schedules = dict(registry.schedules) + + for sid, schedule in registry.schedules.items(): + updated_schedule = schedule + for cid, chunk in schedule.chunks.items(): + if chunk.status != "pending": + continue + if current_time < chunk.scheduled_time: + continue + + # Create an Intent from this chunk + intent = Intent( + intent_id=chunk.chunk_id, + maker=chunk.maker, + sell_token=chunk.sell_token, + sell_amount=chunk.sell_amount, + buy_token=chunk.buy_token, + min_buy_amount=chunk.min_buy_amount, + valid_until=chunk.scheduled_time + 1000.0, + ) + intent_set = add_intent(intent_set, intent) + + # Advance to submitted + updated_schedule = advance_chunk_status( + updated_schedule, cid, "submitted", + ) + materialized.append(cid) + + new_schedules[sid] = updated_schedule + + return ( + DCAScheduleRegistry(schedules=new_schedules), + intent_set, + materialized, + ) + + +# --- Reconciliation: settled intents → executed chunks --- + +def reconcile_settled_chunks( + registry: DCAScheduleRegistry, + settlement: Settlement, + intent_set: IntentSet, +) -> DCAScheduleRegistry: + """Mark chunks as executed when their corresponding intents settled. + + A chunk is settled if: + - It was part of a CoW match (intent filled), OR + - Its intent contributed to net_curve_buy/sell (unmatched but settled) + + For simplicity, we mark all submitted chunks whose intent_id appears + in the settlement's cow_matches or in the intent_set as 'filled'. + """ + # Collect all settled intent IDs from CoW matches + settled_ids: set[str] = set() + for match in settlement.cow_matches: + settled_ids.add(match.intent_a_id) + settled_ids.add(match.intent_b_id) + + # Also check intent_set for filled intents + for iid, intent in intent_set.intents.items(): + if intent.status == "filled": + settled_ids.add(iid) + + new_schedules = dict(registry.schedules) + for sid, schedule in registry.schedules.items(): + updated = schedule + for cid, chunk in schedule.chunks.items(): + if chunk.status == "submitted" and cid in settled_ids: + updated = advance_chunk_status(updated, cid, "executed") + new_schedules[sid] = updated + + return DCAScheduleRegistry(schedules=new_schedules) + + +# --- CRDT merge --- + +def merge_dca_schedules( + a: DCAScheduleRegistry, + b: DCAScheduleRegistry, +) -> DCAScheduleRegistry: + """Merge two DCA schedule registries (CRDT merge). + + Union of schedules. For duplicate schedule_ids, merge chunks: + union of chunks, max status for duplicates. + Commutative, associative, idempotent. + """ + all_sids = set(a.schedules) | set(b.schedules) + merged: dict[str, DCASchedule] = {} + + for sid in all_sids: + sa = a.schedules.get(sid) + sb = b.schedules.get(sid) + + if sa is None: + merged[sid] = sb # type: ignore[assignment] + elif sb is None: + merged[sid] = sa + else: + # Merge chunks + all_cids = set(sa.chunks) | set(sb.chunks) + merged_chunks: dict[str, DCAChunk] = {} + for cid in all_cids: + ca = sa.chunks.get(cid) + cb = sb.chunks.get(cid) + if ca is None: + merged_chunks[cid] = cb # type: ignore[assignment] + elif cb is None: + merged_chunks[cid] = ca + else: + # Max status wins + winner_status = _higher_chunk_status(ca.status, cb.status) + winner = ca if ca.status == winner_status else cb + merged_chunks[cid] = DCAChunk( + chunk_id=winner.chunk_id, + schedule_id=winner.schedule_id, + maker=winner.maker, + sell_token=winner.sell_token, + sell_amount=winner.sell_amount, + buy_token=winner.buy_token, + min_buy_amount=winner.min_buy_amount, + scheduled_time=winner.scheduled_time, + status=winner_status, + ) + + merged[sid] = DCASchedule( + schedule_id=sa.schedule_id, + maker=sa.maker, + chunks=merged_chunks, + total_amount=max(sa.total_amount, sb.total_amount), + ) + + return DCAScheduleRegistry(schedules=merged) + + +# --- Utilities --- + +def schedule_completion_fraction(schedule: DCASchedule) -> float: + """Fraction of chunks that have reached 'executed' status.""" + if not schedule.chunks: + return 0.0 + executed = sum( + 1 for c in schedule.chunks.values() if c.status == "executed" + ) + return executed / len(schedule.chunks) diff --git a/src/primitives/signal_router.py b/src/primitives/signal_router.py new file mode 100644 index 0000000..3f34307 --- /dev/null +++ b/src/primitives/signal_router.py @@ -0,0 +1,221 @@ +"""Signal Router — oracle signals → adaptive system parameters. + +Routes TWAP deviation and volatility signals from the oracle into +system parameters: flow threshold, PAMM curvature, surge fees, and +oracle weight multiplier velocity. + +All transforms are stateless pure functions: oracle state in, +adaptive parameters out. The router is opt-in — returns base params +unchanged when oracle is None or has insufficient data. + +Mapping: + volatility → tighter flow threshold (reduces max outflow) + |deviation| → steeper PAMM discount (wider redemption haircut) + volatility → higher surge fee (imbalance fee increases) + deviation → oracle velocity signal (weight drift direction) +""" + +import math +from dataclasses import dataclass + +from src.primitives.twap_oracle import ( + TWAPOracleState, + compute_twap, + get_volatility, + spot_vs_twap_deviation, +) + + +@dataclass +class SignalRouterConfig: + """Coefficients and clamp bounds for signal routing.""" + # Multiplicative coefficients + k_vol_flow: float = 1.0 # volatility → flow_threshold scaling + k_dev_alpha: float = 2.0 # |deviation| → pamm_alpha_bar scaling + k_vol_fee: float = 1.5 # volatility → surge_fee_rate scaling + k_oracle_vel: float = 0.01 # deviation → oracle_multiplier_velocity + + # Safety clamp bounds + flow_threshold_min: float = 0.01 + flow_threshold_max: float = 0.5 + alpha_bar_min: float = 1.0 + alpha_bar_max: float = 100.0 + surge_fee_min: float = 0.0 + surge_fee_max: float = 0.5 + velocity_min: float = -0.1 + velocity_max: float = 0.1 + + +@dataclass +class AdaptiveParams: + """Snapshot of adapted system parameters.""" + flow_threshold: float + pamm_alpha_bar: float + surge_fee_rate: float + oracle_multiplier_velocity: float + + +@dataclass +class RouterSignals: + """Intermediate signals extracted from oracle state.""" + twap_deviation: float # (spot - twap) / twap + volatility: float # log-return std dev + spot_price: float + twap: float + is_valid: bool # False if oracle has insufficient data + + +def extract_signals( + oracle_state: TWAPOracleState, + current_spot: float, + window: float | None = None, +) -> RouterSignals: + """Extract routing signals from oracle state and current spot price. + + Returns RouterSignals with is_valid=False if oracle has + fewer than min_observations. + """ + twap = compute_twap(oracle_state, window) + if twap <= 0: + return RouterSignals( + twap_deviation=0.0, + volatility=0.0, + spot_price=current_spot, + twap=0.0, + is_valid=False, + ) + + deviation = spot_vs_twap_deviation(oracle_state, current_spot, window) + vol = get_volatility(oracle_state, window) + + return RouterSignals( + twap_deviation=deviation, + volatility=vol, + spot_price=current_spot, + twap=twap, + is_valid=True, + ) + + +def _clamp(value: float, lo: float, hi: float) -> float: + return max(lo, min(hi, value)) + + +def apply_signals( + signals: RouterSignals, + base_params: AdaptiveParams, + config: SignalRouterConfig, +) -> AdaptiveParams: + """Apply extracted signals to base parameters. + + Multiplicative scaling with clamping: + flow_threshold *= (1 - k_vol_flow * volatility) + pamm_alpha_bar *= (1 + k_dev_alpha * |deviation|) + surge_fee_rate *= (1 + k_vol_fee * volatility) + oracle_multiplier_velocity = k_oracle_vel * deviation + + Returns base_params unchanged if signals are invalid. + """ + if not signals.is_valid: + return base_params + + vol = signals.volatility + dev = signals.twap_deviation + + flow = base_params.flow_threshold * (1.0 - config.k_vol_flow * vol) + flow = _clamp(flow, config.flow_threshold_min, config.flow_threshold_max) + + alpha = base_params.pamm_alpha_bar * (1.0 + config.k_dev_alpha * abs(dev)) + alpha = _clamp(alpha, config.alpha_bar_min, config.alpha_bar_max) + + fee = base_params.surge_fee_rate * (1.0 + config.k_vol_fee * vol) + fee = _clamp(fee, config.surge_fee_min, config.surge_fee_max) + + velocity = config.k_oracle_vel * dev + velocity = _clamp(velocity, config.velocity_min, config.velocity_max) + + return AdaptiveParams( + flow_threshold=flow, + pamm_alpha_bar=alpha, + surge_fee_rate=fee, + oracle_multiplier_velocity=velocity, + ) + + +def route_signals( + oracle_state: TWAPOracleState | None, + base_params: AdaptiveParams, + config: SignalRouterConfig, + current_spot: float = 0.0, + window: float | None = None, +) -> AdaptiveParams: + """Top-level convenience: extract + apply in one call. + + No-op (returns base_params) when oracle_state is None. + """ + if oracle_state is None: + return base_params + + if current_spot <= 0 and oracle_state.observations: + current_spot = oracle_state.observations[-1].price + + signals = extract_signals(oracle_state, current_spot, window) + return apply_signals(signals, base_params, config) + + +def simulate_signal_routing( + base_params: AdaptiveParams, + config: SignalRouterConfig, + price_trajectory: list[float], + window: float | None = None, +) -> dict[str, list[float]]: + """Simulate signal routing over a price trajectory. + + Builds an oracle from the trajectory and computes adaptive + params at each step. Useful for visualizing parameter response + to different price patterns. + + Args: + base_params: Starting parameter values. + config: Router coefficients. + price_trajectory: Sequence of spot prices (one per time step). + window: TWAP lookback window. + + Returns: + Dict with keys: times, flow_threshold, pamm_alpha_bar, + surge_fee_rate, oracle_velocity, twap_deviation, volatility. + """ + from src.primitives.twap_oracle import ( + TWAPOracleParams, + create_oracle, + record_observation, + ) + + oracle = create_oracle(TWAPOracleParams( + default_window=window if window is not None else 24.0, + )) + + out: dict[str, list[float]] = { + "times": [], + "flow_threshold": [], + "pamm_alpha_bar": [], + "surge_fee_rate": [], + "oracle_velocity": [], + "twap_deviation": [], + "volatility": [], + } + + for t, price in enumerate(price_trajectory): + oracle = record_observation(oracle, price, float(t)) + adapted = route_signals(oracle, base_params, config, price, window) + signals = extract_signals(oracle, price, window) + + out["times"].append(float(t)) + out["flow_threshold"].append(adapted.flow_threshold) + out["pamm_alpha_bar"].append(adapted.pamm_alpha_bar) + out["surge_fee_rate"].append(adapted.surge_fee_rate) + out["oracle_velocity"].append(adapted.oracle_multiplier_velocity) + out["twap_deviation"].append(signals.twap_deviation) + out["volatility"].append(signals.volatility) + + return out diff --git a/tests/test_dca_schedule.py b/tests/test_dca_schedule.py new file mode 100644 index 0000000..cc0120c --- /dev/null +++ b/tests/test_dca_schedule.py @@ -0,0 +1,348 @@ +"""Tests for CRDT DCA schedules — G-Set of chunks with monotone status.""" + +import pytest + +from src.crdt.dca_schedule import ( + DCAChunk, + DCASchedule, + DCAScheduleRegistry, + create_dca_schedule, + advance_chunk_status, + materialize_chunks, + reconcile_settled_chunks, + merge_dca_schedules, + schedule_completion_fraction, + _CHUNK_STATUS_RANK, +) +from src.crdt.intent_matching import Intent, IntentSet, add_intent, fill_intent +from src.crdt.batch_settlement import Settlement, Match + + +# --- TestCreateSchedule --- + +class TestCreateSchedule: + def test_creates_correct_number_of_chunks(self): + sched = create_dca_schedule("s1", "alice", 100.0, 5, 0.0, 1.0) + assert len(sched.chunks) == 5 + assert sched.total_amount == 100.0 + + def test_chunks_have_correct_amounts(self): + sched = create_dca_schedule("s1", "alice", 100.0, 4, 0.0, 1.0) + for chunk in sched.chunks.values(): + assert abs(chunk.sell_amount - 25.0) < 1e-10 + + def test_chunks_are_scheduled_at_intervals(self): + sched = create_dca_schedule("s1", "alice", 100.0, 3, 10.0, 5.0) + times = sorted(c.scheduled_time for c in sched.chunks.values()) + assert times == [10.0, 15.0, 20.0] + + def test_all_chunks_start_pending(self): + sched = create_dca_schedule("s1", "alice", 100.0, 3, 0.0, 1.0) + for chunk in sched.chunks.values(): + assert chunk.status == "pending" + + def test_chunk_ids_are_unique(self): + sched = create_dca_schedule("s1", "alice", 100.0, 10, 0.0, 1.0) + ids = [c.chunk_id for c in sched.chunks.values()] + assert len(ids) == len(set(ids)) + + def test_min_buy_ratio_applied(self): + sched = create_dca_schedule( + "s1", "alice", 100.0, 2, 0.0, 1.0, min_buy_ratio=0.9, + ) + for chunk in sched.chunks.values(): + assert abs(chunk.min_buy_amount - 50.0 * 0.9) < 1e-10 + + def test_custom_tokens(self): + sched = create_dca_schedule( + "s1", "alice", 100.0, 1, 0.0, 1.0, + sell_token="ETH", buy_token="DAI", + ) + chunk = list(sched.chunks.values())[0] + assert chunk.sell_token == "ETH" + assert chunk.buy_token == "DAI" + + +# --- TestAdvanceStatus --- + +class TestAdvanceStatus: + def test_pending_to_submitted(self): + sched = create_dca_schedule("s1", "alice", 100.0, 1, 0.0, 1.0) + cid = list(sched.chunks.keys())[0] + updated = advance_chunk_status(sched, cid, "submitted") + assert updated.chunks[cid].status == "submitted" + + def test_submitted_to_executed(self): + sched = create_dca_schedule("s1", "alice", 100.0, 1, 0.0, 1.0) + cid = list(sched.chunks.keys())[0] + sched = advance_chunk_status(sched, cid, "submitted") + sched = advance_chunk_status(sched, cid, "executed") + assert sched.chunks[cid].status == "executed" + + def test_backward_transition_rejected(self): + sched = create_dca_schedule("s1", "alice", 100.0, 1, 0.0, 1.0) + cid = list(sched.chunks.keys())[0] + sched = advance_chunk_status(sched, cid, "submitted") + sched_after = advance_chunk_status(sched, cid, "pending") + assert sched_after.chunks[cid].status == "submitted" + + def test_same_status_transition_rejected(self): + sched = create_dca_schedule("s1", "alice", 100.0, 1, 0.0, 1.0) + cid = list(sched.chunks.keys())[0] + sched_after = advance_chunk_status(sched, cid, "pending") + assert sched_after.chunks[cid].status == "pending" + + def test_nonexistent_chunk_returns_unchanged(self): + sched = create_dca_schedule("s1", "alice", 100.0, 1, 0.0, 1.0) + sched_after = advance_chunk_status(sched, "nonexistent", "submitted") + assert sched == sched_after + + def test_skip_to_executed_allowed(self): + sched = create_dca_schedule("s1", "alice", 100.0, 1, 0.0, 1.0) + cid = list(sched.chunks.keys())[0] + sched = advance_chunk_status(sched, cid, "executed") + assert sched.chunks[cid].status == "executed" + + +# --- TestMaterializeChunks --- + +class TestMaterializeChunks: + def test_due_chunks_become_intents(self): + sched = create_dca_schedule("s1", "alice", 100.0, 3, 0.0, 1.0) + registry = DCAScheduleRegistry(schedules={"s1": sched}) + iset = IntentSet() + + registry, iset, materialized = materialize_chunks(registry, iset, 1.5) + # Chunks at t=0.0 and t=1.0 are due; t=2.0 not yet + assert len(materialized) == 2 + assert len(iset.intents) == 2 + + def test_materialized_chunks_become_submitted(self): + sched = create_dca_schedule("s1", "alice", 100.0, 2, 0.0, 1.0) + registry = DCAScheduleRegistry(schedules={"s1": sched}) + iset = IntentSet() + + registry, iset, materialized = materialize_chunks(registry, iset, 10.0) + for cid in materialized: + assert registry.schedules["s1"].chunks[cid].status == "submitted" + + def test_already_submitted_not_rematerialized(self): + sched = create_dca_schedule("s1", "alice", 100.0, 2, 0.0, 1.0) + registry = DCAScheduleRegistry(schedules={"s1": sched}) + iset = IntentSet() + + registry, iset, mat1 = materialize_chunks(registry, iset, 10.0) + registry, iset, mat2 = materialize_chunks(registry, iset, 10.0) + assert len(mat2) == 0 + + def test_future_chunks_not_materialized(self): + sched = create_dca_schedule("s1", "alice", 100.0, 5, 10.0, 1.0) + registry = DCAScheduleRegistry(schedules={"s1": sched}) + iset = IntentSet() + + registry, iset, materialized = materialize_chunks(registry, iset, 5.0) + assert len(materialized) == 0 + + def test_intent_inherits_chunk_fields(self): + sched = create_dca_schedule( + "s1", "alice", 100.0, 1, 0.0, 1.0, + sell_token="ETH", buy_token="DAI", min_buy_ratio=0.9, + ) + registry = DCAScheduleRegistry(schedules={"s1": sched}) + iset = IntentSet() + + registry, iset, materialized = materialize_chunks(registry, iset, 1.0) + intent = iset.intents[materialized[0]] + assert intent.maker == "alice" + assert intent.sell_token == "ETH" + assert intent.buy_token == "DAI" + assert abs(intent.min_buy_amount - 100.0 * 0.9) < 1e-10 + + +# --- TestReconcileSettled --- + +class TestReconcileSettled: + def test_cow_matched_chunks_become_executed(self): + sched = create_dca_schedule("s1", "alice", 100.0, 2, 0.0, 1.0) + cids = list(sched.chunks.keys()) + + # Submit both + sched = advance_chunk_status(sched, cids[0], "submitted") + sched = advance_chunk_status(sched, cids[1], "submitted") + registry = DCAScheduleRegistry(schedules={"s1": sched}) + + # Settle first chunk via CoW match + settlement = Settlement( + cow_matches=[ + Match( + intent_a_id=cids[0], + intent_b_id="other_intent", + clearing_price=1.0, + amount_a_sold=50.0, + amount_b_sold=50.0, + ) + ], + ) + iset = IntentSet() + + registry = reconcile_settled_chunks(registry, settlement, iset) + assert registry.schedules["s1"].chunks[cids[0]].status == "executed" + assert registry.schedules["s1"].chunks[cids[1]].status == "submitted" + + def test_filled_intents_mark_executed(self): + sched = create_dca_schedule("s1", "alice", 100.0, 1, 0.0, 1.0) + cid = list(sched.chunks.keys())[0] + sched = advance_chunk_status(sched, cid, "submitted") + registry = DCAScheduleRegistry(schedules={"s1": sched}) + + # Mark intent as filled + iset = IntentSet() + intent = Intent( + intent_id=cid, maker="alice", + sell_token="USDC", sell_amount=100.0, + buy_token="MYCO", min_buy_amount=80.0, + valid_until=1000.0, status="filled", + ) + iset = add_intent(iset, intent) + + settlement = Settlement() + registry = reconcile_settled_chunks(registry, settlement, iset) + assert registry.schedules["s1"].chunks[cid].status == "executed" + + def test_pending_chunks_not_affected(self): + sched = create_dca_schedule("s1", "alice", 100.0, 1, 0.0, 1.0) + cid = list(sched.chunks.keys())[0] + # Leave as pending + registry = DCAScheduleRegistry(schedules={"s1": sched}) + + settlement = Settlement( + cow_matches=[ + Match(cid, "other", 1.0, 50.0, 50.0), + ], + ) + iset = IntentSet() + registry = reconcile_settled_chunks(registry, settlement, iset) + # Still pending — reconciliation only affects submitted chunks + assert registry.schedules["s1"].chunks[cid].status == "pending" + + +# --- TestMergeCRDT --- + +class TestMergeCRDT: + def test_union_of_disjoint_schedules(self): + s1 = create_dca_schedule("s1", "alice", 100.0, 2, 0.0, 1.0) + s2 = create_dca_schedule("s2", "bob", 200.0, 3, 0.0, 1.0) + reg_a = DCAScheduleRegistry(schedules={"s1": s1}) + reg_b = DCAScheduleRegistry(schedules={"s2": s2}) + + merged = merge_dca_schedules(reg_a, reg_b) + assert "s1" in merged.schedules + assert "s2" in merged.schedules + + def test_max_status_wins_for_duplicate_chunks(self): + s1 = create_dca_schedule("s1", "alice", 100.0, 1, 0.0, 1.0) + cid = list(s1.chunks.keys())[0] + + # Replica A: submitted + s1a = advance_chunk_status(s1, cid, "submitted") + # Replica B: executed + s1b = advance_chunk_status(s1, cid, "executed") + + reg_a = DCAScheduleRegistry(schedules={"s1": s1a}) + reg_b = DCAScheduleRegistry(schedules={"s1": s1b}) + + merged = merge_dca_schedules(reg_a, reg_b) + assert merged.schedules["s1"].chunks[cid].status == "executed" + + def test_merge_is_commutative(self): + s1 = create_dca_schedule("s1", "alice", 100.0, 2, 0.0, 1.0) + cid = list(s1.chunks.keys())[0] + s1a = advance_chunk_status(s1, cid, "submitted") + + reg_a = DCAScheduleRegistry(schedules={"s1": s1a}) + reg_b = DCAScheduleRegistry(schedules={"s1": s1}) + + ab = merge_dca_schedules(reg_a, reg_b) + ba = merge_dca_schedules(reg_b, reg_a) + + for cid in ab.schedules["s1"].chunks: + assert ab.schedules["s1"].chunks[cid].status == ba.schedules["s1"].chunks[cid].status + + def test_merge_is_idempotent(self): + s1 = create_dca_schedule("s1", "alice", 100.0, 3, 0.0, 1.0) + reg = DCAScheduleRegistry(schedules={"s1": s1}) + + merged = merge_dca_schedules(reg, reg) + for cid in merged.schedules["s1"].chunks: + assert merged.schedules["s1"].chunks[cid].status == s1.chunks[cid].status + + def test_merge_union_of_chunks(self): + """Different replicas have different chunks of same schedule.""" + s1a = DCASchedule( + schedule_id="s1", maker="alice", + chunks={"c1": DCAChunk("c1", "s1", "alice", "USDC", 50.0, "MYCO", 40.0, 0.0)}, + total_amount=100.0, + ) + s1b = DCASchedule( + schedule_id="s1", maker="alice", + chunks={"c2": DCAChunk("c2", "s1", "alice", "USDC", 50.0, "MYCO", 40.0, 1.0)}, + total_amount=100.0, + ) + reg_a = DCAScheduleRegistry(schedules={"s1": s1a}) + reg_b = DCAScheduleRegistry(schedules={"s1": s1b}) + + merged = merge_dca_schedules(reg_a, reg_b) + assert "c1" in merged.schedules["s1"].chunks + assert "c2" in merged.schedules["s1"].chunks + + +# --- TestPartitionReconnect --- + +class TestPartitionReconnect: + """Simulate a network partition + reconnect scenario.""" + + def test_partition_and_merge(self): + """Two replicas diverge, then merge — highest status wins.""" + sched = create_dca_schedule("s1", "alice", 100.0, 3, 0.0, 1.0) + reg = DCAScheduleRegistry(schedules={"s1": sched}) + + cids = sorted(sched.chunks.keys()) + + # Replica A processes chunk 0 + reg_a = DCAScheduleRegistry(schedules={ + "s1": advance_chunk_status(reg.schedules["s1"], cids[0], "submitted"), + }) + reg_a = DCAScheduleRegistry(schedules={ + "s1": advance_chunk_status(reg_a.schedules["s1"], cids[0], "executed"), + }) + + # Replica B processes chunk 1 + reg_b = DCAScheduleRegistry(schedules={ + "s1": advance_chunk_status(reg.schedules["s1"], cids[1], "submitted"), + }) + + merged = merge_dca_schedules(reg_a, reg_b) + assert merged.schedules["s1"].chunks[cids[0]].status == "executed" + assert merged.schedules["s1"].chunks[cids[1]].status == "submitted" + assert merged.schedules["s1"].chunks[cids[2]].status == "pending" + + +# --- TestScheduleCompletion --- + +class TestScheduleCompletion: + def test_empty_schedule_zero(self): + sched = DCASchedule(schedule_id="s1", maker="alice") + assert schedule_completion_fraction(sched) == 0.0 + + def test_all_executed_one(self): + sched = create_dca_schedule("s1", "alice", 100.0, 3, 0.0, 1.0) + for cid in list(sched.chunks.keys()): + sched = advance_chunk_status(sched, cid, "executed") + assert abs(schedule_completion_fraction(sched) - 1.0) < 1e-10 + + def test_partial_completion(self): + sched = create_dca_schedule("s1", "alice", 100.0, 4, 0.0, 1.0) + cids = list(sched.chunks.keys()) + sched = advance_chunk_status(sched, cids[0], "executed") + sched = advance_chunk_status(sched, cids[1], "executed") + assert abs(schedule_completion_fraction(sched) - 0.5) < 1e-10 diff --git a/tests/test_signal_router.py b/tests/test_signal_router.py new file mode 100644 index 0000000..f3ec437 --- /dev/null +++ b/tests/test_signal_router.py @@ -0,0 +1,270 @@ +"""Tests for signal router — oracle signals → adaptive parameters.""" + +import math +import pytest + +from src.primitives.signal_router import ( + SignalRouterConfig, + AdaptiveParams, + RouterSignals, + extract_signals, + apply_signals, + route_signals, + simulate_signal_routing, +) +from src.primitives.twap_oracle import ( + TWAPOracleParams, + TWAPOracleState, + create_oracle, + record_observation, +) + + +# --- Helpers --- + +def _build_oracle(prices: list[float], dt: float = 1.0) -> TWAPOracleState: + """Build an oracle with observations from a price list.""" + oracle = create_oracle(TWAPOracleParams(default_window=len(prices) * dt)) + for i, p in enumerate(prices): + oracle = record_observation(oracle, p, float(i) * dt) + return oracle + + +def _base_params() -> AdaptiveParams: + return AdaptiveParams( + flow_threshold=0.1, + pamm_alpha_bar=10.0, + surge_fee_rate=0.05, + oracle_multiplier_velocity=0.0, + ) + + +# --- TestExtractSignals --- + +class TestExtractSignals: + def test_insufficient_data_returns_invalid(self): + oracle = create_oracle() + signals = extract_signals(oracle, 1.0) + assert signals.is_valid is False + assert signals.twap_deviation == 0.0 + + def test_single_observation_returns_invalid(self): + oracle = create_oracle() + oracle = record_observation(oracle, 1.0, 0.0) + signals = extract_signals(oracle, 1.0) + assert signals.is_valid is False + + def test_stable_prices_zero_deviation(self): + oracle = _build_oracle([1.0, 1.0, 1.0, 1.0, 1.0]) + signals = extract_signals(oracle, 1.0) + assert signals.is_valid is True + assert abs(signals.twap_deviation) < 1e-10 + assert signals.volatility == 0.0 + + def test_spot_above_twap_positive_deviation(self): + oracle = _build_oracle([1.0, 1.0, 1.0, 1.0, 1.0]) + signals = extract_signals(oracle, 1.5) + assert signals.is_valid is True + assert signals.twap_deviation > 0 + + def test_spot_below_twap_negative_deviation(self): + oracle = _build_oracle([1.0, 1.0, 1.0, 1.0, 1.0]) + signals = extract_signals(oracle, 0.5) + assert signals.is_valid is True + assert signals.twap_deviation < 0 + + def test_volatile_prices_nonzero_volatility(self): + oracle = _build_oracle([1.0, 1.5, 0.8, 1.3, 0.9, 1.1]) + signals = extract_signals(oracle, 1.1) + assert signals.is_valid is True + assert signals.volatility > 0 + + +# --- TestApplySignals --- + +class TestApplySignals: + def test_invalid_signals_return_base(self): + base = _base_params() + config = SignalRouterConfig() + invalid = RouterSignals(0.0, 0.0, 1.0, 0.0, is_valid=False) + result = apply_signals(invalid, base, config) + assert result == base + + def test_volatility_tightens_flow_threshold(self): + base = _base_params() + config = SignalRouterConfig(k_vol_flow=1.0) + signals = RouterSignals( + twap_deviation=0.0, volatility=0.2, + spot_price=1.0, twap=1.0, is_valid=True, + ) + result = apply_signals(signals, base, config) + # flow *= (1 - 1.0 * 0.2) = 0.8 * base + assert result.flow_threshold < base.flow_threshold + assert abs(result.flow_threshold - 0.08) < 1e-10 + + def test_deviation_steepens_alpha(self): + base = _base_params() + config = SignalRouterConfig(k_dev_alpha=2.0) + signals = RouterSignals( + twap_deviation=0.1, volatility=0.0, + spot_price=1.1, twap=1.0, is_valid=True, + ) + result = apply_signals(signals, base, config) + # alpha *= (1 + 2.0 * 0.1) = 1.2 * base + assert result.pamm_alpha_bar > base.pamm_alpha_bar + assert abs(result.pamm_alpha_bar - 12.0) < 1e-10 + + def test_volatility_increases_surge_fee(self): + base = _base_params() + config = SignalRouterConfig(k_vol_fee=1.5) + signals = RouterSignals( + twap_deviation=0.0, volatility=0.3, + spot_price=1.0, twap=1.0, is_valid=True, + ) + result = apply_signals(signals, base, config) + # fee *= (1 + 1.5 * 0.3) = 1.45 * base + expected = 0.05 * 1.45 + assert abs(result.surge_fee_rate - expected) < 1e-10 + + def test_deviation_sets_velocity(self): + base = _base_params() + config = SignalRouterConfig(k_oracle_vel=0.01) + signals = RouterSignals( + twap_deviation=-0.2, volatility=0.0, + spot_price=0.8, twap=1.0, is_valid=True, + ) + result = apply_signals(signals, base, config) + assert abs(result.oracle_multiplier_velocity - (-0.002)) < 1e-10 + + def test_clamping_respects_bounds(self): + base = AdaptiveParams( + flow_threshold=0.1, pamm_alpha_bar=10.0, + surge_fee_rate=0.4, oracle_multiplier_velocity=0.0, + ) + config = SignalRouterConfig( + k_vol_flow=10.0, k_vol_fee=10.0, + surge_fee_max=0.5, flow_threshold_min=0.01, + ) + signals = RouterSignals( + twap_deviation=0.0, volatility=0.5, + spot_price=1.0, twap=1.0, is_valid=True, + ) + result = apply_signals(signals, base, config) + assert result.flow_threshold >= config.flow_threshold_min + assert result.surge_fee_rate <= config.surge_fee_max + + +# --- TestRouteSignals --- + +class TestRouteSignals: + def test_none_oracle_returns_base(self): + base = _base_params() + config = SignalRouterConfig() + result = route_signals(None, base, config) + assert result == base + + def test_with_oracle_modifies_params(self): + oracle = _build_oracle([1.0, 1.0, 1.0, 1.2, 1.3]) + base = _base_params() + config = SignalRouterConfig() + result = route_signals(oracle, base, config, current_spot=1.5) + # Spot > TWAP, so deviation > 0 → alpha increases + assert result.pamm_alpha_bar > base.pamm_alpha_bar + + def test_uses_last_obs_when_no_spot(self): + oracle = _build_oracle([1.0, 1.0, 1.0]) + base = _base_params() + config = SignalRouterConfig() + # current_spot=0 → should use last observation price + result = route_signals(oracle, base, config, current_spot=0.0) + # With spot == twap, deviation ≈ 0, so params near base + assert abs(result.flow_threshold - base.flow_threshold) < base.flow_threshold * 0.5 + + +# --- TestKZeroDisablesLink --- + +class TestKZeroDisablesLink: + def test_zero_k_vol_flow_leaves_threshold_unchanged(self): + base = _base_params() + config = SignalRouterConfig(k_vol_flow=0.0) + signals = RouterSignals( + twap_deviation=0.0, volatility=0.5, + spot_price=1.0, twap=1.0, is_valid=True, + ) + result = apply_signals(signals, base, config) + assert abs(result.flow_threshold - base.flow_threshold) < 1e-10 + + def test_zero_k_dev_alpha_leaves_alpha_unchanged(self): + base = _base_params() + config = SignalRouterConfig(k_dev_alpha=0.0) + signals = RouterSignals( + twap_deviation=0.5, volatility=0.0, + spot_price=1.5, twap=1.0, is_valid=True, + ) + result = apply_signals(signals, base, config) + assert abs(result.pamm_alpha_bar - base.pamm_alpha_bar) < 1e-10 + + def test_zero_k_vol_fee_leaves_fee_unchanged(self): + base = _base_params() + config = SignalRouterConfig(k_vol_fee=0.0) + signals = RouterSignals( + twap_deviation=0.0, volatility=0.5, + spot_price=1.0, twap=1.0, is_valid=True, + ) + result = apply_signals(signals, base, config) + assert abs(result.surge_fee_rate - base.surge_fee_rate) < 1e-10 + + def test_zero_k_oracle_vel_leaves_velocity_zero(self): + base = _base_params() + config = SignalRouterConfig(k_oracle_vel=0.0) + signals = RouterSignals( + twap_deviation=0.3, volatility=0.0, + spot_price=1.3, twap=1.0, is_valid=True, + ) + result = apply_signals(signals, base, config) + assert result.oracle_multiplier_velocity == 0.0 + + +# --- TestSimulation --- + +class TestSimulation: + def test_simulation_returns_correct_keys(self): + base = _base_params() + config = SignalRouterConfig() + prices = [1.0] * 10 + result = simulate_signal_routing(base, config, prices) + assert set(result.keys()) == { + "times", "flow_threshold", "pamm_alpha_bar", + "surge_fee_rate", "oracle_velocity", + "twap_deviation", "volatility", + } + assert len(result["times"]) == 10 + + def test_volatile_trajectory_tightens_params(self): + base = _base_params() + config = SignalRouterConfig(k_vol_flow=2.0, k_vol_fee=2.0) + # Volatile trajectory + prices = [1.0, 1.5, 0.8, 1.3, 0.7, 1.4, 0.9, 1.2, 0.85, 1.1] + result = simulate_signal_routing(base, config, prices) + # After volatile period, flow_threshold should be below base + # (after enough observations for volatility to register) + final_flow = result["flow_threshold"][-1] + assert final_flow <= base.flow_threshold + + def test_stable_trajectory_preserves_params(self): + base = _base_params() + config = SignalRouterConfig() + prices = [1.0] * 20 + result = simulate_signal_routing(base, config, prices) + # Stable prices → zero volatility, zero deviation + # Params should stay at base (after oracle warms up) + assert abs(result["flow_threshold"][-1] - base.flow_threshold) < 1e-6 + assert abs(result["surge_fee_rate"][-1] - base.surge_fee_rate) < 1e-6 + + def test_simulation_length_matches_trajectory(self): + base = _base_params() + config = SignalRouterConfig() + prices = [1.0 + 0.01 * i for i in range(50)] + result = simulate_signal_routing(base, config, prices) + for key in result: + assert len(result[key]) == 50 diff --git a/tests/test_subscription_dca.py b/tests/test_subscription_dca.py new file mode 100644 index 0000000..f0abd19 --- /dev/null +++ b/tests/test_subscription_dca.py @@ -0,0 +1,311 @@ +"""Tests for subscription/donation DCA integration.""" + +import numpy as np +import pytest + +from src.composed.subscription_dca import ( + SubscriptionDCAConfig, + DonationDCAConfig, + SubscriptionDCAOrder, + DonationDCAOrder, + SubscriptionDCAState, + create_subscription_dca_order, + create_donation_dca_order, + execute_subscription_chunk, + execute_donation_chunk, + pending_chunks, + simulate_subscription_dca, +) +from src.composed.dca_executor import DCAOrder +from src.composed.myco_surface import MycoSystem, MycoSystemConfig +from src.commitments.subscription import ( + Subscription, + SubscriptionTier, + create_default_tiers, +) +from src.primitives.twap_oracle import ( + TWAPOracleParams, + TWAPOracleState, + create_oracle, + record_observation, +) + + +# --- Helpers --- + +def _make_system() -> MycoSystem: + config = MycoSystemConfig(n_reserve_assets=3) + system = MycoSystem(config) + # Bootstrap + system.deposit(np.array([1000.0, 1000.0, 1000.0]), 0.0) + return system + + +def _make_oracle() -> TWAPOracleState: + oracle = create_oracle(TWAPOracleParams(default_window=24.0)) + oracle = record_observation(oracle, 1.0, 0.0) + oracle = record_observation(oracle, 1.0, 1.0) + return oracle + + +def _make_subscription() -> tuple[Subscription, SubscriptionTier]: + tier = SubscriptionTier( + name="Sustainer", + payment_per_period=50.0, + period_length=30.0, + base_mint_rate=1.8, + loyalty_multiplier_max=1.5, + loyalty_halflife=120.0, + ) + sub = Subscription( + subscriber="alice", + tier="sustainer", + start_time=0.0, + last_payment_time=0.0, + total_paid=0.0, + total_minted=0.0, + periods_paid=0, + ) + return sub, tier + + +# --- TestSubscriptionDCAOrder --- + +class TestSubscriptionDCAOrder: + def test_creates_correct_chunks(self): + sub, tier = _make_subscription() + config = SubscriptionDCAConfig(n_chunks=5, spread_fraction=0.8) + order = create_subscription_dca_order(sub, tier, config, 10.0, 1.2, 0) + assert order.order.params.n_chunks == 5 + assert abs(order.order.params.total_amount - 50.0) < 1e-10 + + def test_interval_respects_spread(self): + sub, tier = _make_subscription() + config = SubscriptionDCAConfig(n_chunks=5, spread_fraction=0.8) + order = create_subscription_dca_order(sub, tier, config, 0.0, 1.0, 0) + expected_interval = (30.0 * 0.8) / 5 + assert abs(order.order.params.interval - expected_interval) < 1e-10 + + def test_loyalty_multiplier_stored(self): + sub, tier = _make_subscription() + config = SubscriptionDCAConfig() + order = create_subscription_dca_order(sub, tier, config, 0.0, 1.35, 2) + assert abs(order.loyalty_multiplier - 1.35) < 1e-10 + assert order.period_index == 2 + + def test_strategy_propagated(self): + sub, tier = _make_subscription() + config = SubscriptionDCAConfig(strategy="twap_aware") + order = create_subscription_dca_order(sub, tier, config, 0.0, 1.0, 0) + assert order.order.params.strategy == "twap_aware" + + +# --- TestDonationDCAOrder --- + +class TestDonationDCAOrder: + def test_dca_enabled_multi_chunk(self): + config = DonationDCAConfig(enable_dca=True, n_chunks=5, interval=2.0) + order = create_donation_dca_order("bob", 500.0, config, 0.0) + assert order.order.params.n_chunks == 5 + assert abs(order.donation_amount - 500.0) < 1e-10 + + def test_dca_disabled_single_chunk(self): + config = DonationDCAConfig(enable_dca=False, n_chunks=5) + order = create_donation_dca_order("bob", 500.0, config, 0.0) + assert order.order.params.n_chunks == 1 + + def test_single_chunk_when_n_chunks_one(self): + config = DonationDCAConfig(enable_dca=True, n_chunks=1) + order = create_donation_dca_order("bob", 100.0, config, 0.0) + assert order.order.params.n_chunks == 1 + assert order.order.params.interval == 0.0 + + def test_donor_stored(self): + config = DonationDCAConfig() + order = create_donation_dca_order("carol", 200.0, config, 5.0) + assert order.donor == "carol" + assert abs(order.order.start_time - 5.0) < 1e-10 + + +# --- TestChunkExecution --- + +class TestChunkExecution: + def test_subscription_chunk_mints_tokens(self): + system = _make_system() + oracle = _make_oracle() + sub, tier = _make_subscription() + config = SubscriptionDCAConfig(n_chunks=3) + + order = create_subscription_dca_order(sub, tier, config, 1.0, 1.0, 0) + state = SubscriptionDCAState( + subscription_orders={"alice": [order]}, + ) + + state, system, oracle, tokens, bonus = execute_subscription_chunk( + state, "alice", system, oracle, 1.0, + ) + assert tokens > 0 + assert bonus == 0.0 # loyalty_multiplier == 1.0 + + def test_loyalty_bonus_computed(self): + system = _make_system() + oracle = _make_oracle() + sub, tier = _make_subscription() + config = SubscriptionDCAConfig(n_chunks=3) + + order = create_subscription_dca_order(sub, tier, config, 1.0, 1.5, 0) + state = SubscriptionDCAState( + subscription_orders={"alice": [order]}, + ) + + state, system, oracle, tokens, bonus = execute_subscription_chunk( + state, "alice", system, oracle, 1.0, + ) + assert tokens > 0 + # bonus = tokens * (1.5 - 1.0) = tokens * 0.5 + assert abs(bonus - tokens * 0.5) < 1e-10 + + def test_donation_chunk_mints_tokens(self): + system = _make_system() + oracle = _make_oracle() + config = DonationDCAConfig(enable_dca=True, n_chunks=3, interval=1.0) + don_order = create_donation_dca_order("bob", 300.0, config, 1.0) + + state = SubscriptionDCAState(donation_orders=[don_order]) + + state, system, oracle, tokens = execute_donation_chunk( + state, 0, system, oracle, 1.0, + ) + assert tokens > 0 + + def test_complete_order_returns_zero(self): + system = _make_system() + oracle = _make_oracle() + sub, tier = _make_subscription() + config = SubscriptionDCAConfig(n_chunks=1) + + order = create_subscription_dca_order(sub, tier, config, 1.0, 1.0, 0) + state = SubscriptionDCAState( + subscription_orders={"alice": [order]}, + ) + + # Execute the single chunk + state, system, oracle, t1, b1 = execute_subscription_chunk( + state, "alice", system, oracle, 1.0, + ) + assert t1 > 0 + + # Second execution should return zero + state, system, oracle, t2, b2 = execute_subscription_chunk( + state, "alice", system, oracle, 2.0, + ) + assert t2 == 0.0 + assert b2 == 0.0 + + def test_nonexistent_subscriber_returns_zero(self): + system = _make_system() + oracle = _make_oracle() + state = SubscriptionDCAState() + + state, system, oracle, tokens, bonus = execute_subscription_chunk( + state, "nobody", system, oracle, 1.0, + ) + assert tokens == 0.0 + + def test_out_of_range_donation_index_returns_zero(self): + system = _make_system() + oracle = _make_oracle() + state = SubscriptionDCAState() + + state, system, oracle, tokens = execute_donation_chunk( + state, 99, system, oracle, 1.0, + ) + assert tokens == 0.0 + + +# --- TestPendingChunks --- + +class TestPendingChunks: + def test_lists_due_chunks(self): + sub, tier = _make_subscription() + config = SubscriptionDCAConfig(n_chunks=5, spread_fraction=1.0) + order = create_subscription_dca_order(sub, tier, config, 0.0, 1.0, 0) + state = SubscriptionDCAState( + subscription_orders={"alice": [order]}, + ) + # interval = 30.0 / 5 = 6.0, so at t=7 chunk 0 and 1 are due + result = pending_chunks(state, "alice", 7.0) + assert len(result) >= 1 + + def test_no_pending_when_complete(self): + sub, tier = _make_subscription() + config = SubscriptionDCAConfig(n_chunks=1) + order = create_subscription_dca_order(sub, tier, config, 0.0, 1.0, 0) + order.order.is_complete = True + state = SubscriptionDCAState( + subscription_orders={"alice": [order]}, + ) + result = pending_chunks(state, "alice", 100.0) + assert len(result) == 0 + + def test_empty_for_unknown_subscriber(self): + state = SubscriptionDCAState() + result = pending_chunks(state, "nobody", 0.0) + assert result == [] + + +# --- TestBackwardCompat --- + +class TestBackwardCompat: + """Ensure the new modules don't break existing DCA or subscription imports.""" + + def test_dca_executor_still_works(self): + from src.composed.dca_executor import DCAParams, simulate_dca + config = MycoSystemConfig(n_reserve_assets=3) + params = DCAParams( + total_amount=100.0, n_chunks=5, + interval=1.0, asset_index=0, + ) + result = simulate_dca(config, params) + assert result.order.total_tokens_received > 0 + + def test_subscription_module_still_works(self): + from src.commitments.subscription import ( + SubscriptionSystem, create_subscription, + process_payment, create_default_tiers, + ) + tiers = create_default_tiers() + system = SubscriptionSystem(tiers=tiers) + system, sub = create_subscription(system, "alice", "supporter", 0.0) + system, tokens = process_payment(system, "alice", 31.0) + assert tokens > 0 + + def test_myco_system_signal_routing(self): + """Test the apply_signal_routing integration on MycoSystem.""" + from src.primitives.signal_router import SignalRouterConfig + config = MycoSystemConfig( + n_reserve_assets=3, + twap_oracle_params=TWAPOracleParams(), + ) + system = MycoSystem(config) + system.deposit(np.array([1000.0, 1000.0, 1000.0]), 0.0) + # Do a few deposits to build oracle history + for i in range(1, 6): + system.deposit(np.array([10.0, 10.0, 10.0]), float(i)) + + router_config = SignalRouterConfig() + adapted = system.apply_signal_routing(router_config) + assert adapted.flow_threshold > 0 + assert adapted.pamm_alpha_bar > 0 + + def test_simulation_runs(self): + tiers = create_default_tiers() + config = SubscriptionDCAConfig(n_chunks=3) + sys_config = MycoSystemConfig(n_reserve_assets=3) + subscribers = [("alice", "supporter", 0.0)] + + result = simulate_subscription_dca( + tiers, config, sys_config, subscribers, duration=60.0, dt=5.0, + ) + assert "times" in result + assert len(result["times"]) == 12