# myco-bonding-curve/tests/test_dca_schedule.py
#
# 349 lines
# 13 KiB
# Python

"""Tests for CRDT DCA schedules — G-Set of chunks with monotone status."""
import pytest
from src.crdt.dca_schedule import (
DCAChunk,
DCASchedule,
DCAScheduleRegistry,
create_dca_schedule,
advance_chunk_status,
materialize_chunks,
reconcile_settled_chunks,
merge_dca_schedules,
schedule_completion_fraction,
_CHUNK_STATUS_RANK,
)
from src.crdt.intent_matching import Intent, IntentSet, add_intent, fill_intent
from src.crdt.batch_settlement import Settlement, Match
# --- TestCreateSchedule ---
class TestCreateSchedule:
    """Checks on the schedule object produced by ``create_dca_schedule``.

    The positional arguments used throughout appear to be
    ``(schedule_id, maker, total_amount, n_chunks, start_time, interval)``.
    That reading is inferred from the assertions below, not from the
    implementation -- confirm against ``src.crdt.dca_schedule``.
    """

    def test_creates_correct_number_of_chunks(self):
        """Five requested chunks give five entries; the total is retained."""
        sched = create_dca_schedule("s1", "alice", 100.0, 5, 0.0, 1.0)
        assert len(sched.chunks) == 5
        assert sched.total_amount == 100.0

    def test_chunks_have_correct_amounts(self):
        """Each of four chunks sells an equal share (100 / 4 = 25)."""
        sched = create_dca_schedule("s1", "alice", 100.0, 4, 0.0, 1.0)
        for chunk in sched.chunks.values():
            assert abs(chunk.sell_amount - 25.0) < 1e-10

    def test_chunks_are_scheduled_at_intervals(self):
        """Chunk times start at 10.0 and step by 5.0."""
        sched = create_dca_schedule("s1", "alice", 100.0, 3, 10.0, 5.0)
        times = sorted(c.scheduled_time for c in sched.chunks.values())
        assert times == [10.0, 15.0, 20.0]

    def test_all_chunks_start_pending(self):
        """Every chunk of a fresh schedule has status ``"pending"``."""
        sched = create_dca_schedule("s1", "alice", 100.0, 3, 0.0, 1.0)
        for chunk in sched.chunks.values():
            assert chunk.status == "pending"

    def test_chunk_ids_are_unique(self):
        """Ten chunks carry ten distinct ``chunk_id`` values."""
        sched = create_dca_schedule("s1", "alice", 100.0, 10, 0.0, 1.0)
        ids = [c.chunk_id for c in sched.chunks.values()]
        # ``chunks`` is a mapping: if it is keyed by chunk id, colliding ids
        # would collapse into fewer entries and the set comparison alone
        # would still pass.  Pinning the count makes the check meaningful.
        assert len(ids) == 10
        assert len(ids) == len(set(ids))

    def test_min_buy_ratio_applied(self):
        """``min_buy_amount`` equals the per-chunk sell amount times the ratio."""
        sched = create_dca_schedule(
            "s1", "alice", 100.0, 2, 0.0, 1.0, min_buy_ratio=0.9,
        )
        for chunk in sched.chunks.values():
            assert abs(chunk.min_buy_amount - 50.0 * 0.9) < 1e-10

    def test_custom_tokens(self):
        """Explicit ``sell_token`` / ``buy_token`` are copied onto the chunk."""
        sched = create_dca_schedule(
            "s1", "alice", 100.0, 1, 0.0, 1.0,
            sell_token="ETH", buy_token="DAI",
        )
        chunk = list(sched.chunks.values())[0]
        assert chunk.sell_token == "ETH"
        assert chunk.buy_token == "DAI"
# --- TestAdvanceStatus ---
class TestAdvanceStatus:
    """Status transitions applied through ``advance_chunk_status``."""

    @staticmethod
    def _one_chunk_schedule():
        """Return a single-chunk schedule together with that chunk's key."""
        schedule = create_dca_schedule("s1", "alice", 100.0, 1, 0.0, 1.0)
        chunk_id = next(iter(schedule.chunks.keys()))
        return schedule, chunk_id

    def test_pending_to_submitted(self):
        """A pending chunk can be moved to ``"submitted"``."""
        schedule, chunk_id = self._one_chunk_schedule()
        result = advance_chunk_status(schedule, chunk_id, "submitted")
        assert result.chunks[chunk_id].status == "submitted"

    def test_submitted_to_executed(self):
        """Advancing step by step ends in ``"executed"``."""
        schedule, chunk_id = self._one_chunk_schedule()
        for target in ("submitted", "executed"):
            schedule = advance_chunk_status(schedule, chunk_id, target)
        assert schedule.chunks[chunk_id].status == "executed"

    def test_backward_transition_rejected(self):
        """Asking for an earlier status leaves the chunk where it was."""
        schedule, chunk_id = self._one_chunk_schedule()
        submitted = advance_chunk_status(schedule, chunk_id, "submitted")
        attempted = advance_chunk_status(submitted, chunk_id, "pending")
        assert attempted.chunks[chunk_id].status == "submitted"

    def test_same_status_transition_rejected(self):
        """Asking for the current status leaves the chunk pending."""
        schedule, chunk_id = self._one_chunk_schedule()
        attempted = advance_chunk_status(schedule, chunk_id, "pending")
        assert attempted.chunks[chunk_id].status == "pending"

    def test_nonexistent_chunk_returns_unchanged(self):
        """An unknown chunk id yields a schedule equal to the input."""
        sched = create_dca_schedule("s1", "alice", 100.0, 1, 0.0, 1.0)
        sched_after = advance_chunk_status(sched, "nonexistent", "submitted")
        assert sched == sched_after

    def test_skip_to_executed_allowed(self):
        """A pending chunk may jump straight to ``"executed"``."""
        schedule, chunk_id = self._one_chunk_schedule()
        jumped = advance_chunk_status(schedule, chunk_id, "executed")
        assert jumped.chunks[chunk_id].status == "executed"
# --- TestMaterializeChunks ---
class TestMaterializeChunks:
    """Checks on ``materialize_chunks``.

    As used here, the function takes ``(registry, intent_set, now)`` and
    returns ``(registry, intent_set, materialized)``, where ``materialized``
    is a sequence of ids usable both as chunk keys and as intent keys.
    """

    def test_due_chunks_become_intents(self):
        """Only chunks scheduled at or before ``now`` are turned into intents."""
        sched = create_dca_schedule("s1", "alice", 100.0, 3, 0.0, 1.0)
        registry = DCAScheduleRegistry(schedules={"s1": sched})
        iset = IntentSet()
        registry, iset, materialized = materialize_chunks(registry, iset, 1.5)
        # Chunks at t=0.0 and t=1.0 are due; t=2.0 not yet
        assert len(materialized) == 2
        assert len(iset.intents) == 2

    def test_materialized_chunks_become_submitted(self):
        """Every materialized chunk ends up with status ``"submitted"``."""
        sched = create_dca_schedule("s1", "alice", 100.0, 2, 0.0, 1.0)
        registry = DCAScheduleRegistry(schedules={"s1": sched})
        iset = IntentSet()
        registry, iset, materialized = materialize_chunks(registry, iset, 10.0)
        # Both chunks (t=0.0, t=1.0) are due at t=10.0.  Without this check
        # the loop below would pass vacuously if nothing was materialized.
        assert len(materialized) == 2
        for cid in materialized:
            assert registry.schedules["s1"].chunks[cid].status == "submitted"

    def test_already_submitted_not_rematerialized(self):
        """A second pass at the same time materializes nothing new."""
        sched = create_dca_schedule("s1", "alice", 100.0, 2, 0.0, 1.0)
        registry = DCAScheduleRegistry(schedules={"s1": sched})
        iset = IntentSet()
        registry, iset, mat1 = materialize_chunks(registry, iset, 10.0)
        # The first pass must actually have done the work; otherwise an
        # empty second pass proves nothing about re-materialization.
        assert len(mat1) == 2
        registry, iset, mat2 = materialize_chunks(registry, iset, 10.0)
        assert len(mat2) == 0

    def test_future_chunks_not_materialized(self):
        """Nothing is materialized before the first scheduled time."""
        sched = create_dca_schedule("s1", "alice", 100.0, 5, 10.0, 1.0)
        registry = DCAScheduleRegistry(schedules={"s1": sched})
        iset = IntentSet()
        registry, iset, materialized = materialize_chunks(registry, iset, 5.0)
        assert len(materialized) == 0

    def test_intent_inherits_chunk_fields(self):
        """The created intent carries the chunk's maker, tokens and minimum."""
        sched = create_dca_schedule(
            "s1", "alice", 100.0, 1, 0.0, 1.0,
            sell_token="ETH", buy_token="DAI", min_buy_ratio=0.9,
        )
        registry = DCAScheduleRegistry(schedules={"s1": sched})
        iset = IntentSet()
        registry, iset, materialized = materialize_chunks(registry, iset, 1.0)
        # The materialized id doubles as the key into ``iset.intents``.
        intent = iset.intents[materialized[0]]
        assert intent.maker == "alice"
        assert intent.sell_token == "ETH"
        assert intent.buy_token == "DAI"
        assert abs(intent.min_buy_amount - 100.0 * 0.9) < 1e-10
# --- TestReconcileSettled ---
class TestReconcileSettled:
    """Checks on ``reconcile_settled_chunks(registry, settlement, intents)``."""

    def test_cow_matched_chunks_become_executed(self):
        """A submitted chunk named in a CoW match becomes ``"executed"``."""
        schedule = create_dca_schedule("s1", "alice", 100.0, 2, 0.0, 1.0)
        chunk_ids = list(schedule.chunks.keys())
        matched_id, untouched_id = chunk_ids[0], chunk_ids[1]

        # Both chunks must be submitted before settlement is applied.
        for chunk_id in (matched_id, untouched_id):
            schedule = advance_chunk_status(schedule, chunk_id, "submitted")
        registry = DCAScheduleRegistry(schedules={"s1": schedule})

        # Only the first chunk takes part in the match.
        cow_match = Match(
            intent_a_id=matched_id,
            intent_b_id="other_intent",
            clearing_price=1.0,
            amount_a_sold=50.0,
            amount_b_sold=50.0,
        )
        settlement = Settlement(cow_matches=[cow_match])

        registry = reconcile_settled_chunks(registry, settlement, IntentSet())

        reconciled = registry.schedules["s1"].chunks
        assert reconciled[matched_id].status == "executed"
        assert reconciled[untouched_id].status == "submitted"

    def test_filled_intents_mark_executed(self):
        """A submitted chunk whose intent is ``"filled"`` becomes executed."""
        schedule = create_dca_schedule("s1", "alice", 100.0, 1, 0.0, 1.0)
        chunk_id = list(schedule.chunks.keys())[0]
        schedule = advance_chunk_status(schedule, chunk_id, "submitted")
        registry = DCAScheduleRegistry(schedules={"s1": schedule})

        # The intent shares its id with the chunk and is already filled.
        filled_intent = Intent(
            intent_id=chunk_id, maker="alice",
            sell_token="USDC", sell_amount=100.0,
            buy_token="MYCO", min_buy_amount=80.0,
            valid_until=1000.0, status="filled",
        )
        intents = add_intent(IntentSet(), filled_intent)

        # An empty settlement: the filled intent alone drives the change.
        registry = reconcile_settled_chunks(registry, Settlement(), intents)

        assert registry.schedules["s1"].chunks[chunk_id].status == "executed"

    def test_pending_chunks_not_affected(self):
        """A chunk that was never submitted stays pending despite a match."""
        schedule = create_dca_schedule("s1", "alice", 100.0, 1, 0.0, 1.0)
        chunk_id = list(schedule.chunks.keys())[0]
        # The chunk is deliberately left in its initial pending state.
        registry = DCAScheduleRegistry(schedules={"s1": schedule})

        cow_match = Match(chunk_id, "other", 1.0, 50.0, 50.0)
        settlement = Settlement(cow_matches=[cow_match])

        registry = reconcile_settled_chunks(registry, settlement, IntentSet())

        # Still pending — reconciliation only affects submitted chunks
        assert registry.schedules["s1"].chunks[chunk_id].status == "pending"
# --- TestMergeCRDT ---
class TestMergeCRDT:
    """Merge properties of ``merge_dca_schedules`` on two registries."""

    def test_union_of_disjoint_schedules(self):
        """Schedules present on only one side both survive the merge."""
        alice_sched = create_dca_schedule("s1", "alice", 100.0, 2, 0.0, 1.0)
        bob_sched = create_dca_schedule("s2", "bob", 200.0, 3, 0.0, 1.0)
        left = DCAScheduleRegistry(schedules={"s1": alice_sched})
        right = DCAScheduleRegistry(schedules={"s2": bob_sched})

        merged = merge_dca_schedules(left, right)

        assert "s1" in merged.schedules
        assert "s2" in merged.schedules

    def test_max_status_wins_for_duplicate_chunks(self):
        """When replicas disagree, the further-advanced status is kept."""
        base = create_dca_schedule("s1", "alice", 100.0, 1, 0.0, 1.0)
        chunk_id = list(base.chunks.keys())[0]

        # Replica A only got as far as "submitted"; replica B reached "executed".
        on_replica_a = advance_chunk_status(base, chunk_id, "submitted")
        on_replica_b = advance_chunk_status(base, chunk_id, "executed")
        left = DCAScheduleRegistry(schedules={"s1": on_replica_a})
        right = DCAScheduleRegistry(schedules={"s1": on_replica_b})

        merged = merge_dca_schedules(left, right)

        assert merged.schedules["s1"].chunks[chunk_id].status == "executed"

    def test_merge_is_commutative(self):
        """Merging A into B gives the same chunk statuses as B into A."""
        base = create_dca_schedule("s1", "alice", 100.0, 2, 0.0, 1.0)
        first_id = list(base.chunks.keys())[0]
        advanced = advance_chunk_status(base, first_id, "submitted")
        left = DCAScheduleRegistry(schedules={"s1": advanced})
        right = DCAScheduleRegistry(schedules={"s1": base})

        ab = merge_dca_schedules(left, right)
        ba = merge_dca_schedules(right, left)

        for chunk_id in ab.schedules["s1"].chunks:
            status_ab = ab.schedules["s1"].chunks[chunk_id].status
            status_ba = ba.schedules["s1"].chunks[chunk_id].status
            assert status_ab == status_ba

    def test_merge_is_idempotent(self):
        """Merging a registry with itself leaves every status as it was."""
        base = create_dca_schedule("s1", "alice", 100.0, 3, 0.0, 1.0)
        registry = DCAScheduleRegistry(schedules={"s1": base})

        merged = merge_dca_schedules(registry, registry)

        for chunk_id in merged.schedules["s1"].chunks:
            merged_status = merged.schedules["s1"].chunks[chunk_id].status
            assert merged_status == base.chunks[chunk_id].status

    def test_merge_union_of_chunks(self):
        """Different replicas have different chunks of same schedule."""
        only_c1 = DCASchedule(
            schedule_id="s1", maker="alice",
            chunks={"c1": DCAChunk("c1", "s1", "alice", "USDC", 50.0, "MYCO", 40.0, 0.0)},
            total_amount=100.0,
        )
        only_c2 = DCASchedule(
            schedule_id="s1", maker="alice",
            chunks={"c2": DCAChunk("c2", "s1", "alice", "USDC", 50.0, "MYCO", 40.0, 1.0)},
            total_amount=100.0,
        )
        left = DCAScheduleRegistry(schedules={"s1": only_c1})
        right = DCAScheduleRegistry(schedules={"s1": only_c2})

        merged = merge_dca_schedules(left, right)

        assert "c1" in merged.schedules["s1"].chunks
        assert "c2" in merged.schedules["s1"].chunks
# --- TestPartitionReconnect ---
class TestPartitionReconnect:
    """Simulate a network partition + reconnect scenario."""

    def test_partition_and_merge(self):
        """Two replicas diverge, then merge — highest status wins."""
        schedule = create_dca_schedule("s1", "alice", 100.0, 3, 0.0, 1.0)
        shared = DCAScheduleRegistry(schedules={"s1": schedule})
        first_id, second_id, third_id = sorted(schedule.chunks.keys())

        # Replica A takes the first chunk through submitted, then executed.
        a_submitted = advance_chunk_status(
            shared.schedules["s1"], first_id, "submitted",
        )
        replica_a = DCAScheduleRegistry(schedules={"s1": a_submitted})
        a_executed = advance_chunk_status(
            replica_a.schedules["s1"], first_id, "executed",
        )
        replica_a = DCAScheduleRegistry(schedules={"s1": a_executed})

        # Replica B, starting from the same shared state, submits the second.
        b_submitted = advance_chunk_status(
            shared.schedules["s1"], second_id, "submitted",
        )
        replica_b = DCAScheduleRegistry(schedules={"s1": b_submitted})

        merged = merge_dca_schedules(replica_a, replica_b)

        assert merged.schedules["s1"].chunks[first_id].status == "executed"
        assert merged.schedules["s1"].chunks[second_id].status == "submitted"
        assert merged.schedules["s1"].chunks[third_id].status == "pending"
# --- TestScheduleCompletion ---
class TestScheduleCompletion:
    """Checks on the value returned by ``schedule_completion_fraction``."""

    def test_empty_schedule_zero(self):
        """A schedule built without chunks reports exactly 0.0."""
        empty = DCASchedule(schedule_id="s1", maker="alice")
        assert schedule_completion_fraction(empty) == 0.0

    def test_all_executed_one(self):
        """Executing every chunk brings the fraction to 1.0."""
        schedule = create_dca_schedule("s1", "alice", 100.0, 3, 0.0, 1.0)
        # Snapshot the keys first: ``schedule`` is rebound inside the loop.
        chunk_ids = list(schedule.chunks.keys())
        for chunk_id in chunk_ids:
            schedule = advance_chunk_status(schedule, chunk_id, "executed")
        fraction = schedule_completion_fraction(schedule)
        assert abs(fraction - 1.0) < 1e-10

    def test_partial_completion(self):
        """Executing two of four chunks gives a fraction of 0.5."""
        schedule = create_dca_schedule("s1", "alice", 100.0, 4, 0.0, 1.0)
        chunk_ids = list(schedule.chunks.keys())
        for chunk_id in chunk_ids[:2]:
            schedule = advance_chunk_status(schedule, chunk_id, "executed")
        fraction = schedule_completion_fraction(schedule)
        assert abs(fraction - 0.5) < 1e-10