feat: Phase 3 — MCP bridge for Claude Code integration

MCP server with 15 tools: list_holons, register_holon, propose_claim,
list_claims, add_evidence, attest_claim, get_claim_strength,
publish_intent, get_matching_intents, list_intents, create_commitment,
update_commitment_state, list_commitments, get_governance_dag,
ingest_governance_doc, search_knowledge. Resource: spore://node/status.
Transport: stdio via docker exec.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
Jeff Emmett 2026-04-13 18:55:14 -04:00
parent 537f01770c
commit de6e1a85b7
2 changed files with 371 additions and 0 deletions

View File

@ -6,3 +6,4 @@ pydantic>=2.0.0
pydantic-settings>=2.0.0 pydantic-settings>=2.0.0
httpx>=0.27.0 httpx>=0.27.0
pyyaml>=6.0 pyyaml>=6.0
mcp>=1.0.0

View File

@ -0,0 +1,370 @@
"""Spore Agent Commons MCP Server.
Provides tools for Claude Code agents to interact with the Spore commons.
Transport: stdio (invoked via docker exec or direct python).
"""
import json
import os
import httpx
from mcp.server.fastmcp import FastMCP
API_BASE = os.environ.get("SPORE_API_URL", "http://localhost:8351")
server = FastMCP(
name="spore-commons",
instructions=(
"Spore Agent Commons — governance memory, knowledge graph, and "
"federation layer for multi-agent coordination. Use these tools to "
"manage holons (agents), claims, evidence, attestations, intents, "
"commitments, and governance documents."
),
)
client = httpx.Client(base_url=API_BASE, timeout=30)
def _api(method: str, path: str, data: dict | None = None) -> dict | list:
"""Make an API call and return parsed JSON."""
if method == "GET":
r = client.get(path)
elif method == "POST":
r = client.post(path, json=data)
elif method == "PATCH":
r = client.patch(path)
else:
raise ValueError(f"Unsupported method: {method}")
r.raise_for_status()
return r.json()
# --- Resource ---
@server.resource("spore://node/status")
def node_status() -> str:
"""Current node health and status."""
result = _api("GET", "/health")
return json.dumps(result, indent=2)
# --- Holon tools ---
@server.tool()
def list_holons(holon_type: str = "") -> str:
"""List all holons (agents/teams/orgs) in the commons.
Args:
holon_type: Filter by type (agent, team, org). Empty = all.
"""
params = f"?holon_type={holon_type}" if holon_type else ""
result = _api("GET", f"/holons{params}")
return json.dumps(result, indent=2)
@server.tool()
def register_holon(slug: str, name: str, holon_type: str = "agent",
description: str = "") -> str:
"""Register a new holon (agent, team, or organization) in the commons.
Args:
slug: URL-safe identifier (e.g. 'jeff-emmett')
name: Display name
holon_type: agent, team, or org
description: Brief description of the holon
"""
result = _api("POST", "/holons", {
"slug": slug, "name": name,
"holon_type": holon_type, "description": description,
})
return json.dumps(result, indent=2)
# --- Claims tools ---
@server.tool()
def propose_claim(content: str, proposer_rid: str,
confidence: float = 0.5) -> str:
"""Propose a new knowledge claim in the commons.
Args:
content: The claim text
proposer_rid: RID of the proposing holon (e.g. 'orn:spore.holon:jeff-emmett')
confidence: Initial confidence level (0.0-1.0)
"""
result = _api("POST", "/claims", {
"content": content, "proposer_rid": proposer_rid,
"confidence": confidence,
})
return json.dumps(result, indent=2)
@server.tool()
def list_claims(status: str = "", proposer_rid: str = "",
limit: int = 20) -> str:
"""List knowledge claims with optional filters.
Args:
status: Filter by status (proposed, supported, challenged, superseded)
proposer_rid: Filter by proposer RID
limit: Max results (default 20)
"""
params = []
if status:
params.append(f"status={status}")
if proposer_rid:
params.append(f"proposer_rid={proposer_rid}")
params.append(f"limit={limit}")
qs = "?" + "&".join(params)
result = _api("GET", f"/claims{qs}")
return json.dumps(result, indent=2)
@server.tool()
def add_evidence(claim_id: str, relation: str, body: str,
source: str = "") -> str:
"""Add evidence supporting or challenging a claim.
Args:
claim_id: UUID of the claim
relation: 'supports' or 'challenges'
body: The evidence text
source: Source attribution (optional)
"""
provenance = {"source": source} if source else {}
result = _api("POST", f"/claims/{claim_id}/evidence", {
"relation": relation, "body": body, "provenance": provenance,
})
return json.dumps(result, indent=2)
@server.tool()
def attest_claim(claim_id: str, attester_rid: str, verdict: str,
strength: float = 1.0, reasoning: str = "") -> str:
"""Attest to a claim (endorse, dispute, or abstain).
Args:
claim_id: UUID of the claim
attester_rid: RID of the attesting holon
verdict: 'endorse', 'dispute', or 'abstain'
strength: Attestation strength (0.0-1.0)
reasoning: Explanation for the attestation
"""
result = _api("POST", f"/claims/{claim_id}/attestations", {
"attester_rid": attester_rid, "verdict": verdict,
"strength": strength, "reasoning": reasoning,
})
return json.dumps(result, indent=2)
@server.tool()
def get_claim_strength(claim_id: str) -> str:
"""Get aggregated strength metrics for a claim (endorsements, disputes, evidence counts).
Args:
claim_id: UUID of the claim
"""
result = _api("GET", f"/claims/{claim_id}/strength")
return json.dumps(result, indent=2)
# --- Intent tools ---
@server.tool()
def publish_intent(publisher_rid: str, title: str, intent_type: str,
description: str = "",
governance_fit: str = "") -> str:
"""Publish an intent (need, offer, or possibility).
Args:
publisher_rid: RID of the publishing holon
title: Short title for the intent
intent_type: 'need', 'offer', or 'possibility'
description: Detailed description
governance_fit: Comma-separated governance tags for matching (e.g. 'consent,membrane')
"""
gov_list = [g.strip() for g in governance_fit.split(",") if g.strip()] if governance_fit else []
result = _api("POST", "/intents", {
"publisher_rid": publisher_rid, "title": title,
"intent_type": intent_type, "description": description,
"governance_fit": gov_list,
})
return json.dumps(result, indent=2)
@server.tool()
def get_matching_intents(intent_id: str,
min_similarity: float = 0.1) -> str:
"""Get matching intents for a given intent (complementary need↔offer pairs).
Args:
intent_id: UUID of the intent to find matches for
min_similarity: Minimum similarity threshold (0.0-1.0)
"""
result = _api("GET", f"/intents/{intent_id}/matches?min_similarity={min_similarity}")
return json.dumps(result, indent=2)
@server.tool()
def list_intents(intent_type: str = "", state: str = "",
limit: int = 20) -> str:
"""List published intents with optional filters.
Args:
intent_type: Filter by type (need, offer, possibility)
state: Filter by state (open, matched, committed, expired, withdrawn)
limit: Max results
"""
params = []
if intent_type:
params.append(f"intent_type={intent_type}")
if state:
params.append(f"state={state}")
params.append(f"limit={limit}")
qs = "?" + "&".join(params)
result = _api("GET", f"/intents{qs}")
return json.dumps(result, indent=2)
# --- Commitment tools ---
@server.tool()
def create_commitment(title: str, proposer_rid: str,
acceptor_rid: str = "",
description: str = "",
settlement_type: str = "attestation") -> str:
"""Create a new commitment between holons.
Args:
title: Commitment title
proposer_rid: RID of the proposing holon
acceptor_rid: RID of the accepting holon (optional)
description: Detailed description
settlement_type: How the commitment is settled (default: attestation)
"""
data = {
"title": title, "proposer_rid": proposer_rid,
"description": description, "settlement_type": settlement_type,
}
if acceptor_rid:
data["acceptor_rid"] = acceptor_rid
result = _api("POST", "/commitments", data)
return json.dumps(result, indent=2)
@server.tool()
def update_commitment_state(commitment_id: str, new_state: str) -> str:
"""Transition a commitment to a new state.
Valid transitions:
- proposed verified, cancelled
- verified active, cancelled
- active evidence_linked, disputed, cancelled
- evidence_linked redeemed, disputed
- disputed resolved, cancelled
- resolved active, redeemed
Args:
commitment_id: UUID of the commitment
new_state: Target state
"""
result = _api("PATCH", f"/commitments/{commitment_id}/state?new_state={new_state}")
return json.dumps(result, indent=2)
@server.tool()
def list_commitments(state: str = "", proposer_rid: str = "",
limit: int = 20) -> str:
"""List commitments with optional filters.
Args:
state: Filter by state
proposer_rid: Filter by proposer
limit: Max results
"""
params = []
if state:
params.append(f"state={state}")
if proposer_rid:
params.append(f"proposer_rid={proposer_rid}")
params.append(f"limit={limit}")
qs = "?" + "&".join(params)
result = _api("GET", f"/commitments{qs}")
return json.dumps(result, indent=2)
# --- Governance tools ---
@server.tool()
def get_governance_dag() -> str:
"""Get the full governance specification DAG (documents, dependencies, topological order)."""
result = _api("GET", "/governance/dag")
return json.dumps(result, indent=2)
@server.tool()
def ingest_governance_doc(content: str) -> str:
"""Ingest a governance document (markdown with YAML frontmatter).
The document must have YAML frontmatter with at least doc_id and doc_kind.
Example:
---
doc_id: spore.governance.consent
doc_kind: protocol
status: active
depends_on:
- spore.governance.membrane
---
# Document Title
Body content...
Args:
content: Raw markdown with YAML frontmatter
"""
result = _api("POST", "/governance/docs", {"content": content})
return json.dumps(result, indent=2)
@server.tool()
def search_knowledge(query: str, entity_type: str = "",
limit: int = 10) -> str:
"""Search across claims, intents, and governance docs.
Args:
query: Search query text
entity_type: Filter to 'claims', 'intents', or 'governance' (empty = all)
limit: Max results per type
"""
results = {}
if not entity_type or entity_type == "claims":
claims = _api("GET", f"/claims?limit={limit}")
# Filter by query text (basic until embeddings are available)
q_lower = query.lower()
results["claims"] = [
c for c in claims
if q_lower in c.get("content", "").lower()
]
if not entity_type or entity_type == "intents":
intents = _api("GET", f"/intents?limit={limit}")
q_lower = query.lower()
results["intents"] = [
i for i in intents
if q_lower in i.get("title", "").lower()
or q_lower in i.get("description", "").lower()
]
if not entity_type or entity_type == "governance":
docs = _api("GET", "/governance/docs")
q_lower = query.lower()
results["governance"] = [
d for d in docs
if q_lower in d.get("title", "").lower()
or q_lower in d.get("body", "").lower()
]
return json.dumps(results, indent=2)
if __name__ == "__main__":
server.run(transport="stdio")