83 lines
2.5 KiB
Python
83 lines
2.5 KiB
Python
"""Transcription service using self-hosted faster-whisper-server."""
|
|
|
|
import logging
|
|
import os
|
|
|
|
import httpx
|
|
|
|
from app.config import settings
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
|
|
def _word_entry(raw: dict) -> dict:
    """Reduce one raw word object from the API response to {word, start, end}."""
    return {
        "word": raw.get("word", ""),
        "start": raw.get("start", 0.0),
        "end": raw.get("end", 0.0),
    }


def _collect_word_timestamps(words: list, segments: list) -> list[dict]:
    """Build the word-timestamp list, preferring top-level words over per-segment words."""
    if words:
        return [_word_entry(w) for w in words]
    # Fall back to the words nested inside each segment when the response has
    # no top-level word list.
    return [
        _word_entry(w)
        for seg in segments
        # A segment may carry `"words": null`; `.get("words", [])` would hand
        # back None in that case and the iteration would raise TypeError.
        for w in (seg.get("words") or [])
    ]


async def transcribe(audio_path: str) -> dict:
    """Transcribe an audio file using the local Whisper API.

    The file is uploaded as multipart form data to
    ``{settings.whisper_api_url}/v1/audio/transcriptions`` with
    ``response_format=verbose_json`` and word-level timestamp granularity.

    Args:
        audio_path: Path of the audio file to upload.

    Returns:
        dict with:
        - text: full transcript text (whitespace-stripped)
        - words: list of {word, start, end} with word-level timestamps;
          taken from the top-level ``words`` of the response, otherwise
          from the ``words`` nested in each segment, otherwise empty
        - segments: the segments list exactly as returned by the server
          (empty list if absent)
        - language: detected language ("en" if the server omits it)
        - duration: audio duration (0.0 if the server omits it)

    Raises:
        OSError: if ``audio_path`` cannot be opened.
        httpx.HTTPStatusError: if the server answers with an error status.
    """
    url = f"{settings.whisper_api_url}/v1/audio/transcriptions"

    # 15-minute timeout; presumably sized for long recordings on a
    # self-hosted model -- confirm before lowering.
    async with httpx.AsyncClient(timeout=900.0) as client:
        # The file handle must stay open until the request has been sent,
        # so the POST happens inside the `with open(...)` block.
        with open(audio_path, "rb") as f:
            # NOTE: the part is always labelled audio/mpeg, whatever the
            # actual file extension is.
            files = {"file": (os.path.basename(audio_path), f, "audio/mpeg")}
            data = {
                "model": settings.whisper_model,
                "response_format": "verbose_json",
                "timestamp_granularities[]": "word",
            }

            logger.info(
                "Transcribing %s via %s", audio_path, settings.whisper_api_url
            )
            response = await client.post(url, files=files, data=data)
            response.raise_for_status()
            result = response.json()

    # `or` guards against keys that are present but null in the JSON body;
    # dict.get's default only applies when the key is missing entirely.
    text = (result.get("text") or "").strip()
    words = result.get("words") or []
    segments = result.get("segments") or []

    word_timestamps = _collect_word_timestamps(words, segments)

    logger.info(
        "Transcription complete: %d chars, %d word timestamps",
        len(text),
        len(word_timestamps),
    )

    return {
        "text": text,
        "words": word_timestamps,
        "segments": segments,
        "language": result.get("language", "en"),
        "duration": result.get("duration", 0.0),
    }
|
|
|
|
|
|
def get_transcript_segment(
    words: list[dict],
    start: float,
    end: float,
    *,
    tolerance: float = 0.5,
) -> str:
    """Extract transcript text for a given time range.

    A word is included when it lies entirely inside the window
    ``[start - tolerance, end + tolerance]``.

    Args:
        words: Word entries, each a dict with "word", "start" and "end" keys.
        start: Start of the time range.
        end: End of the time range.
        tolerance: Slack added on both sides of the range so that words
            straddling a boundary are not dropped. Uses the same unit as
            the word timestamps.

    Returns:
        The selected words joined by single spaces; "" when none match.

    Raises:
        KeyError: if an entry lacks "word", "start" or "end".
    """
    earliest = start - tolerance
    latest = end + tolerance

    # Word tokens may carry their own leading/trailing whitespace
    # (e.g. " Hello"); strip each one so that joining with " " does not
    # produce doubled spaces, and skip tokens that are empty afterwards.
    tokens = (
        w["word"].strip()
        for w in words
        if w["start"] >= earliest and w["end"] <= latest
    )
    return " ".join(token for token in tokens if token)
|