diff --git a/deploy/meeting-intelligence/api/app/database.py b/deploy/meeting-intelligence/api/app/database.py index 1819df1..a7b786b 100644 --- a/deploy/meeting-intelligence/api/app/database.py +++ b/deploy/meeting-intelligence/api/app/database.py @@ -2,6 +2,7 @@ Database operations for the Meeting Intelligence API. """ +import json import uuid from datetime import datetime from typing import Optional, List, Dict, Any @@ -109,9 +110,9 @@ class Database: id, conference_id, conference_name, title, recording_path, started_at, status, metadata ) - VALUES ($1, $2, $3, $4, $5, $6, 'recording', $7) + VALUES ($1, $2, $3, $4, $5, $6, 'recording', $7::jsonb) """, meeting_id, conference_id, conference_name, title, - recording_path, started_at or datetime.utcnow(), metadata or {}) + recording_path, started_at or datetime.utcnow(), json.dumps(metadata or {})) return meeting_id @@ -326,12 +327,13 @@ class Database: payload: dict ) -> int: """Save a webhook event for processing.""" + import json async with self.pool.acquire() as conn: row = await conn.fetchrow(""" INSERT INTO webhook_events (event_type, payload) - VALUES ($1, $2) + VALUES ($1, $2::jsonb) RETURNING id - """, event_type, payload) + """, event_type, json.dumps(payload)) return row["id"] @@ -348,8 +350,8 @@ class Database: async with self.pool.acquire() as conn: row = await conn.fetchrow(""" INSERT INTO processing_jobs (meeting_id, job_type, priority, result) - VALUES ($1::uuid, $2, $3, $4) + VALUES ($1::uuid, $2, $3, $4::jsonb) RETURNING id - """, meeting_id, job_type, priority, result or {}) + """, meeting_id, job_type, priority, json.dumps(result or {})) return row["id"] diff --git a/deploy/meeting-intelligence/jibri/config/finalize.sh b/deploy/meeting-intelligence/jibri/config/finalize.sh index 1a1f3b9..f4942e9 100755 --- a/deploy/meeting-intelligence/jibri/config/finalize.sh +++ b/deploy/meeting-intelligence/jibri/config/finalize.sh @@ -1,19 +1,12 @@ #!/bin/bash # Jibri Recording Finalize Script -# Called when Jibri finishes recording a meeting -# -# Arguments: -# $1 - Recording directory path (e.g., /recordings//) -# -# This script: -# 1. Finds the recording file -# 2. Notifies the Meeting Intelligence API to start processing - set -e RECORDING_DIR="$1" API_URL="${MEETING_INTELLIGENCE_API:-http://api:8000}" -LOG_FILE="/var/log/jibri/finalize.log" +LOG_FILE="/config/logs/finalize.log" + +mkdir -p /config/logs log() { echo "[$(date -Iseconds)] $1" >> "$LOG_FILE" @@ -23,13 +16,11 @@ log() { log "=== Finalize script started ===" log "Recording directory: $RECORDING_DIR" -# Validate recording directory if [ -z "$RECORDING_DIR" ] || [ ! -d "$RECORDING_DIR" ]; then log "ERROR: Invalid recording directory: $RECORDING_DIR" exit 1 fi -# Find the recording file (MP4 or WebM) RECORDING_FILE=$(find "$RECORDING_DIR" -type f \( -name "*.mp4" -o -name "*.webm" \) | head -1) if [ -z "$RECORDING_FILE" ]; then @@ -39,66 +30,26 @@ fi log "Found recording file: $RECORDING_FILE" -# Get file info FILE_SIZE=$(stat -c%s "$RECORDING_FILE" 2>/dev/null || echo "0") log "Recording file size: $FILE_SIZE bytes" -# Extract conference info from path -# Expected format: /recordings///recording.mp4 -CONFERENCE_ID=$(echo "$RECORDING_DIR" | awk -F'/' '{print $(NF-1)}') -if [ -z "$CONFERENCE_ID" ]; then - CONFERENCE_ID=$(basename "$(dirname "$RECORDING_DIR")") -fi +CONFERENCE_ID=$(basename "$RECORDING_DIR") -# Look for metadata file (Jibri sometimes creates this) METADATA_FILE="$RECORDING_DIR/metadata.json" if [ -f "$METADATA_FILE" ]; then - log "Found metadata file: $METADATA_FILE" METADATA=$(cat "$METADATA_FILE") else METADATA="{}" fi -# Prepare webhook payload -PAYLOAD=$(cat <&1) +RESPONSE=$(curl -s -w "\n%{http_code}" -X POST -H "Content-Type: application/json" -d "$PAYLOAD" "$API_URL/webhooks/recording-complete" 2>&1) HTTP_CODE=$(echo "$RESPONSE" | tail -1) BODY=$(echo "$RESPONSE" | head -n -1) -if [ "$HTTP_CODE" = "200" ] || [ "$HTTP_CODE" = "201" ] || [ "$HTTP_CODE" = "202" ]; then - log "SUCCESS: Webhook accepted (HTTP $HTTP_CODE)" - log "Response: $BODY" -else - log "WARNING: Webhook returned HTTP $HTTP_CODE" - log "Response: $BODY" - - # Don't fail the script - the recording is still saved - # The API can be retried later -fi - -# Optional: Clean up old recordings (keep last 30 days) -# find /recordings -type f -mtime +30 -delete - +log "Response: HTTP $HTTP_CODE - $BODY" log "=== Finalize script completed ===" exit 0 diff --git a/deploy/meeting-intelligence/transcriber/Dockerfile b/deploy/meeting-intelligence/transcriber/Dockerfile index 09951bc..1b7814e 100644 --- a/deploy/meeting-intelligence/transcriber/Dockerfile +++ b/deploy/meeting-intelligence/transcriber/Dockerfile @@ -5,44 +5,30 @@ FROM python:3.11-slim AS builder # Install build dependencies -RUN apt-get update && apt-get install -y --no-install-recommends \ - build-essential \ - cmake \ - git \ - ffmpeg \ - wget \ - && rm -rf /var/lib/apt/lists/* +RUN apt-get update && apt-get install -y --no-install-recommends build-essential cmake git ffmpeg wget && rm -rf /var/lib/apt/lists/* # Build whisper.cpp WORKDIR /build -RUN git clone https://github.com/ggerganov/whisper.cpp.git && \ - cd whisper.cpp && \ - cmake -B build -DWHISPER_BUILD_EXAMPLES=ON && \ - cmake --build build --config Release -j$(nproc) && \ - cp build/bin/whisper-cli /usr/local/bin/whisper && \ - cp build/bin/whisper-server /usr/local/bin/whisper-server 2>/dev/null || true +RUN git clone https://github.com/ggerganov/whisper.cpp.git && cd whisper.cpp && cmake -B build -DWHISPER_BUILD_EXAMPLES=ON -DBUILD_SHARED_LIBS=ON && cmake --build build --config Release -j$(nproc) && cp build/bin/whisper-cli /usr/local/bin/whisper && cp build/bin/whisper-server /usr/local/bin/whisper-server 2>/dev/null || true && mkdir -p /usr/local/lib/whisper && find build -name '*.so*' -exec cp {} /usr/local/lib/whisper/ \; && ls -la /usr/local/lib/whisper/ # Download whisper models WORKDIR /models -RUN cd /build/whisper.cpp && \ - bash models/download-ggml-model.sh small && \ - mv models/ggml-small.bin /models/ +RUN cd /build/whisper.cpp && bash models/download-ggml-model.sh small && mv models/ggml-small.bin /models/ # Production image FROM python:3.11-slim # Install runtime dependencies and build tools (for compiling Python packages) -RUN apt-get update && apt-get install -y --no-install-recommends \ - ffmpeg \ - libsndfile1 \ - curl \ - build-essential \ - && rm -rf /var/lib/apt/lists/* +RUN apt-get update && apt-get install -y --no-install-recommends ffmpeg libsndfile1 curl build-essential && rm -rf /var/lib/apt/lists/* -# Copy whisper binary and models +# Copy whisper binary, libraries, and models COPY --from=builder /usr/local/bin/whisper /usr/local/bin/whisper +COPY --from=builder /usr/local/lib/whisper/ /usr/local/lib/ COPY --from=builder /models /models +# Update shared library cache +RUN ldconfig && /usr/local/bin/whisper --help || echo "Whisper help check failed" + # Set up Python environment WORKDIR /app @@ -65,8 +51,7 @@ ENV WHISPER_MODEL=/models/ggml-small.bin ENV WHISPER_THREADS=8 # Health check -HEALTHCHECK --interval=30s --timeout=10s --start-period=60s --retries=3 \ - CMD curl -f http://localhost:8001/health || exit 1 +HEALTHCHECK --interval=30s --timeout=10s --start-period=60s --retries=3 CMD curl -f http://localhost:8001/health || exit 1 # Run the service EXPOSE 8001 diff --git a/deploy/meeting-intelligence/transcriber/app/database.py b/deploy/meeting-intelligence/transcriber/app/database.py index ad6d763..a836f05 100644 --- a/deploy/meeting-intelligence/transcriber/app/database.py +++ b/deploy/meeting-intelligence/transcriber/app/database.py @@ -2,6 +2,7 @@ Database operations for the Transcription Service. """ +import json import uuid from typing import Optional, List, Dict, Any @@ -47,28 +48,28 @@ class Database: enable_diarization: bool = True, language: Optional[str] = None, priority: int = 5 - ) -> str: - """Create a new transcription job.""" - job_id = str(uuid.uuid4()) + ) -> int: + """Create a new transcription job. Returns the auto-generated job ID.""" + result_data = { + "audio_path": audio_path, + "video_path": video_path, + "enable_diarization": enable_diarization, + "language": language + } async with self.pool.acquire() as conn: - await conn.execute(""" + job_id = await conn.fetchval(""" INSERT INTO processing_jobs ( - id, meeting_id, job_type, status, priority, - result + meeting_id, job_type, status, priority, result ) - VALUES ($1, $2::uuid, 'transcribe', 'pending', $3, $4) - """, job_id, meeting_id, priority, { - "audio_path": audio_path, - "video_path": video_path, - "enable_diarization": enable_diarization, - "language": language - }) + VALUES ($1::uuid, 'transcribe', 'pending', $2, $3::jsonb) + RETURNING id + """, meeting_id, priority, json.dumps(result_data)) log.info("Created transcription job", job_id=job_id, meeting_id=meeting_id) return job_id - async def get_job(self, job_id: str) -> Optional[Dict[str, Any]]: + async def get_job(self, job_id: int) -> Optional[Dict[str, Any]]: """Get a job by ID.""" async with self.pool.acquire() as conn: row = await conn.fetchrow(""" @@ -107,19 +108,30 @@ class Database: result = dict(row) # Merge result JSON into the dict if result.get("result"): - result.update(result["result"]) + if isinstance(result["result"], dict): + result.update(result["result"]) + elif isinstance(result["result"], str): + result.update(json.loads(result["result"])) return result return None async def update_job_status( self, - job_id: str, + job_id: int, status: str, error_message: Optional[str] = None, result: Optional[dict] = None, progress: Optional[float] = None ): """Update job status.""" + result_json = None + if result is not None: + if progress is not None: + result["progress"] = progress + result_json = json.dumps(result) + elif progress is not None: + result_json = json.dumps({"progress": progress}) + async with self.pool.acquire() as conn: if status == "completed": await conn.execute(""" @@ -129,29 +141,24 @@ class Database: error_message = $2, result = COALESCE($3::jsonb, result) WHERE id = $4 - """, status, error_message, result, job_id) + """, status, error_message, result_json, job_id) else: - update_result = result - if progress is not None: - update_result = result or {} - update_result["progress"] = progress - await conn.execute(""" UPDATE processing_jobs SET status = $1, error_message = $2, result = COALESCE($3::jsonb, result) WHERE id = $4 - """, status, error_message, update_result, job_id) + """, status, error_message, result_json, job_id) - async def update_job_audio_path(self, job_id: str, audio_path: str): + async def update_job_audio_path(self, job_id: int, audio_path: str): """Update the audio path for a job.""" async with self.pool.acquire() as conn: await conn.execute(""" UPDATE processing_jobs SET result = result || $1::jsonb WHERE id = $2 - """, {"audio_path": audio_path}, job_id) + """, json.dumps({"audio_path": audio_path}), job_id) async def update_meeting_status(self, meeting_id: str, status: str): """Update meeting processing status.""" @@ -232,9 +239,9 @@ class Database: id, conference_id, conference_name, title, recording_path, status, metadata ) - VALUES ($1, $2, $3, $4, $5, 'recording', $6) + VALUES ($1::uuid, $2, $3, $4, $5, 'recording', $6::jsonb) """, meeting_id, conference_id, conference_name, title, - recording_path, metadata or {}) + recording_path, json.dumps(metadata or {})) log.info("Created meeting", meeting_id=meeting_id, conference_id=conference_id) return meeting_id diff --git a/deploy/meeting-intelligence/transcriber/app/main.py b/deploy/meeting-intelligence/transcriber/app/main.py index 6cea312..06bf4dd 100644 --- a/deploy/meeting-intelligence/transcriber/app/main.py +++ b/deploy/meeting-intelligence/transcriber/app/main.py @@ -11,7 +11,7 @@ FastAPI service that handles: import asyncio import os from contextlib import asynccontextmanager -from typing import Optional +from typing import Optional, Union from fastapi import FastAPI, BackgroundTasks, HTTPException from fastapi.responses import JSONResponse @@ -33,20 +33,21 @@ log = structlog.get_logger() # Pydantic models class TranscribeRequest(BaseModel): meeting_id: str - audio_path: str + audio_path: Optional[str] = None + video_path: Optional[str] = None # If provided, will extract audio first priority: int = 5 enable_diarization: bool = True language: Optional[str] = None class TranscribeResponse(BaseModel): - job_id: str + job_id: int # Integer from database auto-increment status: str message: str class JobStatus(BaseModel): - job_id: str + job_id: int status: str progress: Optional[float] = None result: Optional[dict] = None @@ -172,24 +173,64 @@ async def service_status(): @app.post("/transcribe", response_model=TranscribeResponse) async def queue_transcription(request: TranscribeRequest, background_tasks: BackgroundTasks): """Queue a transcription job.""" + audio_path = request.audio_path + + # If video_path provided, extract audio first + if request.video_path and not audio_path: + log.info( + "Extracting audio from video", + meeting_id=request.meeting_id, + video_path=request.video_path + ) + + if not os.path.exists(request.video_path): + raise HTTPException( + status_code=404, + detail=f"Video file not found: {request.video_path}" + ) + + # Extract audio using ffmpeg + import subprocess + audio_dir = os.environ.get("AUDIO_OUTPUT_DIR", "/audio") + os.makedirs(audio_dir, exist_ok=True) + audio_path = os.path.join(audio_dir, f"{request.meeting_id}.wav") + + try: + result = subprocess.run([ + "ffmpeg", "-y", "-i", request.video_path, + "-vn", "-acodec", "pcm_s16le", "-ar", "16000", "-ac", "1", + audio_path + ], capture_output=True, text=True, timeout=300) + + if result.returncode != 0: + log.error("FFmpeg error", stderr=result.stderr) + raise HTTPException(status_code=500, detail=f"Audio extraction failed: {result.stderr}") + + log.info("Audio extracted", audio_path=audio_path) + except subprocess.TimeoutExpired: + raise HTTPException(status_code=500, detail="Audio extraction timed out") + except Exception as e: + raise HTTPException(status_code=500, detail=f"Audio extraction failed: {str(e)}") + log.info( "Received transcription request", meeting_id=request.meeting_id, - audio_path=request.audio_path + audio_path=audio_path ) # Validate audio file exists - if not os.path.exists(request.audio_path): + if not audio_path or not os.path.exists(audio_path): raise HTTPException( status_code=404, - detail=f"Audio file not found: {request.audio_path}" + detail=f"Audio file not found: {audio_path}" ) - # Create job record in database + # Create job record in database - use the extracted audio_path try: job_id = await state.db.create_transcription_job( meeting_id=request.meeting_id, - audio_path=request.audio_path, + audio_path=audio_path, # Use extracted audio_path, not request.audio_path + video_path=request.video_path, enable_diarization=request.enable_diarization, language=request.language, priority=request.priority @@ -216,7 +257,7 @@ async def queue_transcription(request: TranscribeRequest, background_tasks: Back @app.get("/transcribe/{job_id}", response_model=JobStatus) -async def get_job_status(job_id: str): +async def get_job_status(job_id: int): """Get the status of a transcription job.""" job = await state.db.get_job(job_id) @@ -233,7 +274,7 @@ async def get_job_status(job_id: str): @app.delete("/transcribe/{job_id}") -async def cancel_job(job_id: str): +async def cancel_job(job_id: int): """Cancel a pending transcription job.""" job = await state.db.get_job(job_id) diff --git a/deploy/meeting-intelligence/transcriber/app/processor.py b/deploy/meeting-intelligence/transcriber/app/processor.py index 8bb67a2..df6732c 100644 --- a/deploy/meeting-intelligence/transcriber/app/processor.py +++ b/deploy/meeting-intelligence/transcriber/app/processor.py @@ -4,7 +4,7 @@ Job Processor for the Transcription Service. Handles the processing pipeline: 1. Audio extraction from video 2. Transcription -3. Speaker diarization +3. Speaker diarization (optional, fails gracefully) 4. Database storage """ @@ -76,7 +76,7 @@ class JobProcessor: continue job_id = job["id"] - meeting_id = job["meeting_id"] + meeting_id = str(job["meeting_id"]) # Convert UUID to string log.info( f"Worker {worker_id} processing job", @@ -113,7 +113,7 @@ class JobProcessor: async def _process_job(self, job: dict): """Process a single transcription job.""" job_id = job["id"] - meeting_id = job["meeting_id"] + meeting_id = str(job["meeting_id"]) # Ensure string audio_path = job.get("audio_path") video_path = job.get("video_path") enable_diarization = job.get("enable_diarization", True) @@ -149,32 +149,39 @@ class JobProcessor: duration=transcription.duration ) - # Step 3: Speaker diarization + # Step 3: Speaker diarization (optional, fails gracefully) speaker_segments = [] if enable_diarization and len(transcription.segments) > 0: log.info("Starting speaker diarization") await self.db.update_job_status(job_id, "processing", progress=0.6) await self.db.update_meeting_status(meeting_id, "diarizing") - # Convert transcript segments to dicts for diarizer - transcript_dicts = [ - {"start": s.start, "end": s.end, "text": s.text} - for s in transcription.segments - ] + try: + # Convert transcript segments to dicts for diarizer + transcript_dicts = [ + {"start": s.start, "end": s.end, "text": s.text} + for s in transcription.segments + ] - speaker_segments = await asyncio.get_event_loop().run_in_executor( - None, - lambda: self.diarizer.diarize( - audio_path, - transcript_segments=transcript_dicts + speaker_segments = await asyncio.get_event_loop().run_in_executor( + None, + lambda: self.diarizer.diarize( + audio_path, + transcript_segments=transcript_dicts + ) ) - ) - log.info( - "Diarization complete", - num_segments=len(speaker_segments), - num_speakers=len(set(s.speaker_id for s in speaker_segments)) - ) + log.info( + "Diarization complete", + num_segments=len(speaker_segments), + num_speakers=len(set(s.speaker_id for s in speaker_segments)) + ) + except Exception as e: + log.warning( + "Diarization failed, continuing without speaker labels", + error=str(e) + ) + speaker_segments = [] # Step 4: Store results log.info("Storing transcript in database")