myco-bonding-curve/tests/test_crdt_bridge.py

194 lines
6.1 KiB
Python

"""Tests for CRDT bridge — multi-peer simulation and merge semantics."""
import pytest
from src.crdt.bridge import PeerState, merge_peers, Network, NetworkConfig, EventType
from src.crdt.labor_crdt import (
CRDTLaborSystem,
AttestationEntry,
submit_attestation,
)
from src.crdt.intent_matching import Intent, IntentSet, add_intent
from src.crdt.dca_schedule import create_dca_schedule, DCAScheduleRegistry
# --- Helpers ---
def _peer_with_labor(contributor: str, entry_id: str) -> PeerState:
"""Create a peer with one labor attestation."""
state = PeerState()
entry = AttestationEntry(
entry_id=entry_id,
contribution_type="code",
units=5.0,
timestamp=1.0,
attester="admin",
)
state.labor = submit_attestation(state.labor, contributor, entry)
return state
def _peer_with_intent(intent_id: str) -> PeerState:
"""Create a peer with one intent."""
state = PeerState()
intent = Intent(
intent_id=intent_id,
maker="alice",
sell_token="USDC",
sell_amount=100.0,
buy_token="MYCO",
min_buy_amount=80.0,
valid_until=999.0,
)
state.intents = add_intent(state.intents, intent)
return state
def _peer_with_dca(schedule_id: str) -> PeerState:
"""Create a peer with one DCA schedule."""
state = PeerState()
schedule = create_dca_schedule(
schedule_id=schedule_id,
maker="alice",
total_amount=1000.0,
n_chunks=5,
start_time=0.0,
interval=1.0,
)
state.dca = DCAScheduleRegistry(schedules={schedule_id: schedule})
return state
# --- TestMergePeers ---
class TestMergePeers:
def test_commutativity(self):
a = _peer_with_labor("alice", "e1")
b = _peer_with_labor("bob", "e2")
ab = merge_peers(a, b)
ba = merge_peers(b, a)
# Both should have both contributors
assert set(ab.labor.logs.keys()) == {"alice", "bob"}
assert set(ba.labor.logs.keys()) == {"alice", "bob"}
def test_idempotency(self):
a = _peer_with_labor("alice", "e1")
aa = merge_peers(a, a)
assert set(aa.labor.logs.keys()) == {"alice"}
assert len(aa.labor.logs["alice"].entries) == 1
def test_merge_intents(self):
a = _peer_with_intent("i1")
b = _peer_with_intent("i2")
merged = merge_peers(a, b)
assert "i1" in merged.intents.intents
assert "i2" in merged.intents.intents
def test_merge_dca_schedules(self):
a = _peer_with_dca("s1")
b = _peer_with_dca("s2")
merged = merge_peers(a, b)
assert "s1" in merged.dca.schedules
assert "s2" in merged.dca.schedules
# --- TestNetworkConvergence ---
class TestNetworkConvergence:
def test_converges_after_mutations(self):
"""All peers see all mutations after enough gossip rounds."""
net = Network.create(["p1", "p2", "p3"], NetworkConfig(seed=42))
# Mutate different peers
net.mutate_peer("p1", lambda s: _peer_with_labor("alice", "e1"))
net.mutate_peer("p2", lambda s: _peer_with_intent("i1"))
net.mutate_peer("p3", lambda s: _peer_with_dca("s1"))
# Run enough rounds to propagate
for _ in range(20):
net.step()
# All peers should have converged
assert net.divergence() == 1
# All peers should have all data
for pid in ["p1", "p2", "p3"]:
state = net.peers[pid]
assert "alice" in state.labor.logs
assert "i1" in state.intents.intents
assert "s1" in state.dca.schedules
def test_empty_network_divergence_one(self):
net = Network.create(["p1", "p2"], NetworkConfig(seed=0))
assert net.divergence() == 1 # All empty = converged
class TestNetworkPartitions:
def test_partition_and_reconnect(self):
"""Peers reach consistency even after partitions."""
config = NetworkConfig(
partition_probability=0.5,
reconnect_delay=3.0,
seed=123,
)
net = Network.create(["p1", "p2", "p3", "p4"], config)
# Mutate before running
net.mutate_peer("p1", lambda s: _peer_with_labor("alice", "e1"))
net.mutate_peer("p4", lambda s: _peer_with_intent("i1"))
# Run many steps to ensure partition + reconnect + convergence
for _ in range(50):
net.step()
# After enough rounds, should converge
assert net.divergence() == 1
def test_convergence_time_none_during_partition(self):
"""convergence_time() returns None while partitioned."""
config = NetworkConfig(
partition_probability=1.0, # Force partition on first step
reconnect_delay=100.0, # Never reconnect during test
seed=42,
)
net = Network.create(["p1", "p2", "p3"], config)
net.mutate_peer("p1", lambda s: _peer_with_labor("alice", "e1"))
net.step() # Triggers partition
assert net.convergence_time() is None
def test_convergence_time_returns_float_after_reconnect(self):
config = NetworkConfig(
partition_probability=1.0,
reconnect_delay=3.0,
seed=42,
)
net = Network.create(["p1", "p2"], config)
net.mutate_peer("p1", lambda s: _peer_with_labor("alice", "e1"))
# Force partition then wait for reconnect + convergence
for _ in range(20):
net.step()
ct = net.convergence_time()
assert ct is not None
assert isinstance(ct, float)
class TestNetworkSimulate:
def test_simulate_returns_metrics(self):
config = NetworkConfig(seed=42)
net = Network.create(["p1", "p2", "p3"], config)
net.mutate_peer("p1", lambda s: _peer_with_labor("alice", "e1"))
result = net.simulate(steps=10)
assert "times" in result
assert "divergence" in result
assert "merge_count" in result
assert "partition_active" in result
assert "peer_signatures" in result
assert "events" in result
assert len(result["times"]) == 10
# After 10 rounds, should converge
assert result["divergence"][-1] == 1