fix(mi-api): handle RunPod vLLM output format and jsonb serialization

Fixes three issues preventing summary generation via RunPod GPU:
- Extract text from vLLM's nested choices/tokens output format
- Strip LLM preamble text before JSON parsing
- Serialize lists to JSON strings for asyncpg jsonb columns

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
Jeff Emmett 2026-03-23 10:14:41 -07:00
parent 76d44975cb
commit f40b8bd97d
1 changed files with 82 additions and 40 deletions

View File

@ -80,6 +80,40 @@ Remember:
"""
def _load_jsonb(value, default=None):
"""Load a jsonb field that asyncpg may return as a string."""
if default is None:
default = []
if value is None:
return default
if isinstance(value, str):
try:
return json.loads(value)
except json.JSONDecodeError:
return default
return value
def _summary_to_response(meeting_id: str, summary: dict) -> SummaryResponse:
"""Convert a DB summary row to a SummaryResponse."""
key_points = _load_jsonb(summary["key_points"])
action_items = _load_jsonb(summary["action_items"])
decisions = _load_jsonb(summary["decisions"])
topics = _load_jsonb(summary["topics"])
return SummaryResponse(
meeting_id=meeting_id,
summary_text=summary["summary_text"],
key_points=key_points,
action_items=[ActionItem(**item) for item in action_items],
decisions=decisions,
topics=[Topic(**topic) for topic in topics],
sentiment=summary.get("sentiment"),
model_used=summary["model_used"],
generated_at=summary["generated_at"].isoformat()
)
@router.get("/{meeting_id}/summary", response_model=SummaryResponse)
async def get_summary(request: Request, meeting_id: str):
"""Get AI-generated summary for a meeting."""
@ -98,21 +132,7 @@ async def get_summary(request: Request, meeting_id: str):
detail="No summary available. Use POST to generate one."
)
return SummaryResponse(
meeting_id=meeting_id,
summary_text=summary["summary_text"],
key_points=summary["key_points"] or [],
action_items=[
ActionItem(**item) for item in (summary["action_items"] or [])
],
decisions=summary["decisions"] or [],
topics=[
Topic(**topic) for topic in (summary["topics"] or [])
],
sentiment=summary.get("sentiment"),
model_used=summary["model_used"],
generated_at=summary["generated_at"].isoformat()
)
return _summary_to_response(meeting_id, summary)
@router.post("/{meeting_id}/summary", response_model=SummaryResponse)
@ -153,14 +173,14 @@ async def generate_summary(
# Generate summary — try orchestrator first if configured, fall back to direct Ollama
summary_data, model_used = await _generate_summary(transcript_text)
# Save summary
# Save summary (serialize lists to JSON for asyncpg jsonb columns)
await db.save_summary(
meeting_id=meeting_id,
summary_text=summary_data["summary"],
key_points=summary_data["key_points"],
action_items=summary_data["action_items"],
decisions=summary_data["decisions"],
topics=summary_data["topics"],
key_points=json.dumps(summary_data["key_points"]),
action_items=json.dumps(summary_data["action_items"]),
decisions=json.dumps(summary_data["decisions"]),
topics=json.dumps(summary_data["topics"]),
sentiment=summary_data["sentiment"],
model_used=model_used
)
@ -171,21 +191,7 @@ async def generate_summary(
# Get the saved summary
summary = await db.get_summary(meeting_id)
return SummaryResponse(
meeting_id=meeting_id,
summary_text=summary["summary_text"],
key_points=summary["key_points"] or [],
action_items=[
ActionItem(**item) for item in (summary["action_items"] or [])
],
decisions=summary["decisions"] or [],
topics=[
Topic(**topic) for topic in (summary["topics"] or [])
],
sentiment=summary.get("sentiment"),
model_used=summary["model_used"],
generated_at=summary["generated_at"].isoformat()
)
return _summary_to_response(meeting_id, summary)
def _format_transcript(segments: list) -> str:
@ -279,10 +285,7 @@ async def _poll_runpod_job(client: httpx.AsyncClient, job_id: str, max_wait: int
if status == "COMPLETED":
output = status_data.get("output", {})
# vLLM output may be nested; extract text
if isinstance(output, dict):
return output.get("text", output.get("response", json.dumps(output)))
return str(output)
return _extract_vllm_text(output)
elif status == "FAILED":
error = status_data.get("error", "Unknown RunPod error")
raise RuntimeError(f"RunPod job failed: {error}")
@ -290,8 +293,47 @@ async def _poll_runpod_job(client: httpx.AsyncClient, job_id: str, max_wait: int
raise TimeoutError(f"RunPod job {job_id} did not complete within {max_wait}s")
def _extract_vllm_text(output) -> str:
"""Extract text from RunPod vLLM output which can be in various formats.
Known formats:
- list: [{"choices": [{"tokens": ["text..."]}], "usage": {...}}]
- dict with "text": {"text": "..."}
- dict with "response": {"response": "..."}
- plain string
"""
if isinstance(output, str):
return output
# vLLM list format: [{"choices": [{"tokens": ["chunk1", "chunk2"]}]}]
if isinstance(output, list) and len(output) > 0:
first = output[0]
if isinstance(first, dict):
choices = first.get("choices", [])
if choices:
tokens = choices[0].get("tokens", [])
if tokens:
return "".join(str(t) for t in tokens)
# Fallback: join all list items as strings
return "".join(str(item) for item in output)
if isinstance(output, dict):
return output.get("text", output.get("response", json.dumps(output)))
return str(output)
def _parse_summary_json(response_text: str) -> dict:
"""Parse and validate summary JSON from LLM response."""
"""Parse and validate summary JSON from LLM response.
Handles common LLM quirks: preamble text before JSON, trailing text after JSON.
"""
# Strip any text before the first { and after the last }
start = response_text.find("{")
end = response_text.rfind("}")
if start != -1 and end != -1 and end > start:
response_text = response_text[start:end + 1]
try:
summary_data = json.loads(response_text)
except json.JSONDecodeError as e: