diff --git a/node/requirements.txt b/node/requirements.txt index 5c120b0..42f4336 100644 --- a/node/requirements.txt +++ b/node/requirements.txt @@ -7,3 +7,4 @@ pydantic-settings>=2.0.0 httpx>=0.27.0 pyyaml>=6.0 mcp>=1.0.0 +redis>=5.0.0 diff --git a/node/spore_node/api/main.py b/node/spore_node/api/main.py index 78a62b1..97f4686 100644 --- a/node/spore_node/api/main.py +++ b/node/spore_node/api/main.py @@ -5,12 +5,15 @@ If running standalone (without koi-net), creates its own app. """ import logging +from pathlib import Path from contextlib import asynccontextmanager from fastapi import FastAPI +from fastapi.staticfiles import StaticFiles +from fastapi.responses import FileResponse from spore_node.config import SporeConfig from spore_node.db.connection import init_pool, close_pool -from spore_node.api.routers import health, holons, governance, claims, intents, commitments +from spore_node.api.routers import health, holons, governance, claims, intents, commitments, federation log = logging.getLogger(__name__) @@ -23,6 +26,16 @@ def mount_routers(app: FastAPI) -> None: app.include_router(claims.router) app.include_router(intents.router) app.include_router(commitments.router) + app.include_router(federation.router) + + # Graph visualization + static_dir = Path(__file__).parent.parent / "static" + if static_dir.exists(): + @app.get("/graph") + async def graph_page(): + return FileResponse(static_dir / "graph.html") + + app.mount("/static", StaticFiles(directory=str(static_dir)), name="static") def create_standalone_app(cfg: SporeConfig | None = None) -> FastAPI: diff --git a/node/spore_node/api/routers/federation.py b/node/spore_node/api/routers/federation.py new file mode 100644 index 0000000..bc8de50 --- /dev/null +++ b/node/spore_node/api/routers/federation.py @@ -0,0 +1,225 @@ +"""Federation endpoints — peer handshake, trust tiers, event relay.""" + +import json +import uuid +from fastapi import APIRouter, HTTPException +from pydantic import BaseModel + +from spore_node.db.connection import get_pool +from spore_node.federation.crypto import ( + generate_keypair, public_key_to_b64, b64_to_public_key, derive_shared_key, +) + +router = APIRouter(prefix="/federation", tags=["federation"]) + +VALID_TRUST_TIERS = ("trusted", "peer", "monitored") +VALID_HANDSHAKE_STATES = ("pending", "proposed", "approved", "rejected") + +# Node's own keypair (generated on first handshake) +_node_keypair: tuple[bytes, bytes] | None = None + + +def _get_keypair() -> tuple[bytes, bytes]: + global _node_keypair + if _node_keypair is None: + _node_keypair = generate_keypair() + return _node_keypair + + +class HandshakeRequest(BaseModel): + peer_url: str + peer_rid: str = "" + peer_public_key: str = "" # base64-encoded X25519 public key + trust_tier: str = "monitored" + + +class PeerResponse(BaseModel): + id: str + node_rid: str + node_url: str + trust_tier: str + handshake_status: str + pending_events: int = 0 + + +class HandshakeResponse(BaseModel): + status: str + peer_rid: str + our_public_key: str + message: str + + +@router.post("/handshake", response_model=HandshakeResponse) +async def initiate_handshake(data: HandshakeRequest): + """Initiate or accept a federation handshake with a peer node.""" + if data.trust_tier not in VALID_TRUST_TIERS: + raise HTTPException(422, f"trust_tier must be one of {VALID_TRUST_TIERS}") + + pool = get_pool() + _, our_public = _get_keypair() + + peer_rid = data.peer_rid or f"orn:koi-net.node:peer-{uuid.uuid4().hex[:8]}" + + # Store peer encryption config + config = {} + if data.peer_public_key: + config["peer_public_key"] = data.peer_public_key + config["our_public_key"] = public_key_to_b64(our_public) + # Derive shared key for this peer + our_private, _ = _get_keypair() + shared = derive_shared_key(our_private, b64_to_public_key(data.peer_public_key)) + config["encryption_ready"] = True + + try: + await pool.execute( + """ + INSERT INTO koi_peers (node_rid, node_url, trust_tier, handshake_status, config) + VALUES ($1, $2, $3, 'proposed', $4::jsonb) + ON CONFLICT (node_rid) DO UPDATE SET + node_url = EXCLUDED.node_url, + trust_tier = EXCLUDED.trust_tier, + handshake_status = 'proposed', + config = EXCLUDED.config, + updated_at = now() + """, + peer_rid, data.peer_url, data.trust_tier, json.dumps(config), + ) + except Exception as e: + raise HTTPException(500, f"Failed to store peer: {e}") + + await _log_event(pool, peer_rid, "federation.handshake.proposed", + {"peer_url": data.peer_url, "trust_tier": data.trust_tier}) + + return HandshakeResponse( + status="proposed", + peer_rid=peer_rid, + our_public_key=public_key_to_b64(our_public), + message=f"Handshake proposed with {data.peer_url}. Awaiting bilateral approval.", + ) + + +@router.patch("/peers/{peer_rid}/approve") +async def approve_peer(peer_rid: str): + """Approve a pending handshake — transitions to approved.""" + pool = get_pool() + row = await pool.fetchrow( + "SELECT handshake_status FROM koi_peers WHERE node_rid = $1", peer_rid + ) + if not row: + raise HTTPException(404, "Peer not found") + if row["handshake_status"] == "approved": + return {"status": "already_approved"} + if row["handshake_status"] == "rejected": + raise HTTPException(422, "Peer was rejected. Re-initiate handshake.") + + await pool.execute( + "UPDATE koi_peers SET handshake_status = 'approved', updated_at = now() WHERE node_rid = $1", + peer_rid, + ) + await _log_event(pool, peer_rid, "federation.handshake.approved", {}) + return {"status": "approved", "peer_rid": peer_rid} + + +@router.patch("/peers/{peer_rid}/reject") +async def reject_peer(peer_rid: str): + """Reject a pending handshake.""" + pool = get_pool() + row = await pool.fetchrow( + "SELECT handshake_status FROM koi_peers WHERE node_rid = $1", peer_rid + ) + if not row: + raise HTTPException(404, "Peer not found") + + await pool.execute( + "UPDATE koi_peers SET handshake_status = 'rejected', updated_at = now() WHERE node_rid = $1", + peer_rid, + ) + await _log_event(pool, peer_rid, "federation.handshake.rejected", {}) + return {"status": "rejected", "peer_rid": peer_rid} + + +@router.patch("/peers/{peer_rid}/trust") +async def update_trust_tier(peer_rid: str, trust_tier: str): + """Update a peer's trust tier.""" + if trust_tier not in VALID_TRUST_TIERS: + raise HTTPException(422, f"trust_tier must be one of {VALID_TRUST_TIERS}") + + pool = get_pool() + row = await pool.fetchrow( + "SELECT node_rid FROM koi_peers WHERE node_rid = $1", peer_rid + ) + if not row: + raise HTTPException(404, "Peer not found") + + await pool.execute( + "UPDATE koi_peers SET trust_tier = $1, updated_at = now() WHERE node_rid = $2", + trust_tier, peer_rid, + ) + await _log_event(pool, peer_rid, "federation.trust_tier.changed", + {"trust_tier": trust_tier}) + return {"peer_rid": peer_rid, "trust_tier": trust_tier} + + +@router.get("/peers", response_model=list[PeerResponse]) +async def list_peers(trust_tier: str | None = None): + """List all federation peers.""" + pool = get_pool() + if trust_tier: + rows = await pool.fetch( + "SELECT * FROM koi_peers WHERE trust_tier = $1 ORDER BY created_at", + trust_tier, + ) + else: + rows = await pool.fetch("SELECT * FROM koi_peers ORDER BY created_at") + return [_peer_dict(r) for r in rows] + + +@router.get("/peers/{peer_rid}", response_model=PeerResponse) +async def get_peer(peer_rid: str): + """Get a single peer by RID.""" + pool = get_pool() + row = await pool.fetchrow( + "SELECT * FROM koi_peers WHERE node_rid = $1", peer_rid + ) + if not row: + raise HTTPException(404, "Peer not found") + return _peer_dict(row) + + +@router.get("/events/pending") +async def pending_events(): + """Get count of pending relay events per peer.""" + pool = get_pool() + peers = await pool.fetch( + "SELECT node_rid, node_url FROM koi_peers WHERE handshake_status = 'approved'" + ) + # Import relay lazily to avoid circular deps + from spore_node.federation.relay import FederationRelay + from spore_node.config import SporeConfig + cfg = SporeConfig() + relay = FederationRelay(cfg.redis_url) + + result = {} + for peer in peers: + count = await relay.pending_count(peer["node_rid"]) + if count > 0: + result[peer["node_rid"]] = count + return result + + +def _peer_dict(row) -> dict: + d = dict(row) + d["id"] = str(d["id"]) + d["pending_events"] = 0 + d.pop("config", None) + d.pop("last_seen_at", None) + d.pop("created_at", None) + d.pop("updated_at", None) + return d + + +async def _log_event(pool, entity_rid: str, event_kind: str, payload: dict): + await pool.execute( + "INSERT INTO events (entity_rid, event_kind, payload) VALUES ($1, $2, $3::jsonb)", + entity_rid, event_kind, json.dumps(payload), + ) diff --git a/node/spore_node/api/routers/health.py b/node/spore_node/api/routers/health.py index 497a176..64414cc 100644 --- a/node/spore_node/api/routers/health.py +++ b/node/spore_node/api/routers/health.py @@ -6,6 +6,17 @@ from spore_node.db.connection import get_pool router = APIRouter(tags=["health"]) +@router.get("/") +async def root(): + """Root endpoint — redirect to docs.""" + return { + "name": "Spore Agent Commons", + "version": "0.1.0", + "docs": "/docs", + "health": "/health", + } + + @router.get("/health") async def health(): """Node health check with DB connectivity.""" diff --git a/node/spore_node/federation/__init__.py b/node/spore_node/federation/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/node/spore_node/federation/crypto.py b/node/spore_node/federation/crypto.py new file mode 100644 index 0000000..41f52ef --- /dev/null +++ b/node/spore_node/federation/crypto.py @@ -0,0 +1,55 @@ +"""E2E encryption for federation payloads. + +X25519 key exchange + ChaCha20-Poly1305 for symmetric encryption. +Uses the `cryptography` library (already a koi-net dependency). +""" + +import base64 +import os + +from cryptography.hazmat.primitives.asymmetric.x25519 import ( + X25519PrivateKey, + X25519PublicKey, +) +from cryptography.hazmat.primitives.ciphers.aead import ChaCha20Poly1305 + + +def generate_keypair() -> tuple[bytes, bytes]: + """Generate X25519 keypair. Returns (private_key_bytes, public_key_bytes).""" + private_key = X25519PrivateKey.generate() + public_key = private_key.public_key() + return ( + private_key.private_bytes_raw(), + public_key.public_bytes_raw(), + ) + + +def derive_shared_key(our_private: bytes, their_public: bytes) -> bytes: + """Derive shared secret from X25519 key exchange.""" + private_key = X25519PrivateKey.from_private_bytes(our_private) + public_key = X25519PublicKey.from_public_bytes(their_public) + return private_key.exchange(public_key) + + +def encrypt(shared_key: bytes, plaintext: bytes) -> bytes: + """Encrypt with ChaCha20-Poly1305. Returns nonce + ciphertext.""" + aead = ChaCha20Poly1305(shared_key) + nonce = os.urandom(12) + ciphertext = aead.encrypt(nonce, plaintext, None) + return nonce + ciphertext + + +def decrypt(shared_key: bytes, data: bytes) -> bytes: + """Decrypt ChaCha20-Poly1305. Expects nonce (12 bytes) + ciphertext.""" + aead = ChaCha20Poly1305(shared_key) + nonce = data[:12] + ciphertext = data[12:] + return aead.decrypt(nonce, ciphertext, None) + + +def public_key_to_b64(key_bytes: bytes) -> str: + return base64.b64encode(key_bytes).decode() + + +def b64_to_public_key(b64: str) -> bytes: + return base64.b64decode(b64) diff --git a/node/spore_node/federation/relay.py b/node/spore_node/federation/relay.py new file mode 100644 index 0000000..ed0729d --- /dev/null +++ b/node/spore_node/federation/relay.py @@ -0,0 +1,94 @@ +"""Store-and-forward relay for offline federation peers. + +Uses Redis sorted sets with TTL. Events queued per peer, flushed on schedule. +""" + +import json +import time +import logging +from typing import Any + +import httpx + +log = logging.getLogger(__name__) + +RELAY_PREFIX = "spore:relay:" +RELAY_TTL_SECONDS = 7 * 24 * 3600 # 7 days + + +class FederationRelay: + def __init__(self, redis_url: str): + self._redis_url = redis_url + self._redis = None + + async def _get_redis(self): + if self._redis is None: + import redis.asyncio as aioredis + self._redis = aioredis.from_url(self._redis_url) + return self._redis + + async def enqueue(self, peer_rid: str, event: dict) -> None: + """Queue an event for a peer.""" + r = await self._get_redis() + key = f"{RELAY_PREFIX}{peer_rid}" + score = time.time() + await r.zadd(key, {json.dumps(event): score}) + # Set TTL on the key + await r.expire(key, RELAY_TTL_SECONDS) + + async def flush_peer(self, peer_rid: str, peer_url: str) -> int: + """Attempt to flush queued events to a peer. Returns count flushed.""" + r = await self._get_redis() + key = f"{RELAY_PREFIX}{peer_rid}" + + # Get all queued events + events = await r.zrangebyscore(key, "-inf", "+inf") + if not events: + return 0 + + flushed = 0 + async with httpx.AsyncClient(timeout=30) as client: + for raw in events: + event = json.loads(raw) + try: + resp = await client.post( + f"{peer_url}/koi-net/events/broadcast", + json=event, + ) + resp.raise_for_status() + await r.zrem(key, raw) + flushed += 1 + except Exception as e: + log.warning(f"Failed to flush to {peer_rid}: {e}") + break # Stop on first failure + + return flushed + + async def flush_all(self, peers: list[dict]) -> dict[str, int]: + """Flush events to all peers. Returns {peer_rid: count}.""" + results = {} + for peer in peers: + if peer.get("handshake_status") != "approved": + continue + count = await self.flush_peer(peer["node_rid"], peer["node_url"]) + if count > 0: + results[peer["node_rid"]] = count + return results + + async def pending_count(self, peer_rid: str) -> int: + """Get count of pending events for a peer.""" + r = await self._get_redis() + key = f"{RELAY_PREFIX}{peer_rid}" + return await r.zcard(key) + + async def prune_expired(self) -> int: + """Remove events older than TTL. Returns count removed.""" + r = await self._get_redis() + cutoff = time.time() - RELAY_TTL_SECONDS + total = 0 + + async for key in r.scan_iter(f"{RELAY_PREFIX}*"): + removed = await r.zremrangebyscore(key, "-inf", cutoff) + total += removed + + return total diff --git a/node/spore_node/mcp_server.py b/node/spore_node/mcp_server.py index 87522c3..a77bdbe 100644 --- a/node/spore_node/mcp_server.py +++ b/node/spore_node/mcp_server.py @@ -366,5 +366,36 @@ def search_knowledge(query: str, entity_type: str = "", return json.dumps(results, indent=2) +# --- Federation tools --- + +@server.tool() +def list_federation_peers(trust_tier: str = "") -> str: + """List all federation peers with their trust tiers and handshake status. + + Args: + trust_tier: Filter by tier (trusted, peer, monitored). Empty = all. + """ + params = f"?trust_tier={trust_tier}" if trust_tier else "" + result = _api("GET", f"/federation/peers{params}") + return json.dumps(result, indent=2) + + +@server.tool() +def initiate_federation_handshake(peer_url: str, peer_rid: str = "", + trust_tier: str = "monitored") -> str: + """Initiate a federation handshake with a peer node. + + Args: + peer_url: URL of the peer node (e.g. 'https://bkc.example.com') + peer_rid: RID of the peer (optional, auto-generated if empty) + trust_tier: Initial trust tier (trusted, peer, monitored) + """ + result = _api("POST", "/federation/handshake", { + "peer_url": peer_url, "peer_rid": peer_rid, + "trust_tier": trust_tier, + }) + return json.dumps(result, indent=2) + + if __name__ == "__main__": server.run(transport="stdio") diff --git a/node/spore_node/static/graph.html b/node/spore_node/static/graph.html new file mode 100644 index 0000000..be654c1 --- /dev/null +++ b/node/spore_node/static/graph.html @@ -0,0 +1,179 @@ + + +
+ + +Click a node to inspect