"""Tests for CRDT bridge — multi-peer simulation and merge semantics.""" import pytest from src.crdt.bridge import PeerState, merge_peers, Network, NetworkConfig, EventType from src.crdt.labor_crdt import ( CRDTLaborSystem, AttestationEntry, submit_attestation, ) from src.crdt.intent_matching import Intent, IntentSet, add_intent from src.crdt.dca_schedule import create_dca_schedule, DCAScheduleRegistry # --- Helpers --- def _peer_with_labor(contributor: str, entry_id: str) -> PeerState: """Create a peer with one labor attestation.""" state = PeerState() entry = AttestationEntry( entry_id=entry_id, contribution_type="code", units=5.0, timestamp=1.0, attester="admin", ) state.labor = submit_attestation(state.labor, contributor, entry) return state def _peer_with_intent(intent_id: str) -> PeerState: """Create a peer with one intent.""" state = PeerState() intent = Intent( intent_id=intent_id, maker="alice", sell_token="USDC", sell_amount=100.0, buy_token="MYCO", min_buy_amount=80.0, valid_until=999.0, ) state.intents = add_intent(state.intents, intent) return state def _peer_with_dca(schedule_id: str) -> PeerState: """Create a peer with one DCA schedule.""" state = PeerState() schedule = create_dca_schedule( schedule_id=schedule_id, maker="alice", total_amount=1000.0, n_chunks=5, start_time=0.0, interval=1.0, ) state.dca = DCAScheduleRegistry(schedules={schedule_id: schedule}) return state # --- TestMergePeers --- class TestMergePeers: def test_commutativity(self): a = _peer_with_labor("alice", "e1") b = _peer_with_labor("bob", "e2") ab = merge_peers(a, b) ba = merge_peers(b, a) # Both should have both contributors assert set(ab.labor.logs.keys()) == {"alice", "bob"} assert set(ba.labor.logs.keys()) == {"alice", "bob"} def test_idempotency(self): a = _peer_with_labor("alice", "e1") aa = merge_peers(a, a) assert set(aa.labor.logs.keys()) == {"alice"} assert len(aa.labor.logs["alice"].entries) == 1 def test_merge_intents(self): a = _peer_with_intent("i1") b = _peer_with_intent("i2") merged = merge_peers(a, b) assert "i1" in merged.intents.intents assert "i2" in merged.intents.intents def test_merge_dca_schedules(self): a = _peer_with_dca("s1") b = _peer_with_dca("s2") merged = merge_peers(a, b) assert "s1" in merged.dca.schedules assert "s2" in merged.dca.schedules # --- TestNetworkConvergence --- class TestNetworkConvergence: def test_converges_after_mutations(self): """All peers see all mutations after enough gossip rounds.""" net = Network.create(["p1", "p2", "p3"], NetworkConfig(seed=42)) # Mutate different peers net.mutate_peer("p1", lambda s: _peer_with_labor("alice", "e1")) net.mutate_peer("p2", lambda s: _peer_with_intent("i1")) net.mutate_peer("p3", lambda s: _peer_with_dca("s1")) # Run enough rounds to propagate for _ in range(20): net.step() # All peers should have converged assert net.divergence() == 1 # All peers should have all data for pid in ["p1", "p2", "p3"]: state = net.peers[pid] assert "alice" in state.labor.logs assert "i1" in state.intents.intents assert "s1" in state.dca.schedules def test_empty_network_divergence_one(self): net = Network.create(["p1", "p2"], NetworkConfig(seed=0)) assert net.divergence() == 1 # All empty = converged class TestNetworkPartitions: def test_partition_and_reconnect(self): """Peers reach consistency even after partitions.""" config = NetworkConfig( partition_probability=0.5, reconnect_delay=3.0, seed=123, ) net = Network.create(["p1", "p2", "p3", "p4"], config) # Mutate before running net.mutate_peer("p1", lambda s: _peer_with_labor("alice", "e1")) net.mutate_peer("p4", lambda s: _peer_with_intent("i1")) # Run many steps to ensure partition + reconnect + convergence for _ in range(50): net.step() # After enough rounds, should converge assert net.divergence() == 1 def test_convergence_time_none_during_partition(self): """convergence_time() returns None while partitioned.""" config = NetworkConfig( partition_probability=1.0, # Force partition on first step reconnect_delay=100.0, # Never reconnect during test seed=42, ) net = Network.create(["p1", "p2", "p3"], config) net.mutate_peer("p1", lambda s: _peer_with_labor("alice", "e1")) net.step() # Triggers partition assert net.convergence_time() is None def test_convergence_time_returns_float_after_reconnect(self): config = NetworkConfig( partition_probability=1.0, reconnect_delay=3.0, seed=42, ) net = Network.create(["p1", "p2"], config) net.mutate_peer("p1", lambda s: _peer_with_labor("alice", "e1")) # Force partition then wait for reconnect + convergence for _ in range(20): net.step() ct = net.convergence_time() assert ct is not None assert isinstance(ct, float) class TestNetworkSimulate: def test_simulate_returns_metrics(self): config = NetworkConfig(seed=42) net = Network.create(["p1", "p2", "p3"], config) net.mutate_peer("p1", lambda s: _peer_with_labor("alice", "e1")) result = net.simulate(steps=10) assert "times" in result assert "divergence" in result assert "merge_count" in result assert "partition_active" in result assert "peer_signatures" in result assert "events" in result assert len(result["times"]) == 10 # After 10 rounds, should converge assert result["divergence"][-1] == 1