From 4cb219db0f18e557f00f46fe3a7015dcd4a21fbf Mon Sep 17 00:00:00 2001 From: Jeff Emmett Date: Thu, 5 Feb 2026 19:04:19 +0000 Subject: [PATCH] feat(meeting-intelligence): add backend infrastructure for transcription and AI summaries Add complete Meeting Intelligence System infrastructure: Backend Services: - PostgreSQL schema with pgvector for semantic search - Transcription service using whisper.cpp and resemblyzer for diarization - Meeting Intelligence API with FastAPI - Jibri configuration for recording API Endpoints: - /meetings - List, get, delete meetings - /meetings/{id}/transcript - Get transcripts with speaker attribution - /meetings/{id}/summary - Generate AI summaries via Ollama - /search - Full-text and semantic search - /meetings/{id}/export - Export as PDF, Markdown, JSON - /webhooks/recording-complete - Jibri callback Features: - Zero-cost local transcription (whisper.cpp CPU) - Speaker diarization (who said what) - AI-powered summaries with key points, action items, decisions - Vector embeddings for semantic search - Multi-format export Co-Authored-By: Claude Opus 4.5 --- deploy/meeting-intelligence/.env.example | 17 + deploy/meeting-intelligence/README.md | 151 ++++++++ deploy/meeting-intelligence/api/Dockerfile | 32 ++ .../meeting-intelligence/api/app/__init__.py | 1 + deploy/meeting-intelligence/api/app/config.py | 50 +++ .../meeting-intelligence/api/app/database.py | 355 ++++++++++++++++++ deploy/meeting-intelligence/api/app/main.py | 113 ++++++ .../api/app/routes/__init__.py | 2 + .../api/app/routes/export.py | 319 ++++++++++++++++ .../api/app/routes/meetings.py | 112 ++++++ .../api/app/routes/search.py | 173 +++++++++ .../api/app/routes/summaries.py | 251 +++++++++++++ .../api/app/routes/transcripts.py | 161 ++++++++ .../api/app/routes/webhooks.py | 139 +++++++ .../meeting-intelligence/api/requirements.txt | 37 ++ .../meeting-intelligence/docker-compose.yml | 186 +++++++++ .../jibri/config/finalize.sh | 104 +++++ deploy/meeting-intelligence/postgres/init.sql | 310 +++++++++++++++ .../transcriber/Dockerfile | 67 ++++ .../transcriber/app/__init__.py | 1 + .../transcriber/app/config.py | 45 +++ .../transcriber/app/database.py | 245 ++++++++++++ .../transcriber/app/diarizer.py | 338 +++++++++++++++++ .../transcriber/app/main.py | 274 ++++++++++++++ .../transcriber/app/processor.py | 282 ++++++++++++++ .../transcriber/app/transcriber.py | 211 +++++++++++ .../transcriber/requirements.txt | 41 ++ 27 files changed, 4017 insertions(+) create mode 100644 deploy/meeting-intelligence/.env.example create mode 100644 deploy/meeting-intelligence/README.md create mode 100644 deploy/meeting-intelligence/api/Dockerfile create mode 100644 deploy/meeting-intelligence/api/app/__init__.py create mode 100644 deploy/meeting-intelligence/api/app/config.py create mode 100644 deploy/meeting-intelligence/api/app/database.py create mode 100644 deploy/meeting-intelligence/api/app/main.py create mode 100644 deploy/meeting-intelligence/api/app/routes/__init__.py create mode 100644 deploy/meeting-intelligence/api/app/routes/export.py create mode 100644 deploy/meeting-intelligence/api/app/routes/meetings.py create mode 100644 deploy/meeting-intelligence/api/app/routes/search.py create mode 100644 deploy/meeting-intelligence/api/app/routes/summaries.py create mode 100644 deploy/meeting-intelligence/api/app/routes/transcripts.py create mode 100644 deploy/meeting-intelligence/api/app/routes/webhooks.py create mode 100644 deploy/meeting-intelligence/api/requirements.txt create mode 100644 
deploy/meeting-intelligence/docker-compose.yml create mode 100755 deploy/meeting-intelligence/jibri/config/finalize.sh create mode 100644 deploy/meeting-intelligence/postgres/init.sql create mode 100644 deploy/meeting-intelligence/transcriber/Dockerfile create mode 100644 deploy/meeting-intelligence/transcriber/app/__init__.py create mode 100644 deploy/meeting-intelligence/transcriber/app/config.py create mode 100644 deploy/meeting-intelligence/transcriber/app/database.py create mode 100644 deploy/meeting-intelligence/transcriber/app/diarizer.py create mode 100644 deploy/meeting-intelligence/transcriber/app/main.py create mode 100644 deploy/meeting-intelligence/transcriber/app/processor.py create mode 100644 deploy/meeting-intelligence/transcriber/app/transcriber.py create mode 100644 deploy/meeting-intelligence/transcriber/requirements.txt diff --git a/deploy/meeting-intelligence/.env.example b/deploy/meeting-intelligence/.env.example new file mode 100644 index 0000000..ec555da --- /dev/null +++ b/deploy/meeting-intelligence/.env.example @@ -0,0 +1,17 @@ +# Meeting Intelligence System - Environment Variables +# Copy this file to .env and update values + +# PostgreSQL +POSTGRES_PASSWORD=your-secure-password-here + +# API Security +API_SECRET_KEY=your-api-secret-key-here + +# Jibri XMPP Configuration +XMPP_SERVER=meet.jeffemmett.com +XMPP_DOMAIN=meet.jeffemmett.com +JIBRI_XMPP_PASSWORD=jibri-xmpp-password +JIBRI_RECORDER_PASSWORD=recorder-password + +# Ollama (uses host.docker.internal by default) +# OLLAMA_URL=http://host.docker.internal:11434 diff --git a/deploy/meeting-intelligence/README.md b/deploy/meeting-intelligence/README.md new file mode 100644 index 0000000..e94c027 --- /dev/null +++ b/deploy/meeting-intelligence/README.md @@ -0,0 +1,151 @@ +# Meeting Intelligence System + +A fully self-hosted, zero-cost meeting intelligence system for Jeffsi Meet that provides: +- Automatic meeting recording via Jibri +- Local transcription via whisper.cpp (CPU-only) +- Speaker diarization (who said what) +- AI-powered summaries via Ollama +- Searchable meeting archive with dashboard + +## Architecture + +``` +┌─────────────────────────────────────────────────────────────────────┐ +│ Netcup RS 8000 (Backend) │ +│ │ +│ ┌─────────────┐ ┌─────────────┐ ┌─────────────────────────┐ │ +│ │ Jibri │───▶│ Whisper │───▶│ AI Processor │ │ +│ │ Recording │ │ Transcriber │ │ (Ollama + Summarizer) │ │ +│ │ Container │ │ Service │ │ │ │ +│ └─────────────┘ └─────────────┘ └─────────────────────────┘ │ +│ │ │ │ │ +│ ▼ ▼ ▼ │ +│ ┌─────────────────────────────────────────────────────────────┐ │ +│ │ PostgreSQL + pgvector │ │ +│ └─────────────────────────────────────────────────────────────┘ │ +└─────────────────────────────────────────────────────────────────────┘ +``` + +## Components + +| Service | Port | Description | +|---------|------|-------------| +| PostgreSQL | 5432 | Database with pgvector for semantic search | +| Redis | 6379 | Job queue for async processing | +| Transcriber | 8001 | whisper.cpp + speaker diarization | +| API | 8000 | REST API for meetings, transcripts, search | +| Jibri | - | Recording service (joins meetings as hidden participant) | + +## Deployment + +### Prerequisites + +1. Docker and Docker Compose installed +2. Ollama running on the host (for AI summaries) +3. Jeffsi Meet configured with recording enabled + +### Setup + +1. Copy environment file: + ```bash + cp .env.example .env + ``` + +2. Edit `.env` with your configuration: + ```bash + vim .env + ``` + +3. 
Create storage directories: + ```bash + sudo mkdir -p /opt/meetings/{recordings,audio} + sudo chown -R 1000:1000 /opt/meetings + ``` + +4. Start services: + ```bash + docker compose up -d + ``` + +5. Check logs: + ```bash + docker compose logs -f + ``` + +## API Endpoints + +Base URL: `https://meet.jeffemmett.com/api/intelligence` + +### Meetings +- `GET /meetings` - List all meetings +- `GET /meetings/{id}` - Get meeting details +- `DELETE /meetings/{id}` - Delete meeting + +### Transcripts +- `GET /meetings/{id}/transcript` - Get full transcript +- `GET /meetings/{id}/transcript/text` - Get as plain text +- `GET /meetings/{id}/speakers` - Get speaker statistics + +### Summaries +- `GET /meetings/{id}/summary` - Get AI summary +- `POST /meetings/{id}/summary` - Generate summary + +### Search +- `POST /search` - Search transcripts (text + semantic) +- `GET /search/suggest` - Get search suggestions + +### Export +- `GET /meetings/{id}/export?format=markdown` - Export as Markdown +- `GET /meetings/{id}/export?format=json` - Export as JSON +- `GET /meetings/{id}/export?format=pdf` - Export as PDF + +### Webhooks +- `POST /webhooks/recording-complete` - Jibri recording callback + +## Processing Pipeline + +1. **Recording** - Jibri joins meeting and records +2. **Webhook** - Jibri calls `/webhooks/recording-complete` +3. **Audio Extraction** - FFmpeg extracts audio from video +4. **Transcription** - whisper.cpp transcribes audio +5. **Diarization** - resemblyzer identifies speakers +6. **Embedding** - Generate vector embeddings for search +7. **Summary** - Ollama generates AI summary +8. **Ready** - Meeting available in dashboard + +## Resource Usage + +| Service | CPU | RAM | Storage | +|---------|-----|-----|---------| +| Transcriber | 8 cores | 12GB | 5GB (models) | +| API | 1 core | 2GB | - | +| PostgreSQL | 2 cores | 4GB | ~50GB | +| Jibri | 2 cores | 4GB | - | +| Redis | 0.5 cores | 512MB | - | + +## Troubleshooting + +### Transcription is slow +- Check CPU usage: `docker stats meeting-intelligence-transcriber` +- Increase `WHISPER_THREADS` in docker-compose.yml +- Consider using the `tiny` model for faster (less accurate) transcription + +### No summary generated +- Check Ollama is running: `curl http://localhost:11434/api/tags` +- Check logs: `docker compose logs api` +- Verify model is available: `ollama list` + +### Recording not starting +- Check Jibri logs: `docker compose logs jibri` +- Verify XMPP credentials in `.env` +- Check Prosody recorder virtual host configuration + +## Cost Analysis + +| Component | Monthly Cost | +|-----------|-------------| +| Jibri recording | $0 (local) | +| Whisper transcription | $0 (local CPU) | +| Ollama summarization | $0 (local) | +| PostgreSQL | $0 (local) | +| **Total** | **$0/month** | diff --git a/deploy/meeting-intelligence/api/Dockerfile b/deploy/meeting-intelligence/api/Dockerfile new file mode 100644 index 0000000..456c02c --- /dev/null +++ b/deploy/meeting-intelligence/api/Dockerfile @@ -0,0 +1,32 @@ +# Meeting Intelligence API +# Provides REST API for meeting transcripts, summaries, and search + +FROM python:3.11-slim + +# Install dependencies +RUN apt-get update && apt-get install -y --no-install-recommends \ + curl \ + && rm -rf /var/lib/apt/lists/* + +WORKDIR /app + +# Install Python dependencies +COPY requirements.txt . 
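+# Copying requirements.txt before the app code lets Docker cache the dependency layer across code-only changes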
+RUN pip install --no-cache-dir -r requirements.txt + +# Copy application code +COPY app/ ./app/ + +# Create directories +RUN mkdir -p /recordings /logs + +# Environment variables +ENV PYTHONUNBUFFERED=1 + +# Health check +HEALTHCHECK --interval=30s --timeout=10s --start-period=30s --retries=3 \ + CMD curl -f http://localhost:8000/health || exit 1 + +# Run the service +EXPOSE 8000 +CMD ["uvicorn", "app.main:app", "--host", "0.0.0.0", "--port", "8000"] diff --git a/deploy/meeting-intelligence/api/app/__init__.py b/deploy/meeting-intelligence/api/app/__init__.py new file mode 100644 index 0000000..e2fe786 --- /dev/null +++ b/deploy/meeting-intelligence/api/app/__init__.py @@ -0,0 +1 @@ +# Meeting Intelligence API diff --git a/deploy/meeting-intelligence/api/app/config.py b/deploy/meeting-intelligence/api/app/config.py new file mode 100644 index 0000000..2ca6afa --- /dev/null +++ b/deploy/meeting-intelligence/api/app/config.py @@ -0,0 +1,50 @@ +""" +Configuration settings for the Meeting Intelligence API. +""" + +from typing import List +from pydantic_settings import BaseSettings + + +class Settings(BaseSettings): + """Application settings loaded from environment variables.""" + + # Database + postgres_url: str = "postgresql://meeting_intelligence:changeme@localhost:5432/meeting_intelligence" + + # Redis + redis_url: str = "redis://localhost:6379" + + # Ollama (for AI summaries) + ollama_url: str = "http://localhost:11434" + ollama_model: str = "llama3.2" + + # File paths + recordings_path: str = "/recordings" + + # Security + secret_key: str = "changeme" + api_key: str = "" # Optional API key authentication + + # CORS + cors_origins: List[str] = [ + "https://meet.jeffemmett.com", + "http://localhost:8080", + "http://localhost:3000" + ] + + # Embeddings model for semantic search + embedding_model: str = "all-MiniLM-L6-v2" + + # Export settings + export_temp_dir: str = "/tmp/exports" + + # Transcriber service URL + transcriber_url: str = "http://transcriber:8001" + + class Config: + env_file = ".env" + env_file_encoding = "utf-8" + + +settings = Settings() diff --git a/deploy/meeting-intelligence/api/app/database.py b/deploy/meeting-intelligence/api/app/database.py new file mode 100644 index 0000000..1819df1 --- /dev/null +++ b/deploy/meeting-intelligence/api/app/database.py @@ -0,0 +1,355 @@ +""" +Database operations for the Meeting Intelligence API. 
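+Wraps an asyncpg connection pool with query helpers grouped into
+Meetings, Transcripts, Summaries, Search, Webhooks, and Jobs sections.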
+""" + +import uuid +from datetime import datetime +from typing import Optional, List, Dict, Any + +import asyncpg +import structlog + +log = structlog.get_logger() + + +class Database: + """Database operations for Meeting Intelligence API.""" + + def __init__(self, connection_string: str): + self.connection_string = connection_string + self.pool: Optional[asyncpg.Pool] = None + + async def connect(self): + """Establish database connection pool.""" + log.info("Connecting to database...") + self.pool = await asyncpg.create_pool( + self.connection_string, + min_size=2, + max_size=20 + ) + log.info("Database connected") + + async def disconnect(self): + """Close database connection pool.""" + if self.pool: + await self.pool.close() + log.info("Database disconnected") + + async def health_check(self): + """Check database connectivity.""" + async with self.pool.acquire() as conn: + await conn.fetchval("SELECT 1") + + # ==================== Meetings ==================== + + async def list_meetings( + self, + limit: int = 50, + offset: int = 0, + status: Optional[str] = None + ) -> List[Dict[str, Any]]: + """List meetings with pagination.""" + async with self.pool.acquire() as conn: + if status: + rows = await conn.fetch(""" + SELECT id, conference_id, conference_name, title, + started_at, ended_at, duration_seconds, + status, created_at + FROM meetings + WHERE status = $1 + ORDER BY created_at DESC + LIMIT $2 OFFSET $3 + """, status, limit, offset) + else: + rows = await conn.fetch(""" + SELECT id, conference_id, conference_name, title, + started_at, ended_at, duration_seconds, + status, created_at + FROM meetings + ORDER BY created_at DESC + LIMIT $1 OFFSET $2 + """, limit, offset) + + return [dict(row) for row in rows] + + async def get_meeting(self, meeting_id: str) -> Optional[Dict[str, Any]]: + """Get meeting details.""" + async with self.pool.acquire() as conn: + row = await conn.fetchrow(""" + SELECT m.id, m.conference_id, m.conference_name, m.title, + m.started_at, m.ended_at, m.duration_seconds, + m.recording_path, m.audio_path, m.status, + m.metadata, m.created_at, + (SELECT COUNT(*) FROM transcripts WHERE meeting_id = m.id) as segment_count, + (SELECT COUNT(*) FROM meeting_participants WHERE meeting_id = m.id) as participant_count, + (SELECT id FROM summaries WHERE meeting_id = m.id LIMIT 1) as summary_id + FROM meetings m + WHERE m.id = $1::uuid + """, meeting_id) + + if row: + return dict(row) + return None + + async def create_meeting( + self, + conference_id: str, + conference_name: Optional[str] = None, + title: Optional[str] = None, + recording_path: Optional[str] = None, + started_at: Optional[datetime] = None, + metadata: Optional[dict] = None + ) -> str: + """Create a new meeting record.""" + meeting_id = str(uuid.uuid4()) + + async with self.pool.acquire() as conn: + await conn.execute(""" + INSERT INTO meetings ( + id, conference_id, conference_name, title, + recording_path, started_at, status, metadata + ) + VALUES ($1, $2, $3, $4, $5, $6, 'recording', $7) + """, meeting_id, conference_id, conference_name, title, + recording_path, started_at or datetime.utcnow(), metadata or {}) + + return meeting_id + + async def update_meeting( + self, + meeting_id: str, + **kwargs + ): + """Update meeting fields.""" + if not kwargs: + return + + set_clauses = [] + values = [] + i = 1 + + for key, value in kwargs.items(): + if key in ['status', 'title', 'ended_at', 'duration_seconds', + 'recording_path', 'audio_path', 'error_message']: + set_clauses.append(f"{key} = ${i}") + 
values.append(value) + i += 1 + + if not set_clauses: + return + + values.append(meeting_id) + + async with self.pool.acquire() as conn: + await conn.execute(f""" + UPDATE meetings + SET {', '.join(set_clauses)}, updated_at = NOW() + WHERE id = ${i}::uuid + """, *values) + + # ==================== Transcripts ==================== + + async def get_transcript( + self, + meeting_id: str, + speaker_filter: Optional[str] = None + ) -> List[Dict[str, Any]]: + """Get transcript segments for a meeting.""" + async with self.pool.acquire() as conn: + if speaker_filter: + rows = await conn.fetch(""" + SELECT id, segment_index, start_time, end_time, + speaker_id, speaker_name, speaker_label, + text, confidence, language + FROM transcripts + WHERE meeting_id = $1::uuid AND speaker_id = $2 + ORDER BY segment_index ASC + """, meeting_id, speaker_filter) + else: + rows = await conn.fetch(""" + SELECT id, segment_index, start_time, end_time, + speaker_id, speaker_name, speaker_label, + text, confidence, language + FROM transcripts + WHERE meeting_id = $1::uuid + ORDER BY segment_index ASC + """, meeting_id) + + return [dict(row) for row in rows] + + async def get_speakers(self, meeting_id: str) -> List[Dict[str, Any]]: + """Get speaker statistics for a meeting.""" + async with self.pool.acquire() as conn: + rows = await conn.fetch(""" + SELECT speaker_id, speaker_label, + COUNT(*) as segment_count, + SUM(end_time - start_time) as speaking_time, + SUM(LENGTH(text)) as character_count + FROM transcripts + WHERE meeting_id = $1::uuid AND speaker_id IS NOT NULL + GROUP BY speaker_id, speaker_label + ORDER BY speaking_time DESC + """, meeting_id) + + return [dict(row) for row in rows] + + # ==================== Summaries ==================== + + async def get_summary(self, meeting_id: str) -> Optional[Dict[str, Any]]: + """Get AI summary for a meeting.""" + async with self.pool.acquire() as conn: + row = await conn.fetchrow(""" + SELECT id, meeting_id, summary_text, key_points, + action_items, decisions, topics, sentiment, + model_used, generated_at + FROM summaries + WHERE meeting_id = $1::uuid + ORDER BY generated_at DESC + LIMIT 1 + """, meeting_id) + + if row: + return dict(row) + return None + + async def save_summary( + self, + meeting_id: str, + summary_text: str, + key_points: List[str], + action_items: List[dict], + decisions: List[str], + topics: List[dict], + sentiment: str, + model_used: str, + prompt_tokens: int = 0, + completion_tokens: int = 0 + ) -> int: + """Save AI-generated summary.""" + async with self.pool.acquire() as conn: + row = await conn.fetchrow(""" + INSERT INTO summaries ( + meeting_id, summary_text, key_points, action_items, + decisions, topics, sentiment, model_used, + prompt_tokens, completion_tokens + ) + VALUES ($1::uuid, $2, $3, $4, $5, $6, $7, $8, $9, $10) + RETURNING id + """, meeting_id, summary_text, key_points, action_items, + decisions, topics, sentiment, model_used, + prompt_tokens, completion_tokens) + + return row["id"] + + # ==================== Search ==================== + + async def fulltext_search( + self, + query: str, + meeting_id: Optional[str] = None, + limit: int = 50 + ) -> List[Dict[str, Any]]: + """Full-text search across transcripts.""" + async with self.pool.acquire() as conn: + if meeting_id: + rows = await conn.fetch(""" + SELECT t.id, t.meeting_id, t.start_time, t.end_time, + t.speaker_label, t.text, m.title as meeting_title, + ts_rank(to_tsvector('english', t.text), + plainto_tsquery('english', $1)) as rank + FROM transcripts t + JOIN meetings m 
ON t.meeting_id = m.id + WHERE t.meeting_id = $2::uuid + AND to_tsvector('english', t.text) @@ plainto_tsquery('english', $1) + ORDER BY rank DESC + LIMIT $3 + """, query, meeting_id, limit) + else: + rows = await conn.fetch(""" + SELECT t.id, t.meeting_id, t.start_time, t.end_time, + t.speaker_label, t.text, m.title as meeting_title, + ts_rank(to_tsvector('english', t.text), + plainto_tsquery('english', $1)) as rank + FROM transcripts t + JOIN meetings m ON t.meeting_id = m.id + WHERE to_tsvector('english', t.text) @@ plainto_tsquery('english', $1) + ORDER BY rank DESC + LIMIT $2 + """, query, limit) + + return [dict(row) for row in rows] + + async def semantic_search( + self, + embedding: List[float], + meeting_id: Optional[str] = None, + threshold: float = 0.7, + limit: int = 20 + ) -> List[Dict[str, Any]]: + """Semantic search using vector embeddings.""" + async with self.pool.acquire() as conn: + embedding_str = f"[{','.join(map(str, embedding))}]" + + if meeting_id: + rows = await conn.fetch(""" + SELECT te.transcript_id, te.meeting_id, te.chunk_text, + t.start_time, t.speaker_label, m.title as meeting_title, + 1 - (te.embedding <=> $1::vector) as similarity + FROM transcript_embeddings te + JOIN transcripts t ON te.transcript_id = t.id + JOIN meetings m ON te.meeting_id = m.id + WHERE te.meeting_id = $2::uuid + AND 1 - (te.embedding <=> $1::vector) > $3 + ORDER BY te.embedding <=> $1::vector + LIMIT $4 + """, embedding_str, meeting_id, threshold, limit) + else: + rows = await conn.fetch(""" + SELECT te.transcript_id, te.meeting_id, te.chunk_text, + t.start_time, t.speaker_label, m.title as meeting_title, + 1 - (te.embedding <=> $1::vector) as similarity + FROM transcript_embeddings te + JOIN transcripts t ON te.transcript_id = t.id + JOIN meetings m ON te.meeting_id = m.id + WHERE 1 - (te.embedding <=> $1::vector) > $2 + ORDER BY te.embedding <=> $1::vector + LIMIT $3 + """, embedding_str, threshold, limit) + + return [dict(row) for row in rows] + + # ==================== Webhooks ==================== + + async def save_webhook_event( + self, + event_type: str, + payload: dict + ) -> int: + """Save a webhook event for processing.""" + async with self.pool.acquire() as conn: + row = await conn.fetchrow(""" + INSERT INTO webhook_events (event_type, payload) + VALUES ($1, $2) + RETURNING id + """, event_type, payload) + + return row["id"] + + # ==================== Jobs ==================== + + async def create_job( + self, + meeting_id: str, + job_type: str, + priority: int = 5, + result: Optional[dict] = None + ) -> int: + """Create a processing job.""" + async with self.pool.acquire() as conn: + row = await conn.fetchrow(""" + INSERT INTO processing_jobs (meeting_id, job_type, priority, result) + VALUES ($1::uuid, $2, $3, $4) + RETURNING id + """, meeting_id, job_type, priority, result or {}) + + return row["id"] diff --git a/deploy/meeting-intelligence/api/app/main.py b/deploy/meeting-intelligence/api/app/main.py new file mode 100644 index 0000000..8473f63 --- /dev/null +++ b/deploy/meeting-intelligence/api/app/main.py @@ -0,0 +1,113 @@ +""" +Meeting Intelligence API + +Provides REST API for: +- Meeting management +- Transcript retrieval +- AI-powered summaries +- Semantic search +- Export functionality +""" + +import os +from contextlib import asynccontextmanager +from typing import Optional + +from fastapi import FastAPI +from fastapi.middleware.cors import CORSMiddleware + +from .config import settings +from .database import Database +from .routes import meetings, transcripts, 
summaries, search, webhooks, export + +import structlog + +log = structlog.get_logger() + + +# Application state +class AppState: + db: Optional[Database] = None + + +state = AppState() + + +@asynccontextmanager +async def lifespan(app: FastAPI): + """Application startup and shutdown.""" + log.info("Starting Meeting Intelligence API...") + + # Initialize database + state.db = Database(settings.postgres_url) + await state.db.connect() + + # Make database available to routes + app.state.db = state.db + + log.info("Meeting Intelligence API started successfully") + + yield + + # Shutdown + log.info("Shutting down Meeting Intelligence API...") + if state.db: + await state.db.disconnect() + + log.info("Meeting Intelligence API stopped") + + +app = FastAPI( + title="Meeting Intelligence API", + description="API for meeting transcripts, summaries, and search", + version="1.0.0", + lifespan=lifespan, + docs_url="/docs", + redoc_url="/redoc" +) + +# CORS configuration +app.add_middleware( + CORSMiddleware, + allow_origins=settings.cors_origins, + allow_credentials=True, + allow_methods=["*"], + allow_headers=["*"], +) + +# Include routers +app.include_router(meetings.router, prefix="/meetings", tags=["Meetings"]) +app.include_router(transcripts.router, prefix="/meetings", tags=["Transcripts"]) +app.include_router(summaries.router, prefix="/meetings", tags=["Summaries"]) +app.include_router(search.router, prefix="/search", tags=["Search"]) +app.include_router(webhooks.router, prefix="/webhooks", tags=["Webhooks"]) +app.include_router(export.router, prefix="/meetings", tags=["Export"]) + + +@app.get("/health") +async def health_check(): + """Health check endpoint.""" + db_ok = False + + try: + if state.db: + await state.db.health_check() + db_ok = True + except Exception as e: + log.error("Database health check failed", error=str(e)) + + return { + "status": "healthy" if db_ok else "unhealthy", + "database": db_ok, + "version": "1.0.0" + } + + +@app.get("/") +async def root(): + """Root endpoint.""" + return { + "service": "Meeting Intelligence API", + "version": "1.0.0", + "docs": "/docs" + } diff --git a/deploy/meeting-intelligence/api/app/routes/__init__.py b/deploy/meeting-intelligence/api/app/routes/__init__.py new file mode 100644 index 0000000..83ed419 --- /dev/null +++ b/deploy/meeting-intelligence/api/app/routes/__init__.py @@ -0,0 +1,2 @@ +# API Routes +from . import meetings, transcripts, summaries, search, webhooks, export diff --git a/deploy/meeting-intelligence/api/app/routes/export.py b/deploy/meeting-intelligence/api/app/routes/export.py new file mode 100644 index 0000000..f88863e --- /dev/null +++ b/deploy/meeting-intelligence/api/app/routes/export.py @@ -0,0 +1,319 @@ +""" +Export routes for Meeting Intelligence. + +Supports exporting meetings as PDF, Markdown, and JSON. 
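+Markdown and JSON are always available; PDF export needs reportlab and
+returns HTTP 501 if the library is not installed.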
+""" + +import io +import json +import os +from datetime import datetime +from typing import Optional + +from fastapi import APIRouter, HTTPException, Request, Response +from fastapi.responses import StreamingResponse +from pydantic import BaseModel + +import structlog + +log = structlog.get_logger() + +router = APIRouter() + + +class ExportRequest(BaseModel): + format: str = "markdown" # "pdf", "markdown", "json" + include_transcript: bool = True + include_summary: bool = True + + +@router.get("/{meeting_id}/export") +async def export_meeting( + request: Request, + meeting_id: str, + format: str = "markdown", + include_transcript: bool = True, + include_summary: bool = True +): + """Export meeting data in various formats.""" + db = request.app.state.db + + # Get meeting data + meeting = await db.get_meeting(meeting_id) + if not meeting: + raise HTTPException(status_code=404, detail="Meeting not found") + + # Get transcript if requested + transcript = None + if include_transcript: + transcript = await db.get_transcript(meeting_id) + + # Get summary if requested + summary = None + if include_summary: + summary = await db.get_summary(meeting_id) + + # Export based on format + if format == "json": + return _export_json(meeting, transcript, summary) + elif format == "markdown": + return _export_markdown(meeting, transcript, summary) + elif format == "pdf": + return await _export_pdf(meeting, transcript, summary) + else: + raise HTTPException( + status_code=400, + detail=f"Unsupported format: {format}. Use: json, markdown, pdf" + ) + + +def _export_json(meeting: dict, transcript: list, summary: dict) -> Response: + """Export as JSON.""" + data = { + "meeting": { + "id": str(meeting["id"]), + "conference_id": meeting["conference_id"], + "title": meeting.get("title"), + "started_at": meeting["started_at"].isoformat() if meeting.get("started_at") else None, + "ended_at": meeting["ended_at"].isoformat() if meeting.get("ended_at") else None, + "duration_seconds": meeting.get("duration_seconds"), + "status": meeting["status"] + }, + "transcript": [ + { + "start_time": s["start_time"], + "end_time": s["end_time"], + "speaker": s.get("speaker_label"), + "text": s["text"] + } + for s in (transcript or []) + ] if transcript else None, + "summary": { + "text": summary["summary_text"], + "key_points": summary["key_points"], + "action_items": summary["action_items"], + "decisions": summary["decisions"], + "topics": summary["topics"], + "sentiment": summary.get("sentiment") + } if summary else None, + "exported_at": datetime.utcnow().isoformat() + } + + filename = f"meeting-{meeting['conference_id']}-{datetime.utcnow().strftime('%Y%m%d')}.json" + + return Response( + content=json.dumps(data, indent=2), + media_type="application/json", + headers={ + "Content-Disposition": f'attachment; filename="{filename}"' + } + ) + + +def _export_markdown(meeting: dict, transcript: list, summary: dict) -> Response: + """Export as Markdown.""" + lines = [] + + # Header + title = meeting.get("title") or f"Meeting: {meeting['conference_id']}" + lines.append(f"# {title}") + lines.append("") + + # Metadata + lines.append("## Meeting Details") + lines.append("") + lines.append(f"- **Conference ID:** {meeting['conference_id']}") + if meeting.get("started_at"): + lines.append(f"- **Date:** {meeting['started_at'].strftime('%Y-%m-%d %H:%M UTC')}") + if meeting.get("duration_seconds"): + minutes = meeting["duration_seconds"] // 60 + lines.append(f"- **Duration:** {minutes} minutes") + lines.append(f"- **Status:** 
{meeting['status']}") + lines.append("") + + # Summary + if summary: + lines.append("## Summary") + lines.append("") + lines.append(summary["summary_text"]) + lines.append("") + + # Key Points + if summary.get("key_points"): + lines.append("### Key Points") + lines.append("") + for point in summary["key_points"]: + lines.append(f"- {point}") + lines.append("") + + # Action Items + if summary.get("action_items"): + lines.append("### Action Items") + lines.append("") + for item in summary["action_items"]: + task = item.get("task", item) if isinstance(item, dict) else item + assignee = item.get("assignee", "") if isinstance(item, dict) else "" + checkbox = "[ ]" + if assignee: + lines.append(f"- {checkbox} {task} *(Assigned: {assignee})*") + else: + lines.append(f"- {checkbox} {task}") + lines.append("") + + # Decisions + if summary.get("decisions"): + lines.append("### Decisions") + lines.append("") + for decision in summary["decisions"]: + lines.append(f"- {decision}") + lines.append("") + + # Transcript + if transcript: + lines.append("## Transcript") + lines.append("") + + current_speaker = None + for segment in transcript: + speaker = segment.get("speaker_label") or "Speaker" + time_str = _format_time(segment["start_time"]) + + if speaker != current_speaker: + lines.append("") + lines.append(f"**{speaker}** *({time_str})*") + current_speaker = speaker + + lines.append(f"> {segment['text']}") + + lines.append("") + + # Footer + lines.append("---") + lines.append(f"*Exported on {datetime.utcnow().strftime('%Y-%m-%d %H:%M UTC')} by Meeting Intelligence*") + + content = "\n".join(lines) + filename = f"meeting-{meeting['conference_id']}-{datetime.utcnow().strftime('%Y%m%d')}.md" + + return Response( + content=content, + media_type="text/markdown", + headers={ + "Content-Disposition": f'attachment; filename="{filename}"' + } + ) + + +async def _export_pdf(meeting: dict, transcript: list, summary: dict) -> StreamingResponse: + """Export as PDF using reportlab.""" + try: + from reportlab.lib.pagesizes import letter + from reportlab.lib.styles import getSampleStyleSheet, ParagraphStyle + from reportlab.lib.units import inch + from reportlab.platypus import SimpleDocTemplate, Paragraph, Spacer, ListFlowable, ListItem + except ImportError: + raise HTTPException( + status_code=501, + detail="PDF export requires reportlab. Use markdown or json format." 
+ ) + + buffer = io.BytesIO() + + # Create PDF document + doc = SimpleDocTemplate( + buffer, + pagesize=letter, + rightMargin=72, + leftMargin=72, + topMargin=72, + bottomMargin=72 + ) + + styles = getSampleStyleSheet() + story = [] + + # Title + title = meeting.get("title") or f"Meeting: {meeting['conference_id']}" + story.append(Paragraph(title, styles['Title'])) + story.append(Spacer(1, 12)) + + # Metadata + story.append(Paragraph("Meeting Details", styles['Heading2'])) + if meeting.get("started_at"): + story.append(Paragraph( + f"Date: {meeting['started_at'].strftime('%Y-%m-%d %H:%M UTC')}", + styles['Normal'] + )) + if meeting.get("duration_seconds"): + minutes = meeting["duration_seconds"] // 60 + story.append(Paragraph(f"Duration: {minutes} minutes", styles['Normal'])) + story.append(Spacer(1, 12)) + + # Summary + if summary: + story.append(Paragraph("Summary", styles['Heading2'])) + story.append(Paragraph(summary["summary_text"], styles['Normal'])) + story.append(Spacer(1, 12)) + + if summary.get("key_points"): + story.append(Paragraph("Key Points", styles['Heading3'])) + for point in summary["key_points"]: + story.append(Paragraph(f"• {point}", styles['Normal'])) + story.append(Spacer(1, 12)) + + if summary.get("action_items"): + story.append(Paragraph("Action Items", styles['Heading3'])) + for item in summary["action_items"]: + task = item.get("task", item) if isinstance(item, dict) else item + story.append(Paragraph(f"☐ {task}", styles['Normal'])) + story.append(Spacer(1, 12)) + + # Transcript (abbreviated for PDF) + if transcript: + story.append(Paragraph("Transcript", styles['Heading2'])) + current_speaker = None + + for segment in transcript[:100]: # Limit segments for PDF + speaker = segment.get("speaker_label") or "Speaker" + + if speaker != current_speaker: + story.append(Spacer(1, 6)) + story.append(Paragraph( + f"{speaker} ({_format_time(segment['start_time'])})", + styles['Normal'] + )) + current_speaker = speaker + + story.append(Paragraph(segment['text'], styles['Normal'])) + + if len(transcript) > 100: + story.append(Spacer(1, 12)) + story.append(Paragraph( + f"[... {len(transcript) - 100} more segments not shown in PDF]", + styles['Normal'] + )) + + # Build PDF + doc.build(story) + buffer.seek(0) + + filename = f"meeting-{meeting['conference_id']}-{datetime.utcnow().strftime('%Y%m%d')}.pdf" + + return StreamingResponse( + buffer, + media_type="application/pdf", + headers={ + "Content-Disposition": f'attachment; filename="{filename}"' + } + ) + + +def _format_time(seconds: float) -> str: + """Format seconds as HH:MM:SS or MM:SS.""" + total_seconds = int(seconds) + hours = total_seconds // 3600 + minutes = (total_seconds % 3600) // 60 + secs = total_seconds % 60 + + if hours > 0: + return f"{hours}:{minutes:02d}:{secs:02d}" + return f"{minutes}:{secs:02d}" diff --git a/deploy/meeting-intelligence/api/app/routes/meetings.py b/deploy/meeting-intelligence/api/app/routes/meetings.py new file mode 100644 index 0000000..ba0327b --- /dev/null +++ b/deploy/meeting-intelligence/api/app/routes/meetings.py @@ -0,0 +1,112 @@ +""" +Meeting management routes. 
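+Exposes list, detail, and delete endpoints; delete currently marks the
+meeting as deleted rather than cascading to related tables.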
+""" + +from typing import Optional, List + +from fastapi import APIRouter, HTTPException, Request, Query +from pydantic import BaseModel + +import structlog + +log = structlog.get_logger() + +router = APIRouter() + + +class MeetingResponse(BaseModel): + id: str + conference_id: str + conference_name: Optional[str] + title: Optional[str] + started_at: Optional[str] + ended_at: Optional[str] + duration_seconds: Optional[int] + status: str + created_at: str + segment_count: Optional[int] = None + participant_count: Optional[int] = None + has_summary: Optional[bool] = None + + +class MeetingListResponse(BaseModel): + meetings: List[MeetingResponse] + total: int + limit: int + offset: int + + +@router.get("", response_model=MeetingListResponse) +async def list_meetings( + request: Request, + limit: int = Query(default=50, le=100), + offset: int = Query(default=0, ge=0), + status: Optional[str] = Query(default=None) +): + """List all meetings with pagination.""" + db = request.app.state.db + + meetings = await db.list_meetings(limit=limit, offset=offset, status=status) + + return MeetingListResponse( + meetings=[ + MeetingResponse( + id=str(m["id"]), + conference_id=m["conference_id"], + conference_name=m.get("conference_name"), + title=m.get("title"), + started_at=m["started_at"].isoformat() if m.get("started_at") else None, + ended_at=m["ended_at"].isoformat() if m.get("ended_at") else None, + duration_seconds=m.get("duration_seconds"), + status=m["status"], + created_at=m["created_at"].isoformat() + ) + for m in meetings + ], + total=len(meetings), # TODO: Add total count query + limit=limit, + offset=offset + ) + + +@router.get("/{meeting_id}", response_model=MeetingResponse) +async def get_meeting(request: Request, meeting_id: str): + """Get meeting details.""" + db = request.app.state.db + + meeting = await db.get_meeting(meeting_id) + + if not meeting: + raise HTTPException(status_code=404, detail="Meeting not found") + + return MeetingResponse( + id=str(meeting["id"]), + conference_id=meeting["conference_id"], + conference_name=meeting.get("conference_name"), + title=meeting.get("title"), + started_at=meeting["started_at"].isoformat() if meeting.get("started_at") else None, + ended_at=meeting["ended_at"].isoformat() if meeting.get("ended_at") else None, + duration_seconds=meeting.get("duration_seconds"), + status=meeting["status"], + created_at=meeting["created_at"].isoformat(), + segment_count=meeting.get("segment_count"), + participant_count=meeting.get("participant_count"), + has_summary=meeting.get("summary_id") is not None + ) + + +@router.delete("/{meeting_id}") +async def delete_meeting(request: Request, meeting_id: str): + """Delete a meeting and all associated data.""" + db = request.app.state.db + + meeting = await db.get_meeting(meeting_id) + + if not meeting: + raise HTTPException(status_code=404, detail="Meeting not found") + + # TODO: Implement cascade delete + # For now, just mark as deleted + await db.update_meeting(meeting_id, status="deleted") + + return {"status": "deleted", "meeting_id": meeting_id} diff --git a/deploy/meeting-intelligence/api/app/routes/search.py b/deploy/meeting-intelligence/api/app/routes/search.py new file mode 100644 index 0000000..6f209de --- /dev/null +++ b/deploy/meeting-intelligence/api/app/routes/search.py @@ -0,0 +1,173 @@ +""" +Search routes for Meeting Intelligence. 
+""" + +from typing import Optional, List + +from fastapi import APIRouter, HTTPException, Request, Query +from pydantic import BaseModel +from sentence_transformers import SentenceTransformer + +from ..config import settings + +import structlog + +log = structlog.get_logger() + +router = APIRouter() + +# Lazy-load embedding model +_embedding_model = None + + +def get_embedding_model(): + """Get or initialize the embedding model.""" + global _embedding_model + if _embedding_model is None: + log.info("Loading embedding model...", model=settings.embedding_model) + _embedding_model = SentenceTransformer(settings.embedding_model) + log.info("Embedding model loaded") + return _embedding_model + + +class SearchResult(BaseModel): + meeting_id: str + meeting_title: Optional[str] + text: str + start_time: Optional[float] + speaker_label: Optional[str] + score: float + search_type: str + + +class SearchResponse(BaseModel): + query: str + results: List[SearchResult] + total: int + search_type: str + + +class SearchRequest(BaseModel): + query: str + meeting_id: Optional[str] = None + search_type: str = "combined" # "text", "semantic", "combined" + limit: int = 20 + + +@router.post("", response_model=SearchResponse) +async def search_transcripts(request: Request, body: SearchRequest): + """Search across meeting transcripts. + + Search types: + - text: Full-text search using PostgreSQL ts_vector + - semantic: Semantic search using vector embeddings + - combined: Both text and semantic search, merged results + """ + db = request.app.state.db + + if not body.query or len(body.query.strip()) < 2: + raise HTTPException( + status_code=400, + detail="Query must be at least 2 characters" + ) + + results = [] + + # Full-text search + if body.search_type in ["text", "combined"]: + text_results = await db.fulltext_search( + query=body.query, + meeting_id=body.meeting_id, + limit=body.limit + ) + + for r in text_results: + results.append(SearchResult( + meeting_id=str(r["meeting_id"]), + meeting_title=r.get("meeting_title"), + text=r["text"], + start_time=r.get("start_time"), + speaker_label=r.get("speaker_label"), + score=float(r["rank"]), + search_type="text" + )) + + # Semantic search + if body.search_type in ["semantic", "combined"]: + try: + model = get_embedding_model() + query_embedding = model.encode(body.query).tolist() + + semantic_results = await db.semantic_search( + embedding=query_embedding, + meeting_id=body.meeting_id, + threshold=0.6, + limit=body.limit + ) + + for r in semantic_results: + results.append(SearchResult( + meeting_id=str(r["meeting_id"]), + meeting_title=r.get("meeting_title"), + text=r["chunk_text"], + start_time=r.get("start_time"), + speaker_label=r.get("speaker_label"), + score=float(r["similarity"]), + search_type="semantic" + )) + + except Exception as e: + log.error("Semantic search failed", error=str(e)) + if body.search_type == "semantic": + raise HTTPException( + status_code=500, + detail=f"Semantic search failed: {str(e)}" + ) + + # Deduplicate and sort by score + seen = set() + unique_results = [] + for r in sorted(results, key=lambda x: x.score, reverse=True): + key = (r.meeting_id, r.text[:100]) + if key not in seen: + seen.add(key) + unique_results.append(r) + + return SearchResponse( + query=body.query, + results=unique_results[:body.limit], + total=len(unique_results), + search_type=body.search_type + ) + + +@router.get("/suggest") +async def search_suggestions( + request: Request, + q: str = Query(..., min_length=2) +): + """Get search suggestions based on partial 
query.""" + db = request.app.state.db + + # Simple prefix search on common terms + results = await db.fulltext_search(query=q, limit=5) + + # Extract unique phrases + suggestions = [] + for r in results: + # Get surrounding context + text = r["text"] + words = text.split() + + # Find matching words and get context + for i, word in enumerate(words): + if q.lower() in word.lower(): + start = max(0, i - 2) + end = min(len(words), i + 3) + phrase = " ".join(words[start:end]) + if phrase not in suggestions: + suggestions.append(phrase) + if len(suggestions) >= 5: + break + + return {"suggestions": suggestions} diff --git a/deploy/meeting-intelligence/api/app/routes/summaries.py b/deploy/meeting-intelligence/api/app/routes/summaries.py new file mode 100644 index 0000000..a464a1f --- /dev/null +++ b/deploy/meeting-intelligence/api/app/routes/summaries.py @@ -0,0 +1,251 @@ +""" +AI Summary routes. +""" + +import json +from typing import Optional, List + +import httpx +from fastapi import APIRouter, HTTPException, Request, BackgroundTasks +from pydantic import BaseModel + +from ..config import settings + +import structlog + +log = structlog.get_logger() + +router = APIRouter() + + +class ActionItem(BaseModel): + task: str + assignee: Optional[str] = None + due_date: Optional[str] = None + completed: bool = False + + +class Topic(BaseModel): + topic: str + duration_seconds: Optional[float] = None + relevance_score: Optional[float] = None + + +class SummaryResponse(BaseModel): + meeting_id: str + summary_text: str + key_points: List[str] + action_items: List[ActionItem] + decisions: List[str] + topics: List[Topic] + sentiment: Optional[str] + model_used: str + generated_at: str + + +class GenerateSummaryRequest(BaseModel): + force_regenerate: bool = False + + +# Summarization prompt template +SUMMARY_PROMPT = """You are analyzing a meeting transcript. Your task is to extract key information and provide a structured summary. + +## Meeting Transcript: +{transcript} + +## Instructions: +Analyze the transcript and extract the following information. Be concise and accurate. + +Respond ONLY with a valid JSON object in this exact format (no markdown, no extra text): +{{ + "summary": "A 2-3 sentence overview of what was discussed in the meeting", + "key_points": ["Point 1", "Point 2", "Point 3"], + "action_items": [ + {{"task": "Description of task", "assignee": "Person name or null", "due_date": "Date or null"}} + ], + "decisions": ["Decision 1", "Decision 2"], + "topics": [ + {{"topic": "Topic name", "relevance_score": 0.9}} + ], + "sentiment": "positive" or "neutral" or "negative" or "mixed" +}} + +Remember: +- key_points: 3-5 most important points discussed +- action_items: Tasks that need to be done, with assignees if mentioned +- decisions: Any decisions or conclusions reached +- topics: Main themes discussed with relevance scores (0-1) +- sentiment: Overall tone of the meeting +""" + + +@router.get("/{meeting_id}/summary", response_model=SummaryResponse) +async def get_summary(request: Request, meeting_id: str): + """Get AI-generated summary for a meeting.""" + db = request.app.state.db + + # Verify meeting exists + meeting = await db.get_meeting(meeting_id) + if not meeting: + raise HTTPException(status_code=404, detail="Meeting not found") + + summary = await db.get_summary(meeting_id) + + if not summary: + raise HTTPException( + status_code=404, + detail="No summary available. Use POST to generate one." 
+ ) + + return SummaryResponse( + meeting_id=meeting_id, + summary_text=summary["summary_text"], + key_points=summary["key_points"] or [], + action_items=[ + ActionItem(**item) for item in (summary["action_items"] or []) + ], + decisions=summary["decisions"] or [], + topics=[ + Topic(**topic) for topic in (summary["topics"] or []) + ], + sentiment=summary.get("sentiment"), + model_used=summary["model_used"], + generated_at=summary["generated_at"].isoformat() + ) + + +@router.post("/{meeting_id}/summary", response_model=SummaryResponse) +async def generate_summary( + request: Request, + meeting_id: str, + body: GenerateSummaryRequest, + background_tasks: BackgroundTasks +): + """Generate AI summary for a meeting.""" + db = request.app.state.db + + # Verify meeting exists + meeting = await db.get_meeting(meeting_id) + if not meeting: + raise HTTPException(status_code=404, detail="Meeting not found") + + # Check if summary already exists + if not body.force_regenerate: + existing = await db.get_summary(meeting_id) + if existing: + raise HTTPException( + status_code=409, + detail="Summary already exists. Set force_regenerate=true to regenerate." + ) + + # Get transcript + segments = await db.get_transcript(meeting_id) + if not segments: + raise HTTPException( + status_code=400, + detail="No transcript available for summarization" + ) + + # Format transcript for LLM + transcript_text = _format_transcript(segments) + + # Generate summary using Ollama + summary_data = await _generate_summary_with_ollama(transcript_text) + + # Save summary + await db.save_summary( + meeting_id=meeting_id, + summary_text=summary_data["summary"], + key_points=summary_data["key_points"], + action_items=summary_data["action_items"], + decisions=summary_data["decisions"], + topics=summary_data["topics"], + sentiment=summary_data["sentiment"], + model_used=settings.ollama_model + ) + + # Update meeting status + await db.update_meeting(meeting_id, status="ready") + + # Get the saved summary + summary = await db.get_summary(meeting_id) + + return SummaryResponse( + meeting_id=meeting_id, + summary_text=summary["summary_text"], + key_points=summary["key_points"] or [], + action_items=[ + ActionItem(**item) for item in (summary["action_items"] or []) + ], + decisions=summary["decisions"] or [], + topics=[ + Topic(**topic) for topic in (summary["topics"] or []) + ], + sentiment=summary.get("sentiment"), + model_used=summary["model_used"], + generated_at=summary["generated_at"].isoformat() + ) + + +def _format_transcript(segments: list) -> str: + """Format transcript segments for LLM processing.""" + lines = [] + current_speaker = None + + for s in segments: + speaker = s.get("speaker_label") or "Speaker" + + if speaker != current_speaker: + lines.append(f"\n[{speaker}]") + current_speaker = speaker + + lines.append(s["text"]) + + return "\n".join(lines) + + +async def _generate_summary_with_ollama(transcript: str) -> dict: + """Generate summary using Ollama.""" + prompt = SUMMARY_PROMPT.format(transcript=transcript[:15000]) # Limit context + + async with httpx.AsyncClient(timeout=120.0) as client: + try: + response = await client.post( + f"{settings.ollama_url}/api/generate", + json={ + "model": settings.ollama_model, + "prompt": prompt, + "stream": False, + "format": "json" + } + ) + response.raise_for_status() + + result = response.json() + response_text = result.get("response", "") + + # Parse JSON from response + summary_data = json.loads(response_text) + + # Validate required fields + return { + "summary": 
summary_data.get("summary", "No summary generated"), + "key_points": summary_data.get("key_points", []), + "action_items": summary_data.get("action_items", []), + "decisions": summary_data.get("decisions", []), + "topics": summary_data.get("topics", []), + "sentiment": summary_data.get("sentiment", "neutral") + } + + except httpx.HTTPError as e: + log.error("Ollama request failed", error=str(e)) + raise HTTPException( + status_code=503, + detail=f"AI service unavailable: {str(e)}" + ) + except json.JSONDecodeError as e: + log.error("Failed to parse Ollama response", error=str(e)) + raise HTTPException( + status_code=500, + detail="Failed to parse AI response" + ) diff --git a/deploy/meeting-intelligence/api/app/routes/transcripts.py b/deploy/meeting-intelligence/api/app/routes/transcripts.py new file mode 100644 index 0000000..a47bf89 --- /dev/null +++ b/deploy/meeting-intelligence/api/app/routes/transcripts.py @@ -0,0 +1,161 @@ +""" +Transcript routes. +""" + +from typing import Optional, List + +from fastapi import APIRouter, HTTPException, Request, Query +from pydantic import BaseModel + +import structlog + +log = structlog.get_logger() + +router = APIRouter() + + +class TranscriptSegment(BaseModel): + id: int + segment_index: int + start_time: float + end_time: float + speaker_id: Optional[str] + speaker_name: Optional[str] + speaker_label: Optional[str] + text: str + confidence: Optional[float] + language: Optional[str] + + +class TranscriptResponse(BaseModel): + meeting_id: str + segments: List[TranscriptSegment] + total_segments: int + duration: Optional[float] + + +class SpeakerStats(BaseModel): + speaker_id: str + speaker_label: Optional[str] + segment_count: int + speaking_time: float + character_count: int + + +class SpeakersResponse(BaseModel): + meeting_id: str + speakers: List[SpeakerStats] + + +@router.get("/{meeting_id}/transcript", response_model=TranscriptResponse) +async def get_transcript( + request: Request, + meeting_id: str, + speaker: Optional[str] = Query(default=None, description="Filter by speaker ID") +): + """Get full transcript for a meeting.""" + db = request.app.state.db + + # Verify meeting exists + meeting = await db.get_meeting(meeting_id) + if not meeting: + raise HTTPException(status_code=404, detail="Meeting not found") + + segments = await db.get_transcript(meeting_id, speaker_filter=speaker) + + if not segments: + raise HTTPException( + status_code=404, + detail="No transcript available for this meeting" + ) + + # Calculate duration from last segment + duration = segments[-1]["end_time"] if segments else None + + return TranscriptResponse( + meeting_id=meeting_id, + segments=[ + TranscriptSegment( + id=s["id"], + segment_index=s["segment_index"], + start_time=s["start_time"], + end_time=s["end_time"], + speaker_id=s.get("speaker_id"), + speaker_name=s.get("speaker_name"), + speaker_label=s.get("speaker_label"), + text=s["text"], + confidence=s.get("confidence"), + language=s.get("language") + ) + for s in segments + ], + total_segments=len(segments), + duration=duration + ) + + +@router.get("/{meeting_id}/speakers", response_model=SpeakersResponse) +async def get_speakers(request: Request, meeting_id: str): + """Get speaker statistics for a meeting.""" + db = request.app.state.db + + # Verify meeting exists + meeting = await db.get_meeting(meeting_id) + if not meeting: + raise HTTPException(status_code=404, detail="Meeting not found") + + speakers = await db.get_speakers(meeting_id) + + return SpeakersResponse( + meeting_id=meeting_id, + speakers=[ + 
SpeakerStats( + speaker_id=s["speaker_id"], + speaker_label=s.get("speaker_label"), + segment_count=s["segment_count"], + speaking_time=float(s["speaking_time"] or 0), + character_count=s["character_count"] or 0 + ) + for s in speakers + ] + ) + + +@router.get("/{meeting_id}/transcript/text") +async def get_transcript_text(request: Request, meeting_id: str): + """Get transcript as plain text.""" + db = request.app.state.db + + # Verify meeting exists + meeting = await db.get_meeting(meeting_id) + if not meeting: + raise HTTPException(status_code=404, detail="Meeting not found") + + segments = await db.get_transcript(meeting_id) + + if not segments: + raise HTTPException( + status_code=404, + detail="No transcript available for this meeting" + ) + + # Format as plain text + lines = [] + current_speaker = None + + for s in segments: + speaker = s.get("speaker_label") or "Unknown" + + if speaker != current_speaker: + lines.append(f"\n{speaker}:") + current_speaker = speaker + + lines.append(f" {s['text']}") + + text = "\n".join(lines) + + return { + "meeting_id": meeting_id, + "text": text, + "format": "plain" + } diff --git a/deploy/meeting-intelligence/api/app/routes/webhooks.py b/deploy/meeting-intelligence/api/app/routes/webhooks.py new file mode 100644 index 0000000..4d8cedb --- /dev/null +++ b/deploy/meeting-intelligence/api/app/routes/webhooks.py @@ -0,0 +1,139 @@ +""" +Webhook routes for Jibri recording callbacks. +""" + +from datetime import datetime +from typing import Optional + +import httpx +from fastapi import APIRouter, HTTPException, Request, BackgroundTasks +from pydantic import BaseModel + +from ..config import settings + +import structlog + +log = structlog.get_logger() + +router = APIRouter() + + +class RecordingCompletePayload(BaseModel): + event_type: str + conference_id: str + recording_path: str + recording_dir: Optional[str] = None + file_size_bytes: Optional[int] = None + completed_at: Optional[str] = None + metadata: Optional[dict] = None + + +class WebhookResponse(BaseModel): + status: str + meeting_id: str + message: str + + +@router.post("/recording-complete", response_model=WebhookResponse) +async def recording_complete( + request: Request, + payload: RecordingCompletePayload, + background_tasks: BackgroundTasks +): + """ + Webhook called by Jibri when a recording completes. + + This triggers the processing pipeline: + 1. Create meeting record + 2. Queue transcription job + 3. 
(Later) Generate summary + """ + db = request.app.state.db + + log.info( + "Recording complete webhook received", + conference_id=payload.conference_id, + recording_path=payload.recording_path + ) + + # Save webhook event for audit + await db.save_webhook_event( + event_type=payload.event_type, + payload=payload.model_dump() + ) + + # Create meeting record + meeting_id = await db.create_meeting( + conference_id=payload.conference_id, + conference_name=payload.conference_id, # Use conference_id as name for now + title=f"Meeting - {payload.conference_id}", + recording_path=payload.recording_path, + started_at=datetime.utcnow(), # Will be updated from recording metadata + metadata=payload.metadata or {} + ) + + log.info("Meeting record created", meeting_id=meeting_id) + + # Update meeting status + await db.update_meeting(meeting_id, status="extracting_audio") + + # Queue transcription job + job_id = await db.create_job( + meeting_id=meeting_id, + job_type="transcribe", + priority=5, + result={ + "video_path": payload.recording_path, + "enable_diarization": True + } + ) + + log.info("Transcription job queued", job_id=job_id, meeting_id=meeting_id) + + # Trigger transcription service asynchronously + background_tasks.add_task( + _notify_transcriber, + meeting_id, + payload.recording_path + ) + + return WebhookResponse( + status="accepted", + meeting_id=meeting_id, + message="Recording queued for processing" + ) + + +async def _notify_transcriber(meeting_id: str, recording_path: str): + """Notify the transcription service to start processing.""" + try: + async with httpx.AsyncClient(timeout=30.0) as client: + response = await client.post( + f"{settings.transcriber_url}/transcribe", + json={ + "meeting_id": meeting_id, + "video_path": recording_path, + "enable_diarization": True + } + ) + response.raise_for_status() + log.info( + "Transcriber notified", + meeting_id=meeting_id, + response=response.json() + ) + except Exception as e: + log.error( + "Failed to notify transcriber", + meeting_id=meeting_id, + error=str(e) + ) + # Job is in database, transcriber will pick it up on next poll + + +@router.post("/test") +async def test_webhook(request: Request): + """Test endpoint for webhook connectivity.""" + body = await request.json() + log.info("Test webhook received", body=body) + return {"status": "ok", "received": body} diff --git a/deploy/meeting-intelligence/api/requirements.txt b/deploy/meeting-intelligence/api/requirements.txt new file mode 100644 index 0000000..d840225 --- /dev/null +++ b/deploy/meeting-intelligence/api/requirements.txt @@ -0,0 +1,37 @@ +# Meeting Intelligence API Dependencies + +# Web framework +fastapi==0.109.2 +uvicorn[standard]==0.27.1 +python-multipart==0.0.9 + +# Database +asyncpg==0.29.0 +sqlalchemy[asyncio]==2.0.25 +psycopg2-binary==2.9.9 + +# Redis +redis==5.0.1 + +# HTTP client (for Ollama) +httpx==0.26.0 +aiohttp==3.9.3 + +# Validation +pydantic==2.6.1 +pydantic-settings==2.1.0 + +# Sentence embeddings (for semantic search) +sentence-transformers==2.3.1 +numpy==1.26.4 + +# PDF export +reportlab==4.0.8 +markdown2==2.4.12 + +# Utilities +python-dotenv==1.0.1 +tenacity==8.2.3 + +# Logging +structlog==24.1.0 diff --git a/deploy/meeting-intelligence/docker-compose.yml b/deploy/meeting-intelligence/docker-compose.yml new file mode 100644 index 0000000..cf98bec --- /dev/null +++ b/deploy/meeting-intelligence/docker-compose.yml @@ -0,0 +1,186 @@ +# Meeting Intelligence System - Full Docker Compose +# Deploy on Netcup RS 8000 at /opt/meeting-intelligence/ +# +# Components: 
+# - Jibri (recording)
+# - Transcriber (whisper.cpp + diarization)
+# - Meeting Intelligence API
+# - PostgreSQL (storage)
+# - Redis (job queue)
+
+services:
+  # ============================================================
+  # PostgreSQL Database
+  # ============================================================
+  postgres:
+    image: pgvector/pgvector:pg16
+    container_name: meeting-intelligence-db
+    restart: unless-stopped
+    environment:
+      POSTGRES_USER: meeting_intelligence
+      POSTGRES_PASSWORD: ${POSTGRES_PASSWORD:-changeme}
+      POSTGRES_DB: meeting_intelligence
+    volumes:
+      - postgres_data:/var/lib/postgresql/data
+      - ./postgres/init.sql:/docker-entrypoint-initdb.d/init.sql:ro
+    healthcheck:
+      test: ["CMD-SHELL", "pg_isready -U meeting_intelligence"]
+      interval: 10s
+      timeout: 5s
+      retries: 5
+    networks:
+      - meeting-intelligence
+
+  # ============================================================
+  # Redis Job Queue
+  # ============================================================
+  redis:
+    image: redis:7-alpine
+    container_name: meeting-intelligence-redis
+    restart: unless-stopped
+    command: redis-server --appendonly yes
+    volumes:
+      - redis_data:/data
+    healthcheck:
+      test: ["CMD", "redis-cli", "ping"]
+      interval: 10s
+      timeout: 5s
+      retries: 5
+    networks:
+      - meeting-intelligence
+
+  # ============================================================
+  # Transcription Service (whisper.cpp + diarization)
+  # ============================================================
+  transcriber:
+    build:
+      context: ./transcriber
+      dockerfile: Dockerfile
+    container_name: meeting-intelligence-transcriber
+    restart: unless-stopped
+    environment:
+      REDIS_URL: redis://redis:6379
+      POSTGRES_URL: postgresql://meeting_intelligence:${POSTGRES_PASSWORD:-changeme}@postgres:5432/meeting_intelligence
+      # Full path to the ggml model baked into the image (see transcriber/Dockerfile)
+      WHISPER_MODEL: /models/ggml-small.bin
+      WHISPER_THREADS: 8
+      NUM_WORKERS: 4
+    volumes:
+      - recordings:/recordings:ro
+      - audio_processed:/audio
+      - whisper_models:/models
+    depends_on:
+      postgres:
+        condition: service_healthy
+      redis:
+        condition: service_healthy
+    deploy:
+      resources:
+        limits:
+          cpus: '12'
+          memory: 16G
+    networks:
+      - meeting-intelligence
+
+  # ============================================================
+  # Meeting Intelligence API
+  # ============================================================
+  api:
+    build:
+      context: ./api
+      dockerfile: Dockerfile
+    container_name: meeting-intelligence-api
+    restart: unless-stopped
+    environment:
+      REDIS_URL: redis://redis:6379
+      POSTGRES_URL: postgresql://meeting_intelligence:${POSTGRES_PASSWORD:-changeme}@postgres:5432/meeting_intelligence
+      OLLAMA_URL: http://host.docker.internal:11434
+      RECORDINGS_PATH: /recordings
+      SECRET_KEY: ${API_SECRET_KEY:-changeme}
+    # host.docker.internal is not resolvable on Linux hosts without this mapping
+    extra_hosts:
+      - "host.docker.internal:host-gateway"
+    volumes:
+      - recordings:/recordings
+    depends_on:
+      postgres:
+        condition: service_healthy
+      redis:
+        condition: service_healthy
+    labels:
+      - "traefik.enable=true"
+      - "traefik.http.routers.meeting-intelligence.rule=Host(`meet.jeffemmett.com`) && PathPrefix(`/api/intelligence`)"
+      - "traefik.http.services.meeting-intelligence.loadbalancer.server.port=8000"
+      - "traefik.http.routers.meeting-intelligence.middlewares=strip-intelligence-prefix"
+      - "traefik.http.middlewares.strip-intelligence-prefix.stripprefix.prefixes=/api/intelligence"
+    networks:
+      - meeting-intelligence
+      - traefik-public
+
+  # ============================================================
+  # Jibri Recording Service
+  # ============================================================
+  jibri:
+    image: jitsi/jibri:stable-9584
+    container_name:
meeting-intelligence-jibri + restart: unless-stopped + privileged: true + environment: + # XMPP Connection + XMPP_SERVER: ${XMPP_SERVER:-meet.jeffemmett.com} + XMPP_DOMAIN: ${XMPP_DOMAIN:-meet.jeffemmett.com} + XMPP_AUTH_DOMAIN: auth.${XMPP_DOMAIN:-meet.jeffemmett.com} + XMPP_INTERNAL_MUC_DOMAIN: internal.auth.${XMPP_DOMAIN:-meet.jeffemmett.com} + XMPP_RECORDER_DOMAIN: recorder.${XMPP_DOMAIN:-meet.jeffemmett.com} + XMPP_MUC_DOMAIN: muc.${XMPP_DOMAIN:-meet.jeffemmett.com} + + # Jibri Settings + JIBRI_BREWERY_MUC: JibriBrewery + JIBRI_PENDING_TIMEOUT: 90 + JIBRI_RECORDING_DIR: /recordings + JIBRI_FINALIZE_RECORDING_SCRIPT_PATH: /config/finalize.sh + JIBRI_XMPP_USER: jibri + JIBRI_XMPP_PASSWORD: ${JIBRI_XMPP_PASSWORD:-changeme} + JIBRI_RECORDER_USER: recorder + JIBRI_RECORDER_PASSWORD: ${JIBRI_RECORDER_PASSWORD:-changeme} + + # Display Settings + DISPLAY: ":0" + CHROMIUM_FLAGS: --use-fake-ui-for-media-stream,--start-maximized,--kiosk,--enabled,--disable-infobars,--autoplay-policy=no-user-gesture-required + + # Public URL + PUBLIC_URL: https://${XMPP_DOMAIN:-meet.jeffemmett.com} + + # Timezone + TZ: UTC + volumes: + - recordings:/recordings + - ./jibri/config:/config + - /dev/shm:/dev/shm + cap_add: + - SYS_ADMIN + - NET_BIND_SERVICE + security_opt: + - seccomp:unconfined + shm_size: 2gb + networks: + - meeting-intelligence + +volumes: + postgres_data: + redis_data: + recordings: + driver: local + driver_opts: + type: none + o: bind + device: /opt/meetings/recordings + audio_processed: + driver: local + driver_opts: + type: none + o: bind + device: /opt/meetings/audio + whisper_models: + +networks: + meeting-intelligence: + driver: bridge + traefik-public: + external: true diff --git a/deploy/meeting-intelligence/jibri/config/finalize.sh b/deploy/meeting-intelligence/jibri/config/finalize.sh new file mode 100755 index 0000000..1a1f3b9 --- /dev/null +++ b/deploy/meeting-intelligence/jibri/config/finalize.sh @@ -0,0 +1,104 @@ +#!/bin/bash +# Jibri Recording Finalize Script +# Called when Jibri finishes recording a meeting +# +# Arguments: +# $1 - Recording directory path (e.g., /recordings//) +# +# This script: +# 1. Finds the recording file +# 2. Notifies the Meeting Intelligence API to start processing + +set -e + +RECORDING_DIR="$1" +API_URL="${MEETING_INTELLIGENCE_API:-http://api:8000}" +LOG_FILE="/var/log/jibri/finalize.log" + +log() { + echo "[$(date -Iseconds)] $1" >> "$LOG_FILE" + echo "[$(date -Iseconds)] $1" +} + +log "=== Finalize script started ===" +log "Recording directory: $RECORDING_DIR" + +# Validate recording directory +if [ -z "$RECORDING_DIR" ] || [ ! 
-d "$RECORDING_DIR" ]; then + log "ERROR: Invalid recording directory: $RECORDING_DIR" + exit 1 +fi + +# Find the recording file (MP4 or WebM) +RECORDING_FILE=$(find "$RECORDING_DIR" -type f \( -name "*.mp4" -o -name "*.webm" \) | head -1) + +if [ -z "$RECORDING_FILE" ]; then + log "ERROR: No recording file found in $RECORDING_DIR" + exit 1 +fi + +log "Found recording file: $RECORDING_FILE" + +# Get file info +FILE_SIZE=$(stat -c%s "$RECORDING_FILE" 2>/dev/null || echo "0") +log "Recording file size: $FILE_SIZE bytes" + +# Extract conference info from path +# Expected format: /recordings///recording.mp4 +CONFERENCE_ID=$(echo "$RECORDING_DIR" | awk -F'/' '{print $(NF-1)}') +if [ -z "$CONFERENCE_ID" ]; then + CONFERENCE_ID=$(basename "$(dirname "$RECORDING_DIR")") +fi + +# Look for metadata file (Jibri sometimes creates this) +METADATA_FILE="$RECORDING_DIR/metadata.json" +if [ -f "$METADATA_FILE" ]; then + log "Found metadata file: $METADATA_FILE" + METADATA=$(cat "$METADATA_FILE") +else + METADATA="{}" +fi + +# Prepare webhook payload +PAYLOAD=$(cat <&1) + +HTTP_CODE=$(echo "$RESPONSE" | tail -1) +BODY=$(echo "$RESPONSE" | head -n -1) + +if [ "$HTTP_CODE" = "200" ] || [ "$HTTP_CODE" = "201" ] || [ "$HTTP_CODE" = "202" ]; then + log "SUCCESS: Webhook accepted (HTTP $HTTP_CODE)" + log "Response: $BODY" +else + log "WARNING: Webhook returned HTTP $HTTP_CODE" + log "Response: $BODY" + + # Don't fail the script - the recording is still saved + # The API can be retried later +fi + +# Optional: Clean up old recordings (keep last 30 days) +# find /recordings -type f -mtime +30 -delete + +log "=== Finalize script completed ===" +exit 0 diff --git a/deploy/meeting-intelligence/postgres/init.sql b/deploy/meeting-intelligence/postgres/init.sql new file mode 100644 index 0000000..24556ca --- /dev/null +++ b/deploy/meeting-intelligence/postgres/init.sql @@ -0,0 +1,310 @@ +-- Meeting Intelligence System - PostgreSQL Schema +-- Uses pgvector extension for semantic search + +-- Enable required extensions +CREATE EXTENSION IF NOT EXISTS "uuid-ossp"; +CREATE EXTENSION IF NOT EXISTS "vector"; + +-- ============================================================ +-- Meetings Table +-- ============================================================ +CREATE TABLE meetings ( + id UUID PRIMARY KEY DEFAULT uuid_generate_v4(), + conference_id VARCHAR(255) NOT NULL, + conference_name VARCHAR(255), + title VARCHAR(500), + started_at TIMESTAMP WITH TIME ZONE, + ended_at TIMESTAMP WITH TIME ZONE, + duration_seconds INTEGER, + recording_path VARCHAR(1000), + audio_path VARCHAR(1000), + status VARCHAR(50) DEFAULT 'recording', + -- Status: 'recording', 'extracting_audio', 'transcribing', 'diarizing', 'summarizing', 'ready', 'failed' + error_message TEXT, + metadata JSONB DEFAULT '{}', + created_at TIMESTAMP WITH TIME ZONE DEFAULT NOW(), + updated_at TIMESTAMP WITH TIME ZONE DEFAULT NOW() +); + +CREATE INDEX idx_meetings_conference_id ON meetings(conference_id); +CREATE INDEX idx_meetings_status ON meetings(status); +CREATE INDEX idx_meetings_started_at ON meetings(started_at DESC); +CREATE INDEX idx_meetings_created_at ON meetings(created_at DESC); + +-- ============================================================ +-- Meeting Participants +-- ============================================================ +CREATE TABLE meeting_participants ( + id SERIAL PRIMARY KEY, + meeting_id UUID NOT NULL REFERENCES meetings(id) ON DELETE CASCADE, + participant_id VARCHAR(255) NOT NULL, + display_name VARCHAR(255), + email VARCHAR(255), + 
joined_at TIMESTAMP WITH TIME ZONE, + left_at TIMESTAMP WITH TIME ZONE, + duration_seconds INTEGER, + is_moderator BOOLEAN DEFAULT FALSE, + metadata JSONB DEFAULT '{}', + created_at TIMESTAMP WITH TIME ZONE DEFAULT NOW() +); + +CREATE INDEX idx_participants_meeting_id ON meeting_participants(meeting_id); +CREATE INDEX idx_participants_participant_id ON meeting_participants(participant_id); + +-- ============================================================ +-- Transcripts +-- ============================================================ +CREATE TABLE transcripts ( + id SERIAL PRIMARY KEY, + meeting_id UUID NOT NULL REFERENCES meetings(id) ON DELETE CASCADE, + segment_index INTEGER NOT NULL, + start_time FLOAT NOT NULL, + end_time FLOAT NOT NULL, + speaker_id VARCHAR(255), + speaker_name VARCHAR(255), + speaker_label VARCHAR(50), -- e.g., "Speaker 1", "Speaker 2" + text TEXT NOT NULL, + confidence FLOAT, + language VARCHAR(10) DEFAULT 'en', + word_timestamps JSONB, -- Array of {word, start, end, confidence} + metadata JSONB DEFAULT '{}', + created_at TIMESTAMP WITH TIME ZONE DEFAULT NOW() +); + +CREATE INDEX idx_transcripts_meeting_id ON transcripts(meeting_id); +CREATE INDEX idx_transcripts_speaker_id ON transcripts(speaker_id); +CREATE INDEX idx_transcripts_start_time ON transcripts(meeting_id, start_time); +CREATE INDEX idx_transcripts_text_search ON transcripts USING gin(to_tsvector('english', text)); + +-- ============================================================ +-- Transcript Embeddings (for semantic search) +-- ============================================================ +CREATE TABLE transcript_embeddings ( + id SERIAL PRIMARY KEY, + transcript_id INTEGER NOT NULL REFERENCES transcripts(id) ON DELETE CASCADE, + meeting_id UUID NOT NULL REFERENCES meetings(id) ON DELETE CASCADE, + embedding vector(384), -- all-MiniLM-L6-v2 dimensions + chunk_text TEXT, -- The text chunk this embedding represents + created_at TIMESTAMP WITH TIME ZONE DEFAULT NOW() +); + +CREATE INDEX idx_embeddings_transcript_id ON transcript_embeddings(transcript_id); +CREATE INDEX idx_embeddings_meeting_id ON transcript_embeddings(meeting_id); +CREATE INDEX idx_embeddings_vector ON transcript_embeddings USING ivfflat (embedding vector_cosine_ops) WITH (lists = 100); + +-- ============================================================ +-- AI Summaries +-- ============================================================ +CREATE TABLE summaries ( + id SERIAL PRIMARY KEY, + meeting_id UUID NOT NULL REFERENCES meetings(id) ON DELETE CASCADE, + summary_text TEXT, + key_points JSONB, -- Array of key point strings + action_items JSONB, -- Array of {task, assignee, due_date, completed} + decisions JSONB, -- Array of decision strings + topics JSONB, -- Array of {topic, duration_seconds, relevance_score} + sentiment VARCHAR(50), -- 'positive', 'neutral', 'negative', 'mixed' + sentiment_scores JSONB, -- {positive: 0.7, neutral: 0.2, negative: 0.1} + participants_summary JSONB, -- {participant_id: {speaking_time, word_count, topics}} + model_used VARCHAR(100), + prompt_tokens INTEGER, + completion_tokens INTEGER, + generated_at TIMESTAMP WITH TIME ZONE DEFAULT NOW(), + metadata JSONB DEFAULT '{}' +); + +CREATE INDEX idx_summaries_meeting_id ON summaries(meeting_id); +CREATE INDEX idx_summaries_generated_at ON summaries(generated_at DESC); + +-- ============================================================ +-- Processing Jobs Queue +-- ============================================================ +CREATE TABLE processing_jobs ( + id 
UUID PRIMARY KEY DEFAULT uuid_generate_v4(),
+    meeting_id UUID NOT NULL REFERENCES meetings(id) ON DELETE CASCADE,
+    job_type VARCHAR(50) NOT NULL, -- 'extract_audio', 'transcribe', 'diarize', 'summarize', 'embed'
+    status VARCHAR(50) DEFAULT 'pending', -- 'pending', 'processing', 'completed', 'failed', 'cancelled'
+    priority INTEGER DEFAULT 5, -- 1 = highest, 10 = lowest
+    attempts INTEGER DEFAULT 0,
+    max_attempts INTEGER DEFAULT 3,
+    started_at TIMESTAMP WITH TIME ZONE,
+    completed_at TIMESTAMP WITH TIME ZONE,
+    error_message TEXT,
+    result JSONB,
+    worker_id VARCHAR(100),
+    created_at TIMESTAMP WITH TIME ZONE DEFAULT NOW(),
+    updated_at TIMESTAMP WITH TIME ZONE DEFAULT NOW()
+);
+
+CREATE INDEX idx_jobs_meeting_id ON processing_jobs(meeting_id);
+CREATE INDEX idx_jobs_status ON processing_jobs(status, priority, created_at);
+CREATE INDEX idx_jobs_type_status ON processing_jobs(job_type, status);
+
+-- ============================================================
+-- Search History (for analytics)
+-- ============================================================
+CREATE TABLE search_history (
+    id SERIAL PRIMARY KEY,
+    user_id VARCHAR(255),
+    query TEXT NOT NULL,
+    search_type VARCHAR(50), -- 'text', 'semantic', 'combined'
+    results_count INTEGER,
+    meeting_ids UUID[],
+    filters JSONB,
+    created_at TIMESTAMP WITH TIME ZONE DEFAULT NOW()
+);
+
+CREATE INDEX idx_search_history_created_at ON search_history(created_at DESC);
+
+-- ============================================================
+-- Webhook Events (for Jibri callbacks)
+-- ============================================================
+CREATE TABLE webhook_events (
+    id SERIAL PRIMARY KEY,
+    event_type VARCHAR(100) NOT NULL,
+    payload JSONB NOT NULL,
+    processed BOOLEAN DEFAULT FALSE,
+    processed_at TIMESTAMP WITH TIME ZONE,
+    error_message TEXT,
+    created_at TIMESTAMP WITH TIME ZONE DEFAULT NOW()
+);
+
+CREATE INDEX idx_webhooks_processed ON webhook_events(processed, created_at);
+
+-- ============================================================
+-- Functions
+-- ============================================================
+
+-- Update timestamp trigger
+CREATE OR REPLACE FUNCTION update_updated_at()
+RETURNS TRIGGER AS $$
+BEGIN
+    NEW.updated_at = NOW();
+    RETURN NEW;
+END;
+$$ LANGUAGE plpgsql;
+
+CREATE TRIGGER meetings_updated_at
+    BEFORE UPDATE ON meetings
+    FOR EACH ROW
+    EXECUTE FUNCTION update_updated_at();
+
+CREATE TRIGGER jobs_updated_at
+    BEFORE UPDATE ON processing_jobs
+    FOR EACH ROW
+    EXECUTE FUNCTION update_updated_at();
+
+-- Semantic search function
+CREATE OR REPLACE FUNCTION semantic_search(
+    query_embedding vector(384),
+    match_threshold FLOAT DEFAULT 0.7,
+    match_count INT DEFAULT 10,
+    meeting_filter UUID DEFAULT NULL
+)
+RETURNS TABLE (
+    transcript_id INT,
+    meeting_id UUID,
+    chunk_text TEXT,
+    similarity FLOAT
+) AS $$
+BEGIN
+    RETURN QUERY
+    SELECT
+        te.transcript_id,
+        te.meeting_id,
+        te.chunk_text,
+        1 - (te.embedding <=> query_embedding) AS similarity
+    FROM transcript_embeddings te
+    WHERE
+        (meeting_filter IS NULL OR te.meeting_id = meeting_filter)
+        AND 1 - (te.embedding <=> query_embedding) > match_threshold
+    ORDER BY te.embedding <=> query_embedding
+    LIMIT match_count;
+END;
+$$ LANGUAGE plpgsql;
+
+-- Full-text search function
+CREATE OR REPLACE FUNCTION fulltext_search(
+    search_query TEXT,
+    meeting_filter UUID DEFAULT NULL,
+    match_count INT DEFAULT 50
+)
+RETURNS TABLE (
+    transcript_id INT,
+    meeting_id UUID,
+    text TEXT,
+    speaker_name VARCHAR,
+    start_time FLOAT,
+    rank FLOAT
+) AS $$
+BEGIN
+    RETURN QUERY
+
SELECT + t.id, + t.meeting_id, + t.text, + t.speaker_name, + t.start_time, + ts_rank(to_tsvector('english', t.text), plainto_tsquery('english', search_query)) AS rank + FROM transcripts t + WHERE + (meeting_filter IS NULL OR t.meeting_id = meeting_filter) + AND to_tsvector('english', t.text) @@ plainto_tsquery('english', search_query) + ORDER BY rank DESC + LIMIT match_count; +END; +$$ LANGUAGE plpgsql; + +-- ============================================================ +-- Views +-- ============================================================ + +-- Meeting overview with stats +CREATE VIEW meeting_overview AS +SELECT + m.id, + m.conference_id, + m.conference_name, + m.title, + m.started_at, + m.ended_at, + m.duration_seconds, + m.status, + m.recording_path, + COUNT(DISTINCT mp.id) AS participant_count, + COUNT(DISTINCT t.id) AS transcript_segment_count, + COALESCE(SUM(LENGTH(t.text)), 0) AS total_characters, + s.id IS NOT NULL AS has_summary, + m.created_at +FROM meetings m +LEFT JOIN meeting_participants mp ON m.id = mp.meeting_id +LEFT JOIN transcripts t ON m.id = t.meeting_id +LEFT JOIN summaries s ON m.id = s.meeting_id +GROUP BY m.id, s.id; + +-- Speaker stats per meeting +CREATE VIEW speaker_stats AS +SELECT + t.meeting_id, + t.speaker_id, + t.speaker_name, + t.speaker_label, + COUNT(*) AS segment_count, + SUM(t.end_time - t.start_time) AS speaking_time_seconds, + SUM(LENGTH(t.text)) AS character_count, + SUM(array_length(regexp_split_to_array(t.text, '\s+'), 1)) AS word_count +FROM transcripts t +GROUP BY t.meeting_id, t.speaker_id, t.speaker_name, t.speaker_label; + +-- ============================================================ +-- Sample Data (for testing - remove in production) +-- ============================================================ + +-- INSERT INTO meetings (conference_id, conference_name, title, started_at, status) +-- VALUES ('test-room-123', 'Test Room', 'Test Meeting', NOW() - INTERVAL '1 hour', 'ready'); + +COMMENT ON TABLE meetings IS 'Stores meeting metadata and processing status'; +COMMENT ON TABLE transcripts IS 'Stores time-stamped transcript segments with speaker attribution'; +COMMENT ON TABLE summaries IS 'Stores AI-generated meeting summaries and extracted information'; +COMMENT ON TABLE transcript_embeddings IS 'Stores vector embeddings for semantic search'; +COMMENT ON TABLE processing_jobs IS 'Job queue for async processing tasks'; diff --git a/deploy/meeting-intelligence/transcriber/Dockerfile b/deploy/meeting-intelligence/transcriber/Dockerfile new file mode 100644 index 0000000..eaabce3 --- /dev/null +++ b/deploy/meeting-intelligence/transcriber/Dockerfile @@ -0,0 +1,67 @@ +# Meeting Intelligence Transcription Service +# Uses whisper.cpp for fast CPU-based transcription +# Uses resemblyzer for speaker diarization + +FROM python:3.11-slim AS builder + +# Install build dependencies +RUN apt-get update && apt-get install -y --no-install-recommends \ + build-essential \ + cmake \ + git \ + ffmpeg \ + && rm -rf /var/lib/apt/lists/* + +# Build whisper.cpp +WORKDIR /build +RUN git clone https://github.com/ggerganov/whisper.cpp.git && \ + cd whisper.cpp && \ + cmake -B build -DWHISPER_BUILD_EXAMPLES=ON && \ + cmake --build build --config Release -j$(nproc) && \ + cp build/bin/whisper-cli /usr/local/bin/whisper && \ + cp build/bin/whisper-server /usr/local/bin/whisper-server 2>/dev/null || true + +# Download whisper models +WORKDIR /models +RUN cd /build/whisper.cpp && \ + bash models/download-ggml-model.sh small && \ + mv models/ggml-small.bin /models/ + 
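+# Other ggml model sizes (tiny.en, base.en, medium, ...) can be downloaded the
+# same way if the speed/accuracy trade-off needs tuning, e.g.:
+#   bash models/download-ggml-model.sh base.en
+# "small" matches the WHISPER_MODEL default used by the service config.
+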
+# Production image
+FROM python:3.11-slim
+
+# Install runtime dependencies (curl is needed by the HEALTHCHECK below)
+RUN apt-get update && apt-get install -y --no-install-recommends \
+    ffmpeg \
+    libsndfile1 \
+    curl \
+    && rm -rf /var/lib/apt/lists/*
+
+# Copy whisper binary and models
+COPY --from=builder /usr/local/bin/whisper /usr/local/bin/whisper
+COPY --from=builder /models /models
+
+# Set up Python environment
+WORKDIR /app
+
+# Install Python dependencies
+COPY requirements.txt .
+RUN pip install --no-cache-dir -r requirements.txt
+
+# Copy application code
+COPY app/ ./app/
+
+# Create directories
+RUN mkdir -p /recordings /audio /logs
+
+# Environment variables
+ENV PYTHONUNBUFFERED=1
+ENV WHISPER_MODEL=/models/ggml-small.bin
+ENV WHISPER_THREADS=8
+
+# Health check
+HEALTHCHECK --interval=30s --timeout=10s --start-period=60s --retries=3 \
+    CMD curl -f http://localhost:8001/health || exit 1
+
+# Run the service
+EXPOSE 8001
+CMD ["uvicorn", "app.main:app", "--host", "0.0.0.0", "--port", "8001", "--workers", "1"]
diff --git a/deploy/meeting-intelligence/transcriber/app/__init__.py b/deploy/meeting-intelligence/transcriber/app/__init__.py
new file mode 100644
index 0000000..ab06c1d
--- /dev/null
+++ b/deploy/meeting-intelligence/transcriber/app/__init__.py
@@ -0,0 +1 @@
+# Meeting Intelligence Transcription Service
diff --git a/deploy/meeting-intelligence/transcriber/app/config.py b/deploy/meeting-intelligence/transcriber/app/config.py
new file mode 100644
index 0000000..84299d5
--- /dev/null
+++ b/deploy/meeting-intelligence/transcriber/app/config.py
@@ -0,0 +1,45 @@
+"""
+Configuration settings for the Transcription Service.
+"""
+
+import os
+from pydantic_settings import BaseSettings
+
+
+class Settings(BaseSettings):
+    """Application settings loaded from environment variables."""
+
+    # Redis configuration
+    redis_url: str = "redis://localhost:6379"
+
+    # PostgreSQL configuration
+    postgres_url: str = "postgresql://meeting_intelligence:changeme@localhost:5432/meeting_intelligence"
+
+    # Whisper configuration
+    whisper_model: str = "/models/ggml-small.bin"
+    whisper_threads: int = 8
+    whisper_language: str = "en"
+
+    # Worker configuration
+    num_workers: int = 4
+    job_timeout: int = 7200  # 2 hours in seconds
+
+    # Audio processing
+    audio_sample_rate: int = 16000
+    audio_channels: int = 1
+
+    # Diarization settings
+    min_speaker_duration: float = 0.5  # Minimum speaker segment in seconds
+    max_speakers: int = 10
+
+    # Paths
+    recordings_path: str = "/recordings"
+    audio_output_path: str = "/audio"
+    temp_path: str = "/tmp/transcriber"
+
+    class Config:
+        env_file = ".env"
+        env_file_encoding = "utf-8"
+
+
+settings = Settings()
diff --git a/deploy/meeting-intelligence/transcriber/app/database.py b/deploy/meeting-intelligence/transcriber/app/database.py
new file mode 100644
index 0000000..ad6d763
--- /dev/null
+++ b/deploy/meeting-intelligence/transcriber/app/database.py
@@ -0,0 +1,245 @@
+"""
+Database operations for the Transcription Service.
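+
+Wraps an asyncpg connection pool and provides helpers for the
+processing_jobs queue (claiming work with FOR UPDATE SKIP LOCKED) and for
+persisting transcript segments.
+
+Illustrative usage::
+
+    db = Database(settings.postgres_url)
+    await db.connect()
+    job = await db.get_next_pending_job()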
+""" + +import uuid +from typing import Optional, List, Dict, Any + +import asyncpg +import structlog + +log = structlog.get_logger() + + +class Database: + """Database operations for transcription service.""" + + def __init__(self, connection_string: str): + self.connection_string = connection_string + self.pool: Optional[asyncpg.Pool] = None + + async def connect(self): + """Establish database connection pool.""" + log.info("Connecting to database...") + self.pool = await asyncpg.create_pool( + self.connection_string, + min_size=2, + max_size=10 + ) + log.info("Database connected") + + async def disconnect(self): + """Close database connection pool.""" + if self.pool: + await self.pool.close() + log.info("Database disconnected") + + async def health_check(self): + """Check database connectivity.""" + async with self.pool.acquire() as conn: + await conn.fetchval("SELECT 1") + + async def create_transcription_job( + self, + meeting_id: str, + audio_path: Optional[str] = None, + video_path: Optional[str] = None, + enable_diarization: bool = True, + language: Optional[str] = None, + priority: int = 5 + ) -> str: + """Create a new transcription job.""" + job_id = str(uuid.uuid4()) + + async with self.pool.acquire() as conn: + await conn.execute(""" + INSERT INTO processing_jobs ( + id, meeting_id, job_type, status, priority, + result + ) + VALUES ($1, $2::uuid, 'transcribe', 'pending', $3, $4) + """, job_id, meeting_id, priority, { + "audio_path": audio_path, + "video_path": video_path, + "enable_diarization": enable_diarization, + "language": language + }) + + log.info("Created transcription job", job_id=job_id, meeting_id=meeting_id) + return job_id + + async def get_job(self, job_id: str) -> Optional[Dict[str, Any]]: + """Get a job by ID.""" + async with self.pool.acquire() as conn: + row = await conn.fetchrow(""" + SELECT id, meeting_id, job_type, status, priority, + attempts, started_at, completed_at, + error_message, result, created_at + FROM processing_jobs + WHERE id = $1 + """, job_id) + + if row: + return dict(row) + return None + + async def get_next_pending_job(self) -> Optional[Dict[str, Any]]: + """Get the next pending job and mark it as processing.""" + async with self.pool.acquire() as conn: + # Use FOR UPDATE SKIP LOCKED to prevent race conditions + row = await conn.fetchrow(""" + UPDATE processing_jobs + SET status = 'processing', + started_at = NOW(), + attempts = attempts + 1 + WHERE id = ( + SELECT id FROM processing_jobs + WHERE status = 'pending' + AND job_type = 'transcribe' + ORDER BY priority ASC, created_at ASC + FOR UPDATE SKIP LOCKED + LIMIT 1 + ) + RETURNING id, meeting_id, job_type, result + """) + + if row: + result = dict(row) + # Merge result JSON into the dict + if result.get("result"): + result.update(result["result"]) + return result + return None + + async def update_job_status( + self, + job_id: str, + status: str, + error_message: Optional[str] = None, + result: Optional[dict] = None, + progress: Optional[float] = None + ): + """Update job status.""" + async with self.pool.acquire() as conn: + if status == "completed": + await conn.execute(""" + UPDATE processing_jobs + SET status = $1, + completed_at = NOW(), + error_message = $2, + result = COALESCE($3::jsonb, result) + WHERE id = $4 + """, status, error_message, result, job_id) + else: + update_result = result + if progress is not None: + update_result = result or {} + update_result["progress"] = progress + + await conn.execute(""" + UPDATE processing_jobs + SET status = $1, + error_message = $2, + 
result = COALESCE($3::jsonb, result) + WHERE id = $4 + """, status, error_message, update_result, job_id) + + async def update_job_audio_path(self, job_id: str, audio_path: str): + """Update the audio path for a job.""" + async with self.pool.acquire() as conn: + await conn.execute(""" + UPDATE processing_jobs + SET result = result || $1::jsonb + WHERE id = $2 + """, {"audio_path": audio_path}, job_id) + + async def update_meeting_status(self, meeting_id: str, status: str): + """Update meeting processing status.""" + async with self.pool.acquire() as conn: + await conn.execute(""" + UPDATE meetings + SET status = $1, + updated_at = NOW() + WHERE id = $2::uuid + """, status, meeting_id) + + async def insert_transcript_segment( + self, + meeting_id: str, + segment_index: int, + start_time: float, + end_time: float, + text: str, + speaker_id: Optional[str] = None, + speaker_label: Optional[str] = None, + confidence: Optional[float] = None, + language: str = "en" + ): + """Insert a transcript segment.""" + async with self.pool.acquire() as conn: + await conn.execute(""" + INSERT INTO transcripts ( + meeting_id, segment_index, start_time, end_time, + text, speaker_id, speaker_label, confidence, language + ) + VALUES ($1::uuid, $2, $3, $4, $5, $6, $7, $8, $9) + """, meeting_id, segment_index, start_time, end_time, + text, speaker_id, speaker_label, confidence, language) + + async def get_transcript(self, meeting_id: str) -> List[Dict[str, Any]]: + """Get all transcript segments for a meeting.""" + async with self.pool.acquire() as conn: + rows = await conn.fetch(""" + SELECT id, segment_index, start_time, end_time, + speaker_id, speaker_label, text, confidence, language + FROM transcripts + WHERE meeting_id = $1::uuid + ORDER BY segment_index ASC + """, meeting_id) + + return [dict(row) for row in rows] + + async def get_meeting(self, meeting_id: str) -> Optional[Dict[str, Any]]: + """Get meeting details.""" + async with self.pool.acquire() as conn: + row = await conn.fetchrow(""" + SELECT id, conference_id, conference_name, title, + started_at, ended_at, duration_seconds, + recording_path, audio_path, status, + metadata, created_at + FROM meetings + WHERE id = $1::uuid + """, meeting_id) + + if row: + return dict(row) + return None + + async def create_meeting( + self, + conference_id: str, + conference_name: Optional[str] = None, + title: Optional[str] = None, + recording_path: Optional[str] = None, + metadata: Optional[dict] = None + ) -> str: + """Create a new meeting record.""" + meeting_id = str(uuid.uuid4()) + + async with self.pool.acquire() as conn: + await conn.execute(""" + INSERT INTO meetings ( + id, conference_id, conference_name, title, + recording_path, status, metadata + ) + VALUES ($1, $2, $3, $4, $5, 'recording', $6) + """, meeting_id, conference_id, conference_name, title, + recording_path, metadata or {}) + + log.info("Created meeting", meeting_id=meeting_id, conference_id=conference_id) + return meeting_id + + +class DatabaseError(Exception): + """Database operation error.""" + pass diff --git a/deploy/meeting-intelligence/transcriber/app/diarizer.py b/deploy/meeting-intelligence/transcriber/app/diarizer.py new file mode 100644 index 0000000..56e8744 --- /dev/null +++ b/deploy/meeting-intelligence/transcriber/app/diarizer.py @@ -0,0 +1,338 @@ +""" +Speaker Diarization using resemblyzer. + +Identifies who spoke when in the audio. 
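+
+Pipeline: slide a ~1.5 s window over the audio, embed each window with
+resemblyzer's VoiceEncoder, cluster the embeddings with agglomerative
+clustering (cosine distance) into speakers, merge consecutive windows into
+speaker segments, and optionally align those segments with the whisper
+transcript segments.
+
+Illustrative usage::
+
+    diarizer = SpeakerDiarizer()
+    segments = diarizer.diarize("/audio/<meeting_id>/audio.wav")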
+""" + +import os +from dataclasses import dataclass +from typing import List, Optional, Tuple + +import numpy as np +import soundfile as sf +from resemblyzer import VoiceEncoder, preprocess_wav +from sklearn.cluster import AgglomerativeClustering + +import structlog + +log = structlog.get_logger() + + +@dataclass +class SpeakerSegment: + """A segment attributed to a speaker.""" + start: float + end: float + speaker_id: str + speaker_label: str # e.g., "Speaker 1" + confidence: Optional[float] = None + + +class SpeakerDiarizer: + """Speaker diarization using voice embeddings.""" + + def __init__( + self, + min_segment_duration: float = 0.5, + max_speakers: int = 10, + embedding_step: float = 0.5 # Step size for embeddings in seconds + ): + self.min_segment_duration = min_segment_duration + self.max_speakers = max_speakers + self.embedding_step = embedding_step + + # Load voice encoder (this downloads the model on first use) + log.info("Loading voice encoder model...") + self.encoder = VoiceEncoder() + log.info("Voice encoder loaded") + + def diarize( + self, + audio_path: str, + num_speakers: Optional[int] = None, + transcript_segments: Optional[List[dict]] = None + ) -> List[SpeakerSegment]: + """ + Perform speaker diarization on an audio file. + + Args: + audio_path: Path to audio file (WAV, 16kHz mono) + num_speakers: Number of speakers (if known), otherwise auto-detected + transcript_segments: Optional transcript segments to align with + + Returns: + List of SpeakerSegment with speaker attributions + """ + if not os.path.exists(audio_path): + raise FileNotFoundError(f"Audio file not found: {audio_path}") + + log.info("Starting speaker diarization", audio_path=audio_path) + + # Load and preprocess audio + wav, sample_rate = sf.read(audio_path) + + if sample_rate != 16000: + log.warning(f"Audio sample rate is {sample_rate}, expected 16000") + + # Ensure mono + if len(wav.shape) > 1: + wav = wav.mean(axis=1) + + # Preprocess for resemblyzer + wav = preprocess_wav(wav) + + if len(wav) == 0: + log.warning("Audio file is empty after preprocessing") + return [] + + # Generate embeddings for sliding windows + embeddings, timestamps = self._generate_embeddings(wav, sample_rate) + + if len(embeddings) == 0: + log.warning("No embeddings generated") + return [] + + # Cluster embeddings to identify speakers + speaker_labels = self._cluster_speakers( + embeddings, + num_speakers=num_speakers + ) + + # Convert to speaker segments + segments = self._create_segments(timestamps, speaker_labels) + + # If transcript segments provided, align them + if transcript_segments: + segments = self._align_with_transcript(segments, transcript_segments) + + log.info( + "Diarization complete", + num_segments=len(segments), + num_speakers=len(set(s.speaker_id for s in segments)) + ) + + return segments + + def _generate_embeddings( + self, + wav: np.ndarray, + sample_rate: int + ) -> Tuple[np.ndarray, List[float]]: + """Generate voice embeddings for sliding windows.""" + embeddings = [] + timestamps = [] + + # Window size in samples (1.5 seconds for good speaker representation) + window_size = int(1.5 * sample_rate) + step_size = int(self.embedding_step * sample_rate) + + # Slide through audio + for start_sample in range(0, len(wav) - window_size, step_size): + end_sample = start_sample + window_size + window = wav[start_sample:end_sample] + + # Get embedding for this window + try: + embedding = self.encoder.embed_utterance(window) + embeddings.append(embedding) + timestamps.append(start_sample / sample_rate) + except 
Exception as e: + log.debug(f"Failed to embed window at {start_sample/sample_rate}s: {e}") + continue + + return np.array(embeddings), timestamps + + def _cluster_speakers( + self, + embeddings: np.ndarray, + num_speakers: Optional[int] = None + ) -> np.ndarray: + """Cluster embeddings to identify speakers.""" + if len(embeddings) == 0: + return np.array([]) + + # If number of speakers not specified, estimate it + if num_speakers is None: + num_speakers = self._estimate_num_speakers(embeddings) + + # Ensure we don't exceed max speakers or embedding count + num_speakers = min(num_speakers, self.max_speakers, len(embeddings)) + num_speakers = max(num_speakers, 1) + + log.info(f"Clustering with {num_speakers} speakers") + + # Use agglomerative clustering + clustering = AgglomerativeClustering( + n_clusters=num_speakers, + metric="cosine", + linkage="average" + ) + + labels = clustering.fit_predict(embeddings) + + return labels + + def _estimate_num_speakers(self, embeddings: np.ndarray) -> int: + """Estimate the number of speakers from embeddings.""" + if len(embeddings) < 2: + return 1 + + # Try different numbers of clusters and find the best + best_score = -1 + best_n = 2 + + for n in range(2, min(6, len(embeddings))): + try: + clustering = AgglomerativeClustering( + n_clusters=n, + metric="cosine", + linkage="average" + ) + labels = clustering.fit_predict(embeddings) + + # Calculate silhouette-like score + score = self._cluster_quality_score(embeddings, labels) + + if score > best_score: + best_score = score + best_n = n + except Exception: + continue + + log.info(f"Estimated {best_n} speakers (score: {best_score:.3f})") + return best_n + + def _cluster_quality_score( + self, + embeddings: np.ndarray, + labels: np.ndarray + ) -> float: + """Calculate a simple cluster quality score.""" + unique_labels = np.unique(labels) + + if len(unique_labels) < 2: + return 0.0 + + # Calculate average intra-cluster distance + intra_distances = [] + for label in unique_labels: + cluster_embeddings = embeddings[labels == label] + if len(cluster_embeddings) > 1: + # Cosine distance within cluster + for i in range(len(cluster_embeddings)): + for j in range(i + 1, len(cluster_embeddings)): + dist = 1 - np.dot(cluster_embeddings[i], cluster_embeddings[j]) + intra_distances.append(dist) + + if not intra_distances: + return 0.0 + + avg_intra = np.mean(intra_distances) + + # Calculate average inter-cluster distance + inter_distances = [] + cluster_centers = [] + for label in unique_labels: + cluster_embeddings = embeddings[labels == label] + center = cluster_embeddings.mean(axis=0) + cluster_centers.append(center) + + for i in range(len(cluster_centers)): + for j in range(i + 1, len(cluster_centers)): + dist = 1 - np.dot(cluster_centers[i], cluster_centers[j]) + inter_distances.append(dist) + + avg_inter = np.mean(inter_distances) if inter_distances else 1.0 + + # Score: higher inter-cluster distance, lower intra-cluster distance is better + return (avg_inter - avg_intra) / max(avg_inter, avg_intra, 0.001) + + def _create_segments( + self, + timestamps: List[float], + labels: np.ndarray + ) -> List[SpeakerSegment]: + """Convert clustered timestamps to speaker segments.""" + if len(timestamps) == 0: + return [] + + segments = [] + current_speaker = labels[0] + segment_start = timestamps[0] + + for i in range(1, len(timestamps)): + if labels[i] != current_speaker: + # End current segment + segment_end = timestamps[i] + + if segment_end - segment_start >= self.min_segment_duration: + segments.append(SpeakerSegment( 
+ start=segment_start, + end=segment_end, + speaker_id=f"speaker_{current_speaker}", + speaker_label=f"Speaker {current_speaker + 1}" + )) + + # Start new segment + current_speaker = labels[i] + segment_start = timestamps[i] + + # Add final segment + if len(timestamps) > 0: + segment_end = timestamps[-1] + self.embedding_step + if segment_end - segment_start >= self.min_segment_duration: + segments.append(SpeakerSegment( + start=segment_start, + end=segment_end, + speaker_id=f"speaker_{current_speaker}", + speaker_label=f"Speaker {current_speaker + 1}" + )) + + return segments + + def _align_with_transcript( + self, + speaker_segments: List[SpeakerSegment], + transcript_segments: List[dict] + ) -> List[SpeakerSegment]: + """Align speaker segments with transcript segments.""" + aligned = [] + + for trans in transcript_segments: + trans_start = trans.get("start", 0) + trans_end = trans.get("end", 0) + trans_mid = (trans_start + trans_end) / 2 + + # Find the speaker segment that best overlaps + best_speaker = None + best_overlap = 0 + + for speaker in speaker_segments: + # Calculate overlap + overlap_start = max(trans_start, speaker.start) + overlap_end = min(trans_end, speaker.end) + overlap = max(0, overlap_end - overlap_start) + + if overlap > best_overlap: + best_overlap = overlap + best_speaker = speaker + + if best_speaker: + aligned.append(SpeakerSegment( + start=trans_start, + end=trans_end, + speaker_id=best_speaker.speaker_id, + speaker_label=best_speaker.speaker_label, + confidence=best_overlap / (trans_end - trans_start) if trans_end > trans_start else 0 + )) + else: + # No match, assign unknown speaker + aligned.append(SpeakerSegment( + start=trans_start, + end=trans_end, + speaker_id="speaker_unknown", + speaker_label="Unknown Speaker", + confidence=0 + )) + + return aligned diff --git a/deploy/meeting-intelligence/transcriber/app/main.py b/deploy/meeting-intelligence/transcriber/app/main.py new file mode 100644 index 0000000..6cea312 --- /dev/null +++ b/deploy/meeting-intelligence/transcriber/app/main.py @@ -0,0 +1,274 @@ +""" +Meeting Intelligence Transcription Service + +FastAPI service that handles: +- Audio extraction from video recordings +- Transcription using whisper.cpp +- Speaker diarization using resemblyzer +- Job queue management via Redis +""" + +import asyncio +import os +from contextlib import asynccontextmanager +from typing import Optional + +from fastapi import FastAPI, BackgroundTasks, HTTPException +from fastapi.responses import JSONResponse +from pydantic import BaseModel +from redis import Redis +from rq import Queue + +from .config import settings +from .transcriber import WhisperTranscriber +from .diarizer import SpeakerDiarizer +from .processor import JobProcessor +from .database import Database + +import structlog + +log = structlog.get_logger() + + +# Pydantic models +class TranscribeRequest(BaseModel): + meeting_id: str + audio_path: str + priority: int = 5 + enable_diarization: bool = True + language: Optional[str] = None + + +class TranscribeResponse(BaseModel): + job_id: str + status: str + message: str + + +class JobStatus(BaseModel): + job_id: str + status: str + progress: Optional[float] = None + result: Optional[dict] = None + error: Optional[str] = None + + +# Application state +class AppState: + redis: Optional[Redis] = None + queue: Optional[Queue] = None + db: Optional[Database] = None + transcriber: Optional[WhisperTranscriber] = None + diarizer: Optional[SpeakerDiarizer] = None + processor: Optional[JobProcessor] = None + + +state = 
AppState() + + +@asynccontextmanager +async def lifespan(app: FastAPI): + """Application startup and shutdown.""" + log.info("Starting transcription service...") + + # Initialize Redis connection + state.redis = Redis.from_url(settings.redis_url) + state.queue = Queue("transcription", connection=state.redis) + + # Initialize database + state.db = Database(settings.postgres_url) + await state.db.connect() + + # Initialize transcriber + state.transcriber = WhisperTranscriber( + model_path=settings.whisper_model, + threads=settings.whisper_threads + ) + + # Initialize diarizer + state.diarizer = SpeakerDiarizer() + + # Initialize job processor + state.processor = JobProcessor( + transcriber=state.transcriber, + diarizer=state.diarizer, + db=state.db, + redis=state.redis + ) + + # Start background worker + asyncio.create_task(state.processor.process_jobs()) + + log.info("Transcription service started successfully") + + yield + + # Shutdown + log.info("Shutting down transcription service...") + if state.processor: + await state.processor.stop() + if state.db: + await state.db.disconnect() + if state.redis: + state.redis.close() + + log.info("Transcription service stopped") + + +app = FastAPI( + title="Meeting Intelligence Transcription Service", + description="Transcription and speaker diarization for meeting recordings", + version="1.0.0", + lifespan=lifespan +) + + +@app.get("/health") +async def health_check(): + """Health check endpoint.""" + redis_ok = False + db_ok = False + + try: + if state.redis: + state.redis.ping() + redis_ok = True + except Exception as e: + log.error("Redis health check failed", error=str(e)) + + try: + if state.db: + await state.db.health_check() + db_ok = True + except Exception as e: + log.error("Database health check failed", error=str(e)) + + status = "healthy" if (redis_ok and db_ok) else "unhealthy" + + return { + "status": status, + "redis": redis_ok, + "database": db_ok, + "whisper_model": settings.whisper_model, + "threads": settings.whisper_threads + } + + +@app.get("/status") +async def service_status(): + """Get service status and queue info.""" + queue_length = state.queue.count if state.queue else 0 + processing = state.processor.active_jobs if state.processor else 0 + + return { + "status": "running", + "queue_length": queue_length, + "active_jobs": processing, + "workers": settings.num_workers, + "model": os.path.basename(settings.whisper_model) + } + + +@app.post("/transcribe", response_model=TranscribeResponse) +async def queue_transcription(request: TranscribeRequest, background_tasks: BackgroundTasks): + """Queue a transcription job.""" + log.info( + "Received transcription request", + meeting_id=request.meeting_id, + audio_path=request.audio_path + ) + + # Validate audio file exists + if not os.path.exists(request.audio_path): + raise HTTPException( + status_code=404, + detail=f"Audio file not found: {request.audio_path}" + ) + + # Create job record in database + try: + job_id = await state.db.create_transcription_job( + meeting_id=request.meeting_id, + audio_path=request.audio_path, + enable_diarization=request.enable_diarization, + language=request.language, + priority=request.priority + ) + except Exception as e: + log.error("Failed to create job", error=str(e)) + raise HTTPException(status_code=500, detail=str(e)) + + # Queue the job + state.queue.enqueue( + "app.worker.process_transcription", + job_id, + job_timeout="2h", + result_ttl=86400 # 24 hours + ) + + log.info("Job queued", job_id=job_id) + + return TranscribeResponse( + 
job_id=job_id, + status="queued", + message="Transcription job queued successfully" + ) + + +@app.get("/transcribe/{job_id}", response_model=JobStatus) +async def get_job_status(job_id: str): + """Get the status of a transcription job.""" + job = await state.db.get_job(job_id) + + if not job: + raise HTTPException(status_code=404, detail="Job not found") + + return JobStatus( + job_id=job_id, + status=job["status"], + progress=job.get("progress"), + result=job.get("result"), + error=job.get("error_message") + ) + + +@app.delete("/transcribe/{job_id}") +async def cancel_job(job_id: str): + """Cancel a pending transcription job.""" + job = await state.db.get_job(job_id) + + if not job: + raise HTTPException(status_code=404, detail="Job not found") + + if job["status"] not in ["pending", "queued"]: + raise HTTPException( + status_code=400, + detail=f"Cannot cancel job in status: {job['status']}" + ) + + await state.db.update_job_status(job_id, "cancelled") + + return {"status": "cancelled", "job_id": job_id} + + +@app.get("/meetings/{meeting_id}/transcript") +async def get_transcript(meeting_id: str): + """Get the transcript for a meeting.""" + transcript = await state.db.get_transcript(meeting_id) + + if not transcript: + raise HTTPException( + status_code=404, + detail=f"No transcript found for meeting: {meeting_id}" + ) + + return { + "meeting_id": meeting_id, + "segments": transcript, + "segment_count": len(transcript) + } + + +if __name__ == "__main__": + import uvicorn + uvicorn.run(app, host="0.0.0.0", port=8001) diff --git a/deploy/meeting-intelligence/transcriber/app/processor.py b/deploy/meeting-intelligence/transcriber/app/processor.py new file mode 100644 index 0000000..8bb67a2 --- /dev/null +++ b/deploy/meeting-intelligence/transcriber/app/processor.py @@ -0,0 +1,282 @@ +""" +Job Processor for the Transcription Service. + +Handles the processing pipeline: +1. Audio extraction from video +2. Transcription +3. Speaker diarization +4. 
Database storage +""" + +import asyncio +import os +import subprocess +from typing import Optional + +import structlog + +from .config import settings +from .transcriber import WhisperTranscriber, TranscriptionResult +from .diarizer import SpeakerDiarizer, SpeakerSegment +from .database import Database + +log = structlog.get_logger() + + +class JobProcessor: + """Processes transcription jobs from the queue.""" + + def __init__( + self, + transcriber: WhisperTranscriber, + diarizer: SpeakerDiarizer, + db: Database, + redis + ): + self.transcriber = transcriber + self.diarizer = diarizer + self.db = db + self.redis = redis + self.active_jobs = 0 + self._running = False + self._workers = [] + + async def process_jobs(self): + """Main job processing loop.""" + self._running = True + log.info("Job processor started", num_workers=settings.num_workers) + + # Start worker tasks + for i in range(settings.num_workers): + worker = asyncio.create_task(self._worker(i)) + self._workers.append(worker) + + # Wait for all workers + await asyncio.gather(*self._workers, return_exceptions=True) + + async def stop(self): + """Stop the job processor.""" + self._running = False + for worker in self._workers: + worker.cancel() + log.info("Job processor stopped") + + async def _worker(self, worker_id: int): + """Worker that processes individual jobs.""" + log.info(f"Worker {worker_id} started") + + while self._running: + try: + # Get next job from database + job = await self.db.get_next_pending_job() + + if job is None: + # No jobs, wait a bit + await asyncio.sleep(2) + continue + + job_id = job["id"] + meeting_id = job["meeting_id"] + + log.info( + f"Worker {worker_id} processing job", + job_id=job_id, + meeting_id=meeting_id + ) + + self.active_jobs += 1 + + try: + await self._process_job(job) + except Exception as e: + log.error( + "Job processing failed", + job_id=job_id, + error=str(e) + ) + await self.db.update_job_status( + job_id, + "failed", + error_message=str(e) + ) + finally: + self.active_jobs -= 1 + + except asyncio.CancelledError: + break + except Exception as e: + log.error(f"Worker {worker_id} error", error=str(e)) + await asyncio.sleep(5) + + log.info(f"Worker {worker_id} stopped") + + async def _process_job(self, job: dict): + """Process a single transcription job.""" + job_id = job["id"] + meeting_id = job["meeting_id"] + audio_path = job.get("audio_path") + video_path = job.get("video_path") + enable_diarization = job.get("enable_diarization", True) + language = job.get("language") + + # Update status to processing + await self.db.update_job_status(job_id, "processing") + await self.db.update_meeting_status(meeting_id, "transcribing") + + # Step 1: Extract audio if we have video + if video_path and not audio_path: + log.info("Extracting audio from video", video_path=video_path) + await self.db.update_job_status(job_id, "processing", progress=0.1) + + audio_path = await self._extract_audio(video_path, meeting_id) + await self.db.update_job_audio_path(job_id, audio_path) + + if not audio_path or not os.path.exists(audio_path): + raise RuntimeError(f"Audio file not found: {audio_path}") + + # Step 2: Transcribe + log.info("Starting transcription", audio_path=audio_path) + await self.db.update_job_status(job_id, "processing", progress=0.3) + + transcription = await asyncio.get_event_loop().run_in_executor( + None, + lambda: self.transcriber.transcribe(audio_path, language) + ) + + log.info( + "Transcription complete", + segments=len(transcription.segments), + duration=transcription.duration + ) + 
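+        # Segments at this point carry timestamps and text only; speaker
+        # attribution is filled in by the diarization step below when enabled.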
+ # Step 3: Speaker diarization + speaker_segments = [] + if enable_diarization and len(transcription.segments) > 0: + log.info("Starting speaker diarization") + await self.db.update_job_status(job_id, "processing", progress=0.6) + await self.db.update_meeting_status(meeting_id, "diarizing") + + # Convert transcript segments to dicts for diarizer + transcript_dicts = [ + {"start": s.start, "end": s.end, "text": s.text} + for s in transcription.segments + ] + + speaker_segments = await asyncio.get_event_loop().run_in_executor( + None, + lambda: self.diarizer.diarize( + audio_path, + transcript_segments=transcript_dicts + ) + ) + + log.info( + "Diarization complete", + num_segments=len(speaker_segments), + num_speakers=len(set(s.speaker_id for s in speaker_segments)) + ) + + # Step 4: Store results + log.info("Storing transcript in database") + await self.db.update_job_status(job_id, "processing", progress=0.9) + + await self._store_transcript( + meeting_id, + transcription, + speaker_segments + ) + + # Mark job complete + await self.db.update_job_status( + job_id, + "completed", + result={ + "segments": len(transcription.segments), + "duration": transcription.duration, + "language": transcription.language, + "speakers": len(set(s.speaker_id for s in speaker_segments)) if speaker_segments else 0 + } + ) + + # Update meeting status - ready for summarization + await self.db.update_meeting_status(meeting_id, "summarizing") + + log.info("Job completed successfully", job_id=job_id) + + async def _extract_audio(self, video_path: str, meeting_id: str) -> str: + """Extract audio from video file using ffmpeg.""" + output_dir = os.path.join(settings.audio_output_path, meeting_id) + os.makedirs(output_dir, exist_ok=True) + + audio_path = os.path.join(output_dir, "audio.wav") + + cmd = [ + "ffmpeg", + "-i", video_path, + "-vn", # No video + "-acodec", "pcm_s16le", # PCM 16-bit + "-ar", str(settings.audio_sample_rate), # Sample rate + "-ac", str(settings.audio_channels), # Mono + "-y", # Overwrite + audio_path + ] + + log.debug("Running ffmpeg", cmd=" ".join(cmd)) + + process = await asyncio.create_subprocess_exec( + *cmd, + stdout=asyncio.subprocess.PIPE, + stderr=asyncio.subprocess.PIPE + ) + + _, stderr = await process.communicate() + + if process.returncode != 0: + raise RuntimeError(f"FFmpeg failed: {stderr.decode()}") + + log.info("Audio extracted", output=audio_path) + return audio_path + + async def _store_transcript( + self, + meeting_id: str, + transcription: TranscriptionResult, + speaker_segments: list + ): + """Store transcript segments in database.""" + # Create a map from time ranges to speakers + speaker_map = {} + for seg in speaker_segments: + speaker_map[(seg.start, seg.end)] = (seg.speaker_id, seg.speaker_label) + + # Store each transcript segment + for i, segment in enumerate(transcription.segments): + # Find matching speaker + speaker_id = None + speaker_label = None + + for (start, end), (sid, slabel) in speaker_map.items(): + if segment.start >= start and segment.end <= end: + speaker_id = sid + speaker_label = slabel + break + + # If no exact match, find closest overlap + if speaker_id is None: + for seg in speaker_segments: + if segment.start < seg.end and segment.end > seg.start: + speaker_id = seg.speaker_id + speaker_label = seg.speaker_label + break + + await self.db.insert_transcript_segment( + meeting_id=meeting_id, + segment_index=i, + start_time=segment.start, + end_time=segment.end, + text=segment.text, + speaker_id=speaker_id, + speaker_label=speaker_label, + 
confidence=segment.confidence, + language=transcription.language + ) diff --git a/deploy/meeting-intelligence/transcriber/app/transcriber.py b/deploy/meeting-intelligence/transcriber/app/transcriber.py new file mode 100644 index 0000000..d69a0c0 --- /dev/null +++ b/deploy/meeting-intelligence/transcriber/app/transcriber.py @@ -0,0 +1,211 @@ +""" +Whisper.cpp transcription wrapper. + +Uses the whisper CLI to transcribe audio files. +""" + +import json +import os +import subprocess +import tempfile +from dataclasses import dataclass +from typing import List, Optional + +import structlog + +log = structlog.get_logger() + + +@dataclass +class TranscriptSegment: + """A single transcript segment.""" + start: float + end: float + text: str + confidence: Optional[float] = None + + +@dataclass +class TranscriptionResult: + """Result of a transcription job.""" + segments: List[TranscriptSegment] + language: str + duration: float + text: str + + +class WhisperTranscriber: + """Wrapper for whisper.cpp transcription.""" + + def __init__( + self, + model_path: str = "/models/ggml-small.bin", + threads: int = 8, + language: str = "en" + ): + self.model_path = model_path + self.threads = threads + self.language = language + self.whisper_bin = "/usr/local/bin/whisper" + + # Verify whisper binary exists + if not os.path.exists(self.whisper_bin): + raise RuntimeError(f"Whisper binary not found at {self.whisper_bin}") + + # Verify model exists + if not os.path.exists(model_path): + raise RuntimeError(f"Whisper model not found at {model_path}") + + log.info( + "WhisperTranscriber initialized", + model=model_path, + threads=threads, + language=language + ) + + def transcribe( + self, + audio_path: str, + language: Optional[str] = None, + translate: bool = False + ) -> TranscriptionResult: + """ + Transcribe an audio file. 
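+
+        Shells out to the whisper.cpp CLI with JSON output (-oj) and parses
+        the emitted segments; this call is synchronous and may take several
+        minutes for long recordings.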
+ + Args: + audio_path: Path to the audio file (WAV format, 16kHz mono) + language: Language code (e.g., 'en', 'es', 'fr') or None for auto-detect + translate: If True, translate to English + + Returns: + TranscriptionResult with segments and full text + """ + if not os.path.exists(audio_path): + raise FileNotFoundError(f"Audio file not found: {audio_path}") + + log.info("Starting transcription", audio_path=audio_path, language=language) + + # Create temp file for JSON output + with tempfile.NamedTemporaryFile(suffix=".json", delete=False) as tmp: + output_json = tmp.name + + try: + # Build whisper command + cmd = [ + self.whisper_bin, + "-m", self.model_path, + "-f", audio_path, + "-t", str(self.threads), + "-oj", # Output JSON + "-of", output_json.replace(".json", ""), # Output file prefix + "--print-progress", + ] + + # Add language if specified + if language: + cmd.extend(["-l", language]) + else: + cmd.extend(["-l", self.language]) + + # Add translate flag if needed + if translate: + cmd.append("--translate") + + log.debug("Running whisper command", cmd=" ".join(cmd)) + + # Run whisper + result = subprocess.run( + cmd, + capture_output=True, + text=True, + timeout=7200 # 2 hour timeout + ) + + if result.returncode != 0: + log.error( + "Whisper transcription failed", + returncode=result.returncode, + stderr=result.stderr + ) + raise RuntimeError(f"Whisper failed: {result.stderr}") + + # Parse JSON output + with open(output_json, "r") as f: + whisper_output = json.load(f) + + # Extract segments + segments = [] + full_text_parts = [] + + for item in whisper_output.get("transcription", []): + segment = TranscriptSegment( + start=item["offsets"]["from"] / 1000.0, # Convert ms to seconds + end=item["offsets"]["to"] / 1000.0, + text=item["text"].strip(), + confidence=item.get("confidence") + ) + segments.append(segment) + full_text_parts.append(segment.text) + + # Get detected language + detected_language = whisper_output.get("result", {}).get("language", language or self.language) + + # Calculate total duration + duration = segments[-1].end if segments else 0.0 + + log.info( + "Transcription complete", + segments=len(segments), + duration=duration, + language=detected_language + ) + + return TranscriptionResult( + segments=segments, + language=detected_language, + duration=duration, + text=" ".join(full_text_parts) + ) + + finally: + # Clean up temp files + for ext in [".json", ".txt", ".vtt", ".srt"]: + tmp_file = output_json.replace(".json", ext) + if os.path.exists(tmp_file): + os.remove(tmp_file) + + def transcribe_with_timestamps( + self, + audio_path: str, + language: Optional[str] = None + ) -> List[dict]: + """ + Transcribe with word-level timestamps. + + Returns list of dicts with word, start, end, confidence. 
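+
+        Note: whisper.cpp is run with segment-level timestamps here, so word
+        times are interpolated evenly across each segment rather than
+        measured per word.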
+ """ + result = self.transcribe(audio_path, language) + + # Convert segments to word-level format + # Note: whisper.cpp provides segment-level timestamps by default + # For true word-level, we'd need the --max-len 1 flag but it's slower + + words = [] + for segment in result.segments: + # Estimate word timestamps within segment + segment_words = segment.text.split() + if not segment_words: + continue + + duration = segment.end - segment.start + word_duration = duration / len(segment_words) + + for i, word in enumerate(segment_words): + words.append({ + "word": word, + "start": segment.start + (i * word_duration), + "end": segment.start + ((i + 1) * word_duration), + "confidence": segment.confidence + }) + + return words diff --git a/deploy/meeting-intelligence/transcriber/requirements.txt b/deploy/meeting-intelligence/transcriber/requirements.txt new file mode 100644 index 0000000..da038b5 --- /dev/null +++ b/deploy/meeting-intelligence/transcriber/requirements.txt @@ -0,0 +1,41 @@ +# Transcription Service Dependencies + +# Web framework +fastapi==0.109.2 +uvicorn[standard]==0.27.1 +python-multipart==0.0.9 + +# Job queue +redis==5.0.1 +rq==1.16.0 + +# Database +asyncpg==0.29.0 +sqlalchemy[asyncio]==2.0.25 +psycopg2-binary==2.9.9 + +# Audio processing +pydub==0.25.1 +soundfile==0.12.1 +librosa==0.10.1 +numpy==1.26.4 + +# Speaker diarization +resemblyzer==0.1.3 +torch==2.2.0 +torchaudio==2.2.0 +scipy==1.12.0 +scikit-learn==1.4.0 + +# Sentence embeddings (for semantic search) +sentence-transformers==2.3.1 + +# Utilities +pydantic==2.6.1 +pydantic-settings==2.1.0 +python-dotenv==1.0.1 +httpx==0.26.0 +tenacity==8.2.3 + +# Logging & monitoring +structlog==24.1.0