# Listing metadata from the source viewer: 349 lines, 13 KiB, Python.
"""Tests for CRDT DCA schedules — G-Set of chunks with monotone status."""
|
|
|
|
import pytest
|
|
|
|
from src.crdt.dca_schedule import (
|
|
DCAChunk,
|
|
DCASchedule,
|
|
DCAScheduleRegistry,
|
|
create_dca_schedule,
|
|
advance_chunk_status,
|
|
materialize_chunks,
|
|
reconcile_settled_chunks,
|
|
merge_dca_schedules,
|
|
schedule_completion_fraction,
|
|
_CHUNK_STATUS_RANK,
|
|
)
|
|
from src.crdt.intent_matching import Intent, IntentSet, add_intent, fill_intent
|
|
from src.crdt.batch_settlement import Settlement, Match
|
|
|
|
|
|
# --- TestCreateSchedule ---
|
|
|
|
class TestCreateSchedule:
    """Checks on the schedule object returned by ``create_dca_schedule``."""

    def test_creates_correct_number_of_chunks(self):
        """Requesting 5 chunks yields 5 entries and keeps the total amount."""
        schedule = create_dca_schedule("s1", "alice", 100.0, 5, 0.0, 1.0)
        chunk_count = len(schedule.chunks)
        assert chunk_count == 5
        assert schedule.total_amount == 100.0

    def test_chunks_have_correct_amounts(self):
        """A total of 100.0 over 4 chunks gives 25.0 to sell per chunk."""
        schedule = create_dca_schedule("s1", "alice", 100.0, 4, 0.0, 1.0)
        sell_amounts = [piece.sell_amount for piece in schedule.chunks.values()]
        for amount in sell_amounts:
            assert abs(amount - 25.0) < 1e-10

    def test_chunks_are_scheduled_at_intervals(self):
        """Start 10.0 with interval 5.0 schedules chunks at 10, 15 and 20."""
        schedule = create_dca_schedule("s1", "alice", 100.0, 3, 10.0, 5.0)
        times = [piece.scheduled_time for piece in schedule.chunks.values()]
        times.sort()
        assert times == [10.0, 15.0, 20.0]

    def test_all_chunks_start_pending(self):
        """Every chunk of a freshly created schedule has status "pending"."""
        schedule = create_dca_schedule("s1", "alice", 100.0, 3, 0.0, 1.0)
        statuses = [piece.status for piece in schedule.chunks.values()]
        for status in statuses:
            assert status == "pending"

    def test_chunk_ids_are_unique(self):
        """No two chunks of one schedule share a ``chunk_id``."""
        schedule = create_dca_schedule("s1", "alice", 100.0, 10, 0.0, 1.0)
        chunk_ids = [piece.chunk_id for piece in schedule.chunks.values()]
        distinct_ids = set(chunk_ids)
        assert len(chunk_ids) == len(distinct_ids)

    def test_min_buy_ratio_applied(self):
        """``min_buy_ratio`` scales each chunk's minimum buy amount."""
        schedule = create_dca_schedule(
            "s1", "alice", 100.0, 2, 0.0, 1.0, min_buy_ratio=0.9,
        )
        # Two chunks of 50.0 each, so the floor per chunk is 50.0 * 0.9.
        expected_floor = 50.0 * 0.9
        for piece in schedule.chunks.values():
            assert abs(piece.min_buy_amount - expected_floor) < 1e-10

    def test_custom_tokens(self):
        """Explicit sell/buy tokens are carried onto the chunk."""
        schedule = create_dca_schedule(
            "s1", "alice", 100.0, 1, 0.0, 1.0,
            sell_token="ETH", buy_token="DAI",
        )
        only_chunk = next(iter(schedule.chunks.values()))
        assert only_chunk.sell_token == "ETH"
        assert only_chunk.buy_token == "DAI"
|
|
|
|
|
|
# --- TestAdvanceStatus ---
|
|
|
|
class TestAdvanceStatus:
    """Checks that ``advance_chunk_status`` only moves a chunk forward."""

    def test_pending_to_submitted(self):
        """A pending chunk can be advanced to "submitted"."""
        schedule = create_dca_schedule("s1", "alice", 100.0, 1, 0.0, 1.0)
        chunk_id = next(iter(schedule.chunks))
        result = advance_chunk_status(schedule, chunk_id, "submitted")
        assert result.chunks[chunk_id].status == "submitted"

    def test_submitted_to_executed(self):
        """pending -> submitted -> executed ends in "executed"."""
        schedule = create_dca_schedule("s1", "alice", 100.0, 1, 0.0, 1.0)
        chunk_id = next(iter(schedule.chunks))
        for next_status in ("submitted", "executed"):
            schedule = advance_chunk_status(schedule, chunk_id, next_status)
        assert schedule.chunks[chunk_id].status == "executed"

    def test_backward_transition_rejected(self):
        """Asking for "pending" on a submitted chunk leaves it submitted."""
        schedule = create_dca_schedule("s1", "alice", 100.0, 1, 0.0, 1.0)
        chunk_id = next(iter(schedule.chunks))
        schedule = advance_chunk_status(schedule, chunk_id, "submitted")
        result = advance_chunk_status(schedule, chunk_id, "pending")
        assert result.chunks[chunk_id].status == "submitted"

    def test_same_status_transition_rejected(self):
        """Re-applying the current status keeps the chunk pending."""
        schedule = create_dca_schedule("s1", "alice", 100.0, 1, 0.0, 1.0)
        chunk_id = next(iter(schedule.chunks))
        result = advance_chunk_status(schedule, chunk_id, "pending")
        assert result.chunks[chunk_id].status == "pending"

    def test_nonexistent_chunk_returns_unchanged(self):
        """An unknown chunk id yields a schedule equal to the input."""
        schedule = create_dca_schedule("s1", "alice", 100.0, 1, 0.0, 1.0)
        result = advance_chunk_status(schedule, "nonexistent", "submitted")
        assert schedule == result

    def test_skip_to_executed_allowed(self):
        """A pending chunk may jump straight to "executed"."""
        schedule = create_dca_schedule("s1", "alice", 100.0, 1, 0.0, 1.0)
        chunk_id = next(iter(schedule.chunks))
        schedule = advance_chunk_status(schedule, chunk_id, "executed")
        assert schedule.chunks[chunk_id].status == "executed"
|
|
|
|
|
|
# --- TestMaterializeChunks ---
|
|
|
|
class TestMaterializeChunks:
    """Checks that ``materialize_chunks`` turns due chunks into intents.

    ``materialize_chunks(registry, intent_set, now)`` returns the updated
    registry, the updated intent set and a sequence of materialized ids.
    Those ids are used here both as chunk ids and as keys of
    ``intent_set.intents``.
    """

    def test_due_chunks_become_intents(self):
        """Only chunks scheduled at or before ``now`` are materialized."""
        sched = create_dca_schedule("s1", "alice", 100.0, 3, 0.0, 1.0)
        registry = DCAScheduleRegistry(schedules={"s1": sched})
        iset = IntentSet()

        registry, iset, materialized = materialize_chunks(registry, iset, 1.5)
        # Chunks at t=0.0 and t=1.0 are due; t=2.0 not yet
        assert len(materialized) == 2
        assert len(iset.intents) == 2

    def test_materialized_chunks_become_submitted(self):
        """Each materialized chunk is moved to status "submitted"."""
        sched = create_dca_schedule("s1", "alice", 100.0, 2, 0.0, 1.0)
        registry = DCAScheduleRegistry(schedules={"s1": sched})
        iset = IntentSet()

        registry, iset, materialized = materialize_chunks(registry, iset, 10.0)
        # Both chunks (t=0.0 and t=1.0) are due at t=10.0.  Without this
        # guard the loop below would pass vacuously on an empty result.
        assert len(materialized) == 2
        for cid in materialized:
            assert registry.schedules["s1"].chunks[cid].status == "submitted"

    def test_already_submitted_not_rematerialized(self):
        """A second pass at the same time materializes nothing new."""
        sched = create_dca_schedule("s1", "alice", 100.0, 2, 0.0, 1.0)
        registry = DCAScheduleRegistry(schedules={"s1": sched})
        iset = IntentSet()

        registry, iset, mat1 = materialize_chunks(registry, iset, 10.0)
        # The first pass must do real work, otherwise an implementation
        # that never materializes anything would satisfy this test.
        assert len(mat1) == 2
        registry, iset, mat2 = materialize_chunks(registry, iset, 10.0)
        assert len(mat2) == 0

    def test_future_chunks_not_materialized(self):
        """Nothing is materialized before the first scheduled time."""
        sched = create_dca_schedule("s1", "alice", 100.0, 5, 10.0, 1.0)
        registry = DCAScheduleRegistry(schedules={"s1": sched})
        iset = IntentSet()

        registry, iset, materialized = materialize_chunks(registry, iset, 5.0)
        assert len(materialized) == 0

    def test_intent_inherits_chunk_fields(self):
        """The created intent copies maker, tokens and minimum buy amount."""
        sched = create_dca_schedule(
            "s1", "alice", 100.0, 1, 0.0, 1.0,
            sell_token="ETH", buy_token="DAI", min_buy_ratio=0.9,
        )
        registry = DCAScheduleRegistry(schedules={"s1": sched})
        iset = IntentSet()

        registry, iset, materialized = materialize_chunks(registry, iset, 1.0)
        # Fail with a clear assertion, not an IndexError, if the single
        # chunk at t=0.0 was not materialized.
        assert len(materialized) == 1
        intent = iset.intents[materialized[0]]
        assert intent.maker == "alice"
        assert intent.sell_token == "ETH"
        assert intent.buy_token == "DAI"
        assert abs(intent.min_buy_amount - 100.0 * 0.9) < 1e-10
|
|
|
|
|
|
# --- TestReconcileSettled ---
|
|
|
|
class TestReconcileSettled:
    """Checks how ``reconcile_settled_chunks`` updates chunk statuses."""

    def test_cow_matched_chunks_become_executed(self):
        """A submitted chunk named in a CoW match becomes executed."""
        schedule = create_dca_schedule("s1", "alice", 100.0, 2, 0.0, 1.0)
        chunk_ids = list(schedule.chunks.keys())
        settled_id, untouched_id = chunk_ids[0], chunk_ids[1]

        # Submit both
        schedule = advance_chunk_status(schedule, settled_id, "submitted")
        schedule = advance_chunk_status(schedule, untouched_id, "submitted")
        registry = DCAScheduleRegistry(schedules={"s1": schedule})

        # Settle first chunk via CoW match
        cow_match = Match(
            intent_a_id=settled_id,
            intent_b_id="other_intent",
            clearing_price=1.0,
            amount_a_sold=50.0,
            amount_b_sold=50.0,
        )
        settlement = Settlement(cow_matches=[cow_match])
        intents = IntentSet()

        registry = reconcile_settled_chunks(registry, settlement, intents)
        reconciled = registry.schedules["s1"].chunks
        assert reconciled[settled_id].status == "executed"
        assert reconciled[untouched_id].status == "submitted"

    def test_filled_intents_mark_executed(self):
        """A submitted chunk whose intent is "filled" becomes executed."""
        schedule = create_dca_schedule("s1", "alice", 100.0, 1, 0.0, 1.0)
        chunk_id = next(iter(schedule.chunks.keys()))
        schedule = advance_chunk_status(schedule, chunk_id, "submitted")
        registry = DCAScheduleRegistry(schedules={"s1": schedule})

        # Mark intent as filled
        filled_intent = Intent(
            intent_id=chunk_id, maker="alice",
            sell_token="USDC", sell_amount=100.0,
            buy_token="MYCO", min_buy_amount=80.0,
            valid_until=1000.0, status="filled",
        )
        intents = add_intent(IntentSet(), filled_intent)

        empty_settlement = Settlement()
        registry = reconcile_settled_chunks(registry, empty_settlement, intents)
        assert registry.schedules["s1"].chunks[chunk_id].status == "executed"

    def test_pending_chunks_not_affected(self):
        """A chunk that is still pending is not promoted by a match."""
        schedule = create_dca_schedule("s1", "alice", 100.0, 1, 0.0, 1.0)
        chunk_id = next(iter(schedule.chunks.keys()))
        # Leave as pending
        registry = DCAScheduleRegistry(schedules={"s1": schedule})

        cow_match = Match(chunk_id, "other", 1.0, 50.0, 50.0)
        settlement = Settlement(cow_matches=[cow_match])
        intents = IntentSet()
        registry = reconcile_settled_chunks(registry, settlement, intents)
        # Still pending — reconciliation only affects submitted chunks
        assert registry.schedules["s1"].chunks[chunk_id].status == "pending"
|
|
|
|
|
|
# --- TestMergeCRDT ---
|
|
|
|
class TestMergeCRDT:
    """Checks the merge behaviour of ``merge_dca_schedules``.

    Covers union of schedules, union of chunks within a schedule,
    highest-status-wins for a chunk present on both sides, and the
    commutativity and idempotence of the merge.
    """

    def test_union_of_disjoint_schedules(self):
        """Schedules known to only one replica both appear after merge."""
        s1 = create_dca_schedule("s1", "alice", 100.0, 2, 0.0, 1.0)
        s2 = create_dca_schedule("s2", "bob", 200.0, 3, 0.0, 1.0)
        reg_a = DCAScheduleRegistry(schedules={"s1": s1})
        reg_b = DCAScheduleRegistry(schedules={"s2": s2})

        merged = merge_dca_schedules(reg_a, reg_b)
        assert "s1" in merged.schedules
        assert "s2" in merged.schedules

    def test_max_status_wins_for_duplicate_chunks(self):
        """For a chunk on both replicas the more advanced status is kept."""
        s1 = create_dca_schedule("s1", "alice", 100.0, 1, 0.0, 1.0)
        cid = list(s1.chunks.keys())[0]

        # Replica A: submitted
        s1a = advance_chunk_status(s1, cid, "submitted")
        # Replica B: executed
        s1b = advance_chunk_status(s1, cid, "executed")

        reg_a = DCAScheduleRegistry(schedules={"s1": s1a})
        reg_b = DCAScheduleRegistry(schedules={"s1": s1b})

        merged = merge_dca_schedules(reg_a, reg_b)
        assert merged.schedules["s1"].chunks[cid].status == "executed"

    def test_merge_is_commutative(self):
        """merge(a, b) and merge(b, a) agree on chunk ids and statuses."""
        s1 = create_dca_schedule("s1", "alice", 100.0, 2, 0.0, 1.0)
        cid = list(s1.chunks.keys())[0]
        s1a = advance_chunk_status(s1, cid, "submitted")

        reg_a = DCAScheduleRegistry(schedules={"s1": s1a})
        reg_b = DCAScheduleRegistry(schedules={"s1": s1})

        ab = merge_dca_schedules(reg_a, reg_b)
        ba = merge_dca_schedules(reg_b, reg_a)

        ab_chunks = ab.schedules["s1"].chunks
        ba_chunks = ba.schedules["s1"].chunks
        # Compare the id sets first: iterating only one side would miss
        # chunks that exist solely in the other merge result.
        assert set(ab_chunks) == set(ba_chunks)
        # Distinct loop name so the advanced chunk's ``cid`` is not shadowed.
        for chunk_id in ab_chunks:
            assert ab_chunks[chunk_id].status == ba_chunks[chunk_id].status

    def test_merge_is_idempotent(self):
        """Merging a registry with itself changes no ids or statuses."""
        s1 = create_dca_schedule("s1", "alice", 100.0, 3, 0.0, 1.0)
        reg = DCAScheduleRegistry(schedules={"s1": s1})

        merged = merge_dca_schedules(reg, reg)
        merged_chunks = merged.schedules["s1"].chunks
        # A self-merge must neither drop nor invent chunks.
        assert set(merged_chunks) == set(s1.chunks)
        for chunk_id in merged_chunks:
            assert merged_chunks[chunk_id].status == s1.chunks[chunk_id].status

    def test_merge_union_of_chunks(self):
        """Different replicas have different chunks of same schedule."""
        s1a = DCASchedule(
            schedule_id="s1", maker="alice",
            chunks={"c1": DCAChunk("c1", "s1", "alice", "USDC", 50.0, "MYCO", 40.0, 0.0)},
            total_amount=100.0,
        )
        s1b = DCASchedule(
            schedule_id="s1", maker="alice",
            chunks={"c2": DCAChunk("c2", "s1", "alice", "USDC", 50.0, "MYCO", 40.0, 1.0)},
            total_amount=100.0,
        )
        reg_a = DCAScheduleRegistry(schedules={"s1": s1a})
        reg_b = DCAScheduleRegistry(schedules={"s1": s1b})

        merged = merge_dca_schedules(reg_a, reg_b)
        assert "c1" in merged.schedules["s1"].chunks
        assert "c2" in merged.schedules["s1"].chunks
|
|
|
|
|
|
# --- TestPartitionReconnect ---
|
|
|
|
class TestPartitionReconnect:
    """Simulate a network partition + reconnect scenario."""

    def test_partition_and_merge(self):
        """Two replicas diverge, then merge — highest status wins."""
        schedule = create_dca_schedule("s1", "alice", 100.0, 3, 0.0, 1.0)
        shared = DCAScheduleRegistry(schedules={"s1": schedule})

        cids = sorted(schedule.chunks.keys())

        def advanced(registry, chunk_id, status):
            # Fresh registry whose "s1" schedule has one chunk advanced.
            bumped = advance_chunk_status(
                registry.schedules["s1"], chunk_id, status,
            )
            return DCAScheduleRegistry(schedules={"s1": bumped})

        # Replica A processes chunk 0
        replica_a = advanced(shared, cids[0], "submitted")
        replica_a = advanced(replica_a, cids[0], "executed")

        # Replica B processes chunk 1
        replica_b = advanced(shared, cids[1], "submitted")

        merged = merge_dca_schedules(replica_a, replica_b)
        merged_chunks = merged.schedules["s1"].chunks
        expectations = [
            (cids[0], "executed"),
            (cids[1], "submitted"),
            (cids[2], "pending"),
        ]
        for chunk_id, expected_status in expectations:
            assert merged_chunks[chunk_id].status == expected_status
|
|
|
|
|
|
# --- TestScheduleCompletion ---
|
|
|
|
class TestScheduleCompletion:
    """Checks the value reported by ``schedule_completion_fraction``."""

    def test_empty_schedule_zero(self):
        """A schedule built without chunks reports 0.0."""
        empty = DCASchedule(schedule_id="s1", maker="alice")
        fraction = schedule_completion_fraction(empty)
        assert fraction == 0.0

    def test_all_executed_one(self):
        """With every chunk executed the fraction is 1.0."""
        schedule = create_dca_schedule("s1", "alice", 100.0, 3, 0.0, 1.0)
        # Snapshot the ids up front, as ``schedule`` is rebound in the loop.
        for chunk_id in tuple(schedule.chunks.keys()):
            schedule = advance_chunk_status(schedule, chunk_id, "executed")
        fraction = schedule_completion_fraction(schedule)
        assert abs(fraction - 1.0) < 1e-10

    def test_partial_completion(self):
        """Two executed chunks out of four give a fraction of 0.5."""
        schedule = create_dca_schedule("s1", "alice", 100.0, 4, 0.0, 1.0)
        first_id, second_id, *_remaining = schedule.chunks.keys()
        schedule = advance_chunk_status(schedule, first_id, "executed")
        schedule = advance_chunk_status(schedule, second_id, "executed")
        fraction = schedule_completion_fraction(schedule)
        assert abs(fraction - 0.5) < 1e-10
|