From 76d44975cbf2ece0b7f5a622f76f97597e24ebfe Mon Sep 17 00:00:00 2001 From: Jeff Emmett Date: Sun, 22 Mar 2026 20:08:28 -0700 Subject: [PATCH] feat(mi-api): add configurable timeout + AI orchestrator GPU fallback Increase Ollama timeout from hardcoded 120s to configurable 600s default. Add optional AI Orchestrator integration for RunPod GPU acceleration with automatic fallback to direct Ollama when orchestrator is unavailable. Co-Authored-By: Claude Opus 4.6 --- deploy/meeting-intelligence/api/app/config.py | 5 + .../api/app/routes/summaries.py | 133 ++++++++++++++---- 2 files changed, 114 insertions(+), 24 deletions(-) diff --git a/deploy/meeting-intelligence/api/app/config.py b/deploy/meeting-intelligence/api/app/config.py index 2ca6afa..89641ec 100644 --- a/deploy/meeting-intelligence/api/app/config.py +++ b/deploy/meeting-intelligence/api/app/config.py @@ -18,6 +18,11 @@ class Settings(BaseSettings): # Ollama (for AI summaries) ollama_url: str = "http://localhost:11434" ollama_model: str = "llama3.2" + ollama_timeout: int = 600 # seconds (up from hardcoded 120) + + # AI Orchestrator (optional GPU fallback via RunPod) + ai_orchestrator_url: str = "" # empty = disabled, e.g. "http://ai-orchestrator:8080" + ai_orchestrator_priority: str = "normal" # low|normal|high # File paths recordings_path: str = "/recordings" diff --git a/deploy/meeting-intelligence/api/app/routes/summaries.py b/deploy/meeting-intelligence/api/app/routes/summaries.py index a464a1f..4d3d9ad 100644 --- a/deploy/meeting-intelligence/api/app/routes/summaries.py +++ b/deploy/meeting-intelligence/api/app/routes/summaries.py @@ -2,6 +2,7 @@ AI Summary routes. """ +import asyncio import json from typing import Optional, List @@ -149,8 +150,8 @@ async def generate_summary( # Format transcript for LLM transcript_text = _format_transcript(segments) - # Generate summary using Ollama - summary_data = await _generate_summary_with_ollama(transcript_text) + # Generate summary — try orchestrator first if configured, fall back to direct Ollama + summary_data, model_used = await _generate_summary(transcript_text) # Save summary await db.save_summary( @@ -161,7 +162,7 @@ async def generate_summary( decisions=summary_data["decisions"], topics=summary_data["topics"], sentiment=summary_data["sentiment"], - model_used=settings.ollama_model + model_used=model_used ) # Update meeting status @@ -204,11 +205,112 @@ def _format_transcript(segments: list) -> str: return "\n".join(lines) -async def _generate_summary_with_ollama(transcript: str) -> dict: - """Generate summary using Ollama.""" +async def _generate_summary(transcript: str) -> tuple[dict, str]: + """Generate summary via orchestrator (if configured) with Ollama fallback. + + Returns (summary_data, model_used) tuple. + """ prompt = SUMMARY_PROMPT.format(transcript=transcript[:15000]) # Limit context - async with httpx.AsyncClient(timeout=120.0) as client: + # Try AI Orchestrator first if configured + if settings.ai_orchestrator_url: + try: + result = await _generate_summary_with_orchestrator(prompt) + return result, f"orchestrator/{result.get('_provider', 'unknown')}" + except Exception as e: + log.warning("Orchestrator failed, falling back to direct Ollama", error=str(e)) + + # Fallback: direct Ollama with configurable timeout + return await _generate_summary_with_ollama(prompt), settings.ollama_model + + +async def _generate_summary_with_orchestrator(prompt: str) -> dict: + """Generate summary via AI Orchestrator (supports RunPod GPU fallback).""" + timeout = settings.ollama_timeout + + async with httpx.AsyncClient(timeout=float(timeout)) as client: + # Submit generation request + response = await client.post( + f"{settings.ai_orchestrator_url}/api/generate/text", + json={ + "prompt": prompt, + "system": "You are a meeting analysis assistant. Respond only with valid JSON.", + "model": settings.ollama_model, + "max_tokens": 2048, + "temperature": 0.3, + "priority": settings.ai_orchestrator_priority, + } + ) + response.raise_for_status() + result = response.json() + provider = result.get("provider", "unknown") + + log.info("Orchestrator responded", provider=provider, cost=result.get("cost")) + + if provider == "runpod": + # Async RunPod job — poll for completion + job_id = result["job_id"] + response_text = await _poll_runpod_job(client, job_id, timeout) + else: + # Ollama sync response — text is already in the response + response_text = result.get("response", "") + + summary_data = _parse_summary_json(response_text) + summary_data["_provider"] = provider + return summary_data + + +async def _poll_runpod_job(client: httpx.AsyncClient, job_id: str, max_wait: int) -> str: + """Poll AI Orchestrator for RunPod job completion.""" + poll_url = f"{settings.ai_orchestrator_url}/api/status/llm/{job_id}" + elapsed = 0 + interval = 5 + + while elapsed < max_wait: + await asyncio.sleep(interval) + elapsed += interval + + resp = await client.get(poll_url) + resp.raise_for_status() + status_data = resp.json() + status = status_data.get("status", "") + + log.debug("RunPod job poll", job_id=job_id, status=status, elapsed=elapsed) + + if status == "COMPLETED": + output = status_data.get("output", {}) + # vLLM output may be nested; extract text + if isinstance(output, dict): + return output.get("text", output.get("response", json.dumps(output))) + return str(output) + elif status == "FAILED": + error = status_data.get("error", "Unknown RunPod error") + raise RuntimeError(f"RunPod job failed: {error}") + + raise TimeoutError(f"RunPod job {job_id} did not complete within {max_wait}s") + + +def _parse_summary_json(response_text: str) -> dict: + """Parse and validate summary JSON from LLM response.""" + try: + summary_data = json.loads(response_text) + except json.JSONDecodeError as e: + log.error("Failed to parse AI response as JSON", error=str(e), text=response_text[:500]) + raise HTTPException(status_code=500, detail="Failed to parse AI response") + + return { + "summary": summary_data.get("summary", "No summary generated"), + "key_points": summary_data.get("key_points", []), + "action_items": summary_data.get("action_items", []), + "decisions": summary_data.get("decisions", []), + "topics": summary_data.get("topics", []), + "sentiment": summary_data.get("sentiment", "neutral"), + } + + +async def _generate_summary_with_ollama(prompt: str) -> dict: + """Generate summary using direct Ollama with configurable timeout.""" + async with httpx.AsyncClient(timeout=float(settings.ollama_timeout)) as client: try: response = await client.post( f"{settings.ollama_url}/api/generate", @@ -224,18 +326,7 @@ async def _generate_summary_with_ollama(transcript: str) -> dict: result = response.json() response_text = result.get("response", "") - # Parse JSON from response - summary_data = json.loads(response_text) - - # Validate required fields - return { - "summary": summary_data.get("summary", "No summary generated"), - "key_points": summary_data.get("key_points", []), - "action_items": summary_data.get("action_items", []), - "decisions": summary_data.get("decisions", []), - "topics": summary_data.get("topics", []), - "sentiment": summary_data.get("sentiment", "neutral") - } + return _parse_summary_json(response_text) except httpx.HTTPError as e: log.error("Ollama request failed", error=str(e)) @@ -243,9 +334,3 @@ async def _generate_summary_with_ollama(transcript: str) -> dict: status_code=503, detail=f"AI service unavailable: {str(e)}" ) - except json.JSONDecodeError as e: - log.error("Failed to parse Ollama response", error=str(e)) - raise HTTPException( - status_code=500, - detail="Failed to parse AI response" - )