feat: add cadCAD simulation, risk tranches, cross-chain, and conviction governance

- Add risk_tranching.py: Senior/Mezzanine/Junior tranche system with yield
  waterfall, loss absorption, and liquidation mechanics
- Add conviction.py: Full conviction voting engine for parameter governance
  (ported from conviction/ research repo)
- Add crosschain/hub_spoke.py: 5-chain hub-spoke simulation with CCIP messaging,
  staking yield, and price shock modeling
- Add cadcad/ module: State definitions, 7 policy functions, 7 state update
  functions, pre-built scenarios (normal growth, stress test, parameter sweep)
- Add 4 dashboard tabs: Cross-Chain map, Risk Tranches, Governance, cadCAD Sim
- Wire cadCAD 0.5.3 with pandas/networkx/seaborn as new dependencies
- All 350 existing tests still pass

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
Jeff Emmett 2026-04-03 12:11:15 -07:00
parent e179587901
commit c48b7ae82a
15 changed files with 2705 additions and 5 deletions

View File

@ -11,24 +11,34 @@ st.set_page_config(
layout="wide",
)
st.title("MYCO Bonding Surface Dashboard")
st.caption("Interactive simulations for the multi-asset bonding curve with CRDT-native primitives.")
st.title("MycoFi Protocol Dashboard")
st.caption("Multi-chain bonding surfaces, risk tranches, conviction governance, and cadCAD simulations.")
# Initialize default config in session_state if not present
if "myco_config" not in st.session_state:
from src.composed.myco_surface import MycoSystemConfig
st.session_state["myco_config"] = MycoSystemConfig()
tab_config, tab_launch, tab_dca, tab_signal, tab_stress, tab_crdt = st.tabs([
(
tab_config, tab_launch, tab_dca, tab_signal, tab_stress, tab_crdt,
tab_crosschain, tab_tranches, tab_governance, tab_cadcad,
) = st.tabs([
"System Config",
"Token Launch",
"DCA Explorer",
"Signal Router",
"Stress Tests",
"CRDT Flow",
"Cross-Chain",
"Risk Tranches",
"Governance",
"cadCAD Sim",
])
from dashboard.tabs import system_config, token_launch, dca_explorer, signal_router, stress_tests, crdt_flow
from dashboard.tabs import (
system_config, token_launch, dca_explorer, signal_router,
stress_tests, crdt_flow, crosschain, tranches, governance, cadcad_sim,
)
with tab_config:
system_config.render()
@ -47,3 +57,15 @@ with tab_stress:
with tab_crdt:
crdt_flow.render()
with tab_crosschain:
crosschain.render()
with tab_tranches:
tranches.render()
with tab_governance:
governance.render()
with tab_cadcad:
cadcad_sim.render()

View File

@ -0,0 +1,171 @@
"""cadCAD Monte Carlo simulation dashboard tab."""
import streamlit as st
import plotly.graph_objects as go
from plotly.subplots import make_subplots
import numpy as np
def render():
st.header("cadCAD Monte Carlo Simulation")
st.caption("Full system simulation with stochastic processes, parameter sweeps, and stress testing.")
scenario = st.selectbox("Scenario", [
"Normal Growth",
"ETH Crash Stress Test",
"Parameter Sweep",
])
col1, col2, col3 = st.columns(3)
with col1:
timesteps = st.slider("Days", 30, 730, 180, key="cd_days")
with col2:
runs = st.slider("Monte Carlo Runs", 1, 10, 3, key="cd_runs")
with col3:
if scenario == "ETH Crash Stress Test":
crash_mag = st.slider("Crash Magnitude %", 20, 80, 50, 5, key="cd_crash")
elif scenario == "Parameter Sweep":
st.info("Sweeps: volatility × growth × redemption")
if st.button("Run cadCAD Simulation", key="cd_run"):
with st.spinner(f"Running {scenario} ({runs} runs, {timesteps} days)..."):
try:
from src.cadcad.config import (
scenario_normal_growth,
scenario_stress_test,
scenario_parameter_sweep,
)
if scenario == "Normal Growth":
df = scenario_normal_growth(timesteps=timesteps, runs=runs)
elif scenario == "ETH Crash Stress Test":
df = scenario_stress_test(timesteps=timesteps, runs=runs)
else:
df = scenario_parameter_sweep(timesteps=timesteps, runs=1)
st.session_state["cadcad_results"] = df
st.success(f"Simulation complete: {len(df)} data points across {runs} run(s)")
except Exception as e:
st.error(f"Simulation error: {e}")
st.info("Make sure cadCAD is installed: `pip install cadCAD`")
return
# Display results
if "cadcad_results" in st.session_state:
df = st.session_state["cadcad_results"]
_plot_overview(df)
_plot_tranches(df)
_plot_crosschain(df)
def _plot_overview(df):
"""System overview: supply, collateral, price, CR."""
st.subheader("System Overview")
fig = make_subplots(
rows=2, cols=2,
subplot_titles=("Total Supply", "Total Collateral", "MYCO Price", "System CR"),
)
runs = df["run"].unique()
colors = ["#2dd4bf", "#f59e0b", "#ef4444", "#a78bfa", "#60a5fa"]
for i, run in enumerate(runs):
rd = df[df["run"] == run]
color = colors[i % len(colors)]
opacity = 0.8 if len(runs) == 1 else 0.4
fig.add_trace(go.Scatter(
x=rd["timestep"], y=rd["total_supply"],
name=f"Run {run}" if i == 0 else None, showlegend=(i == 0),
line=dict(color=color, width=1), opacity=opacity,
), row=1, col=1)
fig.add_trace(go.Scatter(
x=rd["timestep"], y=rd["total_collateral_usd"],
showlegend=False, line=dict(color=color, width=1), opacity=opacity,
), row=1, col=2)
fig.add_trace(go.Scatter(
x=rd["timestep"], y=rd["myco_price"],
showlegend=False, line=dict(color=color, width=1), opacity=opacity,
), row=2, col=1)
fig.add_trace(go.Scatter(
x=rd["timestep"], y=rd["system_cr"].clip(upper=5),
showlegend=False, line=dict(color=color, width=1), opacity=opacity,
), row=2, col=2)
fig.update_layout(template="plotly_dark", height=600, showlegend=True)
st.plotly_chart(fig, use_container_width=True)
def _plot_tranches(df):
"""Tranche metrics across runs."""
st.subheader("Tranche Dynamics")
fig = make_subplots(
rows=1, cols=3,
subplot_titles=("Senior CR", "Mezzanine CR", "Junior CR"),
)
runs = df["run"].unique()
for i, run in enumerate(runs):
rd = df[df["run"] == run]
opacity = 0.6
for j, (col, color) in enumerate([
("senior_cr", "#10B981"),
("mezzanine_cr", "#F59E0B"),
("junior_cr", "#EF4444"),
]):
fig.add_trace(go.Scatter(
x=rd["timestep"], y=rd[col].clip(upper=5),
showlegend=False, line=dict(color=color, width=1), opacity=opacity,
), row=1, col=j + 1)
# Add threshold lines
fig.add_hline(y=1.5, line_dash="dash", line_color="#10B981", row=1, col=1)
fig.add_hline(y=1.2, line_dash="dash", line_color="#F59E0B", row=1, col=2)
fig.add_hline(y=1.0, line_dash="dash", line_color="#EF4444", row=1, col=3)
fig.update_layout(template="plotly_dark", height=350)
st.plotly_chart(fig, use_container_width=True)
def _plot_crosschain(df):
"""Cross-chain metrics."""
st.subheader("Cross-Chain Activity")
# Check for per-chain columns
chain_cols = [c for c in df.columns if c.startswith("collateral_")]
if not chain_cols:
st.info("No per-chain data in this simulation.")
return
fig = go.Figure()
rd = df[df["run"] == df["run"].iloc[0]] # First run
for col in chain_cols:
chain_name = col.replace("collateral_", "").capitalize()
fig.add_trace(go.Scatter(
x=rd["timestep"], y=rd[col],
name=chain_name, stackgroup="one",
))
fig.update_layout(
title="Collateral by Chain (Run 1)",
xaxis_title="Timestep", yaxis_title="USD Value",
template="plotly_dark", height=400,
)
st.plotly_chart(fig, use_container_width=True)
# Summary stats
st.subheader("Summary Statistics")
last_step = df[df["timestep"] == df["timestep"].max()]
cols = st.columns(4)
cols[0].metric("Avg Final Supply", f"{last_step['total_supply'].mean():,.0f}")
cols[1].metric("Avg Final Collateral", f"${last_step['total_collateral_usd'].mean():,.0f}")
cols[2].metric("Avg System CR", f"{last_step['system_cr'].mean():.2f}")
cols[3].metric("Total CCIP Messages", f"{last_step['ccip_messages'].mean():,.0f}")

View File

@ -0,0 +1,187 @@
"""Cross-chain collateral visualization dashboard tab."""
import streamlit as st
import plotly.graph_objects as go
from plotly.subplots import make_subplots
import numpy as np
def render():
st.header("Cross-Chain Collateral Map")
st.caption("Visualize collateral distribution across chains and staking assets.")
# Configuration
col1, col2 = st.columns(2)
with col1:
n_days = st.slider("Simulation days", 30, 365, 90, key="cc_days")
with col2:
eth_vol = st.select_slider(
"ETH Volatility",
options=[0.2, 0.4, 0.6, 0.8, 1.0],
value=0.6,
key="cc_vol",
)
if st.button("Run Cross-Chain Simulation", key="cc_run"):
with st.spinner("Simulating cross-chain system..."):
from src.crosschain.hub_spoke import (
create_default_system, simulate_deposit, tick,
apply_price_shock, get_crosschain_metrics,
)
system = create_default_system()
# Bootstrap with deposits
deposits = {
"ethereum": [("stETH", 100), ("rETH", 50), ("cbETH", 30)],
"arbitrum": [("wstETH", 80), ("rETH", 40)],
"optimism": [("wstETH", 60), ("sfrxETH", 40)],
"base": [("cbETH", 40), ("USDC", 100_000)],
"polygon": [("stMATIC", 200_000), ("USDC", 50_000)],
}
for chain, assets in deposits.items():
for symbol, qty in assets:
simulate_deposit(system, chain, symbol, qty, 0.0)
system.hub.process_messages(0.0)
# Simulate daily
dt = 1 / 365
history = []
for day in range(n_days):
# Random deposits
if np.random.random() < 0.3:
chains = list(system.hub.spokes.keys())
chain = np.random.choice(chains)
spoke = system.hub.spokes[chain]
if spoke.accepted_assets:
asset = np.random.choice(spoke.accepted_assets)
qty = np.random.lognormal(2, 1) / asset.price
simulate_deposit(system, chain, asset.symbol, qty, system.time)
# Price movements
eth_return = np.random.normal(0, eth_vol * np.sqrt(dt))
for chain, spoke in system.hub.spokes.items():
for asset in spoke.accepted_assets:
if asset.symbol != "USDC":
asset.price *= (1 + eth_return + np.random.normal(0, 0.01))
spoke._recalculate_value()
system.hub._recalculate_total()
metrics = tick(system, dt)
metrics["day"] = day
history.append(metrics)
# Visualization
_plot_chain_distribution(history, system)
_plot_collateral_timeline(history)
_plot_asset_breakdown(system)
def _plot_chain_distribution(history: list, system):
"""Stacked area chart of collateral per chain over time."""
st.subheader("Collateral by Chain")
fig = go.Figure()
chains = list(system.hub.spokes.keys())
colors = {
"ethereum": "#627EEA",
"arbitrum": "#28A0F0",
"optimism": "#FF0420",
"base": "#0052FF",
"polygon": "#8247E5",
}
days = [h["day"] for h in history]
for chain in chains:
values = [h["per_chain"].get(chain, 0) for h in history]
fig.add_trace(go.Scatter(
x=days, y=values,
name=chain.capitalize(),
stackgroup="one",
fillcolor=colors.get(chain, "#888888"),
line=dict(width=0.5, color=colors.get(chain, "#888888")),
))
fig.update_layout(
title="Cross-Chain Collateral Distribution",
xaxis_title="Day",
yaxis_title="USD Value",
template="plotly_dark",
height=500,
)
st.plotly_chart(fig, use_container_width=True)
def _plot_collateral_timeline(history: list):
"""Total collateral and yield over time."""
st.subheader("Total Collateral & Yield")
fig = make_subplots(specs=[[{"secondary_y": True}]])
days = [h["day"] for h in history]
total_col = [h["total_collateral_usd"] for h in history]
cum_yield = [h["yield_this_tick"] for h in history]
cum_yield_sum = np.cumsum(cum_yield)
fig.add_trace(
go.Scatter(x=days, y=total_col, name="Total Collateral", line=dict(color="#2dd4bf", width=2)),
secondary_y=False,
)
fig.add_trace(
go.Scatter(x=days, y=cum_yield_sum, name="Cumulative Yield", line=dict(color="#fbbf24", width=2)),
secondary_y=True,
)
fig.update_layout(
title="System Collateral & Staking Yield",
template="plotly_dark",
height=400,
)
fig.update_yaxes(title_text="Collateral (USD)", secondary_y=False)
fig.update_yaxes(title_text="Cumulative Yield (USD)", secondary_y=True)
st.plotly_chart(fig, use_container_width=True)
def _plot_asset_breakdown(system):
"""Treemap of current asset breakdown."""
st.subheader("Current Asset Breakdown")
labels, parents, values, colors = [], [], [], []
chain_colors = {
"ethereum": "#627EEA",
"arbitrum": "#28A0F0",
"optimism": "#FF0420",
"base": "#0052FF",
"polygon": "#8247E5",
}
for chain, spoke in system.hub.spokes.items():
labels.append(chain.capitalize())
parents.append("")
values.append(spoke.total_value_usd)
colors.append(chain_colors.get(chain, "#888"))
for asset in spoke.accepted_assets:
qty = spoke.balances.get(asset.symbol, 0)
val = qty * asset.price
if val > 0:
labels.append(f"{asset.symbol}")
parents.append(chain.capitalize())
values.append(val)
colors.append(chain_colors.get(chain, "#888"))
fig = go.Figure(go.Treemap(
labels=labels,
parents=parents,
values=values,
marker_colors=colors,
textinfo="label+value+percent root",
texttemplate="%{label}<br>$%{value:,.0f}<br>%{percentRoot:.1%}",
))
fig.update_layout(
title="Collateral Treemap",
template="plotly_dark",
height=500,
)
st.plotly_chart(fig, use_container_width=True)

View File

@ -0,0 +1,190 @@
"""Conviction voting governance visualization dashboard tab."""
import streamlit as st
import plotly.graph_objects as go
from plotly.subplots import make_subplots
import numpy as np
def render():
st.header("Conviction Voting Governance")
st.caption("Explore conviction voting dynamics for parameter governance.")
# Parameters
col1, col2, col3, col4 = st.columns(4)
with col1:
half_life = st.slider("Half-life (epochs)", 2, 30, 7, key="cv_hl")
with col2:
beta = st.slider("Max fund share (β)", 0.05, 0.5, 0.2, 0.05, key="cv_beta")
with col3:
rho = st.slider("Scale factor (ρ) ×1000", 0.5, 10.0, 2.5, 0.5, key="cv_rho")
with col4:
n_voters = st.slider("Voters", 5, 50, 20, key="cv_voters")
rho_actual = rho / 1000
tab_curves, tab_sim, tab_trigger = st.tabs(["Conviction Curves", "Governance Sim", "Trigger Map"])
with tab_curves:
_render_conviction_curves(half_life)
with tab_sim:
_render_governance_sim(half_life, beta, rho_actual, n_voters)
with tab_trigger:
_render_trigger_map(beta, rho_actual)
def _render_conviction_curves(half_life: float):
"""Visualize conviction charging and discharging."""
from src.primitives.conviction import ConvictionParams, generate_conviction_curves
params = ConvictionParams.from_half_life(half_life)
st.markdown(f"**α = {params.alpha:.4f}** (half-life = {half_life} epochs)")
curves = generate_conviction_curves(1000, params.alpha, epochs=100)
fig = make_subplots(rows=1, cols=2, subplot_titles=("Charging", "Discharging"))
fig.add_trace(go.Scatter(
x=curves["time"], y=curves["charge"],
name="Conviction", line=dict(color="#2dd4bf", width=2),
), row=1, col=1)
fig.add_trace(go.Scatter(
x=curves["time"], y=curves["max"],
name="Max", line=dict(color="#fbbf24", dash="dash"),
), row=1, col=1)
fig.add_trace(go.Scatter(
x=curves["time"], y=curves["discharge"],
name="Decay", line=dict(color="#ef4444", width=2),
), row=1, col=2)
fig.update_layout(template="plotly_dark", height=400, showlegend=True)
fig.update_xaxes(title_text="Epochs")
fig.update_yaxes(title_text="Conviction")
st.plotly_chart(fig, use_container_width=True)
# Key metrics
from src.primitives.conviction import epochs_to_fraction, max_conviction
col1, col2, col3 = st.columns(3)
col1.metric("50% of max", f"{epochs_to_fraction(0.5, params.alpha):.1f} epochs")
col2.metric("90% of max", f"{epochs_to_fraction(0.9, params.alpha):.1f} epochs")
col3.metric("Max conviction (1000 tokens)", f"{max_conviction(1000, params.alpha):,.0f}")
def _render_governance_sim(half_life: float, beta: float, rho: float, n_voters: int):
"""Run a governance simulation."""
if st.button("Run Governance Simulation", key="cv_run"):
with st.spinner("Simulating conviction voting..."):
from src.primitives.conviction import (
ConvictionParams, ConvictionSystem, Proposal, Voter,
stake, tick, get_governance_metrics,
)
params = ConvictionParams.from_half_life(half_life, beta=beta, rho=rho)
system = ConvictionSystem(params=params)
# Create voters
for i in range(n_voters):
holdings = np.random.lognormal(mean=np.log(5000), sigma=1.0)
system.voters[f"v{i}"] = Voter(id=f"v{i}", holdings=holdings)
system.total_supply = sum(v.holdings for v in system.voters.values())
# Create proposals
proposals = [
("Lower senior CR to 1.3", 0.05),
("Add Scroll chain", 0.08),
("Increase mez yield to 10%", 0.03),
("Add sfrxETH to Arbitrum", 0.02),
]
for i, (title, funds) in enumerate(proposals):
system.proposals[f"p{i}"] = Proposal(
id=f"p{i}", title=title, funds_requested=funds,
)
# Simulate staking & ticking
history = []
for epoch in range(100):
# Some voters stake/unstake
for vid, voter in system.voters.items():
if np.random.random() < 0.3:
candidates = [p for p in system.proposals.values() if p.status == "candidate"]
if candidates:
prop = np.random.choice(candidates)
amt = voter.holdings * np.random.uniform(0.05, 0.3)
stake(system, vid, prop.id, amt)
tick(system)
metrics = get_governance_metrics(system)
metrics["epoch"] = epoch
history.append(metrics)
_plot_conviction_progress(history, system)
def _plot_conviction_progress(history: list, system):
"""Plot conviction progress toward triggers for each proposal."""
fig = go.Figure()
for prop_id, prop in system.proposals.items():
progress = [
h["proposals"].get(prop_id, {}).get("progress", 0)
for h in history
]
epochs = [h["epoch"] for h in history]
fig.add_trace(go.Scatter(
x=epochs, y=progress,
name=f"{prop.title} ({'PASSED' if prop.status == 'passed' else prop.status})",
line=dict(width=2),
))
fig.add_hline(y=1.0, line_dash="dash", line_color="white",
annotation_text="Trigger threshold")
fig.update_layout(
title="Proposal Conviction Progress",
xaxis_title="Epoch",
yaxis_title="Progress (conviction / trigger)",
template="plotly_dark",
height=500,
)
st.plotly_chart(fig, use_container_width=True)
# Summary
for prop in system.proposals.values():
status_emoji = {"passed": "", "candidate": "", "failed": ""}.get(prop.status, "")
st.write(f"{status_emoji} **{prop.title}** — {prop.status} (age: {prop.age})")
def _render_trigger_map(beta: float, rho: float):
"""2D contour map of trigger function."""
from src.primitives.conviction import trigger_threshold, ConvictionParams
params = ConvictionParams(beta=beta, rho=rho)
supply_range = np.linspace(10_000, 1_000_000, 100)
share_range = np.linspace(0.001, beta - 0.001, 100)
Z = np.zeros((len(share_range), len(supply_range)))
for i, share in enumerate(share_range):
for j, supply in enumerate(supply_range):
Z[i, j] = np.log10(trigger_threshold(share, supply, params))
fig = go.Figure(go.Contour(
z=Z,
x=supply_range,
y=share_range,
colorscale="Viridis",
colorbar=dict(title="log₁₀(trigger)"),
))
fig.update_layout(
title="Trigger Function Map",
xaxis_title="Total Supply",
yaxis_title="Share of Funds Requested",
template="plotly_dark",
height=500,
)
st.plotly_chart(fig, use_container_width=True)
st.caption("Higher trigger = more conviction needed. Requesting closer to β makes it exponentially harder.")

216
dashboard/tabs/tranches.py Normal file
View File

@ -0,0 +1,216 @@
"""Risk tranche visualization dashboard tab."""
import streamlit as st
import plotly.graph_objects as go
from plotly.subplots import make_subplots
import numpy as np
def render():
st.header("Risk Tranches")
st.caption("Senior / Mezzanine / Junior tranche dynamics with yield waterfall and stress testing.")
# Tranche parameters
st.subheader("Tranche Configuration")
col1, col2, col3 = st.columns(3)
with col1:
st.markdown("**Senior (myUSD-S)**")
senior_cr = st.slider("Min Collateral Ratio", 1.1, 2.0, 1.5, 0.05, key="sr_cr")
senior_yield = st.slider("Target Yield %", 1.0, 6.0, 3.0, 0.5, key="sr_y")
with col2:
st.markdown("**Mezzanine (myUSD-M)**")
mez_cr = st.slider("Min Collateral Ratio", 1.0, 1.5, 1.2, 0.05, key="mz_cr")
mez_yield = st.slider("Target Yield %", 4.0, 15.0, 8.0, 0.5, key="mz_y")
with col3:
st.markdown("**Junior ($MYCO)**")
st.info("First-loss position. Gets residual yield after Senior & Mez are paid.")
# Simulation controls
st.subheader("Simulation")
col1, col2, col3 = st.columns(3)
with col1:
initial_collateral = st.number_input("Initial Collateral ($)", 100_000, 10_000_000, 1_000_000, 100_000)
with col2:
staking_apy = st.slider("Avg Staking APY %", 2.0, 8.0, 4.0, 0.5)
with col3:
n_days = st.slider("Days", 30, 730, 365, key="tr_days")
# Stress scenario
stress = st.selectbox("Stress Scenario", [
"None",
"30% ETH crash at day 60",
"50% ETH crash at day 60",
"Slow bleed (1% daily for 30 days)",
])
if st.button("Run Tranche Simulation", key="tr_run"):
with st.spinner("Simulating tranches..."):
from src.primitives.risk_tranching import (
RiskTrancheSystem, TrancheParams,
deposit_collateral, mint_tranche,
distribute_yield, apply_loss,
get_tranche_metrics,
)
params = TrancheParams(
senior_collateral_ratio=senior_cr,
mezzanine_collateral_ratio=mez_cr,
senior_yield_target=senior_yield / 100,
mezzanine_yield_target=mez_yield / 100,
)
system = RiskTrancheSystem(params=params)
# Bootstrap
deposit_collateral(system, initial_collateral)
mint_tranche(system, "senior", initial_collateral * 0.4 / senior_cr)
mint_tranche(system, "mezzanine", initial_collateral * 0.25 / mez_cr)
mint_tranche(system, "junior", initial_collateral * 0.15)
# Simulate
dt = 1 / 365
history = []
collateral_value = initial_collateral
for day in range(n_days):
# Staking yield
daily_yield = collateral_value * (staking_apy / 100) * dt
distribute_yield(system, daily_yield, dt)
collateral_value += daily_yield
# Price movement
price_change = np.random.normal(0, 0.02) # 2% daily vol
# Apply stress
if stress == "30% ETH crash at day 60" and day == 60:
price_change = -0.30
elif stress == "50% ETH crash at day 60" and day == 60:
price_change = -0.50
elif stress == "Slow bleed (1% daily for 30 days)" and 60 <= day < 90:
price_change = -0.01
if price_change < 0:
loss = abs(price_change) * collateral_value
apply_loss(system, loss)
collateral_value -= loss
else:
gain = price_change * collateral_value
collateral_value += gain
system.total_collateral = collateral_value
metrics = get_tranche_metrics(system)
metrics["day"] = day
metrics["collateral_value"] = collateral_value
history.append(metrics)
_plot_tranche_supplies(history)
_plot_collateral_ratios(history)
_plot_yield_waterfall(history)
_plot_loss_absorption(history)
def _plot_tranche_supplies(history: list):
"""Stacked area of tranche supplies."""
st.subheader("Tranche Supplies")
fig = go.Figure()
days = [h["day"] for h in history]
for name, color in [("senior", "#10B981"), ("mezzanine", "#F59E0B"), ("junior", "#EF4444")]:
values = [h[name]["supply"] for h in history]
fig.add_trace(go.Scatter(
x=days, y=values, name=name.capitalize(),
stackgroup="one", line=dict(width=0.5, color=color),
))
fig.update_layout(
title="Tranche Token Supply",
xaxis_title="Day", yaxis_title="Supply",
template="plotly_dark", height=400,
)
st.plotly_chart(fig, use_container_width=True)
def _plot_collateral_ratios(history: list):
"""Collateral ratios per tranche over time."""
st.subheader("Collateral Ratios")
fig = go.Figure()
days = [h["day"] for h in history]
for name, color, threshold in [
("senior", "#10B981", 1.5),
("mezzanine", "#F59E0B", 1.2),
("junior", "#EF4444", 1.0),
]:
values = [min(h[name]["cr"], 5.0) for h in history] # Cap for display
fig.add_trace(go.Scatter(
x=days, y=values, name=name.capitalize(),
line=dict(color=color, width=2),
))
# Threshold line
fig.add_hline(
y=threshold, line_dash="dash", line_color=color,
annotation_text=f"{name} min", opacity=0.5,
)
fig.add_hline(y=1.0, line_dash="dot", line_color="red", annotation_text="Underwater")
fig.update_layout(
title="Collateral Ratios (higher = safer)",
xaxis_title="Day", yaxis_title="Ratio",
template="plotly_dark", height=400,
)
st.plotly_chart(fig, use_container_width=True)
def _plot_yield_waterfall(history: list):
"""Cumulative yield per tranche (waterfall)."""
st.subheader("Yield Waterfall")
fig = go.Figure()
days = [h["day"] for h in history]
for name, color in [("senior", "#10B981"), ("mezzanine", "#F59E0B"), ("junior", "#EF4444")]:
values = [h[name]["yield"] for h in history]
fig.add_trace(go.Scatter(
x=days, y=values, name=f"{name.capitalize()} yield",
line=dict(color=color, width=2),
fill="tozeroy" if name == "senior" else "tonexty",
))
fig.update_layout(
title="Cumulative Yield Distribution (Senior → Mez → Junior)",
xaxis_title="Day", yaxis_title="Cumulative Yield ($)",
template="plotly_dark", height=400,
)
st.plotly_chart(fig, use_container_width=True)
def _plot_loss_absorption(history: list):
"""Cumulative losses absorbed per tranche."""
st.subheader("Loss Absorption")
total_losses = sum(h["junior"]["losses"] + h["mezzanine"]["losses"] + h["senior"]["losses"]
for h in history[-1:])
if total_losses > 0:
fig = go.Figure()
days = [h["day"] for h in history]
for name, color in [("junior", "#EF4444"), ("mezzanine", "#F59E0B"), ("senior", "#10B981")]:
values = [h[name]["losses"] for h in history]
fig.add_trace(go.Bar(
x=[name.capitalize()],
y=[values[-1]],
name=name.capitalize(),
marker_color=color,
))
fig.update_layout(
title="Total Losses Absorbed by Tranche",
yaxis_title="Losses ($)",
template="plotly_dark", height=300,
)
st.plotly_chart(fig, use_container_width=True)
else:
st.success("No losses absorbed — all tranches healthy.")

View File

@ -8,6 +8,9 @@ dependencies = [
"scipy>=1.12",
"matplotlib>=3.8",
"sympy>=1.13",
"cadCAD>=0.3.1",
"pandas>=2.1",
"networkx>=3.2",
]
[project.scripts]
@ -15,7 +18,7 @@ myco = "src.cli:main"
[project.optional-dependencies]
dev = ["pytest>=8.0", "jupyter>=1.0", "ipywidgets>=8.0"]
dashboard = ["streamlit>=1.35", "plotly>=5.20"]
dashboard = ["streamlit>=1.35", "plotly>=5.20", "seaborn>=0.13"]
[build-system]
requires = ["setuptools>=69.0"]

1
src/cadcad/__init__.py Normal file
View File

@ -0,0 +1 @@
"""cadCAD model definitions for the MycoFi cross-chain protocol."""

322
src/cadcad/config.py Normal file
View File

@ -0,0 +1,322 @@
"""cadCAD simulation configuration for the MycoFi cross-chain system.
Assembles policies and state updates into cadCAD-compatible
partial state update blocks (PSUBs) and simulation configs.
"""
import numpy as np
import pandas as pd
from typing import Any
from cadCAD.configuration.utils import config_sim
from src.cadcad.state import create_initial_state, extract_metrics, MycoFiState
from src.cadcad.policies import (
p_new_deposits,
p_redemptions,
p_price_process,
p_staking_yield,
p_governance_actions,
p_eth_crash,
p_bank_run,
)
from src.cadcad.state_updates import (
s_process_deposits,
s_process_redemptions,
s_apply_prices,
s_apply_yield,
s_governance_tick,
s_apply_price_shock,
s_apply_bank_run,
s_check_liquidations,
)
from src.primitives.risk_tranching import TrancheParams
from src.primitives.conviction import ConvictionParams, Voter
from src.composed.myco_surface import MycoSystemConfig
# ---------- Partial State Update Blocks ----------
# Normal operation: deposits, yields, prices, governance
PSUB_NORMAL = [
{
"label": "Deposits & Cross-chain",
"policies": {"deposits": p_new_deposits},
"variables": {"mycofi": s_process_deposits},
},
{
"label": "Price Movements",
"policies": {"price_changes": p_price_process},
"variables": {"mycofi": s_apply_prices},
},
{
"label": "Staking Yield",
"policies": {"yield_dt": p_staking_yield},
"variables": {"mycofi": s_apply_yield},
},
{
"label": "Redemptions",
"policies": {"redemptions": p_redemptions},
"variables": {"mycofi": s_process_redemptions},
},
{
"label": "Governance",
"policies": {"governance_actions": p_governance_actions},
"variables": {"mycofi": s_governance_tick},
},
{
"label": "Liquidation Check",
"policies": {},
"variables": {"mycofi": s_check_liquidations},
},
]
# Stress test: normal + price shock + bank run
PSUB_STRESS = PSUB_NORMAL + [
{
"label": "Price Shock",
"policies": {"price_shock": p_eth_crash},
"variables": {"mycofi": s_apply_price_shock},
},
{
"label": "Bank Run",
"policies": {"bank_run": p_bank_run},
"variables": {"mycofi": s_apply_bank_run},
},
]
# ---------- Scenario Configurations ----------
def default_params() -> dict:
"""Default simulation parameters."""
return {
"dt": [1 / 365], # Daily timesteps
"deposit_rate": [5], # Avg deposits per day
"max_deposit_usd": [50_000],
"redemption_rate": [0.02], # 2% daily base redemption
"eth_volatility": [0.6], # 60% annualized
"eth_drift": [0.0], # No drift
"proposal_rate": [0.05], # 5% chance of new proposal per epoch
}
def stress_params() -> dict:
"""Parameters for stress testing scenarios."""
return {
**default_params(),
"crash_epoch": [30],
"crash_magnitude": [0.5], # 50% ETH crash
"run_start_epoch": [31], # Bank run starts after crash
"run_intensity": [0.15], # 15% per epoch
}
def param_sweep_params() -> dict:
"""Parameters for parameter sweep analysis."""
return {
**default_params(),
"eth_volatility": [0.3, 0.6, 0.9], # Low, medium, high vol
"deposit_rate": [2, 5, 10], # Low, medium, high growth
"redemption_rate": [0.01, 0.02, 0.05],
}
# ---------- Bootstrap Helpers ----------
def bootstrap_system(
state: dict,
initial_deposits: dict[str, dict[str, float]] | None = None,
initial_tranche_mints: dict[str, float] | None = None,
n_voters: int = 20,
) -> dict:
"""Bootstrap the system with initial deposits, mints, and voters.
Args:
state: The cadCAD initial state dict
initial_deposits: {chain: {asset: quantity}} to seed vaults
initial_tranche_mints: {tranche: amount} to mint initial tranches
n_voters: Number of governance voters to create
"""
from src.crosschain.hub_spoke import simulate_deposit
from src.primitives.risk_tranching import deposit_collateral, mint_tranche
s: MycoFiState = state["mycofi"]
# Seed cross-chain deposits
if initial_deposits and s.crosschain:
total_usd = 0.0
for chain, assets in initial_deposits.items():
for asset_sym, qty in assets.items():
msg = simulate_deposit(s.crosschain, chain, asset_sym, qty, 0.0)
# Find price
spoke = s.crosschain.hub.spokes[chain]
for a in spoke.accepted_assets:
if a.symbol == asset_sym:
total_usd += qty * a.price
break
s.crosschain.hub.process_messages(0.0)
# Seed tranche system with same total
if s.tranche_system:
deposit_collateral(s.tranche_system, total_usd)
# Mint initial tranches
if initial_tranche_mints and s.tranche_system:
for tranche, amount in initial_tranche_mints.items():
mint_tranche(s.tranche_system, tranche, amount)
# Seed bonding surface with initial reserve deposit
if s.myco_system and s.crosschain:
total_value = s.crosschain.hub.total_collateral_usd
if total_value > 0:
n = s.myco_system.config.n_reserve_assets
amounts = np.full(n, total_value / n)
s.myco_system.deposit(amounts, 0.0)
# Create governance voters
if s.governance:
for i in range(n_voters):
voter_id = f"voter_{i}"
holdings = np.random.lognormal(mean=np.log(5000), sigma=1.0)
s.governance.voters[voter_id] = Voter(
id=voter_id,
holdings=holdings,
sentiment=np.random.uniform(0.3, 0.9),
)
s.governance.total_supply = sum(
v.holdings for v in s.governance.voters.values()
)
from src.cadcad.state import sync_metrics
sync_metrics(s)
return state
# ---------- Run Simulation ----------
def run_simulation(
timesteps: int = 365,
runs: int = 1,
params: dict | None = None,
psubs: list | None = None,
initial_state: dict | None = None,
initial_deposits: dict | None = None,
initial_tranche_mints: dict | None = None,
n_voters: int = 20,
) -> pd.DataFrame:
"""Run a cadCAD simulation and return results as a DataFrame.
Args:
timesteps: Number of simulation steps
runs: Number of Monte Carlo runs
params: Simulation parameters (defaults to default_params())
psubs: Partial state update blocks (defaults to PSUB_NORMAL)
initial_state: Override initial state
initial_deposits: Bootstrap deposits {chain: {asset: qty}}
initial_tranche_mints: Bootstrap tranche mints {tranche: amount}
n_voters: Number of governance voters
Returns:
DataFrame with one row per (run, substep, timestep)
"""
from cadCAD.configuration import Experiment
from cadCAD.engine import ExecutionMode, ExecutionContext, Executor
# Create initial state
if initial_state is None:
initial_state = create_initial_state()
# Bootstrap
if initial_deposits is None:
initial_deposits = {
"ethereum": {"stETH": 100, "rETH": 50},
"arbitrum": {"wstETH": 80},
"optimism": {"wstETH": 60},
"base": {"cbETH": 40, "USDC": 100_000},
"polygon": {"stMATIC": 200_000, "USDC": 50_000},
}
if initial_tranche_mints is None:
initial_tranche_mints = {
"senior": 200_000,
"mezzanine": 100_000,
"junior": 50_000,
}
initial_state = bootstrap_system(
initial_state, initial_deposits, initial_tranche_mints, n_voters,
)
if params is None:
params = default_params()
if psubs is None:
psubs = PSUB_NORMAL
# Configure simulation
sim_config = config_sim({
"N": runs,
"T": range(timesteps),
"M": params,
})
exp = Experiment()
exp.append_model(
initial_state=initial_state,
partial_state_update_blocks=psubs,
sim_configs=sim_config,
)
# Execute using the experiment's configs
exec_mode = ExecutionMode()
local_ctx = ExecutionContext(context=exec_mode.local_mode)
simulation = Executor(exec_context=local_ctx, configs=exp.configs)
raw_results, tensor_field, sessions = simulation.execute()
# Convert to DataFrame
df = pd.DataFrame(raw_results)
# Extract metrics from state objects
metrics_rows = []
for _, row in df.iterrows():
s: MycoFiState = row["mycofi"]
metrics = extract_metrics(s)
metrics["run"] = row.get("run", 0)
metrics["substep"] = row.get("substep", 0)
metrics["timestep"] = row.get("timestep", 0)
metrics_rows.append(metrics)
return pd.DataFrame(metrics_rows)
# ---------- Pre-built Scenarios ----------
def scenario_normal_growth(timesteps: int = 365, runs: int = 3) -> pd.DataFrame:
"""Normal growth scenario: steady deposits, mild volatility."""
return run_simulation(
timesteps=timesteps,
runs=runs,
params=default_params(),
psubs=PSUB_NORMAL,
)
def scenario_stress_test(timesteps: int = 100, runs: int = 5) -> pd.DataFrame:
"""Stress test: ETH crash at day 30, bank run follows."""
return run_simulation(
timesteps=timesteps,
runs=runs,
params=stress_params(),
psubs=PSUB_STRESS,
)
def scenario_parameter_sweep(timesteps: int = 180, runs: int = 1) -> pd.DataFrame:
"""Parameter sweep: explore sensitivity to volatility, growth, redemptions."""
return run_simulation(
timesteps=timesteps,
runs=runs,
params=param_sweep_params(),
psubs=PSUB_NORMAL,
)

204
src/cadcad/policies.py Normal file
View File

@ -0,0 +1,204 @@
"""cadCAD policy functions for the MycoFi cross-chain system.
Policies compute signals (actions) based on the current state.
They represent exogenous events and agent behaviors.
"""
import numpy as np
from typing import Any
# ---------- Exogenous Processes ----------
def p_new_deposits(params: dict, substep: int, state_history: list, state: dict) -> dict:
"""Generate new deposits across chains.
Simulates users depositing staking assets on various spoke chains.
"""
s: Any = state["mycofi"]
if s.crosschain is None:
return {"deposits": []}
deposit_rate = params.get("deposit_rate", 5) # avg deposits per epoch
max_deposit_usd = params.get("max_deposit_usd", 50_000)
deposits = []
n_deposits = np.random.poisson(deposit_rate)
for _ in range(n_deposits):
# Pick a random chain and asset
chains = list(s.crosschain.hub.spokes.keys())
chain = np.random.choice(chains)
spoke = s.crosschain.hub.spokes[chain]
if not spoke.accepted_assets:
continue
asset = np.random.choice(spoke.accepted_assets)
# Random deposit amount (log-normal distribution)
usd_amount = np.random.lognormal(mean=np.log(5000), sigma=1.0)
usd_amount = min(usd_amount, max_deposit_usd)
qty = usd_amount / asset.price if asset.price > 0 else 0
deposits.append({
"chain": chain,
"asset": asset.symbol,
"quantity": qty,
"usd_value": usd_amount,
})
return {"deposits": deposits}
def p_redemptions(params: dict, substep: int, state_history: list, state: dict) -> dict:
"""Generate redemption requests.
Some users redeem their tranche tokens for collateral.
"""
s: Any = state["mycofi"]
if s.tranche_system is None:
return {"redemptions": []}
# Base redemption rate influenced by sentiment
base_rate = params.get("redemption_rate", 0.02) # 2% of supply per epoch
# Higher redemption when collateral ratio is low
cr = s.system_collateral_ratio
if cr < 1.2:
panic_multiplier = 2.0 + (1.2 - cr) * 10 # Panic selling
elif cr > 2.0:
panic_multiplier = 0.5 # Very comfortable, low redemptions
else:
panic_multiplier = 1.0
effective_rate = base_rate * panic_multiplier
redemptions = []
for tranche in ["senior", "mezzanine", "junior"]:
ts = getattr(s.tranche_system, tranche)
if ts.supply > 0:
amount = ts.supply * effective_rate * np.random.uniform(0, 1)
if amount > 0:
redemptions.append({
"tranche": tranche,
"amount": amount,
})
return {"redemptions": redemptions}
def p_price_process(params: dict, substep: int, state_history: list, state: dict) -> dict:
"""Simulate asset price movements (geometric Brownian motion).
Models correlated price movements across staking assets.
"""
s: Any = state["mycofi"]
if s.crosschain is None:
return {"price_changes": {}}
dt = params.get("dt", 1 / 365) # Daily by default
eth_volatility = params.get("eth_volatility", 0.6) # 60% annualized
eth_drift = params.get("eth_drift", 0.0) # No drift by default
# ETH is the primary driver — LSTs are correlated
eth_return = np.random.normal(eth_drift * dt, eth_volatility * np.sqrt(dt))
price_changes = {}
for chain, spoke in s.crosschain.hub.spokes.items():
for asset in spoke.accepted_assets:
if asset.symbol == "USDC":
# Stablecoin — minimal movement
change = np.random.normal(0, 0.0001)
elif "ETH" in asset.symbol or "stETH" in asset.symbol or "wstETH" in asset.symbol:
# ETH-correlated with small idiosyncratic component
idio = np.random.normal(0, 0.02 * np.sqrt(dt))
change = eth_return + idio
elif "MATIC" in asset.symbol:
# Higher vol, partially correlated with ETH
idio = np.random.normal(0, 0.8 * np.sqrt(dt))
change = 0.6 * eth_return + idio
else:
change = np.random.normal(0, 0.3 * np.sqrt(dt))
price_changes[f"{chain}:{asset.symbol}"] = 1.0 + change
return {"price_changes": price_changes}
def p_staking_yield(params: dict, substep: int, state_history: list, state: dict) -> dict:
"""Generate staking yield from collateral assets."""
dt = params.get("dt", 1 / 365)
return {"yield_dt": dt}
def p_governance_actions(params: dict, substep: int, state_history: list, state: dict) -> dict:
"""Simulate governance participation (staking on proposals, new proposals)."""
s: Any = state["mycofi"]
if s.governance is None:
return {"governance_actions": []}
actions = []
# Occasionally create new proposals
if np.random.random() < params.get("proposal_rate", 0.05):
proposal_types = [
("weight_update", "Update stETH weight", 0.05, "stETH_weight", np.random.uniform(0.8, 1.2)),
("parameter_change", "Adjust senior yield target", 0.02, "senior_yield", np.random.uniform(0.02, 0.05)),
("parameter_change", "Adjust mez CR", 0.03, "mez_cr", np.random.uniform(1.1, 1.3)),
("chain_onboard", "Add Scroll chain", 0.08, "new_chain", 1.0),
]
ptype, title, funds_req, target, value = proposal_types[np.random.randint(len(proposal_types))]
actions.append({
"type": "new_proposal",
"proposal_type": ptype,
"title": title,
"funds_requested": funds_req,
"parameter_target": target,
"parameter_value": value,
})
# Voters adjust stakes based on sentiment
for voter_id, voter in s.governance.voters.items():
if np.random.random() < 0.3: # 30% chance of action per epoch
candidates = [
p for p in s.governance.proposals.values()
if p.status == "candidate"
]
if candidates:
prop = np.random.choice(candidates)
stake_amount = voter.holdings * np.random.uniform(0.01, 0.2)
actions.append({
"type": "stake",
"voter_id": voter_id,
"proposal_id": prop.id,
"amount": stake_amount,
})
return {"governance_actions": actions}
# ---------- Stress Scenarios ----------
def p_eth_crash(params: dict, substep: int, state_history: list, state: dict) -> dict:
"""Simulate a sudden ETH price crash (for stress testing)."""
s: Any = state["mycofi"]
crash_epoch = params.get("crash_epoch", 30)
crash_magnitude = params.get("crash_magnitude", 0.5) # 50% drop
current_epoch = len(state_history)
if current_epoch == crash_epoch:
return {"price_shock": {"pattern": "eth_crash", "multiplier": crash_magnitude}}
return {"price_shock": None}
def p_bank_run(params: dict, substep: int, state_history: list, state: dict) -> dict:
"""Simulate a coordinated bank run on tranches."""
s: Any = state["mycofi"]
run_start = params.get("run_start_epoch", 50)
run_intensity = params.get("run_intensity", 0.15) # 15% per epoch
current_epoch = len(state_history)
if current_epoch >= run_start:
return {"bank_run": {"intensity": run_intensity}}
return {"bank_run": None}

156
src/cadcad/state.py Normal file
View File

@ -0,0 +1,156 @@
"""cadCAD state variables for the MycoFi cross-chain system.
Defines the complete system state that cadCAD tracks across timesteps.
Integrates: bonding surface, risk tranches, cross-chain vaults, conviction governance.
"""
import numpy as np
from dataclasses import dataclass, field, asdict
from typing import Any
from src.composed.myco_surface import MycoSystem, MycoSystemConfig
from src.primitives.risk_tranching import (
RiskTrancheSystem, TrancheParams, TrancheState,
)
from src.primitives.conviction import (
ConvictionSystem, ConvictionParams,
)
from src.crosschain.hub_spoke import (
CrossChainSystem, create_default_system as create_default_crosschain,
)
@dataclass
class MycoFiState:
"""Complete state of the MycoFi cross-chain protocol.
This is the top-level state variable for the cadCAD simulation.
"""
# Core bonding surface (from existing MycoSystem)
myco_system: MycoSystem | None = None
# Risk tranches
tranche_system: RiskTrancheSystem | None = None
# Cross-chain
crosschain: CrossChainSystem | None = None
# Governance
governance: ConvictionSystem | None = None
# Aggregate metrics (for cadCAD DataFrame output)
time: float = 0.0
total_supply: float = 0.0
total_collateral_usd: float = 0.0
system_collateral_ratio: float = 0.0
myco_price: float = 0.0
# Tranche metrics
senior_supply: float = 0.0
senior_cr: float = 0.0
mezzanine_supply: float = 0.0
mezzanine_cr: float = 0.0
junior_supply: float = 0.0
junior_cr: float = 0.0
# Cross-chain metrics
total_chains: int = 0
total_yield: float = 0.0
ccip_messages: int = 0
# Governance metrics
governance_epoch: int = 0
total_staked: float = 0.0
proposals_passed: int = 0
def create_initial_state(
myco_config: MycoSystemConfig | None = None,
tranche_params: TrancheParams | None = None,
conviction_params: ConvictionParams | None = None,
chains: list[str] | None = None,
) -> dict[str, Any]:
"""Create the initial state dictionary for cadCAD.
Returns a flat dict suitable for cadCAD's initial_conditions.
"""
# Initialize subsystems
myco = MycoSystem(myco_config or MycoSystemConfig())
tranches = RiskTrancheSystem(params=tranche_params or TrancheParams())
crosschain = create_default_crosschain()
governance = ConvictionSystem(
params=conviction_params or ConvictionParams(),
)
state = MycoFiState(
myco_system=myco,
tranche_system=tranches,
crosschain=crosschain,
governance=governance,
total_chains=len(crosschain.hub.spokes),
)
return {"mycofi": state}
def extract_metrics(state: MycoFiState) -> dict:
"""Extract flat metrics dict from state for DataFrame output."""
metrics = {
"time": state.time,
"total_supply": state.total_supply,
"total_collateral_usd": state.total_collateral_usd,
"system_cr": state.system_collateral_ratio,
"myco_price": state.myco_price,
"senior_supply": state.senior_supply,
"senior_cr": state.senior_cr,
"mezzanine_supply": state.mezzanine_supply,
"mezzanine_cr": state.mezzanine_cr,
"junior_supply": state.junior_supply,
"junior_cr": state.junior_cr,
"total_chains": state.total_chains,
"total_yield": state.total_yield,
"ccip_messages": state.ccip_messages,
"governance_epoch": state.governance_epoch,
"total_staked": state.total_staked,
"proposals_passed": state.proposals_passed,
}
# Add per-chain collateral
if state.crosschain:
for chain, spoke in state.crosschain.hub.spokes.items():
metrics[f"collateral_{chain}"] = spoke.total_value_usd
return metrics
def sync_metrics(state: MycoFiState) -> MycoFiState:
"""Synchronize aggregate metrics from subsystems into top-level fields."""
if state.myco_system:
m = state.myco_system.get_metrics()
state.total_supply = m["supply"]
state.myco_price = state.myco_system.get_spot_price()
if state.tranche_system:
ts = state.tranche_system
state.senior_supply = ts.senior.supply
state.senior_cr = ts.senior.collateral_ratio
state.mezzanine_supply = ts.mezzanine.supply
state.mezzanine_cr = ts.mezzanine.collateral_ratio
state.junior_supply = ts.junior.supply
state.junior_cr = ts.junior.collateral_ratio
state.system_collateral_ratio = ts.system_collateral_ratio
if state.crosschain:
state.total_collateral_usd = state.crosschain.hub.total_collateral_usd
state.total_chains = len(state.crosschain.hub.spokes)
state.total_yield = state.crosschain.total_yield_generated
state.ccip_messages = state.crosschain.total_messages_delivered
if state.governance:
state.governance_epoch = state.governance.epoch
state.total_staked = sum(
sum(v.stakes.values()) for v in state.governance.voters.values()
)
state.proposals_passed = len(state.governance.passed_proposals)
return state

228
src/cadcad/state_updates.py Normal file
View File

@ -0,0 +1,228 @@
"""cadCAD state update functions for the MycoFi cross-chain system.
State update functions apply policy signals to evolve the system state.
Each function takes (params, substep, state_history, state, signal) and
returns (variable_name, new_value).
"""
import numpy as np
from copy import deepcopy
from typing import Any
from src.primitives.risk_tranching import (
deposit_collateral, mint_tranche, redeem_tranche,
distribute_yield, apply_loss, check_liquidation,
get_tranche_metrics,
)
from src.primitives.conviction import (
Proposal, Voter, stake as cv_stake, tick as cv_tick,
)
from src.crosschain.hub_spoke import (
simulate_deposit, tick as cc_tick, apply_price_shock,
get_crosschain_metrics,
)
from src.cadcad.state import sync_metrics
def s_process_deposits(params: dict, substep: int, state_history: list, state: dict, signal: dict) -> tuple:
"""Process new cross-chain deposits."""
s = deepcopy(state["mycofi"])
deposits = signal.get("deposits", [])
for dep in deposits:
if s.crosschain is None:
break
chain = dep["chain"]
asset = dep["asset"]
qty = dep["quantity"]
# Deposit into spoke vault
simulate_deposit(s.crosschain, chain, asset, qty, s.time)
# Also add to tranche system collateral
if s.tranche_system is not None:
deposit_collateral(s.tranche_system, dep["usd_value"])
# Process CCIP messages
if s.crosschain:
s.crosschain.hub.process_messages(s.time)
s = sync_metrics(s)
return ("mycofi", s)
def s_process_redemptions(params: dict, substep: int, state_history: list, state: dict, signal: dict) -> tuple:
"""Process tranche redemptions."""
s = deepcopy(state["mycofi"])
redemptions = signal.get("redemptions", [])
for red in redemptions:
if s.tranche_system is None:
break
redeem_tranche(s.tranche_system, red["tranche"], red["amount"])
s = sync_metrics(s)
return ("mycofi", s)
def s_apply_prices(params: dict, substep: int, state_history: list, state: dict, signal: dict) -> tuple:
"""Apply price movements to all assets."""
s = deepcopy(state["mycofi"])
price_changes = signal.get("price_changes", {})
if s.crosschain is None:
return ("mycofi", s)
old_total = s.crosschain.hub.total_collateral_usd
for key, multiplier in price_changes.items():
chain, asset_symbol = key.split(":")
spoke = s.crosschain.hub.spokes.get(chain)
if spoke is None:
continue
for asset in spoke.accepted_assets:
if asset.symbol == asset_symbol:
asset.price *= multiplier
spoke._recalculate_value()
s.crosschain.hub._recalculate_total()
new_total = s.crosschain.hub.total_collateral_usd
# If collateral value dropped, apply loss to tranches
if new_total < old_total and s.tranche_system is not None:
loss = old_total - new_total
apply_loss(s.tranche_system, loss)
s.tranche_system.total_collateral = new_total
elif new_total > old_total and s.tranche_system is not None:
# Collateral appreciation
gain = new_total - old_total
s.tranche_system.total_collateral = new_total
s = sync_metrics(s)
return ("mycofi", s)
def s_apply_yield(params: dict, substep: int, state_history: list, state: dict, signal: dict) -> tuple:
"""Apply staking yield and distribute through tranche waterfall."""
s = deepcopy(state["mycofi"])
dt = signal.get("yield_dt", 1 / 365)
total_yield = 0.0
if s.crosschain:
for spoke in s.crosschain.hub.spokes.values():
total_yield += spoke.apply_staking_yield(dt)
s.crosschain.hub._recalculate_total()
# Distribute yield through tranche waterfall
if s.tranche_system is not None and total_yield > 0:
distribute_yield(s.tranche_system, total_yield, dt)
s.tranche_system.total_collateral += total_yield
s.time += dt
s = sync_metrics(s)
return ("mycofi", s)
def s_governance_tick(params: dict, substep: int, state_history: list, state: dict, signal: dict) -> tuple:
"""Process governance actions and advance conviction voting."""
s = deepcopy(state["mycofi"])
actions = signal.get("governance_actions", [])
if s.governance is None:
return ("mycofi", s)
for action in actions:
if action["type"] == "new_proposal":
prop_id = f"prop_{s.governance.epoch}_{np.random.randint(1000)}"
s.governance.proposals[prop_id] = Proposal(
id=prop_id,
title=action["title"],
proposal_type=action["proposal_type"],
funds_requested=action["funds_requested"],
parameter_target=action.get("parameter_target", ""),
parameter_value=action.get("parameter_value", 0.0),
)
elif action["type"] == "stake":
cv_stake(
s.governance,
action["voter_id"],
action["proposal_id"],
action["amount"],
)
# Advance conviction voting
cv_tick(s.governance)
s = sync_metrics(s)
return ("mycofi", s)
def s_apply_price_shock(params: dict, substep: int, state_history: list, state: dict, signal: dict) -> tuple:
"""Apply sudden price shocks (stress scenarios)."""
s = deepcopy(state["mycofi"])
shock = signal.get("price_shock")
if shock is None or s.crosschain is None:
return ("mycofi", s)
if shock["pattern"] == "eth_crash":
multiplier = shock["multiplier"]
old_total = s.crosschain.hub.total_collateral_usd
# Apply to all ETH-derivative assets
for chain, spoke in s.crosschain.hub.spokes.items():
for asset in spoke.accepted_assets:
if asset.symbol != "USDC":
asset.price *= multiplier
spoke._recalculate_value()
s.crosschain.hub._recalculate_total()
new_total = s.crosschain.hub.total_collateral_usd
if s.tranche_system and new_total < old_total:
apply_loss(s.tranche_system, old_total - new_total)
s.tranche_system.total_collateral = new_total
s = sync_metrics(s)
return ("mycofi", s)
def s_apply_bank_run(params: dict, substep: int, state_history: list, state: dict, signal: dict) -> tuple:
"""Apply bank run redemptions."""
s = deepcopy(state["mycofi"])
run = signal.get("bank_run")
if run is None or s.tranche_system is None:
return ("mycofi", s)
intensity = run["intensity"]
# Everyone tries to redeem — senior first (they're most risk-averse)
for tranche in ["senior", "mezzanine", "junior"]:
ts = getattr(s.tranche_system, tranche)
amount = ts.supply * intensity
if amount > 0:
redeem_tranche(s.tranche_system, tranche, amount)
s = sync_metrics(s)
return ("mycofi", s)
def s_check_liquidations(params: dict, substep: int, state_history: list, state: dict, signal: dict) -> tuple:
"""Check and execute liquidations if needed."""
s = deepcopy(state["mycofi"])
if s.tranche_system is None:
return ("mycofi", s)
liquidations = check_liquidation(s.tranche_system)
# Auto-liquidate mezzanine to protect senior
if liquidations["mezzanine"] and s.tranche_system.mezzanine.supply > 0:
redeem_tranche(
s.tranche_system, "mezzanine",
s.tranche_system.mezzanine.supply * 0.5, # Liquidate 50%
)
s = sync_metrics(s)
return ("mycofi", s)

View File

@ -0,0 +1 @@
"""Cross-chain simulation layer for hub-and-spoke architecture."""

368
src/crosschain/hub_spoke.py Normal file
View File

@ -0,0 +1,368 @@
"""Hub-and-spoke cross-chain architecture simulation.
Models the multi-chain deployment:
Hub (Base): Registry, bonding curve, tranche manager, treasury
Spokes (Arbitrum, Optimism, Ethereum, ...): Collateral vaults
CCIP messages carry:
- Deposit reports (spoke hub)
- State sync (hub spokes)
- Rebalancing triggers (hub spokes)
- Governance updates (hub spokes)
"""
import numpy as np
from dataclasses import dataclass, field
from typing import Optional
# ---------- Collateral Assets ----------
@dataclass
class StakingAsset:
"""A staking asset available on a specific chain."""
symbol: str # e.g., "stETH", "rETH", "cbETH"
chain: str # e.g., "ethereum", "arbitrum"
price: float = 1.0 # USD price from oracle
staking_apy: float = 0.04 # Annualized staking yield
weight: float = 1.0 # Governance-set weight in bonding curve
risk_score: float = 0.1 # 0-1 risk score (0 = safest)
# ---------- CCIP Messages ----------
@dataclass
class CCIPMessage:
"""Simulated CCIP cross-chain message."""
msg_type: str # deposit_report, state_sync, rebalance, governance
source_chain: str
dest_chain: str
payload: dict
timestamp: float
latency: float = 0.0 # Simulated delivery delay
delivered: bool = False
delivery_time: float = 0.0
# ---------- Spoke (per-chain vault) ----------
@dataclass
class SpokeVault:
"""A collateral vault on a spoke chain."""
chain: str
accepted_assets: list[StakingAsset] = field(default_factory=list)
balances: dict[str, float] = field(default_factory=dict) # asset_symbol -> quantity
total_value_usd: float = 0.0
pending_reports: list[CCIPMessage] = field(default_factory=list)
last_sync_time: float = 0.0
def deposit(self, asset_symbol: str, amount: float, timestamp: float) -> CCIPMessage:
"""Deposit an asset and generate a CCIP report to hub."""
self.balances[asset_symbol] = self.balances.get(asset_symbol, 0.0) + amount
# Recalculate total value
self._recalculate_value()
# Generate CCIP message to hub
msg = CCIPMessage(
msg_type="deposit_report",
source_chain=self.chain,
dest_chain="base",
payload={
"asset": asset_symbol,
"amount": amount,
"chain_total_value": self.total_value_usd,
"balances": dict(self.balances),
},
timestamp=timestamp,
)
self.pending_reports.append(msg)
return msg
def withdraw(self, asset_symbol: str, amount: float, timestamp: float) -> Optional[CCIPMessage]:
"""Withdraw an asset (after hub confirmation)."""
available = self.balances.get(asset_symbol, 0.0)
actual = min(amount, available)
if actual <= 0:
return None
self.balances[asset_symbol] -= actual
self._recalculate_value()
msg = CCIPMessage(
msg_type="deposit_report", # Negative deposit
source_chain=self.chain,
dest_chain="base",
payload={
"asset": asset_symbol,
"amount": -actual,
"chain_total_value": self.total_value_usd,
"balances": dict(self.balances),
},
timestamp=timestamp,
)
self.pending_reports.append(msg)
return msg
def apply_staking_yield(self, dt: float) -> float:
"""Apply staking yield to all assets in the vault."""
total_yield = 0.0
for asset in self.accepted_assets:
qty = self.balances.get(asset.symbol, 0.0)
if qty > 0:
yield_amount = qty * asset.staking_apy * dt
self.balances[asset.symbol] += yield_amount
total_yield += yield_amount * asset.price
self._recalculate_value()
return total_yield
def _recalculate_value(self) -> None:
"""Recalculate total USD value from balances and prices."""
total = 0.0
for asset in self.accepted_assets:
qty = self.balances.get(asset.symbol, 0.0)
total += qty * asset.price * asset.weight
self.total_value_usd = total
# ---------- Hub (Base chain) ----------
@dataclass
class HubRegistry:
"""The hub on Base chain that maintains global state."""
spokes: dict[str, SpokeVault] = field(default_factory=dict)
global_collateral: dict[str, float] = field(default_factory=dict) # asset -> total qty across chains
total_collateral_usd: float = 0.0
all_assets: dict[str, StakingAsset] = field(default_factory=dict) # symbol -> asset
message_queue: list[CCIPMessage] = field(default_factory=list)
processed_messages: list[CCIPMessage] = field(default_factory=list)
time: float = 0.0
def register_spoke(self, spoke: SpokeVault) -> None:
"""Register a new spoke chain."""
self.spokes[spoke.chain] = spoke
for asset in spoke.accepted_assets:
self.all_assets[asset.symbol] = asset
def process_messages(self, current_time: float) -> list[CCIPMessage]:
"""Process all pending CCIP messages from spokes."""
delivered = []
for spoke in self.spokes.values():
for msg in spoke.pending_reports:
# Simulate latency
if not msg.delivered:
msg.latency = _simulate_ccip_latency(msg.source_chain)
msg.delivery_time = msg.timestamp + msg.latency
if msg.delivery_time <= current_time and not msg.delivered:
msg.delivered = True
self._handle_message(msg)
delivered.append(msg)
# Clear delivered messages
spoke.pending_reports = [m for m in spoke.pending_reports if not m.delivered]
self.time = current_time
self.processed_messages.extend(delivered)
return delivered
def _handle_message(self, msg: CCIPMessage) -> None:
"""Handle a delivered CCIP message."""
if msg.msg_type == "deposit_report":
# Update global collateral tracking
payload = msg.payload
asset = payload["asset"]
self.global_collateral[asset] = sum(
spoke.balances.get(asset, 0.0)
for spoke in self.spokes.values()
)
self._recalculate_total()
def _recalculate_total(self) -> None:
"""Recalculate total collateral value across all spokes."""
self.total_collateral_usd = sum(
spoke.total_value_usd for spoke in self.spokes.values()
)
def broadcast_state_sync(self, current_time: float) -> list[CCIPMessage]:
"""Send state sync to all spokes."""
messages = []
for chain, spoke in self.spokes.items():
msg = CCIPMessage(
msg_type="state_sync",
source_chain="base",
dest_chain=chain,
payload={
"total_collateral_usd": self.total_collateral_usd,
"global_collateral": dict(self.global_collateral),
},
timestamp=current_time,
)
messages.append(msg)
spoke.last_sync_time = current_time
return messages
# ---------- Full Cross-Chain System ----------
@dataclass
class CrossChainSystem:
"""The complete cross-chain bonding curve system."""
hub: HubRegistry = field(default_factory=HubRegistry)
time: float = 0.0
message_log: list[CCIPMessage] = field(default_factory=list)
total_messages_sent: int = 0
total_messages_delivered: int = 0
total_yield_generated: float = 0.0
def create_default_system() -> CrossChainSystem:
"""Create a default cross-chain system with common chains."""
system = CrossChainSystem()
# Define staking assets per chain
chains = {
"ethereum": [
StakingAsset("stETH", "ethereum", price=2400, staking_apy=0.035, weight=1.0, risk_score=0.05),
StakingAsset("rETH", "ethereum", price=2420, staking_apy=0.032, weight=0.95, risk_score=0.08),
StakingAsset("cbETH", "ethereum", price=2390, staking_apy=0.030, weight=0.90, risk_score=0.10),
],
"arbitrum": [
StakingAsset("wstETH", "arbitrum", price=2410, staking_apy=0.035, weight=1.0, risk_score=0.07),
StakingAsset("rETH", "arbitrum", price=2420, staking_apy=0.032, weight=0.95, risk_score=0.10),
],
"optimism": [
StakingAsset("wstETH", "optimism", price=2410, staking_apy=0.035, weight=1.0, risk_score=0.07),
StakingAsset("sfrxETH", "optimism", price=2380, staking_apy=0.040, weight=0.85, risk_score=0.15),
],
"base": [
StakingAsset("cbETH", "base", price=2390, staking_apy=0.030, weight=0.90, risk_score=0.06),
StakingAsset("USDC", "base", price=1.0, staking_apy=0.0, weight=1.0, risk_score=0.02),
],
"polygon": [
StakingAsset("stMATIC", "polygon", price=0.45, staking_apy=0.045, weight=0.70, risk_score=0.20),
StakingAsset("USDC", "polygon", price=1.0, staking_apy=0.0, weight=1.0, risk_score=0.05),
],
}
for chain_name, assets in chains.items():
spoke = SpokeVault(chain=chain_name, accepted_assets=assets)
system.hub.register_spoke(spoke)
return system
def simulate_deposit(
system: CrossChainSystem,
chain: str,
asset_symbol: str,
amount: float,
timestamp: float,
) -> CCIPMessage:
"""Simulate a user depositing an asset on a spoke chain."""
spoke = system.hub.spokes.get(chain)
if spoke is None:
raise ValueError(f"Unknown chain: {chain}")
msg = spoke.deposit(asset_symbol, amount, timestamp)
system.total_messages_sent += 1
system.message_log.append(msg)
return msg
def tick(
system: CrossChainSystem,
dt: float,
) -> dict:
"""Advance the cross-chain system by dt (in years for APY calculations).
Returns metrics about what happened this tick.
"""
system.time += dt
# Apply staking yield on all spokes
total_yield = 0.0
for spoke in system.hub.spokes.values():
total_yield += spoke.apply_staking_yield(dt)
system.total_yield_generated += total_yield
# Process pending CCIP messages
delivered = system.hub.process_messages(system.time)
system.total_messages_delivered += len(delivered)
# Periodic state sync (every ~1 day = 1/365 year)
sync_messages = system.hub.broadcast_state_sync(system.time)
system.total_messages_sent += len(sync_messages)
system.message_log.extend(sync_messages)
return {
"time": system.time,
"yield_this_tick": total_yield,
"messages_delivered": len(delivered),
"total_collateral_usd": system.hub.total_collateral_usd,
"per_chain": {
chain: spoke.total_value_usd
for chain, spoke in system.hub.spokes.items()
},
}
def apply_price_shock(
system: CrossChainSystem,
asset_symbol: str,
price_multiplier: float,
) -> None:
"""Apply a price shock to an asset across all chains (for stress testing)."""
for chain, spoke in system.hub.spokes.items():
for asset in spoke.accepted_assets:
if asset.symbol == asset_symbol:
asset.price *= price_multiplier
spoke._recalculate_value()
system.hub._recalculate_total()
def get_crosschain_metrics(system: CrossChainSystem) -> dict:
"""Get comprehensive cross-chain metrics."""
return {
"time": system.time,
"total_collateral_usd": system.hub.total_collateral_usd,
"total_messages_sent": system.total_messages_sent,
"total_messages_delivered": system.total_messages_delivered,
"total_yield_generated": system.total_yield_generated,
"chains": {
chain: {
"total_value_usd": spoke.total_value_usd,
"assets": {
asset.symbol: {
"balance": spoke.balances.get(asset.symbol, 0.0),
"price": asset.price,
"value": spoke.balances.get(asset.symbol, 0.0) * asset.price,
"apy": asset.staking_apy,
}
for asset in spoke.accepted_assets
},
}
for chain, spoke in system.hub.spokes.items()
},
"global_collateral": dict(system.hub.global_collateral),
}
def _simulate_ccip_latency(source_chain: str) -> float:
"""Simulate CCIP message latency based on source chain.
Returns latency in years (for consistency with dt).
Real latencies: ~5 min for L2s, ~20 min for L1.
"""
# In years: 5 min ≈ 9.5e-6 years, 20 min ≈ 3.8e-5 years
base_latencies = {
"ethereum": 3.8e-5, # ~20 min
"arbitrum": 9.5e-6, # ~5 min
"optimism": 9.5e-6,
"base": 5.7e-6, # ~3 min (same operator)
"polygon": 1.9e-5, # ~10 min
}
base = base_latencies.get(source_chain, 1.9e-5)
# Add some randomness
return base * (0.5 + np.random.random())

View File

@ -0,0 +1,281 @@
"""Conviction voting governance for parameter management.
Ported from the conviction/ cadCAD research repo. Conviction voting
provides continuous signal-weighted governance where participants stake
tokens on proposals, and conviction accumulates over time.
Core formula: y_{t+1} = α × y_t + x_t
Trigger: y*(r) = ρS / ((1-α)(β - r/R)²)
Used here to govern:
- Collateral weights (which assets accepted, at what weights)
- Tranche parameters (ratios, yield targets)
- Risk parameters (liquidation thresholds)
- New chain onboarding decisions
"""
import numpy as np
from dataclasses import dataclass, field
@dataclass
class ConvictionParams:
"""Parameters for the conviction voting system."""
alpha: float = 0.9 # Decay factor (0.5 ≤ α < 1)
beta: float = 0.2 # Maximum share of funds per proposal
rho: float = 0.0025 # Trigger function scale factor
min_age: int = 3 # Minimum epochs before a proposal can pass
@property
def half_life(self) -> float:
"""Half-life in epochs: T_half = -ln(2)/ln(α)"""
if self.alpha <= 0 or self.alpha >= 1:
return float('inf')
return -np.log(2) / np.log(self.alpha)
@classmethod
def from_half_life(cls, half_life: float, **kwargs) -> 'ConvictionParams':
"""Create params from half-life instead of alpha."""
alpha = np.exp(-np.log(2) / half_life)
return cls(alpha=alpha, **kwargs)
@dataclass
class Proposal:
"""A governance proposal."""
id: str
title: str
description: str = ""
funds_requested: float = 0.0 # As fraction of total funds (for trigger)
proposal_type: str = "parameter_change" # parameter_change, chain_onboard, weight_update
parameter_target: str = "" # Which parameter to change
parameter_value: float = 0.0 # Proposed new value
status: str = "candidate" # candidate, active, passed, failed, killed
age: int = 0
total_conviction: float = 0.0
trigger: float = 0.0
@dataclass
class Voter:
"""A participant in conviction voting."""
id: str
holdings: float = 0.0
sentiment: float = 0.5 # 0-1 scale
stakes: dict[str, float] = field(default_factory=dict) # proposal_id -> tokens staked
convictions: dict[str, float] = field(default_factory=dict) # proposal_id -> conviction
@dataclass
class ConvictionSystem:
"""The conviction voting governance system."""
params: ConvictionParams
proposals: dict[str, Proposal] = field(default_factory=dict)
voters: dict[str, Voter] = field(default_factory=dict)
total_supply: float = 100_000.0
total_funds: float = 50_000.0
epoch: int = 0
passed_proposals: list[Proposal] = field(default_factory=list)
def trigger_threshold(
requested_share: float,
supply: float,
params: ConvictionParams,
) -> float:
"""Calculate the conviction threshold for a proposal to pass.
τ = ρS / ((1-α)(β - share)²)
"""
if requested_share >= params.beta:
return float('inf') # Can't request more than beta
return params.rho * supply / ((1 - params.alpha) * (params.beta - requested_share) ** 2)
def update_conviction(
current_conviction: float,
staked_tokens: float,
alpha: float,
) -> float:
"""Update conviction for one voter-proposal pair.
y_{t+1} = α × y_t + x_t
"""
return alpha * current_conviction + staked_tokens
def max_conviction(staked_tokens: float, alpha: float) -> float:
"""Maximum possible conviction (asymptotic limit).
y_max = x / (1 - α)
"""
if alpha >= 1:
return float('inf')
return staked_tokens / (1 - alpha)
def conviction_at_time(
staked_tokens: float,
alpha: float,
epochs: int,
initial_conviction: float = 0.0,
) -> float:
"""Explicit formula for conviction at time t.
y_t = α^t × y_0 + x × (1 - α^t) / (1 - α)
"""
alpha_t = alpha ** epochs
return alpha_t * initial_conviction + staked_tokens * (1 - alpha_t) / (1 - alpha)
def epochs_to_fraction(
target_fraction: float,
alpha: float,
) -> float:
"""Number of epochs to reach a fraction of max conviction.
T = ln(1 - fraction) / ln(α)
"""
if target_fraction >= 1.0 or target_fraction <= 0.0:
return float('inf')
return np.log(1 - target_fraction) / np.log(alpha)
def stake(
system: ConvictionSystem,
voter_id: str,
proposal_id: str,
amount: float,
) -> ConvictionSystem:
"""Stake tokens on a proposal."""
voter = system.voters.get(voter_id)
proposal = system.proposals.get(proposal_id)
if voter is None or proposal is None:
return system
if proposal.status != "candidate":
return system
# Check available tokens
total_staked = sum(voter.stakes.values())
available = voter.holdings - total_staked
actual = min(amount, available)
if actual <= 0:
return system
voter.stakes[proposal_id] = voter.stakes.get(proposal_id, 0.0) + actual
return system
def unstake(
system: ConvictionSystem,
voter_id: str,
proposal_id: str,
amount: float,
) -> ConvictionSystem:
"""Unstake tokens from a proposal."""
voter = system.voters.get(voter_id)
if voter is None or proposal_id not in voter.stakes:
return system
actual = min(amount, voter.stakes[proposal_id])
voter.stakes[proposal_id] -= actual
if voter.stakes[proposal_id] <= 0:
del voter.stakes[proposal_id]
return system
def tick(system: ConvictionSystem) -> ConvictionSystem:
"""Advance one epoch: update convictions, check triggers.
Returns updated system with any newly passed proposals.
"""
system.epoch += 1
p = system.params
# Update conviction for each voter-proposal pair
for voter in system.voters.values():
for proposal_id, staked in voter.stakes.items():
old_conviction = voter.convictions.get(proposal_id, 0.0)
voter.convictions[proposal_id] = update_conviction(
old_conviction, staked, p.alpha
)
# Aggregate total conviction per proposal and check triggers
for prop in system.proposals.values():
if prop.status != "candidate":
continue
prop.age += 1
prop.total_conviction = sum(
v.convictions.get(prop.id, 0.0)
for v in system.voters.values()
)
# Compute trigger threshold
prop.trigger = trigger_threshold(
prop.funds_requested, system.total_supply, p
)
# Check if proposal passes
if prop.age >= p.min_age and prop.total_conviction >= prop.trigger:
prop.status = "passed"
system.passed_proposals.append(prop)
return system
def generate_conviction_curves(
staked_tokens: float,
alpha: float,
epochs: int = 100,
) -> dict[str, np.ndarray]:
"""Generate charging and discharging curves for visualization."""
charge = np.array([
conviction_at_time(staked_tokens, alpha, t)
for t in range(epochs)
])
# Discharge: start from max conviction, remove stake
y_max = max_conviction(staked_tokens, alpha)
discharge = np.array([
y_max * (alpha ** t)
for t in range(epochs)
])
t = np.arange(epochs)
return {
"time": t,
"charge": charge,
"discharge": discharge,
"max": np.full(epochs, y_max),
}
def get_governance_metrics(system: ConvictionSystem) -> dict:
"""Get comprehensive governance metrics."""
active_proposals = [p for p in system.proposals.values() if p.status == "candidate"]
total_staked = sum(
sum(v.stakes.values()) for v in system.voters.values()
)
return {
"epoch": system.epoch,
"total_supply": system.total_supply,
"total_staked": total_staked,
"staking_ratio": total_staked / system.total_supply if system.total_supply > 0 else 0,
"active_proposals": len(active_proposals),
"passed_proposals": len(system.passed_proposals),
"total_voters": len(system.voters),
"proposals": {
p.id: {
"title": p.title,
"conviction": p.total_conviction,
"trigger": p.trigger,
"progress": p.total_conviction / p.trigger if p.trigger > 0 and p.trigger != float('inf') else 0,
"age": p.age,
"status": p.status,
}
for p in system.proposals.values()
},
}

View File

@ -0,0 +1,350 @@
"""Risk-tranched stablecoin issuance from staked collateral.
Three tranches from the same multi-asset collateral pool:
- Senior (myUSD-S): Lowest risk, first claim on collateral, lowest yield
- Mezzanine (myUSD-M): Medium risk, second claim, medium yield
- Junior/Equity ($MYCO): Highest risk, residual claim, absorbs volatility
Yield waterfall: staking rewards flow Senior Mezzanine Junior.
Loss waterfall: losses absorbed Junior Mezzanine Senior.
"""
import numpy as np
from dataclasses import dataclass, field
@dataclass
class TrancheParams:
"""Parameters for the risk tranche system."""
# Collateral ratios (minimum ratio of collateral value to tranche value)
senior_collateral_ratio: float = 1.5 # 150% overcollateralized
mezzanine_collateral_ratio: float = 1.2 # 120% collateralized
# Junior has no minimum — it's the equity/residual tranche
# Yield targets (annualized, from staking rewards)
senior_yield_target: float = 0.03 # 3% annual
mezzanine_yield_target: float = 0.08 # 8% annual
# Tranche allocation limits (fraction of total collateral)
max_senior_fraction: float = 0.50 # Max 50% of collateral backs senior
max_mezzanine_fraction: float = 0.30 # Max 30% backs mezzanine
# Junior gets the remainder (min 20%)
# Liquidation thresholds
senior_liquidation_ratio: float = 1.2 # Liquidate if drops below 120%
mezzanine_liquidation_ratio: float = 1.05 # Liquidate if drops below 105%
# Rebalance trigger (deviation from target allocation)
rebalance_threshold: float = 0.05 # 5% deviation triggers rebalance
@dataclass
class TrancheState:
"""State of a single tranche."""
name: str
supply: float = 0.0 # Tokens outstanding
collateral_backing: float = 0.0 # USD value of collateral assigned to this tranche
accrued_yield: float = 0.0 # Yield accumulated but not distributed
cumulative_yield: float = 0.0 # Total yield ever distributed
cumulative_losses: float = 0.0 # Total losses ever absorbed
@property
def collateral_ratio(self) -> float:
"""Current collateral ratio."""
if self.supply == 0:
return float('inf')
return self.collateral_backing / self.supply
@property
def is_healthy(self) -> bool:
"""Whether the tranche is above minimum collateral."""
return self.collateral_ratio > 1.0
@dataclass
class RiskTrancheSystem:
"""The complete risk tranche system."""
params: TrancheParams
senior: TrancheState = field(default_factory=lambda: TrancheState(name="myUSD-S"))
mezzanine: TrancheState = field(default_factory=lambda: TrancheState(name="myUSD-M"))
junior: TrancheState = field(default_factory=lambda: TrancheState(name="$MYCO"))
total_collateral: float = 0.0
total_staking_yield: float = 0.0
time: float = 0.0
@property
def total_supply(self) -> float:
return self.senior.supply + self.mezzanine.supply + self.junior.supply
@property
def system_collateral_ratio(self) -> float:
if self.total_supply == 0:
return float('inf')
return self.total_collateral / self.total_supply
def mint_capacity(system: RiskTrancheSystem) -> dict[str, float]:
"""Calculate how many tokens each tranche can mint given current collateral.
Returns max additional mintable amount for each tranche.
"""
p = system.params
total_c = system.total_collateral
# Senior: max mint = (total_c * max_senior_frac - current_backing) / cr
senior_max_backing = total_c * p.max_senior_fraction
senior_available = max(0, senior_max_backing - system.senior.collateral_backing)
senior_mintable = senior_available / p.senior_collateral_ratio
# Mezzanine: similar
mez_max_backing = total_c * p.max_mezzanine_fraction
mez_available = max(0, mez_max_backing - system.mezzanine.collateral_backing)
mez_mintable = mez_available / p.mezzanine_collateral_ratio
# Junior: residual collateral after senior + mez allocations
junior_backing = total_c - system.senior.collateral_backing - system.mezzanine.collateral_backing
junior_mintable = max(0, junior_backing - system.junior.supply) # 1:1 for junior
return {
"senior": senior_mintable,
"mezzanine": mez_mintable,
"junior": junior_mintable,
}
def deposit_collateral(
system: RiskTrancheSystem,
amount: float,
asset_prices: dict[str, float] | None = None,
) -> RiskTrancheSystem:
"""Add collateral to the system.
Collateral is added to the pool and automatically allocated
to tranches based on target ratios and current state.
"""
system.total_collateral += amount
# Allocate to tranches based on deficit from targets
_rebalance_collateral(system)
return system
def mint_tranche(
system: RiskTrancheSystem,
tranche: str,
amount: float,
) -> tuple[RiskTrancheSystem, float]:
"""Mint tokens from a specific tranche.
Returns (updated_system, actual_minted).
"""
caps = mint_capacity(system)
actual = min(amount, caps.get(tranche, 0.0))
if actual <= 0:
return system, 0.0
p = system.params
if tranche == "senior":
required_collateral = actual * p.senior_collateral_ratio
system.senior.supply += actual
system.senior.collateral_backing += required_collateral
elif tranche == "mezzanine":
required_collateral = actual * p.mezzanine_collateral_ratio
system.mezzanine.supply += actual
system.mezzanine.collateral_backing += required_collateral
elif tranche == "junior":
system.junior.supply += actual
system.junior.collateral_backing += actual # 1:1
return system, actual
def redeem_tranche(
system: RiskTrancheSystem,
tranche: str,
amount: float,
) -> tuple[RiskTrancheSystem, float]:
"""Redeem tranche tokens for collateral.
Returns (updated_system, collateral_returned).
"""
if tranche == "senior":
t = system.senior
elif tranche == "mezzanine":
t = system.mezzanine
else:
t = system.junior
actual = min(amount, t.supply)
if actual <= 0:
return system, 0.0
# Proportional collateral release
collateral_share = actual / t.supply
collateral_returned = t.collateral_backing * collateral_share
t.supply -= actual
t.collateral_backing -= collateral_returned
system.total_collateral -= collateral_returned
return system, collateral_returned
def distribute_yield(
system: RiskTrancheSystem,
yield_amount: float,
dt: float,
) -> RiskTrancheSystem:
"""Distribute staking yield through the waterfall.
Order: Senior gets target yield first, then Mezzanine, then Junior gets residual.
"""
remaining = yield_amount
system.total_staking_yield += yield_amount
# Senior gets their target first
if system.senior.supply > 0:
senior_target = system.senior.supply * system.params.senior_yield_target * dt
senior_gets = min(remaining, senior_target)
system.senior.accrued_yield += senior_gets
system.senior.cumulative_yield += senior_gets
system.senior.collateral_backing += senior_gets
remaining -= senior_gets
# Mezzanine gets their target next
if remaining > 0 and system.mezzanine.supply > 0:
mez_target = system.mezzanine.supply * system.params.mezzanine_yield_target * dt
mez_gets = min(remaining, mez_target)
system.mezzanine.accrued_yield += mez_gets
system.mezzanine.cumulative_yield += mez_gets
system.mezzanine.collateral_backing += mez_gets
remaining -= mez_gets
# Junior gets the residual
if remaining > 0:
system.junior.accrued_yield += remaining
system.junior.cumulative_yield += remaining
system.junior.collateral_backing += remaining
return system
def apply_loss(
system: RiskTrancheSystem,
loss_amount: float,
) -> RiskTrancheSystem:
"""Apply collateral loss through the reverse waterfall.
Order: Junior absorbs first, then Mezzanine, then Senior.
"""
remaining = loss_amount
system.total_collateral -= loss_amount
# Junior absorbs first
junior_can_absorb = system.junior.collateral_backing
junior_absorbs = min(remaining, junior_can_absorb)
system.junior.collateral_backing -= junior_absorbs
system.junior.cumulative_losses += junior_absorbs
remaining -= junior_absorbs
# Mezzanine absorbs next
if remaining > 0:
mez_can_absorb = system.mezzanine.collateral_backing
mez_absorbs = min(remaining, mez_can_absorb)
system.mezzanine.collateral_backing -= mez_absorbs
system.mezzanine.cumulative_losses += mez_absorbs
remaining -= mez_absorbs
# Senior absorbs last (this is the doomsday scenario)
if remaining > 0:
system.senior.collateral_backing -= remaining
system.senior.cumulative_losses += remaining
return system
def check_liquidation(system: RiskTrancheSystem) -> dict[str, bool]:
"""Check if any tranche needs liquidation."""
p = system.params
return {
"senior": (
system.senior.supply > 0
and system.senior.collateral_ratio < p.senior_liquidation_ratio
),
"mezzanine": (
system.mezzanine.supply > 0
and system.mezzanine.collateral_ratio < p.mezzanine_liquidation_ratio
),
"junior": (
system.junior.supply > 0
and system.junior.collateral_ratio < 0.5 # Junior liquidates at 50%
),
}
def _rebalance_collateral(system: RiskTrancheSystem) -> None:
"""Internal: rebalance collateral allocation across tranches."""
total = system.total_collateral
if total == 0:
return
p = system.params
# Target: senior gets enough for its CR, mez gets enough, junior gets rest
senior_needs = system.senior.supply * p.senior_collateral_ratio
mez_needs = system.mezzanine.supply * p.mezzanine_collateral_ratio
junior_needs = system.junior.supply # 1:1
total_needs = senior_needs + mez_needs + junior_needs
if total_needs == 0:
return
# If enough collateral, give everyone what they need
if total >= total_needs:
system.senior.collateral_backing = senior_needs
system.mezzanine.collateral_backing = mez_needs
# Junior gets all the excess
system.junior.collateral_backing = total - senior_needs - mez_needs
else:
# Not enough: allocate proportionally with priority
# Senior gets priority
system.senior.collateral_backing = min(senior_needs, total)
remaining = total - system.senior.collateral_backing
system.mezzanine.collateral_backing = min(mez_needs, remaining)
remaining -= system.mezzanine.collateral_backing
system.junior.collateral_backing = remaining
def get_tranche_metrics(system: RiskTrancheSystem) -> dict:
"""Get comprehensive metrics for all tranches."""
return {
"total_collateral": system.total_collateral,
"system_cr": system.system_collateral_ratio,
"total_supply": system.total_supply,
"senior": {
"supply": system.senior.supply,
"collateral": system.senior.collateral_backing,
"cr": system.senior.collateral_ratio,
"yield": system.senior.cumulative_yield,
"losses": system.senior.cumulative_losses,
"healthy": system.senior.is_healthy,
},
"mezzanine": {
"supply": system.mezzanine.supply,
"collateral": system.mezzanine.collateral_backing,
"cr": system.mezzanine.collateral_ratio,
"yield": system.mezzanine.cumulative_yield,
"losses": system.mezzanine.cumulative_losses,
"healthy": system.mezzanine.is_healthy,
},
"junior": {
"supply": system.junior.supply,
"collateral": system.junior.collateral_backing,
"cr": system.junior.collateral_ratio,
"yield": system.junior.cumulative_yield,
"losses": system.junior.cumulative_losses,
"healthy": system.junior.is_healthy,
},
}