feat: add CLI, dashboard, notebook, and CRDT bridge (350 tests)

- CRDT bridge: multi-peer gossip network simulation wrapping all 6 merge functions
- CLI tool: argparse entry point with simulate, compare-dca, signal-routing, stress-test
- Notebook 06: DCA strategies, signal routing, CRDT lifecycle visualizations
- Streamlit dashboard: 5-tab interactive visualization (plotly, optional dep)
- pyproject.toml: added myco CLI entry point + dashboard optional-deps

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
Jeff Emmett 2026-04-01 14:14:39 -07:00
parent e27d76d273
commit 1cb3701769
17 changed files with 2009 additions and 0 deletions

40
dashboard/app.py Normal file
View File

@ -0,0 +1,40 @@
"""MYCO Bonding Surface — Interactive Dashboard.
Run: streamlit run dashboard/app.py
"""
import streamlit as st
st.set_page_config(
page_title="MYCO Bonding Surface",
page_icon="🍄",
layout="wide",
)
st.title("MYCO Bonding Surface Dashboard")
st.caption("Interactive simulations for the multi-asset bonding curve with CRDT-native primitives.")
tab_launch, tab_dca, tab_signal, tab_stress, tab_crdt = st.tabs([
"Token Launch",
"DCA Explorer",
"Signal Router",
"Stress Tests",
"CRDT Flow",
])
from dashboard.tabs import token_launch, dca_explorer, signal_router, stress_tests, crdt_flow
with tab_launch:
token_launch.render()
with tab_dca:
dca_explorer.render()
with tab_signal:
signal_router.render()
with tab_stress:
stress_tests.render()
with tab_crdt:
crdt_flow.render()

203
dashboard/charts.py Normal file
View File

@ -0,0 +1,203 @@
"""Plotly figure factories for the MYCO dashboard.
All functions return go.Figure instances no Streamlit dependency.
"""
import plotly.graph_objects as go
from plotly.subplots import make_subplots
def fig_simulation_overview(result) -> go.Figure:
"""2x2 subplot: supply, reserve, backing ratio, minting breakdown."""
fig = make_subplots(
rows=2, cols=2,
subplot_titles=("Supply", "Reserve Value", "Backing Ratio", "Minting"),
)
t = result.times.tolist()
fig.add_trace(
go.Scatter(x=t, y=result.supply.tolist(), name="Supply", line=dict(color="#2196F3")),
row=1, col=1,
)
fig.add_trace(
go.Scatter(x=t, y=result.reserve_value.tolist(), name="Reserve", line=dict(color="#4CAF50")),
row=1, col=2,
)
fig.add_trace(
go.Scatter(x=t, y=result.backing_ratio.tolist(), name="Backing Ratio", line=dict(color="#FF9800")),
row=2, col=1,
)
fig.add_trace(
go.Scatter(x=t, y=result.financial_minted.tolist(), name="Financial", line=dict(color="#9C27B0")),
row=2, col=2,
)
fig.add_trace(
go.Scatter(x=t, y=result.commitment_minted.tolist(), name="Commitment", line=dict(color="#E91E63")),
row=2, col=2,
)
fig.update_layout(height=600, showlegend=True, template="plotly_white")
return fig
def fig_dca_comparison(results: dict) -> go.Figure:
"""Chunk prices + cumulative tokens for DCA strategies."""
fig = make_subplots(
rows=1, cols=2,
subplot_titles=("Price per Chunk", "Cumulative Tokens"),
)
colors = {"fixed": "#2196F3", "twap_aware": "#FF9800"}
for name, dca_result in results.items():
history = dca_result.order.history
chunks_x = list(range(len(history)))
prices = [h["price"] for h in history]
tokens = [h["tokens"] for h in history]
import numpy as np
cumulative = np.cumsum(tokens).tolist()
fig.add_trace(
go.Scatter(x=chunks_x, y=prices, name=f"{name} price",
mode="lines+markers", line=dict(color=colors[name])),
row=1, col=1,
)
fig.add_trace(
go.Scatter(x=chunks_x, y=cumulative, name=f"{name} cumulative",
mode="lines+markers", line=dict(color=colors[name])),
row=1, col=2,
)
# Lump sum reference
for name, dca_result in results.items():
fig.add_hline(
y=dca_result.lump_sum_tokens, row=1, col=2,
line_dash="dash", line_color="gray",
annotation_text="lump sum" if name == "fixed" else None,
)
break
fig.update_layout(height=400, template="plotly_white")
return fig
def fig_signal_routing(routing: dict) -> go.Figure:
"""3-panel: price+TWAP, deviation+volatility, adaptive params."""
fig = make_subplots(
rows=3, cols=1, shared_xaxes=True,
subplot_titles=("Price Trajectory", "Signals", "Adaptive Parameters"),
)
t = routing["times"]
fig.add_trace(
go.Scatter(x=t, y=routing.get("prices", t), name="Spot Price",
line=dict(color="#2196F3")),
row=1, col=1,
)
fig.add_trace(
go.Scatter(x=t, y=routing["twap_deviation"], name="TWAP Deviation",
line=dict(color="#FF5722")),
row=2, col=1,
)
fig.add_trace(
go.Scatter(x=t, y=routing["volatility"], name="Volatility",
line=dict(color="#9C27B0")),
row=2, col=1,
)
fig.add_trace(
go.Scatter(x=t, y=routing["flow_threshold"], name="Flow Threshold",
line=dict(color="#4CAF50")),
row=3, col=1,
)
fig.add_trace(
go.Scatter(x=t, y=routing["surge_fee_rate"], name="Surge Fee",
line=dict(color="#FF9800")),
row=3, col=1,
)
fig.update_layout(height=700, template="plotly_white")
return fig
def fig_bank_run_sweep(results: dict) -> go.Figure:
"""Overlaid reserve curves for different redemption fractions."""
fig = go.Figure()
colors = ["#2196F3", "#FF9800", "#E91E63", "#4CAF50", "#9C27B0"]
for i, (frac, result) in enumerate(results.items()):
fig.add_trace(go.Scatter(
x=result.times.tolist(),
y=result.reserve_value.tolist(),
name=f"{frac:.0%} redemption",
line=dict(color=colors[i % len(colors)]),
))
fig.update_layout(
title="Bank Run Stress Test — Reserve Curves",
xaxis_title="Time",
yaxis_title="Reserve Value",
height=500,
template="plotly_white",
)
return fig
def fig_crdt_divergence(sim_result: dict) -> go.Figure:
"""Peer divergence timeline + merge event scatter."""
fig = make_subplots(
rows=2, cols=1, shared_xaxes=True,
subplot_titles=("Peer Divergence", "Network Events"),
)
t = sim_result["times"]
# Divergence line
fig.add_trace(
go.Scatter(x=t, y=sim_result["divergence"], name="Divergence",
line=dict(color="#2196F3", width=2)),
row=1, col=1,
)
fig.add_hline(y=1, line_dash="dash", line_color="green", row=1, col=1)
# Shade partitions
for i, active in enumerate(sim_result["partition_active"]):
if active:
fig.add_vrect(
x0=t[i] - 0.5, x1=t[i] + 0.5,
fillcolor="red", opacity=0.1, line_width=0,
row=1, col=1,
)
# Event scatter
from src.crdt.bridge.events import EventType
events = sim_result.get("events", [])
peer_ids = list(sim_result.get("peer_signatures", {}).keys())
peer_y = {pid: i for i, pid in enumerate(peer_ids)}
for etype, color, marker, label in [
(EventType.MERGE, "blue", "circle", "Merge"),
(EventType.PARTITION, "red", "x", "Partition"),
(EventType.RECONNECT, "green", "triangle-up", "Reconnect"),
]:
filtered = [e for e in events if e.event_type == etype]
if filtered:
fig.add_trace(go.Scatter(
x=[e.time for e in filtered],
y=[peer_y.get(e.source_peer, 0) for e in filtered],
mode="markers",
marker=dict(color=color, symbol=marker, size=8),
name=label,
), row=2, col=1)
fig.update_yaxes(
tickvals=list(range(len(peer_ids))),
ticktext=peer_ids,
row=2, col=1,
)
fig.update_layout(height=600, template="plotly_white")
return fig

View File

@ -0,0 +1 @@
"""Dashboard tab modules."""

112
dashboard/tabs/crdt_flow.py Normal file
View File

@ -0,0 +1,112 @@
"""CRDT Flow tab — multi-peer network simulation."""
import streamlit as st
from src.crdt.bridge import Network, NetworkConfig, EventType
from src.crdt.labor_crdt import AttestationEntry, submit_attestation
from src.crdt.intent_matching import Intent, add_intent
from src.crdt.dca_schedule import create_dca_schedule, DCAScheduleRegistry
from dashboard.charts import fig_crdt_divergence
def render():
st.header("CRDT Network Simulation")
col1, col2, col3, col4 = st.columns(4)
with col1:
n_peers = st.slider("Peers", 2, 10, 5)
with col2:
partition_prob = st.slider("Partition probability", 0.0, 0.5, 0.15, 0.05)
with col3:
reconnect_delay = st.slider("Reconnect delay", 1.0, 20.0, 5.0, 1.0)
with col4:
sim_steps = st.slider("Steps", 10, 100, 40)
if st.button("Simulate Network", key="crdt_run"):
peer_ids = [f"p{i+1}" for i in range(n_peers)]
config = NetworkConfig(
partition_probability=partition_prob,
reconnect_delay=reconnect_delay,
seed=42,
)
net = Network.create(peer_ids, config)
# Inject mutations across peers
for i, pid in enumerate(peer_ids):
if i % 3 == 0:
entry = AttestationEntry(
entry_id=f"e{i}", contribution_type="code",
units=5.0, timestamp=1.0, attester="admin",
)
net.mutate_peer(pid, lambda s, e=entry, c=f"dev_{i}": _with_labor(s, c, e))
elif i % 3 == 1:
intent = Intent(
intent_id=f"i{i}", maker=f"user_{i}",
sell_token="USDC", sell_amount=100.0,
buy_token="MYCO", min_buy_amount=80.0,
valid_until=999.0,
)
net.mutate_peer(pid, lambda s, it=intent: _with_intent(s, it))
else:
schedule = create_dca_schedule(
schedule_id=f"s{i}", maker=f"maker_{i}",
total_amount=1000.0, n_chunks=5,
start_time=0.0, interval=1.0,
)
net.mutate_peer(pid, lambda s, sc=schedule: _with_dca(s, sc))
with st.spinner("Simulating network..."):
sim_result = net.simulate(steps=sim_steps)
sim_result["convergence_time"] = net.convergence_time()
st.session_state["crdt_result"] = sim_result
if "crdt_result" in st.session_state:
result = st.session_state["crdt_result"]
# Metrics
m1, m2, m3, m4 = st.columns(4)
m1.metric("Final Divergence", result["divergence"][-1])
m2.metric("Total Merges", result["merge_count"][-1])
n_partitions = sum(1 for e in result["events"] if e.event_type == EventType.PARTITION)
m3.metric("Partition Events", n_partitions)
ct = result.get("convergence_time")
m4.metric("Convergence Time", f"{ct:.1f}" if ct is not None else "N/A")
st.plotly_chart(fig_crdt_divergence(result), use_container_width=True)
# Per-peer state table
st.subheader("Per-Peer State Signatures (final)")
peer_sigs = result["peer_signatures"]
table_data = []
for pid, sigs in peer_sigs.items():
final = sigs[-1] if sigs else ()
table_data.append({
"Peer": pid,
"Labor Logs": final[0] if len(final) > 0 else 0,
"Labor Entries": final[1] if len(final) > 1 else 0,
"Intents": final[2] if len(final) > 2 else 0,
"Flow Peers": final[3] if len(final) > 3 else 0,
"Trust Peers": final[4] if len(final) > 4 else 0,
"Credit Lines": final[5] if len(final) > 5 else 0,
"DCA Schedules": final[6] if len(final) > 6 else 0,
})
st.dataframe(table_data, use_container_width=True)
def _with_labor(state, contributor, entry):
state.labor = submit_attestation(state.labor, contributor, entry)
return state
def _with_intent(state, intent):
state.intents = add_intent(state.intents, intent)
return state
def _with_dca(state, schedule):
state.dca = DCAScheduleRegistry(
schedules={**state.dca.schedules, schedule.schedule_id: schedule}
)
return state

View File

@ -0,0 +1,113 @@
"""DCA Explorer tab — strategy comparison + subscription DCA."""
import streamlit as st
from src.composed.simulator import scenario_dca_comparison
from src.composed.subscription_dca import (
SubscriptionDCAConfig,
simulate_subscription_dca,
)
from src.composed.myco_surface import MycoSystemConfig
from src.commitments.subscription import SubscriptionTier
from dashboard.charts import fig_dca_comparison
import plotly.graph_objects as go
from plotly.subplots import make_subplots
def render():
st.header("DCA Explorer")
tab_compare, tab_subscription = st.tabs(["Strategy Comparison", "Subscription DCA"])
with tab_compare:
_render_comparison()
with tab_subscription:
_render_subscription()
def _render_comparison():
col1, col2, col3 = st.columns(3)
with col1:
total = st.number_input("Total amount ($)", 1000, 100_000, 10_000, step=1000, key="dca_total")
with col2:
chunks = st.slider("Chunks", 5, 50, 20, key="dca_chunks")
with col3:
interval = st.number_input("Interval", 0.5, 10.0, 1.0, step=0.5, key="dca_interval")
if st.button("Compare Strategies", key="dca_compare_run"):
with st.spinner("Running DCA comparison..."):
results = scenario_dca_comparison(
total_amount=float(total),
n_chunks=chunks,
interval=float(interval),
)
st.session_state["dca_results"] = results
if "dca_results" in st.session_state:
results = st.session_state["dca_results"]
cols = st.columns(len(results))
for i, (name, r) in enumerate(results.items()):
with cols[i]:
st.subheader(name)
st.metric("Tokens", f"{r.order.total_tokens_received:.2f}")
st.metric("Avg Price", f"{r.order.avg_price:.6f}")
st.metric("DCA Advantage", f"{r.dca_advantage:+.4f}")
st.plotly_chart(fig_dca_comparison(results), use_container_width=True)
def _render_subscription():
col1, col2, col3 = st.columns(3)
with col1:
tier_payment = st.number_input("Payment/period ($)", 50, 5000, 100, step=50, key="sub_payment")
with col2:
n_chunks = st.slider("DCA chunks", 2, 20, 5, key="sub_chunks")
with col3:
duration = st.slider("Duration (days)", 90, 730, 365, key="sub_duration")
if st.button("Simulate Subscription DCA", key="sub_run"):
tiers = {
"custom": SubscriptionTier(
name="custom",
payment_per_period=float(tier_payment),
period_length=30.0,
loyalty_multiplier_max=2.0,
loyalty_halflife=90.0,
),
}
config = SubscriptionDCAConfig(n_chunks=n_chunks, spread_fraction=0.8)
sys_config = MycoSystemConfig(n_reserve_assets=3)
subscribers = [("subscriber_1", "custom", 0.0)]
with st.spinner("Running subscription DCA..."):
sim = simulate_subscription_dca(
tiers=tiers, config=config, system_config=sys_config,
subscribers=subscribers, duration=float(duration), dt=1.0,
)
st.session_state["sub_dca_result"] = sim
if "sub_dca_result" in st.session_state:
sim = st.session_state["sub_dca_result"]
t = sim["times"].tolist()
fig = make_subplots(
rows=1, cols=2,
subplot_titles=("Token Accumulation", "Total Supply"),
)
fig.add_trace(go.Scatter(
x=t, y=sim["total_curve_tokens"].tolist(),
name="Curve Tokens", line=dict(color="#2196F3"),
), row=1, col=1)
fig.add_trace(go.Scatter(
x=t, y=sim["total_loyalty_bonus"].tolist(),
name="Loyalty Bonus", line=dict(color="#FF9800", dash="dash"),
), row=1, col=1)
fig.add_trace(go.Scatter(
x=t, y=sim["total_supply"].tolist(),
name="Supply", line=dict(color="#4CAF50"),
), row=1, col=2)
fig.update_layout(height=400, template="plotly_white")
st.plotly_chart(fig, use_container_width=True)

View File

@ -0,0 +1,65 @@
"""Signal Router visualization tab — live on slider change."""
import numpy as np
import streamlit as st
from src.primitives.signal_router import (
AdaptiveParams,
SignalRouterConfig,
simulate_signal_routing,
)
from dashboard.charts import fig_signal_routing
def render():
st.header("Signal Router")
st.caption("Parameters update live — no button needed (sub-ms computation).")
col1, col2 = st.columns(2)
with col1:
trajectory = st.selectbox(
"Price trajectory",
["stable", "bull", "crash", "volatile"],
index=3,
)
steps = st.slider("Steps", 50, 500, 200)
with col2:
k_vol_flow = st.slider("k_vol_flow", 0.0, 5.0, 1.0, 0.1)
k_dev_alpha = st.slider("k_dev_alpha", 0.0, 5.0, 2.0, 0.1)
k_vol_fee = st.slider("k_vol_fee", 0.0, 5.0, 1.5, 0.1)
k_oracle_vel = st.slider("k_oracle_vel", 0.0, 0.1, 0.01, 0.001, format="%.3f")
# Generate trajectory
t = np.linspace(0, 1, steps)
traj_map = {
"stable": lambda: [1.0] * steps,
"bull": lambda: (1.0 + t * 0.5).tolist(),
"crash": lambda: (1.0 - 0.4 * t + 0.1 * np.sin(t * 20)).tolist(),
"volatile": lambda: (1.0 + 0.3 * np.sin(t * 30) + 0.1 * np.cos(t * 7)).tolist(),
}
prices = traj_map[trajectory]()
base = AdaptiveParams(
flow_threshold=0.1,
pamm_alpha_bar=10.0,
surge_fee_rate=0.05,
oracle_multiplier_velocity=0.0,
)
config = SignalRouterConfig(
k_vol_flow=k_vol_flow,
k_dev_alpha=k_dev_alpha,
k_vol_fee=k_vol_fee,
k_oracle_vel=k_oracle_vel,
)
result = simulate_signal_routing(base, config, prices)
result["prices"] = prices # Add for chart
st.plotly_chart(fig_signal_routing(result), use_container_width=True)
# Summary metrics
m1, m2, m3, m4 = st.columns(4)
m1.metric("Final Flow Threshold", f"{result['flow_threshold'][-1]:.6f}")
m2.metric("Final Alpha Bar", f"{result['pamm_alpha_bar'][-1]:.2f}")
m3.metric("Final Surge Fee", f"{result['surge_fee_rate'][-1]:.6f}")
m4.metric("Final Volatility", f"{result['volatility'][-1]:.6f}")

View File

@ -0,0 +1,53 @@
"""Stress Tests tab — bank run sweep."""
import streamlit as st
from src.composed.simulator import scenario_bank_run
from dashboard.charts import fig_bank_run_sweep
def render():
st.header("Stress Tests")
col1, col2 = st.columns(2)
with col1:
fractions_str = st.text_input(
"Redemption fractions (comma-separated)",
"0.02, 0.05, 0.10, 0.20",
)
with col2:
initial_reserve = st.number_input(
"Initial reserve ($)", 10_000, 1_000_000, 100_000, step=10_000,
)
if st.button("Run Stress Test", key="stress_run"):
fractions = [float(f.strip()) for f in fractions_str.split(",")]
results = {}
progress = st.progress(0)
for i, frac in enumerate(fractions):
with st.spinner(f"Running {frac:.0%} redemption..."):
results[frac] = scenario_bank_run(
initial_reserve=float(initial_reserve),
redemption_fraction=frac,
)
progress.progress((i + 1) / len(fractions))
st.session_state["stress_results"] = results
if "stress_results" in st.session_state:
results = st.session_state["stress_results"]
# Summary table
cols = st.columns(len(results))
for i, (frac, result) in enumerate(results.items()):
final_ratio = result.backing_ratio[-1]
with cols[i]:
st.metric(
f"{frac:.0%} Redemption",
f"Backing: {final_ratio:.4f}",
delta="Survived" if final_ratio > 0.5 else "Failed",
delta_color="normal" if final_ratio > 0.5 else "inverse",
)
st.plotly_chart(fig_bank_run_sweep(results), use_container_width=True)

View File

@ -0,0 +1,42 @@
"""Token Launch scenario tab."""
import streamlit as st
from src.composed.simulator import scenario_token_launch
from dashboard.charts import fig_simulation_overview
def render():
st.header("Token Launch Simulation")
col1, col2, col3, col4 = st.columns(4)
with col1:
n_assets = st.slider("Reserve assets", 2, 6, 3)
with col2:
n_depositors = st.slider("Depositors", 10, 200, 50)
with col3:
total_raise = st.number_input("Total raise ($)", 10_000, 1_000_000, 100_000, step=10_000)
with col4:
duration = st.slider("Duration (days)", 30, 365, 90)
if st.button("Run Simulation", key="token_launch_run"):
with st.spinner("Running token launch simulation..."):
result = scenario_token_launch(
n_assets=n_assets,
total_raise=float(total_raise),
n_depositors=n_depositors,
duration=float(duration),
)
st.session_state["token_launch_result"] = result
if "token_launch_result" in st.session_state:
result = st.session_state["token_launch_result"]
# Metric cards
m1, m2, m3, m4 = st.columns(4)
m1.metric("Final Supply", f"{result.supply[-1]:,.0f}")
m2.metric("Final Reserve", f"${result.reserve_value[-1]:,.0f}")
m3.metric("Backing Ratio", f"{result.backing_ratio[-1]:.4f}")
m4.metric("Total Minted", f"{result.financial_minted[-1] + result.commitment_minted[-1]:,.0f}")
st.plotly_chart(fig_simulation_overview(result), use_container_width=True)

View File

@ -0,0 +1,477 @@
{
"cells": [
{
"cell_type": "markdown",
"metadata": {},
"source": [
"# DCA Strategies & Signal Routing\n",
"\n",
"Interactive exploration of the DCA execution engine, signal router,\n",
"subscription DCA, and CRDT-native schedule replication.\n",
"\n",
"**Sections:**\n",
"1. Signal Router Response across price trajectories\n",
"2. DCA vs Lump Sum comparison\n",
"3. Subscription DCA + Loyalty bonuses\n",
"4. CRDT Schedule Lifecycle (create → materialize → reconcile → merge)\n",
"5. CRDT Network Simulation — divergence and convergence"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"%matplotlib inline\n",
"import numpy as np\n",
"import matplotlib.pyplot as plt\n",
"from matplotlib.colors import to_rgba\n",
"\n",
"plt.rcParams['figure.figsize'] = (12, 6)\n",
"plt.rcParams['figure.dpi'] = 100"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## 1. Signal Router Response\n",
"\n",
"Four price trajectories (stable, bull, crash, volatile) fed through the\n",
"signal router. Each subplot shows how adaptive parameters respond."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"from src.primitives.signal_router import (\n",
" AdaptiveParams, SignalRouterConfig, simulate_signal_routing,\n",
")\n",
"\n",
"base_params = AdaptiveParams(\n",
" flow_threshold=0.1,\n",
" pamm_alpha_bar=10.0,\n",
" surge_fee_rate=0.05,\n",
" oracle_multiplier_velocity=0.0,\n",
")\n",
"config = SignalRouterConfig()\n",
"\n",
"# Generate trajectories\n",
"steps = 200\n",
"t = np.linspace(0, 1, steps)\n",
"trajectories = {\n",
" 'stable': [1.0] * steps,\n",
" 'bull': (1.0 + t * 0.5).tolist(),\n",
" 'crash': (1.0 - 0.4 * t + 0.1 * np.sin(t * 20)).tolist(),\n",
" 'volatile': (1.0 + 0.3 * np.sin(t * 30) + 0.1 * np.cos(t * 7)).tolist(),\n",
"}\n",
"\n",
"params_to_plot = ['flow_threshold', 'pamm_alpha_bar', 'surge_fee_rate', 'oracle_velocity']\n",
"\n",
"fig, axes = plt.subplots(4, 4, figsize=(16, 12), sharex='col')\n",
"\n",
"for j, (traj_name, prices) in enumerate(trajectories.items()):\n",
" result = simulate_signal_routing(base_params, config, prices)\n",
" for i, param in enumerate(params_to_plot):\n",
" ax = axes[i, j]\n",
" ax.plot(result['times'], result[param], linewidth=1.2)\n",
" if i == 0:\n",
" ax.set_title(traj_name, fontsize=11, fontweight='bold')\n",
" if j == 0:\n",
" ax.set_ylabel(param.replace('_', ' '), fontsize=9)\n",
" if i == 3:\n",
" ax.set_xlabel('Time')\n",
"\n",
"fig.suptitle('Signal Router: Parameter Response to Price Trajectories', fontsize=13)\n",
"fig.tight_layout()\n",
"plt.show()"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## 2. DCA vs Lump Sum\n",
"\n",
"Compare fixed-size DCA and TWAP-aware DCA against lump-sum deposit.\n",
"Shows chunk prices, cumulative tokens, and the curve path."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"from src.composed.simulator import scenario_dca_comparison\n",
"\n",
"results = scenario_dca_comparison(\n",
" total_amount=10_000.0,\n",
" n_chunks=20,\n",
" interval=1.0,\n",
")\n",
"\n",
"fig, axes = plt.subplots(1, 3, figsize=(15, 5))\n",
"\n",
"colors = {'fixed': '#2196F3', 'twap_aware': '#FF9800'}\n",
"\n",
"for name, dca_result in results.items():\n",
" history = dca_result.order.history\n",
" chunks_x = list(range(len(history)))\n",
" prices = [h['price'] for h in history]\n",
" tokens = [h['tokens'] for h in history]\n",
" cumulative = np.cumsum(tokens)\n",
"\n",
" axes[0].plot(chunks_x, prices, '-o', color=colors[name],\n",
" label=name, markersize=4)\n",
" axes[1].bar(\n",
" [x + (0.2 if name == 'twap_aware' else -0.2) for x in chunks_x],\n",
" tokens, width=0.35, color=colors[name], label=name, alpha=0.7,\n",
" )\n",
" axes[2].plot(chunks_x, cumulative, '-s', color=colors[name],\n",
" label=name, markersize=4)\n",
"\n",
"# Add lump sum reference line\n",
"for name, dca_result in results.items():\n",
" axes[2].axhline(dca_result.lump_sum_tokens, color='gray',\n",
" linestyle='--', alpha=0.5,\n",
" label='lump sum' if name == 'fixed' else None)\n",
"\n",
"axes[0].set_title('Price per Chunk')\n",
"axes[0].set_xlabel('Chunk #')\n",
"axes[0].set_ylabel('Effective Price')\n",
"axes[0].legend()\n",
"\n",
"axes[1].set_title('Tokens per Chunk')\n",
"axes[1].set_xlabel('Chunk #')\n",
"axes[1].set_ylabel('Tokens')\n",
"axes[1].legend()\n",
"\n",
"axes[2].set_title('Cumulative Tokens')\n",
"axes[2].set_xlabel('Chunk #')\n",
"axes[2].set_ylabel('Total Tokens')\n",
"axes[2].legend()\n",
"\n",
"fig.suptitle('DCA vs Lump Sum Comparison', fontsize=13)\n",
"fig.tight_layout()\n",
"plt.show()\n",
"\n",
"# Summary table\n",
"for name, r in results.items():\n",
" print(f\"{name:12s} tokens={r.order.total_tokens_received:.2f} \"\n",
" f\"avg_price={r.order.avg_price:.6f} \"\n",
" f\"dca_adv={r.dca_advantage:+.4f}\")"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## 3. Subscription DCA + Loyalty\n",
"\n",
"Simulates subscription DCA over 1 year for two tiers.\n",
"Shows curve-minted tokens versus loyalty bonus accumulation."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"from src.composed.subscription_dca import (\n",
" SubscriptionDCAConfig, simulate_subscription_dca,\n",
")\n",
"from src.composed.myco_surface import MycoSystemConfig\n",
"from src.commitments.subscription import SubscriptionTier\n",
"\n",
"tiers = {\n",
" 'basic': SubscriptionTier(\n",
" name='basic', payment_per_period=100.0, period_length=30.0,\n",
" loyalty_multiplier_max=1.5, loyalty_halflife=90.0,\n",
" ),\n",
" 'premium': SubscriptionTier(\n",
" name='premium', payment_per_period=500.0, period_length=30.0,\n",
" loyalty_multiplier_max=2.0, loyalty_halflife=60.0,\n",
" ),\n",
"}\n",
"\n",
"config = SubscriptionDCAConfig(n_chunks=5, spread_fraction=0.8)\n",
"sys_config = MycoSystemConfig(n_reserve_assets=3)\n",
"\n",
"subscribers = [\n",
" ('alice', 'basic', 0.0),\n",
" ('bob', 'premium', 0.0),\n",
" ('carol', 'basic', 30.0),\n",
"]\n",
"\n",
"sim = simulate_subscription_dca(\n",
" tiers=tiers, config=config, system_config=sys_config,\n",
" subscribers=subscribers, duration=365.0, dt=1.0,\n",
")\n",
"\n",
"fig, axes = plt.subplots(1, 3, figsize=(15, 5))\n",
"\n",
"axes[0].plot(sim['times'], sim['total_curve_tokens'], label='Curve Tokens')\n",
"axes[0].plot(sim['times'], sim['total_loyalty_bonus'], label='Loyalty Bonus', linestyle='--')\n",
"axes[0].set_title('Token Accumulation')\n",
"axes[0].set_xlabel('Days')\n",
"axes[0].set_ylabel('Tokens')\n",
"axes[0].legend()\n",
"\n",
"total_tokens = sim['total_curve_tokens'] + sim['total_loyalty_bonus']\n",
"axes[1].fill_between(sim['times'], 0, sim['total_curve_tokens'],\n",
" alpha=0.5, label='Curve')\n",
"axes[1].fill_between(sim['times'], sim['total_curve_tokens'], total_tokens,\n",
" alpha=0.5, label='Loyalty Bonus')\n",
"axes[1].set_title('Stacked Token Sources')\n",
"axes[1].set_xlabel('Days')\n",
"axes[1].set_ylabel('Tokens')\n",
"axes[1].legend()\n",
"\n",
"axes[2].plot(sim['times'], sim['total_supply'])\n",
"axes[2].set_title('Total $MYCO Supply')\n",
"axes[2].set_xlabel('Days')\n",
"axes[2].set_ylabel('Supply')\n",
"\n",
"fig.suptitle('Subscription DCA + Loyalty Over 1 Year', fontsize=13)\n",
"fig.tight_layout()\n",
"plt.show()"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## 4. CRDT Schedule Lifecycle\n",
"\n",
"Demonstrates the DCA schedule lifecycle:\n",
"`create_dca_schedule` → `materialize_chunks` → `reconcile_settled_chunks` → `merge_dca_schedules`\n",
"\n",
"Shows status bar charts across two replicas."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"from src.crdt.dca_schedule import (\n",
" create_dca_schedule, DCAScheduleRegistry,\n",
" materialize_chunks, merge_dca_schedules,\n",
")\n",
"from src.crdt.intent_matching import IntentSet\n",
"\n",
"# Create a schedule on replica A\n",
"schedule = create_dca_schedule(\n",
" schedule_id='s1', maker='alice',\n",
" total_amount=1000.0, n_chunks=10,\n",
" start_time=0.0, interval=1.0,\n",
")\n",
"reg_a = DCAScheduleRegistry(schedules={'s1': schedule})\n",
"reg_b = DCAScheduleRegistry() # Empty replica B\n",
"\n",
"# Lifecycle stages\n",
"stages = []\n",
"\n",
"# Stage 1: Initial (A has schedule, B empty)\n",
"def count_statuses(reg, sid='s1'):\n",
" if sid not in reg.schedules:\n",
" return {'pending': 0, 'submitted': 0, 'executed': 0}\n",
" chunks = reg.schedules[sid].chunks.values()\n",
" return {\n",
" 'pending': sum(1 for c in chunks if c.status == 'pending'),\n",
" 'submitted': sum(1 for c in chunks if c.status == 'submitted'),\n",
" 'executed': sum(1 for c in chunks if c.status == 'executed'),\n",
" }\n",
"\n",
"stages.append(('A: Created', count_statuses(reg_a), 'B: Empty', count_statuses(reg_b)))\n",
"\n",
"# Stage 2: Materialize first 5 chunks on A\n",
"iset = IntentSet()\n",
"reg_a, iset, materialized = materialize_chunks(reg_a, iset, current_time=5.0)\n",
"stages.append(('A: Materialized', count_statuses(reg_a), 'B: Empty', count_statuses(reg_b)))\n",
"\n",
"# Stage 3: Merge A → B\n",
"reg_b = merge_dca_schedules(reg_b, reg_a)\n",
"stages.append(('A: Post-merge', count_statuses(reg_a), 'B: Post-merge', count_statuses(reg_b)))\n",
"\n",
"# Stage 4: Materialize remaining on B\n",
"iset_b = IntentSet()\n",
"reg_b, iset_b, _ = materialize_chunks(reg_b, iset_b, current_time=10.0)\n",
"stages.append(('A: Stale', count_statuses(reg_a), 'B: All materialized', count_statuses(reg_b)))\n",
"\n",
"# Stage 5: Final merge\n",
"merged = merge_dca_schedules(reg_a, reg_b)\n",
"stages.append(('A: Final', count_statuses(merged), 'B: Final', count_statuses(merged)))\n",
"\n",
"# Plot\n",
"fig, axes = plt.subplots(2, len(stages), figsize=(16, 6), sharey='row')\n",
"status_colors = {'pending': '#90CAF9', 'submitted': '#FFA726', 'executed': '#66BB6A'}\n",
"\n",
"for col, (a_label, a_counts, b_label, b_counts) in enumerate(stages):\n",
" for row, (label, counts) in enumerate([(a_label, a_counts), (b_label, b_counts)]):\n",
" ax = axes[row, col]\n",
" statuses = list(counts.keys())\n",
" values = list(counts.values())\n",
" bars = ax.bar(statuses, values,\n",
" color=[status_colors[s] for s in statuses])\n",
" ax.set_title(label, fontsize=9)\n",
" ax.set_ylim(0, 11)\n",
" for bar, v in zip(bars, values):\n",
" if v > 0:\n",
" ax.text(bar.get_x() + bar.get_width()/2, bar.get_height() + 0.2,\n",
" str(v), ha='center', fontsize=8)\n",
"\n",
"axes[0, 0].set_ylabel('Replica A')\n",
"axes[1, 0].set_ylabel('Replica B')\n",
"fig.suptitle('CRDT DCA Schedule Lifecycle: Create → Materialize → Merge', fontsize=13)\n",
"fig.tight_layout()\n",
"plt.show()"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## 5. CRDT Network Simulation\n",
"\n",
"Multi-peer gossip network with partitions. Tracks divergence over time\n",
"and merge events as they propagate state across peers."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"from src.crdt.bridge import Network, NetworkConfig, EventType\n",
"from src.crdt.labor_crdt import AttestationEntry, submit_attestation\n",
"from src.crdt.intent_matching import Intent, add_intent\n",
"\n",
"# Create a 5-peer network with partitions\n",
"config = NetworkConfig(\n",
" partition_probability=0.15,\n",
" reconnect_delay=5.0,\n",
" seed=42,\n",
")\n",
"net = Network.create(['p1', 'p2', 'p3', 'p4', 'p5'], config)\n",
"\n",
"# Inject mutations at different peers\n",
"def add_labor(contributor, entry_id):\n",
" entry = AttestationEntry(\n",
" entry_id=entry_id, contribution_type='code',\n",
" units=5.0, timestamp=1.0, attester='admin',\n",
" )\n",
" def mutate(state):\n",
" state.labor = submit_attestation(state.labor, contributor, entry)\n",
" return state\n",
" return mutate\n",
"\n",
"def add_an_intent(intent_id):\n",
" intent = Intent(\n",
" intent_id=intent_id, maker='alice',\n",
" sell_token='USDC', sell_amount=100.0,\n",
" buy_token='MYCO', min_buy_amount=80.0,\n",
" valid_until=999.0,\n",
" )\n",
" def mutate(state):\n",
" state.intents = add_intent(state.intents, intent)\n",
" return state\n",
" return mutate\n",
"\n",
"# Spread mutations across peers\n",
"net.mutate_peer('p1', add_labor('alice', 'e1'))\n",
"net.mutate_peer('p2', add_labor('bob', 'e2'))\n",
"net.mutate_peer('p3', add_an_intent('i1'))\n",
"net.mutate_peer('p4', add_an_intent('i2'))\n",
"net.mutate_peer('p5', add_labor('carol', 'e3'))\n",
"\n",
"# Simulate\n",
"sim = net.simulate(steps=40)\n",
"\n",
"# Plot\n",
"fig, axes = plt.subplots(2, 1, figsize=(14, 8), sharex=True)\n",
"\n",
"# Divergence timeline\n",
"ax1 = axes[0]\n",
"ax1.plot(sim['times'], sim['divergence'], 'b-', linewidth=2, label='Distinct signatures')\n",
"ax1.axhline(1, color='green', linestyle='--', alpha=0.5, label='Converged')\n",
"\n",
"# Shade partition periods\n",
"for i, active in enumerate(sim['partition_active']):\n",
" if active:\n",
" ax1.axvspan(sim['times'][i] - 0.5, sim['times'][i] + 0.5,\n",
" alpha=0.2, color='red')\n",
"\n",
"ax1.set_ylabel('Divergence (# unique states)')\n",
"ax1.set_title('CRDT Network: Peer Divergence Over Time')\n",
"ax1.legend()\n",
"ax1.set_ylim(0, max(sim['divergence']) + 1)\n",
"\n",
"# Merge event scatter\n",
"ax2 = axes[1]\n",
"merge_events = [e for e in sim['events'] if e.event_type == EventType.MERGE]\n",
"partition_events = [e for e in sim['events'] if e.event_type == EventType.PARTITION]\n",
"reconnect_events = [e for e in sim['events'] if e.event_type == EventType.RECONNECT]\n",
"\n",
"peer_ids = list(net.peers.keys())\n",
"peer_y = {pid: i for i, pid in enumerate(peer_ids)}\n",
"\n",
"if merge_events:\n",
" ax2.scatter(\n",
" [e.time for e in merge_events],\n",
" [peer_y[e.source_peer] for e in merge_events],\n",
" c='blue', marker='o', s=20, alpha=0.6, label='Merge',\n",
" )\n",
"if partition_events:\n",
" ax2.scatter(\n",
" [e.time for e in partition_events],\n",
" [peer_y[e.source_peer] for e in partition_events],\n",
" c='red', marker='x', s=60, label='Partition',\n",
" )\n",
"if reconnect_events:\n",
" ax2.scatter(\n",
" [e.time for e in reconnect_events],\n",
" [peer_y[e.source_peer] for e in reconnect_events],\n",
" c='green', marker='^', s=60, label='Reconnect',\n",
" )\n",
"\n",
"ax2.set_yticks(range(len(peer_ids)))\n",
"ax2.set_yticklabels(peer_ids)\n",
"ax2.set_xlabel('Time')\n",
"ax2.set_ylabel('Peer')\n",
"ax2.set_title('Merge & Partition Events')\n",
"ax2.legend()\n",
"\n",
"fig.tight_layout()\n",
"plt.show()\n",
"\n",
"ct = net.convergence_time()\n",
"print(f\"Convergence time: {ct}\")\n",
"print(f\"Final divergence: {sim['divergence'][-1]}\")\n",
"print(f\"Total merges: {len(merge_events)}\")\n",
"print(f\"Total partitions: {len(partition_events)}\")"
]
}
],
"metadata": {
"kernelspec": {
"display_name": "Python 3",
"language": "python",
"name": "python3"
},
"language_info": {
"name": "python",
"version": "3.11.0"
}
},
"nbformat": 4,
"nbformat_minor": 4
}

View File

@ -10,8 +10,12 @@ dependencies = [
"sympy>=1.13",
]
[project.scripts]
myco = "src.cli:main"
[project.optional-dependencies]
dev = ["pytest>=8.0", "jupyter>=1.0", "ipywidgets>=8.0"]
dashboard = ["streamlit>=1.35", "plotly>=5.20"]
[build-system]
requires = ["setuptools>=69.0"]

319
src/cli.py Normal file
View File

@ -0,0 +1,319 @@
"""MYCO bonding surface CLI — run simulations from the terminal.
Thin argparse wrapper calling existing simulation functions. Stdlib only.
Usage:
myco simulate token-launch --n-assets 3 --duration 90
myco compare-dca --total 10000 --chunks 20
myco signal-routing --trajectory volatile --steps 100
myco stress-test --fractions 0.02 0.05 0.10 0.20
"""
import argparse
import json
import sys
import math
import numpy as np
def _price_trajectory(name: str, steps: int) -> list[float]:
"""Generate a named price trajectory."""
t = np.linspace(0, 1, steps)
if name == "stable":
return [1.0] * steps
elif name == "bull":
return (1.0 + t * 0.5).tolist()
elif name == "crash":
return (1.0 - 0.4 * t + 0.1 * np.sin(t * 20)).tolist()
elif name == "volatile":
return (1.0 + 0.3 * np.sin(t * 30) + 0.1 * np.cos(t * 7)).tolist()
else:
raise ValueError(f"Unknown trajectory: {name}")
def _save_chart(fig, path: str) -> None:
"""Save a matplotlib figure to disk."""
fig.savefig(path, dpi=150, bbox_inches="tight")
print(f"Chart saved to {path}")
def cmd_simulate(args: argparse.Namespace) -> int:
"""Run a simulation scenario."""
from src.composed.simulator import (
scenario_token_launch,
scenario_bank_run,
scenario_mixed_issuance,
)
scenario_map = {
"token-launch": lambda: scenario_token_launch(
n_assets=args.n_assets, duration=args.duration,
),
"bank-run": lambda: scenario_bank_run(
n_assets=args.n_assets, duration=args.duration,
),
"mixed-issuance": lambda: scenario_mixed_issuance(
n_assets=args.n_assets, duration=args.duration,
),
}
if args.scenario not in scenario_map:
print(f"Unknown scenario: {args.scenario}", file=sys.stderr)
print(f"Available: {', '.join(scenario_map.keys())}", file=sys.stderr)
return 1
result = scenario_map[args.scenario]()
if args.output == "json":
data = {
k: v.tolist() for k, v in vars(result).items()
}
print(json.dumps(data, indent=2))
else:
metrics = result
print(f"Simulation: {args.scenario}")
print(f" Duration: {args.duration}")
print(f" Final supply: {metrics.supply[-1]:.2f}")
print(f" Final reserve: {metrics.reserve_value[-1]:.2f}")
print(f" Backing ratio: {metrics.backing_ratio[-1]:.4f}")
print(f" Total minted: {metrics.financial_minted[-1] + metrics.commitment_minted[-1]:.2f}")
print(f" Total redeemed: {metrics.total_redeemed[-1]:.2f}")
if args.save_chart:
import matplotlib
matplotlib.use("Agg")
import matplotlib.pyplot as plt
from src.utils.plotting import plot_time_series
fig, axes = plt.subplots(2, 2, figsize=(12, 8))
plot_time_series(result.times, {"Supply": result.supply}, ax=axes[0, 0])
plot_time_series(result.times, {"Reserve": result.reserve_value}, ax=axes[0, 1])
plot_time_series(result.times, {"Backing Ratio": result.backing_ratio}, ax=axes[1, 0])
plot_time_series(
result.times,
{"Financial": result.financial_minted, "Commitment": result.commitment_minted},
ax=axes[1, 1],
)
fig.suptitle(f"MYCO Simulation: {args.scenario}")
fig.tight_layout()
_save_chart(fig, args.save_chart)
plt.close(fig)
return 0
def cmd_compare_dca(args: argparse.Namespace) -> int:
"""Compare DCA strategies."""
from src.composed.simulator import scenario_dca_comparison
results = scenario_dca_comparison(
total_amount=args.total,
n_chunks=args.chunks,
interval=args.interval,
)
if args.output == "json":
data = {}
for name, dca_result in results.items():
data[name] = {
"total_tokens": dca_result.order.total_tokens_received,
"total_spent": dca_result.order.total_spent,
"avg_price": dca_result.order.avg_price,
"twap_price": dca_result.twap_price,
"lump_sum_tokens": dca_result.lump_sum_tokens,
"dca_advantage": dca_result.dca_advantage,
}
print(json.dumps(data, indent=2))
else:
for name, dca_result in results.items():
print(f"\nStrategy: {name}")
print(f" Tokens received: {dca_result.order.total_tokens_received:.4f}")
print(f" Avg price: {dca_result.order.avg_price:.6f}")
print(f" TWAP price: {dca_result.twap_price:.6f}")
print(f" Lump sum tokens: {dca_result.lump_sum_tokens:.4f}")
print(f" DCA advantage: {dca_result.dca_advantage:+.4f}")
if args.save_chart:
import matplotlib
matplotlib.use("Agg")
import matplotlib.pyplot as plt
fig, axes = plt.subplots(1, 2, figsize=(12, 5))
for name, dca_result in results.items():
history = dca_result.order.history
chunks_x = list(range(len(history)))
tokens = [h["tokens"] for h in history]
prices = [h["price"] for h in history]
axes[0].bar(
[x + (0.2 if name == "twap_aware" else -0.2) for x in chunks_x],
tokens, width=0.35, label=name, alpha=0.7,
)
axes[1].plot(chunks_x, prices, "-o", label=name, markersize=4)
axes[0].set_title("Tokens per Chunk")
axes[0].set_xlabel("Chunk")
axes[0].legend()
axes[1].set_title("Price per Chunk")
axes[1].set_xlabel("Chunk")
axes[1].legend()
fig.suptitle("DCA Strategy Comparison")
fig.tight_layout()
_save_chart(fig, args.save_chart)
plt.close(fig)
return 0
def cmd_signal_routing(args: argparse.Namespace) -> int:
"""Run signal routing simulation."""
from src.primitives.signal_router import (
AdaptiveParams,
SignalRouterConfig,
simulate_signal_routing,
)
base = AdaptiveParams(
flow_threshold=0.1,
pamm_alpha_bar=10.0,
surge_fee_rate=0.05,
oracle_multiplier_velocity=0.0,
)
config = SignalRouterConfig()
prices = _price_trajectory(args.trajectory, args.steps)
result = simulate_signal_routing(base, config, prices)
if args.output == "json":
print(json.dumps(result, indent=2))
else:
print(f"Signal Routing: {args.trajectory} ({args.steps} steps)")
print(f" Final flow_threshold: {result['flow_threshold'][-1]:.6f}")
print(f" Final pamm_alpha_bar: {result['pamm_alpha_bar'][-1]:.4f}")
print(f" Final surge_fee_rate: {result['surge_fee_rate'][-1]:.6f}")
print(f" Final oracle_velocity: {result['oracle_velocity'][-1]:.6f}")
print(f" Final volatility: {result['volatility'][-1]:.6f}")
if args.save_chart:
import matplotlib
matplotlib.use("Agg")
import matplotlib.pyplot as plt
fig, axes = plt.subplots(3, 1, figsize=(12, 10), sharex=True)
t = result["times"]
axes[0].plot(t, prices, label="Spot Price")
axes[0].set_ylabel("Price")
axes[0].set_title(f"Signal Router: {args.trajectory}")
axes[0].legend()
axes[1].plot(t, result["twap_deviation"], label="TWAP Deviation")
axes[1].plot(t, result["volatility"], label="Volatility")
axes[1].set_ylabel("Signal Value")
axes[1].legend()
axes[2].plot(t, result["flow_threshold"], label="Flow Threshold")
axes[2].plot(t, result["surge_fee_rate"], label="Surge Fee")
axes[2].set_ylabel("Parameter Value")
axes[2].set_xlabel("Time")
axes[2].legend()
fig.tight_layout()
_save_chart(fig, args.save_chart)
plt.close(fig)
return 0
def cmd_stress_test(args: argparse.Namespace) -> int:
"""Run bank run stress tests at multiple redemption fractions."""
from src.composed.simulator import scenario_bank_run
results = {}
for frac in args.fractions:
result = scenario_bank_run(redemption_fraction=frac)
results[frac] = result
final_ratio = result.backing_ratio[-1]
final_reserve = result.reserve_value[-1]
survived = "YES" if final_ratio > 0.5 else "NO"
print(f" Fraction {frac:.2f}: backing={final_ratio:.4f} reserve={final_reserve:.0f} survived={survived}")
if args.save_chart:
import matplotlib
matplotlib.use("Agg")
import matplotlib.pyplot as plt
fig, ax = plt.subplots(figsize=(10, 6))
for frac, result in results.items():
ax.plot(result.times, result.reserve_value, label=f"{frac:.0%} redemption")
ax.set_xlabel("Time")
ax.set_ylabel("Reserve Value")
ax.set_title("Bank Run Stress Test — Reserve Curves")
ax.legend()
fig.tight_layout()
_save_chart(fig, args.save_chart)
plt.close(fig)
return 0
def build_parser() -> argparse.ArgumentParser:
parser = argparse.ArgumentParser(
prog="myco",
description="MYCO bonding surface simulation toolkit",
)
sub = parser.add_subparsers(dest="command")
# simulate
p_sim = sub.add_parser("simulate", help="Run a simulation scenario")
p_sim.add_argument("scenario", choices=["token-launch", "bank-run", "mixed-issuance"])
p_sim.add_argument("--n-assets", type=int, default=3)
p_sim.add_argument("--duration", type=float, default=90.0)
p_sim.add_argument("--output", choices=["text", "json"], default="text")
p_sim.add_argument("--save-chart", type=str, default=None)
# compare-dca
p_dca = sub.add_parser("compare-dca", help="Compare DCA strategies")
p_dca.add_argument("--total", type=float, default=10_000.0)
p_dca.add_argument("--chunks", type=int, default=20)
p_dca.add_argument("--interval", type=float, default=1.0)
p_dca.add_argument("--output", choices=["text", "json"], default="text")
p_dca.add_argument("--save-chart", type=str, default=None)
# signal-routing
p_sig = sub.add_parser("signal-routing", help="Signal routing simulation")
p_sig.add_argument("--trajectory", choices=["stable", "bull", "crash", "volatile"], default="stable")
p_sig.add_argument("--steps", type=int, default=100)
p_sig.add_argument("--output", choices=["text", "json"], default="text")
p_sig.add_argument("--save-chart", type=str, default=None)
# stress-test
p_stress = sub.add_parser("stress-test", help="Bank run stress tests")
p_stress.add_argument(
"--fractions", type=float, nargs="+",
default=[0.02, 0.05, 0.10, 0.20],
)
p_stress.add_argument("--save-chart", type=str, default=None)
return parser
def main(argv: list[str] | None = None) -> int:
parser = build_parser()
args = parser.parse_args(argv)
if args.command is None:
parser.print_help()
return 0
handlers = {
"simulate": cmd_simulate,
"compare-dca": cmd_compare_dca,
"signal-routing": cmd_signal_routing,
"stress-test": cmd_stress_test,
}
return handlers[args.command](args)
if __name__ == "__main__":
sys.exit(main())

View File

@ -0,0 +1,15 @@
"""CRDT Bridge — multi-peer simulation for all 6 CRDT modules."""
from src.crdt.bridge.peer import PeerState, merge_peers
from src.crdt.bridge.network import Network, NetworkConfig
from src.crdt.bridge.events import EventType, MergeEvent, EventLog
__all__ = [
"PeerState",
"merge_peers",
"Network",
"NetworkConfig",
"EventType",
"MergeEvent",
"EventLog",
]

41
src/crdt/bridge/events.py Normal file
View File

@ -0,0 +1,41 @@
"""Event tracking for CRDT bridge merge operations.
Provides an append-only event log for observing merge activity,
partition detection, and convergence timing across the simulated network.
"""
from dataclasses import dataclass, field
from enum import Enum
class EventType(Enum):
"""Types of events in the CRDT bridge network."""
MERGE = "merge"
PARTITION = "partition"
RECONNECT = "reconnect"
MUTATE = "mutate"
@dataclass(frozen=True)
class MergeEvent:
"""A single event in the network simulation."""
time: float
event_type: EventType
source_peer: str
target_peer: str = ""
detail: str = ""
@dataclass
class EventLog:
"""Append-only log of network events."""
events: list[MergeEvent] = field(default_factory=list)
def append(self, event: MergeEvent) -> None:
self.events.append(event)
def filter_by_type(self, event_type: EventType) -> list[MergeEvent]:
return [e for e in self.events if e.event_type == event_type]
def events_in_range(self, start: float, end: float) -> list[MergeEvent]:
return [e for e in self.events if start <= e.time <= end]

193
src/crdt/bridge/network.py Normal file
View File

@ -0,0 +1,193 @@
"""Network simulation — gossip rounds, partitions, and convergence.
In-process multi-peer simulation wrapping merge_peers(). No real networking
architecturally clean for future transport plug-in.
"""
import random
from dataclasses import dataclass, field
from typing import Callable
from src.crdt.bridge.peer import PeerState, merge_peers, _state_signature
from src.crdt.bridge.events import EventType, MergeEvent, EventLog
@dataclass
class NetworkConfig:
"""Configuration for network simulation."""
gossip_interval: float = 1.0 # Time between gossip rounds
partition_probability: float = 0.0 # Chance of partition per step
reconnect_delay: float = 5.0 # Steps before reconnect after partition
seed: int | None = None # RNG seed for reproducibility
@dataclass
class Network:
"""Simulated multi-peer CRDT network with gossip-based sync.
Each step, connected peers exchange state via merge_peers().
Partitions temporarily prevent merges between subgroups.
"""
peers: dict[str, PeerState] = field(default_factory=dict)
config: NetworkConfig = field(default_factory=NetworkConfig)
event_log: EventLog = field(default_factory=EventLog)
time: float = 0.0
# Partition state: set of frozensets representing disconnected groups
_partitioned: set[str] = field(default_factory=set)
_partition_start: float = -1.0
_rng: random.Random = field(default_factory=lambda: random.Random())
def __post_init__(self):
if self.config.seed is not None:
self._rng = random.Random(self.config.seed)
@classmethod
def create(cls, peer_ids: list[str], config: NetworkConfig | None = None) -> "Network":
"""Create a network with the given peer IDs."""
cfg = config or NetworkConfig()
net = cls(
peers={pid: PeerState() for pid in peer_ids},
config=cfg,
)
if cfg.seed is not None:
net._rng = random.Random(cfg.seed)
return net
def mutate_peer(self, peer_id: str, fn: Callable[[PeerState], PeerState]) -> None:
"""Apply a local mutation to a single peer's state."""
if peer_id not in self.peers:
return
self.peers[peer_id] = fn(self.peers[peer_id])
self.event_log.append(MergeEvent(
time=self.time,
event_type=EventType.MUTATE,
source_peer=peer_id,
detail="local_mutation",
))
def step(self, dt: float = 1.0) -> None:
"""Execute one gossip round.
Each connected peer pair exchanges state. Partitioned peers
are excluded from gossip.
"""
self.time += dt
# Handle partition lifecycle
self._maybe_partition()
self._maybe_reconnect()
# Gossip: each peer syncs with one random connected peer
peer_ids = list(self.peers.keys())
connected = [p for p in peer_ids if p not in self._partitioned]
if len(connected) < 2:
return
# Random pairs for this round
self._rng.shuffle(connected)
for i in range(0, len(connected) - 1, 2):
a_id, b_id = connected[i], connected[i + 1]
merged = merge_peers(self.peers[a_id], self.peers[b_id])
self.peers[a_id] = merged
self.peers[b_id] = merged
self.event_log.append(MergeEvent(
time=self.time,
event_type=EventType.MERGE,
source_peer=a_id,
target_peer=b_id,
))
def _maybe_partition(self) -> None:
"""Randomly partition a subset of peers."""
if self._partitioned:
return # Already partitioned
if self._rng.random() < self.config.partition_probability:
peer_ids = list(self.peers.keys())
# Partition off ~half the peers
n_partitioned = max(1, len(peer_ids) // 2)
self._partitioned = set(self._rng.sample(peer_ids, n_partitioned))
self._partition_start = self.time
for pid in self._partitioned:
self.event_log.append(MergeEvent(
time=self.time,
event_type=EventType.PARTITION,
source_peer=pid,
detail=f"partitioned ({len(self._partitioned)} peers)",
))
def _maybe_reconnect(self) -> None:
"""Reconnect partitioned peers after delay."""
if not self._partitioned:
return
if self.time - self._partition_start >= self.config.reconnect_delay:
for pid in self._partitioned:
self.event_log.append(MergeEvent(
time=self.time,
event_type=EventType.RECONNECT,
source_peer=pid,
))
self._partitioned.clear()
self._partition_start = -1.0
def divergence(self) -> int:
"""Count the number of distinct state signatures across peers.
Returns 1 when fully converged, >1 when peers have diverged.
"""
sigs = {pid: _state_signature(state) for pid, state in self.peers.items()}
return len(set(sigs.values()))
def convergence_time(self) -> float | None:
"""Return the time at which convergence was first reached, or None.
Scans the event log for the earliest time after all partitions
resolved where divergence == 1.
"""
if self._partitioned:
return None # Still partitioned
if self.divergence() > 1:
return None # Not yet converged
# Find last reconnect event
reconnects = self.event_log.filter_by_type(EventType.RECONNECT)
if reconnects:
return reconnects[-1].time
# No partitions ever occurred — converged from the start
merges = self.event_log.filter_by_type(EventType.MERGE)
if merges:
return merges[0].time
return 0.0
def simulate(self, steps: int, dt: float = 1.0) -> dict:
"""Run multiple steps and collect metrics.
Returns:
Dict with keys: times, divergence, merge_count, partition_active,
peer_signatures.
"""
times = []
divergences = []
merge_counts = []
partition_active = []
peer_sigs: dict[str, list[tuple]] = {pid: [] for pid in self.peers}
for _ in range(steps):
self.step(dt)
times.append(self.time)
divergences.append(self.divergence())
merge_counts.append(
len(self.event_log.filter_by_type(EventType.MERGE))
)
partition_active.append(len(self._partitioned) > 0)
for pid in self.peers:
peer_sigs[pid].append(_state_signature(self.peers[pid]))
return {
"times": times,
"divergence": divergences,
"merge_count": merge_counts,
"partition_active": partition_active,
"peer_signatures": peer_sigs,
"events": self.event_log.events,
}

63
src/crdt/bridge/peer.py Normal file
View File

@ -0,0 +1,63 @@
"""PeerState and merge_peers — composite CRDT state per peer.
Wraps all 6 CRDT modules into a single PeerState dataclass and provides
a unified merge_peers() that calls each module's merge function.
"""
from dataclasses import dataclass, field
from src.crdt.labor_crdt import CRDTLaborSystem, merge_labor
from src.crdt.intent_matching import IntentSet, merge_intents
from src.crdt.flow_dampening import CRDTFlowSystem, merge_flow_states
from src.crdt.trust_pamm import PeerTrustState, merge_trust
from src.crdt.credit_invariant import CreditPortfolio, merge_portfolios
from src.crdt.dca_schedule import DCAScheduleRegistry, merge_dca_schedules
@dataclass
class PeerState:
"""Composite CRDT state for a single peer in the network.
Holds one instance of each of the 6 CRDT modules. All fields
default to empty/initial state so peers start clean.
"""
labor: CRDTLaborSystem = field(default_factory=CRDTLaborSystem)
intents: IntentSet = field(default_factory=IntentSet)
flows: CRDTFlowSystem = field(default_factory=CRDTFlowSystem)
trust: PeerTrustState = field(default_factory=PeerTrustState)
credit: CreditPortfolio = field(default_factory=CreditPortfolio)
dca: DCAScheduleRegistry = field(default_factory=DCAScheduleRegistry)
def merge_peers(a: PeerState, b: PeerState) -> PeerState:
"""Merge two peer states by calling all 6 CRDT merge functions.
Commutative, associative, and idempotent inherits these properties
from each underlying merge function.
"""
return PeerState(
labor=merge_labor(a.labor, b.labor),
intents=merge_intents(a.intents, b.intents),
flows=merge_flow_states(a.flows, b.flows),
trust=merge_trust(a.trust, b.trust),
credit=merge_portfolios(a.credit, b.credit),
dca=merge_dca_schedules(a.dca, b.dca),
)
def _state_signature(peer: PeerState) -> tuple:
"""Hash-like signature of a peer's CRDT collection sizes.
Used for divergence detection peers with the same signature
have converged (same number of entries in each collection).
"""
return (
len(peer.labor.logs),
sum(len(log.entries) for log in peer.labor.logs.values()),
len(peer.intents.intents),
len(peer.flows.peers),
len(peer.trust.peers),
len(peer.credit.lines),
len(peer.dca.schedules),
sum(len(s.chunks) for s in peer.dca.schedules.values()),
)

75
tests/test_cli.py Normal file
View File

@ -0,0 +1,75 @@
"""Tests for the MYCO CLI tool."""
import json
import sys
from io import StringIO
import pytest
from src.cli import main
class TestCLISimulate:
def test_simulate_token_launch_text(self, capsys):
code = main(["simulate", "token-launch", "--duration", "30"])
assert code == 0
out = capsys.readouterr().out
assert "Final supply" in out
def test_simulate_token_launch_json(self, capsys):
code = main(["simulate", "token-launch", "--duration", "30", "--output", "json"])
assert code == 0
data = json.loads(capsys.readouterr().out)
assert "supply" in data
assert "reserve_value" in data
def test_simulate_unknown_scenario(self):
with pytest.raises(SystemExit) as exc_info:
main(["simulate", "nonexistent"])
assert exc_info.value.code == 2
class TestCLIDCA:
def test_compare_dca_text(self, capsys):
code = main(["compare-dca", "--total", "5000", "--chunks", "5"])
assert code == 0
out = capsys.readouterr().out
assert "fixed" in out
assert "twap_aware" in out
def test_compare_dca_json(self, capsys):
code = main(["compare-dca", "--total", "5000", "--chunks", "5", "--output", "json"])
assert code == 0
data = json.loads(capsys.readouterr().out)
assert "fixed" in data
assert "twap_aware" in data
class TestCLISignalRouting:
def test_signal_routing_text(self, capsys):
code = main(["signal-routing", "--trajectory", "volatile", "--steps", "20"])
assert code == 0
out = capsys.readouterr().out
assert "flow_threshold" in out
def test_signal_routing_json(self, capsys):
code = main(["signal-routing", "--trajectory", "stable", "--steps", "10", "--output", "json"])
assert code == 0
data = json.loads(capsys.readouterr().out)
assert "flow_threshold" in data
assert len(data["times"]) == 10
class TestCLIStressTest:
def test_stress_test(self, capsys):
code = main(["stress-test", "--fractions", "0.02", "0.05"])
assert code == 0
out = capsys.readouterr().out
assert "0.02" in out
assert "0.05" in out
class TestCLIHelp:
def test_no_command_shows_help(self, capsys):
code = main([])
assert code == 0

193
tests/test_crdt_bridge.py Normal file
View File

@ -0,0 +1,193 @@
"""Tests for CRDT bridge — multi-peer simulation and merge semantics."""
import pytest
from src.crdt.bridge import PeerState, merge_peers, Network, NetworkConfig, EventType
from src.crdt.labor_crdt import (
CRDTLaborSystem,
AttestationEntry,
submit_attestation,
)
from src.crdt.intent_matching import Intent, IntentSet, add_intent
from src.crdt.dca_schedule import create_dca_schedule, DCAScheduleRegistry
# --- Helpers ---
def _peer_with_labor(contributor: str, entry_id: str) -> PeerState:
"""Create a peer with one labor attestation."""
state = PeerState()
entry = AttestationEntry(
entry_id=entry_id,
contribution_type="code",
units=5.0,
timestamp=1.0,
attester="admin",
)
state.labor = submit_attestation(state.labor, contributor, entry)
return state
def _peer_with_intent(intent_id: str) -> PeerState:
"""Create a peer with one intent."""
state = PeerState()
intent = Intent(
intent_id=intent_id,
maker="alice",
sell_token="USDC",
sell_amount=100.0,
buy_token="MYCO",
min_buy_amount=80.0,
valid_until=999.0,
)
state.intents = add_intent(state.intents, intent)
return state
def _peer_with_dca(schedule_id: str) -> PeerState:
"""Create a peer with one DCA schedule."""
state = PeerState()
schedule = create_dca_schedule(
schedule_id=schedule_id,
maker="alice",
total_amount=1000.0,
n_chunks=5,
start_time=0.0,
interval=1.0,
)
state.dca = DCAScheduleRegistry(schedules={schedule_id: schedule})
return state
# --- TestMergePeers ---
class TestMergePeers:
def test_commutativity(self):
a = _peer_with_labor("alice", "e1")
b = _peer_with_labor("bob", "e2")
ab = merge_peers(a, b)
ba = merge_peers(b, a)
# Both should have both contributors
assert set(ab.labor.logs.keys()) == {"alice", "bob"}
assert set(ba.labor.logs.keys()) == {"alice", "bob"}
def test_idempotency(self):
a = _peer_with_labor("alice", "e1")
aa = merge_peers(a, a)
assert set(aa.labor.logs.keys()) == {"alice"}
assert len(aa.labor.logs["alice"].entries) == 1
def test_merge_intents(self):
a = _peer_with_intent("i1")
b = _peer_with_intent("i2")
merged = merge_peers(a, b)
assert "i1" in merged.intents.intents
assert "i2" in merged.intents.intents
def test_merge_dca_schedules(self):
a = _peer_with_dca("s1")
b = _peer_with_dca("s2")
merged = merge_peers(a, b)
assert "s1" in merged.dca.schedules
assert "s2" in merged.dca.schedules
# --- TestNetworkConvergence ---
class TestNetworkConvergence:
def test_converges_after_mutations(self):
"""All peers see all mutations after enough gossip rounds."""
net = Network.create(["p1", "p2", "p3"], NetworkConfig(seed=42))
# Mutate different peers
net.mutate_peer("p1", lambda s: _peer_with_labor("alice", "e1"))
net.mutate_peer("p2", lambda s: _peer_with_intent("i1"))
net.mutate_peer("p3", lambda s: _peer_with_dca("s1"))
# Run enough rounds to propagate
for _ in range(20):
net.step()
# All peers should have converged
assert net.divergence() == 1
# All peers should have all data
for pid in ["p1", "p2", "p3"]:
state = net.peers[pid]
assert "alice" in state.labor.logs
assert "i1" in state.intents.intents
assert "s1" in state.dca.schedules
def test_empty_network_divergence_one(self):
net = Network.create(["p1", "p2"], NetworkConfig(seed=0))
assert net.divergence() == 1 # All empty = converged
class TestNetworkPartitions:
def test_partition_and_reconnect(self):
"""Peers reach consistency even after partitions."""
config = NetworkConfig(
partition_probability=0.5,
reconnect_delay=3.0,
seed=123,
)
net = Network.create(["p1", "p2", "p3", "p4"], config)
# Mutate before running
net.mutate_peer("p1", lambda s: _peer_with_labor("alice", "e1"))
net.mutate_peer("p4", lambda s: _peer_with_intent("i1"))
# Run many steps to ensure partition + reconnect + convergence
for _ in range(50):
net.step()
# After enough rounds, should converge
assert net.divergence() == 1
def test_convergence_time_none_during_partition(self):
"""convergence_time() returns None while partitioned."""
config = NetworkConfig(
partition_probability=1.0, # Force partition on first step
reconnect_delay=100.0, # Never reconnect during test
seed=42,
)
net = Network.create(["p1", "p2", "p3"], config)
net.mutate_peer("p1", lambda s: _peer_with_labor("alice", "e1"))
net.step() # Triggers partition
assert net.convergence_time() is None
def test_convergence_time_returns_float_after_reconnect(self):
config = NetworkConfig(
partition_probability=1.0,
reconnect_delay=3.0,
seed=42,
)
net = Network.create(["p1", "p2"], config)
net.mutate_peer("p1", lambda s: _peer_with_labor("alice", "e1"))
# Force partition then wait for reconnect + convergence
for _ in range(20):
net.step()
ct = net.convergence_time()
assert ct is not None
assert isinstance(ct, float)
class TestNetworkSimulate:
def test_simulate_returns_metrics(self):
config = NetworkConfig(seed=42)
net = Network.create(["p1", "p2", "p3"], config)
net.mutate_peer("p1", lambda s: _peer_with_labor("alice", "e1"))
result = net.simulate(steps=10)
assert "times" in result
assert "divergence" in result
assert "merge_count" in result
assert "partition_active" in result
assert "peer_signatures" in result
assert "events" in result
assert len(result["times"]) == 10
# After 10 rounds, should converge
assert result["divergence"][-1] == 1