diff --git a/deploy/meeting-intelligence/.env.example b/deploy/meeting-intelligence/.env.example
new file mode 100644
index 0000000..ec555da
--- /dev/null
+++ b/deploy/meeting-intelligence/.env.example
@@ -0,0 +1,17 @@
+# Meeting Intelligence System - Environment Variables
+# Copy this file to .env and update values
+
+# PostgreSQL
+POSTGRES_PASSWORD=your-secure-password-here
+
+# API Security
+API_SECRET_KEY=your-api-secret-key-here
+
+# Jibri XMPP Configuration
+XMPP_SERVER=meet.jeffemmett.com
+XMPP_DOMAIN=meet.jeffemmett.com
+JIBRI_XMPP_PASSWORD=jibri-xmpp-password
+JIBRI_RECORDER_PASSWORD=recorder-password
+
+# Ollama (uses host.docker.internal by default)
+# OLLAMA_URL=http://host.docker.internal:11434
diff --git a/deploy/meeting-intelligence/README.md b/deploy/meeting-intelligence/README.md
new file mode 100644
index 0000000..e94c027
--- /dev/null
+++ b/deploy/meeting-intelligence/README.md
@@ -0,0 +1,151 @@
+# Meeting Intelligence System
+
+A fully self-hosted, zero-cost meeting intelligence system for Jeffsi Meet that provides:
+- Automatic meeting recording via Jibri
+- Local transcription via whisper.cpp (CPU-only)
+- Speaker diarization (who said what)
+- AI-powered summaries via Ollama
+- Searchable meeting archive with dashboard
+
+## Architecture
+
+```
+┌─────────────────────────────────────────────────────────────────────┐
+│ Netcup RS 8000 (Backend) │
+│ │
+│ ┌─────────────┐ ┌─────────────┐ ┌─────────────────────────┐ │
+│ │ Jibri │───▶│ Whisper │───▶│ AI Processor │ │
+│ │ Recording │ │ Transcriber │ │ (Ollama + Summarizer) │ │
+│ │ Container │ │ Service │ │ │ │
+│ └─────────────┘ └─────────────┘ └─────────────────────────┘ │
+│ │ │ │ │
+│ ▼ ▼ ▼ │
+│ ┌─────────────────────────────────────────────────────────────┐ │
+│ │ PostgreSQL + pgvector │ │
+│ └─────────────────────────────────────────────────────────────┘ │
+└─────────────────────────────────────────────────────────────────────┘
+```
+
+## Components
+
+| Service | Port | Description |
+|---------|------|-------------|
+| PostgreSQL | 5432 | Database with pgvector for semantic search |
+| Redis | 6379 | Job queue for async processing |
+| Transcriber | 8001 | whisper.cpp + speaker diarization |
+| API | 8000 | REST API for meetings, transcripts, search |
+| Jibri | - | Recording service (joins meetings as a hidden participant) |
+
+## Deployment
+
+### Prerequisites
+
+1. Docker and Docker Compose installed
+2. Ollama running on the host (for AI summaries)
+3. Jeffsi Meet configured with recording enabled
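+
+The AI summaries default to the `llama3.2` Ollama model (see `ollama_model` in `api/app/config.py`); pull it on the host ahead of time with `ollama pull llama3.2`.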
+
+### Setup
+
+1. Copy environment file:
+ ```bash
+ cp .env.example .env
+ ```
+
+2. Edit `.env` with your configuration:
+ ```bash
+ vim .env
+ ```
+
+3. Create storage directories:
+ ```bash
+ sudo mkdir -p /opt/meetings/{recordings,audio}
+ sudo chown -R 1000:1000 /opt/meetings
+ ```
+
+4. Start services:
+ ```bash
+ docker compose up -d
+ ```
+
+5. Check logs:
+ ```bash
+ docker compose logs -f
+ ```
+
+## API Endpoints
+
+Base URL: `https://meet.jeffemmett.com/api/intelligence`
+
+### Meetings
+- `GET /meetings` - List all meetings
+- `GET /meetings/{id}` - Get meeting details
+- `DELETE /meetings/{id}` - Delete meeting
+
+### Transcripts
+- `GET /meetings/{id}/transcript` - Get full transcript
+- `GET /meetings/{id}/transcript/text` - Get as plain text
+- `GET /meetings/{id}/speakers` - Get speaker statistics
+
+### Summaries
+- `GET /meetings/{id}/summary` - Get AI summary
+- `POST /meetings/{id}/summary` - Generate summary
+
+### Search
+- `POST /search` - Search transcripts (text + semantic)
+- `GET /search/suggest` - Get search suggestions
+
+### Export
+- `GET /meetings/{id}/export?format=markdown` - Export as Markdown
+- `GET /meetings/{id}/export?format=json` - Export as JSON
+- `GET /meetings/{id}/export?format=pdf` - Export as PDF
+
+### Webhooks
+- `POST /webhooks/recording-complete` - Jibri recording callback
+
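+### Example Requests
+
+A few illustrative `curl` calls (the meeting ID is a placeholder; use an `id` returned by `GET /meetings`, and adjust the search query to your own data):
+
+```bash
+BASE=https://meet.jeffemmett.com/api/intelligence
+
+# List the most recent meetings
+curl "$BASE/meetings?limit=10"
+
+# Combined full-text + semantic search across all transcripts
+curl -X POST "$BASE/search" \
+  -H "Content-Type: application/json" \
+  -d '{"query": "budget approval", "search_type": "combined", "limit": 10}'
+
+# Export a meeting as Markdown (saves the file named by the server)
+curl -OJ "$BASE/meetings/<meeting-id>/export?format=markdown"
+```
+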
+## Processing Pipeline
+
+1. **Recording** - Jibri joins meeting and records
+2. **Webhook** - Jibri's finalize script calls `/webhooks/recording-complete` (see the example payload below)
+3. **Audio Extraction** - FFmpeg extracts audio from video
+4. **Transcription** - whisper.cpp transcribes audio
+5. **Diarization** - resemblyzer identifies speakers
+6. **Embedding** - Generate vector embeddings for search
+7. **Summary** - Ollama generates AI summary
+8. **Ready** - Meeting available in dashboard
+
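+Step 2 is what kicks the pipeline off: Jibri's finalize script POSTs to `/webhooks/recording-complete`. You can also replay it by hand to (re)process a recording that is already on disk; the field names come from the API's `RecordingCompletePayload` model, and the values below are illustrative:
+
+```bash
+curl -X POST https://meet.jeffemmett.com/api/intelligence/webhooks/recording-complete \
+  -H "Content-Type: application/json" \
+  -d '{
+        "event_type": "recording_complete",
+        "conference_id": "weekly-sync",
+        "recording_path": "/recordings/weekly-sync/recording.mp4",
+        "file_size_bytes": 104857600,
+        "completed_at": "2024-01-15T10:30:00Z"
+      }'
+```
+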
+## Resource Usage
+
+| Service | CPU | RAM | Storage |
+|---------|-----|-----|---------|
+| Transcriber | 8 cores | 12GB | 5GB (models) |
+| API | 1 core | 2GB | - |
+| PostgreSQL | 2 cores | 4GB | ~50GB |
+| Jibri | 2 cores | 4GB | - |
+| Redis | 0.5 cores | 512MB | - |
+
+## Troubleshooting
+
+### Transcription is slow
+- Check CPU usage: `docker stats meeting-intelligence-transcriber`
+- Increase `WHISPER_THREADS` in docker-compose.yml
+- Consider setting `WHISPER_MODEL: tiny` in docker-compose.yml for faster (but less accurate) transcription
+
+### No summary generated
+- Check Ollama is running: `curl http://localhost:11434/api/tags`
+- Check logs: `docker compose logs api`
+- Verify model is available: `ollama list`
+
+### Recording not starting
+- Check Jibri logs: `docker compose logs jibri`
+- Verify XMPP credentials in `.env`
+- Check Prosody recorder virtual host configuration
+
+## Cost Analysis
+
+| Component | Monthly Cost |
+|-----------|-------------|
+| Jibri recording | $0 (local) |
+| Whisper transcription | $0 (local CPU) |
+| Ollama summarization | $0 (local) |
+| PostgreSQL | $0 (local) |
+| **Total** | **$0/month** |
diff --git a/deploy/meeting-intelligence/api/Dockerfile b/deploy/meeting-intelligence/api/Dockerfile
new file mode 100644
index 0000000..456c02c
--- /dev/null
+++ b/deploy/meeting-intelligence/api/Dockerfile
@@ -0,0 +1,32 @@
+# Meeting Intelligence API
+# Provides REST API for meeting transcripts, summaries, and search
+
+FROM python:3.11-slim
+
+# Install dependencies
+RUN apt-get update && apt-get install -y --no-install-recommends \
+ curl \
+ && rm -rf /var/lib/apt/lists/*
+
+WORKDIR /app
+
+# Install Python dependencies
+COPY requirements.txt .
+RUN pip install --no-cache-dir -r requirements.txt
+
+# Copy application code
+COPY app/ ./app/
+
+# Create directories
+RUN mkdir -p /recordings /logs
+
+# Environment variables
+ENV PYTHONUNBUFFERED=1
+
+# Health check
+HEALTHCHECK --interval=30s --timeout=10s --start-period=30s --retries=3 \
+ CMD curl -f http://localhost:8000/health || exit 1
+
+# Run the service
+EXPOSE 8000
+CMD ["uvicorn", "app.main:app", "--host", "0.0.0.0", "--port", "8000"]
diff --git a/deploy/meeting-intelligence/api/app/__init__.py b/deploy/meeting-intelligence/api/app/__init__.py
new file mode 100644
index 0000000..e2fe786
--- /dev/null
+++ b/deploy/meeting-intelligence/api/app/__init__.py
@@ -0,0 +1 @@
+# Meeting Intelligence API
diff --git a/deploy/meeting-intelligence/api/app/config.py b/deploy/meeting-intelligence/api/app/config.py
new file mode 100644
index 0000000..2ca6afa
--- /dev/null
+++ b/deploy/meeting-intelligence/api/app/config.py
@@ -0,0 +1,50 @@
+"""
+Configuration settings for the Meeting Intelligence API.
+"""
+
+from typing import List
+from pydantic_settings import BaseSettings
+
+
+class Settings(BaseSettings):
+ """Application settings loaded from environment variables."""
+
+ # Database
+ postgres_url: str = "postgresql://meeting_intelligence:changeme@localhost:5432/meeting_intelligence"
+
+ # Redis
+ redis_url: str = "redis://localhost:6379"
+
+ # Ollama (for AI summaries)
+ ollama_url: str = "http://localhost:11434"
+ ollama_model: str = "llama3.2"
+
+ # File paths
+ recordings_path: str = "/recordings"
+
+ # Security
+ secret_key: str = "changeme"
+ api_key: str = "" # Optional API key authentication
+
+ # CORS
+ cors_origins: List[str] = [
+ "https://meet.jeffemmett.com",
+ "http://localhost:8080",
+ "http://localhost:3000"
+ ]
+
+ # Embeddings model for semantic search
+ embedding_model: str = "all-MiniLM-L6-v2"
+
+ # Export settings
+ export_temp_dir: str = "/tmp/exports"
+
+ # Transcriber service URL
+ transcriber_url: str = "http://transcriber:8001"
+
+ class Config:
+ env_file = ".env"
+ env_file_encoding = "utf-8"
+
+
+settings = Settings()
diff --git a/deploy/meeting-intelligence/api/app/database.py b/deploy/meeting-intelligence/api/app/database.py
new file mode 100644
index 0000000..1819df1
--- /dev/null
+++ b/deploy/meeting-intelligence/api/app/database.py
@@ -0,0 +1,355 @@
+"""
+Database operations for the Meeting Intelligence API.
+"""
+
+import uuid
+from datetime import datetime
+from typing import Optional, List, Dict, Any
+
+import asyncpg
+import structlog
+
+log = structlog.get_logger()
+
+
+class Database:
+ """Database operations for Meeting Intelligence API."""
+
+ def __init__(self, connection_string: str):
+ self.connection_string = connection_string
+ self.pool: Optional[asyncpg.Pool] = None
+
+ async def connect(self):
+ """Establish database connection pool."""
+ log.info("Connecting to database...")
+ self.pool = await asyncpg.create_pool(
+ self.connection_string,
+ min_size=2,
+ max_size=20
+ )
+ log.info("Database connected")
+
+ async def disconnect(self):
+ """Close database connection pool."""
+ if self.pool:
+ await self.pool.close()
+ log.info("Database disconnected")
+
+ async def health_check(self):
+ """Check database connectivity."""
+ async with self.pool.acquire() as conn:
+ await conn.fetchval("SELECT 1")
+
+ # ==================== Meetings ====================
+
+ async def list_meetings(
+ self,
+ limit: int = 50,
+ offset: int = 0,
+ status: Optional[str] = None
+ ) -> List[Dict[str, Any]]:
+ """List meetings with pagination."""
+ async with self.pool.acquire() as conn:
+ if status:
+ rows = await conn.fetch("""
+ SELECT id, conference_id, conference_name, title,
+ started_at, ended_at, duration_seconds,
+ status, created_at
+ FROM meetings
+ WHERE status = $1
+ ORDER BY created_at DESC
+ LIMIT $2 OFFSET $3
+ """, status, limit, offset)
+ else:
+ rows = await conn.fetch("""
+ SELECT id, conference_id, conference_name, title,
+ started_at, ended_at, duration_seconds,
+ status, created_at
+ FROM meetings
+ ORDER BY created_at DESC
+ LIMIT $1 OFFSET $2
+ """, limit, offset)
+
+ return [dict(row) for row in rows]
+
+ async def get_meeting(self, meeting_id: str) -> Optional[Dict[str, Any]]:
+ """Get meeting details."""
+ async with self.pool.acquire() as conn:
+ row = await conn.fetchrow("""
+ SELECT m.id, m.conference_id, m.conference_name, m.title,
+ m.started_at, m.ended_at, m.duration_seconds,
+ m.recording_path, m.audio_path, m.status,
+ m.metadata, m.created_at,
+ (SELECT COUNT(*) FROM transcripts WHERE meeting_id = m.id) as segment_count,
+ (SELECT COUNT(*) FROM meeting_participants WHERE meeting_id = m.id) as participant_count,
+ (SELECT id FROM summaries WHERE meeting_id = m.id LIMIT 1) as summary_id
+ FROM meetings m
+ WHERE m.id = $1::uuid
+ """, meeting_id)
+
+ if row:
+ return dict(row)
+ return None
+
+ async def create_meeting(
+ self,
+ conference_id: str,
+ conference_name: Optional[str] = None,
+ title: Optional[str] = None,
+ recording_path: Optional[str] = None,
+ started_at: Optional[datetime] = None,
+ metadata: Optional[dict] = None
+ ) -> str:
+ """Create a new meeting record."""
+ meeting_id = str(uuid.uuid4())
+
+ async with self.pool.acquire() as conn:
+ await conn.execute("""
+ INSERT INTO meetings (
+ id, conference_id, conference_name, title,
+ recording_path, started_at, status, metadata
+ )
+ VALUES ($1, $2, $3, $4, $5, $6, 'recording', $7)
+ """, meeting_id, conference_id, conference_name, title,
+ recording_path, started_at or datetime.utcnow(), metadata or {})
+
+ return meeting_id
+
+ async def update_meeting(
+ self,
+ meeting_id: str,
+ **kwargs
+ ):
+ """Update meeting fields."""
+ if not kwargs:
+ return
+
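+        # Column names are interpolated into the SQL string, so only keys from
+        # the allowlist below are accepted; values are still passed as $n params.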
+ set_clauses = []
+ values = []
+ i = 1
+
+ for key, value in kwargs.items():
+ if key in ['status', 'title', 'ended_at', 'duration_seconds',
+ 'recording_path', 'audio_path', 'error_message']:
+ set_clauses.append(f"{key} = ${i}")
+ values.append(value)
+ i += 1
+
+ if not set_clauses:
+ return
+
+ values.append(meeting_id)
+
+ async with self.pool.acquire() as conn:
+ await conn.execute(f"""
+ UPDATE meetings
+ SET {', '.join(set_clauses)}, updated_at = NOW()
+ WHERE id = ${i}::uuid
+ """, *values)
+
+ # ==================== Transcripts ====================
+
+ async def get_transcript(
+ self,
+ meeting_id: str,
+ speaker_filter: Optional[str] = None
+ ) -> List[Dict[str, Any]]:
+ """Get transcript segments for a meeting."""
+ async with self.pool.acquire() as conn:
+ if speaker_filter:
+ rows = await conn.fetch("""
+ SELECT id, segment_index, start_time, end_time,
+ speaker_id, speaker_name, speaker_label,
+ text, confidence, language
+ FROM transcripts
+ WHERE meeting_id = $1::uuid AND speaker_id = $2
+ ORDER BY segment_index ASC
+ """, meeting_id, speaker_filter)
+ else:
+ rows = await conn.fetch("""
+ SELECT id, segment_index, start_time, end_time,
+ speaker_id, speaker_name, speaker_label,
+ text, confidence, language
+ FROM transcripts
+ WHERE meeting_id = $1::uuid
+ ORDER BY segment_index ASC
+ """, meeting_id)
+
+ return [dict(row) for row in rows]
+
+ async def get_speakers(self, meeting_id: str) -> List[Dict[str, Any]]:
+ """Get speaker statistics for a meeting."""
+ async with self.pool.acquire() as conn:
+ rows = await conn.fetch("""
+ SELECT speaker_id, speaker_label,
+ COUNT(*) as segment_count,
+ SUM(end_time - start_time) as speaking_time,
+ SUM(LENGTH(text)) as character_count
+ FROM transcripts
+ WHERE meeting_id = $1::uuid AND speaker_id IS NOT NULL
+ GROUP BY speaker_id, speaker_label
+ ORDER BY speaking_time DESC
+ """, meeting_id)
+
+ return [dict(row) for row in rows]
+
+ # ==================== Summaries ====================
+
+ async def get_summary(self, meeting_id: str) -> Optional[Dict[str, Any]]:
+ """Get AI summary for a meeting."""
+ async with self.pool.acquire() as conn:
+ row = await conn.fetchrow("""
+ SELECT id, meeting_id, summary_text, key_points,
+ action_items, decisions, topics, sentiment,
+ model_used, generated_at
+ FROM summaries
+ WHERE meeting_id = $1::uuid
+ ORDER BY generated_at DESC
+ LIMIT 1
+ """, meeting_id)
+
+ if row:
+ return dict(row)
+ return None
+
+ async def save_summary(
+ self,
+ meeting_id: str,
+ summary_text: str,
+ key_points: List[str],
+ action_items: List[dict],
+ decisions: List[str],
+ topics: List[dict],
+ sentiment: str,
+ model_used: str,
+ prompt_tokens: int = 0,
+ completion_tokens: int = 0
+ ) -> int:
+ """Save AI-generated summary."""
+ async with self.pool.acquire() as conn:
+ row = await conn.fetchrow("""
+ INSERT INTO summaries (
+ meeting_id, summary_text, key_points, action_items,
+ decisions, topics, sentiment, model_used,
+ prompt_tokens, completion_tokens
+ )
+ VALUES ($1::uuid, $2, $3, $4, $5, $6, $7, $8, $9, $10)
+ RETURNING id
+ """, meeting_id, summary_text, key_points, action_items,
+ decisions, topics, sentiment, model_used,
+ prompt_tokens, completion_tokens)
+
+ return row["id"]
+
+ # ==================== Search ====================
+
+ async def fulltext_search(
+ self,
+ query: str,
+ meeting_id: Optional[str] = None,
+ limit: int = 50
+ ) -> List[Dict[str, Any]]:
+ """Full-text search across transcripts."""
+ async with self.pool.acquire() as conn:
+ if meeting_id:
+ rows = await conn.fetch("""
+ SELECT t.id, t.meeting_id, t.start_time, t.end_time,
+ t.speaker_label, t.text, m.title as meeting_title,
+ ts_rank(to_tsvector('english', t.text),
+ plainto_tsquery('english', $1)) as rank
+ FROM transcripts t
+ JOIN meetings m ON t.meeting_id = m.id
+ WHERE t.meeting_id = $2::uuid
+ AND to_tsvector('english', t.text) @@ plainto_tsquery('english', $1)
+ ORDER BY rank DESC
+ LIMIT $3
+ """, query, meeting_id, limit)
+ else:
+ rows = await conn.fetch("""
+ SELECT t.id, t.meeting_id, t.start_time, t.end_time,
+ t.speaker_label, t.text, m.title as meeting_title,
+ ts_rank(to_tsvector('english', t.text),
+ plainto_tsquery('english', $1)) as rank
+ FROM transcripts t
+ JOIN meetings m ON t.meeting_id = m.id
+ WHERE to_tsvector('english', t.text) @@ plainto_tsquery('english', $1)
+ ORDER BY rank DESC
+ LIMIT $2
+ """, query, limit)
+
+ return [dict(row) for row in rows]
+
+ async def semantic_search(
+ self,
+ embedding: List[float],
+ meeting_id: Optional[str] = None,
+ threshold: float = 0.7,
+ limit: int = 20
+ ) -> List[Dict[str, Any]]:
+ """Semantic search using vector embeddings."""
+ async with self.pool.acquire() as conn:
+ embedding_str = f"[{','.join(map(str, embedding))}]"
+
+ if meeting_id:
+ rows = await conn.fetch("""
+ SELECT te.transcript_id, te.meeting_id, te.chunk_text,
+ t.start_time, t.speaker_label, m.title as meeting_title,
+ 1 - (te.embedding <=> $1::vector) as similarity
+ FROM transcript_embeddings te
+ JOIN transcripts t ON te.transcript_id = t.id
+ JOIN meetings m ON te.meeting_id = m.id
+ WHERE te.meeting_id = $2::uuid
+ AND 1 - (te.embedding <=> $1::vector) > $3
+ ORDER BY te.embedding <=> $1::vector
+ LIMIT $4
+ """, embedding_str, meeting_id, threshold, limit)
+ else:
+ rows = await conn.fetch("""
+ SELECT te.transcript_id, te.meeting_id, te.chunk_text,
+ t.start_time, t.speaker_label, m.title as meeting_title,
+ 1 - (te.embedding <=> $1::vector) as similarity
+ FROM transcript_embeddings te
+ JOIN transcripts t ON te.transcript_id = t.id
+ JOIN meetings m ON te.meeting_id = m.id
+ WHERE 1 - (te.embedding <=> $1::vector) > $2
+ ORDER BY te.embedding <=> $1::vector
+ LIMIT $3
+ """, embedding_str, threshold, limit)
+
+ return [dict(row) for row in rows]
+
+ # ==================== Webhooks ====================
+
+ async def save_webhook_event(
+ self,
+ event_type: str,
+ payload: dict
+ ) -> int:
+ """Save a webhook event for processing."""
+ async with self.pool.acquire() as conn:
+ row = await conn.fetchrow("""
+ INSERT INTO webhook_events (event_type, payload)
+ VALUES ($1, $2)
+ RETURNING id
+ """, event_type, payload)
+
+ return row["id"]
+
+ # ==================== Jobs ====================
+
+ async def create_job(
+ self,
+ meeting_id: str,
+ job_type: str,
+ priority: int = 5,
+ result: Optional[dict] = None
+ ) -> int:
+ """Create a processing job."""
+ async with self.pool.acquire() as conn:
+ row = await conn.fetchrow("""
+ INSERT INTO processing_jobs (meeting_id, job_type, priority, result)
+ VALUES ($1::uuid, $2, $3, $4)
+ RETURNING id
+ """, meeting_id, job_type, priority, result or {})
+
+ return row["id"]
diff --git a/deploy/meeting-intelligence/api/app/main.py b/deploy/meeting-intelligence/api/app/main.py
new file mode 100644
index 0000000..8473f63
--- /dev/null
+++ b/deploy/meeting-intelligence/api/app/main.py
@@ -0,0 +1,113 @@
+"""
+Meeting Intelligence API
+
+Provides REST API for:
+- Meeting management
+- Transcript retrieval
+- AI-powered summaries
+- Semantic search
+- Export functionality
+"""
+
+import os
+from contextlib import asynccontextmanager
+from typing import Optional
+
+from fastapi import FastAPI
+from fastapi.middleware.cors import CORSMiddleware
+
+from .config import settings
+from .database import Database
+from .routes import meetings, transcripts, summaries, search, webhooks, export
+
+import structlog
+
+log = structlog.get_logger()
+
+
+# Application state
+class AppState:
+ db: Optional[Database] = None
+
+
+state = AppState()
+
+
+@asynccontextmanager
+async def lifespan(app: FastAPI):
+ """Application startup and shutdown."""
+ log.info("Starting Meeting Intelligence API...")
+
+ # Initialize database
+ state.db = Database(settings.postgres_url)
+ await state.db.connect()
+
+ # Make database available to routes
+ app.state.db = state.db
+
+ log.info("Meeting Intelligence API started successfully")
+
+ yield
+
+ # Shutdown
+ log.info("Shutting down Meeting Intelligence API...")
+ if state.db:
+ await state.db.disconnect()
+
+ log.info("Meeting Intelligence API stopped")
+
+
+app = FastAPI(
+ title="Meeting Intelligence API",
+ description="API for meeting transcripts, summaries, and search",
+ version="1.0.0",
+ lifespan=lifespan,
+ docs_url="/docs",
+ redoc_url="/redoc"
+)
+
+# CORS configuration
+app.add_middleware(
+ CORSMiddleware,
+ allow_origins=settings.cors_origins,
+ allow_credentials=True,
+ allow_methods=["*"],
+ allow_headers=["*"],
+)
+
+# Include routers
+app.include_router(meetings.router, prefix="/meetings", tags=["Meetings"])
+app.include_router(transcripts.router, prefix="/meetings", tags=["Transcripts"])
+app.include_router(summaries.router, prefix="/meetings", tags=["Summaries"])
+app.include_router(search.router, prefix="/search", tags=["Search"])
+app.include_router(webhooks.router, prefix="/webhooks", tags=["Webhooks"])
+app.include_router(export.router, prefix="/meetings", tags=["Export"])
+
+
+@app.get("/health")
+async def health_check():
+ """Health check endpoint."""
+ db_ok = False
+
+ try:
+ if state.db:
+ await state.db.health_check()
+ db_ok = True
+ except Exception as e:
+ log.error("Database health check failed", error=str(e))
+
+ return {
+ "status": "healthy" if db_ok else "unhealthy",
+ "database": db_ok,
+ "version": "1.0.0"
+ }
+
+
+@app.get("/")
+async def root():
+ """Root endpoint."""
+ return {
+ "service": "Meeting Intelligence API",
+ "version": "1.0.0",
+ "docs": "/docs"
+ }
diff --git a/deploy/meeting-intelligence/api/app/routes/__init__.py b/deploy/meeting-intelligence/api/app/routes/__init__.py
new file mode 100644
index 0000000..83ed419
--- /dev/null
+++ b/deploy/meeting-intelligence/api/app/routes/__init__.py
@@ -0,0 +1,2 @@
+# API Routes
+from . import meetings, transcripts, summaries, search, webhooks, export
diff --git a/deploy/meeting-intelligence/api/app/routes/export.py b/deploy/meeting-intelligence/api/app/routes/export.py
new file mode 100644
index 0000000..f88863e
--- /dev/null
+++ b/deploy/meeting-intelligence/api/app/routes/export.py
@@ -0,0 +1,319 @@
+"""
+Export routes for Meeting Intelligence.
+
+Supports exporting meetings as PDF, Markdown, and JSON.
+"""
+
+import io
+import json
+import os
+from datetime import datetime
+from typing import Optional
+
+from fastapi import APIRouter, HTTPException, Request, Response
+from fastapi.responses import StreamingResponse
+from pydantic import BaseModel
+
+import structlog
+
+log = structlog.get_logger()
+
+router = APIRouter()
+
+
+class ExportRequest(BaseModel):
+ format: str = "markdown" # "pdf", "markdown", "json"
+ include_transcript: bool = True
+ include_summary: bool = True
+
+
+@router.get("/{meeting_id}/export")
+async def export_meeting(
+ request: Request,
+ meeting_id: str,
+ format: str = "markdown",
+ include_transcript: bool = True,
+ include_summary: bool = True
+):
+ """Export meeting data in various formats."""
+ db = request.app.state.db
+
+ # Get meeting data
+ meeting = await db.get_meeting(meeting_id)
+ if not meeting:
+ raise HTTPException(status_code=404, detail="Meeting not found")
+
+ # Get transcript if requested
+ transcript = None
+ if include_transcript:
+ transcript = await db.get_transcript(meeting_id)
+
+ # Get summary if requested
+ summary = None
+ if include_summary:
+ summary = await db.get_summary(meeting_id)
+
+ # Export based on format
+ if format == "json":
+ return _export_json(meeting, transcript, summary)
+ elif format == "markdown":
+ return _export_markdown(meeting, transcript, summary)
+ elif format == "pdf":
+ return await _export_pdf(meeting, transcript, summary)
+ else:
+ raise HTTPException(
+ status_code=400,
+ detail=f"Unsupported format: {format}. Use: json, markdown, pdf"
+ )
+
+
+def _export_json(meeting: dict, transcript: list, summary: dict) -> Response:
+ """Export as JSON."""
+ data = {
+ "meeting": {
+ "id": str(meeting["id"]),
+ "conference_id": meeting["conference_id"],
+ "title": meeting.get("title"),
+ "started_at": meeting["started_at"].isoformat() if meeting.get("started_at") else None,
+ "ended_at": meeting["ended_at"].isoformat() if meeting.get("ended_at") else None,
+ "duration_seconds": meeting.get("duration_seconds"),
+ "status": meeting["status"]
+ },
+ "transcript": [
+ {
+ "start_time": s["start_time"],
+ "end_time": s["end_time"],
+ "speaker": s.get("speaker_label"),
+ "text": s["text"]
+ }
+ for s in (transcript or [])
+ ] if transcript else None,
+ "summary": {
+ "text": summary["summary_text"],
+ "key_points": summary["key_points"],
+ "action_items": summary["action_items"],
+ "decisions": summary["decisions"],
+ "topics": summary["topics"],
+ "sentiment": summary.get("sentiment")
+ } if summary else None,
+ "exported_at": datetime.utcnow().isoformat()
+ }
+
+ filename = f"meeting-{meeting['conference_id']}-{datetime.utcnow().strftime('%Y%m%d')}.json"
+
+ return Response(
+ content=json.dumps(data, indent=2),
+ media_type="application/json",
+ headers={
+ "Content-Disposition": f'attachment; filename="{filename}"'
+ }
+ )
+
+
+def _export_markdown(meeting: dict, transcript: list, summary: dict) -> Response:
+ """Export as Markdown."""
+ lines = []
+
+ # Header
+ title = meeting.get("title") or f"Meeting: {meeting['conference_id']}"
+ lines.append(f"# {title}")
+ lines.append("")
+
+ # Metadata
+ lines.append("## Meeting Details")
+ lines.append("")
+ lines.append(f"- **Conference ID:** {meeting['conference_id']}")
+ if meeting.get("started_at"):
+ lines.append(f"- **Date:** {meeting['started_at'].strftime('%Y-%m-%d %H:%M UTC')}")
+ if meeting.get("duration_seconds"):
+ minutes = meeting["duration_seconds"] // 60
+ lines.append(f"- **Duration:** {minutes} minutes")
+ lines.append(f"- **Status:** {meeting['status']}")
+ lines.append("")
+
+ # Summary
+ if summary:
+ lines.append("## Summary")
+ lines.append("")
+ lines.append(summary["summary_text"])
+ lines.append("")
+
+ # Key Points
+ if summary.get("key_points"):
+ lines.append("### Key Points")
+ lines.append("")
+ for point in summary["key_points"]:
+ lines.append(f"- {point}")
+ lines.append("")
+
+ # Action Items
+ if summary.get("action_items"):
+ lines.append("### Action Items")
+ lines.append("")
+ for item in summary["action_items"]:
+ task = item.get("task", item) if isinstance(item, dict) else item
+ assignee = item.get("assignee", "") if isinstance(item, dict) else ""
+ checkbox = "[ ]"
+ if assignee:
+ lines.append(f"- {checkbox} {task} *(Assigned: {assignee})*")
+ else:
+ lines.append(f"- {checkbox} {task}")
+ lines.append("")
+
+ # Decisions
+ if summary.get("decisions"):
+ lines.append("### Decisions")
+ lines.append("")
+ for decision in summary["decisions"]:
+ lines.append(f"- {decision}")
+ lines.append("")
+
+ # Transcript
+ if transcript:
+ lines.append("## Transcript")
+ lines.append("")
+
+ current_speaker = None
+ for segment in transcript:
+ speaker = segment.get("speaker_label") or "Speaker"
+ time_str = _format_time(segment["start_time"])
+
+ if speaker != current_speaker:
+ lines.append("")
+ lines.append(f"**{speaker}** *({time_str})*")
+ current_speaker = speaker
+
+ lines.append(f"> {segment['text']}")
+
+ lines.append("")
+
+ # Footer
+ lines.append("---")
+ lines.append(f"*Exported on {datetime.utcnow().strftime('%Y-%m-%d %H:%M UTC')} by Meeting Intelligence*")
+
+ content = "\n".join(lines)
+ filename = f"meeting-{meeting['conference_id']}-{datetime.utcnow().strftime('%Y%m%d')}.md"
+
+ return Response(
+ content=content,
+ media_type="text/markdown",
+ headers={
+ "Content-Disposition": f'attachment; filename="{filename}"'
+ }
+ )
+
+
+async def _export_pdf(meeting: dict, transcript: list, summary: dict) -> StreamingResponse:
+ """Export as PDF using reportlab."""
+ try:
+ from reportlab.lib.pagesizes import letter
+ from reportlab.lib.styles import getSampleStyleSheet, ParagraphStyle
+ from reportlab.lib.units import inch
+ from reportlab.platypus import SimpleDocTemplate, Paragraph, Spacer, ListFlowable, ListItem
+ except ImportError:
+ raise HTTPException(
+ status_code=501,
+ detail="PDF export requires reportlab. Use markdown or json format."
+ )
+
+ buffer = io.BytesIO()
+
+ # Create PDF document
+ doc = SimpleDocTemplate(
+ buffer,
+ pagesize=letter,
+ rightMargin=72,
+ leftMargin=72,
+ topMargin=72,
+ bottomMargin=72
+ )
+
+ styles = getSampleStyleSheet()
+ story = []
+
+ # Title
+ title = meeting.get("title") or f"Meeting: {meeting['conference_id']}"
+ story.append(Paragraph(title, styles['Title']))
+ story.append(Spacer(1, 12))
+
+ # Metadata
+ story.append(Paragraph("Meeting Details", styles['Heading2']))
+ if meeting.get("started_at"):
+ story.append(Paragraph(
+ f"Date: {meeting['started_at'].strftime('%Y-%m-%d %H:%M UTC')}",
+ styles['Normal']
+ ))
+ if meeting.get("duration_seconds"):
+ minutes = meeting["duration_seconds"] // 60
+ story.append(Paragraph(f"Duration: {minutes} minutes", styles['Normal']))
+ story.append(Spacer(1, 12))
+
+ # Summary
+ if summary:
+ story.append(Paragraph("Summary", styles['Heading2']))
+ story.append(Paragraph(summary["summary_text"], styles['Normal']))
+ story.append(Spacer(1, 12))
+
+ if summary.get("key_points"):
+ story.append(Paragraph("Key Points", styles['Heading3']))
+ for point in summary["key_points"]:
+ story.append(Paragraph(f"• {point}", styles['Normal']))
+ story.append(Spacer(1, 12))
+
+ if summary.get("action_items"):
+ story.append(Paragraph("Action Items", styles['Heading3']))
+ for item in summary["action_items"]:
+ task = item.get("task", item) if isinstance(item, dict) else item
+ story.append(Paragraph(f"☐ {task}", styles['Normal']))
+ story.append(Spacer(1, 12))
+
+ # Transcript (abbreviated for PDF)
+ if transcript:
+ story.append(Paragraph("Transcript", styles['Heading2']))
+ current_speaker = None
+
+ for segment in transcript[:100]: # Limit segments for PDF
+ speaker = segment.get("speaker_label") or "Speaker"
+
+ if speaker != current_speaker:
+ story.append(Spacer(1, 6))
+ story.append(Paragraph(
+ f"{speaker} ({_format_time(segment['start_time'])})",
+ styles['Normal']
+ ))
+ current_speaker = speaker
+
+ story.append(Paragraph(segment['text'], styles['Normal']))
+
+ if len(transcript) > 100:
+ story.append(Spacer(1, 12))
+ story.append(Paragraph(
+ f"[... {len(transcript) - 100} more segments not shown in PDF]",
+ styles['Normal']
+ ))
+
+ # Build PDF
+ doc.build(story)
+ buffer.seek(0)
+
+ filename = f"meeting-{meeting['conference_id']}-{datetime.utcnow().strftime('%Y%m%d')}.pdf"
+
+ return StreamingResponse(
+ buffer,
+ media_type="application/pdf",
+ headers={
+ "Content-Disposition": f'attachment; filename="{filename}"'
+ }
+ )
+
+
+def _format_time(seconds: float) -> str:
+ """Format seconds as HH:MM:SS or MM:SS."""
+ total_seconds = int(seconds)
+ hours = total_seconds // 3600
+ minutes = (total_seconds % 3600) // 60
+ secs = total_seconds % 60
+
+ if hours > 0:
+ return f"{hours}:{minutes:02d}:{secs:02d}"
+ return f"{minutes}:{secs:02d}"
diff --git a/deploy/meeting-intelligence/api/app/routes/meetings.py b/deploy/meeting-intelligence/api/app/routes/meetings.py
new file mode 100644
index 0000000..ba0327b
--- /dev/null
+++ b/deploy/meeting-intelligence/api/app/routes/meetings.py
@@ -0,0 +1,112 @@
+"""
+Meeting management routes.
+"""
+
+from typing import Optional, List
+
+from fastapi import APIRouter, HTTPException, Request, Query
+from pydantic import BaseModel
+
+import structlog
+
+log = structlog.get_logger()
+
+router = APIRouter()
+
+
+class MeetingResponse(BaseModel):
+ id: str
+ conference_id: str
+ conference_name: Optional[str]
+ title: Optional[str]
+ started_at: Optional[str]
+ ended_at: Optional[str]
+ duration_seconds: Optional[int]
+ status: str
+ created_at: str
+ segment_count: Optional[int] = None
+ participant_count: Optional[int] = None
+ has_summary: Optional[bool] = None
+
+
+class MeetingListResponse(BaseModel):
+ meetings: List[MeetingResponse]
+ total: int
+ limit: int
+ offset: int
+
+
+@router.get("", response_model=MeetingListResponse)
+async def list_meetings(
+ request: Request,
+ limit: int = Query(default=50, le=100),
+ offset: int = Query(default=0, ge=0),
+ status: Optional[str] = Query(default=None)
+):
+ """List all meetings with pagination."""
+ db = request.app.state.db
+
+ meetings = await db.list_meetings(limit=limit, offset=offset, status=status)
+
+ return MeetingListResponse(
+ meetings=[
+ MeetingResponse(
+ id=str(m["id"]),
+ conference_id=m["conference_id"],
+ conference_name=m.get("conference_name"),
+ title=m.get("title"),
+ started_at=m["started_at"].isoformat() if m.get("started_at") else None,
+ ended_at=m["ended_at"].isoformat() if m.get("ended_at") else None,
+ duration_seconds=m.get("duration_seconds"),
+ status=m["status"],
+ created_at=m["created_at"].isoformat()
+ )
+ for m in meetings
+ ],
+ total=len(meetings), # TODO: Add total count query
+ limit=limit,
+ offset=offset
+ )
+
+
+@router.get("/{meeting_id}", response_model=MeetingResponse)
+async def get_meeting(request: Request, meeting_id: str):
+ """Get meeting details."""
+ db = request.app.state.db
+
+ meeting = await db.get_meeting(meeting_id)
+
+ if not meeting:
+ raise HTTPException(status_code=404, detail="Meeting not found")
+
+ return MeetingResponse(
+ id=str(meeting["id"]),
+ conference_id=meeting["conference_id"],
+ conference_name=meeting.get("conference_name"),
+ title=meeting.get("title"),
+ started_at=meeting["started_at"].isoformat() if meeting.get("started_at") else None,
+ ended_at=meeting["ended_at"].isoformat() if meeting.get("ended_at") else None,
+ duration_seconds=meeting.get("duration_seconds"),
+ status=meeting["status"],
+ created_at=meeting["created_at"].isoformat(),
+ segment_count=meeting.get("segment_count"),
+ participant_count=meeting.get("participant_count"),
+ has_summary=meeting.get("summary_id") is not None
+ )
+
+
+@router.delete("/{meeting_id}")
+async def delete_meeting(request: Request, meeting_id: str):
+ """Delete a meeting and all associated data."""
+ db = request.app.state.db
+
+ meeting = await db.get_meeting(meeting_id)
+
+ if not meeting:
+ raise HTTPException(status_code=404, detail="Meeting not found")
+
+ # TODO: Implement cascade delete
+ # For now, just mark as deleted
+ await db.update_meeting(meeting_id, status="deleted")
+
+ return {"status": "deleted", "meeting_id": meeting_id}
diff --git a/deploy/meeting-intelligence/api/app/routes/search.py b/deploy/meeting-intelligence/api/app/routes/search.py
new file mode 100644
index 0000000..6f209de
--- /dev/null
+++ b/deploy/meeting-intelligence/api/app/routes/search.py
@@ -0,0 +1,173 @@
+"""
+Search routes for Meeting Intelligence.
+"""
+
+from typing import Optional, List
+
+from fastapi import APIRouter, HTTPException, Request, Query
+from pydantic import BaseModel
+from sentence_transformers import SentenceTransformer
+
+from ..config import settings
+
+import structlog
+
+log = structlog.get_logger()
+
+router = APIRouter()
+
+# Lazy-load embedding model
+_embedding_model = None
+
+
+def get_embedding_model():
+ """Get or initialize the embedding model."""
+ global _embedding_model
+ if _embedding_model is None:
+ log.info("Loading embedding model...", model=settings.embedding_model)
+ _embedding_model = SentenceTransformer(settings.embedding_model)
+ log.info("Embedding model loaded")
+ return _embedding_model
+
+
+class SearchResult(BaseModel):
+ meeting_id: str
+ meeting_title: Optional[str]
+ text: str
+ start_time: Optional[float]
+ speaker_label: Optional[str]
+ score: float
+ search_type: str
+
+
+class SearchResponse(BaseModel):
+ query: str
+ results: List[SearchResult]
+ total: int
+ search_type: str
+
+
+class SearchRequest(BaseModel):
+ query: str
+ meeting_id: Optional[str] = None
+ search_type: str = "combined" # "text", "semantic", "combined"
+ limit: int = 20
+
+
+@router.post("", response_model=SearchResponse)
+async def search_transcripts(request: Request, body: SearchRequest):
+ """Search across meeting transcripts.
+
+ Search types:
+ - text: Full-text search using PostgreSQL ts_vector
+ - semantic: Semantic search using vector embeddings
+ - combined: Both text and semantic search, merged results
+ """
+ db = request.app.state.db
+
+ if not body.query or len(body.query.strip()) < 2:
+ raise HTTPException(
+ status_code=400,
+ detail="Query must be at least 2 characters"
+ )
+
+ results = []
+
+ # Full-text search
+ if body.search_type in ["text", "combined"]:
+ text_results = await db.fulltext_search(
+ query=body.query,
+ meeting_id=body.meeting_id,
+ limit=body.limit
+ )
+
+ for r in text_results:
+ results.append(SearchResult(
+ meeting_id=str(r["meeting_id"]),
+ meeting_title=r.get("meeting_title"),
+ text=r["text"],
+ start_time=r.get("start_time"),
+ speaker_label=r.get("speaker_label"),
+ score=float(r["rank"]),
+ search_type="text"
+ ))
+
+ # Semantic search
+ if body.search_type in ["semantic", "combined"]:
+ try:
+ model = get_embedding_model()
+ query_embedding = model.encode(body.query).tolist()
+
+ semantic_results = await db.semantic_search(
+ embedding=query_embedding,
+ meeting_id=body.meeting_id,
+ threshold=0.6,
+ limit=body.limit
+ )
+
+ for r in semantic_results:
+ results.append(SearchResult(
+ meeting_id=str(r["meeting_id"]),
+ meeting_title=r.get("meeting_title"),
+ text=r["chunk_text"],
+ start_time=r.get("start_time"),
+ speaker_label=r.get("speaker_label"),
+ score=float(r["similarity"]),
+ search_type="semantic"
+ ))
+
+ except Exception as e:
+ log.error("Semantic search failed", error=str(e))
+ if body.search_type == "semantic":
+ raise HTTPException(
+ status_code=500,
+ detail=f"Semantic search failed: {str(e)}"
+ )
+
+ # Deduplicate and sort by score
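+    # (ts_rank values and cosine similarities are not on a common scale, so the
+    # combined ordering is a pragmatic approximation rather than a strict ranking.)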
+ seen = set()
+ unique_results = []
+ for r in sorted(results, key=lambda x: x.score, reverse=True):
+ key = (r.meeting_id, r.text[:100])
+ if key not in seen:
+ seen.add(key)
+ unique_results.append(r)
+
+ return SearchResponse(
+ query=body.query,
+ results=unique_results[:body.limit],
+ total=len(unique_results),
+ search_type=body.search_type
+ )
+
+
+@router.get("/suggest")
+async def search_suggestions(
+ request: Request,
+ q: str = Query(..., min_length=2)
+):
+ """Get search suggestions based on partial query."""
+ db = request.app.state.db
+
+    # Reuse full-text search to find segments that contain the partial query
+ results = await db.fulltext_search(query=q, limit=5)
+
+ # Extract unique phrases
+ suggestions = []
+ for r in results:
+ # Get surrounding context
+ text = r["text"]
+ words = text.split()
+
+ # Find matching words and get context
+ for i, word in enumerate(words):
+ if q.lower() in word.lower():
+ start = max(0, i - 2)
+ end = min(len(words), i + 3)
+ phrase = " ".join(words[start:end])
+ if phrase not in suggestions:
+ suggestions.append(phrase)
+                if len(suggestions) >= 5:
+                    break
+        if len(suggestions) >= 5:
+            break
+
+ return {"suggestions": suggestions}
diff --git a/deploy/meeting-intelligence/api/app/routes/summaries.py b/deploy/meeting-intelligence/api/app/routes/summaries.py
new file mode 100644
index 0000000..a464a1f
--- /dev/null
+++ b/deploy/meeting-intelligence/api/app/routes/summaries.py
@@ -0,0 +1,251 @@
+"""
+AI Summary routes.
+"""
+
+import json
+from typing import Optional, List
+
+import httpx
+from fastapi import APIRouter, HTTPException, Request, BackgroundTasks
+from pydantic import BaseModel
+
+from ..config import settings
+
+import structlog
+
+log = structlog.get_logger()
+
+router = APIRouter()
+
+
+class ActionItem(BaseModel):
+ task: str
+ assignee: Optional[str] = None
+ due_date: Optional[str] = None
+ completed: bool = False
+
+
+class Topic(BaseModel):
+ topic: str
+ duration_seconds: Optional[float] = None
+ relevance_score: Optional[float] = None
+
+
+class SummaryResponse(BaseModel):
+ meeting_id: str
+ summary_text: str
+ key_points: List[str]
+ action_items: List[ActionItem]
+ decisions: List[str]
+ topics: List[Topic]
+ sentiment: Optional[str]
+ model_used: str
+ generated_at: str
+
+
+class GenerateSummaryRequest(BaseModel):
+ force_regenerate: bool = False
+
+
+# Summarization prompt template
+SUMMARY_PROMPT = """You are analyzing a meeting transcript. Your task is to extract key information and provide a structured summary.
+
+## Meeting Transcript:
+{transcript}
+
+## Instructions:
+Analyze the transcript and extract the following information. Be concise and accurate.
+
+Respond ONLY with a valid JSON object in this exact format (no markdown, no extra text):
+{{
+ "summary": "A 2-3 sentence overview of what was discussed in the meeting",
+ "key_points": ["Point 1", "Point 2", "Point 3"],
+ "action_items": [
+ {{"task": "Description of task", "assignee": "Person name or null", "due_date": "Date or null"}}
+ ],
+ "decisions": ["Decision 1", "Decision 2"],
+ "topics": [
+ {{"topic": "Topic name", "relevance_score": 0.9}}
+ ],
+ "sentiment": "positive" or "neutral" or "negative" or "mixed"
+}}
+
+Remember:
+- key_points: 3-5 most important points discussed
+- action_items: Tasks that need to be done, with assignees if mentioned
+- decisions: Any decisions or conclusions reached
+- topics: Main themes discussed with relevance scores (0-1)
+- sentiment: Overall tone of the meeting
+"""
+
+
+@router.get("/{meeting_id}/summary", response_model=SummaryResponse)
+async def get_summary(request: Request, meeting_id: str):
+ """Get AI-generated summary for a meeting."""
+ db = request.app.state.db
+
+ # Verify meeting exists
+ meeting = await db.get_meeting(meeting_id)
+ if not meeting:
+ raise HTTPException(status_code=404, detail="Meeting not found")
+
+ summary = await db.get_summary(meeting_id)
+
+ if not summary:
+ raise HTTPException(
+ status_code=404,
+ detail="No summary available. Use POST to generate one."
+ )
+
+ return SummaryResponse(
+ meeting_id=meeting_id,
+ summary_text=summary["summary_text"],
+ key_points=summary["key_points"] or [],
+ action_items=[
+ ActionItem(**item) for item in (summary["action_items"] or [])
+ ],
+ decisions=summary["decisions"] or [],
+ topics=[
+ Topic(**topic) for topic in (summary["topics"] or [])
+ ],
+ sentiment=summary.get("sentiment"),
+ model_used=summary["model_used"],
+ generated_at=summary["generated_at"].isoformat()
+ )
+
+
+@router.post("/{meeting_id}/summary", response_model=SummaryResponse)
+async def generate_summary(
+ request: Request,
+ meeting_id: str,
+ body: GenerateSummaryRequest,
+ background_tasks: BackgroundTasks
+):
+ """Generate AI summary for a meeting."""
+ db = request.app.state.db
+
+ # Verify meeting exists
+ meeting = await db.get_meeting(meeting_id)
+ if not meeting:
+ raise HTTPException(status_code=404, detail="Meeting not found")
+
+ # Check if summary already exists
+ if not body.force_regenerate:
+ existing = await db.get_summary(meeting_id)
+ if existing:
+ raise HTTPException(
+ status_code=409,
+ detail="Summary already exists. Set force_regenerate=true to regenerate."
+ )
+
+ # Get transcript
+ segments = await db.get_transcript(meeting_id)
+ if not segments:
+ raise HTTPException(
+ status_code=400,
+ detail="No transcript available for summarization"
+ )
+
+ # Format transcript for LLM
+ transcript_text = _format_transcript(segments)
+
+ # Generate summary using Ollama
+ summary_data = await _generate_summary_with_ollama(transcript_text)
+
+ # Save summary
+ await db.save_summary(
+ meeting_id=meeting_id,
+ summary_text=summary_data["summary"],
+ key_points=summary_data["key_points"],
+ action_items=summary_data["action_items"],
+ decisions=summary_data["decisions"],
+ topics=summary_data["topics"],
+ sentiment=summary_data["sentiment"],
+ model_used=settings.ollama_model
+ )
+
+ # Update meeting status
+ await db.update_meeting(meeting_id, status="ready")
+
+ # Get the saved summary
+ summary = await db.get_summary(meeting_id)
+
+ return SummaryResponse(
+ meeting_id=meeting_id,
+ summary_text=summary["summary_text"],
+ key_points=summary["key_points"] or [],
+ action_items=[
+ ActionItem(**item) for item in (summary["action_items"] or [])
+ ],
+ decisions=summary["decisions"] or [],
+ topics=[
+ Topic(**topic) for topic in (summary["topics"] or [])
+ ],
+ sentiment=summary.get("sentiment"),
+ model_used=summary["model_used"],
+ generated_at=summary["generated_at"].isoformat()
+ )
+
+
+def _format_transcript(segments: list) -> str:
+ """Format transcript segments for LLM processing."""
+ lines = []
+ current_speaker = None
+
+ for s in segments:
+ speaker = s.get("speaker_label") or "Speaker"
+
+ if speaker != current_speaker:
+ lines.append(f"\n[{speaker}]")
+ current_speaker = speaker
+
+ lines.append(s["text"])
+
+ return "\n".join(lines)
+
+
+async def _generate_summary_with_ollama(transcript: str) -> dict:
+ """Generate summary using Ollama."""
+ prompt = SUMMARY_PROMPT.format(transcript=transcript[:15000]) # Limit context
+
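+    # Ollama's /api/generate is called with "format": "json", which constrains
+    # the model to emit valid JSON and makes the json.loads() below more reliable.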
+ async with httpx.AsyncClient(timeout=120.0) as client:
+ try:
+ response = await client.post(
+ f"{settings.ollama_url}/api/generate",
+ json={
+ "model": settings.ollama_model,
+ "prompt": prompt,
+ "stream": False,
+ "format": "json"
+ }
+ )
+ response.raise_for_status()
+
+ result = response.json()
+ response_text = result.get("response", "")
+
+ # Parse JSON from response
+ summary_data = json.loads(response_text)
+
+ # Validate required fields
+ return {
+ "summary": summary_data.get("summary", "No summary generated"),
+ "key_points": summary_data.get("key_points", []),
+ "action_items": summary_data.get("action_items", []),
+ "decisions": summary_data.get("decisions", []),
+ "topics": summary_data.get("topics", []),
+ "sentiment": summary_data.get("sentiment", "neutral")
+ }
+
+ except httpx.HTTPError as e:
+ log.error("Ollama request failed", error=str(e))
+ raise HTTPException(
+ status_code=503,
+ detail=f"AI service unavailable: {str(e)}"
+ )
+ except json.JSONDecodeError as e:
+ log.error("Failed to parse Ollama response", error=str(e))
+ raise HTTPException(
+ status_code=500,
+ detail="Failed to parse AI response"
+ )
diff --git a/deploy/meeting-intelligence/api/app/routes/transcripts.py b/deploy/meeting-intelligence/api/app/routes/transcripts.py
new file mode 100644
index 0000000..a47bf89
--- /dev/null
+++ b/deploy/meeting-intelligence/api/app/routes/transcripts.py
@@ -0,0 +1,161 @@
+"""
+Transcript routes.
+"""
+
+from typing import Optional, List
+
+from fastapi import APIRouter, HTTPException, Request, Query
+from pydantic import BaseModel
+
+import structlog
+
+log = structlog.get_logger()
+
+router = APIRouter()
+
+
+class TranscriptSegment(BaseModel):
+ id: int
+ segment_index: int
+ start_time: float
+ end_time: float
+ speaker_id: Optional[str]
+ speaker_name: Optional[str]
+ speaker_label: Optional[str]
+ text: str
+ confidence: Optional[float]
+ language: Optional[str]
+
+
+class TranscriptResponse(BaseModel):
+ meeting_id: str
+ segments: List[TranscriptSegment]
+ total_segments: int
+ duration: Optional[float]
+
+
+class SpeakerStats(BaseModel):
+ speaker_id: str
+ speaker_label: Optional[str]
+ segment_count: int
+ speaking_time: float
+ character_count: int
+
+
+class SpeakersResponse(BaseModel):
+ meeting_id: str
+ speakers: List[SpeakerStats]
+
+
+@router.get("/{meeting_id}/transcript", response_model=TranscriptResponse)
+async def get_transcript(
+ request: Request,
+ meeting_id: str,
+ speaker: Optional[str] = Query(default=None, description="Filter by speaker ID")
+):
+ """Get full transcript for a meeting."""
+ db = request.app.state.db
+
+ # Verify meeting exists
+ meeting = await db.get_meeting(meeting_id)
+ if not meeting:
+ raise HTTPException(status_code=404, detail="Meeting not found")
+
+ segments = await db.get_transcript(meeting_id, speaker_filter=speaker)
+
+ if not segments:
+ raise HTTPException(
+ status_code=404,
+ detail="No transcript available for this meeting"
+ )
+
+ # Calculate duration from last segment
+ duration = segments[-1]["end_time"] if segments else None
+
+ return TranscriptResponse(
+ meeting_id=meeting_id,
+ segments=[
+ TranscriptSegment(
+ id=s["id"],
+ segment_index=s["segment_index"],
+ start_time=s["start_time"],
+ end_time=s["end_time"],
+ speaker_id=s.get("speaker_id"),
+ speaker_name=s.get("speaker_name"),
+ speaker_label=s.get("speaker_label"),
+ text=s["text"],
+ confidence=s.get("confidence"),
+ language=s.get("language")
+ )
+ for s in segments
+ ],
+ total_segments=len(segments),
+ duration=duration
+ )
+
+
+@router.get("/{meeting_id}/speakers", response_model=SpeakersResponse)
+async def get_speakers(request: Request, meeting_id: str):
+ """Get speaker statistics for a meeting."""
+ db = request.app.state.db
+
+ # Verify meeting exists
+ meeting = await db.get_meeting(meeting_id)
+ if not meeting:
+ raise HTTPException(status_code=404, detail="Meeting not found")
+
+ speakers = await db.get_speakers(meeting_id)
+
+ return SpeakersResponse(
+ meeting_id=meeting_id,
+ speakers=[
+ SpeakerStats(
+ speaker_id=s["speaker_id"],
+ speaker_label=s.get("speaker_label"),
+ segment_count=s["segment_count"],
+ speaking_time=float(s["speaking_time"] or 0),
+ character_count=s["character_count"] or 0
+ )
+ for s in speakers
+ ]
+ )
+
+
+@router.get("/{meeting_id}/transcript/text")
+async def get_transcript_text(request: Request, meeting_id: str):
+ """Get transcript as plain text."""
+ db = request.app.state.db
+
+ # Verify meeting exists
+ meeting = await db.get_meeting(meeting_id)
+ if not meeting:
+ raise HTTPException(status_code=404, detail="Meeting not found")
+
+ segments = await db.get_transcript(meeting_id)
+
+ if not segments:
+ raise HTTPException(
+ status_code=404,
+ detail="No transcript available for this meeting"
+ )
+
+ # Format as plain text
+ lines = []
+ current_speaker = None
+
+ for s in segments:
+ speaker = s.get("speaker_label") or "Unknown"
+
+ if speaker != current_speaker:
+ lines.append(f"\n{speaker}:")
+ current_speaker = speaker
+
+ lines.append(f" {s['text']}")
+
+ text = "\n".join(lines)
+
+ return {
+ "meeting_id": meeting_id,
+ "text": text,
+ "format": "plain"
+ }
diff --git a/deploy/meeting-intelligence/api/app/routes/webhooks.py b/deploy/meeting-intelligence/api/app/routes/webhooks.py
new file mode 100644
index 0000000..4d8cedb
--- /dev/null
+++ b/deploy/meeting-intelligence/api/app/routes/webhooks.py
@@ -0,0 +1,139 @@
+"""
+Webhook routes for Jibri recording callbacks.
+"""
+
+from datetime import datetime
+from typing import Optional
+
+import httpx
+from fastapi import APIRouter, HTTPException, Request, BackgroundTasks
+from pydantic import BaseModel
+
+from ..config import settings
+
+import structlog
+
+log = structlog.get_logger()
+
+router = APIRouter()
+
+
+class RecordingCompletePayload(BaseModel):
+ event_type: str
+ conference_id: str
+ recording_path: str
+ recording_dir: Optional[str] = None
+ file_size_bytes: Optional[int] = None
+ completed_at: Optional[str] = None
+ metadata: Optional[dict] = None
+
+
+class WebhookResponse(BaseModel):
+ status: str
+ meeting_id: str
+ message: str
+
+
+@router.post("/recording-complete", response_model=WebhookResponse)
+async def recording_complete(
+ request: Request,
+ payload: RecordingCompletePayload,
+ background_tasks: BackgroundTasks
+):
+ """
+ Webhook called by Jibri when a recording completes.
+
+ This triggers the processing pipeline:
+ 1. Create meeting record
+ 2. Queue transcription job
+ 3. (Later) Generate summary
+ """
+ db = request.app.state.db
+
+ log.info(
+ "Recording complete webhook received",
+ conference_id=payload.conference_id,
+ recording_path=payload.recording_path
+ )
+
+ # Save webhook event for audit
+ await db.save_webhook_event(
+ event_type=payload.event_type,
+ payload=payload.model_dump()
+ )
+
+ # Create meeting record
+ meeting_id = await db.create_meeting(
+ conference_id=payload.conference_id,
+ conference_name=payload.conference_id, # Use conference_id as name for now
+ title=f"Meeting - {payload.conference_id}",
+ recording_path=payload.recording_path,
+ started_at=datetime.utcnow(), # Will be updated from recording metadata
+ metadata=payload.metadata or {}
+ )
+
+ log.info("Meeting record created", meeting_id=meeting_id)
+
+ # Update meeting status
+ await db.update_meeting(meeting_id, status="extracting_audio")
+
+ # Queue transcription job
+ job_id = await db.create_job(
+ meeting_id=meeting_id,
+ job_type="transcribe",
+ priority=5,
+ result={
+ "video_path": payload.recording_path,
+ "enable_diarization": True
+ }
+ )
+
+ log.info("Transcription job queued", job_id=job_id, meeting_id=meeting_id)
+
+ # Trigger transcription service asynchronously
+ background_tasks.add_task(
+ _notify_transcriber,
+ meeting_id,
+ payload.recording_path
+ )
+
+ return WebhookResponse(
+ status="accepted",
+ meeting_id=meeting_id,
+ message="Recording queued for processing"
+ )
+
+
+async def _notify_transcriber(meeting_id: str, recording_path: str):
+ """Notify the transcription service to start processing."""
+ try:
+ async with httpx.AsyncClient(timeout=30.0) as client:
+ response = await client.post(
+ f"{settings.transcriber_url}/transcribe",
+ json={
+ "meeting_id": meeting_id,
+ "video_path": recording_path,
+ "enable_diarization": True
+ }
+ )
+ response.raise_for_status()
+ log.info(
+ "Transcriber notified",
+ meeting_id=meeting_id,
+ response=response.json()
+ )
+ except Exception as e:
+ log.error(
+ "Failed to notify transcriber",
+ meeting_id=meeting_id,
+ error=str(e)
+ )
+ # Job is in database, transcriber will pick it up on next poll
+
+
+@router.post("/test")
+async def test_webhook(request: Request):
+ """Test endpoint for webhook connectivity."""
+ body = await request.json()
+ log.info("Test webhook received", body=body)
+ return {"status": "ok", "received": body}
diff --git a/deploy/meeting-intelligence/api/requirements.txt b/deploy/meeting-intelligence/api/requirements.txt
new file mode 100644
index 0000000..d840225
--- /dev/null
+++ b/deploy/meeting-intelligence/api/requirements.txt
@@ -0,0 +1,37 @@
+# Meeting Intelligence API Dependencies
+
+# Web framework
+fastapi==0.109.2
+uvicorn[standard]==0.27.1
+python-multipart==0.0.9
+
+# Database
+asyncpg==0.29.0
+sqlalchemy[asyncio]==2.0.25
+psycopg2-binary==2.9.9
+
+# Redis
+redis==5.0.1
+
+# HTTP client (for Ollama)
+httpx==0.26.0
+aiohttp==3.9.3
+
+# Validation
+pydantic==2.6.1
+pydantic-settings==2.1.0
+
+# Sentence embeddings (for semantic search)
+sentence-transformers==2.3.1
+numpy==1.26.4
+
+# PDF export
+reportlab==4.0.8
+markdown2==2.4.12
+
+# Utilities
+python-dotenv==1.0.1
+tenacity==8.2.3
+
+# Logging
+structlog==24.1.0
diff --git a/deploy/meeting-intelligence/docker-compose.yml b/deploy/meeting-intelligence/docker-compose.yml
new file mode 100644
index 0000000..cf98bec
--- /dev/null
+++ b/deploy/meeting-intelligence/docker-compose.yml
@@ -0,0 +1,186 @@
+# Meeting Intelligence System - Full Docker Compose
+# Deploy on Netcup RS 8000 at /opt/meeting-intelligence/
+#
+# Components:
+# - Jibri (recording)
+# - Transcriber (whisper.cpp + diarization)
+# - Meeting Intelligence API
+# - PostgreSQL (storage)
+# - Redis (job queue)
+
+services:
+ # ============================================================
+ # PostgreSQL Database
+ # ============================================================
+ postgres:
+ image: pgvector/pgvector:pg16
+ container_name: meeting-intelligence-db
+ restart: unless-stopped
+ environment:
+ POSTGRES_USER: meeting_intelligence
+ POSTGRES_PASSWORD: ${POSTGRES_PASSWORD:-changeme}
+ POSTGRES_DB: meeting_intelligence
+ volumes:
+ - postgres_data:/var/lib/postgresql/data
+ - ./postgres/init.sql:/docker-entrypoint-initdb.d/init.sql:ro
+ healthcheck:
+ test: ["CMD-SHELL", "pg_isready -U meeting_intelligence"]
+ interval: 10s
+ timeout: 5s
+ retries: 5
+ networks:
+ - meeting-intelligence
+
+ # ============================================================
+ # Redis Job Queue
+ # ============================================================
+ redis:
+ image: redis:7-alpine
+ container_name: meeting-intelligence-redis
+ restart: unless-stopped
+ command: redis-server --appendonly yes
+ volumes:
+ - redis_data:/data
+ healthcheck:
+ test: ["CMD", "redis-cli", "ping"]
+ interval: 10s
+ timeout: 5s
+ retries: 5
+ networks:
+ - meeting-intelligence
+
+ # ============================================================
+ # Transcription Service (whisper.cpp + diarization)
+ # ============================================================
+ transcriber:
+ build:
+ context: ./transcriber
+ dockerfile: Dockerfile
+ container_name: meeting-intelligence-transcriber
+ restart: unless-stopped
+ environment:
+ REDIS_URL: redis://redis:6379
+ POSTGRES_URL: postgresql://meeting_intelligence:${POSTGRES_PASSWORD:-changeme}@postgres:5432/meeting_intelligence
+ WHISPER_MODEL: small
+ WHISPER_THREADS: 8
+ NUM_WORKERS: 4
+ volumes:
+ - recordings:/recordings:ro
+ - audio_processed:/audio
+ - whisper_models:/models
+ depends_on:
+ postgres:
+ condition: service_healthy
+ redis:
+ condition: service_healthy
+ deploy:
+ resources:
+ limits:
+ cpus: '12'
+ memory: 16G
+ networks:
+ - meeting-intelligence
+
+ # ============================================================
+ # Meeting Intelligence API
+ # ============================================================
+ api:
+ build:
+ context: ./api
+ dockerfile: Dockerfile
+ container_name: meeting-intelligence-api
+ restart: unless-stopped
+ environment:
+ REDIS_URL: redis://redis:6379
+ POSTGRES_URL: postgresql://meeting_intelligence:${POSTGRES_PASSWORD:-changeme}@postgres:5432/meeting_intelligence
+      OLLAMA_URL: http://host.docker.internal:11434
+      RECORDINGS_PATH: /recordings
+      SECRET_KEY: ${API_SECRET_KEY:-changeme}
+    # host.docker.internal is not resolvable on Linux Docker Engine by default;
+    # map it to the host gateway so the API can reach Ollama on the host
+    extra_hosts:
+      - "host.docker.internal:host-gateway"
+    volumes:
+ - recordings:/recordings
+ depends_on:
+ postgres:
+ condition: service_healthy
+ redis:
+ condition: service_healthy
+ labels:
+ - "traefik.enable=true"
+ - "traefik.http.routers.meeting-intelligence.rule=Host(`meet.jeffemmett.com`) && PathPrefix(`/api/intelligence`)"
+ - "traefik.http.services.meeting-intelligence.loadbalancer.server.port=8000"
+ - "traefik.http.routers.meeting-intelligence.middlewares=strip-intelligence-prefix"
+ - "traefik.http.middlewares.strip-intelligence-prefix.stripprefix.prefixes=/api/intelligence"
+ networks:
+ - meeting-intelligence
+ - traefik-public
+
+ # ============================================================
+ # Jibri Recording Service
+ # ============================================================
+ jibri:
+ image: jitsi/jibri:stable-9584
+ container_name: meeting-intelligence-jibri
+ restart: unless-stopped
+ privileged: true
+ environment:
+ # XMPP Connection
+ XMPP_SERVER: ${XMPP_SERVER:-meet.jeffemmett.com}
+ XMPP_DOMAIN: ${XMPP_DOMAIN:-meet.jeffemmett.com}
+ XMPP_AUTH_DOMAIN: auth.${XMPP_DOMAIN:-meet.jeffemmett.com}
+ XMPP_INTERNAL_MUC_DOMAIN: internal.auth.${XMPP_DOMAIN:-meet.jeffemmett.com}
+ XMPP_RECORDER_DOMAIN: recorder.${XMPP_DOMAIN:-meet.jeffemmett.com}
+ XMPP_MUC_DOMAIN: muc.${XMPP_DOMAIN:-meet.jeffemmett.com}
+
+ # Jibri Settings
+ JIBRI_BREWERY_MUC: JibriBrewery
+ JIBRI_PENDING_TIMEOUT: 90
+ JIBRI_RECORDING_DIR: /recordings
+ JIBRI_FINALIZE_RECORDING_SCRIPT_PATH: /config/finalize.sh
+ JIBRI_XMPP_USER: jibri
+ JIBRI_XMPP_PASSWORD: ${JIBRI_XMPP_PASSWORD:-changeme}
+ JIBRI_RECORDER_USER: recorder
+ JIBRI_RECORDER_PASSWORD: ${JIBRI_RECORDER_PASSWORD:-changeme}
+
+ # Display Settings
+ DISPLAY: ":0"
+ CHROMIUM_FLAGS: --use-fake-ui-for-media-stream,--start-maximized,--kiosk,--enabled,--disable-infobars,--autoplay-policy=no-user-gesture-required
+
+ # Public URL
+ PUBLIC_URL: https://${XMPP_DOMAIN:-meet.jeffemmett.com}
+
+ # Timezone
+ TZ: UTC
+ volumes:
+ - recordings:/recordings
+ - ./jibri/config:/config
+ - /dev/shm:/dev/shm
+ cap_add:
+ - SYS_ADMIN
+ - NET_BIND_SERVICE
+ security_opt:
+ - seccomp:unconfined
+ shm_size: 2gb
+ networks:
+ - meeting-intelligence
+
+volumes:
+ postgres_data:
+ redis_data:
+ recordings:
+ driver: local
+ driver_opts:
+ type: none
+ o: bind
+ device: /opt/meetings/recordings
+ audio_processed:
+ driver: local
+ driver_opts:
+ type: none
+ o: bind
+ device: /opt/meetings/audio
+ whisper_models:
+
+networks:
+ meeting-intelligence:
+ driver: bridge
+ traefik-public:
+ external: true
diff --git a/deploy/meeting-intelligence/jibri/config/finalize.sh b/deploy/meeting-intelligence/jibri/config/finalize.sh
new file mode 100755
index 0000000..1a1f3b9
--- /dev/null
+++ b/deploy/meeting-intelligence/jibri/config/finalize.sh
@@ -0,0 +1,104 @@
+#!/bin/bash
+# Jibri Recording Finalize Script
+# Called when Jibri finishes recording a meeting
+#
+# Arguments:
+#   $1 - Recording directory path (e.g., /recordings/<conference_id>/<session_id>/)
+#
+# This script:
+# 1. Finds the recording file
+# 2. Notifies the Meeting Intelligence API to start processing
+
+set -e
+
+RECORDING_DIR="$1"
+API_URL="${MEETING_INTELLIGENCE_API:-http://api:8000}"
+LOG_FILE="/var/log/jibri/finalize.log"
+
+log() {
+ echo "[$(date -Iseconds)] $1" >> "$LOG_FILE"
+ echo "[$(date -Iseconds)] $1"
+}
+
+log "=== Finalize script started ==="
+log "Recording directory: $RECORDING_DIR"
+
+# Validate recording directory
+if [ -z "$RECORDING_DIR" ] || [ ! -d "$RECORDING_DIR" ]; then
+ log "ERROR: Invalid recording directory: $RECORDING_DIR"
+ exit 1
+fi
+
+# Find the recording file (MP4 or WebM)
+RECORDING_FILE=$(find "$RECORDING_DIR" -type f \( -name "*.mp4" -o -name "*.webm" \) | head -1)
+
+if [ -z "$RECORDING_FILE" ]; then
+ log "ERROR: No recording file found in $RECORDING_DIR"
+ exit 1
+fi
+
+log "Found recording file: $RECORDING_FILE"
+
+# Get file info
+FILE_SIZE=$(stat -c%s "$RECORDING_FILE" 2>/dev/null || echo "0")
+log "Recording file size: $FILE_SIZE bytes"
+
+# Extract conference info from path
+# Expected format: /recordings///recording.mp4
+CONFERENCE_ID=$(echo "$RECORDING_DIR" | awk -F'/' '{print $(NF-1)}')
+if [ -z "$CONFERENCE_ID" ]; then
+ CONFERENCE_ID=$(basename "$(dirname "$RECORDING_DIR")")
+fi
+
+# Look for metadata file (Jibri sometimes creates this)
+METADATA_FILE="$RECORDING_DIR/metadata.json"
+if [ -f "$METADATA_FILE" ]; then
+ log "Found metadata file: $METADATA_FILE"
+ METADATA=$(cat "$METADATA_FILE")
+else
+ METADATA="{}"
+fi
+
+# Prepare webhook payload
+PAYLOAD=$(cat <<EOF
+{
+  "conference_id": "$CONFERENCE_ID",
+  "recording_path": "$RECORDING_FILE",
+  "file_size": $FILE_SIZE,
+  "metadata": $METADATA
+}
+EOF
+)
+
+log "Notifying API at $API_URL"
+
+# Notify the Meeting Intelligence API (webhook route assumed; adjust to match the API)
+RESPONSE=$(curl -s -w "\n%{http_code}" \
+    -X POST \
+    -H "Content-Type: application/json" \
+    -d "$PAYLOAD" \
+    "$API_URL/webhooks/recording-finished" 2>&1)
+
+HTTP_CODE=$(echo "$RESPONSE" | tail -1)
+BODY=$(echo "$RESPONSE" | head -n -1)
+
+if [ "$HTTP_CODE" = "200" ] || [ "$HTTP_CODE" = "201" ] || [ "$HTTP_CODE" = "202" ]; then
+ log "SUCCESS: Webhook accepted (HTTP $HTTP_CODE)"
+ log "Response: $BODY"
+else
+ log "WARNING: Webhook returned HTTP $HTTP_CODE"
+ log "Response: $BODY"
+
+ # Don't fail the script - the recording is still saved
+ # The API can be retried later
+fi
+
+# Optional: Clean up old recordings (keep last 30 days)
+# find /recordings -type f -mtime +30 -delete
+
+log "=== Finalize script completed ==="
+exit 0
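To sanity-check the finalize flow without waiting for a real Jibri recording, the same webhook can be posted by hand through the Traefik route defined in docker-compose.yml. A minimal sketch; the `/webhooks/recording-finished` path and payload fields mirror the script above and are assumptions, not a documented API contract:

```python
# Hand-crafted finalize webhook for testing; route and fields are assumptions.
import httpx

payload = {
    "conference_id": "test-room-123",
    "recording_path": "/recordings/test-room-123/session-1/recording.mp4",
    "file_size": 0,
    "metadata": {},
}

resp = httpx.post(
    "https://meet.jeffemmett.com/api/intelligence/webhooks/recording-finished",
    json=payload,
    timeout=30,
)
print(resp.status_code, resp.text)
```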
diff --git a/deploy/meeting-intelligence/postgres/init.sql b/deploy/meeting-intelligence/postgres/init.sql
new file mode 100644
index 0000000..24556ca
--- /dev/null
+++ b/deploy/meeting-intelligence/postgres/init.sql
@@ -0,0 +1,310 @@
+-- Meeting Intelligence System - PostgreSQL Schema
+-- Uses pgvector extension for semantic search
+
+-- Enable required extensions
+CREATE EXTENSION IF NOT EXISTS "uuid-ossp";
+CREATE EXTENSION IF NOT EXISTS "vector";
+
+-- ============================================================
+-- Meetings Table
+-- ============================================================
+CREATE TABLE meetings (
+ id UUID PRIMARY KEY DEFAULT uuid_generate_v4(),
+ conference_id VARCHAR(255) NOT NULL,
+ conference_name VARCHAR(255),
+ title VARCHAR(500),
+ started_at TIMESTAMP WITH TIME ZONE,
+ ended_at TIMESTAMP WITH TIME ZONE,
+ duration_seconds INTEGER,
+ recording_path VARCHAR(1000),
+ audio_path VARCHAR(1000),
+ status VARCHAR(50) DEFAULT 'recording',
+ -- Status: 'recording', 'extracting_audio', 'transcribing', 'diarizing', 'summarizing', 'ready', 'failed'
+ error_message TEXT,
+ metadata JSONB DEFAULT '{}',
+ created_at TIMESTAMP WITH TIME ZONE DEFAULT NOW(),
+ updated_at TIMESTAMP WITH TIME ZONE DEFAULT NOW()
+);
+
+CREATE INDEX idx_meetings_conference_id ON meetings(conference_id);
+CREATE INDEX idx_meetings_status ON meetings(status);
+CREATE INDEX idx_meetings_started_at ON meetings(started_at DESC);
+CREATE INDEX idx_meetings_created_at ON meetings(created_at DESC);
+
+-- ============================================================
+-- Meeting Participants
+-- ============================================================
+CREATE TABLE meeting_participants (
+ id SERIAL PRIMARY KEY,
+ meeting_id UUID NOT NULL REFERENCES meetings(id) ON DELETE CASCADE,
+ participant_id VARCHAR(255) NOT NULL,
+ display_name VARCHAR(255),
+ email VARCHAR(255),
+ joined_at TIMESTAMP WITH TIME ZONE,
+ left_at TIMESTAMP WITH TIME ZONE,
+ duration_seconds INTEGER,
+ is_moderator BOOLEAN DEFAULT FALSE,
+ metadata JSONB DEFAULT '{}',
+ created_at TIMESTAMP WITH TIME ZONE DEFAULT NOW()
+);
+
+CREATE INDEX idx_participants_meeting_id ON meeting_participants(meeting_id);
+CREATE INDEX idx_participants_participant_id ON meeting_participants(participant_id);
+
+-- ============================================================
+-- Transcripts
+-- ============================================================
+CREATE TABLE transcripts (
+ id SERIAL PRIMARY KEY,
+ meeting_id UUID NOT NULL REFERENCES meetings(id) ON DELETE CASCADE,
+ segment_index INTEGER NOT NULL,
+ start_time FLOAT NOT NULL,
+ end_time FLOAT NOT NULL,
+ speaker_id VARCHAR(255),
+ speaker_name VARCHAR(255),
+ speaker_label VARCHAR(50), -- e.g., "Speaker 1", "Speaker 2"
+ text TEXT NOT NULL,
+ confidence FLOAT,
+ language VARCHAR(10) DEFAULT 'en',
+ word_timestamps JSONB, -- Array of {word, start, end, confidence}
+ metadata JSONB DEFAULT '{}',
+ created_at TIMESTAMP WITH TIME ZONE DEFAULT NOW()
+);
+
+CREATE INDEX idx_transcripts_meeting_id ON transcripts(meeting_id);
+CREATE INDEX idx_transcripts_speaker_id ON transcripts(speaker_id);
+CREATE INDEX idx_transcripts_start_time ON transcripts(meeting_id, start_time);
+CREATE INDEX idx_transcripts_text_search ON transcripts USING gin(to_tsvector('english', text));
+
+-- ============================================================
+-- Transcript Embeddings (for semantic search)
+-- ============================================================
+CREATE TABLE transcript_embeddings (
+ id SERIAL PRIMARY KEY,
+ transcript_id INTEGER NOT NULL REFERENCES transcripts(id) ON DELETE CASCADE,
+ meeting_id UUID NOT NULL REFERENCES meetings(id) ON DELETE CASCADE,
+ embedding vector(384), -- all-MiniLM-L6-v2 dimensions
+ chunk_text TEXT, -- The text chunk this embedding represents
+ created_at TIMESTAMP WITH TIME ZONE DEFAULT NOW()
+);
+
+CREATE INDEX idx_embeddings_transcript_id ON transcript_embeddings(transcript_id);
+CREATE INDEX idx_embeddings_meeting_id ON transcript_embeddings(meeting_id);
+CREATE INDEX idx_embeddings_vector ON transcript_embeddings USING ivfflat (embedding vector_cosine_ops) WITH (lists = 100);
+
+-- ============================================================
+-- AI Summaries
+-- ============================================================
+CREATE TABLE summaries (
+ id SERIAL PRIMARY KEY,
+ meeting_id UUID NOT NULL REFERENCES meetings(id) ON DELETE CASCADE,
+ summary_text TEXT,
+ key_points JSONB, -- Array of key point strings
+ action_items JSONB, -- Array of {task, assignee, due_date, completed}
+ decisions JSONB, -- Array of decision strings
+ topics JSONB, -- Array of {topic, duration_seconds, relevance_score}
+ sentiment VARCHAR(50), -- 'positive', 'neutral', 'negative', 'mixed'
+ sentiment_scores JSONB, -- {positive: 0.7, neutral: 0.2, negative: 0.1}
+ participants_summary JSONB, -- {participant_id: {speaking_time, word_count, topics}}
+ model_used VARCHAR(100),
+ prompt_tokens INTEGER,
+ completion_tokens INTEGER,
+ generated_at TIMESTAMP WITH TIME ZONE DEFAULT NOW(),
+ metadata JSONB DEFAULT '{}'
+);
+
+CREATE INDEX idx_summaries_meeting_id ON summaries(meeting_id);
+CREATE INDEX idx_summaries_generated_at ON summaries(generated_at DESC);
+
+-- ============================================================
+-- Processing Jobs Queue
+-- ============================================================
+CREATE TABLE processing_jobs (
+    id UUID PRIMARY KEY DEFAULT uuid_generate_v4(),  -- transcriber generates UUID job ids
+ meeting_id UUID NOT NULL REFERENCES meetings(id) ON DELETE CASCADE,
+ job_type VARCHAR(50) NOT NULL, -- 'extract_audio', 'transcribe', 'diarize', 'summarize', 'embed'
+ status VARCHAR(50) DEFAULT 'pending', -- 'pending', 'processing', 'completed', 'failed', 'cancelled'
+ priority INTEGER DEFAULT 5, -- 1 = highest, 10 = lowest
+ attempts INTEGER DEFAULT 0,
+ max_attempts INTEGER DEFAULT 3,
+ started_at TIMESTAMP WITH TIME ZONE,
+ completed_at TIMESTAMP WITH TIME ZONE,
+ error_message TEXT,
+ result JSONB,
+ worker_id VARCHAR(100),
+ created_at TIMESTAMP WITH TIME ZONE DEFAULT NOW(),
+ updated_at TIMESTAMP WITH TIME ZONE DEFAULT NOW()
+);
+
+CREATE INDEX idx_jobs_meeting_id ON processing_jobs(meeting_id);
+CREATE INDEX idx_jobs_status ON processing_jobs(status, priority, created_at);
+CREATE INDEX idx_jobs_type_status ON processing_jobs(job_type, status);
+
+-- ============================================================
+-- Search History (for analytics)
+-- ============================================================
+CREATE TABLE search_history (
+ id SERIAL PRIMARY KEY,
+ user_id VARCHAR(255),
+ query TEXT NOT NULL,
+ search_type VARCHAR(50), -- 'text', 'semantic', 'combined'
+ results_count INTEGER,
+ meeting_ids UUID[],
+ filters JSONB,
+ created_at TIMESTAMP WITH TIME ZONE DEFAULT NOW()
+);
+
+CREATE INDEX idx_search_history_created_at ON search_history(created_at DESC);
+
+-- ============================================================
+-- Webhook Events (for Jibri callbacks)
+-- ============================================================
+CREATE TABLE webhook_events (
+ id SERIAL PRIMARY KEY,
+ event_type VARCHAR(100) NOT NULL,
+ payload JSONB NOT NULL,
+ processed BOOLEAN DEFAULT FALSE,
+ processed_at TIMESTAMP WITH TIME ZONE,
+ error_message TEXT,
+ created_at TIMESTAMP WITH TIME ZONE DEFAULT NOW()
+);
+
+CREATE INDEX idx_webhooks_processed ON webhook_events(processed, created_at);
+
+-- ============================================================
+-- Functions
+-- ============================================================
+
+-- Update timestamp trigger
+CREATE OR REPLACE FUNCTION update_updated_at()
+RETURNS TRIGGER AS $$
+BEGIN
+ NEW.updated_at = NOW();
+ RETURN NEW;
+END;
+$$ LANGUAGE plpgsql;
+
+CREATE TRIGGER meetings_updated_at
+ BEFORE UPDATE ON meetings
+ FOR EACH ROW
+ EXECUTE FUNCTION update_updated_at();
+
+CREATE TRIGGER jobs_updated_at
+ BEFORE UPDATE ON processing_jobs
+ FOR EACH ROW
+ EXECUTE FUNCTION update_updated_at();
+
+-- Semantic search function
+CREATE OR REPLACE FUNCTION semantic_search(
+ query_embedding vector(384),
+ match_threshold FLOAT DEFAULT 0.7,
+ match_count INT DEFAULT 10,
+ meeting_filter UUID DEFAULT NULL
+)
+RETURNS TABLE (
+ transcript_id INT,
+ meeting_id UUID,
+ chunk_text TEXT,
+ similarity FLOAT
+) AS $$
+BEGIN
+ RETURN QUERY
+ SELECT
+ te.transcript_id,
+ te.meeting_id,
+ te.chunk_text,
+ 1 - (te.embedding <=> query_embedding) AS similarity
+ FROM transcript_embeddings te
+ WHERE
+ (meeting_filter IS NULL OR te.meeting_id = meeting_filter)
+ AND 1 - (te.embedding <=> query_embedding) > match_threshold
+ ORDER BY te.embedding <=> query_embedding
+ LIMIT match_count;
+END;
+$$ LANGUAGE plpgsql;
+
+-- Full-text search function
+CREATE OR REPLACE FUNCTION fulltext_search(
+ search_query TEXT,
+ meeting_filter UUID DEFAULT NULL,
+ match_count INT DEFAULT 50
+)
+RETURNS TABLE (
+ transcript_id INT,
+ meeting_id UUID,
+ text TEXT,
+ speaker_name VARCHAR,
+ start_time FLOAT,
+ rank FLOAT
+) AS $$
+BEGIN
+ RETURN QUERY
+ SELECT
+ t.id,
+ t.meeting_id,
+ t.text,
+ t.speaker_name,
+ t.start_time,
+ ts_rank(to_tsvector('english', t.text), plainto_tsquery('english', search_query)) AS rank
+ FROM transcripts t
+ WHERE
+ (meeting_filter IS NULL OR t.meeting_id = meeting_filter)
+ AND to_tsvector('english', t.text) @@ plainto_tsquery('english', search_query)
+ ORDER BY rank DESC
+ LIMIT match_count;
+END;
+$$ LANGUAGE plpgsql;
+
+-- ============================================================
+-- Views
+-- ============================================================
+
+-- Meeting overview with stats
+CREATE VIEW meeting_overview AS
+SELECT
+ m.id,
+ m.conference_id,
+ m.conference_name,
+ m.title,
+ m.started_at,
+ m.ended_at,
+ m.duration_seconds,
+ m.status,
+ m.recording_path,
+ COUNT(DISTINCT mp.id) AS participant_count,
+ COUNT(DISTINCT t.id) AS transcript_segment_count,
+ COALESCE(SUM(LENGTH(t.text)), 0) AS total_characters,
+ s.id IS NOT NULL AS has_summary,
+ m.created_at
+FROM meetings m
+LEFT JOIN meeting_participants mp ON m.id = mp.meeting_id
+LEFT JOIN transcripts t ON m.id = t.meeting_id
+LEFT JOIN summaries s ON m.id = s.meeting_id
+GROUP BY m.id, s.id;
+
+-- Speaker stats per meeting
+CREATE VIEW speaker_stats AS
+SELECT
+ t.meeting_id,
+ t.speaker_id,
+ t.speaker_name,
+ t.speaker_label,
+ COUNT(*) AS segment_count,
+ SUM(t.end_time - t.start_time) AS speaking_time_seconds,
+ SUM(LENGTH(t.text)) AS character_count,
+ SUM(array_length(regexp_split_to_array(t.text, '\s+'), 1)) AS word_count
+FROM transcripts t
+GROUP BY t.meeting_id, t.speaker_id, t.speaker_name, t.speaker_label;
+
+-- ============================================================
+-- Sample Data (for testing - remove in production)
+-- ============================================================
+
+-- INSERT INTO meetings (conference_id, conference_name, title, started_at, status)
+-- VALUES ('test-room-123', 'Test Room', 'Test Meeting', NOW() - INTERVAL '1 hour', 'ready');
+
+COMMENT ON TABLE meetings IS 'Stores meeting metadata and processing status';
+COMMENT ON TABLE transcripts IS 'Stores time-stamped transcript segments with speaker attribution';
+COMMENT ON TABLE summaries IS 'Stores AI-generated meeting summaries and extracted information';
+COMMENT ON TABLE transcript_embeddings IS 'Stores vector embeddings for semantic search';
+COMMENT ON TABLE processing_jobs IS 'Job queue for async processing tasks';
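Once the schema is loaded, the two search functions above can be exercised directly. A rough sketch using asyncpg and the all-MiniLM-L6-v2 model (which produces the 384-dimensional vectors the `vector(384)` columns expect); the DSN is an assumption for a local connection:

```python
# Sketch: calling fulltext_search() and semantic_search() defined in init.sql.
# DSN and embedding model are assumptions; adjust to your environment.
import asyncio
import asyncpg
from sentence_transformers import SentenceTransformer

DSN = "postgresql://meeting_intelligence:changeme@localhost:5432/meeting_intelligence"

async def search(query: str):
    model = SentenceTransformer("all-MiniLM-L6-v2")  # 384-dim, matches vector(384)
    conn = await asyncpg.connect(DSN)
    try:
        # Full-text search over transcript segments
        text_hits = await conn.fetch("SELECT * FROM fulltext_search($1, NULL, 10)", query)

        # Semantic search: pgvector accepts a '[x,y,...]' literal cast to vector
        emb = model.encode(query).tolist()
        literal = "[" + ",".join(f"{x:.6f}" for x in emb) + "]"
        sem_hits = await conn.fetch(
            "SELECT * FROM semantic_search($1::vector, 0.5, 10, NULL)", literal
        )
        return text_hits, sem_hits
    finally:
        await conn.close()

if __name__ == "__main__":
    text_hits, sem_hits = asyncio.run(search("action items"))
    for row in text_hits:
        print(row["rank"], row["text"][:80])
```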
diff --git a/deploy/meeting-intelligence/transcriber/Dockerfile b/deploy/meeting-intelligence/transcriber/Dockerfile
new file mode 100644
index 0000000..eaabce3
--- /dev/null
+++ b/deploy/meeting-intelligence/transcriber/Dockerfile
@@ -0,0 +1,67 @@
+# Meeting Intelligence Transcription Service
+# Uses whisper.cpp for fast CPU-based transcription
+# Uses resemblyzer for speaker diarization
+
+FROM python:3.11-slim AS builder
+
+# Install build dependencies
+RUN apt-get update && apt-get install -y --no-install-recommends \
+ build-essential \
+ cmake \
+ git \
+ ffmpeg \
+ && rm -rf /var/lib/apt/lists/*
+
+# Build whisper.cpp
+WORKDIR /build
+RUN git clone https://github.com/ggerganov/whisper.cpp.git && \
+ cd whisper.cpp && \
+    cmake -B build -DBUILD_SHARED_LIBS=OFF -DWHISPER_BUILD_EXAMPLES=ON && \
+ cmake --build build --config Release -j$(nproc) && \
+ cp build/bin/whisper-cli /usr/local/bin/whisper && \
+ cp build/bin/whisper-server /usr/local/bin/whisper-server 2>/dev/null || true
+
+# Download whisper models
+WORKDIR /models
+RUN cd /build/whisper.cpp && \
+ bash models/download-ggml-model.sh small && \
+ mv models/ggml-small.bin /models/
+
+# Production image
+FROM python:3.11-slim
+
+# Install runtime dependencies
+RUN apt-get update && apt-get install -y --no-install-recommends \
+    ffmpeg \
+    libsndfile1 \
+    curl \
+    && rm -rf /var/lib/apt/lists/*
+
+# Copy whisper binary and models
+COPY --from=builder /usr/local/bin/whisper /usr/local/bin/whisper
+COPY --from=builder /models /models
+
+# Set up Python environment
+WORKDIR /app
+
+# Install Python dependencies
+COPY requirements.txt .
+RUN pip install --no-cache-dir -r requirements.txt
+
+# Copy application code
+COPY app/ ./app/
+
+# Create directories
+RUN mkdir -p /recordings /audio /logs
+
+# Environment variables
+ENV PYTHONUNBUFFERED=1
+ENV WHISPER_MODEL=/models/ggml-small.bin
+ENV WHISPER_THREADS=8
+
+# Health check
+HEALTHCHECK --interval=30s --timeout=10s --start-period=60s --retries=3 \
+ CMD curl -f http://localhost:8001/health || exit 1
+
+# Run the service
+EXPOSE 8001
+CMD ["uvicorn", "app.main:app", "--host", "0.0.0.0", "--port", "8001", "--workers", "1"]
diff --git a/deploy/meeting-intelligence/transcriber/app/__init__.py b/deploy/meeting-intelligence/transcriber/app/__init__.py
new file mode 100644
index 0000000..ab06c1d
--- /dev/null
+++ b/deploy/meeting-intelligence/transcriber/app/__init__.py
@@ -0,0 +1 @@
+# Meeting Intelligence Transcription Service
diff --git a/deploy/meeting-intelligence/transcriber/app/config.py b/deploy/meeting-intelligence/transcriber/app/config.py
new file mode 100644
index 0000000..84299d5
--- /dev/null
+++ b/deploy/meeting-intelligence/transcriber/app/config.py
@@ -0,0 +1,45 @@
+"""
+Configuration settings for the Transcription Service.
+"""
+
+import os
+from pydantic_settings import BaseSettings
+
+
+class Settings(BaseSettings):
+ """Application settings loaded from environment variables."""
+
+ # Redis configuration
+ redis_url: str = "redis://localhost:6379"
+
+ # PostgreSQL configuration
+ postgres_url: str = "postgresql://meeting_intelligence:changeme@localhost:5432/meeting_intelligence"
+
+ # Whisper configuration
+ whisper_model: str = "/models/ggml-small.bin"
+ whisper_threads: int = 8
+ whisper_language: str = "en"
+
+ # Worker configuration
+ num_workers: int = 4
+ job_timeout: int = 7200 # 2 hours in seconds
+
+ # Audio processing
+ audio_sample_rate: int = 16000
+ audio_channels: int = 1
+
+ # Diarization settings
+ min_speaker_duration: float = 0.5 # Minimum speaker segment in seconds
+ max_speakers: int = 10
+
+ # Paths
+ recordings_path: str = "/recordings"
+ audio_output_path: str = "/audio"
+ temp_path: str = "/tmp/transcriber"
+
+ class Config:
+ env_file = ".env"
+ env_file_encoding = "utf-8"
+
+
+settings = Settings()
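Because `Settings` extends `BaseSettings`, each field can be overridden by an environment variable of the same name, which is how docker-compose.yml feeds in `WHISPER_MODEL`, `WHISPER_THREADS`, and `NUM_WORKERS`. A small sketch of that behaviour; the values are placeholders:

```python
# Sketch: pydantic-settings resolves fields from the environment by field name
# (case-insensitive). Values here are placeholders.
import os

os.environ["WHISPER_THREADS"] = "4"
os.environ["REDIS_URL"] = "redis://localhost:6380"

from app.config import Settings

s = Settings()
print(s.whisper_threads)  # 4
print(s.redis_url)        # redis://localhost:6380
print(s.whisper_model)    # /models/ggml-small.bin (default)
```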
diff --git a/deploy/meeting-intelligence/transcriber/app/database.py b/deploy/meeting-intelligence/transcriber/app/database.py
new file mode 100644
index 0000000..ad6d763
--- /dev/null
+++ b/deploy/meeting-intelligence/transcriber/app/database.py
@@ -0,0 +1,245 @@
+"""
+Database operations for the Transcription Service.
+"""
+
+import json
+import uuid
+from typing import Optional, List, Dict, Any
+
+import asyncpg
+
+log = structlog.get_logger()
+
+
+class Database:
+ """Database operations for transcription service."""
+
+ def __init__(self, connection_string: str):
+ self.connection_string = connection_string
+ self.pool: Optional[asyncpg.Pool] = None
+
+    async def connect(self):
+        """Establish database connection pool."""
+        log.info("Connecting to database...")
+
+        async def init_connection(conn):
+            # jsonb <-> dict codec so callers can pass plain dicts for metadata/result
+            await conn.set_type_codec(
+                "jsonb", encoder=json.dumps, decoder=json.loads, schema="pg_catalog"
+            )
+
+        self.pool = await asyncpg.create_pool(
+            self.connection_string,
+            min_size=2,
+            max_size=10,
+            init=init_connection
+        )
+        log.info("Database connected")
+
+ async def disconnect(self):
+ """Close database connection pool."""
+ if self.pool:
+ await self.pool.close()
+ log.info("Database disconnected")
+
+ async def health_check(self):
+ """Check database connectivity."""
+ async with self.pool.acquire() as conn:
+ await conn.fetchval("SELECT 1")
+
+ async def create_transcription_job(
+ self,
+ meeting_id: str,
+ audio_path: Optional[str] = None,
+ video_path: Optional[str] = None,
+ enable_diarization: bool = True,
+ language: Optional[str] = None,
+ priority: int = 5
+ ) -> str:
+ """Create a new transcription job."""
+ job_id = str(uuid.uuid4())
+
+ async with self.pool.acquire() as conn:
+ await conn.execute("""
+ INSERT INTO processing_jobs (
+ id, meeting_id, job_type, status, priority,
+ result
+ )
+ VALUES ($1, $2::uuid, 'transcribe', 'pending', $3, $4)
+ """, job_id, meeting_id, priority, {
+ "audio_path": audio_path,
+ "video_path": video_path,
+ "enable_diarization": enable_diarization,
+ "language": language
+ })
+
+ log.info("Created transcription job", job_id=job_id, meeting_id=meeting_id)
+ return job_id
+
+ async def get_job(self, job_id: str) -> Optional[Dict[str, Any]]:
+ """Get a job by ID."""
+ async with self.pool.acquire() as conn:
+ row = await conn.fetchrow("""
+ SELECT id, meeting_id, job_type, status, priority,
+ attempts, started_at, completed_at,
+ error_message, result, created_at
+ FROM processing_jobs
+ WHERE id = $1
+ """, job_id)
+
+ if row:
+ return dict(row)
+ return None
+
+ async def get_next_pending_job(self) -> Optional[Dict[str, Any]]:
+ """Get the next pending job and mark it as processing."""
+ async with self.pool.acquire() as conn:
+ # Use FOR UPDATE SKIP LOCKED to prevent race conditions
+ row = await conn.fetchrow("""
+ UPDATE processing_jobs
+ SET status = 'processing',
+ started_at = NOW(),
+ attempts = attempts + 1
+ WHERE id = (
+ SELECT id FROM processing_jobs
+ WHERE status = 'pending'
+ AND job_type = 'transcribe'
+ ORDER BY priority ASC, created_at ASC
+ FOR UPDATE SKIP LOCKED
+ LIMIT 1
+ )
+ RETURNING id, meeting_id, job_type, result
+ """)
+
+ if row:
+ result = dict(row)
+ # Merge result JSON into the dict
+ if result.get("result"):
+ result.update(result["result"])
+ return result
+ return None
+
+ async def update_job_status(
+ self,
+ job_id: str,
+ status: str,
+ error_message: Optional[str] = None,
+ result: Optional[dict] = None,
+ progress: Optional[float] = None
+ ):
+ """Update job status."""
+ async with self.pool.acquire() as conn:
+ if status == "completed":
+ await conn.execute("""
+ UPDATE processing_jobs
+ SET status = $1,
+ completed_at = NOW(),
+ error_message = $2,
+ result = COALESCE($3::jsonb, result)
+ WHERE id = $4
+ """, status, error_message, result, job_id)
+ else:
+ update_result = result
+ if progress is not None:
+ update_result = result or {}
+ update_result["progress"] = progress
+
+ await conn.execute("""
+ UPDATE processing_jobs
+ SET status = $1,
+ error_message = $2,
+ result = COALESCE($3::jsonb, result)
+ WHERE id = $4
+ """, status, error_message, update_result, job_id)
+
+ async def update_job_audio_path(self, job_id: str, audio_path: str):
+ """Update the audio path for a job."""
+ async with self.pool.acquire() as conn:
+ await conn.execute("""
+ UPDATE processing_jobs
+ SET result = result || $1::jsonb
+ WHERE id = $2
+ """, {"audio_path": audio_path}, job_id)
+
+ async def update_meeting_status(self, meeting_id: str, status: str):
+ """Update meeting processing status."""
+ async with self.pool.acquire() as conn:
+ await conn.execute("""
+ UPDATE meetings
+ SET status = $1,
+ updated_at = NOW()
+ WHERE id = $2::uuid
+ """, status, meeting_id)
+
+ async def insert_transcript_segment(
+ self,
+ meeting_id: str,
+ segment_index: int,
+ start_time: float,
+ end_time: float,
+ text: str,
+ speaker_id: Optional[str] = None,
+ speaker_label: Optional[str] = None,
+ confidence: Optional[float] = None,
+ language: str = "en"
+ ):
+ """Insert a transcript segment."""
+ async with self.pool.acquire() as conn:
+ await conn.execute("""
+ INSERT INTO transcripts (
+ meeting_id, segment_index, start_time, end_time,
+ text, speaker_id, speaker_label, confidence, language
+ )
+ VALUES ($1::uuid, $2, $3, $4, $5, $6, $7, $8, $9)
+ """, meeting_id, segment_index, start_time, end_time,
+ text, speaker_id, speaker_label, confidence, language)
+
+ async def get_transcript(self, meeting_id: str) -> List[Dict[str, Any]]:
+ """Get all transcript segments for a meeting."""
+ async with self.pool.acquire() as conn:
+ rows = await conn.fetch("""
+ SELECT id, segment_index, start_time, end_time,
+ speaker_id, speaker_label, text, confidence, language
+ FROM transcripts
+ WHERE meeting_id = $1::uuid
+ ORDER BY segment_index ASC
+ """, meeting_id)
+
+ return [dict(row) for row in rows]
+
+ async def get_meeting(self, meeting_id: str) -> Optional[Dict[str, Any]]:
+ """Get meeting details."""
+ async with self.pool.acquire() as conn:
+ row = await conn.fetchrow("""
+ SELECT id, conference_id, conference_name, title,
+ started_at, ended_at, duration_seconds,
+ recording_path, audio_path, status,
+ metadata, created_at
+ FROM meetings
+ WHERE id = $1::uuid
+ """, meeting_id)
+
+ if row:
+ return dict(row)
+ return None
+
+ async def create_meeting(
+ self,
+ conference_id: str,
+ conference_name: Optional[str] = None,
+ title: Optional[str] = None,
+ recording_path: Optional[str] = None,
+ metadata: Optional[dict] = None
+ ) -> str:
+ """Create a new meeting record."""
+ meeting_id = str(uuid.uuid4())
+
+ async with self.pool.acquire() as conn:
+ await conn.execute("""
+ INSERT INTO meetings (
+ id, conference_id, conference_name, title,
+ recording_path, status, metadata
+ )
+ VALUES ($1, $2, $3, $4, $5, 'recording', $6)
+ """, meeting_id, conference_id, conference_name, title,
+ recording_path, metadata or {})
+
+ log.info("Created meeting", meeting_id=meeting_id, conference_id=conference_id)
+ return meeting_id
+
+
+class DatabaseError(Exception):
+ """Database operation error."""
+ pass
diff --git a/deploy/meeting-intelligence/transcriber/app/diarizer.py b/deploy/meeting-intelligence/transcriber/app/diarizer.py
new file mode 100644
index 0000000..56e8744
--- /dev/null
+++ b/deploy/meeting-intelligence/transcriber/app/diarizer.py
@@ -0,0 +1,338 @@
+"""
+Speaker Diarization using resemblyzer.
+
+Identifies who spoke when in the audio.
+"""
+
+import os
+from dataclasses import dataclass
+from typing import List, Optional, Tuple
+
+import numpy as np
+import soundfile as sf
+from resemblyzer import VoiceEncoder, preprocess_wav
+from sklearn.cluster import AgglomerativeClustering
+
+import structlog
+
+log = structlog.get_logger()
+
+
+@dataclass
+class SpeakerSegment:
+ """A segment attributed to a speaker."""
+ start: float
+ end: float
+ speaker_id: str
+ speaker_label: str # e.g., "Speaker 1"
+ confidence: Optional[float] = None
+
+
+class SpeakerDiarizer:
+ """Speaker diarization using voice embeddings."""
+
+ def __init__(
+ self,
+ min_segment_duration: float = 0.5,
+ max_speakers: int = 10,
+ embedding_step: float = 0.5 # Step size for embeddings in seconds
+ ):
+ self.min_segment_duration = min_segment_duration
+ self.max_speakers = max_speakers
+ self.embedding_step = embedding_step
+
+ # Load voice encoder (this downloads the model on first use)
+ log.info("Loading voice encoder model...")
+ self.encoder = VoiceEncoder()
+ log.info("Voice encoder loaded")
+
+ def diarize(
+ self,
+ audio_path: str,
+ num_speakers: Optional[int] = None,
+ transcript_segments: Optional[List[dict]] = None
+ ) -> List[SpeakerSegment]:
+ """
+ Perform speaker diarization on an audio file.
+
+ Args:
+ audio_path: Path to audio file (WAV, 16kHz mono)
+ num_speakers: Number of speakers (if known), otherwise auto-detected
+ transcript_segments: Optional transcript segments to align with
+
+ Returns:
+ List of SpeakerSegment with speaker attributions
+ """
+ if not os.path.exists(audio_path):
+ raise FileNotFoundError(f"Audio file not found: {audio_path}")
+
+ log.info("Starting speaker diarization", audio_path=audio_path)
+
+ # Load and preprocess audio
+ wav, sample_rate = sf.read(audio_path)
+
+ if sample_rate != 16000:
+ log.warning(f"Audio sample rate is {sample_rate}, expected 16000")
+
+ # Ensure mono
+ if len(wav.shape) > 1:
+ wav = wav.mean(axis=1)
+
+ # Preprocess for resemblyzer
+ wav = preprocess_wav(wav)
+
+ if len(wav) == 0:
+ log.warning("Audio file is empty after preprocessing")
+ return []
+
+ # Generate embeddings for sliding windows
+ embeddings, timestamps = self._generate_embeddings(wav, sample_rate)
+
+ if len(embeddings) == 0:
+ log.warning("No embeddings generated")
+ return []
+
+ # Cluster embeddings to identify speakers
+ speaker_labels = self._cluster_speakers(
+ embeddings,
+ num_speakers=num_speakers
+ )
+
+ # Convert to speaker segments
+ segments = self._create_segments(timestamps, speaker_labels)
+
+ # If transcript segments provided, align them
+ if transcript_segments:
+ segments = self._align_with_transcript(segments, transcript_segments)
+
+ log.info(
+ "Diarization complete",
+ num_segments=len(segments),
+ num_speakers=len(set(s.speaker_id for s in segments))
+ )
+
+ return segments
+
+ def _generate_embeddings(
+ self,
+ wav: np.ndarray,
+ sample_rate: int
+ ) -> Tuple[np.ndarray, List[float]]:
+ """Generate voice embeddings for sliding windows."""
+ embeddings = []
+ timestamps = []
+
+ # Window size in samples (1.5 seconds for good speaker representation)
+ window_size = int(1.5 * sample_rate)
+ step_size = int(self.embedding_step * sample_rate)
+
+ # Slide through audio
+ for start_sample in range(0, len(wav) - window_size, step_size):
+ end_sample = start_sample + window_size
+ window = wav[start_sample:end_sample]
+
+ # Get embedding for this window
+ try:
+ embedding = self.encoder.embed_utterance(window)
+ embeddings.append(embedding)
+ timestamps.append(start_sample / sample_rate)
+ except Exception as e:
+ log.debug(f"Failed to embed window at {start_sample/sample_rate}s: {e}")
+ continue
+
+ return np.array(embeddings), timestamps
+
+ def _cluster_speakers(
+ self,
+ embeddings: np.ndarray,
+ num_speakers: Optional[int] = None
+ ) -> np.ndarray:
+ """Cluster embeddings to identify speakers."""
+ if len(embeddings) == 0:
+ return np.array([])
+
+ # If number of speakers not specified, estimate it
+ if num_speakers is None:
+ num_speakers = self._estimate_num_speakers(embeddings)
+
+ # Ensure we don't exceed max speakers or embedding count
+ num_speakers = min(num_speakers, self.max_speakers, len(embeddings))
+ num_speakers = max(num_speakers, 1)
+
+ log.info(f"Clustering with {num_speakers} speakers")
+
+ # Use agglomerative clustering
+ clustering = AgglomerativeClustering(
+ n_clusters=num_speakers,
+ metric="cosine",
+ linkage="average"
+ )
+
+ labels = clustering.fit_predict(embeddings)
+
+ return labels
+
+ def _estimate_num_speakers(self, embeddings: np.ndarray) -> int:
+ """Estimate the number of speakers from embeddings."""
+ if len(embeddings) < 2:
+ return 1
+
+ # Try different numbers of clusters and find the best
+ best_score = -1
+ best_n = 2
+
+ for n in range(2, min(6, len(embeddings))):
+ try:
+ clustering = AgglomerativeClustering(
+ n_clusters=n,
+ metric="cosine",
+ linkage="average"
+ )
+ labels = clustering.fit_predict(embeddings)
+
+ # Calculate silhouette-like score
+ score = self._cluster_quality_score(embeddings, labels)
+
+ if score > best_score:
+ best_score = score
+ best_n = n
+ except Exception:
+ continue
+
+ log.info(f"Estimated {best_n} speakers (score: {best_score:.3f})")
+ return best_n
+
+ def _cluster_quality_score(
+ self,
+ embeddings: np.ndarray,
+ labels: np.ndarray
+ ) -> float:
+ """Calculate a simple cluster quality score."""
+ unique_labels = np.unique(labels)
+
+ if len(unique_labels) < 2:
+ return 0.0
+
+ # Calculate average intra-cluster distance
+ intra_distances = []
+ for label in unique_labels:
+ cluster_embeddings = embeddings[labels == label]
+ if len(cluster_embeddings) > 1:
+ # Cosine distance within cluster
+ for i in range(len(cluster_embeddings)):
+ for j in range(i + 1, len(cluster_embeddings)):
+ dist = 1 - np.dot(cluster_embeddings[i], cluster_embeddings[j])
+ intra_distances.append(dist)
+
+ if not intra_distances:
+ return 0.0
+
+ avg_intra = np.mean(intra_distances)
+
+ # Calculate average inter-cluster distance
+ inter_distances = []
+ cluster_centers = []
+ for label in unique_labels:
+ cluster_embeddings = embeddings[labels == label]
+ center = cluster_embeddings.mean(axis=0)
+ cluster_centers.append(center)
+
+ for i in range(len(cluster_centers)):
+ for j in range(i + 1, len(cluster_centers)):
+ dist = 1 - np.dot(cluster_centers[i], cluster_centers[j])
+ inter_distances.append(dist)
+
+ avg_inter = np.mean(inter_distances) if inter_distances else 1.0
+
+ # Score: higher inter-cluster distance, lower intra-cluster distance is better
+ return (avg_inter - avg_intra) / max(avg_inter, avg_intra, 0.001)
+
+ def _create_segments(
+ self,
+ timestamps: List[float],
+ labels: np.ndarray
+ ) -> List[SpeakerSegment]:
+ """Convert clustered timestamps to speaker segments."""
+ if len(timestamps) == 0:
+ return []
+
+ segments = []
+ current_speaker = labels[0]
+ segment_start = timestamps[0]
+
+ for i in range(1, len(timestamps)):
+ if labels[i] != current_speaker:
+ # End current segment
+ segment_end = timestamps[i]
+
+ if segment_end - segment_start >= self.min_segment_duration:
+ segments.append(SpeakerSegment(
+ start=segment_start,
+ end=segment_end,
+ speaker_id=f"speaker_{current_speaker}",
+ speaker_label=f"Speaker {current_speaker + 1}"
+ ))
+
+ # Start new segment
+ current_speaker = labels[i]
+ segment_start = timestamps[i]
+
+ # Add final segment
+ if len(timestamps) > 0:
+ segment_end = timestamps[-1] + self.embedding_step
+ if segment_end - segment_start >= self.min_segment_duration:
+ segments.append(SpeakerSegment(
+ start=segment_start,
+ end=segment_end,
+ speaker_id=f"speaker_{current_speaker}",
+ speaker_label=f"Speaker {current_speaker + 1}"
+ ))
+
+ return segments
+
+ def _align_with_transcript(
+ self,
+ speaker_segments: List[SpeakerSegment],
+ transcript_segments: List[dict]
+ ) -> List[SpeakerSegment]:
+ """Align speaker segments with transcript segments."""
+ aligned = []
+
+ for trans in transcript_segments:
+ trans_start = trans.get("start", 0)
+ trans_end = trans.get("end", 0)
+ trans_mid = (trans_start + trans_end) / 2
+
+ # Find the speaker segment that best overlaps
+ best_speaker = None
+ best_overlap = 0
+
+ for speaker in speaker_segments:
+ # Calculate overlap
+ overlap_start = max(trans_start, speaker.start)
+ overlap_end = min(trans_end, speaker.end)
+ overlap = max(0, overlap_end - overlap_start)
+
+ if overlap > best_overlap:
+ best_overlap = overlap
+ best_speaker = speaker
+
+ if best_speaker:
+ aligned.append(SpeakerSegment(
+ start=trans_start,
+ end=trans_end,
+ speaker_id=best_speaker.speaker_id,
+ speaker_label=best_speaker.speaker_label,
+ confidence=best_overlap / (trans_end - trans_start) if trans_end > trans_start else 0
+ ))
+ else:
+ # No match, assign unknown speaker
+ aligned.append(SpeakerSegment(
+ start=trans_start,
+ end=trans_end,
+ speaker_id="speaker_unknown",
+ speaker_label="Unknown Speaker",
+ confidence=0
+ ))
+
+ return aligned
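The diarizer can also be run standalone on a 16 kHz mono WAV file, which is handy for tuning `max_speakers` and the clustering behaviour. A short sketch; the audio path is a placeholder and the first run downloads the resemblyzer voice encoder weights:

```python
# Sketch: standalone diarization of a 16 kHz mono WAV; path is a placeholder.
from app.diarizer import SpeakerDiarizer

diarizer = SpeakerDiarizer(max_speakers=5)
segments = diarizer.diarize("/audio/test-room-123/audio.wav", num_speakers=None)

for seg in segments:
    print(f"{seg.start:7.2f}s - {seg.end:7.2f}s  {seg.speaker_label}")
```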
diff --git a/deploy/meeting-intelligence/transcriber/app/main.py b/deploy/meeting-intelligence/transcriber/app/main.py
new file mode 100644
index 0000000..6cea312
--- /dev/null
+++ b/deploy/meeting-intelligence/transcriber/app/main.py
@@ -0,0 +1,274 @@
+"""
+Meeting Intelligence Transcription Service
+
+FastAPI service that handles:
+- Audio extraction from video recordings
+- Transcription using whisper.cpp
+- Speaker diarization using resemblyzer
+- Job queue management via Redis
+"""
+
+import asyncio
+import os
+from contextlib import asynccontextmanager
+from typing import Optional
+
+from fastapi import FastAPI, BackgroundTasks, HTTPException
+from fastapi.responses import JSONResponse
+from pydantic import BaseModel
+from redis import Redis
+from rq import Queue
+
+from .config import settings
+from .transcriber import WhisperTranscriber
+from .diarizer import SpeakerDiarizer
+from .processor import JobProcessor
+from .database import Database
+
+import structlog
+
+log = structlog.get_logger()
+
+
+# Pydantic models
+class TranscribeRequest(BaseModel):
+ meeting_id: str
+ audio_path: str
+ priority: int = 5
+ enable_diarization: bool = True
+ language: Optional[str] = None
+
+
+class TranscribeResponse(BaseModel):
+ job_id: str
+ status: str
+ message: str
+
+
+class JobStatus(BaseModel):
+ job_id: str
+ status: str
+ progress: Optional[float] = None
+ result: Optional[dict] = None
+ error: Optional[str] = None
+
+
+# Application state
+class AppState:
+ redis: Optional[Redis] = None
+ queue: Optional[Queue] = None
+ db: Optional[Database] = None
+ transcriber: Optional[WhisperTranscriber] = None
+ diarizer: Optional[SpeakerDiarizer] = None
+ processor: Optional[JobProcessor] = None
+
+
+state = AppState()
+
+
+@asynccontextmanager
+async def lifespan(app: FastAPI):
+ """Application startup and shutdown."""
+ log.info("Starting transcription service...")
+
+ # Initialize Redis connection
+ state.redis = Redis.from_url(settings.redis_url)
+ state.queue = Queue("transcription", connection=state.redis)
+
+ # Initialize database
+ state.db = Database(settings.postgres_url)
+ await state.db.connect()
+
+ # Initialize transcriber
+ state.transcriber = WhisperTranscriber(
+ model_path=settings.whisper_model,
+ threads=settings.whisper_threads
+ )
+
+ # Initialize diarizer
+ state.diarizer = SpeakerDiarizer()
+
+ # Initialize job processor
+ state.processor = JobProcessor(
+ transcriber=state.transcriber,
+ diarizer=state.diarizer,
+ db=state.db,
+ redis=state.redis
+ )
+
+ # Start background worker
+ asyncio.create_task(state.processor.process_jobs())
+
+ log.info("Transcription service started successfully")
+
+ yield
+
+ # Shutdown
+ log.info("Shutting down transcription service...")
+ if state.processor:
+ await state.processor.stop()
+ if state.db:
+ await state.db.disconnect()
+ if state.redis:
+ state.redis.close()
+
+ log.info("Transcription service stopped")
+
+
+app = FastAPI(
+ title="Meeting Intelligence Transcription Service",
+ description="Transcription and speaker diarization for meeting recordings",
+ version="1.0.0",
+ lifespan=lifespan
+)
+
+
+@app.get("/health")
+async def health_check():
+ """Health check endpoint."""
+ redis_ok = False
+ db_ok = False
+
+ try:
+ if state.redis:
+ state.redis.ping()
+ redis_ok = True
+ except Exception as e:
+ log.error("Redis health check failed", error=str(e))
+
+ try:
+ if state.db:
+ await state.db.health_check()
+ db_ok = True
+ except Exception as e:
+ log.error("Database health check failed", error=str(e))
+
+ status = "healthy" if (redis_ok and db_ok) else "unhealthy"
+
+ return {
+ "status": status,
+ "redis": redis_ok,
+ "database": db_ok,
+ "whisper_model": settings.whisper_model,
+ "threads": settings.whisper_threads
+ }
+
+
+@app.get("/status")
+async def service_status():
+ """Get service status and queue info."""
+ queue_length = state.queue.count if state.queue else 0
+ processing = state.processor.active_jobs if state.processor else 0
+
+ return {
+ "status": "running",
+ "queue_length": queue_length,
+ "active_jobs": processing,
+ "workers": settings.num_workers,
+ "model": os.path.basename(settings.whisper_model)
+ }
+
+
+@app.post("/transcribe", response_model=TranscribeResponse)
+async def queue_transcription(request: TranscribeRequest, background_tasks: BackgroundTasks):
+ """Queue a transcription job."""
+ log.info(
+ "Received transcription request",
+ meeting_id=request.meeting_id,
+ audio_path=request.audio_path
+ )
+
+ # Validate audio file exists
+ if not os.path.exists(request.audio_path):
+ raise HTTPException(
+ status_code=404,
+ detail=f"Audio file not found: {request.audio_path}"
+ )
+
+ # Create job record in database
+ try:
+ job_id = await state.db.create_transcription_job(
+ meeting_id=request.meeting_id,
+ audio_path=request.audio_path,
+ enable_diarization=request.enable_diarization,
+ language=request.language,
+ priority=request.priority
+ )
+ except Exception as e:
+ log.error("Failed to create job", error=str(e))
+ raise HTTPException(status_code=500, detail=str(e))
+
+ # Queue the job
+ state.queue.enqueue(
+ "app.worker.process_transcription",
+ job_id,
+ job_timeout="2h",
+ result_ttl=86400 # 24 hours
+ )
+
+ log.info("Job queued", job_id=job_id)
+
+ return TranscribeResponse(
+ job_id=job_id,
+ status="queued",
+ message="Transcription job queued successfully"
+ )
+
+
+@app.get("/transcribe/{job_id}", response_model=JobStatus)
+async def get_job_status(job_id: str):
+ """Get the status of a transcription job."""
+ job = await state.db.get_job(job_id)
+
+ if not job:
+ raise HTTPException(status_code=404, detail="Job not found")
+
+    return JobStatus(
+        job_id=job_id,
+        status=job["status"],
+        progress=(job.get("result") or {}).get("progress"),
+        result=job.get("result"),
+        error=job.get("error_message")
+    )
+
+
+@app.delete("/transcribe/{job_id}")
+async def cancel_job(job_id: str):
+ """Cancel a pending transcription job."""
+ job = await state.db.get_job(job_id)
+
+ if not job:
+ raise HTTPException(status_code=404, detail="Job not found")
+
+ if job["status"] not in ["pending", "queued"]:
+ raise HTTPException(
+ status_code=400,
+ detail=f"Cannot cancel job in status: {job['status']}"
+ )
+
+ await state.db.update_job_status(job_id, "cancelled")
+
+ return {"status": "cancelled", "job_id": job_id}
+
+
+@app.get("/meetings/{meeting_id}/transcript")
+async def get_transcript(meeting_id: str):
+ """Get the transcript for a meeting."""
+ transcript = await state.db.get_transcript(meeting_id)
+
+ if not transcript:
+ raise HTTPException(
+ status_code=404,
+ detail=f"No transcript found for meeting: {meeting_id}"
+ )
+
+ return {
+ "meeting_id": meeting_id,
+ "segments": transcript,
+ "segment_count": len(transcript)
+ }
+
+
+if __name__ == "__main__":
+ import uvicorn
+ uvicorn.run(app, host="0.0.0.0", port=8001)
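A sketch of a client calling the endpoints above; the base URL assumes the request is made from inside the compose network (service name `transcriber`), since the port is not published to the host, and the meeting id is a placeholder that must reference an existing `meetings.id`:

```python
# Sketch: queue a transcription job and poll its status. URL and ids are placeholders.
import httpx

BASE = "http://transcriber:8001"
MEETING_ID = "00000000-0000-0000-0000-000000000000"  # an existing meetings.id

resp = httpx.post(f"{BASE}/transcribe", json={
    "meeting_id": MEETING_ID,
    "audio_path": "/audio/test-room-123/audio.wav",
    "enable_diarization": True,
    "priority": 5,
})
resp.raise_for_status()
job = resp.json()
print(job["job_id"], job["status"])

# Poll job status
status = httpx.get(f"{BASE}/transcribe/{job['job_id']}").json()
print(status["status"], status.get("progress"))
```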
diff --git a/deploy/meeting-intelligence/transcriber/app/processor.py b/deploy/meeting-intelligence/transcriber/app/processor.py
new file mode 100644
index 0000000..8bb67a2
--- /dev/null
+++ b/deploy/meeting-intelligence/transcriber/app/processor.py
@@ -0,0 +1,282 @@
+"""
+Job Processor for the Transcription Service.
+
+Handles the processing pipeline:
+1. Audio extraction from video
+2. Transcription
+3. Speaker diarization
+4. Database storage
+"""
+
+import asyncio
+import os
+import subprocess
+from typing import Optional
+
+import structlog
+
+from .config import settings
+from .transcriber import WhisperTranscriber, TranscriptionResult
+from .diarizer import SpeakerDiarizer, SpeakerSegment
+from .database import Database
+
+log = structlog.get_logger()
+
+
+class JobProcessor:
+ """Processes transcription jobs from the queue."""
+
+ def __init__(
+ self,
+ transcriber: WhisperTranscriber,
+ diarizer: SpeakerDiarizer,
+ db: Database,
+ redis
+ ):
+ self.transcriber = transcriber
+ self.diarizer = diarizer
+ self.db = db
+ self.redis = redis
+ self.active_jobs = 0
+ self._running = False
+ self._workers = []
+
+ async def process_jobs(self):
+ """Main job processing loop."""
+ self._running = True
+ log.info("Job processor started", num_workers=settings.num_workers)
+
+ # Start worker tasks
+ for i in range(settings.num_workers):
+ worker = asyncio.create_task(self._worker(i))
+ self._workers.append(worker)
+
+ # Wait for all workers
+ await asyncio.gather(*self._workers, return_exceptions=True)
+
+ async def stop(self):
+ """Stop the job processor."""
+ self._running = False
+ for worker in self._workers:
+ worker.cancel()
+ log.info("Job processor stopped")
+
+ async def _worker(self, worker_id: int):
+ """Worker that processes individual jobs."""
+ log.info(f"Worker {worker_id} started")
+
+ while self._running:
+ try:
+ # Get next job from database
+ job = await self.db.get_next_pending_job()
+
+ if job is None:
+ # No jobs, wait a bit
+ await asyncio.sleep(2)
+ continue
+
+ job_id = job["id"]
+ meeting_id = job["meeting_id"]
+
+ log.info(
+ f"Worker {worker_id} processing job",
+ job_id=job_id,
+ meeting_id=meeting_id
+ )
+
+ self.active_jobs += 1
+
+ try:
+ await self._process_job(job)
+ except Exception as e:
+ log.error(
+ "Job processing failed",
+ job_id=job_id,
+ error=str(e)
+ )
+ await self.db.update_job_status(
+ job_id,
+ "failed",
+ error_message=str(e)
+ )
+ finally:
+ self.active_jobs -= 1
+
+ except asyncio.CancelledError:
+ break
+ except Exception as e:
+ log.error(f"Worker {worker_id} error", error=str(e))
+ await asyncio.sleep(5)
+
+ log.info(f"Worker {worker_id} stopped")
+
+ async def _process_job(self, job: dict):
+ """Process a single transcription job."""
+ job_id = job["id"]
+ meeting_id = job["meeting_id"]
+ audio_path = job.get("audio_path")
+ video_path = job.get("video_path")
+ enable_diarization = job.get("enable_diarization", True)
+ language = job.get("language")
+
+ # Update status to processing
+ await self.db.update_job_status(job_id, "processing")
+ await self.db.update_meeting_status(meeting_id, "transcribing")
+
+ # Step 1: Extract audio if we have video
+ if video_path and not audio_path:
+ log.info("Extracting audio from video", video_path=video_path)
+ await self.db.update_job_status(job_id, "processing", progress=0.1)
+
+ audio_path = await self._extract_audio(video_path, meeting_id)
+ await self.db.update_job_audio_path(job_id, audio_path)
+
+ if not audio_path or not os.path.exists(audio_path):
+ raise RuntimeError(f"Audio file not found: {audio_path}")
+
+ # Step 2: Transcribe
+ log.info("Starting transcription", audio_path=audio_path)
+ await self.db.update_job_status(job_id, "processing", progress=0.3)
+
+ transcription = await asyncio.get_event_loop().run_in_executor(
+ None,
+ lambda: self.transcriber.transcribe(audio_path, language)
+ )
+
+ log.info(
+ "Transcription complete",
+ segments=len(transcription.segments),
+ duration=transcription.duration
+ )
+
+ # Step 3: Speaker diarization
+ speaker_segments = []
+ if enable_diarization and len(transcription.segments) > 0:
+ log.info("Starting speaker diarization")
+ await self.db.update_job_status(job_id, "processing", progress=0.6)
+ await self.db.update_meeting_status(meeting_id, "diarizing")
+
+ # Convert transcript segments to dicts for diarizer
+ transcript_dicts = [
+ {"start": s.start, "end": s.end, "text": s.text}
+ for s in transcription.segments
+ ]
+
+ speaker_segments = await asyncio.get_event_loop().run_in_executor(
+ None,
+ lambda: self.diarizer.diarize(
+ audio_path,
+ transcript_segments=transcript_dicts
+ )
+ )
+
+ log.info(
+ "Diarization complete",
+ num_segments=len(speaker_segments),
+ num_speakers=len(set(s.speaker_id for s in speaker_segments))
+ )
+
+ # Step 4: Store results
+ log.info("Storing transcript in database")
+ await self.db.update_job_status(job_id, "processing", progress=0.9)
+
+ await self._store_transcript(
+ meeting_id,
+ transcription,
+ speaker_segments
+ )
+
+ # Mark job complete
+ await self.db.update_job_status(
+ job_id,
+ "completed",
+ result={
+ "segments": len(transcription.segments),
+ "duration": transcription.duration,
+ "language": transcription.language,
+ "speakers": len(set(s.speaker_id for s in speaker_segments)) if speaker_segments else 0
+ }
+ )
+
+ # Update meeting status - ready for summarization
+ await self.db.update_meeting_status(meeting_id, "summarizing")
+
+ log.info("Job completed successfully", job_id=job_id)
+
+ async def _extract_audio(self, video_path: str, meeting_id: str) -> str:
+ """Extract audio from video file using ffmpeg."""
+ output_dir = os.path.join(settings.audio_output_path, meeting_id)
+ os.makedirs(output_dir, exist_ok=True)
+
+ audio_path = os.path.join(output_dir, "audio.wav")
+
+ cmd = [
+ "ffmpeg",
+ "-i", video_path,
+ "-vn", # No video
+ "-acodec", "pcm_s16le", # PCM 16-bit
+ "-ar", str(settings.audio_sample_rate), # Sample rate
+ "-ac", str(settings.audio_channels), # Mono
+ "-y", # Overwrite
+ audio_path
+ ]
+
+ log.debug("Running ffmpeg", cmd=" ".join(cmd))
+
+ process = await asyncio.create_subprocess_exec(
+ *cmd,
+ stdout=asyncio.subprocess.PIPE,
+ stderr=asyncio.subprocess.PIPE
+ )
+
+ _, stderr = await process.communicate()
+
+ if process.returncode != 0:
+ raise RuntimeError(f"FFmpeg failed: {stderr.decode()}")
+
+ log.info("Audio extracted", output=audio_path)
+ return audio_path
+
+ async def _store_transcript(
+ self,
+ meeting_id: str,
+ transcription: TranscriptionResult,
+ speaker_segments: list
+ ):
+ """Store transcript segments in database."""
+ # Create a map from time ranges to speakers
+ speaker_map = {}
+ for seg in speaker_segments:
+ speaker_map[(seg.start, seg.end)] = (seg.speaker_id, seg.speaker_label)
+
+ # Store each transcript segment
+ for i, segment in enumerate(transcription.segments):
+ # Find matching speaker
+ speaker_id = None
+ speaker_label = None
+
+ for (start, end), (sid, slabel) in speaker_map.items():
+ if segment.start >= start and segment.end <= end:
+ speaker_id = sid
+ speaker_label = slabel
+ break
+
+ # If no exact match, find closest overlap
+ if speaker_id is None:
+ for seg in speaker_segments:
+ if segment.start < seg.end and segment.end > seg.start:
+ speaker_id = seg.speaker_id
+ speaker_label = seg.speaker_label
+ break
+
+ await self.db.insert_transcript_segment(
+ meeting_id=meeting_id,
+ segment_index=i,
+ start_time=segment.start,
+ end_time=segment.end,
+ text=segment.text,
+ speaker_id=speaker_id,
+ speaker_label=speaker_label,
+ confidence=segment.confidence,
+ language=transcription.language
+ )
diff --git a/deploy/meeting-intelligence/transcriber/app/transcriber.py b/deploy/meeting-intelligence/transcriber/app/transcriber.py
new file mode 100644
index 0000000..d69a0c0
--- /dev/null
+++ b/deploy/meeting-intelligence/transcriber/app/transcriber.py
@@ -0,0 +1,211 @@
+"""
+Whisper.cpp transcription wrapper.
+
+Uses the whisper CLI to transcribe audio files.
+"""
+
+import json
+import os
+import subprocess
+import tempfile
+from dataclasses import dataclass
+from typing import List, Optional
+
+import structlog
+
+log = structlog.get_logger()
+
+
+@dataclass
+class TranscriptSegment:
+ """A single transcript segment."""
+ start: float
+ end: float
+ text: str
+ confidence: Optional[float] = None
+
+
+@dataclass
+class TranscriptionResult:
+ """Result of a transcription job."""
+ segments: List[TranscriptSegment]
+ language: str
+ duration: float
+ text: str
+
+
+class WhisperTranscriber:
+ """Wrapper for whisper.cpp transcription."""
+
+ def __init__(
+ self,
+ model_path: str = "/models/ggml-small.bin",
+ threads: int = 8,
+ language: str = "en"
+ ):
+ self.model_path = model_path
+ self.threads = threads
+ self.language = language
+ self.whisper_bin = "/usr/local/bin/whisper"
+
+ # Verify whisper binary exists
+ if not os.path.exists(self.whisper_bin):
+ raise RuntimeError(f"Whisper binary not found at {self.whisper_bin}")
+
+ # Verify model exists
+ if not os.path.exists(model_path):
+ raise RuntimeError(f"Whisper model not found at {model_path}")
+
+ log.info(
+ "WhisperTranscriber initialized",
+ model=model_path,
+ threads=threads,
+ language=language
+ )
+
+ def transcribe(
+ self,
+ audio_path: str,
+ language: Optional[str] = None,
+ translate: bool = False
+ ) -> TranscriptionResult:
+ """
+ Transcribe an audio file.
+
+ Args:
+ audio_path: Path to the audio file (WAV format, 16kHz mono)
+ language: Language code (e.g., 'en', 'es', 'fr') or None for auto-detect
+ translate: If True, translate to English
+
+ Returns:
+ TranscriptionResult with segments and full text
+ """
+ if not os.path.exists(audio_path):
+ raise FileNotFoundError(f"Audio file not found: {audio_path}")
+
+ log.info("Starting transcription", audio_path=audio_path, language=language)
+
+ # Create temp file for JSON output
+ with tempfile.NamedTemporaryFile(suffix=".json", delete=False) as tmp:
+ output_json = tmp.name
+
+ try:
+ # Build whisper command
+ cmd = [
+ self.whisper_bin,
+ "-m", self.model_path,
+ "-f", audio_path,
+ "-t", str(self.threads),
+ "-oj", # Output JSON
+ "-of", output_json.replace(".json", ""), # Output file prefix
+ "--print-progress",
+ ]
+
+ # Add language if specified
+ if language:
+ cmd.extend(["-l", language])
+ else:
+ cmd.extend(["-l", self.language])
+
+ # Add translate flag if needed
+ if translate:
+ cmd.append("--translate")
+
+ log.debug("Running whisper command", cmd=" ".join(cmd))
+
+ # Run whisper
+ result = subprocess.run(
+ cmd,
+ capture_output=True,
+ text=True,
+ timeout=7200 # 2 hour timeout
+ )
+
+ if result.returncode != 0:
+ log.error(
+ "Whisper transcription failed",
+ returncode=result.returncode,
+ stderr=result.stderr
+ )
+ raise RuntimeError(f"Whisper failed: {result.stderr}")
+
+ # Parse JSON output
+ with open(output_json, "r") as f:
+ whisper_output = json.load(f)
+
+ # Extract segments
+ segments = []
+ full_text_parts = []
+
+ for item in whisper_output.get("transcription", []):
+ segment = TranscriptSegment(
+ start=item["offsets"]["from"] / 1000.0, # Convert ms to seconds
+ end=item["offsets"]["to"] / 1000.0,
+ text=item["text"].strip(),
+ confidence=item.get("confidence")
+ )
+ segments.append(segment)
+ full_text_parts.append(segment.text)
+
+ # Get detected language
+ detected_language = whisper_output.get("result", {}).get("language", language or self.language)
+
+ # Calculate total duration
+ duration = segments[-1].end if segments else 0.0
+
+ log.info(
+ "Transcription complete",
+ segments=len(segments),
+ duration=duration,
+ language=detected_language
+ )
+
+ return TranscriptionResult(
+ segments=segments,
+ language=detected_language,
+ duration=duration,
+ text=" ".join(full_text_parts)
+ )
+
+ finally:
+ # Clean up temp files
+ for ext in [".json", ".txt", ".vtt", ".srt"]:
+ tmp_file = output_json.replace(".json", ext)
+ if os.path.exists(tmp_file):
+ os.remove(tmp_file)
+
+ def transcribe_with_timestamps(
+ self,
+ audio_path: str,
+ language: Optional[str] = None
+ ) -> List[dict]:
+ """
+ Transcribe with word-level timestamps.
+
+ Returns list of dicts with word, start, end, confidence.
+ """
+ result = self.transcribe(audio_path, language)
+
+ # Convert segments to word-level format
+ # Note: whisper.cpp provides segment-level timestamps by default
+ # For true word-level, we'd need the --max-len 1 flag but it's slower
+
+ words = []
+ for segment in result.segments:
+ # Estimate word timestamps within segment
+ segment_words = segment.text.split()
+ if not segment_words:
+ continue
+
+ duration = segment.end - segment.start
+ word_duration = duration / len(segment_words)
+
+ for i, word in enumerate(segment_words):
+ words.append({
+ "word": word,
+ "start": segment.start + (i * word_duration),
+ "end": segment.start + ((i + 1) * word_duration),
+ "confidence": segment.confidence
+ })
+
+ return words
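The wrapper can also be used directly, outside the job pipeline, for example when checking a model or audio file inside the transcriber container. A brief sketch; the paths are the container defaults and placeholders:

```python
# Sketch: direct use of WhisperTranscriber inside the container; paths are placeholders.
from app.transcriber import WhisperTranscriber

t = WhisperTranscriber(model_path="/models/ggml-small.bin", threads=8)
result = t.transcribe("/audio/test-room-123/audio.wav", language="en")

print(f"{result.duration:.1f}s, {len(result.segments)} segments, lang={result.language}")
for seg in result.segments[:5]:
    print(f"[{seg.start:7.2f} - {seg.end:7.2f}] {seg.text}")
```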
diff --git a/deploy/meeting-intelligence/transcriber/requirements.txt b/deploy/meeting-intelligence/transcriber/requirements.txt
new file mode 100644
index 0000000..da038b5
--- /dev/null
+++ b/deploy/meeting-intelligence/transcriber/requirements.txt
@@ -0,0 +1,41 @@
+# Transcription Service Dependencies
+
+# Web framework
+fastapi==0.109.2
+uvicorn[standard]==0.27.1
+python-multipart==0.0.9
+
+# Job queue
+redis==5.0.1
+rq==1.16.0
+
+# Database
+asyncpg==0.29.0
+sqlalchemy[asyncio]==2.0.25
+psycopg2-binary==2.9.9
+
+# Audio processing
+pydub==0.25.1
+soundfile==0.12.1
+librosa==0.10.1
+numpy==1.26.4
+
+# Speaker diarization
+resemblyzer==0.1.3
+torch==2.2.0
+torchaudio==2.2.0
+scipy==1.12.0
+scikit-learn==1.4.0
+
+# Sentence embeddings (for semantic search)
+sentence-transformers==2.3.1
+
+# Utilities
+pydantic==2.6.1
+pydantic-settings==2.1.0
+python-dotenv==1.0.1
+httpx==0.26.0
+tenacity==8.2.3
+
+# Logging & monitoring
+structlog==24.1.0