feat: implement export endpoint for transcription sessions (Phase 3E)

- Add POST /transcribe/sessions/{id}/export endpoint
- Generate SRT (SubRip subtitle format) with timestamps
- Generate TXT (plain text with [HH:MM:SS,mmm] timestamps)
- Generate JSON (structured data: session, segments, summaries, canvas events)
- Return as FileResponse download with Content-Disposition headers
- Filenames include sanitized session title + date
- No API keys stored or logged during export
This commit is contained in:
Kevin Carter 2026-05-20 22:25:36 +00:00
parent 36ae76143f
commit 7ca21ef538

View File

@ -1,8 +1,13 @@
"""Transcription sessions router — CRUD endpoints for transcription sessions and segments.""" """Transcription sessions router — CRUD endpoints for transcription sessions and segments."""
from fastapi import APIRouter, Depends, HTTPException, Query from fastapi import APIRouter, Depends, HTTPException, Query
from fastapi.responses import FileResponse
from typing import Optional, List from typing import Optional, List
from datetime import datetime from datetime import datetime
import io
import json
import tempfile
import os
from modules.auth.supabase_bearer import SupabaseBearer from modules.auth.supabase_bearer import SupabaseBearer
from modules.transcription.models import ( from modules.transcription.models import (
@ -35,6 +40,116 @@ def get_user_id(credentials=Depends(SupabaseBearer())) -> str:
return credentials.get("sub", credentials.get("user_id", "")) return credentials.get("sub", credentials.get("user_id", ""))
def seconds_to_srt_timestamp(seconds: float) -> str:
    """Convert an offset in seconds to an SRT timestamp (``HH:MM:SS,mmm``).

    The offset is rounded to the nearest millisecond once and then split with
    integer arithmetic. Doing the float math per field instead lets binary
    floating-point error drop a millisecond (``1.001 % 1 * 1000`` is just
    below ``1``), and cannot carry a rounded-up value into the next field.

    Args:
        seconds: Offset to format. Negative values are clamped to zero
            because the ``HH:MM:SS,mmm`` layout cannot express them.

    Returns:
        The formatted timestamp. Hours are zero-padded to two digits and
        grow beyond two digits for offsets of 100 hours or more.
    """
    total_millis = max(0, round(seconds * 1000))
    total_secs, millis = divmod(total_millis, 1000)
    total_minutes, secs = divmod(total_secs, 60)
    hours, minutes = divmod(total_minutes, 60)
    return f"{hours:02d}:{minutes:02d}:{secs:02d},{millis:03d}"
def generate_srt(segments: List[dict]) -> str:
    """Generate SRT (SubRip subtitle) content from transcript segments.

    Args:
        segments: Segment dicts, emitted in the order given. Each is read for
            ``text``, ``start_seconds`` and ``end_seconds``; a missing or
            null value is treated as empty text / zero seconds.

    Returns:
        The SRT document with cues numbered consecutively from 1 and
        separated by blank lines, or ``""`` when no segment has text.
    """
    srt_entries = []
    for seg in segments:
        # ``or`` also covers keys that are present but null, which
        # ``dict.get(key, default)`` does not.
        text = (seg.get("text") or "").strip()
        if not text:
            continue

        start_ts = seconds_to_srt_timestamp(float(seg.get("start_seconds") or 0))
        end_ts = seconds_to_srt_timestamp(float(seg.get("end_seconds") or 0))

        # Keep each cue on a single line: fold every line break
        # (including "\r\n") into a space.
        clean_text = " ".join(text.splitlines())

        # Number cues by emitted position; numbering by input position would
        # leave gaps wherever an empty segment was skipped.
        cue_number = len(srt_entries) + 1
        srt_entries.append(f"{cue_number}\n{start_ts} --> {end_ts}\n{clean_text}")

    return "\n\n".join(srt_entries) + "\n" if srt_entries else ""
def generate_txt(segments: List[dict]) -> str:
    """Generate a plain-text transcript with one timestamped line per segment.

    Args:
        segments: Segment dicts, emitted in the order given. Each is read for
            ``text`` and ``start_seconds``; a missing or null value is
            treated as empty text / zero seconds.

    Returns:
        Lines of the form ``[HH:MM:SS,mmm] text`` joined by newlines with a
        trailing newline, or ``""`` when no segment has text.
    """
    lines = []
    for seg in segments:
        # ``or`` also covers keys that are present but null, which
        # ``dict.get(key, default)`` does not.
        text = (seg.get("text") or "").strip()
        if not text:
            continue
        ts = seconds_to_srt_timestamp(float(seg.get("start_seconds") or 0))
        lines.append(f"[{ts}] {text}")
    return "\n".join(lines) + "\n" if lines else ""
def generate_json_export(session: dict, segments: List[dict],
                         summaries: List[dict],
                         canvas_events: List[dict]) -> str:
    """Generate the structured JSON export for a transcription session.

    Only an explicit set of keys is copied from each input dict, so any other
    fields present on the rows are left out of the export.

    Args:
        session: Session dict supplying the ``session`` metadata object.
        segments: Segment dicts; a missing or null ``start_seconds`` /
            ``end_seconds`` is exported as ``0.0``.
        summaries: Summary dicts.
        canvas_events: Canvas event dicts; ``session_elapsed_seconds`` is
            exported as a float, or ``null`` when it is missing or null.

    Returns:
        A JSON document (2-space indent) with the top-level keys ``session``,
        ``segments``, ``summaries`` and ``canvas_events``. Values that are
        not natively JSON-serializable are written via ``str()``.
    """
    clean_segments = [
        {
            "sequence_index": seg.get("sequence_index"),
            "text": seg.get("text", ""),
            # ``or 0`` also covers keys that are present but null.
            "start_seconds": float(seg.get("start_seconds") or 0),
            "end_seconds": float(seg.get("end_seconds") or 0),
            "is_final": seg.get("is_final", True),
            "speaker_label": seg.get("speaker_label"),
            "keyword_matches": seg.get("keyword_matches"),
        }
        for seg in segments
    ]

    clean_summaries = [
        {
            "id": s.get("id"),
            "summary_type": s.get("summary_type"),
            "content": s.get("content", ""),
            "llm_provider": s.get("llm_provider"),
            "llm_model": s.get("llm_model"),
            "created_at": s.get("created_at"),
        }
        for s in summaries
    ]

    clean_events = []
    for ev in canvas_events:
        elapsed = ev.get("session_elapsed_seconds")
        clean_events.append({
            "id": ev.get("id"),
            "event_type": ev.get("event_type"),
            # Compare against None rather than truthiness: an event at
            # exactly 0 seconds is valid and must not be exported as null.
            "session_elapsed_seconds": float(elapsed) if elapsed is not None else None,
            "timestamp": ev.get("timestamp"),
            "event_payload": ev.get("event_payload", {}),
        })

    export_data = {
        "session": {
            "id": session.get("id"),
            "title": session.get("title"),
            "canvas_type": session.get("canvas_type"),
            "started_at": session.get("started_at"),
            "ended_at": session.get("ended_at"),
            "duration_seconds": session.get("duration_seconds"),
            "timetable_period_id": session.get("timetable_period_id"),
            "timetable_event_type": session.get("timetable_event_type"),
            "timetable_event_label": session.get("timetable_event_label"),
            "auto_tagged": session.get("auto_tagged", False),
            "llm_provider": session.get("llm_provider"),
            "llm_model": session.get("llm_model"),
            "word_count": session.get("word_count", 0),
            "segment_count": session.get("segment_count", 0),
        },
        "segments": clean_segments,
        "summaries": clean_summaries,
        "canvas_events": clean_events,
    }

    return json.dumps(export_data, indent=2, default=str)
def sanitize_filename(name: Optional[str]) -> str:
    """Return a version of *name* that is safe to use as a download filename.

    Every character that is not alphanumeric or one of space, ``_``, ``-``,
    ``.`` is replaced with ``_``. The result is limited to 100 characters and
    has surrounding spaces and dots removed.

    Args:
        name: Proposed filename stem; ``None`` is treated as empty.

    Returns:
        The sanitized stem, or ``"export"`` when nothing usable remains
        (empty, ``None``, or only spaces/dots).
    """
    safe = "".join(
        c if c.isalnum() or c in " _-." else "_" for c in (name or "")
    )
    # Trim after truncating so the cut cannot leave a trailing space or dot;
    # a stem made only of spaces/dots falls through to the default.
    return safe[:100].strip(" .") or "export"
@router.post("/sessions", response_model=TranscriptionSessionResponse) @router.post("/sessions", response_model=TranscriptionSessionResponse)
async def create_session( async def create_session(
session_data: TranscriptionSessionCreate, session_data: TranscriptionSessionCreate,
@ -330,26 +445,65 @@ async def export_session(
export_format: ExportFormat, export_format: ExportFormat,
user_id: str = Depends(get_user_id), user_id: str = Depends(get_user_id),
): ):
"""Export session as SRT, TXT, or JSON (Phase 1 stub).""" """Export session as SRT, TXT, or JSON file download.
Phase 3E: Full implementation generates properly formatted files
and returns them as downloadable responses. API keys are never stored
or logged during export.
"""
supabase = get_supabase_client() supabase = get_supabase_client()
# Verify ownership # Verify ownership
session_check = supabase.supabase.table("transcription_sessions").select("id").eq("id", session_id).eq("user_id", user_id).execute() session_check = supabase.supabase.table("transcription_sessions").select("*").eq("id", session_id).eq("user_id", user_id).execute()
if not session_check.data: if not session_check.data:
raise HTTPException(status_code=404, detail="Session not found") raise HTTPException(status_code=404, detail="Session not found")
session = session_check.data[0]
# Get segments # Get segments
segments_result = supabase.supabase.table("transcription_segments").select("*").eq("session_id", session_id).order("sequence_index").execute() segments_result = supabase.supabase.table("transcription_segments").select("*").eq("session_id", session_id).order("sequence_index").execute()
segments = segments_result.data segments = segments_result.data
if export_format.format == "srt": # Get summaries (for JSON export)
# Phase 1 stub — implement in Phase 3 summaries_result = supabase.supabase.table("transcription_summaries").select("*").eq("session_id", session_id).execute()
return {"format": "srt", "content": "[TODO: Generate SRT from segments]"} summaries = summaries_result.data
elif export_format.format == "txt":
text = "\n".join(s["text"] for s in segments) # Get canvas events (for JSON export)
return {"format": "txt", "content": text} canvas_result = supabase.supabase.table("canvas_events").select("*").eq("session_id", session_id).order("timestamp").execute()
elif export_format.format == "json": canvas_events = canvas_result.data
return {"format": "json", "content": {"segments": segments}}
fmt = export_format.format.lower()
if fmt == "srt":
content = generate_srt(segments)
filename = f"{sanitize_filename(session.get('title', session_id))}_{session.get('started_at', 'export')[:10]}.srt"
return FileResponse(
io.BytesIO(content.encode("utf-8")),
media_type="application/x-subrip",
filename=filename,
headers={"Content-Disposition": f"attachment; filename*=UTF-8''{filename}"},
)
elif fmt == "txt":
content = generate_txt(segments)
filename = f"{sanitize_filename(session.get('title', session_id))}_{session.get('started_at', 'export')[:10]}.txt"
return FileResponse(
io.BytesIO(content.encode("utf-8")),
media_type="text/plain",
filename=filename,
headers={"Content-Disposition": f"attachment; filename*=UTF-8''{filename}"},
)
elif fmt == "json":
content = generate_json_export(session, segments, summaries, canvas_events)
filename = f"{sanitize_filename(session.get('title', session_id))}_{session.get('started_at', 'export')[:10]}.json"
return FileResponse(
io.BytesIO(content.encode("utf-8")),
media_type="application/json",
filename=filename,
headers={"Content-Disposition": f"attachment; filename*=UTF-8''{filename}"},
)
else: else:
raise HTTPException(status_code=400, detail=f"Unsupported format: {export_format.format}") raise HTTPException(status_code=400, detail=f"Unsupported format: {export_format.format}")