"""Tests for CRDT DCA schedules — G-Set of chunks with monotone status.""" import pytest from src.crdt.dca_schedule import ( DCAChunk, DCASchedule, DCAScheduleRegistry, create_dca_schedule, advance_chunk_status, materialize_chunks, reconcile_settled_chunks, merge_dca_schedules, schedule_completion_fraction, _CHUNK_STATUS_RANK, ) from src.crdt.intent_matching import Intent, IntentSet, add_intent, fill_intent from src.crdt.batch_settlement import Settlement, Match # --- TestCreateSchedule --- class TestCreateSchedule: def test_creates_correct_number_of_chunks(self): sched = create_dca_schedule("s1", "alice", 100.0, 5, 0.0, 1.0) assert len(sched.chunks) == 5 assert sched.total_amount == 100.0 def test_chunks_have_correct_amounts(self): sched = create_dca_schedule("s1", "alice", 100.0, 4, 0.0, 1.0) for chunk in sched.chunks.values(): assert abs(chunk.sell_amount - 25.0) < 1e-10 def test_chunks_are_scheduled_at_intervals(self): sched = create_dca_schedule("s1", "alice", 100.0, 3, 10.0, 5.0) times = sorted(c.scheduled_time for c in sched.chunks.values()) assert times == [10.0, 15.0, 20.0] def test_all_chunks_start_pending(self): sched = create_dca_schedule("s1", "alice", 100.0, 3, 0.0, 1.0) for chunk in sched.chunks.values(): assert chunk.status == "pending" def test_chunk_ids_are_unique(self): sched = create_dca_schedule("s1", "alice", 100.0, 10, 0.0, 1.0) ids = [c.chunk_id for c in sched.chunks.values()] assert len(ids) == len(set(ids)) def test_min_buy_ratio_applied(self): sched = create_dca_schedule( "s1", "alice", 100.0, 2, 0.0, 1.0, min_buy_ratio=0.9, ) for chunk in sched.chunks.values(): assert abs(chunk.min_buy_amount - 50.0 * 0.9) < 1e-10 def test_custom_tokens(self): sched = create_dca_schedule( "s1", "alice", 100.0, 1, 0.0, 1.0, sell_token="ETH", buy_token="DAI", ) chunk = list(sched.chunks.values())[0] assert chunk.sell_token == "ETH" assert chunk.buy_token == "DAI" # --- TestAdvanceStatus --- class TestAdvanceStatus: def test_pending_to_submitted(self): sched = create_dca_schedule("s1", "alice", 100.0, 1, 0.0, 1.0) cid = list(sched.chunks.keys())[0] updated = advance_chunk_status(sched, cid, "submitted") assert updated.chunks[cid].status == "submitted" def test_submitted_to_executed(self): sched = create_dca_schedule("s1", "alice", 100.0, 1, 0.0, 1.0) cid = list(sched.chunks.keys())[0] sched = advance_chunk_status(sched, cid, "submitted") sched = advance_chunk_status(sched, cid, "executed") assert sched.chunks[cid].status == "executed" def test_backward_transition_rejected(self): sched = create_dca_schedule("s1", "alice", 100.0, 1, 0.0, 1.0) cid = list(sched.chunks.keys())[0] sched = advance_chunk_status(sched, cid, "submitted") sched_after = advance_chunk_status(sched, cid, "pending") assert sched_after.chunks[cid].status == "submitted" def test_same_status_transition_rejected(self): sched = create_dca_schedule("s1", "alice", 100.0, 1, 0.0, 1.0) cid = list(sched.chunks.keys())[0] sched_after = advance_chunk_status(sched, cid, "pending") assert sched_after.chunks[cid].status == "pending" def test_nonexistent_chunk_returns_unchanged(self): sched = create_dca_schedule("s1", "alice", 100.0, 1, 0.0, 1.0) sched_after = advance_chunk_status(sched, "nonexistent", "submitted") assert sched == sched_after def test_skip_to_executed_allowed(self): sched = create_dca_schedule("s1", "alice", 100.0, 1, 0.0, 1.0) cid = list(sched.chunks.keys())[0] sched = advance_chunk_status(sched, cid, "executed") assert sched.chunks[cid].status == "executed" # --- TestMaterializeChunks --- class TestMaterializeChunks: def test_due_chunks_become_intents(self): sched = create_dca_schedule("s1", "alice", 100.0, 3, 0.0, 1.0) registry = DCAScheduleRegistry(schedules={"s1": sched}) iset = IntentSet() registry, iset, materialized = materialize_chunks(registry, iset, 1.5) # Chunks at t=0.0 and t=1.0 are due; t=2.0 not yet assert len(materialized) == 2 assert len(iset.intents) == 2 def test_materialized_chunks_become_submitted(self): sched = create_dca_schedule("s1", "alice", 100.0, 2, 0.0, 1.0) registry = DCAScheduleRegistry(schedules={"s1": sched}) iset = IntentSet() registry, iset, materialized = materialize_chunks(registry, iset, 10.0) for cid in materialized: assert registry.schedules["s1"].chunks[cid].status == "submitted" def test_already_submitted_not_rematerialized(self): sched = create_dca_schedule("s1", "alice", 100.0, 2, 0.0, 1.0) registry = DCAScheduleRegistry(schedules={"s1": sched}) iset = IntentSet() registry, iset, mat1 = materialize_chunks(registry, iset, 10.0) registry, iset, mat2 = materialize_chunks(registry, iset, 10.0) assert len(mat2) == 0 def test_future_chunks_not_materialized(self): sched = create_dca_schedule("s1", "alice", 100.0, 5, 10.0, 1.0) registry = DCAScheduleRegistry(schedules={"s1": sched}) iset = IntentSet() registry, iset, materialized = materialize_chunks(registry, iset, 5.0) assert len(materialized) == 0 def test_intent_inherits_chunk_fields(self): sched = create_dca_schedule( "s1", "alice", 100.0, 1, 0.0, 1.0, sell_token="ETH", buy_token="DAI", min_buy_ratio=0.9, ) registry = DCAScheduleRegistry(schedules={"s1": sched}) iset = IntentSet() registry, iset, materialized = materialize_chunks(registry, iset, 1.0) intent = iset.intents[materialized[0]] assert intent.maker == "alice" assert intent.sell_token == "ETH" assert intent.buy_token == "DAI" assert abs(intent.min_buy_amount - 100.0 * 0.9) < 1e-10 # --- TestReconcileSettled --- class TestReconcileSettled: def test_cow_matched_chunks_become_executed(self): sched = create_dca_schedule("s1", "alice", 100.0, 2, 0.0, 1.0) cids = list(sched.chunks.keys()) # Submit both sched = advance_chunk_status(sched, cids[0], "submitted") sched = advance_chunk_status(sched, cids[1], "submitted") registry = DCAScheduleRegistry(schedules={"s1": sched}) # Settle first chunk via CoW match settlement = Settlement( cow_matches=[ Match( intent_a_id=cids[0], intent_b_id="other_intent", clearing_price=1.0, amount_a_sold=50.0, amount_b_sold=50.0, ) ], ) iset = IntentSet() registry = reconcile_settled_chunks(registry, settlement, iset) assert registry.schedules["s1"].chunks[cids[0]].status == "executed" assert registry.schedules["s1"].chunks[cids[1]].status == "submitted" def test_filled_intents_mark_executed(self): sched = create_dca_schedule("s1", "alice", 100.0, 1, 0.0, 1.0) cid = list(sched.chunks.keys())[0] sched = advance_chunk_status(sched, cid, "submitted") registry = DCAScheduleRegistry(schedules={"s1": sched}) # Mark intent as filled iset = IntentSet() intent = Intent( intent_id=cid, maker="alice", sell_token="USDC", sell_amount=100.0, buy_token="MYCO", min_buy_amount=80.0, valid_until=1000.0, status="filled", ) iset = add_intent(iset, intent) settlement = Settlement() registry = reconcile_settled_chunks(registry, settlement, iset) assert registry.schedules["s1"].chunks[cid].status == "executed" def test_pending_chunks_not_affected(self): sched = create_dca_schedule("s1", "alice", 100.0, 1, 0.0, 1.0) cid = list(sched.chunks.keys())[0] # Leave as pending registry = DCAScheduleRegistry(schedules={"s1": sched}) settlement = Settlement( cow_matches=[ Match(cid, "other", 1.0, 50.0, 50.0), ], ) iset = IntentSet() registry = reconcile_settled_chunks(registry, settlement, iset) # Still pending — reconciliation only affects submitted chunks assert registry.schedules["s1"].chunks[cid].status == "pending" # --- TestMergeCRDT --- class TestMergeCRDT: def test_union_of_disjoint_schedules(self): s1 = create_dca_schedule("s1", "alice", 100.0, 2, 0.0, 1.0) s2 = create_dca_schedule("s2", "bob", 200.0, 3, 0.0, 1.0) reg_a = DCAScheduleRegistry(schedules={"s1": s1}) reg_b = DCAScheduleRegistry(schedules={"s2": s2}) merged = merge_dca_schedules(reg_a, reg_b) assert "s1" in merged.schedules assert "s2" in merged.schedules def test_max_status_wins_for_duplicate_chunks(self): s1 = create_dca_schedule("s1", "alice", 100.0, 1, 0.0, 1.0) cid = list(s1.chunks.keys())[0] # Replica A: submitted s1a = advance_chunk_status(s1, cid, "submitted") # Replica B: executed s1b = advance_chunk_status(s1, cid, "executed") reg_a = DCAScheduleRegistry(schedules={"s1": s1a}) reg_b = DCAScheduleRegistry(schedules={"s1": s1b}) merged = merge_dca_schedules(reg_a, reg_b) assert merged.schedules["s1"].chunks[cid].status == "executed" def test_merge_is_commutative(self): s1 = create_dca_schedule("s1", "alice", 100.0, 2, 0.0, 1.0) cid = list(s1.chunks.keys())[0] s1a = advance_chunk_status(s1, cid, "submitted") reg_a = DCAScheduleRegistry(schedules={"s1": s1a}) reg_b = DCAScheduleRegistry(schedules={"s1": s1}) ab = merge_dca_schedules(reg_a, reg_b) ba = merge_dca_schedules(reg_b, reg_a) for cid in ab.schedules["s1"].chunks: assert ab.schedules["s1"].chunks[cid].status == ba.schedules["s1"].chunks[cid].status def test_merge_is_idempotent(self): s1 = create_dca_schedule("s1", "alice", 100.0, 3, 0.0, 1.0) reg = DCAScheduleRegistry(schedules={"s1": s1}) merged = merge_dca_schedules(reg, reg) for cid in merged.schedules["s1"].chunks: assert merged.schedules["s1"].chunks[cid].status == s1.chunks[cid].status def test_merge_union_of_chunks(self): """Different replicas have different chunks of same schedule.""" s1a = DCASchedule( schedule_id="s1", maker="alice", chunks={"c1": DCAChunk("c1", "s1", "alice", "USDC", 50.0, "MYCO", 40.0, 0.0)}, total_amount=100.0, ) s1b = DCASchedule( schedule_id="s1", maker="alice", chunks={"c2": DCAChunk("c2", "s1", "alice", "USDC", 50.0, "MYCO", 40.0, 1.0)}, total_amount=100.0, ) reg_a = DCAScheduleRegistry(schedules={"s1": s1a}) reg_b = DCAScheduleRegistry(schedules={"s1": s1b}) merged = merge_dca_schedules(reg_a, reg_b) assert "c1" in merged.schedules["s1"].chunks assert "c2" in merged.schedules["s1"].chunks # --- TestPartitionReconnect --- class TestPartitionReconnect: """Simulate a network partition + reconnect scenario.""" def test_partition_and_merge(self): """Two replicas diverge, then merge — highest status wins.""" sched = create_dca_schedule("s1", "alice", 100.0, 3, 0.0, 1.0) reg = DCAScheduleRegistry(schedules={"s1": sched}) cids = sorted(sched.chunks.keys()) # Replica A processes chunk 0 reg_a = DCAScheduleRegistry(schedules={ "s1": advance_chunk_status(reg.schedules["s1"], cids[0], "submitted"), }) reg_a = DCAScheduleRegistry(schedules={ "s1": advance_chunk_status(reg_a.schedules["s1"], cids[0], "executed"), }) # Replica B processes chunk 1 reg_b = DCAScheduleRegistry(schedules={ "s1": advance_chunk_status(reg.schedules["s1"], cids[1], "submitted"), }) merged = merge_dca_schedules(reg_a, reg_b) assert merged.schedules["s1"].chunks[cids[0]].status == "executed" assert merged.schedules["s1"].chunks[cids[1]].status == "submitted" assert merged.schedules["s1"].chunks[cids[2]].status == "pending" # --- TestScheduleCompletion --- class TestScheduleCompletion: def test_empty_schedule_zero(self): sched = DCASchedule(schedule_id="s1", maker="alice") assert schedule_completion_fraction(sched) == 0.0 def test_all_executed_one(self): sched = create_dca_schedule("s1", "alice", 100.0, 3, 0.0, 1.0) for cid in list(sched.chunks.keys()): sched = advance_chunk_status(sched, cid, "executed") assert abs(schedule_completion_fraction(sched) - 1.0) < 1e-10 def test_partial_completion(self): sched = create_dca_schedule("s1", "alice", 100.0, 4, 0.0, 1.0) cids = list(sched.chunks.keys()) sched = advance_chunk_status(sched, cids[0], "executed") sched = advance_chunk_status(sched, cids[1], "executed") assert abs(schedule_completion_fraction(sched) - 0.5) < 1e-10