402 lines
12 KiB
Python
402 lines
12 KiB
Python
"""Spore Agent Commons MCP Server.
|
|
|
|
Provides tools for Claude Code agents to interact with the Spore commons.
|
|
Transport: stdio (invoked via docker exec or direct python).
|
|
"""
|
|
|
|
import json
import os
from urllib.parse import quote, urlencode

import httpx
from mcp.server.fastmcp import FastMCP
|
|
|
|
# Base URL of the Spore HTTP API. The SPORE_API_URL environment variable
# overrides the localhost default.
API_BASE = os.getenv("SPORE_API_URL", "http://localhost:8351")

# MCP server instance; the @server.tool() / @server.resource() decorators in
# this module register against it.
server = FastMCP(
    name="spore-commons",
    instructions=(
        "Spore Agent Commons — governance memory, knowledge graph, and "
        "federation layer for multi-agent coordination. Use these tools to "
        "manage holons (agents), claims, evidence, attestations, intents, "
        "commitments, and governance documents."
    ),
)

# Single shared HTTP client used by _api() for every request (30 s timeout).
client = httpx.Client(timeout=30, base_url=API_BASE)
|
|
|
|
|
|
def _api(method: str, path: str, data: dict | None = None) -> dict | list:
|
|
"""Make an API call and return parsed JSON."""
|
|
if method == "GET":
|
|
r = client.get(path)
|
|
elif method == "POST":
|
|
r = client.post(path, json=data)
|
|
elif method == "PATCH":
|
|
r = client.patch(path)
|
|
else:
|
|
raise ValueError(f"Unsupported method: {method}")
|
|
r.raise_for_status()
|
|
return r.json()
|
|
|
|
|
|
# --- Resource ---
|
|
|
|
@server.resource("spore://node/status")
def node_status() -> str:
    """Current node health and status."""
    # Relay the API's /health payload as pretty-printed JSON text.
    return json.dumps(_api("GET", "/health"), indent=2)
|
|
|
|
|
|
# --- Holon tools ---
|
|
|
|
@server.tool()
def list_holons(holon_type: str = "") -> str:
    """List all holons (agents/teams/orgs) in the commons.

    Args:
        holon_type: Filter by type (agent, team, org). Empty = all.
    """
    # Percent-encode the filter so characters such as '&', '#', '=' or spaces
    # in the value cannot corrupt or inject query parameters.
    query = f"?{urlencode({'holon_type': holon_type})}" if holon_type else ""
    result = _api("GET", f"/holons{query}")
    # Returned as pretty-printed JSON text.
    return json.dumps(result, indent=2)
|
|
|
|
|
|
@server.tool()
def register_holon(slug: str, name: str, holon_type: str = "agent",
                   description: str = "") -> str:
    """Register a new holon (agent, team, or organization) in the commons.

    Args:
        slug: URL-safe identifier (e.g. 'jeff-emmett')
        name: Display name
        holon_type: agent, team, or org
        description: Brief description of the holon
    """
    # Request body for POST /holons; all four fields are always sent.
    payload = {
        "slug": slug,
        "name": name,
        "holon_type": holon_type,
        "description": description,
    }
    created = _api("POST", "/holons", payload)
    return json.dumps(created, indent=2)
|
|
|
|
|
|
# --- Claims tools ---
|
|
|
|
@server.tool()
def propose_claim(content: str, proposer_rid: str,
                  confidence: float = 0.5) -> str:
    """Propose a new knowledge claim in the commons.

    Args:
        content: The claim text
        proposer_rid: RID of the proposing holon (e.g. 'orn:spore.holon:jeff-emmett')
        confidence: Initial confidence level (0.0-1.0)
    """
    # Request body for POST /claims.
    payload = {
        "content": content,
        "proposer_rid": proposer_rid,
        "confidence": confidence,
    }
    created = _api("POST", "/claims", payload)
    return json.dumps(created, indent=2)
|
|
|
|
|
|
@server.tool()
def list_claims(status: str = "", proposer_rid: str = "",
                limit: int = 20) -> str:
    """List knowledge claims with optional filters.

    Args:
        status: Filter by status (proposed, supported, challenged, superseded)
        proposer_rid: Filter by proposer RID
        limit: Max results (default 20)
    """
    # Only non-empty filters are sent; ``limit`` is always included.
    optional = {"status": status, "proposer_rid": proposer_rid}
    query = {key: value for key, value in optional.items() if value}
    query["limit"] = limit
    # urlencode percent-escapes every value, so RIDs or statuses containing
    # '&', '#', '=' or spaces cannot break or inject query parameters.
    result = _api("GET", f"/claims?{urlencode(query)}")
    return json.dumps(result, indent=2)
|
|
|
|
|
|
@server.tool()
def add_evidence(claim_id: str, relation: str, body: str,
                 source: str = "") -> str:
    """Add evidence supporting or challenging a claim.

    Args:
        claim_id: UUID of the claim
        relation: 'supports' or 'challenges'
        body: The evidence text
        source: Source attribution (optional)
    """
    # Provenance is an empty object when no source attribution is given.
    provenance = {"source": source} if source else {}
    # Escape the id as a single path segment (safe="" also encodes '/'), so a
    # malformed id cannot redirect the request to a different endpoint.
    claim_path = f"/claims/{quote(claim_id, safe='')}/evidence"
    result = _api("POST", claim_path, {
        "relation": relation, "body": body, "provenance": provenance,
    })
    return json.dumps(result, indent=2)
|
|
|
|
|
|
@server.tool()
def attest_claim(claim_id: str, attester_rid: str, verdict: str,
                 strength: float = 1.0, reasoning: str = "") -> str:
    """Attest to a claim (endorse, dispute, or abstain).

    Args:
        claim_id: UUID of the claim
        attester_rid: RID of the attesting holon
        verdict: 'endorse', 'dispute', or 'abstain'
        strength: Attestation strength (0.0-1.0)
        reasoning: Explanation for the attestation
    """
    # Escape the id as a single path segment (safe="" also encodes '/'), so a
    # malformed id cannot redirect the request to a different endpoint.
    claim_path = f"/claims/{quote(claim_id, safe='')}/attestations"
    result = _api("POST", claim_path, {
        "attester_rid": attester_rid, "verdict": verdict,
        "strength": strength, "reasoning": reasoning,
    })
    return json.dumps(result, indent=2)
|
|
|
|
|
|
@server.tool()
def get_claim_strength(claim_id: str) -> str:
    """Get aggregated strength metrics for a claim (endorsements, disputes, evidence counts).

    Args:
        claim_id: UUID of the claim
    """
    # Escape the id as a single path segment (safe="" also encodes '/'), so a
    # malformed id cannot redirect the request to a different endpoint.
    result = _api("GET", f"/claims/{quote(claim_id, safe='')}/strength")
    return json.dumps(result, indent=2)
|
|
|
|
|
|
# --- Intent tools ---
|
|
|
|
@server.tool()
def publish_intent(publisher_rid: str, title: str, intent_type: str,
                   description: str = "",
                   governance_fit: str = "") -> str:
    """Publish an intent (need, offer, or possibility).

    Args:
        publisher_rid: RID of the publishing holon
        title: Short title for the intent
        intent_type: 'need', 'offer', or 'possibility'
        description: Detailed description
        governance_fit: Comma-separated governance tags for matching (e.g. 'consent,membrane')
    """
    # Split the comma-separated tag string into a clean list: surrounding
    # whitespace is trimmed and empty entries are discarded.
    tags = []
    if governance_fit:
        trimmed = (piece.strip() for piece in governance_fit.split(","))
        tags = [tag for tag in trimmed if tag]

    payload = {
        "publisher_rid": publisher_rid,
        "title": title,
        "intent_type": intent_type,
        "description": description,
        "governance_fit": tags,
    }
    created = _api("POST", "/intents", payload)
    return json.dumps(created, indent=2)
|
|
|
|
|
|
@server.tool()
def get_matching_intents(intent_id: str,
                         min_similarity: float = 0.1) -> str:
    """Get matching intents for a given intent (complementary need↔offer pairs).

    Args:
        intent_id: UUID of the intent to find matches for
        min_similarity: Minimum similarity threshold (0.0-1.0)
    """
    # Escape the id as a single path segment (safe="" also encodes '/'), so a
    # malformed id cannot redirect the request to a different endpoint.
    segment = quote(intent_id, safe="")
    query = urlencode({"min_similarity": min_similarity})
    result = _api("GET", f"/intents/{segment}/matches?{query}")
    return json.dumps(result, indent=2)
|
|
|
|
|
|
@server.tool()
def list_intents(intent_type: str = "", state: str = "",
                 limit: int = 20) -> str:
    """List published intents with optional filters.

    Args:
        intent_type: Filter by type (need, offer, possibility)
        state: Filter by state (open, matched, committed, expired, withdrawn)
        limit: Max results
    """
    # Only non-empty filters are sent; ``limit`` is always included.
    optional = {"intent_type": intent_type, "state": state}
    query = {key: value for key, value in optional.items() if value}
    query["limit"] = limit
    # urlencode percent-escapes every value, so filter values containing
    # '&', '#', '=' or spaces cannot break or inject query parameters.
    result = _api("GET", f"/intents?{urlencode(query)}")
    return json.dumps(result, indent=2)
|
|
|
|
|
|
# --- Commitment tools ---
|
|
|
|
@server.tool()
def create_commitment(title: str, proposer_rid: str,
                      acceptor_rid: str = "",
                      description: str = "",
                      settlement_type: str = "attestation") -> str:
    """Create a new commitment between holons.

    Args:
        title: Commitment title
        proposer_rid: RID of the proposing holon
        acceptor_rid: RID of the accepting holon (optional)
        description: Detailed description
        settlement_type: How the commitment is settled (default: attestation)
    """
    payload = {
        "title": title,
        "proposer_rid": proposer_rid,
        "description": description,
        "settlement_type": settlement_type,
    }
    # The acceptor key is left out of the request entirely when no acceptor
    # was supplied, rather than being sent as an empty string.
    if acceptor_rid:
        payload["acceptor_rid"] = acceptor_rid

    created = _api("POST", "/commitments", payload)
    return json.dumps(created, indent=2)
|
|
|
|
|
|
@server.tool()
def update_commitment_state(commitment_id: str, new_state: str) -> str:
    """Transition a commitment to a new state.

    Valid transitions:
    - proposed → verified, cancelled
    - verified → active, cancelled
    - active → evidence_linked, disputed, cancelled
    - evidence_linked → redeemed, disputed
    - disputed → resolved, cancelled
    - resolved → active, redeemed

    Args:
        commitment_id: UUID of the commitment
        new_state: Target state
    """
    # The target state travels as a query parameter, not a request body.
    # Both the path segment and the query value are percent-encoded so
    # neither can alter the request's target or inject extra parameters.
    segment = quote(commitment_id, safe="")
    query = urlencode({"new_state": new_state})
    result = _api("PATCH", f"/commitments/{segment}/state?{query}")
    return json.dumps(result, indent=2)
|
|
|
|
|
|
@server.tool()
def list_commitments(state: str = "", proposer_rid: str = "",
                     limit: int = 20) -> str:
    """List commitments with optional filters.

    Args:
        state: Filter by state
        proposer_rid: Filter by proposer
        limit: Max results
    """
    # Only non-empty filters are sent; ``limit`` is always included.
    optional = {"state": state, "proposer_rid": proposer_rid}
    query = {key: value for key, value in optional.items() if value}
    query["limit"] = limit
    # urlencode percent-escapes every value, so RIDs or states containing
    # '&', '#', '=' or spaces cannot break or inject query parameters.
    result = _api("GET", f"/commitments?{urlencode(query)}")
    return json.dumps(result, indent=2)
|
|
|
|
|
|
# --- Governance tools ---
|
|
|
|
@server.tool()
def get_governance_dag() -> str:
    """Get the full governance specification DAG (documents, dependencies, topological order)."""
    # Relay the API response as pretty-printed JSON text.
    return json.dumps(_api("GET", "/governance/dag"), indent=2)
|
|
|
|
|
|
@server.tool()
def ingest_governance_doc(content: str) -> str:
    """Ingest a governance document (markdown with YAML frontmatter).

    The document must have YAML frontmatter with at least doc_id and doc_kind.
    Example:
        ---
        doc_id: spore.governance.consent
        doc_kind: protocol
        status: active
        depends_on:
          - spore.governance.membrane
        ---
        # Document Title
        Body content...

    Args:
        content: Raw markdown with YAML frontmatter
    """
    # The raw document is posted as-is; no parsing happens in this function.
    payload = {"content": content}
    ingested = _api("POST", "/governance/docs", payload)
    return json.dumps(ingested, indent=2)
|
|
|
|
|
|
@server.tool()
def search_knowledge(query: str, entity_type: str = "",
                     limit: int = 10) -> str:
    """Search across claims, intents, and governance docs.

    Args:
        query: Search query text
        entity_type: Filter to 'claims', 'intents', or 'governance' (empty = all)
        limit: Max results per type
    """
    # Case-insensitive substring match; lower-case the query once up front.
    needle = query.lower()

    def _matches(record: dict, *fields: str) -> bool:
        # ``or ""`` covers fields that are missing *or* present but null in
        # the API response, which would otherwise crash on ``.lower()``.
        return any(needle in (record.get(field) or "").lower()
                   for field in fields)

    results = {}

    # NOTE(review): claims and intents are filtered client-side over only the
    # first ``limit`` records the API returns, so matches beyond that page
    # are not found. Basic text filter until embeddings are available.
    if not entity_type or entity_type == "claims":
        claims = _api("GET", f"/claims?limit={limit}")
        results["claims"] = [c for c in claims if _matches(c, "content")]

    if not entity_type or entity_type == "intents":
        intents = _api("GET", f"/intents?limit={limit}")
        results["intents"] = [
            i for i in intents if _matches(i, "title", "description")
        ]

    if not entity_type or entity_type == "governance":
        docs = _api("GET", "/governance/docs")
        matched = [d for d in docs if _matches(d, "title", "body")]
        # This endpoint is fetched without a limit parameter, so cap the
        # matches here to honour "max results per type".
        results["governance"] = matched[:max(limit, 0)]

    # An unrecognised entity_type matches no branch and yields "{}".
    return json.dumps(results, indent=2)
|
|
|
|
|
|
# --- Federation tools ---
|
|
|
|
@server.tool()
def list_federation_peers(trust_tier: str = "") -> str:
    """List all federation peers with their trust tiers and handshake status.

    Args:
        trust_tier: Filter by tier (trusted, peer, monitored). Empty = all.
    """
    # Percent-encode the filter so characters such as '&', '#', '=' or spaces
    # in the value cannot corrupt or inject query parameters.
    query = f"?{urlencode({'trust_tier': trust_tier})}" if trust_tier else ""
    result = _api("GET", f"/federation/peers{query}")
    return json.dumps(result, indent=2)
|
|
|
|
|
|
@server.tool()
def initiate_federation_handshake(peer_url: str, peer_rid: str = "",
                                  trust_tier: str = "monitored") -> str:
    """Initiate a federation handshake with a peer node.

    Args:
        peer_url: URL of the peer node (e.g. 'https://bkc.example.com')
        peer_rid: RID of the peer (optional, auto-generated if empty)
        trust_tier: Initial trust tier (trusted, peer, monitored)
    """
    # peer_rid is always sent, even when it is the empty string.
    payload = {
        "peer_url": peer_url,
        "peer_rid": peer_rid,
        "trust_tier": trust_tier,
    }
    outcome = _api("POST", "/federation/handshake", payload)
    return json.dumps(outcome, indent=2)
|
|
|
|
|
|
if __name__ == "__main__":
    # Script entry point: serve the registered tools and resources over
    # stdio (the transport named in the module docstring).
    server.run(transport="stdio")
|