diff --git a/deploy/meeting-intelligence/api/app/config.py b/deploy/meeting-intelligence/api/app/config.py
index 2ca6afa..89641ec 100644
--- a/deploy/meeting-intelligence/api/app/config.py
+++ b/deploy/meeting-intelligence/api/app/config.py
@@ -18,6 +18,11 @@ class Settings(BaseSettings):
     # Ollama (for AI summaries)
     ollama_url: str = "http://localhost:11434"
     ollama_model: str = "llama3.2"
+    ollama_timeout: int = 600  # seconds (up from hardcoded 120)
+
+    # AI Orchestrator (optional GPU fallback via RunPod)
+    ai_orchestrator_url: str = ""  # empty = disabled, e.g. "http://ai-orchestrator:8080"
+    ai_orchestrator_priority: str = "normal"  # low|normal|high
 
     # File paths
     recordings_path: str = "/recordings"
diff --git a/deploy/meeting-intelligence/api/app/routes/summaries.py b/deploy/meeting-intelligence/api/app/routes/summaries.py
index a464a1f..4d3d9ad 100644
--- a/deploy/meeting-intelligence/api/app/routes/summaries.py
+++ b/deploy/meeting-intelligence/api/app/routes/summaries.py
@@ -2,6 +2,7 @@
 """
 AI Summary routes.
 """
+import asyncio
 import json
 from typing import Optional, List
 
@@ -149,8 +150,8 @@ async def generate_summary(
     # Format transcript for LLM
     transcript_text = _format_transcript(segments)
 
-    # Generate summary using Ollama
-    summary_data = await _generate_summary_with_ollama(transcript_text)
+    # Generate summary — try orchestrator first if configured, fall back to direct Ollama
+    summary_data, model_used = await _generate_summary(transcript_text)
 
     # Save summary
     await db.save_summary(
@@ -161,7 +162,7 @@
         decisions=summary_data["decisions"],
         topics=summary_data["topics"],
         sentiment=summary_data["sentiment"],
-        model_used=settings.ollama_model
+        model_used=model_used
     )
 
     # Update meeting status
@@ -204,11 +205,112 @@ def _format_transcript(segments: list) -> str:
     return "\n".join(lines)
 
 
-async def _generate_summary_with_ollama(transcript: str) -> dict:
-    """Generate summary using Ollama."""
+async def _generate_summary(transcript: str) -> tuple[dict, str]:
+    """Generate summary via orchestrator (if configured) with Ollama fallback.
+
+    Returns (summary_data, model_used) tuple.
+    """
     prompt = SUMMARY_PROMPT.format(transcript=transcript[:15000])  # Limit context
 
-    async with httpx.AsyncClient(timeout=120.0) as client:
+    # Try AI Orchestrator first if configured
+    if settings.ai_orchestrator_url:
+        try:
+            result = await _generate_summary_with_orchestrator(prompt)
+            return result, f"orchestrator/{result.get('_provider', 'unknown')}"
+        except Exception as e:
+            log.warning("Orchestrator failed, falling back to direct Ollama", error=str(e))
+
+    # Fallback: direct Ollama with configurable timeout
+    return await _generate_summary_with_ollama(prompt), settings.ollama_model
+
+
+async def _generate_summary_with_orchestrator(prompt: str) -> dict:
+    """Generate summary via AI Orchestrator (supports RunPod GPU fallback)."""
+    timeout = settings.ollama_timeout
+
+    async with httpx.AsyncClient(timeout=float(timeout)) as client:
+        # Submit generation request
+        response = await client.post(
+            f"{settings.ai_orchestrator_url}/api/generate/text",
+            json={
+                "prompt": prompt,
+                "system": "You are a meeting analysis assistant. Respond only with valid JSON.",
+                "model": settings.ollama_model,
+                "max_tokens": 2048,
+                "temperature": 0.3,
+                "priority": settings.ai_orchestrator_priority,
+            }
+        )
+        response.raise_for_status()
+        result = response.json()
+        provider = result.get("provider", "unknown")
+
+        log.info("Orchestrator responded", provider=provider, cost=result.get("cost"))
+
+        if provider == "runpod":
+            # Async RunPod job — poll for completion
+            job_id = result["job_id"]
+            response_text = await _poll_runpod_job(client, job_id, timeout)
+        else:
+            # Ollama sync response — text is already in the response
+            response_text = result.get("response", "")
+
+    summary_data = _parse_summary_json(response_text)
+    summary_data["_provider"] = provider
+    return summary_data
+
+
+async def _poll_runpod_job(client: httpx.AsyncClient, job_id: str, max_wait: int) -> str:
+    """Poll AI Orchestrator for RunPod job completion."""
+    poll_url = f"{settings.ai_orchestrator_url}/api/status/llm/{job_id}"
+    elapsed = 0
+    interval = 5
+
+    while elapsed < max_wait:
+        await asyncio.sleep(interval)
+        elapsed += interval
+
+        resp = await client.get(poll_url)
+        resp.raise_for_status()
+        status_data = resp.json()
+        status = status_data.get("status", "")
+
+        log.debug("RunPod job poll", job_id=job_id, status=status, elapsed=elapsed)
+
+        if status == "COMPLETED":
+            output = status_data.get("output", {})
+            # vLLM output may be nested; extract text
+            if isinstance(output, dict):
+                return output.get("text", output.get("response", json.dumps(output)))
+            return str(output)
+        elif status == "FAILED":
+            error = status_data.get("error", "Unknown RunPod error")
+            raise RuntimeError(f"RunPod job failed: {error}")
+
+    raise TimeoutError(f"RunPod job {job_id} did not complete within {max_wait}s")
+
+
+def _parse_summary_json(response_text: str) -> dict:
+    """Parse and validate summary JSON from LLM response."""
+    try:
+        summary_data = json.loads(response_text)
+    except json.JSONDecodeError as e:
+        log.error("Failed to parse AI response as JSON", error=str(e), text=response_text[:500])
+        raise HTTPException(status_code=500, detail="Failed to parse AI response")
+
+    return {
+        "summary": summary_data.get("summary", "No summary generated"),
+        "key_points": summary_data.get("key_points", []),
+        "action_items": summary_data.get("action_items", []),
+        "decisions": summary_data.get("decisions", []),
+        "topics": summary_data.get("topics", []),
+        "sentiment": summary_data.get("sentiment", "neutral"),
+    }
+
+
+async def _generate_summary_with_ollama(prompt: str) -> dict:
+    """Generate summary using direct Ollama with configurable timeout."""
+    async with httpx.AsyncClient(timeout=float(settings.ollama_timeout)) as client:
         try:
             response = await client.post(
                 f"{settings.ollama_url}/api/generate",
@@ -224,18 +326,7 @@ async def _generate_summary_with_ollama(transcript: str) -> dict:
             result = response.json()
             response_text = result.get("response", "")
 
-            # Parse JSON from response
-            summary_data = json.loads(response_text)
-
-            # Validate required fields
-            return {
-                "summary": summary_data.get("summary", "No summary generated"),
-                "key_points": summary_data.get("key_points", []),
-                "action_items": summary_data.get("action_items", []),
-                "decisions": summary_data.get("decisions", []),
-                "topics": summary_data.get("topics", []),
-                "sentiment": summary_data.get("sentiment", "neutral")
-            }
+            return _parse_summary_json(response_text)
 
         except httpx.HTTPError as e:
             log.error("Ollama request failed", error=str(e))
@@ -243,9 +334,3 @@ async def _generate_summary_with_ollama(transcript: str) -> dict:
             status_code=503,
             detail=f"AI service unavailable: {str(e)}"
         )
-        except json.JSONDecodeError as e:
-            log.error("Failed to parse Ollama response", error=str(e))
-            raise HTTPException(
-                status_code=500,
-                detail="Failed to parse AI response"
-            )