"""Tests for hub-and-spoke cross-chain architecture (src/crosschain/hub_spoke.py).""" import pytest import numpy as np from src.crosschain.hub_spoke import ( StakingAsset, CCIPMessage, SpokeVault, HubRegistry, CrossChainSystem, create_default_system, simulate_deposit, tick, apply_price_shock, get_crosschain_metrics, ) # ---------- Helpers ---------- def make_eth_asset(price: float = 2400.0) -> StakingAsset: return StakingAsset( symbol="stETH", chain="ethereum", price=price, staking_apy=0.035, weight=1.0, risk_score=0.05, ) def make_spoke(chain: str = "ethereum") -> SpokeVault: asset = make_eth_asset() return SpokeVault(chain=chain, accepted_assets=[asset]) def make_hub_with_spoke() -> tuple[HubRegistry, SpokeVault]: hub = HubRegistry() spoke = make_spoke("ethereum") hub.register_spoke(spoke) return hub, spoke # ---------- StakingAsset ---------- class TestStakingAsset: def test_defaults(self): asset = StakingAsset(symbol="stETH", chain="ethereum") assert asset.price == 1.0 assert asset.staking_apy == 0.04 assert asset.weight == 1.0 assert asset.risk_score == 0.1 def test_custom_values(self): asset = StakingAsset("rETH", "arbitrum", price=2420.0, staking_apy=0.032) assert asset.price == 2420.0 assert asset.staking_apy == 0.032 # ---------- create_default_system ---------- class TestCreateDefaultSystem: def test_has_five_chains(self): system = create_default_system() assert len(system.hub.spokes) == 5 def test_expected_chains_present(self): system = create_default_system() chains = set(system.hub.spokes.keys()) assert "ethereum" in chains assert "arbitrum" in chains assert "optimism" in chains assert "base" in chains assert "polygon" in chains def test_each_spoke_has_assets(self): system = create_default_system() for chain, spoke in system.hub.spokes.items(): assert len(spoke.accepted_assets) > 0, f"{chain} has no assets" def test_initial_state_zeros(self): system = create_default_system() assert system.time == 0.0 assert system.total_messages_sent == 0 assert system.total_messages_delivered == 0 assert system.total_yield_generated == 0.0 assert system.hub.total_collateral_usd == 0.0 # ---------- SpokeVault.deposit ---------- class TestSpokeVaultDeposit: def test_deposit_increases_balance(self): spoke = make_spoke() spoke.deposit("stETH", 10.0, 0.0) assert spoke.balances.get("stETH", 0.0) == pytest.approx(10.0) def test_deposit_returns_ccip_message(self): spoke = make_spoke() msg = spoke.deposit("stETH", 10.0, 0.0) assert isinstance(msg, CCIPMessage) def test_deposit_message_type(self): spoke = make_spoke() msg = spoke.deposit("stETH", 5.0, 1.0) assert msg.msg_type == "deposit_report" def test_deposit_message_source_chain(self): spoke = make_spoke("ethereum") msg = spoke.deposit("stETH", 5.0, 0.0) assert msg.source_chain == "ethereum" def test_deposit_message_dest_chain(self): spoke = make_spoke() msg = spoke.deposit("stETH", 5.0, 0.0) assert msg.dest_chain == "base" def test_deposit_message_payload(self): spoke = make_spoke() msg = spoke.deposit("stETH", 7.0, 0.0) assert msg.payload["asset"] == "stETH" assert msg.payload["amount"] == pytest.approx(7.0) def test_deposit_appends_to_pending_reports(self): spoke = make_spoke() assert len(spoke.pending_reports) == 0 spoke.deposit("stETH", 5.0, 0.0) assert len(spoke.pending_reports) == 1 def test_multiple_deposits_accumulate(self): spoke = make_spoke() spoke.deposit("stETH", 3.0, 0.0) spoke.deposit("stETH", 7.0, 1.0) assert spoke.balances["stETH"] == pytest.approx(10.0) def test_deposit_updates_total_value(self): spoke = make_spoke() price = spoke.accepted_assets[0].price spoke.deposit("stETH", 5.0, 0.0) assert spoke.total_value_usd == pytest.approx(5.0 * price * 1.0) # ---------- SpokeVault.withdraw ---------- class TestSpokeVaultWithdraw: def test_withdraw_decreases_balance(self): spoke = make_spoke() spoke.deposit("stETH", 10.0, 0.0) spoke.withdraw("stETH", 3.0, 1.0) assert spoke.balances["stETH"] == pytest.approx(7.0) def test_withdraw_creates_negative_report(self): spoke = make_spoke() spoke.deposit("stETH", 10.0, 0.0) # Clear pending from deposit spoke.pending_reports.clear() msg = spoke.withdraw("stETH", 3.0, 1.0) assert msg is not None assert msg.payload["amount"] == pytest.approx(-3.0) def test_withdraw_capped_at_balance(self): spoke = make_spoke() spoke.deposit("stETH", 5.0, 0.0) spoke.withdraw("stETH", 999.0, 1.0) assert spoke.balances.get("stETH", 0.0) == pytest.approx(0.0) def test_withdraw_with_no_balance_returns_none(self): spoke = make_spoke() result = spoke.withdraw("stETH", 10.0, 0.0) assert result is None def test_withdraw_appends_pending_report(self): spoke = make_spoke() spoke.deposit("stETH", 10.0, 0.0) initial_count = len(spoke.pending_reports) spoke.withdraw("stETH", 3.0, 1.0) assert len(spoke.pending_reports) == initial_count + 1 # ---------- SpokeVault.apply_staking_yield ---------- class TestSpokeVaultApplyStakingYield: def test_yield_increases_balance(self): spoke = make_spoke() spoke.deposit("stETH", 100.0, 0.0) balance_before = spoke.balances["stETH"] spoke.apply_staking_yield(1.0 / 365) assert spoke.balances["stETH"] > balance_before def test_yield_amount_correct(self): spoke = make_spoke() spoke.deposit("stETH", 100.0, 0.0) asset = spoke.accepted_assets[0] dt = 1.0 / 365 expected_token_yield = 100.0 * asset.staking_apy * dt expected_usd_yield = expected_token_yield * asset.price actual_yield = spoke.apply_staking_yield(dt) assert actual_yield == pytest.approx(expected_usd_yield) def test_empty_vault_yields_zero(self): spoke = make_spoke() yield_amount = spoke.apply_staking_yield(1.0) assert yield_amount == pytest.approx(0.0) def test_yield_updates_total_value_usd(self): spoke = make_spoke() spoke.deposit("stETH", 100.0, 0.0) value_before = spoke.total_value_usd spoke.apply_staking_yield(1.0 / 12) # Monthly assert spoke.total_value_usd > value_before # ---------- HubRegistry.register_spoke ---------- class TestHubRegistryRegisterSpoke: def test_registers_spoke_by_chain(self): hub = HubRegistry() spoke = make_spoke("arbitrum") hub.register_spoke(spoke) assert "arbitrum" in hub.spokes def test_registers_assets(self): hub = HubRegistry() spoke = make_spoke("ethereum") hub.register_spoke(spoke) assert "stETH" in hub.all_assets def test_multiple_spokes(self): hub = HubRegistry() hub.register_spoke(make_spoke("ethereum")) hub.register_spoke(make_spoke("arbitrum")) assert len(hub.spokes) == 2 # ---------- HubRegistry.process_messages ---------- class TestHubRegistryProcessMessages: def test_delivers_message_after_latency(self): hub, spoke = make_hub_with_spoke() spoke.deposit("stETH", 10.0, 0.0) # Advance time far beyond any possible latency delivered = hub.process_messages(1.0) assert len(delivered) >= 1 def test_delivered_message_marked(self): hub, spoke = make_hub_with_spoke() spoke.deposit("stETH", 5.0, 0.0) hub.process_messages(1.0) # All pending reports should be cleared after delivery assert len(spoke.pending_reports) == 0 def test_message_not_delivered_before_latency(self): hub, spoke = make_hub_with_spoke() spoke.deposit("stETH", 5.0, 0.0) # Process at exactly time 0 — latency hasn't elapsed yet # (latency is always > 0 due to base_latency + randomness) delivered = hub.process_messages(0.0) # No delivery expected at t=0 assert len(delivered) == 0 def test_updates_global_collateral_after_delivery(self): hub, spoke = make_hub_with_spoke() spoke.deposit("stETH", 10.0, 0.0) hub.process_messages(1.0) assert hub.global_collateral.get("stETH", 0.0) == pytest.approx(10.0) def test_updates_total_collateral_usd(self): hub, spoke = make_hub_with_spoke() spoke.deposit("stETH", 5.0, 0.0) hub.process_messages(1.0) asset = spoke.accepted_assets[0] expected_usd = 5.0 * asset.price * asset.weight assert hub.total_collateral_usd == pytest.approx(expected_usd) # ---------- simulate_deposit ---------- class TestSimulateDeposit: def test_returns_ccip_message(self): system = create_default_system() msg = simulate_deposit(system, "ethereum", "stETH", 10.0, 0.0) assert isinstance(msg, CCIPMessage) def test_increments_messages_sent(self): system = create_default_system() before = system.total_messages_sent simulate_deposit(system, "ethereum", "stETH", 10.0, 0.0) assert system.total_messages_sent == before + 1 def test_appends_to_message_log(self): system = create_default_system() before = len(system.message_log) simulate_deposit(system, "arbitrum", "wstETH", 5.0, 0.0) assert len(system.message_log) == before + 1 def test_spoke_balance_updated(self): system = create_default_system() simulate_deposit(system, "ethereum", "stETH", 50.0, 0.0) assert system.hub.spokes["ethereum"].balances.get("stETH", 0.0) == pytest.approx(50.0) def test_unknown_chain_raises(self): system = create_default_system() with pytest.raises(ValueError, match="Unknown chain"): simulate_deposit(system, "moon", "stETH", 1.0, 0.0) # ---------- tick ---------- class TestTick: def test_tick_advances_time(self): system = create_default_system() dt = 1.0 / 365 tick(system, dt) assert system.time == pytest.approx(dt) def test_tick_multiple_advances_time(self): system = create_default_system() dt = 1.0 / 365 for _ in range(5): tick(system, dt) assert system.time == pytest.approx(5 * dt) def test_tick_with_deposits_generates_yield(self): system = create_default_system() simulate_deposit(system, "ethereum", "stETH", 100.0, 0.0) # Advance past latency tick(system, 1.0) assert system.total_yield_generated > 0.0 def test_tick_returns_dict_with_expected_keys(self): system = create_default_system() result = tick(system, 1.0 / 365) assert "time" in result assert "yield_this_tick" in result assert "messages_delivered" in result assert "total_collateral_usd" in result assert "per_chain" in result def test_tick_per_chain_has_all_chains(self): system = create_default_system() result = tick(system, 1.0 / 365) for chain in ["ethereum", "arbitrum", "optimism", "base", "polygon"]: assert chain in result["per_chain"] def test_tick_increases_total_messages_sent(self): system = create_default_system() before = system.total_messages_sent tick(system, 1.0 / 365) # Broadcasts state sync to all spokes each tick assert system.total_messages_sent > before # ---------- apply_price_shock ---------- class TestApplyPriceShock: def test_price_shock_changes_asset_price(self): system = create_default_system() eth_spoke = system.hub.spokes["ethereum"] for asset in eth_spoke.accepted_assets: if asset.symbol == "stETH": old_price = asset.price break apply_price_shock(system, "stETH", 0.5) for asset in eth_spoke.accepted_assets: if asset.symbol == "stETH": assert asset.price == pytest.approx(old_price * 0.5) break def test_price_shock_affects_all_chains_with_asset(self): system = create_default_system() apply_price_shock(system, "rETH", 0.8) # rETH exists on ethereum and arbitrum for chain in ["ethereum", "arbitrum"]: spoke = system.hub.spokes[chain] for asset in spoke.accepted_assets: if asset.symbol == "rETH": assert asset.price == pytest.approx(2420.0 * 0.8) break def test_price_shock_recalculates_total_collateral(self): system = create_default_system() simulate_deposit(system, "ethereum", "stETH", 10.0, 0.0) tick(system, 1.0) # Process messages value_before = system.hub.total_collateral_usd apply_price_shock(system, "stETH", 0.5) # Value should decrease (stETH is worth less now) # Only meaningful if there's actual stETH balance # At minimum, recalculate_total should have run without error # and total_collateral_usd is a valid float assert isinstance(system.hub.total_collateral_usd, float) def test_price_shock_upward(self): system = create_default_system() eth_spoke = system.hub.spokes["ethereum"] for asset in eth_spoke.accepted_assets: if asset.symbol == "stETH": old_price = asset.price break apply_price_shock(system, "stETH", 2.0) for asset in eth_spoke.accepted_assets: if asset.symbol == "stETH": assert asset.price == pytest.approx(old_price * 2.0) break # ---------- get_crosschain_metrics ---------- class TestGetCrosschainMetrics: def test_returns_all_expected_fields(self): system = create_default_system() m = get_crosschain_metrics(system) assert "time" in m assert "total_collateral_usd" in m assert "total_messages_sent" in m assert "total_messages_delivered" in m assert "total_yield_generated" in m assert "chains" in m assert "global_collateral" in m def test_chains_contains_all_five(self): system = create_default_system() m = get_crosschain_metrics(system) for chain in ["ethereum", "arbitrum", "optimism", "base", "polygon"]: assert chain in m["chains"] def test_chain_entry_has_assets_field(self): system = create_default_system() m = get_crosschain_metrics(system) for chain, data in m["chains"].items(): assert "assets" in data assert "total_value_usd" in data def test_metrics_update_after_deposit(self): system = create_default_system() simulate_deposit(system, "ethereum", "stETH", 50.0, 0.0) tick(system, 1.0) # Process CCIP messages m = get_crosschain_metrics(system) # stETH balance should be reflected in chain assets eth_assets = m["chains"]["ethereum"]["assets"] assert eth_assets["stETH"]["balance"] == pytest.approx( system.hub.spokes["ethereum"].balances.get("stETH", 0.0) ) # ---------- Full lifecycle ---------- class TestFullCrossChainLifecycle: def test_deposit_on_multiple_chains_then_tick(self): system = create_default_system() # Deposit on multiple chains simulate_deposit(system, "ethereum", "stETH", 100.0, 0.0) simulate_deposit(system, "arbitrum", "wstETH", 50.0, 0.0) simulate_deposit(system, "base", "cbETH", 30.0, 0.0) assert system.total_messages_sent == 3 # Tick to process CCIP messages and accrue yield result = tick(system, 1.0) assert system.total_messages_delivered > 0 assert system.total_yield_generated > 0.0 assert result["yield_this_tick"] > 0.0 # Metrics should be populated m = get_crosschain_metrics(system) assert m["total_messages_sent"] > 0 assert m["total_yield_generated"] > 0.0 def test_tick_multiple_times_accumulates_yield(self): system = create_default_system() simulate_deposit(system, "ethereum", "stETH", 100.0, 0.0) # Process deposit tick(system, 1.0) yield_after_1 = system.total_yield_generated # More ticks for _ in range(10): tick(system, 1.0 / 365) assert system.total_yield_generated > yield_after_1 def test_full_lifecycle_verify_metrics(self): system = create_default_system() # Deposit on all five chains for chain, asset in [ ("ethereum", "stETH"), ("arbitrum", "wstETH"), ("optimism", "wstETH"), ("base", "cbETH"), ("polygon", "stMATIC"), ]: simulate_deposit(system, chain, asset, 100.0, 0.0) # Tick for one year for _ in range(12): tick(system, 1.0 / 12) m = get_crosschain_metrics(system) assert m["time"] == pytest.approx(1.0, rel=1e-3) assert m["total_yield_generated"] > 0.0 assert m["total_messages_sent"] > 5 # Initial deposits + syncs assert m["total_collateral_usd"] > 0.0