diff --git a/dashboard/app.py b/dashboard/app.py
index b5d6171..261337d 100644
--- a/dashboard/app.py
+++ b/dashboard/app.py
@@ -11,24 +11,34 @@ st.set_page_config(
layout="wide",
)
-st.title("MYCO Bonding Surface Dashboard")
-st.caption("Interactive simulations for the multi-asset bonding curve with CRDT-native primitives.")
+st.title("MycoFi Protocol Dashboard")
+st.caption("Multi-chain bonding surfaces, risk tranches, conviction governance, and cadCAD simulations.")
# Initialize default config in session_state if not present
if "myco_config" not in st.session_state:
from src.composed.myco_surface import MycoSystemConfig
st.session_state["myco_config"] = MycoSystemConfig()
-tab_config, tab_launch, tab_dca, tab_signal, tab_stress, tab_crdt = st.tabs([
+(
+ tab_config, tab_launch, tab_dca, tab_signal, tab_stress, tab_crdt,
+ tab_crosschain, tab_tranches, tab_governance, tab_cadcad,
+) = st.tabs([
"System Config",
"Token Launch",
"DCA Explorer",
"Signal Router",
"Stress Tests",
"CRDT Flow",
+ "Cross-Chain",
+ "Risk Tranches",
+ "Governance",
+ "cadCAD Sim",
])
-from dashboard.tabs import system_config, token_launch, dca_explorer, signal_router, stress_tests, crdt_flow
+from dashboard.tabs import (
+ system_config, token_launch, dca_explorer, signal_router,
+ stress_tests, crdt_flow, crosschain, tranches, governance, cadcad_sim,
+)
with tab_config:
system_config.render()
@@ -47,3 +57,15 @@ with tab_stress:
with tab_crdt:
crdt_flow.render()
+
+with tab_crosschain:
+ crosschain.render()
+
+with tab_tranches:
+ tranches.render()
+
+with tab_governance:
+ governance.render()
+
+with tab_cadcad:
+ cadcad_sim.render()
diff --git a/dashboard/tabs/cadcad_sim.py b/dashboard/tabs/cadcad_sim.py
new file mode 100644
index 0000000..a0287f2
--- /dev/null
+++ b/dashboard/tabs/cadcad_sim.py
@@ -0,0 +1,171 @@
+"""cadCAD Monte Carlo simulation dashboard tab."""
+
+import streamlit as st
+import plotly.graph_objects as go
+from plotly.subplots import make_subplots
+import numpy as np
+
+
+def render():
+ st.header("cadCAD Monte Carlo Simulation")
+ st.caption("Full system simulation with stochastic processes, parameter sweeps, and stress testing.")
+
+ scenario = st.selectbox("Scenario", [
+ "Normal Growth",
+ "ETH Crash Stress Test",
+ "Parameter Sweep",
+ ])
+
+ col1, col2, col3 = st.columns(3)
+ with col1:
+ timesteps = st.slider("Days", 30, 730, 180, key="cd_days")
+ with col2:
+ runs = st.slider("Monte Carlo Runs", 1, 10, 3, key="cd_runs")
+ with col3:
+ if scenario == "ETH Crash Stress Test":
+ crash_mag = st.slider("Crash Magnitude %", 20, 80, 50, 5, key="cd_crash")
+ elif scenario == "Parameter Sweep":
+ st.info("Sweeps: volatility × growth × redemption")
+
+ if st.button("Run cadCAD Simulation", key="cd_run"):
+ with st.spinner(f"Running {scenario} ({runs} runs, {timesteps} days)..."):
+ try:
+ from src.cadcad.config import (
+ scenario_normal_growth,
+ scenario_stress_test,
+ scenario_parameter_sweep,
+ )
+
+ if scenario == "Normal Growth":
+ df = scenario_normal_growth(timesteps=timesteps, runs=runs)
+ elif scenario == "ETH Crash Stress Test":
+ df = scenario_stress_test(timesteps=timesteps, runs=runs)
+ else:
+ df = scenario_parameter_sweep(timesteps=timesteps, runs=1)
+
+ st.session_state["cadcad_results"] = df
+ st.success(f"Simulation complete: {len(df)} data points across {runs} run(s)")
+
+ except Exception as e:
+ st.error(f"Simulation error: {e}")
+ st.info("Make sure cadCAD is installed: `pip install cadCAD`")
+ return
+
+ # Display results
+ if "cadcad_results" in st.session_state:
+ df = st.session_state["cadcad_results"]
+ _plot_overview(df)
+ _plot_tranches(df)
+ _plot_crosschain(df)
+
+
+def _plot_overview(df):
+ """System overview: supply, collateral, price, CR."""
+ st.subheader("System Overview")
+
+ fig = make_subplots(
+ rows=2, cols=2,
+ subplot_titles=("Total Supply", "Total Collateral", "MYCO Price", "System CR"),
+ )
+
+ runs = df["run"].unique()
+ colors = ["#2dd4bf", "#f59e0b", "#ef4444", "#a78bfa", "#60a5fa"]
+
+ for i, run in enumerate(runs):
+ rd = df[df["run"] == run]
+ color = colors[i % len(colors)]
+ opacity = 0.8 if len(runs) == 1 else 0.4
+
+ fig.add_trace(go.Scatter(
+ x=rd["timestep"], y=rd["total_supply"],
+ name=f"Run {run}" if i == 0 else None, showlegend=(i == 0),
+ line=dict(color=color, width=1), opacity=opacity,
+ ), row=1, col=1)
+
+ fig.add_trace(go.Scatter(
+ x=rd["timestep"], y=rd["total_collateral_usd"],
+ showlegend=False, line=dict(color=color, width=1), opacity=opacity,
+ ), row=1, col=2)
+
+ fig.add_trace(go.Scatter(
+ x=rd["timestep"], y=rd["myco_price"],
+ showlegend=False, line=dict(color=color, width=1), opacity=opacity,
+ ), row=2, col=1)
+
+ fig.add_trace(go.Scatter(
+ x=rd["timestep"], y=rd["system_cr"].clip(upper=5),
+ showlegend=False, line=dict(color=color, width=1), opacity=opacity,
+ ), row=2, col=2)
+
+ fig.update_layout(template="plotly_dark", height=600, showlegend=True)
+ st.plotly_chart(fig, use_container_width=True)
+
+
+def _plot_tranches(df):
+ """Tranche metrics across runs."""
+ st.subheader("Tranche Dynamics")
+
+ fig = make_subplots(
+ rows=1, cols=3,
+ subplot_titles=("Senior CR", "Mezzanine CR", "Junior CR"),
+ )
+
+ runs = df["run"].unique()
+ for i, run in enumerate(runs):
+ rd = df[df["run"] == run]
+ opacity = 0.6
+
+ for j, (col, color) in enumerate([
+ ("senior_cr", "#10B981"),
+ ("mezzanine_cr", "#F59E0B"),
+ ("junior_cr", "#EF4444"),
+ ]):
+ fig.add_trace(go.Scatter(
+ x=rd["timestep"], y=rd[col].clip(upper=5),
+ showlegend=False, line=dict(color=color, width=1), opacity=opacity,
+ ), row=1, col=j + 1)
+
+ # Add threshold lines
+ fig.add_hline(y=1.5, line_dash="dash", line_color="#10B981", row=1, col=1)
+ fig.add_hline(y=1.2, line_dash="dash", line_color="#F59E0B", row=1, col=2)
+ fig.add_hline(y=1.0, line_dash="dash", line_color="#EF4444", row=1, col=3)
+
+ fig.update_layout(template="plotly_dark", height=350)
+ st.plotly_chart(fig, use_container_width=True)
+
+
+def _plot_crosschain(df):
+ """Cross-chain metrics."""
+ st.subheader("Cross-Chain Activity")
+
+ # Check for per-chain columns
+ chain_cols = [c for c in df.columns if c.startswith("collateral_")]
+ if not chain_cols:
+ st.info("No per-chain data in this simulation.")
+ return
+
+ fig = go.Figure()
+ rd = df[df["run"] == df["run"].iloc[0]] # First run
+
+ for col in chain_cols:
+ chain_name = col.replace("collateral_", "").capitalize()
+ fig.add_trace(go.Scatter(
+ x=rd["timestep"], y=rd[col],
+ name=chain_name, stackgroup="one",
+ ))
+
+ fig.update_layout(
+ title="Collateral by Chain (Run 1)",
+ xaxis_title="Timestep", yaxis_title="USD Value",
+ template="plotly_dark", height=400,
+ )
+ st.plotly_chart(fig, use_container_width=True)
+
+ # Summary stats
+ st.subheader("Summary Statistics")
+ last_step = df[df["timestep"] == df["timestep"].max()]
+ cols = st.columns(4)
+ cols[0].metric("Avg Final Supply", f"{last_step['total_supply'].mean():,.0f}")
+ cols[1].metric("Avg Final Collateral", f"${last_step['total_collateral_usd'].mean():,.0f}")
+ cols[2].metric("Avg System CR", f"{last_step['system_cr'].mean():.2f}")
+ cols[3].metric("Total CCIP Messages", f"{last_step['ccip_messages'].mean():,.0f}")
diff --git a/dashboard/tabs/crosschain.py b/dashboard/tabs/crosschain.py
new file mode 100644
index 0000000..f653061
--- /dev/null
+++ b/dashboard/tabs/crosschain.py
@@ -0,0 +1,187 @@
+"""Cross-chain collateral visualization dashboard tab."""
+
+import streamlit as st
+import plotly.graph_objects as go
+from plotly.subplots import make_subplots
+import numpy as np
+
+
+def render():
+ st.header("Cross-Chain Collateral Map")
+ st.caption("Visualize collateral distribution across chains and staking assets.")
+
+ # Configuration
+ col1, col2 = st.columns(2)
+ with col1:
+ n_days = st.slider("Simulation days", 30, 365, 90, key="cc_days")
+ with col2:
+ eth_vol = st.select_slider(
+ "ETH Volatility",
+ options=[0.2, 0.4, 0.6, 0.8, 1.0],
+ value=0.6,
+ key="cc_vol",
+ )
+
+ if st.button("Run Cross-Chain Simulation", key="cc_run"):
+ with st.spinner("Simulating cross-chain system..."):
+ from src.crosschain.hub_spoke import (
+ create_default_system, simulate_deposit, tick,
+ apply_price_shock, get_crosschain_metrics,
+ )
+
+ system = create_default_system()
+
+ # Bootstrap with deposits
+ deposits = {
+ "ethereum": [("stETH", 100), ("rETH", 50), ("cbETH", 30)],
+ "arbitrum": [("wstETH", 80), ("rETH", 40)],
+ "optimism": [("wstETH", 60), ("sfrxETH", 40)],
+ "base": [("cbETH", 40), ("USDC", 100_000)],
+ "polygon": [("stMATIC", 200_000), ("USDC", 50_000)],
+ }
+
+ for chain, assets in deposits.items():
+ for symbol, qty in assets:
+ simulate_deposit(system, chain, symbol, qty, 0.0)
+ system.hub.process_messages(0.0)
+
+ # Simulate daily
+ dt = 1 / 365
+ history = []
+ for day in range(n_days):
+ # Random deposits
+ if np.random.random() < 0.3:
+ chains = list(system.hub.spokes.keys())
+ chain = np.random.choice(chains)
+ spoke = system.hub.spokes[chain]
+ if spoke.accepted_assets:
+ asset = np.random.choice(spoke.accepted_assets)
+ qty = np.random.lognormal(2, 1) / asset.price
+ simulate_deposit(system, chain, asset.symbol, qty, system.time)
+
+ # Price movements
+ eth_return = np.random.normal(0, eth_vol * np.sqrt(dt))
+ for chain, spoke in system.hub.spokes.items():
+ for asset in spoke.accepted_assets:
+ if asset.symbol != "USDC":
+ asset.price *= (1 + eth_return + np.random.normal(0, 0.01))
+ spoke._recalculate_value()
+ system.hub._recalculate_total()
+
+ metrics = tick(system, dt)
+ metrics["day"] = day
+ history.append(metrics)
+
+ # Visualization
+ _plot_chain_distribution(history, system)
+ _plot_collateral_timeline(history)
+ _plot_asset_breakdown(system)
+
+
+def _plot_chain_distribution(history: list, system):
+ """Stacked area chart of collateral per chain over time."""
+ st.subheader("Collateral by Chain")
+
+ fig = go.Figure()
+ chains = list(system.hub.spokes.keys())
+ colors = {
+ "ethereum": "#627EEA",
+ "arbitrum": "#28A0F0",
+ "optimism": "#FF0420",
+ "base": "#0052FF",
+ "polygon": "#8247E5",
+ }
+
+ days = [h["day"] for h in history]
+ for chain in chains:
+ values = [h["per_chain"].get(chain, 0) for h in history]
+ fig.add_trace(go.Scatter(
+ x=days, y=values,
+ name=chain.capitalize(),
+ stackgroup="one",
+ fillcolor=colors.get(chain, "#888888"),
+ line=dict(width=0.5, color=colors.get(chain, "#888888")),
+ ))
+
+ fig.update_layout(
+ title="Cross-Chain Collateral Distribution",
+ xaxis_title="Day",
+ yaxis_title="USD Value",
+ template="plotly_dark",
+ height=500,
+ )
+ st.plotly_chart(fig, use_container_width=True)
+
+
+def _plot_collateral_timeline(history: list):
+ """Total collateral and yield over time."""
+ st.subheader("Total Collateral & Yield")
+
+ fig = make_subplots(specs=[[{"secondary_y": True}]])
+
+ days = [h["day"] for h in history]
+ total_col = [h["total_collateral_usd"] for h in history]
+ cum_yield = [h["yield_this_tick"] for h in history]
+ cum_yield_sum = np.cumsum(cum_yield)
+
+ fig.add_trace(
+ go.Scatter(x=days, y=total_col, name="Total Collateral", line=dict(color="#2dd4bf", width=2)),
+ secondary_y=False,
+ )
+ fig.add_trace(
+ go.Scatter(x=days, y=cum_yield_sum, name="Cumulative Yield", line=dict(color="#fbbf24", width=2)),
+ secondary_y=True,
+ )
+
+ fig.update_layout(
+ title="System Collateral & Staking Yield",
+ template="plotly_dark",
+ height=400,
+ )
+ fig.update_yaxes(title_text="Collateral (USD)", secondary_y=False)
+ fig.update_yaxes(title_text="Cumulative Yield (USD)", secondary_y=True)
+ st.plotly_chart(fig, use_container_width=True)
+
+
+def _plot_asset_breakdown(system):
+ """Treemap of current asset breakdown."""
+ st.subheader("Current Asset Breakdown")
+
+ labels, parents, values, colors = [], [], [], []
+ chain_colors = {
+ "ethereum": "#627EEA",
+ "arbitrum": "#28A0F0",
+ "optimism": "#FF0420",
+ "base": "#0052FF",
+ "polygon": "#8247E5",
+ }
+
+ for chain, spoke in system.hub.spokes.items():
+ labels.append(chain.capitalize())
+ parents.append("")
+ values.append(spoke.total_value_usd)
+ colors.append(chain_colors.get(chain, "#888"))
+
+ for asset in spoke.accepted_assets:
+ qty = spoke.balances.get(asset.symbol, 0)
+ val = qty * asset.price
+ if val > 0:
+ labels.append(f"{asset.symbol}")
+ parents.append(chain.capitalize())
+ values.append(val)
+ colors.append(chain_colors.get(chain, "#888"))
+
+ fig = go.Figure(go.Treemap(
+ labels=labels,
+ parents=parents,
+ values=values,
+ marker_colors=colors,
+ textinfo="label+value+percent root",
+ texttemplate="%{label}
$%{value:,.0f}
%{percentRoot:.1%}",
+ ))
+ fig.update_layout(
+ title="Collateral Treemap",
+ template="plotly_dark",
+ height=500,
+ )
+ st.plotly_chart(fig, use_container_width=True)
diff --git a/dashboard/tabs/governance.py b/dashboard/tabs/governance.py
new file mode 100644
index 0000000..1eb3ed8
--- /dev/null
+++ b/dashboard/tabs/governance.py
@@ -0,0 +1,190 @@
+"""Conviction voting governance visualization dashboard tab."""
+
+import streamlit as st
+import plotly.graph_objects as go
+from plotly.subplots import make_subplots
+import numpy as np
+
+
+def render():
+ st.header("Conviction Voting Governance")
+ st.caption("Explore conviction voting dynamics for parameter governance.")
+
+ # Parameters
+ col1, col2, col3, col4 = st.columns(4)
+ with col1:
+ half_life = st.slider("Half-life (epochs)", 2, 30, 7, key="cv_hl")
+ with col2:
+ beta = st.slider("Max fund share (β)", 0.05, 0.5, 0.2, 0.05, key="cv_beta")
+ with col3:
+ rho = st.slider("Scale factor (ρ) ×1000", 0.5, 10.0, 2.5, 0.5, key="cv_rho")
+ with col4:
+ n_voters = st.slider("Voters", 5, 50, 20, key="cv_voters")
+
+ rho_actual = rho / 1000
+
+ tab_curves, tab_sim, tab_trigger = st.tabs(["Conviction Curves", "Governance Sim", "Trigger Map"])
+
+ with tab_curves:
+ _render_conviction_curves(half_life)
+
+ with tab_sim:
+ _render_governance_sim(half_life, beta, rho_actual, n_voters)
+
+ with tab_trigger:
+ _render_trigger_map(beta, rho_actual)
+
+
+def _render_conviction_curves(half_life: float):
+ """Visualize conviction charging and discharging."""
+ from src.primitives.conviction import ConvictionParams, generate_conviction_curves
+
+ params = ConvictionParams.from_half_life(half_life)
+
+ st.markdown(f"**α = {params.alpha:.4f}** (half-life = {half_life} epochs)")
+
+ curves = generate_conviction_curves(1000, params.alpha, epochs=100)
+
+ fig = make_subplots(rows=1, cols=2, subplot_titles=("Charging", "Discharging"))
+
+ fig.add_trace(go.Scatter(
+ x=curves["time"], y=curves["charge"],
+ name="Conviction", line=dict(color="#2dd4bf", width=2),
+ ), row=1, col=1)
+ fig.add_trace(go.Scatter(
+ x=curves["time"], y=curves["max"],
+ name="Max", line=dict(color="#fbbf24", dash="dash"),
+ ), row=1, col=1)
+
+ fig.add_trace(go.Scatter(
+ x=curves["time"], y=curves["discharge"],
+ name="Decay", line=dict(color="#ef4444", width=2),
+ ), row=1, col=2)
+
+ fig.update_layout(template="plotly_dark", height=400, showlegend=True)
+ fig.update_xaxes(title_text="Epochs")
+ fig.update_yaxes(title_text="Conviction")
+ st.plotly_chart(fig, use_container_width=True)
+
+ # Key metrics
+ from src.primitives.conviction import epochs_to_fraction, max_conviction
+ col1, col2, col3 = st.columns(3)
+ col1.metric("50% of max", f"{epochs_to_fraction(0.5, params.alpha):.1f} epochs")
+ col2.metric("90% of max", f"{epochs_to_fraction(0.9, params.alpha):.1f} epochs")
+ col3.metric("Max conviction (1000 tokens)", f"{max_conviction(1000, params.alpha):,.0f}")
+
+
+def _render_governance_sim(half_life: float, beta: float, rho: float, n_voters: int):
+ """Run a governance simulation."""
+ if st.button("Run Governance Simulation", key="cv_run"):
+ with st.spinner("Simulating conviction voting..."):
+ from src.primitives.conviction import (
+ ConvictionParams, ConvictionSystem, Proposal, Voter,
+ stake, tick, get_governance_metrics,
+ )
+
+ params = ConvictionParams.from_half_life(half_life, beta=beta, rho=rho)
+ system = ConvictionSystem(params=params)
+
+ # Create voters
+ for i in range(n_voters):
+ holdings = np.random.lognormal(mean=np.log(5000), sigma=1.0)
+ system.voters[f"v{i}"] = Voter(id=f"v{i}", holdings=holdings)
+ system.total_supply = sum(v.holdings for v in system.voters.values())
+
+ # Create proposals
+ proposals = [
+ ("Lower senior CR to 1.3", 0.05),
+ ("Add Scroll chain", 0.08),
+ ("Increase mez yield to 10%", 0.03),
+ ("Add sfrxETH to Arbitrum", 0.02),
+ ]
+ for i, (title, funds) in enumerate(proposals):
+ system.proposals[f"p{i}"] = Proposal(
+ id=f"p{i}", title=title, funds_requested=funds,
+ )
+
+ # Simulate staking & ticking
+ history = []
+ for epoch in range(100):
+ # Some voters stake/unstake
+ for vid, voter in system.voters.items():
+ if np.random.random() < 0.3:
+ candidates = [p for p in system.proposals.values() if p.status == "candidate"]
+ if candidates:
+ prop = np.random.choice(candidates)
+ amt = voter.holdings * np.random.uniform(0.05, 0.3)
+ stake(system, vid, prop.id, amt)
+
+ tick(system)
+ metrics = get_governance_metrics(system)
+ metrics["epoch"] = epoch
+ history.append(metrics)
+
+ _plot_conviction_progress(history, system)
+
+
+def _plot_conviction_progress(history: list, system):
+ """Plot conviction progress toward triggers for each proposal."""
+ fig = go.Figure()
+
+ for prop_id, prop in system.proposals.items():
+ progress = [
+ h["proposals"].get(prop_id, {}).get("progress", 0)
+ for h in history
+ ]
+ epochs = [h["epoch"] for h in history]
+ fig.add_trace(go.Scatter(
+ x=epochs, y=progress,
+ name=f"{prop.title} ({'PASSED' if prop.status == 'passed' else prop.status})",
+ line=dict(width=2),
+ ))
+
+ fig.add_hline(y=1.0, line_dash="dash", line_color="white",
+ annotation_text="Trigger threshold")
+
+ fig.update_layout(
+ title="Proposal Conviction Progress",
+ xaxis_title="Epoch",
+ yaxis_title="Progress (conviction / trigger)",
+ template="plotly_dark",
+ height=500,
+ )
+ st.plotly_chart(fig, use_container_width=True)
+
+ # Summary
+ for prop in system.proposals.values():
+ status_emoji = {"passed": "✅", "candidate": "⏳", "failed": "❌"}.get(prop.status, "❓")
+ st.write(f"{status_emoji} **{prop.title}** — {prop.status} (age: {prop.age})")
+
+
+def _render_trigger_map(beta: float, rho: float):
+ """2D contour map of trigger function."""
+ from src.primitives.conviction import trigger_threshold, ConvictionParams
+
+ params = ConvictionParams(beta=beta, rho=rho)
+ supply_range = np.linspace(10_000, 1_000_000, 100)
+ share_range = np.linspace(0.001, beta - 0.001, 100)
+
+ Z = np.zeros((len(share_range), len(supply_range)))
+ for i, share in enumerate(share_range):
+ for j, supply in enumerate(supply_range):
+ Z[i, j] = np.log10(trigger_threshold(share, supply, params))
+
+ fig = go.Figure(go.Contour(
+ z=Z,
+ x=supply_range,
+ y=share_range,
+ colorscale="Viridis",
+ colorbar=dict(title="log₁₀(trigger)"),
+ ))
+
+ fig.update_layout(
+ title="Trigger Function Map",
+ xaxis_title="Total Supply",
+ yaxis_title="Share of Funds Requested",
+ template="plotly_dark",
+ height=500,
+ )
+ st.plotly_chart(fig, use_container_width=True)
+ st.caption("Higher trigger = more conviction needed. Requesting closer to β makes it exponentially harder.")
diff --git a/dashboard/tabs/tranches.py b/dashboard/tabs/tranches.py
new file mode 100644
index 0000000..d405d3a
--- /dev/null
+++ b/dashboard/tabs/tranches.py
@@ -0,0 +1,216 @@
+"""Risk tranche visualization dashboard tab."""
+
+import streamlit as st
+import plotly.graph_objects as go
+from plotly.subplots import make_subplots
+import numpy as np
+
+
+def render():
+ st.header("Risk Tranches")
+ st.caption("Senior / Mezzanine / Junior tranche dynamics with yield waterfall and stress testing.")
+
+ # Tranche parameters
+ st.subheader("Tranche Configuration")
+ col1, col2, col3 = st.columns(3)
+ with col1:
+ st.markdown("**Senior (myUSD-S)**")
+ senior_cr = st.slider("Min Collateral Ratio", 1.1, 2.0, 1.5, 0.05, key="sr_cr")
+ senior_yield = st.slider("Target Yield %", 1.0, 6.0, 3.0, 0.5, key="sr_y")
+ with col2:
+ st.markdown("**Mezzanine (myUSD-M)**")
+ mez_cr = st.slider("Min Collateral Ratio", 1.0, 1.5, 1.2, 0.05, key="mz_cr")
+ mez_yield = st.slider("Target Yield %", 4.0, 15.0, 8.0, 0.5, key="mz_y")
+ with col3:
+ st.markdown("**Junior ($MYCO)**")
+ st.info("First-loss position. Gets residual yield after Senior & Mez are paid.")
+
+ # Simulation controls
+ st.subheader("Simulation")
+ col1, col2, col3 = st.columns(3)
+ with col1:
+ initial_collateral = st.number_input("Initial Collateral ($)", 100_000, 10_000_000, 1_000_000, 100_000)
+ with col2:
+ staking_apy = st.slider("Avg Staking APY %", 2.0, 8.0, 4.0, 0.5)
+ with col3:
+ n_days = st.slider("Days", 30, 730, 365, key="tr_days")
+
+ # Stress scenario
+ stress = st.selectbox("Stress Scenario", [
+ "None",
+ "30% ETH crash at day 60",
+ "50% ETH crash at day 60",
+ "Slow bleed (1% daily for 30 days)",
+ ])
+
+ if st.button("Run Tranche Simulation", key="tr_run"):
+ with st.spinner("Simulating tranches..."):
+ from src.primitives.risk_tranching import (
+ RiskTrancheSystem, TrancheParams,
+ deposit_collateral, mint_tranche,
+ distribute_yield, apply_loss,
+ get_tranche_metrics,
+ )
+
+ params = TrancheParams(
+ senior_collateral_ratio=senior_cr,
+ mezzanine_collateral_ratio=mez_cr,
+ senior_yield_target=senior_yield / 100,
+ mezzanine_yield_target=mez_yield / 100,
+ )
+ system = RiskTrancheSystem(params=params)
+
+ # Bootstrap
+ deposit_collateral(system, initial_collateral)
+ mint_tranche(system, "senior", initial_collateral * 0.4 / senior_cr)
+ mint_tranche(system, "mezzanine", initial_collateral * 0.25 / mez_cr)
+ mint_tranche(system, "junior", initial_collateral * 0.15)
+
+ # Simulate
+ dt = 1 / 365
+ history = []
+ collateral_value = initial_collateral
+
+ for day in range(n_days):
+ # Staking yield
+ daily_yield = collateral_value * (staking_apy / 100) * dt
+ distribute_yield(system, daily_yield, dt)
+ collateral_value += daily_yield
+
+ # Price movement
+ price_change = np.random.normal(0, 0.02) # 2% daily vol
+
+ # Apply stress
+ if stress == "30% ETH crash at day 60" and day == 60:
+ price_change = -0.30
+ elif stress == "50% ETH crash at day 60" and day == 60:
+ price_change = -0.50
+ elif stress == "Slow bleed (1% daily for 30 days)" and 60 <= day < 90:
+ price_change = -0.01
+
+ if price_change < 0:
+ loss = abs(price_change) * collateral_value
+ apply_loss(system, loss)
+ collateral_value -= loss
+ else:
+ gain = price_change * collateral_value
+ collateral_value += gain
+ system.total_collateral = collateral_value
+
+ metrics = get_tranche_metrics(system)
+ metrics["day"] = day
+ metrics["collateral_value"] = collateral_value
+ history.append(metrics)
+
+ _plot_tranche_supplies(history)
+ _plot_collateral_ratios(history)
+ _plot_yield_waterfall(history)
+ _plot_loss_absorption(history)
+
+
+def _plot_tranche_supplies(history: list):
+ """Stacked area of tranche supplies."""
+ st.subheader("Tranche Supplies")
+
+ fig = go.Figure()
+ days = [h["day"] for h in history]
+
+ for name, color in [("senior", "#10B981"), ("mezzanine", "#F59E0B"), ("junior", "#EF4444")]:
+ values = [h[name]["supply"] for h in history]
+ fig.add_trace(go.Scatter(
+ x=days, y=values, name=name.capitalize(),
+ stackgroup="one", line=dict(width=0.5, color=color),
+ ))
+
+ fig.update_layout(
+ title="Tranche Token Supply",
+ xaxis_title="Day", yaxis_title="Supply",
+ template="plotly_dark", height=400,
+ )
+ st.plotly_chart(fig, use_container_width=True)
+
+
+def _plot_collateral_ratios(history: list):
+ """Collateral ratios per tranche over time."""
+ st.subheader("Collateral Ratios")
+
+ fig = go.Figure()
+ days = [h["day"] for h in history]
+
+ for name, color, threshold in [
+ ("senior", "#10B981", 1.5),
+ ("mezzanine", "#F59E0B", 1.2),
+ ("junior", "#EF4444", 1.0),
+ ]:
+ values = [min(h[name]["cr"], 5.0) for h in history] # Cap for display
+ fig.add_trace(go.Scatter(
+ x=days, y=values, name=name.capitalize(),
+ line=dict(color=color, width=2),
+ ))
+ # Threshold line
+ fig.add_hline(
+ y=threshold, line_dash="dash", line_color=color,
+ annotation_text=f"{name} min", opacity=0.5,
+ )
+
+ fig.add_hline(y=1.0, line_dash="dot", line_color="red", annotation_text="Underwater")
+
+ fig.update_layout(
+ title="Collateral Ratios (higher = safer)",
+ xaxis_title="Day", yaxis_title="Ratio",
+ template="plotly_dark", height=400,
+ )
+ st.plotly_chart(fig, use_container_width=True)
+
+
+def _plot_yield_waterfall(history: list):
+ """Cumulative yield per tranche (waterfall)."""
+ st.subheader("Yield Waterfall")
+
+ fig = go.Figure()
+ days = [h["day"] for h in history]
+
+ for name, color in [("senior", "#10B981"), ("mezzanine", "#F59E0B"), ("junior", "#EF4444")]:
+ values = [h[name]["yield"] for h in history]
+ fig.add_trace(go.Scatter(
+ x=days, y=values, name=f"{name.capitalize()} yield",
+ line=dict(color=color, width=2),
+ fill="tozeroy" if name == "senior" else "tonexty",
+ ))
+
+ fig.update_layout(
+ title="Cumulative Yield Distribution (Senior → Mez → Junior)",
+ xaxis_title="Day", yaxis_title="Cumulative Yield ($)",
+ template="plotly_dark", height=400,
+ )
+ st.plotly_chart(fig, use_container_width=True)
+
+
+def _plot_loss_absorption(history: list):
+ """Cumulative losses absorbed per tranche."""
+ st.subheader("Loss Absorption")
+
+ total_losses = sum(h["junior"]["losses"] + h["mezzanine"]["losses"] + h["senior"]["losses"]
+ for h in history[-1:])
+
+ if total_losses > 0:
+ fig = go.Figure()
+ days = [h["day"] for h in history]
+
+ for name, color in [("junior", "#EF4444"), ("mezzanine", "#F59E0B"), ("senior", "#10B981")]:
+ values = [h[name]["losses"] for h in history]
+ fig.add_trace(go.Bar(
+ x=[name.capitalize()],
+ y=[values[-1]],
+ name=name.capitalize(),
+ marker_color=color,
+ ))
+
+ fig.update_layout(
+ title="Total Losses Absorbed by Tranche",
+ yaxis_title="Losses ($)",
+ template="plotly_dark", height=300,
+ )
+ st.plotly_chart(fig, use_container_width=True)
+ else:
+ st.success("No losses absorbed — all tranches healthy.")
diff --git a/pyproject.toml b/pyproject.toml
index 669b7aa..aca1502 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -8,6 +8,9 @@ dependencies = [
"scipy>=1.12",
"matplotlib>=3.8",
"sympy>=1.13",
+ "cadCAD>=0.3.1",
+ "pandas>=2.1",
+ "networkx>=3.2",
]
[project.scripts]
@@ -15,7 +18,7 @@ myco = "src.cli:main"
[project.optional-dependencies]
dev = ["pytest>=8.0", "jupyter>=1.0", "ipywidgets>=8.0"]
-dashboard = ["streamlit>=1.35", "plotly>=5.20"]
+dashboard = ["streamlit>=1.35", "plotly>=5.20", "seaborn>=0.13"]
[build-system]
requires = ["setuptools>=69.0"]
diff --git a/src/cadcad/__init__.py b/src/cadcad/__init__.py
new file mode 100644
index 0000000..330096c
--- /dev/null
+++ b/src/cadcad/__init__.py
@@ -0,0 +1 @@
+"""cadCAD model definitions for the MycoFi cross-chain protocol."""
diff --git a/src/cadcad/config.py b/src/cadcad/config.py
new file mode 100644
index 0000000..1f03840
--- /dev/null
+++ b/src/cadcad/config.py
@@ -0,0 +1,322 @@
+"""cadCAD simulation configuration for the MycoFi cross-chain system.
+
+Assembles policies and state updates into cadCAD-compatible
+partial state update blocks (PSUBs) and simulation configs.
+"""
+
+import numpy as np
+import pandas as pd
+from typing import Any
+
+from cadCAD.configuration.utils import config_sim
+
+from src.cadcad.state import create_initial_state, extract_metrics, MycoFiState
+from src.cadcad.policies import (
+ p_new_deposits,
+ p_redemptions,
+ p_price_process,
+ p_staking_yield,
+ p_governance_actions,
+ p_eth_crash,
+ p_bank_run,
+)
+from src.cadcad.state_updates import (
+ s_process_deposits,
+ s_process_redemptions,
+ s_apply_prices,
+ s_apply_yield,
+ s_governance_tick,
+ s_apply_price_shock,
+ s_apply_bank_run,
+ s_check_liquidations,
+)
+from src.primitives.risk_tranching import TrancheParams
+from src.primitives.conviction import ConvictionParams, Voter
+from src.composed.myco_surface import MycoSystemConfig
+
+
+# ---------- Partial State Update Blocks ----------
+
+# Normal operation: deposits, yields, prices, governance
+PSUB_NORMAL = [
+ {
+ "label": "Deposits & Cross-chain",
+ "policies": {"deposits": p_new_deposits},
+ "variables": {"mycofi": s_process_deposits},
+ },
+ {
+ "label": "Price Movements",
+ "policies": {"price_changes": p_price_process},
+ "variables": {"mycofi": s_apply_prices},
+ },
+ {
+ "label": "Staking Yield",
+ "policies": {"yield_dt": p_staking_yield},
+ "variables": {"mycofi": s_apply_yield},
+ },
+ {
+ "label": "Redemptions",
+ "policies": {"redemptions": p_redemptions},
+ "variables": {"mycofi": s_process_redemptions},
+ },
+ {
+ "label": "Governance",
+ "policies": {"governance_actions": p_governance_actions},
+ "variables": {"mycofi": s_governance_tick},
+ },
+ {
+ "label": "Liquidation Check",
+ "policies": {},
+ "variables": {"mycofi": s_check_liquidations},
+ },
+]
+
+# Stress test: normal + price shock + bank run
+PSUB_STRESS = PSUB_NORMAL + [
+ {
+ "label": "Price Shock",
+ "policies": {"price_shock": p_eth_crash},
+ "variables": {"mycofi": s_apply_price_shock},
+ },
+ {
+ "label": "Bank Run",
+ "policies": {"bank_run": p_bank_run},
+ "variables": {"mycofi": s_apply_bank_run},
+ },
+]
+
+
+# ---------- Scenario Configurations ----------
+
+def default_params() -> dict:
+ """Default simulation parameters."""
+ return {
+ "dt": [1 / 365], # Daily timesteps
+ "deposit_rate": [5], # Avg deposits per day
+ "max_deposit_usd": [50_000],
+ "redemption_rate": [0.02], # 2% daily base redemption
+ "eth_volatility": [0.6], # 60% annualized
+ "eth_drift": [0.0], # No drift
+ "proposal_rate": [0.05], # 5% chance of new proposal per epoch
+ }
+
+
+def stress_params() -> dict:
+ """Parameters for stress testing scenarios."""
+ return {
+ **default_params(),
+ "crash_epoch": [30],
+ "crash_magnitude": [0.5], # 50% ETH crash
+ "run_start_epoch": [31], # Bank run starts after crash
+ "run_intensity": [0.15], # 15% per epoch
+ }
+
+
+def param_sweep_params() -> dict:
+ """Parameters for parameter sweep analysis."""
+ return {
+ **default_params(),
+ "eth_volatility": [0.3, 0.6, 0.9], # Low, medium, high vol
+ "deposit_rate": [2, 5, 10], # Low, medium, high growth
+ "redemption_rate": [0.01, 0.02, 0.05],
+ }
+
+
+# ---------- Bootstrap Helpers ----------
+
+def bootstrap_system(
+ state: dict,
+ initial_deposits: dict[str, dict[str, float]] | None = None,
+ initial_tranche_mints: dict[str, float] | None = None,
+ n_voters: int = 20,
+) -> dict:
+ """Bootstrap the system with initial deposits, mints, and voters.
+
+ Args:
+ state: The cadCAD initial state dict
+ initial_deposits: {chain: {asset: quantity}} to seed vaults
+ initial_tranche_mints: {tranche: amount} to mint initial tranches
+ n_voters: Number of governance voters to create
+ """
+ from src.crosschain.hub_spoke import simulate_deposit
+ from src.primitives.risk_tranching import deposit_collateral, mint_tranche
+
+ s: MycoFiState = state["mycofi"]
+
+ # Seed cross-chain deposits
+ if initial_deposits and s.crosschain:
+ total_usd = 0.0
+ for chain, assets in initial_deposits.items():
+ for asset_sym, qty in assets.items():
+ msg = simulate_deposit(s.crosschain, chain, asset_sym, qty, 0.0)
+ # Find price
+ spoke = s.crosschain.hub.spokes[chain]
+ for a in spoke.accepted_assets:
+ if a.symbol == asset_sym:
+ total_usd += qty * a.price
+ break
+
+ s.crosschain.hub.process_messages(0.0)
+
+ # Seed tranche system with same total
+ if s.tranche_system:
+ deposit_collateral(s.tranche_system, total_usd)
+
+ # Mint initial tranches
+ if initial_tranche_mints and s.tranche_system:
+ for tranche, amount in initial_tranche_mints.items():
+ mint_tranche(s.tranche_system, tranche, amount)
+
+ # Seed bonding surface with initial reserve deposit
+ if s.myco_system and s.crosschain:
+ total_value = s.crosschain.hub.total_collateral_usd
+ if total_value > 0:
+ n = s.myco_system.config.n_reserve_assets
+ amounts = np.full(n, total_value / n)
+ s.myco_system.deposit(amounts, 0.0)
+
+ # Create governance voters
+ if s.governance:
+ for i in range(n_voters):
+ voter_id = f"voter_{i}"
+ holdings = np.random.lognormal(mean=np.log(5000), sigma=1.0)
+ s.governance.voters[voter_id] = Voter(
+ id=voter_id,
+ holdings=holdings,
+ sentiment=np.random.uniform(0.3, 0.9),
+ )
+ s.governance.total_supply = sum(
+ v.holdings for v in s.governance.voters.values()
+ )
+
+ from src.cadcad.state import sync_metrics
+ sync_metrics(s)
+
+ return state
+
+
+# ---------- Run Simulation ----------
+
+def run_simulation(
+ timesteps: int = 365,
+ runs: int = 1,
+ params: dict | None = None,
+ psubs: list | None = None,
+ initial_state: dict | None = None,
+ initial_deposits: dict | None = None,
+ initial_tranche_mints: dict | None = None,
+ n_voters: int = 20,
+) -> pd.DataFrame:
+ """Run a cadCAD simulation and return results as a DataFrame.
+
+ Args:
+ timesteps: Number of simulation steps
+ runs: Number of Monte Carlo runs
+ params: Simulation parameters (defaults to default_params())
+ psubs: Partial state update blocks (defaults to PSUB_NORMAL)
+ initial_state: Override initial state
+ initial_deposits: Bootstrap deposits {chain: {asset: qty}}
+ initial_tranche_mints: Bootstrap tranche mints {tranche: amount}
+ n_voters: Number of governance voters
+
+ Returns:
+ DataFrame with one row per (run, substep, timestep)
+ """
+ from cadCAD.configuration import Experiment
+ from cadCAD.engine import ExecutionMode, ExecutionContext, Executor
+
+ # Create initial state
+ if initial_state is None:
+ initial_state = create_initial_state()
+
+ # Bootstrap
+ if initial_deposits is None:
+ initial_deposits = {
+ "ethereum": {"stETH": 100, "rETH": 50},
+ "arbitrum": {"wstETH": 80},
+ "optimism": {"wstETH": 60},
+ "base": {"cbETH": 40, "USDC": 100_000},
+ "polygon": {"stMATIC": 200_000, "USDC": 50_000},
+ }
+ if initial_tranche_mints is None:
+ initial_tranche_mints = {
+ "senior": 200_000,
+ "mezzanine": 100_000,
+ "junior": 50_000,
+ }
+
+ initial_state = bootstrap_system(
+ initial_state, initial_deposits, initial_tranche_mints, n_voters,
+ )
+
+ if params is None:
+ params = default_params()
+ if psubs is None:
+ psubs = PSUB_NORMAL
+
+ # Configure simulation
+ sim_config = config_sim({
+ "N": runs,
+ "T": range(timesteps),
+ "M": params,
+ })
+
+ exp = Experiment()
+ exp.append_model(
+ initial_state=initial_state,
+ partial_state_update_blocks=psubs,
+ sim_configs=sim_config,
+ )
+
+ # Execute using the experiment's configs
+ exec_mode = ExecutionMode()
+ local_ctx = ExecutionContext(context=exec_mode.local_mode)
+ simulation = Executor(exec_context=local_ctx, configs=exp.configs)
+ raw_results, tensor_field, sessions = simulation.execute()
+
+ # Convert to DataFrame
+ df = pd.DataFrame(raw_results)
+
+ # Extract metrics from state objects
+ metrics_rows = []
+ for _, row in df.iterrows():
+ s: MycoFiState = row["mycofi"]
+ metrics = extract_metrics(s)
+ metrics["run"] = row.get("run", 0)
+ metrics["substep"] = row.get("substep", 0)
+ metrics["timestep"] = row.get("timestep", 0)
+ metrics_rows.append(metrics)
+
+ return pd.DataFrame(metrics_rows)
+
+
+# ---------- Pre-built Scenarios ----------
+
+def scenario_normal_growth(timesteps: int = 365, runs: int = 3) -> pd.DataFrame:
+ """Normal growth scenario: steady deposits, mild volatility."""
+ return run_simulation(
+ timesteps=timesteps,
+ runs=runs,
+ params=default_params(),
+ psubs=PSUB_NORMAL,
+ )
+
+
+def scenario_stress_test(timesteps: int = 100, runs: int = 5) -> pd.DataFrame:
+ """Stress test: ETH crash at day 30, bank run follows."""
+ return run_simulation(
+ timesteps=timesteps,
+ runs=runs,
+ params=stress_params(),
+ psubs=PSUB_STRESS,
+ )
+
+
+def scenario_parameter_sweep(timesteps: int = 180, runs: int = 1) -> pd.DataFrame:
+ """Parameter sweep: explore sensitivity to volatility, growth, redemptions."""
+ return run_simulation(
+ timesteps=timesteps,
+ runs=runs,
+ params=param_sweep_params(),
+ psubs=PSUB_NORMAL,
+ )
diff --git a/src/cadcad/policies.py b/src/cadcad/policies.py
new file mode 100644
index 0000000..772026d
--- /dev/null
+++ b/src/cadcad/policies.py
@@ -0,0 +1,204 @@
+"""cadCAD policy functions for the MycoFi cross-chain system.
+
+Policies compute signals (actions) based on the current state.
+They represent exogenous events and agent behaviors.
+"""
+
+import numpy as np
+from typing import Any
+
+
+# ---------- Exogenous Processes ----------
+
+def p_new_deposits(params: dict, substep: int, state_history: list, state: dict) -> dict:
+ """Generate new deposits across chains.
+
+ Simulates users depositing staking assets on various spoke chains.
+ """
+ s: Any = state["mycofi"]
+ if s.crosschain is None:
+ return {"deposits": []}
+
+ deposit_rate = params.get("deposit_rate", 5) # avg deposits per epoch
+ max_deposit_usd = params.get("max_deposit_usd", 50_000)
+
+ deposits = []
+ n_deposits = np.random.poisson(deposit_rate)
+
+ for _ in range(n_deposits):
+ # Pick a random chain and asset
+ chains = list(s.crosschain.hub.spokes.keys())
+ chain = np.random.choice(chains)
+ spoke = s.crosschain.hub.spokes[chain]
+ if not spoke.accepted_assets:
+ continue
+ asset = np.random.choice(spoke.accepted_assets)
+
+ # Random deposit amount (log-normal distribution)
+ usd_amount = np.random.lognormal(mean=np.log(5000), sigma=1.0)
+ usd_amount = min(usd_amount, max_deposit_usd)
+ qty = usd_amount / asset.price if asset.price > 0 else 0
+
+ deposits.append({
+ "chain": chain,
+ "asset": asset.symbol,
+ "quantity": qty,
+ "usd_value": usd_amount,
+ })
+
+ return {"deposits": deposits}
+
+
+def p_redemptions(params: dict, substep: int, state_history: list, state: dict) -> dict:
+ """Generate redemption requests.
+
+ Some users redeem their tranche tokens for collateral.
+ """
+ s: Any = state["mycofi"]
+ if s.tranche_system is None:
+ return {"redemptions": []}
+
+ # Base redemption rate influenced by sentiment
+ base_rate = params.get("redemption_rate", 0.02) # 2% of supply per epoch
+
+ # Higher redemption when collateral ratio is low
+ cr = s.system_collateral_ratio
+ if cr < 1.2:
+ panic_multiplier = 2.0 + (1.2 - cr) * 10 # Panic selling
+ elif cr > 2.0:
+ panic_multiplier = 0.5 # Very comfortable, low redemptions
+ else:
+ panic_multiplier = 1.0
+
+ effective_rate = base_rate * panic_multiplier
+
+ redemptions = []
+ for tranche in ["senior", "mezzanine", "junior"]:
+ ts = getattr(s.tranche_system, tranche)
+ if ts.supply > 0:
+ amount = ts.supply * effective_rate * np.random.uniform(0, 1)
+ if amount > 0:
+ redemptions.append({
+ "tranche": tranche,
+ "amount": amount,
+ })
+
+ return {"redemptions": redemptions}
+
+
+def p_price_process(params: dict, substep: int, state_history: list, state: dict) -> dict:
+ """Simulate asset price movements (geometric Brownian motion).
+
+ Models correlated price movements across staking assets.
+ """
+ s: Any = state["mycofi"]
+ if s.crosschain is None:
+ return {"price_changes": {}}
+
+ dt = params.get("dt", 1 / 365) # Daily by default
+ eth_volatility = params.get("eth_volatility", 0.6) # 60% annualized
+ eth_drift = params.get("eth_drift", 0.0) # No drift by default
+
+ # ETH is the primary driver — LSTs are correlated
+ eth_return = np.random.normal(eth_drift * dt, eth_volatility * np.sqrt(dt))
+
+ price_changes = {}
+ for chain, spoke in s.crosschain.hub.spokes.items():
+ for asset in spoke.accepted_assets:
+ if asset.symbol == "USDC":
+ # Stablecoin — minimal movement
+ change = np.random.normal(0, 0.0001)
+ elif "ETH" in asset.symbol or "stETH" in asset.symbol or "wstETH" in asset.symbol:
+ # ETH-correlated with small idiosyncratic component
+ idio = np.random.normal(0, 0.02 * np.sqrt(dt))
+ change = eth_return + idio
+ elif "MATIC" in asset.symbol:
+ # Higher vol, partially correlated with ETH
+ idio = np.random.normal(0, 0.8 * np.sqrt(dt))
+ change = 0.6 * eth_return + idio
+ else:
+ change = np.random.normal(0, 0.3 * np.sqrt(dt))
+
+ price_changes[f"{chain}:{asset.symbol}"] = 1.0 + change
+
+ return {"price_changes": price_changes}
+
+
+def p_staking_yield(params: dict, substep: int, state_history: list, state: dict) -> dict:
+ """Generate staking yield from collateral assets."""
+ dt = params.get("dt", 1 / 365)
+ return {"yield_dt": dt}
+
+
+def p_governance_actions(params: dict, substep: int, state_history: list, state: dict) -> dict:
+ """Simulate governance participation (staking on proposals, new proposals)."""
+ s: Any = state["mycofi"]
+ if s.governance is None:
+ return {"governance_actions": []}
+
+ actions = []
+
+ # Occasionally create new proposals
+ if np.random.random() < params.get("proposal_rate", 0.05):
+ proposal_types = [
+ ("weight_update", "Update stETH weight", 0.05, "stETH_weight", np.random.uniform(0.8, 1.2)),
+ ("parameter_change", "Adjust senior yield target", 0.02, "senior_yield", np.random.uniform(0.02, 0.05)),
+ ("parameter_change", "Adjust mez CR", 0.03, "mez_cr", np.random.uniform(1.1, 1.3)),
+ ("chain_onboard", "Add Scroll chain", 0.08, "new_chain", 1.0),
+ ]
+ ptype, title, funds_req, target, value = proposal_types[np.random.randint(len(proposal_types))]
+ actions.append({
+ "type": "new_proposal",
+ "proposal_type": ptype,
+ "title": title,
+ "funds_requested": funds_req,
+ "parameter_target": target,
+ "parameter_value": value,
+ })
+
+ # Voters adjust stakes based on sentiment
+ for voter_id, voter in s.governance.voters.items():
+ if np.random.random() < 0.3: # 30% chance of action per epoch
+ candidates = [
+ p for p in s.governance.proposals.values()
+ if p.status == "candidate"
+ ]
+ if candidates:
+ prop = np.random.choice(candidates)
+ stake_amount = voter.holdings * np.random.uniform(0.01, 0.2)
+ actions.append({
+ "type": "stake",
+ "voter_id": voter_id,
+ "proposal_id": prop.id,
+ "amount": stake_amount,
+ })
+
+ return {"governance_actions": actions}
+
+
+# ---------- Stress Scenarios ----------
+
+def p_eth_crash(params: dict, substep: int, state_history: list, state: dict) -> dict:
+ """Simulate a sudden ETH price crash (for stress testing)."""
+ s: Any = state["mycofi"]
+ crash_epoch = params.get("crash_epoch", 30)
+ crash_magnitude = params.get("crash_magnitude", 0.5) # 50% drop
+
+ current_epoch = len(state_history)
+ if current_epoch == crash_epoch:
+ return {"price_shock": {"pattern": "eth_crash", "multiplier": crash_magnitude}}
+
+ return {"price_shock": None}
+
+
+def p_bank_run(params: dict, substep: int, state_history: list, state: dict) -> dict:
+ """Simulate a coordinated bank run on tranches."""
+ s: Any = state["mycofi"]
+ run_start = params.get("run_start_epoch", 50)
+ run_intensity = params.get("run_intensity", 0.15) # 15% per epoch
+
+ current_epoch = len(state_history)
+ if current_epoch >= run_start:
+ return {"bank_run": {"intensity": run_intensity}}
+
+ return {"bank_run": None}
diff --git a/src/cadcad/state.py b/src/cadcad/state.py
new file mode 100644
index 0000000..2e2eae6
--- /dev/null
+++ b/src/cadcad/state.py
@@ -0,0 +1,156 @@
+"""cadCAD state variables for the MycoFi cross-chain system.
+
+Defines the complete system state that cadCAD tracks across timesteps.
+Integrates: bonding surface, risk tranches, cross-chain vaults, conviction governance.
+"""
+
+import numpy as np
+from dataclasses import dataclass, field, asdict
+from typing import Any
+
+from src.composed.myco_surface import MycoSystem, MycoSystemConfig
+from src.primitives.risk_tranching import (
+ RiskTrancheSystem, TrancheParams, TrancheState,
+)
+from src.primitives.conviction import (
+ ConvictionSystem, ConvictionParams,
+)
+from src.crosschain.hub_spoke import (
+ CrossChainSystem, create_default_system as create_default_crosschain,
+)
+
+
+@dataclass
+class MycoFiState:
+ """Complete state of the MycoFi cross-chain protocol.
+
+ This is the top-level state variable for the cadCAD simulation.
+ """
+ # Core bonding surface (from existing MycoSystem)
+ myco_system: MycoSystem | None = None
+
+ # Risk tranches
+ tranche_system: RiskTrancheSystem | None = None
+
+ # Cross-chain
+ crosschain: CrossChainSystem | None = None
+
+ # Governance
+ governance: ConvictionSystem | None = None
+
+ # Aggregate metrics (for cadCAD DataFrame output)
+ time: float = 0.0
+ total_supply: float = 0.0
+ total_collateral_usd: float = 0.0
+ system_collateral_ratio: float = 0.0
+ myco_price: float = 0.0
+
+ # Tranche metrics
+ senior_supply: float = 0.0
+ senior_cr: float = 0.0
+ mezzanine_supply: float = 0.0
+ mezzanine_cr: float = 0.0
+ junior_supply: float = 0.0
+ junior_cr: float = 0.0
+
+ # Cross-chain metrics
+ total_chains: int = 0
+ total_yield: float = 0.0
+ ccip_messages: int = 0
+
+ # Governance metrics
+ governance_epoch: int = 0
+ total_staked: float = 0.0
+ proposals_passed: int = 0
+
+
+def create_initial_state(
+ myco_config: MycoSystemConfig | None = None,
+ tranche_params: TrancheParams | None = None,
+ conviction_params: ConvictionParams | None = None,
+ chains: list[str] | None = None,
+) -> dict[str, Any]:
+ """Create the initial state dictionary for cadCAD.
+
+ Returns a flat dict suitable for cadCAD's initial_conditions.
+ """
+ # Initialize subsystems
+ myco = MycoSystem(myco_config or MycoSystemConfig())
+ tranches = RiskTrancheSystem(params=tranche_params or TrancheParams())
+ crosschain = create_default_crosschain()
+ governance = ConvictionSystem(
+ params=conviction_params or ConvictionParams(),
+ )
+
+ state = MycoFiState(
+ myco_system=myco,
+ tranche_system=tranches,
+ crosschain=crosschain,
+ governance=governance,
+ total_chains=len(crosschain.hub.spokes),
+ )
+
+ return {"mycofi": state}
+
+
+def extract_metrics(state: MycoFiState) -> dict:
+ """Extract flat metrics dict from state for DataFrame output."""
+ metrics = {
+ "time": state.time,
+ "total_supply": state.total_supply,
+ "total_collateral_usd": state.total_collateral_usd,
+ "system_cr": state.system_collateral_ratio,
+ "myco_price": state.myco_price,
+ "senior_supply": state.senior_supply,
+ "senior_cr": state.senior_cr,
+ "mezzanine_supply": state.mezzanine_supply,
+ "mezzanine_cr": state.mezzanine_cr,
+ "junior_supply": state.junior_supply,
+ "junior_cr": state.junior_cr,
+ "total_chains": state.total_chains,
+ "total_yield": state.total_yield,
+ "ccip_messages": state.ccip_messages,
+ "governance_epoch": state.governance_epoch,
+ "total_staked": state.total_staked,
+ "proposals_passed": state.proposals_passed,
+ }
+
+ # Add per-chain collateral
+ if state.crosschain:
+ for chain, spoke in state.crosschain.hub.spokes.items():
+ metrics[f"collateral_{chain}"] = spoke.total_value_usd
+
+ return metrics
+
+
+def sync_metrics(state: MycoFiState) -> MycoFiState:
+ """Synchronize aggregate metrics from subsystems into top-level fields."""
+ if state.myco_system:
+ m = state.myco_system.get_metrics()
+ state.total_supply = m["supply"]
+ state.myco_price = state.myco_system.get_spot_price()
+
+ if state.tranche_system:
+ ts = state.tranche_system
+ state.senior_supply = ts.senior.supply
+ state.senior_cr = ts.senior.collateral_ratio
+ state.mezzanine_supply = ts.mezzanine.supply
+ state.mezzanine_cr = ts.mezzanine.collateral_ratio
+ state.junior_supply = ts.junior.supply
+ state.junior_cr = ts.junior.collateral_ratio
+ state.system_collateral_ratio = ts.system_collateral_ratio
+
+ if state.crosschain:
+ state.total_collateral_usd = state.crosschain.hub.total_collateral_usd
+ state.total_chains = len(state.crosschain.hub.spokes)
+ state.total_yield = state.crosschain.total_yield_generated
+ state.ccip_messages = state.crosschain.total_messages_delivered
+
+ if state.governance:
+ state.governance_epoch = state.governance.epoch
+ state.total_staked = sum(
+ sum(v.stakes.values()) for v in state.governance.voters.values()
+ )
+ state.proposals_passed = len(state.governance.passed_proposals)
+
+ return state
diff --git a/src/cadcad/state_updates.py b/src/cadcad/state_updates.py
new file mode 100644
index 0000000..03c5bec
--- /dev/null
+++ b/src/cadcad/state_updates.py
@@ -0,0 +1,228 @@
+"""cadCAD state update functions for the MycoFi cross-chain system.
+
+State update functions apply policy signals to evolve the system state.
+Each function takes (params, substep, state_history, state, signal) and
+returns (variable_name, new_value).
+"""
+
+import numpy as np
+from copy import deepcopy
+from typing import Any
+
+from src.primitives.risk_tranching import (
+ deposit_collateral, mint_tranche, redeem_tranche,
+ distribute_yield, apply_loss, check_liquidation,
+ get_tranche_metrics,
+)
+from src.primitives.conviction import (
+ Proposal, Voter, stake as cv_stake, tick as cv_tick,
+)
+from src.crosschain.hub_spoke import (
+ simulate_deposit, tick as cc_tick, apply_price_shock,
+ get_crosschain_metrics,
+)
+from src.cadcad.state import sync_metrics
+
+
+def s_process_deposits(params: dict, substep: int, state_history: list, state: dict, signal: dict) -> tuple:
+ """Process new cross-chain deposits."""
+ s = deepcopy(state["mycofi"])
+ deposits = signal.get("deposits", [])
+
+ for dep in deposits:
+ if s.crosschain is None:
+ break
+
+ chain = dep["chain"]
+ asset = dep["asset"]
+ qty = dep["quantity"]
+
+ # Deposit into spoke vault
+ simulate_deposit(s.crosschain, chain, asset, qty, s.time)
+
+ # Also add to tranche system collateral
+ if s.tranche_system is not None:
+ deposit_collateral(s.tranche_system, dep["usd_value"])
+
+ # Process CCIP messages
+ if s.crosschain:
+ s.crosschain.hub.process_messages(s.time)
+
+ s = sync_metrics(s)
+ return ("mycofi", s)
+
+
+def s_process_redemptions(params: dict, substep: int, state_history: list, state: dict, signal: dict) -> tuple:
+ """Process tranche redemptions."""
+ s = deepcopy(state["mycofi"])
+ redemptions = signal.get("redemptions", [])
+
+ for red in redemptions:
+ if s.tranche_system is None:
+ break
+ redeem_tranche(s.tranche_system, red["tranche"], red["amount"])
+
+ s = sync_metrics(s)
+ return ("mycofi", s)
+
+
+def s_apply_prices(params: dict, substep: int, state_history: list, state: dict, signal: dict) -> tuple:
+ """Apply price movements to all assets."""
+ s = deepcopy(state["mycofi"])
+ price_changes = signal.get("price_changes", {})
+
+ if s.crosschain is None:
+ return ("mycofi", s)
+
+ old_total = s.crosschain.hub.total_collateral_usd
+
+ for key, multiplier in price_changes.items():
+ chain, asset_symbol = key.split(":")
+ spoke = s.crosschain.hub.spokes.get(chain)
+ if spoke is None:
+ continue
+ for asset in spoke.accepted_assets:
+ if asset.symbol == asset_symbol:
+ asset.price *= multiplier
+ spoke._recalculate_value()
+
+ s.crosschain.hub._recalculate_total()
+ new_total = s.crosschain.hub.total_collateral_usd
+
+ # If collateral value dropped, apply loss to tranches
+ if new_total < old_total and s.tranche_system is not None:
+ loss = old_total - new_total
+ apply_loss(s.tranche_system, loss)
+ s.tranche_system.total_collateral = new_total
+ elif new_total > old_total and s.tranche_system is not None:
+ # Collateral appreciation
+ gain = new_total - old_total
+ s.tranche_system.total_collateral = new_total
+
+ s = sync_metrics(s)
+ return ("mycofi", s)
+
+
+def s_apply_yield(params: dict, substep: int, state_history: list, state: dict, signal: dict) -> tuple:
+ """Apply staking yield and distribute through tranche waterfall."""
+ s = deepcopy(state["mycofi"])
+ dt = signal.get("yield_dt", 1 / 365)
+
+ total_yield = 0.0
+ if s.crosschain:
+ for spoke in s.crosschain.hub.spokes.values():
+ total_yield += spoke.apply_staking_yield(dt)
+ s.crosschain.hub._recalculate_total()
+
+ # Distribute yield through tranche waterfall
+ if s.tranche_system is not None and total_yield > 0:
+ distribute_yield(s.tranche_system, total_yield, dt)
+ s.tranche_system.total_collateral += total_yield
+
+ s.time += dt
+ s = sync_metrics(s)
+ return ("mycofi", s)
+
+
+def s_governance_tick(params: dict, substep: int, state_history: list, state: dict, signal: dict) -> tuple:
+ """Process governance actions and advance conviction voting."""
+ s = deepcopy(state["mycofi"])
+ actions = signal.get("governance_actions", [])
+
+ if s.governance is None:
+ return ("mycofi", s)
+
+ for action in actions:
+ if action["type"] == "new_proposal":
+ prop_id = f"prop_{s.governance.epoch}_{np.random.randint(1000)}"
+ s.governance.proposals[prop_id] = Proposal(
+ id=prop_id,
+ title=action["title"],
+ proposal_type=action["proposal_type"],
+ funds_requested=action["funds_requested"],
+ parameter_target=action.get("parameter_target", ""),
+ parameter_value=action.get("parameter_value", 0.0),
+ )
+ elif action["type"] == "stake":
+ cv_stake(
+ s.governance,
+ action["voter_id"],
+ action["proposal_id"],
+ action["amount"],
+ )
+
+ # Advance conviction voting
+ cv_tick(s.governance)
+
+ s = sync_metrics(s)
+ return ("mycofi", s)
+
+
+def s_apply_price_shock(params: dict, substep: int, state_history: list, state: dict, signal: dict) -> tuple:
+ """Apply sudden price shocks (stress scenarios)."""
+ s = deepcopy(state["mycofi"])
+ shock = signal.get("price_shock")
+
+ if shock is None or s.crosschain is None:
+ return ("mycofi", s)
+
+ if shock["pattern"] == "eth_crash":
+ multiplier = shock["multiplier"]
+ old_total = s.crosschain.hub.total_collateral_usd
+
+ # Apply to all ETH-derivative assets
+ for chain, spoke in s.crosschain.hub.spokes.items():
+ for asset in spoke.accepted_assets:
+ if asset.symbol != "USDC":
+ asset.price *= multiplier
+ spoke._recalculate_value()
+ s.crosschain.hub._recalculate_total()
+
+ new_total = s.crosschain.hub.total_collateral_usd
+ if s.tranche_system and new_total < old_total:
+ apply_loss(s.tranche_system, old_total - new_total)
+ s.tranche_system.total_collateral = new_total
+
+ s = sync_metrics(s)
+ return ("mycofi", s)
+
+
+def s_apply_bank_run(params: dict, substep: int, state_history: list, state: dict, signal: dict) -> tuple:
+ """Apply bank run redemptions."""
+ s = deepcopy(state["mycofi"])
+ run = signal.get("bank_run")
+
+ if run is None or s.tranche_system is None:
+ return ("mycofi", s)
+
+ intensity = run["intensity"]
+
+ # Everyone tries to redeem — senior first (they're most risk-averse)
+ for tranche in ["senior", "mezzanine", "junior"]:
+ ts = getattr(s.tranche_system, tranche)
+ amount = ts.supply * intensity
+ if amount > 0:
+ redeem_tranche(s.tranche_system, tranche, amount)
+
+ s = sync_metrics(s)
+ return ("mycofi", s)
+
+
+def s_check_liquidations(params: dict, substep: int, state_history: list, state: dict, signal: dict) -> tuple:
+ """Check and execute liquidations if needed."""
+ s = deepcopy(state["mycofi"])
+
+ if s.tranche_system is None:
+ return ("mycofi", s)
+
+ liquidations = check_liquidation(s.tranche_system)
+
+ # Auto-liquidate mezzanine to protect senior
+ if liquidations["mezzanine"] and s.tranche_system.mezzanine.supply > 0:
+ redeem_tranche(
+ s.tranche_system, "mezzanine",
+ s.tranche_system.mezzanine.supply * 0.5, # Liquidate 50%
+ )
+
+ s = sync_metrics(s)
+ return ("mycofi", s)
diff --git a/src/crosschain/__init__.py b/src/crosschain/__init__.py
new file mode 100644
index 0000000..36c92fd
--- /dev/null
+++ b/src/crosschain/__init__.py
@@ -0,0 +1 @@
+"""Cross-chain simulation layer for hub-and-spoke architecture."""
diff --git a/src/crosschain/hub_spoke.py b/src/crosschain/hub_spoke.py
new file mode 100644
index 0000000..afbe904
--- /dev/null
+++ b/src/crosschain/hub_spoke.py
@@ -0,0 +1,368 @@
+"""Hub-and-spoke cross-chain architecture simulation.
+
+Models the multi-chain deployment:
+ Hub (Base): Registry, bonding curve, tranche manager, treasury
+ Spokes (Arbitrum, Optimism, Ethereum, ...): Collateral vaults
+
+CCIP messages carry:
+ - Deposit reports (spoke → hub)
+ - State sync (hub → spokes)
+ - Rebalancing triggers (hub → spokes)
+ - Governance updates (hub → spokes)
+"""
+
+import numpy as np
+from dataclasses import dataclass, field
+from typing import Optional
+
+
+# ---------- Collateral Assets ----------
+
+@dataclass
+class StakingAsset:
+ """A staking asset available on a specific chain."""
+ symbol: str # e.g., "stETH", "rETH", "cbETH"
+ chain: str # e.g., "ethereum", "arbitrum"
+ price: float = 1.0 # USD price from oracle
+ staking_apy: float = 0.04 # Annualized staking yield
+ weight: float = 1.0 # Governance-set weight in bonding curve
+ risk_score: float = 0.1 # 0-1 risk score (0 = safest)
+
+
+# ---------- CCIP Messages ----------
+
+@dataclass
+class CCIPMessage:
+ """Simulated CCIP cross-chain message."""
+ msg_type: str # deposit_report, state_sync, rebalance, governance
+ source_chain: str
+ dest_chain: str
+ payload: dict
+ timestamp: float
+ latency: float = 0.0 # Simulated delivery delay
+ delivered: bool = False
+ delivery_time: float = 0.0
+
+
+# ---------- Spoke (per-chain vault) ----------
+
+@dataclass
+class SpokeVault:
+ """A collateral vault on a spoke chain."""
+ chain: str
+ accepted_assets: list[StakingAsset] = field(default_factory=list)
+ balances: dict[str, float] = field(default_factory=dict) # asset_symbol -> quantity
+ total_value_usd: float = 0.0
+ pending_reports: list[CCIPMessage] = field(default_factory=list)
+ last_sync_time: float = 0.0
+
+ def deposit(self, asset_symbol: str, amount: float, timestamp: float) -> CCIPMessage:
+ """Deposit an asset and generate a CCIP report to hub."""
+ self.balances[asset_symbol] = self.balances.get(asset_symbol, 0.0) + amount
+
+ # Recalculate total value
+ self._recalculate_value()
+
+ # Generate CCIP message to hub
+ msg = CCIPMessage(
+ msg_type="deposit_report",
+ source_chain=self.chain,
+ dest_chain="base",
+ payload={
+ "asset": asset_symbol,
+ "amount": amount,
+ "chain_total_value": self.total_value_usd,
+ "balances": dict(self.balances),
+ },
+ timestamp=timestamp,
+ )
+ self.pending_reports.append(msg)
+ return msg
+
+ def withdraw(self, asset_symbol: str, amount: float, timestamp: float) -> Optional[CCIPMessage]:
+ """Withdraw an asset (after hub confirmation)."""
+ available = self.balances.get(asset_symbol, 0.0)
+ actual = min(amount, available)
+ if actual <= 0:
+ return None
+
+ self.balances[asset_symbol] -= actual
+ self._recalculate_value()
+
+ msg = CCIPMessage(
+ msg_type="deposit_report", # Negative deposit
+ source_chain=self.chain,
+ dest_chain="base",
+ payload={
+ "asset": asset_symbol,
+ "amount": -actual,
+ "chain_total_value": self.total_value_usd,
+ "balances": dict(self.balances),
+ },
+ timestamp=timestamp,
+ )
+ self.pending_reports.append(msg)
+ return msg
+
+ def apply_staking_yield(self, dt: float) -> float:
+ """Apply staking yield to all assets in the vault."""
+ total_yield = 0.0
+ for asset in self.accepted_assets:
+ qty = self.balances.get(asset.symbol, 0.0)
+ if qty > 0:
+ yield_amount = qty * asset.staking_apy * dt
+ self.balances[asset.symbol] += yield_amount
+ total_yield += yield_amount * asset.price
+ self._recalculate_value()
+ return total_yield
+
+ def _recalculate_value(self) -> None:
+ """Recalculate total USD value from balances and prices."""
+ total = 0.0
+ for asset in self.accepted_assets:
+ qty = self.balances.get(asset.symbol, 0.0)
+ total += qty * asset.price * asset.weight
+ self.total_value_usd = total
+
+
+# ---------- Hub (Base chain) ----------
+
+@dataclass
+class HubRegistry:
+ """The hub on Base chain that maintains global state."""
+ spokes: dict[str, SpokeVault] = field(default_factory=dict)
+ global_collateral: dict[str, float] = field(default_factory=dict) # asset -> total qty across chains
+ total_collateral_usd: float = 0.0
+ all_assets: dict[str, StakingAsset] = field(default_factory=dict) # symbol -> asset
+ message_queue: list[CCIPMessage] = field(default_factory=list)
+ processed_messages: list[CCIPMessage] = field(default_factory=list)
+ time: float = 0.0
+
+ def register_spoke(self, spoke: SpokeVault) -> None:
+ """Register a new spoke chain."""
+ self.spokes[spoke.chain] = spoke
+ for asset in spoke.accepted_assets:
+ self.all_assets[asset.symbol] = asset
+
+ def process_messages(self, current_time: float) -> list[CCIPMessage]:
+ """Process all pending CCIP messages from spokes."""
+ delivered = []
+
+ for spoke in self.spokes.values():
+ for msg in spoke.pending_reports:
+ # Simulate latency
+ if not msg.delivered:
+ msg.latency = _simulate_ccip_latency(msg.source_chain)
+ msg.delivery_time = msg.timestamp + msg.latency
+
+ if msg.delivery_time <= current_time and not msg.delivered:
+ msg.delivered = True
+ self._handle_message(msg)
+ delivered.append(msg)
+
+ # Clear delivered messages
+ spoke.pending_reports = [m for m in spoke.pending_reports if not m.delivered]
+
+ self.time = current_time
+ self.processed_messages.extend(delivered)
+ return delivered
+
+ def _handle_message(self, msg: CCIPMessage) -> None:
+ """Handle a delivered CCIP message."""
+ if msg.msg_type == "deposit_report":
+ # Update global collateral tracking
+ payload = msg.payload
+ asset = payload["asset"]
+ self.global_collateral[asset] = sum(
+ spoke.balances.get(asset, 0.0)
+ for spoke in self.spokes.values()
+ )
+ self._recalculate_total()
+
+ def _recalculate_total(self) -> None:
+ """Recalculate total collateral value across all spokes."""
+ self.total_collateral_usd = sum(
+ spoke.total_value_usd for spoke in self.spokes.values()
+ )
+
+ def broadcast_state_sync(self, current_time: float) -> list[CCIPMessage]:
+ """Send state sync to all spokes."""
+ messages = []
+ for chain, spoke in self.spokes.items():
+ msg = CCIPMessage(
+ msg_type="state_sync",
+ source_chain="base",
+ dest_chain=chain,
+ payload={
+ "total_collateral_usd": self.total_collateral_usd,
+ "global_collateral": dict(self.global_collateral),
+ },
+ timestamp=current_time,
+ )
+ messages.append(msg)
+ spoke.last_sync_time = current_time
+ return messages
+
+
+# ---------- Full Cross-Chain System ----------
+
+@dataclass
+class CrossChainSystem:
+ """The complete cross-chain bonding curve system."""
+ hub: HubRegistry = field(default_factory=HubRegistry)
+ time: float = 0.0
+ message_log: list[CCIPMessage] = field(default_factory=list)
+ total_messages_sent: int = 0
+ total_messages_delivered: int = 0
+ total_yield_generated: float = 0.0
+
+
+def create_default_system() -> CrossChainSystem:
+ """Create a default cross-chain system with common chains."""
+ system = CrossChainSystem()
+
+ # Define staking assets per chain
+ chains = {
+ "ethereum": [
+ StakingAsset("stETH", "ethereum", price=2400, staking_apy=0.035, weight=1.0, risk_score=0.05),
+ StakingAsset("rETH", "ethereum", price=2420, staking_apy=0.032, weight=0.95, risk_score=0.08),
+ StakingAsset("cbETH", "ethereum", price=2390, staking_apy=0.030, weight=0.90, risk_score=0.10),
+ ],
+ "arbitrum": [
+ StakingAsset("wstETH", "arbitrum", price=2410, staking_apy=0.035, weight=1.0, risk_score=0.07),
+ StakingAsset("rETH", "arbitrum", price=2420, staking_apy=0.032, weight=0.95, risk_score=0.10),
+ ],
+ "optimism": [
+ StakingAsset("wstETH", "optimism", price=2410, staking_apy=0.035, weight=1.0, risk_score=0.07),
+ StakingAsset("sfrxETH", "optimism", price=2380, staking_apy=0.040, weight=0.85, risk_score=0.15),
+ ],
+ "base": [
+ StakingAsset("cbETH", "base", price=2390, staking_apy=0.030, weight=0.90, risk_score=0.06),
+ StakingAsset("USDC", "base", price=1.0, staking_apy=0.0, weight=1.0, risk_score=0.02),
+ ],
+ "polygon": [
+ StakingAsset("stMATIC", "polygon", price=0.45, staking_apy=0.045, weight=0.70, risk_score=0.20),
+ StakingAsset("USDC", "polygon", price=1.0, staking_apy=0.0, weight=1.0, risk_score=0.05),
+ ],
+ }
+
+ for chain_name, assets in chains.items():
+ spoke = SpokeVault(chain=chain_name, accepted_assets=assets)
+ system.hub.register_spoke(spoke)
+
+ return system
+
+
+def simulate_deposit(
+ system: CrossChainSystem,
+ chain: str,
+ asset_symbol: str,
+ amount: float,
+ timestamp: float,
+) -> CCIPMessage:
+ """Simulate a user depositing an asset on a spoke chain."""
+ spoke = system.hub.spokes.get(chain)
+ if spoke is None:
+ raise ValueError(f"Unknown chain: {chain}")
+
+ msg = spoke.deposit(asset_symbol, amount, timestamp)
+ system.total_messages_sent += 1
+ system.message_log.append(msg)
+ return msg
+
+
+def tick(
+ system: CrossChainSystem,
+ dt: float,
+) -> dict:
+ """Advance the cross-chain system by dt (in years for APY calculations).
+
+ Returns metrics about what happened this tick.
+ """
+ system.time += dt
+
+ # Apply staking yield on all spokes
+ total_yield = 0.0
+ for spoke in system.hub.spokes.values():
+ total_yield += spoke.apply_staking_yield(dt)
+ system.total_yield_generated += total_yield
+
+ # Process pending CCIP messages
+ delivered = system.hub.process_messages(system.time)
+ system.total_messages_delivered += len(delivered)
+
+ # Periodic state sync (every ~1 day = 1/365 year)
+ sync_messages = system.hub.broadcast_state_sync(system.time)
+ system.total_messages_sent += len(sync_messages)
+ system.message_log.extend(sync_messages)
+
+ return {
+ "time": system.time,
+ "yield_this_tick": total_yield,
+ "messages_delivered": len(delivered),
+ "total_collateral_usd": system.hub.total_collateral_usd,
+ "per_chain": {
+ chain: spoke.total_value_usd
+ for chain, spoke in system.hub.spokes.items()
+ },
+ }
+
+
+def apply_price_shock(
+ system: CrossChainSystem,
+ asset_symbol: str,
+ price_multiplier: float,
+) -> None:
+ """Apply a price shock to an asset across all chains (for stress testing)."""
+ for chain, spoke in system.hub.spokes.items():
+ for asset in spoke.accepted_assets:
+ if asset.symbol == asset_symbol:
+ asset.price *= price_multiplier
+ spoke._recalculate_value()
+ system.hub._recalculate_total()
+
+
+def get_crosschain_metrics(system: CrossChainSystem) -> dict:
+ """Get comprehensive cross-chain metrics."""
+ return {
+ "time": system.time,
+ "total_collateral_usd": system.hub.total_collateral_usd,
+ "total_messages_sent": system.total_messages_sent,
+ "total_messages_delivered": system.total_messages_delivered,
+ "total_yield_generated": system.total_yield_generated,
+ "chains": {
+ chain: {
+ "total_value_usd": spoke.total_value_usd,
+ "assets": {
+ asset.symbol: {
+ "balance": spoke.balances.get(asset.symbol, 0.0),
+ "price": asset.price,
+ "value": spoke.balances.get(asset.symbol, 0.0) * asset.price,
+ "apy": asset.staking_apy,
+ }
+ for asset in spoke.accepted_assets
+ },
+ }
+ for chain, spoke in system.hub.spokes.items()
+ },
+ "global_collateral": dict(system.hub.global_collateral),
+ }
+
+
+def _simulate_ccip_latency(source_chain: str) -> float:
+ """Simulate CCIP message latency based on source chain.
+
+ Returns latency in years (for consistency with dt).
+ Real latencies: ~5 min for L2s, ~20 min for L1.
+ """
+ # In years: 5 min ≈ 9.5e-6 years, 20 min ≈ 3.8e-5 years
+ base_latencies = {
+ "ethereum": 3.8e-5, # ~20 min
+ "arbitrum": 9.5e-6, # ~5 min
+ "optimism": 9.5e-6,
+ "base": 5.7e-6, # ~3 min (same operator)
+ "polygon": 1.9e-5, # ~10 min
+ }
+ base = base_latencies.get(source_chain, 1.9e-5)
+ # Add some randomness
+ return base * (0.5 + np.random.random())
diff --git a/src/primitives/conviction.py b/src/primitives/conviction.py
new file mode 100644
index 0000000..e324f84
--- /dev/null
+++ b/src/primitives/conviction.py
@@ -0,0 +1,281 @@
+"""Conviction voting governance for parameter management.
+
+Ported from the conviction/ cadCAD research repo. Conviction voting
+provides continuous signal-weighted governance where participants stake
+tokens on proposals, and conviction accumulates over time.
+
+Core formula: y_{t+1} = α × y_t + x_t
+Trigger: y*(r) = ρS / ((1-α)(β - r/R)²)
+
+Used here to govern:
+- Collateral weights (which assets accepted, at what weights)
+- Tranche parameters (ratios, yield targets)
+- Risk parameters (liquidation thresholds)
+- New chain onboarding decisions
+"""
+
+import numpy as np
+from dataclasses import dataclass, field
+
+
+@dataclass
+class ConvictionParams:
+ """Parameters for the conviction voting system."""
+ alpha: float = 0.9 # Decay factor (0.5 ≤ α < 1)
+ beta: float = 0.2 # Maximum share of funds per proposal
+ rho: float = 0.0025 # Trigger function scale factor
+ min_age: int = 3 # Minimum epochs before a proposal can pass
+
+ @property
+ def half_life(self) -> float:
+ """Half-life in epochs: T_half = -ln(2)/ln(α)"""
+ if self.alpha <= 0 or self.alpha >= 1:
+ return float('inf')
+ return -np.log(2) / np.log(self.alpha)
+
+ @classmethod
+ def from_half_life(cls, half_life: float, **kwargs) -> 'ConvictionParams':
+ """Create params from half-life instead of alpha."""
+ alpha = np.exp(-np.log(2) / half_life)
+ return cls(alpha=alpha, **kwargs)
+
+
+@dataclass
+class Proposal:
+ """A governance proposal."""
+ id: str
+ title: str
+ description: str = ""
+ funds_requested: float = 0.0 # As fraction of total funds (for trigger)
+ proposal_type: str = "parameter_change" # parameter_change, chain_onboard, weight_update
+ parameter_target: str = "" # Which parameter to change
+ parameter_value: float = 0.0 # Proposed new value
+ status: str = "candidate" # candidate, active, passed, failed, killed
+ age: int = 0
+ total_conviction: float = 0.0
+ trigger: float = 0.0
+
+
+@dataclass
+class Voter:
+ """A participant in conviction voting."""
+ id: str
+ holdings: float = 0.0
+ sentiment: float = 0.5 # 0-1 scale
+ stakes: dict[str, float] = field(default_factory=dict) # proposal_id -> tokens staked
+ convictions: dict[str, float] = field(default_factory=dict) # proposal_id -> conviction
+
+
+@dataclass
+class ConvictionSystem:
+ """The conviction voting governance system."""
+ params: ConvictionParams
+ proposals: dict[str, Proposal] = field(default_factory=dict)
+ voters: dict[str, Voter] = field(default_factory=dict)
+ total_supply: float = 100_000.0
+ total_funds: float = 50_000.0
+ epoch: int = 0
+ passed_proposals: list[Proposal] = field(default_factory=list)
+
+
+def trigger_threshold(
+ requested_share: float,
+ supply: float,
+ params: ConvictionParams,
+) -> float:
+ """Calculate the conviction threshold for a proposal to pass.
+
+ τ = ρS / ((1-α)(β - share)²)
+ """
+ if requested_share >= params.beta:
+ return float('inf') # Can't request more than beta
+ return params.rho * supply / ((1 - params.alpha) * (params.beta - requested_share) ** 2)
+
+
+def update_conviction(
+ current_conviction: float,
+ staked_tokens: float,
+ alpha: float,
+) -> float:
+ """Update conviction for one voter-proposal pair.
+
+ y_{t+1} = α × y_t + x_t
+ """
+ return alpha * current_conviction + staked_tokens
+
+
+def max_conviction(staked_tokens: float, alpha: float) -> float:
+ """Maximum possible conviction (asymptotic limit).
+
+ y_max = x / (1 - α)
+ """
+ if alpha >= 1:
+ return float('inf')
+ return staked_tokens / (1 - alpha)
+
+
+def conviction_at_time(
+ staked_tokens: float,
+ alpha: float,
+ epochs: int,
+ initial_conviction: float = 0.0,
+) -> float:
+ """Explicit formula for conviction at time t.
+
+ y_t = α^t × y_0 + x × (1 - α^t) / (1 - α)
+ """
+ alpha_t = alpha ** epochs
+ return alpha_t * initial_conviction + staked_tokens * (1 - alpha_t) / (1 - alpha)
+
+
+def epochs_to_fraction(
+ target_fraction: float,
+ alpha: float,
+) -> float:
+ """Number of epochs to reach a fraction of max conviction.
+
+ T = ln(1 - fraction) / ln(α)
+ """
+ if target_fraction >= 1.0 or target_fraction <= 0.0:
+ return float('inf')
+ return np.log(1 - target_fraction) / np.log(alpha)
+
+
+def stake(
+ system: ConvictionSystem,
+ voter_id: str,
+ proposal_id: str,
+ amount: float,
+) -> ConvictionSystem:
+ """Stake tokens on a proposal."""
+ voter = system.voters.get(voter_id)
+ proposal = system.proposals.get(proposal_id)
+ if voter is None or proposal is None:
+ return system
+ if proposal.status != "candidate":
+ return system
+
+ # Check available tokens
+ total_staked = sum(voter.stakes.values())
+ available = voter.holdings - total_staked
+ actual = min(amount, available)
+
+ if actual <= 0:
+ return system
+
+ voter.stakes[proposal_id] = voter.stakes.get(proposal_id, 0.0) + actual
+ return system
+
+
+def unstake(
+ system: ConvictionSystem,
+ voter_id: str,
+ proposal_id: str,
+ amount: float,
+) -> ConvictionSystem:
+ """Unstake tokens from a proposal."""
+ voter = system.voters.get(voter_id)
+ if voter is None or proposal_id not in voter.stakes:
+ return system
+
+ actual = min(amount, voter.stakes[proposal_id])
+ voter.stakes[proposal_id] -= actual
+ if voter.stakes[proposal_id] <= 0:
+ del voter.stakes[proposal_id]
+
+ return system
+
+
+def tick(system: ConvictionSystem) -> ConvictionSystem:
+ """Advance one epoch: update convictions, check triggers.
+
+ Returns updated system with any newly passed proposals.
+ """
+ system.epoch += 1
+ p = system.params
+
+ # Update conviction for each voter-proposal pair
+ for voter in system.voters.values():
+ for proposal_id, staked in voter.stakes.items():
+ old_conviction = voter.convictions.get(proposal_id, 0.0)
+ voter.convictions[proposal_id] = update_conviction(
+ old_conviction, staked, p.alpha
+ )
+
+ # Aggregate total conviction per proposal and check triggers
+ for prop in system.proposals.values():
+ if prop.status != "candidate":
+ continue
+
+ prop.age += 1
+ prop.total_conviction = sum(
+ v.convictions.get(prop.id, 0.0)
+ for v in system.voters.values()
+ )
+
+ # Compute trigger threshold
+ prop.trigger = trigger_threshold(
+ prop.funds_requested, system.total_supply, p
+ )
+
+ # Check if proposal passes
+ if prop.age >= p.min_age and prop.total_conviction >= prop.trigger:
+ prop.status = "passed"
+ system.passed_proposals.append(prop)
+
+ return system
+
+
+def generate_conviction_curves(
+ staked_tokens: float,
+ alpha: float,
+ epochs: int = 100,
+) -> dict[str, np.ndarray]:
+ """Generate charging and discharging curves for visualization."""
+ charge = np.array([
+ conviction_at_time(staked_tokens, alpha, t)
+ for t in range(epochs)
+ ])
+
+ # Discharge: start from max conviction, remove stake
+ y_max = max_conviction(staked_tokens, alpha)
+ discharge = np.array([
+ y_max * (alpha ** t)
+ for t in range(epochs)
+ ])
+
+ t = np.arange(epochs)
+ return {
+ "time": t,
+ "charge": charge,
+ "discharge": discharge,
+ "max": np.full(epochs, y_max),
+ }
+
+
+def get_governance_metrics(system: ConvictionSystem) -> dict:
+ """Get comprehensive governance metrics."""
+ active_proposals = [p for p in system.proposals.values() if p.status == "candidate"]
+ total_staked = sum(
+ sum(v.stakes.values()) for v in system.voters.values()
+ )
+ return {
+ "epoch": system.epoch,
+ "total_supply": system.total_supply,
+ "total_staked": total_staked,
+ "staking_ratio": total_staked / system.total_supply if system.total_supply > 0 else 0,
+ "active_proposals": len(active_proposals),
+ "passed_proposals": len(system.passed_proposals),
+ "total_voters": len(system.voters),
+ "proposals": {
+ p.id: {
+ "title": p.title,
+ "conviction": p.total_conviction,
+ "trigger": p.trigger,
+ "progress": p.total_conviction / p.trigger if p.trigger > 0 and p.trigger != float('inf') else 0,
+ "age": p.age,
+ "status": p.status,
+ }
+ for p in system.proposals.values()
+ },
+ }
diff --git a/src/primitives/risk_tranching.py b/src/primitives/risk_tranching.py
new file mode 100644
index 0000000..b3b8c03
--- /dev/null
+++ b/src/primitives/risk_tranching.py
@@ -0,0 +1,350 @@
+"""Risk-tranched stablecoin issuance from staked collateral.
+
+Three tranches from the same multi-asset collateral pool:
+ - Senior (myUSD-S): Lowest risk, first claim on collateral, lowest yield
+ - Mezzanine (myUSD-M): Medium risk, second claim, medium yield
+ - Junior/Equity ($MYCO): Highest risk, residual claim, absorbs volatility
+
+Yield waterfall: staking rewards flow Senior → Mezzanine → Junior.
+Loss waterfall: losses absorbed Junior → Mezzanine → Senior.
+"""
+
+import numpy as np
+from dataclasses import dataclass, field
+
+
+@dataclass
+class TrancheParams:
+ """Parameters for the risk tranche system."""
+ # Collateral ratios (minimum ratio of collateral value to tranche value)
+ senior_collateral_ratio: float = 1.5 # 150% overcollateralized
+ mezzanine_collateral_ratio: float = 1.2 # 120% collateralized
+ # Junior has no minimum — it's the equity/residual tranche
+
+ # Yield targets (annualized, from staking rewards)
+ senior_yield_target: float = 0.03 # 3% annual
+ mezzanine_yield_target: float = 0.08 # 8% annual
+
+ # Tranche allocation limits (fraction of total collateral)
+ max_senior_fraction: float = 0.50 # Max 50% of collateral backs senior
+ max_mezzanine_fraction: float = 0.30 # Max 30% backs mezzanine
+ # Junior gets the remainder (min 20%)
+
+ # Liquidation thresholds
+ senior_liquidation_ratio: float = 1.2 # Liquidate if drops below 120%
+ mezzanine_liquidation_ratio: float = 1.05 # Liquidate if drops below 105%
+
+ # Rebalance trigger (deviation from target allocation)
+ rebalance_threshold: float = 0.05 # 5% deviation triggers rebalance
+
+
+@dataclass
+class TrancheState:
+ """State of a single tranche."""
+ name: str
+ supply: float = 0.0 # Tokens outstanding
+ collateral_backing: float = 0.0 # USD value of collateral assigned to this tranche
+ accrued_yield: float = 0.0 # Yield accumulated but not distributed
+ cumulative_yield: float = 0.0 # Total yield ever distributed
+ cumulative_losses: float = 0.0 # Total losses ever absorbed
+
+ @property
+ def collateral_ratio(self) -> float:
+ """Current collateral ratio."""
+ if self.supply == 0:
+ return float('inf')
+ return self.collateral_backing / self.supply
+
+ @property
+ def is_healthy(self) -> bool:
+ """Whether the tranche is above minimum collateral."""
+ return self.collateral_ratio > 1.0
+
+
+@dataclass
+class RiskTrancheSystem:
+ """The complete risk tranche system."""
+ params: TrancheParams
+ senior: TrancheState = field(default_factory=lambda: TrancheState(name="myUSD-S"))
+ mezzanine: TrancheState = field(default_factory=lambda: TrancheState(name="myUSD-M"))
+ junior: TrancheState = field(default_factory=lambda: TrancheState(name="$MYCO"))
+ total_collateral: float = 0.0
+ total_staking_yield: float = 0.0
+ time: float = 0.0
+
+ @property
+ def total_supply(self) -> float:
+ return self.senior.supply + self.mezzanine.supply + self.junior.supply
+
+ @property
+ def system_collateral_ratio(self) -> float:
+ if self.total_supply == 0:
+ return float('inf')
+ return self.total_collateral / self.total_supply
+
+
+def mint_capacity(system: RiskTrancheSystem) -> dict[str, float]:
+ """Calculate how many tokens each tranche can mint given current collateral.
+
+ Returns max additional mintable amount for each tranche.
+ """
+ p = system.params
+ total_c = system.total_collateral
+
+ # Senior: max mint = (total_c * max_senior_frac - current_backing) / cr
+ senior_max_backing = total_c * p.max_senior_fraction
+ senior_available = max(0, senior_max_backing - system.senior.collateral_backing)
+ senior_mintable = senior_available / p.senior_collateral_ratio
+
+ # Mezzanine: similar
+ mez_max_backing = total_c * p.max_mezzanine_fraction
+ mez_available = max(0, mez_max_backing - system.mezzanine.collateral_backing)
+ mez_mintable = mez_available / p.mezzanine_collateral_ratio
+
+ # Junior: residual collateral after senior + mez allocations
+ junior_backing = total_c - system.senior.collateral_backing - system.mezzanine.collateral_backing
+ junior_mintable = max(0, junior_backing - system.junior.supply) # 1:1 for junior
+
+ return {
+ "senior": senior_mintable,
+ "mezzanine": mez_mintable,
+ "junior": junior_mintable,
+ }
+
+
+def deposit_collateral(
+ system: RiskTrancheSystem,
+ amount: float,
+ asset_prices: dict[str, float] | None = None,
+) -> RiskTrancheSystem:
+ """Add collateral to the system.
+
+ Collateral is added to the pool and automatically allocated
+ to tranches based on target ratios and current state.
+ """
+ system.total_collateral += amount
+
+ # Allocate to tranches based on deficit from targets
+ _rebalance_collateral(system)
+
+ return system
+
+
+def mint_tranche(
+ system: RiskTrancheSystem,
+ tranche: str,
+ amount: float,
+) -> tuple[RiskTrancheSystem, float]:
+ """Mint tokens from a specific tranche.
+
+ Returns (updated_system, actual_minted).
+ """
+ caps = mint_capacity(system)
+ actual = min(amount, caps.get(tranche, 0.0))
+
+ if actual <= 0:
+ return system, 0.0
+
+ p = system.params
+
+ if tranche == "senior":
+ required_collateral = actual * p.senior_collateral_ratio
+ system.senior.supply += actual
+ system.senior.collateral_backing += required_collateral
+ elif tranche == "mezzanine":
+ required_collateral = actual * p.mezzanine_collateral_ratio
+ system.mezzanine.supply += actual
+ system.mezzanine.collateral_backing += required_collateral
+ elif tranche == "junior":
+ system.junior.supply += actual
+ system.junior.collateral_backing += actual # 1:1
+
+ return system, actual
+
+
+def redeem_tranche(
+ system: RiskTrancheSystem,
+ tranche: str,
+ amount: float,
+) -> tuple[RiskTrancheSystem, float]:
+ """Redeem tranche tokens for collateral.
+
+ Returns (updated_system, collateral_returned).
+ """
+ if tranche == "senior":
+ t = system.senior
+ elif tranche == "mezzanine":
+ t = system.mezzanine
+ else:
+ t = system.junior
+
+ actual = min(amount, t.supply)
+ if actual <= 0:
+ return system, 0.0
+
+ # Proportional collateral release
+ collateral_share = actual / t.supply
+ collateral_returned = t.collateral_backing * collateral_share
+
+ t.supply -= actual
+ t.collateral_backing -= collateral_returned
+ system.total_collateral -= collateral_returned
+
+ return system, collateral_returned
+
+
+def distribute_yield(
+ system: RiskTrancheSystem,
+ yield_amount: float,
+ dt: float,
+) -> RiskTrancheSystem:
+ """Distribute staking yield through the waterfall.
+
+ Order: Senior gets target yield first, then Mezzanine, then Junior gets residual.
+ """
+ remaining = yield_amount
+ system.total_staking_yield += yield_amount
+
+ # Senior gets their target first
+ if system.senior.supply > 0:
+ senior_target = system.senior.supply * system.params.senior_yield_target * dt
+ senior_gets = min(remaining, senior_target)
+ system.senior.accrued_yield += senior_gets
+ system.senior.cumulative_yield += senior_gets
+ system.senior.collateral_backing += senior_gets
+ remaining -= senior_gets
+
+ # Mezzanine gets their target next
+ if remaining > 0 and system.mezzanine.supply > 0:
+ mez_target = system.mezzanine.supply * system.params.mezzanine_yield_target * dt
+ mez_gets = min(remaining, mez_target)
+ system.mezzanine.accrued_yield += mez_gets
+ system.mezzanine.cumulative_yield += mez_gets
+ system.mezzanine.collateral_backing += mez_gets
+ remaining -= mez_gets
+
+ # Junior gets the residual
+ if remaining > 0:
+ system.junior.accrued_yield += remaining
+ system.junior.cumulative_yield += remaining
+ system.junior.collateral_backing += remaining
+
+ return system
+
+
+def apply_loss(
+ system: RiskTrancheSystem,
+ loss_amount: float,
+) -> RiskTrancheSystem:
+ """Apply collateral loss through the reverse waterfall.
+
+ Order: Junior absorbs first, then Mezzanine, then Senior.
+ """
+ remaining = loss_amount
+ system.total_collateral -= loss_amount
+
+ # Junior absorbs first
+ junior_can_absorb = system.junior.collateral_backing
+ junior_absorbs = min(remaining, junior_can_absorb)
+ system.junior.collateral_backing -= junior_absorbs
+ system.junior.cumulative_losses += junior_absorbs
+ remaining -= junior_absorbs
+
+ # Mezzanine absorbs next
+ if remaining > 0:
+ mez_can_absorb = system.mezzanine.collateral_backing
+ mez_absorbs = min(remaining, mez_can_absorb)
+ system.mezzanine.collateral_backing -= mez_absorbs
+ system.mezzanine.cumulative_losses += mez_absorbs
+ remaining -= mez_absorbs
+
+ # Senior absorbs last (this is the doomsday scenario)
+ if remaining > 0:
+ system.senior.collateral_backing -= remaining
+ system.senior.cumulative_losses += remaining
+
+ return system
+
+
+def check_liquidation(system: RiskTrancheSystem) -> dict[str, bool]:
+ """Check if any tranche needs liquidation."""
+ p = system.params
+ return {
+ "senior": (
+ system.senior.supply > 0
+ and system.senior.collateral_ratio < p.senior_liquidation_ratio
+ ),
+ "mezzanine": (
+ system.mezzanine.supply > 0
+ and system.mezzanine.collateral_ratio < p.mezzanine_liquidation_ratio
+ ),
+ "junior": (
+ system.junior.supply > 0
+ and system.junior.collateral_ratio < 0.5 # Junior liquidates at 50%
+ ),
+ }
+
+
+def _rebalance_collateral(system: RiskTrancheSystem) -> None:
+ """Internal: rebalance collateral allocation across tranches."""
+ total = system.total_collateral
+ if total == 0:
+ return
+
+ p = system.params
+ # Target: senior gets enough for its CR, mez gets enough, junior gets rest
+ senior_needs = system.senior.supply * p.senior_collateral_ratio
+ mez_needs = system.mezzanine.supply * p.mezzanine_collateral_ratio
+ junior_needs = system.junior.supply # 1:1
+
+ total_needs = senior_needs + mez_needs + junior_needs
+ if total_needs == 0:
+ return
+
+ # If enough collateral, give everyone what they need
+ if total >= total_needs:
+ system.senior.collateral_backing = senior_needs
+ system.mezzanine.collateral_backing = mez_needs
+ # Junior gets all the excess
+ system.junior.collateral_backing = total - senior_needs - mez_needs
+ else:
+ # Not enough: allocate proportionally with priority
+ # Senior gets priority
+ system.senior.collateral_backing = min(senior_needs, total)
+ remaining = total - system.senior.collateral_backing
+ system.mezzanine.collateral_backing = min(mez_needs, remaining)
+ remaining -= system.mezzanine.collateral_backing
+ system.junior.collateral_backing = remaining
+
+
+def get_tranche_metrics(system: RiskTrancheSystem) -> dict:
+ """Get comprehensive metrics for all tranches."""
+ return {
+ "total_collateral": system.total_collateral,
+ "system_cr": system.system_collateral_ratio,
+ "total_supply": system.total_supply,
+ "senior": {
+ "supply": system.senior.supply,
+ "collateral": system.senior.collateral_backing,
+ "cr": system.senior.collateral_ratio,
+ "yield": system.senior.cumulative_yield,
+ "losses": system.senior.cumulative_losses,
+ "healthy": system.senior.is_healthy,
+ },
+ "mezzanine": {
+ "supply": system.mezzanine.supply,
+ "collateral": system.mezzanine.collateral_backing,
+ "cr": system.mezzanine.collateral_ratio,
+ "yield": system.mezzanine.cumulative_yield,
+ "losses": system.mezzanine.cumulative_losses,
+ "healthy": system.mezzanine.is_healthy,
+ },
+ "junior": {
+ "supply": system.junior.supply,
+ "collateral": system.junior.collateral_backing,
+ "cr": system.junior.collateral_ratio,
+ "yield": system.junior.cumulative_yield,
+ "losses": system.junior.cumulative_losses,
+ "healthy": system.junior.is_healthy,
+ },
+ }