"""Intent publishing, matching, and lifecycle endpoints.""" import json import uuid from fastapi import APIRouter, HTTPException, Query from pydantic import BaseModel from spore_node.db.connection import get_pool from spore_node.rid_types import SporeIntent router = APIRouter(prefix="/intents", tags=["intents"]) VALID_STATES = ("open", "matched", "committed", "expired", "withdrawn") VALID_TRANSITIONS = { "open": ("matched", "expired", "withdrawn"), "matched": ("committed", "open", "withdrawn"), "committed": ("expired",), "expired": (), "withdrawn": (), } COMPLEMENTARY_TYPES = { "need": "offer", "offer": "need", "possibility": "possibility", } class IntentCreate(BaseModel): publisher_rid: str title: str description: str = "" intent_type: str # need | offer | possibility capacity: dict = {} timing: dict = {} governance_fit: list[str] = [] metadata: dict = {} class IntentResponse(BaseModel): id: str rid: str publisher_rid: str title: str description: str intent_type: str capacity: dict timing: dict governance_fit: list[str] state: str metadata: dict class MatchResponse(BaseModel): intent_a_id: str intent_b_id: str similarity: float match_details: dict @router.post("", response_model=IntentResponse, status_code=201) async def publish_intent(data: IntentCreate): if data.intent_type not in ("need", "offer", "possibility"): raise HTTPException(422, "intent_type must be 'need', 'offer', or 'possibility'") pool = get_pool() iid = str(uuid.uuid4()) rid = str(SporeIntent(iid)) row = await pool.fetchrow( """ INSERT INTO intents (id, rid, publisher_rid, title, description, intent_type, capacity, timing, governance_fit, metadata) VALUES ($1, $2, $3, $4, $5, $6, $7::jsonb, $8::jsonb, $9, $10::jsonb) RETURNING * """, uuid.UUID(iid), rid, data.publisher_rid, data.title, data.description, data.intent_type, json.dumps(data.capacity), json.dumps(data.timing), data.governance_fit, json.dumps(data.metadata), ) await _log_event(pool, rid, "intent.published", {"type": data.intent_type, "title": data.title}) # Trigger matching (inline for now, ARQ worker in production) await _compute_matches(pool, uuid.UUID(iid), data.intent_type) return _row_dict(row) @router.get("", response_model=list[IntentResponse]) async def list_intents( intent_type: str | None = None, state: str | None = None, publisher_rid: str | None = None, limit: int = Query(default=50, le=200), offset: int = 0, ): pool = get_pool() conditions, params = [], [] idx = 1 if intent_type: conditions.append(f"intent_type = ${idx}") params.append(intent_type) idx += 1 if state: conditions.append(f"state = ${idx}") params.append(state) idx += 1 if publisher_rid: conditions.append(f"publisher_rid = ${idx}") params.append(publisher_rid) idx += 1 where = "WHERE " + " AND ".join(conditions) if conditions else "" rows = await pool.fetch( f"SELECT * FROM intents {where} ORDER BY created_at DESC LIMIT ${idx} OFFSET ${idx+1}", *params, limit, offset, ) return [_row_dict(r) for r in rows] @router.get("/{intent_id}", response_model=IntentResponse) async def get_intent(intent_id: str): pool = get_pool() row = await pool.fetchrow("SELECT * FROM intents WHERE id = $1", uuid.UUID(intent_id)) if not row: raise HTTPException(404, "Intent not found") return _row_dict(row) @router.patch("/{intent_id}/state") async def transition_intent_state(intent_id: str, new_state: str): pool = get_pool() row = await pool.fetchrow("SELECT state, rid FROM intents WHERE id = $1", uuid.UUID(intent_id)) if not row: raise HTTPException(404, "Intent not found") current = row["state"] if new_state not in VALID_TRANSITIONS.get(current, ()): raise HTTPException( 422, f"Cannot transition from '{current}' to '{new_state}'. " f"Valid: {VALID_TRANSITIONS.get(current, ())}", ) await pool.execute( "UPDATE intents SET state = $1, updated_at = now() WHERE id = $2", new_state, uuid.UUID(intent_id), ) await _log_event(pool, row["rid"], "intent.state_changed", {"from": current, "to": new_state}) return {"state": new_state, "previous": current} @router.get("/{intent_id}/matches", response_model=list[MatchResponse]) async def get_matches(intent_id: str, min_similarity: float = 0.0): pool = get_pool() uid = uuid.UUID(intent_id) rows = await pool.fetch( """ SELECT * FROM intent_matches WHERE (intent_a_id = $1 OR intent_b_id = $1) AND similarity >= $2 ORDER BY similarity DESC """, uid, min_similarity, ) return [_match_dict(r) for r in rows] # --- Matching logic --- async def _compute_matches(pool, intent_id: uuid.UUID, intent_type: str): """Compute matches between this intent and complementary open intents. Scoring: governance_fit overlap (primary), with text similarity as tiebreaker. Embedding-based similarity added when embeddings are available. """ complement = COMPLEMENTARY_TYPES.get(intent_type, "") if not complement: return intent = await pool.fetchrow("SELECT * FROM intents WHERE id = $1", intent_id) if not intent: return # Find complementary open intents candidates = await pool.fetch( """ SELECT * FROM intents WHERE intent_type = $1 AND state = 'open' AND id != $2 """, complement, intent_id, ) my_gov = set(intent["governance_fit"] or []) for cand in candidates: cand_gov = set(cand["governance_fit"] or []) # Governance fit overlap if my_gov and cand_gov: gov_score = len(my_gov & cand_gov) / len(my_gov | cand_gov) elif not my_gov and not cand_gov: gov_score = 0.5 # neutral else: gov_score = 0.1 # Simple text overlap as proxy until embeddings available my_words = set(intent["title"].lower().split() + intent["description"].lower().split()) cand_words = set(cand["title"].lower().split() + cand["description"].lower().split()) text_score = len(my_words & cand_words) / max(len(my_words | cand_words), 1) similarity = 0.6 * gov_score + 0.4 * text_score if similarity < 0.05: continue # Ensure consistent ordering for unique constraint a_id, b_id = sorted([intent_id, cand["id"]], key=str) await pool.execute( """ INSERT INTO intent_matches (intent_a_id, intent_b_id, similarity, match_details) VALUES ($1, $2, $3, $4::jsonb) ON CONFLICT (intent_a_id, intent_b_id) DO UPDATE SET similarity = $3, match_details = $4::jsonb """, a_id, b_id, round(similarity, 4), json.dumps({"gov_score": round(gov_score, 4), "text_score": round(text_score, 4)}), ) # --- Helpers --- def _row_dict(row) -> dict: d = dict(row) d["id"] = str(d["id"]) for k in ("capacity", "timing", "metadata"): if isinstance(d.get(k), str): d[k] = json.loads(d[k]) if d.get("governance_fit") is None: d["governance_fit"] = [] return d def _match_dict(row) -> dict: d = dict(row) d.pop("id", None) d["intent_a_id"] = str(d["intent_a_id"]) d["intent_b_id"] = str(d["intent_b_id"]) if isinstance(d.get("match_details"), str): d["match_details"] = json.loads(d["match_details"]) return d async def _log_event(pool, entity_rid: str, event_kind: str, payload: dict): await pool.execute( "INSERT INTO events (entity_rid, event_kind, payload) VALUES ($1, $2, $3::jsonb)", entity_rid, event_kind, json.dumps(payload), )