From f40b8bd97d2b169a25ee2401ba51606c95320788 Mon Sep 17 00:00:00 2001 From: Jeff Emmett Date: Mon, 23 Mar 2026 10:14:41 -0700 Subject: [PATCH] fix(mi-api): handle RunPod vLLM output format and jsonb serialization Fixes three issues preventing summary generation via RunPod GPU: - Extract text from vLLM's nested choices/tokens output format - Strip LLM preamble text before JSON parsing - Serialize lists to JSON strings for asyncpg jsonb columns Co-Authored-By: Claude Opus 4.6 --- .../api/app/routes/summaries.py | 122 ++++++++++++------ 1 file changed, 82 insertions(+), 40 deletions(-) diff --git a/deploy/meeting-intelligence/api/app/routes/summaries.py b/deploy/meeting-intelligence/api/app/routes/summaries.py index 4d3d9ad..717d3db 100644 --- a/deploy/meeting-intelligence/api/app/routes/summaries.py +++ b/deploy/meeting-intelligence/api/app/routes/summaries.py @@ -80,6 +80,40 @@ Remember: """ +def _load_jsonb(value, default=None): + """Load a jsonb field that asyncpg may return as a string.""" + if default is None: + default = [] + if value is None: + return default + if isinstance(value, str): + try: + return json.loads(value) + except json.JSONDecodeError: + return default + return value + + +def _summary_to_response(meeting_id: str, summary: dict) -> SummaryResponse: + """Convert a DB summary row to a SummaryResponse.""" + key_points = _load_jsonb(summary["key_points"]) + action_items = _load_jsonb(summary["action_items"]) + decisions = _load_jsonb(summary["decisions"]) + topics = _load_jsonb(summary["topics"]) + + return SummaryResponse( + meeting_id=meeting_id, + summary_text=summary["summary_text"], + key_points=key_points, + action_items=[ActionItem(**item) for item in action_items], + decisions=decisions, + topics=[Topic(**topic) for topic in topics], + sentiment=summary.get("sentiment"), + model_used=summary["model_used"], + generated_at=summary["generated_at"].isoformat() + ) + + @router.get("/{meeting_id}/summary", response_model=SummaryResponse) async def get_summary(request: Request, meeting_id: str): """Get AI-generated summary for a meeting.""" @@ -98,21 +132,7 @@ async def get_summary(request: Request, meeting_id: str): detail="No summary available. Use POST to generate one." ) - return SummaryResponse( - meeting_id=meeting_id, - summary_text=summary["summary_text"], - key_points=summary["key_points"] or [], - action_items=[ - ActionItem(**item) for item in (summary["action_items"] or []) - ], - decisions=summary["decisions"] or [], - topics=[ - Topic(**topic) for topic in (summary["topics"] or []) - ], - sentiment=summary.get("sentiment"), - model_used=summary["model_used"], - generated_at=summary["generated_at"].isoformat() - ) + return _summary_to_response(meeting_id, summary) @router.post("/{meeting_id}/summary", response_model=SummaryResponse) @@ -153,14 +173,14 @@ async def generate_summary( # Generate summary — try orchestrator first if configured, fall back to direct Ollama summary_data, model_used = await _generate_summary(transcript_text) - # Save summary + # Save summary (serialize lists to JSON for asyncpg jsonb columns) await db.save_summary( meeting_id=meeting_id, summary_text=summary_data["summary"], - key_points=summary_data["key_points"], - action_items=summary_data["action_items"], - decisions=summary_data["decisions"], - topics=summary_data["topics"], + key_points=json.dumps(summary_data["key_points"]), + action_items=json.dumps(summary_data["action_items"]), + decisions=json.dumps(summary_data["decisions"]), + topics=json.dumps(summary_data["topics"]), sentiment=summary_data["sentiment"], model_used=model_used ) @@ -171,21 +191,7 @@ async def generate_summary( # Get the saved summary summary = await db.get_summary(meeting_id) - return SummaryResponse( - meeting_id=meeting_id, - summary_text=summary["summary_text"], - key_points=summary["key_points"] or [], - action_items=[ - ActionItem(**item) for item in (summary["action_items"] or []) - ], - decisions=summary["decisions"] or [], - topics=[ - Topic(**topic) for topic in (summary["topics"] or []) - ], - sentiment=summary.get("sentiment"), - model_used=summary["model_used"], - generated_at=summary["generated_at"].isoformat() - ) + return _summary_to_response(meeting_id, summary) def _format_transcript(segments: list) -> str: @@ -279,10 +285,7 @@ async def _poll_runpod_job(client: httpx.AsyncClient, job_id: str, max_wait: int if status == "COMPLETED": output = status_data.get("output", {}) - # vLLM output may be nested; extract text - if isinstance(output, dict): - return output.get("text", output.get("response", json.dumps(output))) - return str(output) + return _extract_vllm_text(output) elif status == "FAILED": error = status_data.get("error", "Unknown RunPod error") raise RuntimeError(f"RunPod job failed: {error}") @@ -290,8 +293,47 @@ async def _poll_runpod_job(client: httpx.AsyncClient, job_id: str, max_wait: int raise TimeoutError(f"RunPod job {job_id} did not complete within {max_wait}s") +def _extract_vllm_text(output) -> str: + """Extract text from RunPod vLLM output which can be in various formats. + + Known formats: + - list: [{"choices": [{"tokens": ["text..."]}], "usage": {...}}] + - dict with "text": {"text": "..."} + - dict with "response": {"response": "..."} + - plain string + """ + if isinstance(output, str): + return output + + # vLLM list format: [{"choices": [{"tokens": ["chunk1", "chunk2"]}]}] + if isinstance(output, list) and len(output) > 0: + first = output[0] + if isinstance(first, dict): + choices = first.get("choices", []) + if choices: + tokens = choices[0].get("tokens", []) + if tokens: + return "".join(str(t) for t in tokens) + # Fallback: join all list items as strings + return "".join(str(item) for item in output) + + if isinstance(output, dict): + return output.get("text", output.get("response", json.dumps(output))) + + return str(output) + + def _parse_summary_json(response_text: str) -> dict: - """Parse and validate summary JSON from LLM response.""" + """Parse and validate summary JSON from LLM response. + + Handles common LLM quirks: preamble text before JSON, trailing text after JSON. + """ + # Strip any text before the first { and after the last } + start = response_text.find("{") + end = response_text.rfind("}") + if start != -1 and end != -1 and end > start: + response_text = response_text[start:end + 1] + try: summary_data = json.loads(response_text) except json.JSONDecodeError as e: