api/routers/transcribe/sessions.py
Kevin Carter 7ca21ef538 feat: implement export endpoint for transcription sessions (Phase 3E)
- Add POST /transcribe/sessions/{id}/export endpoint
- Generate SRT (SubRip subtitle format) with timestamps
- Generate TXT (plain text with [HH:MM:SS,mmm] timestamps)
- Generate JSON (structured data: session, segments, summaries, canvas events)
- Return as FileResponse download with Content-Disposition headers
- Filenames include sanitized session title + date
- No API keys stored or logged during export
2026-05-20 22:25:36 +00:00

510 lines
19 KiB
Python

"""Transcription sessions router — CRUD endpoints for transcription sessions and segments."""
from fastapi import APIRouter, Depends, HTTPException, Query
from fastapi.responses import FileResponse
from typing import Optional, List
from datetime import datetime
import io
import json
import tempfile
import os
from modules.auth.supabase_bearer import SupabaseBearer
from modules.transcription.models import (
TranscriptionSessionCreate,
TranscriptionSessionUpdate,
TranscriptionSessionResponse,
SessionListResponse,
TranscriptionSegmentCreate,
TranscriptionSegmentResponse,
SummaryGenerateRequest,
SummaryResponse,
ExportFormat,
)
from modules.transcription.llm_client import call_llm, build_prompt
import logging
logger = logging.getLogger(__name__)
router = APIRouter()
def get_supabase_client():
"""Get Supabase service role client."""
from modules.database.supabase.utils.client import SupabaseServiceRoleClient
return SupabaseServiceRoleClient()
def get_user_id(credentials=Depends(SupabaseBearer())) -> str:
"""Extract user_id from Supabase JWT token."""
return credentials.get("sub", credentials.get("user_id", ""))
def seconds_to_srt_timestamp(seconds: float) -> str:
"""Convert seconds to SRT timestamp format: HH:MM:SS,mmm"""
hours = int(seconds // 3600)
minutes = int((seconds % 3600) // 60)
secs = int(seconds % 60)
millis = int((seconds % 1) * 1000)
return f"{hours:02d}:{minutes:02d}:{secs:02d},{millis:03d}"
def generate_srt(segments: List[dict]) -> str:
"""Generate SRT (SubRip subtitle) content from segments."""
srt_entries = []
for idx, seg in enumerate(segments, start=1):
start_sec = float(seg.get("start_seconds", 0))
end_sec = float(seg.get("end_seconds", 0))
text = seg.get("text", "").strip()
if not text:
continue
start_ts = seconds_to_srt_timestamp(start_sec)
end_ts = seconds_to_srt_timestamp(end_sec)
# Clean text for SRT (no line breaks within a subtitle block)
clean_text = text.replace("\n", " ").strip()
srt_entries.append(f"{idx}\n{start_ts} --> {end_ts}\n{clean_text}")
return "\n\n".join(srt_entries) + "\n" if srt_entries else ""
def generate_txt(segments: List[dict]) -> str:
"""Generate plain text transcript with timestamps from segments."""
lines = []
for seg in segments:
start_sec = float(seg.get("start_seconds", 0))
text = seg.get("text", "").strip()
if not text:
continue
ts = seconds_to_srt_timestamp(start_sec)
lines.append(f"[{ts}] {text}")
return "\n".join(lines) + "\n" if lines else ""
def generate_json_export(session: dict, segments: List[dict],
summaries: List[dict],
canvas_events: List[dict]) -> str:
"""Generate structured JSON export with segments, metadata, and canvas events."""
# Build clean segment list (exclude internal DB fields)
clean_segments = []
for seg in segments:
clean_segments.append({
"sequence_index": seg.get("sequence_index"),
"text": seg.get("text", ""),
"start_seconds": float(seg.get("start_seconds", 0)),
"end_seconds": float(seg.get("end_seconds", 0)),
"is_final": seg.get("is_final", True),
"speaker_label": seg.get("speaker_label"),
"keyword_matches": seg.get("keyword_matches"),
})
# Build clean summary list
clean_summaries = []
for s in summaries:
clean_summaries.append({
"id": s.get("id"),
"summary_type": s.get("summary_type"),
"content": s.get("content", ""),
"llm_provider": s.get("llm_provider"),
"llm_model": s.get("llm_model"),
"created_at": s.get("created_at"),
})
# Build clean canvas events list
clean_events = []
for ev in canvas_events:
clean_events.append({
"id": ev.get("id"),
"event_type": ev.get("event_type"),
"session_elapsed_seconds": float(ev.get("session_elapsed_seconds", 0)) if ev.get("session_elapsed_seconds") else None,
"timestamp": ev.get("timestamp"),
"event_payload": ev.get("event_payload", {}),
})
export_data = {
"session": {
"id": session.get("id"),
"title": session.get("title"),
"canvas_type": session.get("canvas_type"),
"started_at": session.get("started_at"),
"ended_at": session.get("ended_at"),
"duration_seconds": session.get("duration_seconds"),
"timetable_period_id": session.get("timetable_period_id"),
"timetable_event_type": session.get("timetable_event_type"),
"timetable_event_label": session.get("timetable_event_label"),
"auto_tagged": session.get("auto_tagged", False),
"llm_provider": session.get("llm_provider"),
"llm_model": session.get("llm_model"),
"word_count": session.get("word_count", 0),
"segment_count": session.get("segment_count", 0),
},
"segments": clean_segments,
"summaries": clean_summaries,
"canvas_events": clean_events,
}
return json.dumps(export_data, indent=2, default=str)
def sanitize_filename(name: str) -> str:
"""Remove or replace characters that are unsafe in filenames."""
safe = "".join(c if c.isalnum() or c in " _-." else "_" for c in name)
return safe[:100] if safe else "export"
@router.post("/sessions", response_model=TranscriptionSessionResponse)
async def create_session(
session_data: TranscriptionSessionCreate,
user_id: str = Depends(get_user_id),
):
"""Create a new transcription session."""
supabase = get_supabase_client()
data = {
"user_id": user_id,
"title": session_data.title,
"canvas_type": session_data.canvas_type,
}
result = supabase.supabase.table("transcription_sessions").insert(data).execute()
if not result.data:
raise HTTPException(status_code=500, detail="Failed to create session")
return result.data[0]
@router.patch("/sessions/{session_id}", response_model=TranscriptionSessionResponse)
async def update_session(
session_id: str,
update_data: TranscriptionSessionUpdate,
user_id: str = Depends(get_user_id),
):
"""Update a transcription session (end, tag, title)."""
supabase = get_supabase_client()
# Verify ownership
existing = supabase.supabase.table("transcription_sessions").select("*").eq("id", session_id).eq("user_id", user_id).execute()
if not existing.data:
raise HTTPException(status_code=404, detail="Session not found")
# Build update dict (only non-None fields)
updates = {k: v for k, v in update_data.model_dump().items() if v is not None}
updates["updated_at"] = datetime.utcnow().isoformat()
result = supabase.supabase.table("transcription_sessions").update(updates).eq("id", session_id).execute()
if not result.data:
raise HTTPException(status_code=500, detail="Failed to update session")
return result.data[0]
@router.get("/sessions", response_model=SessionListResponse)
async def list_sessions(
user_id: str = Depends(get_user_id),
page: int = Query(1, ge=1),
page_size: int = Query(20, ge=1, le=100),
timetable_period_id: Optional[str] = None,
):
"""List transcription sessions for the current user (paginated)."""
supabase = get_supabase_client()
query = supabase.supabase.table("transcription_sessions").select("*", count="exact").eq("user_id", user_id)
if timetable_period_id:
query = query.eq("timetable_period_id", timetable_period_id)
query = query.order("started_at", desc=True).range((page - 1) * page_size, page * page_size - 1)
result = query.execute()
return SessionListResponse(
sessions=result.data,
total=result.count or 0,
page=page,
page_size=page_size,
)
@router.get("/sessions/{session_id}", response_model=dict)
async def get_session(
session_id: str,
user_id: str = Depends(get_user_id),
):
"""Get a session with its segments and summaries."""
supabase = get_supabase_client()
# Get session
session_result = supabase.supabase.table("transcription_sessions").select("*").eq("id", session_id).eq("user_id", user_id).execute()
if not session_result.data:
raise HTTPException(status_code=404, detail="Session not found")
# Get segments
segments_result = supabase.supabase.table("transcription_segments").select("*").eq("session_id", session_id).order("sequence_index").execute()
# Get summaries
summaries_result = supabase.supabase.table("transcription_summaries").select("*").eq("session_id", session_id).execute()
return {
"session": session_result.data[0],
"segments": segments_result.data,
"summaries": summaries_result.data,
}
@router.delete("/sessions/{session_id}")
async def delete_session(
session_id: str,
user_id: str = Depends(get_user_id),
):
"""Soft delete a transcription session."""
supabase = get_supabase_client()
# Verify ownership
existing = supabase.supabase.table("transcription_sessions").select("*").eq("id", session_id).eq("user_id", user_id).execute()
if not existing.data:
raise HTTPException(status_code=404, detail="Session not found")
# Soft delete: set ended_at and mark metadata
result = supabase.supabase.table("transcription_sessions").update({
"ended_at": datetime.utcnow().isoformat(),
"metadata": {"deleted": True},
}).eq("id", session_id).execute()
return {"message": "Session deleted"}
@router.post("/sessions/{session_id}/segments")
async def upsert_segments(
session_id: str,
segments: List[TranscriptionSegmentCreate],
user_id: str = Depends(get_user_id),
):
"""Batch upsert segments for a session."""
supabase = get_supabase_client()
# Verify session exists and user owns it
session_check = supabase.supabase.table("transcription_sessions").select("id").eq("id", session_id).eq("user_id", user_id).execute()
if not session_check.data:
raise HTTPException(status_code=404, detail="Session not found")
# Batch insert segments
segment_data = [s.model_dump() for s in segments]
if segment_data:
result = supabase.supabase.table("transcription_segments").insert(segment_data).execute()
# Update segment count on session
supabase.supabase.table("transcription_sessions").update({
"segment_count": len(segment_data),
}).eq("id", session_id).execute()
return {"message": f"Upserted {len(segment_data)} segments", "count": len(segment_data)}
@router.get("/sessions/{session_id}/segments", response_model=List[TranscriptionSegmentResponse])
async def list_segments(
session_id: str,
user_id: str = Depends(get_user_id),
):
"""List all segments for a session."""
supabase = get_supabase_client()
# Verify ownership
session_check = supabase.supabase.table("transcription_sessions").select("id").eq("id", session_id).eq("user_id", user_id).execute()
if not session_check.data:
raise HTTPException(status_code=404, detail="Session not found")
result = supabase.supabase.table("transcription_segments").select("*").eq("session_id", session_id).order("sequence_index").execute()
return result.data
@router.post("/sessions/{session_id}/summaries", response_model=SummaryResponse)
async def generate_summary(
session_id: str,
summary_request: SummaryGenerateRequest,
user_id: str = Depends(get_user_id),
):
"""Generate a summary for a session using the specified LLM provider.
Phase 3: Full implementation — calls the pluggable LLM client with
prompt templates from prompts.py. API key is passed per-request and
never stored or logged.
"""
supabase = get_supabase_client()
# Verify session exists and user owns it
session_check = supabase.supabase.table("transcription_sessions").select("*").eq("id", session_id).eq("user_id", user_id).execute()
if not session_check.data:
raise HTTPException(status_code=404, detail="Session not found")
session = session_check.data[0]
# Build transcript from segments (or use segment range)
segments_query = supabase.supabase.table("transcription_segments").select("*").eq("session_id", session_id).order("sequence_index")
segments_result = segments_query.execute()
if not segments_result.data:
raise HTTPException(status_code=400, detail="No segments found for this session")
# Apply segment range filter if specified
segments = segments_result.data
if summary_request.segment_range and len(summary_request.segment_range) == 2:
start_idx, end_idx = summary_request.segment_range[0], summary_request.segment_range[1]
if start_idx is not None and end_idx is not None:
segments = segments[start_idx:end_idx]
elif start_idx is not None:
segments = segments[start_idx:]
elif end_idx is not None:
segments = segments[:end_idx]
# Build full transcript text from segments
transcript_parts = [s["text"] for s in segments if s.get("text")]
transcript = "\n".join(transcript_parts)
if not transcript.strip():
raise HTTPException(status_code=400, detail="Transcript is empty — cannot generate summary")
# Build prompt from template
system_prompt, user_message = build_prompt(summary_request.summary_type, transcript)
# Call the LLM client
try:
llm_result = await call_llm(
provider=summary_request.provider,
model=summary_request.model,
api_key=summary_request.api_key,
system_prompt=system_prompt,
user_message=user_message,
)
except Exception as e:
logger.error(f"LLM call failed: {e}")
raise HTTPException(status_code=502, detail=f"LLM generation failed: {str(e)}")
# Determine segment range for storage
seg_start = None
seg_end = None
if summary_request.segment_range and len(summary_request.segment_range) == 2:
seg_start = summary_request.segment_range[0]
seg_end = summary_request.segment_range[1]
# Build the prompt that was used (for audit trail)
prompt_used = f"{system_prompt}\n\n{user_message}" if summary_request.summary_type != "segment" else user_message
# Save summary to database
summary_data = {
"session_id": session_id,
"user_id": user_id,
"summary_type": summary_request.summary_type,
"content": llm_result.content,
"prompt_used": prompt_used,
"llm_provider": summary_request.provider,
"llm_model": summary_request.model,
"input_tokens": llm_result.input_tokens,
"output_tokens": llm_result.output_tokens,
"segment_range_start": seg_start,
"segment_range_end": seg_end,
}
result = supabase.supabase.table("transcription_summaries").insert(summary_data).execute()
if not result.data:
raise HTTPException(status_code=500, detail="Failed to save summary")
return result.data[0]
@router.get("/sessions/{session_id}/summaries", response_model=List[SummaryResponse])
async def list_summaries(
session_id: str,
user_id: str = Depends(get_user_id),
):
"""List summaries for a session."""
supabase = get_supabase_client()
# Verify ownership
session_check = supabase.supabase.table("transcription_sessions").select("id").eq("id", session_id).eq("user_id", user_id).execute()
if not session_check.data:
raise HTTPException(status_code=404, detail="Session not found")
result = supabase.supabase.table("transcription_summaries").select("*").eq("session_id", session_id).execute()
return result.data
@router.post("/sessions/{session_id}/export")
async def export_session(
session_id: str,
export_format: ExportFormat,
user_id: str = Depends(get_user_id),
):
"""Export session as SRT, TXT, or JSON file download.
Phase 3E: Full implementation — generates properly formatted files
and returns them as downloadable responses. API keys are never stored
or logged during export.
"""
supabase = get_supabase_client()
# Verify ownership
session_check = supabase.supabase.table("transcription_sessions").select("*").eq("id", session_id).eq("user_id", user_id).execute()
if not session_check.data:
raise HTTPException(status_code=404, detail="Session not found")
session = session_check.data[0]
# Get segments
segments_result = supabase.supabase.table("transcription_segments").select("*").eq("session_id", session_id).order("sequence_index").execute()
segments = segments_result.data
# Get summaries (for JSON export)
summaries_result = supabase.supabase.table("transcription_summaries").select("*").eq("session_id", session_id).execute()
summaries = summaries_result.data
# Get canvas events (for JSON export)
canvas_result = supabase.supabase.table("canvas_events").select("*").eq("session_id", session_id).order("timestamp").execute()
canvas_events = canvas_result.data
fmt = export_format.format.lower()
if fmt == "srt":
content = generate_srt(segments)
filename = f"{sanitize_filename(session.get('title', session_id))}_{session.get('started_at', 'export')[:10]}.srt"
return FileResponse(
io.BytesIO(content.encode("utf-8")),
media_type="application/x-subrip",
filename=filename,
headers={"Content-Disposition": f"attachment; filename*=UTF-8''{filename}"},
)
elif fmt == "txt":
content = generate_txt(segments)
filename = f"{sanitize_filename(session.get('title', session_id))}_{session.get('started_at', 'export')[:10]}.txt"
return FileResponse(
io.BytesIO(content.encode("utf-8")),
media_type="text/plain",
filename=filename,
headers={"Content-Disposition": f"attachment; filename*=UTF-8''{filename}"},
)
elif fmt == "json":
content = generate_json_export(session, segments, summaries, canvas_events)
filename = f"{sanitize_filename(session.get('title', session_id))}_{session.get('started_at', 'export')[:10]}.json"
return FileResponse(
io.BytesIO(content.encode("utf-8")),
media_type="application/json",
filename=filename,
headers={"Content-Disposition": f"attachment; filename*=UTF-8''{filename}"},
)
else:
raise HTTPException(status_code=400, detail=f"Unsupported format: {export_format.format}")