- Add POST /transcribe/sessions/{id}/export endpoint
- Generate SRT (SubRip subtitle format) with timestamps
- Generate TXT (plain text with [HH:MM:SS,mmm] timestamps)
- Generate JSON (structured data: session, segments, summaries, canvas events)
- Return as FileResponse download with Content-Disposition headers
- Filenames include sanitized session title + date
- No API keys stored or logged during export
510 lines
19 KiB
Python
510 lines
19 KiB
Python
"""Transcription sessions router — CRUD endpoints for transcription sessions and segments."""
|
|
|
|
from fastapi import APIRouter, Depends, HTTPException, Query
|
|
from fastapi.responses import FileResponse
|
|
from typing import Optional, List
|
|
from datetime import datetime
|
|
import io
|
|
import json
|
|
import tempfile
|
|
import os
|
|
|
|
from modules.auth.supabase_bearer import SupabaseBearer
|
|
from modules.transcription.models import (
|
|
TranscriptionSessionCreate,
|
|
TranscriptionSessionUpdate,
|
|
TranscriptionSessionResponse,
|
|
SessionListResponse,
|
|
TranscriptionSegmentCreate,
|
|
TranscriptionSegmentResponse,
|
|
SummaryGenerateRequest,
|
|
SummaryResponse,
|
|
ExportFormat,
|
|
)
|
|
from modules.transcription.llm_client import call_llm, build_prompt
|
|
import logging
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
router = APIRouter()
|
|
|
|
|
|
def get_supabase_client():
    """Create and return a new ``SupabaseServiceRoleClient`` instance.

    The client class is imported inside the function body, so the import
    only happens when a client is actually requested.
    """
    from modules.database.supabase.utils.client import (
        SupabaseServiceRoleClient as _ServiceRoleClient,
    )

    service_client = _ServiceRoleClient()
    return service_client
|
|
|
|
|
|
def get_user_id(credentials=Depends(SupabaseBearer())) -> str:
    """Extract the caller's user ID from the decoded Supabase JWT claims.

    The ``sub`` claim is preferred; ``user_id`` is used as a fallback.

    Args:
        credentials: Mapping-like claims object produced by ``SupabaseBearer``
            (only ``.get`` is used here).

    Returns:
        The user ID as a string.

    Raises:
        HTTPException: 401 when neither claim holds a usable value. Returning
            an empty ID instead would let every downstream query run with
            ``user_id == ""``.
    """
    user_id = credentials.get("sub") or credentials.get("user_id")
    if not user_id:
        raise HTTPException(status_code=401, detail="Invalid authentication credentials")
    return str(user_id)
|
|
|
|
|
|
def seconds_to_srt_timestamp(seconds: float) -> str:
    """Convert a duration in seconds to an SRT timestamp (``HH:MM:SS,mmm``).

    The value is first rounded to whole milliseconds and then split with
    integer arithmetic. Truncating the fractional part directly loses a
    millisecond to floating-point error (e.g. ``1.001`` would render as
    ``00:00:01,000``).

    Args:
        seconds: Offset in seconds. Negative values are clamped to zero
            because SRT has no notion of negative time.

    Returns:
        The formatted timestamp; hours grow beyond two digits if needed.
    """
    total_millis = max(0, round(float(seconds) * 1000))
    total_seconds, millis = divmod(total_millis, 1000)
    total_minutes, secs = divmod(total_seconds, 60)
    hours, minutes = divmod(total_minutes, 60)
    return f"{hours:02d}:{minutes:02d}:{secs:02d},{millis:03d}"
|
|
|
|
|
|
def generate_srt(segments: List[dict]) -> str:
    """Render transcript segments as SubRip (SRT) subtitle text.

    Args:
        segments: Segment dicts; the keys ``text``, ``start_seconds`` and
            ``end_seconds`` are read. Missing or ``None`` values are treated
            as empty text / zero seconds.

    Returns:
        The SRT document (cues separated by a blank line, with a trailing
        newline), or an empty string when no segment has text.
    """
    srt_entries = []
    for seg in segments:
        text = (seg.get("text") or "").strip()
        if not text:
            continue

        start_ts = seconds_to_srt_timestamp(float(seg.get("start_seconds") or 0))
        end_ts = seconds_to_srt_timestamp(float(seg.get("end_seconds") or 0))

        # A blank line terminates an SRT cue, so flatten every kind of line
        # break (including "\r\n") into a single-line cue.
        clean_text = " ".join(text.splitlines()).strip()

        # Number cues by what is actually emitted so skipped (empty)
        # segments do not leave gaps in the sequence.
        cue_number = len(srt_entries) + 1
        srt_entries.append(f"{cue_number}\n{start_ts} --> {end_ts}\n{clean_text}")

    return "\n\n".join(srt_entries) + "\n" if srt_entries else ""
|
|
|
|
|
|
def generate_txt(segments: List[dict]) -> str:
    """Render transcript segments as plain text, one timestamped line each.

    Each line has the form ``[HH:MM:SS,mmm] text``, using the segment's
    start time.

    Args:
        segments: Segment dicts; the keys ``text`` and ``start_seconds`` are
            read. Missing or ``None`` values are treated as empty text /
            zero seconds.

    Returns:
        The transcript with a trailing newline, or an empty string when no
        segment has text.
    """
    lines = []
    for seg in segments:
        text = (seg.get("text") or "").strip()
        if not text:
            continue
        ts = seconds_to_srt_timestamp(float(seg.get("start_seconds") or 0))
        lines.append(f"[{ts}] {text}")

    return "\n".join(lines) + "\n" if lines else ""
|
|
|
|
|
|
def _float_or_default(value, default):
    """Return ``float(value)``, or ``default`` when value is ``None`` or ``""``."""
    if value is None or value == "":
        return default
    return float(value)


def generate_json_export(session: dict, segments: List[dict],
                         summaries: List[dict],
                         canvas_events: List[dict]) -> str:
    """Build the structured JSON export for a session.

    Only an explicit allow-list of keys is copied from each input dict, so
    any other columns present on the rows are left out of the export.

    Args:
        session: Session row.
        segments: Segment rows; missing/``None`` timestamps export as ``0.0``.
        summaries: Summary rows.
        canvas_events: Canvas event rows; a missing/``None``
            ``session_elapsed_seconds`` exports as ``null``, while a
            legitimate value of ``0`` is preserved as ``0.0``.

    Returns:
        A JSON string (2-space indent) with the top-level keys ``session``,
        ``segments``, ``summaries`` and ``canvas_events``. Values that are
        not natively JSON-serialisable are stringified via ``default=str``.
    """
    clean_segments = [
        {
            "sequence_index": seg.get("sequence_index"),
            "text": seg.get("text", ""),
            "start_seconds": _float_or_default(seg.get("start_seconds"), 0.0),
            "end_seconds": _float_or_default(seg.get("end_seconds"), 0.0),
            "is_final": seg.get("is_final", True),
            "speaker_label": seg.get("speaker_label"),
            "keyword_matches": seg.get("keyword_matches"),
        }
        for seg in segments
    ]

    clean_summaries = [
        {
            "id": s.get("id"),
            "summary_type": s.get("summary_type"),
            "content": s.get("content", ""),
            "llm_provider": s.get("llm_provider"),
            "llm_model": s.get("llm_model"),
            "created_at": s.get("created_at"),
        }
        for s in summaries
    ]

    clean_events = [
        {
            "id": ev.get("id"),
            "event_type": ev.get("event_type"),
            # Test for None explicitly: an event at elapsed time 0 is valid
            # and must not be collapsed to null by a truthiness check.
            "session_elapsed_seconds": _float_or_default(
                ev.get("session_elapsed_seconds"), None
            ),
            "timestamp": ev.get("timestamp"),
            "event_payload": ev.get("event_payload", {}),
        }
        for ev in canvas_events
    ]

    export_data = {
        "session": {
            "id": session.get("id"),
            "title": session.get("title"),
            "canvas_type": session.get("canvas_type"),
            "started_at": session.get("started_at"),
            "ended_at": session.get("ended_at"),
            "duration_seconds": session.get("duration_seconds"),
            "timetable_period_id": session.get("timetable_period_id"),
            "timetable_event_type": session.get("timetable_event_type"),
            "timetable_event_label": session.get("timetable_event_label"),
            "auto_tagged": session.get("auto_tagged", False),
            "llm_provider": session.get("llm_provider"),
            "llm_model": session.get("llm_model"),
            "word_count": session.get("word_count", 0),
            "segment_count": session.get("segment_count", 0),
        },
        "segments": clean_segments,
        "summaries": clean_summaries,
        "canvas_events": clean_events,
    }

    return json.dumps(export_data, indent=2, default=str)
|
|
|
|
|
|
def sanitize_filename(name: str) -> str:
    """Turn an arbitrary title into a string that is safe to use as a filename.

    Every character that is not alphanumeric or one of space, ``_``, ``-``,
    ``.`` is replaced by ``_``. The result is capped at 100 characters and
    then stripped of leading/trailing spaces and dots, so titles such as
    ``"   "`` or ``".."`` cannot produce blank, hidden or parent-directory
    names.

    Args:
        name: Raw title; ``None`` is treated like an empty string.

    Returns:
        The sanitised name, or ``"export"`` when nothing usable remains.
    """
    raw = "" if name is None else str(name)
    safe = "".join(c if c.isalnum() or c in " _-." else "_" for c in raw)
    # Truncate first so the cut itself cannot leave a trailing space or dot.
    safe = safe[:100].strip(" .")
    return safe or "export"
|
|
|
|
|
|
@router.post("/sessions", response_model=TranscriptionSessionResponse)
async def create_session(
    session_data: TranscriptionSessionCreate,
    user_id: str = Depends(get_user_id),
):
    """Insert a new transcription session owned by the caller.

    The row is written with the caller's ``user_id`` plus the ``title`` and
    ``canvas_type`` taken from the request body.

    Raises:
        HTTPException: 500 when the insert returns no rows.
    """
    client = get_supabase_client()

    new_row = dict(
        user_id=user_id,
        title=session_data.title,
        canvas_type=session_data.canvas_type,
    )
    insert_result = client.supabase.table("transcription_sessions").insert(new_row).execute()

    created_rows = insert_result.data
    if created_rows:
        return created_rows[0]

    raise HTTPException(status_code=500, detail="Failed to create session")
|
|
|
|
|
|
@router.patch("/sessions/{session_id}", response_model=TranscriptionSessionResponse)
async def update_session(
    session_id: str,
    update_data: TranscriptionSessionUpdate,
    user_id: str = Depends(get_user_id),
):
    """Apply a partial update to one of the caller's sessions.

    Only fields of ``update_data`` whose value is not ``None`` are written;
    ``updated_at`` is always refreshed with the current UTC time.

    Raises:
        HTTPException: 404 when no session matches both ``session_id`` and
            ``user_id``; 500 when the update returns no rows.
    """
    client = get_supabase_client()

    # Ownership check: the row must match the session id AND the caller.
    owned = (
        client.supabase.table("transcription_sessions")
        .select("*")
        .eq("id", session_id)
        .eq("user_id", user_id)
        .execute()
    )
    if not owned.data:
        raise HTTPException(status_code=404, detail="Session not found")

    # Collect only the fields that carry a value.
    changes = {}
    for field_name, field_value in update_data.model_dump().items():
        if field_value is not None:
            changes[field_name] = field_value
    changes["updated_at"] = datetime.utcnow().isoformat()

    update_result = (
        client.supabase.table("transcription_sessions")
        .update(changes)
        .eq("id", session_id)
        .execute()
    )
    updated_rows = update_result.data
    if updated_rows:
        return updated_rows[0]

    raise HTTPException(status_code=500, detail="Failed to update session")
|
|
|
|
|
|
@router.get("/sessions", response_model=SessionListResponse)
async def list_sessions(
    user_id: str = Depends(get_user_id),
    page: int = Query(1, ge=1),
    page_size: int = Query(20, ge=1, le=100),
    timetable_period_id: Optional[str] = None,
):
    """Return one page of the caller's sessions, newest ``started_at`` first.

    When ``timetable_period_id`` is given, only sessions with that period
    are returned. ``total`` is the exact row count reported by the query
    (0 when no count is reported).
    """
    client = get_supabase_client()

    sessions_query = (
        client.supabase.table("transcription_sessions")
        .select("*", count="exact")
        .eq("user_id", user_id)
    )
    if timetable_period_id:
        sessions_query = sessions_query.eq("timetable_period_id", timetable_period_id)

    # Inclusive row window for the requested page.
    first_row = (page - 1) * page_size
    last_row = first_row + page_size - 1
    page_result = sessions_query.order("started_at", desc=True).range(first_row, last_row).execute()

    return SessionListResponse(
        sessions=page_result.data,
        total=page_result.count or 0,
        page=page,
        page_size=page_size,
    )
|
|
|
|
|
|
@router.get("/sessions/{session_id}", response_model=dict)
async def get_session(
    session_id: str,
    user_id: str = Depends(get_user_id),
):
    """Return one of the caller's sessions with its segments and summaries.

    Segments are ordered by ``sequence_index``.

    Raises:
        HTTPException: 404 when no session matches both ``session_id`` and
            ``user_id``.
    """
    client = get_supabase_client()

    session_rows = (
        client.supabase.table("transcription_sessions")
        .select("*")
        .eq("id", session_id)
        .eq("user_id", user_id)
        .execute()
    )
    if not session_rows.data:
        raise HTTPException(status_code=404, detail="Session not found")

    segment_rows = (
        client.supabase.table("transcription_segments")
        .select("*")
        .eq("session_id", session_id)
        .order("sequence_index")
        .execute()
    )
    summary_rows = (
        client.supabase.table("transcription_summaries")
        .select("*")
        .eq("session_id", session_id)
        .execute()
    )

    return dict(
        session=session_rows.data[0],
        segments=segment_rows.data,
        summaries=summary_rows.data,
    )
|
|
|
|
|
|
@router.delete("/sessions/{session_id}")
async def delete_session(
    session_id: str,
    user_id: str = Depends(get_user_id),
):
    """Soft delete one of the caller's transcription sessions.

    The row is kept; ``ended_at`` is set to the current UTC time and a
    ``deleted`` flag is added to the session's ``metadata``. Existing
    metadata keys are preserved rather than overwritten.

    Raises:
        HTTPException: 404 when no session matches both ``session_id`` and
            ``user_id``.
    """
    supabase = get_supabase_client()

    # Verify ownership (and load the current metadata for merging).
    existing = (
        supabase.supabase.table("transcription_sessions")
        .select("*")
        .eq("id", session_id)
        .eq("user_id", user_id)
        .execute()
    )
    if not existing.data:
        raise HTTPException(status_code=404, detail="Session not found")

    # Merge the flag into whatever metadata is already stored; replacing the
    # column wholesale would silently drop unrelated keys.
    current_metadata = existing.data[0].get("metadata")
    if isinstance(current_metadata, dict):
        metadata = {**current_metadata, "deleted": True}
    else:
        metadata = {"deleted": True}

    supabase.supabase.table("transcription_sessions").update({
        "ended_at": datetime.utcnow().isoformat(),
        "metadata": metadata,
    }).eq("id", session_id).execute()

    return {"message": "Session deleted"}
|
|
|
|
|
|
@router.post("/sessions/{session_id}/segments")
async def upsert_segments(
    session_id: str,
    segments: List[TranscriptionSegmentCreate],
    user_id: str = Depends(get_user_id),
):
    """Batch insert segments for one of the caller's sessions.

    Every inserted row is stamped with the ``session_id`` from the URL (the
    one whose ownership was verified), regardless of what the request body
    carried. After the insert, the session's ``segment_count`` is refreshed
    to the total number of segments stored for the session.

    Returns:
        A dict with a human-readable ``message`` and the number of segments
        in this batch under ``count``.

    Raises:
        HTTPException: 404 when no session matches both ``session_id`` and
            ``user_id``.
    """
    supabase = get_supabase_client()

    # Verify session exists and user owns it
    session_check = (
        supabase.supabase.table("transcription_sessions")
        .select("id")
        .eq("id", session_id)
        .eq("user_id", user_id)
        .execute()
    )
    if not session_check.data:
        raise HTTPException(status_code=404, detail="Session not found")

    # Force the verified session id onto every row so a body-supplied
    # session_id can never target a session the caller does not own.
    segment_data = [{**s.model_dump(), "session_id": session_id} for s in segments]

    if segment_data:
        supabase.supabase.table("transcription_segments").insert(segment_data).execute()

        # Store the session-wide total, not just this batch's size, so the
        # count stays correct when segments arrive in several batches.
        count_result = (
            supabase.supabase.table("transcription_segments")
            .select("session_id", count="exact")
            .eq("session_id", session_id)
            .limit(1)
            .execute()
        )
        total_segments = count_result.count if count_result.count is not None else len(segment_data)

        supabase.supabase.table("transcription_sessions").update({
            "segment_count": total_segments,
        }).eq("id", session_id).execute()

    return {"message": f"Upserted {len(segment_data)} segments", "count": len(segment_data)}
|
|
|
|
|
|
@router.get("/sessions/{session_id}/segments", response_model=List[TranscriptionSegmentResponse])
async def list_segments(
    session_id: str,
    user_id: str = Depends(get_user_id),
):
    """Return every segment of one of the caller's sessions, by ``sequence_index``.

    Raises:
        HTTPException: 404 when no session matches both ``session_id`` and
            ``user_id``.
    """
    client = get_supabase_client()

    # The session must belong to the caller before its segments are read.
    owned = (
        client.supabase.table("transcription_sessions")
        .select("id")
        .eq("id", session_id)
        .eq("user_id", user_id)
        .execute()
    )
    if not owned.data:
        raise HTTPException(status_code=404, detail="Session not found")

    segment_rows = (
        client.supabase.table("transcription_segments")
        .select("*")
        .eq("session_id", session_id)
        .order("sequence_index")
        .execute()
    )
    return segment_rows.data
|
|
|
|
|
|
@router.post("/sessions/{session_id}/summaries", response_model=SummaryResponse)
async def generate_summary(
    session_id: str,
    summary_request: SummaryGenerateRequest,
    user_id: str = Depends(get_user_id),
):
    """Generate and store an LLM summary for one of the caller's sessions.

    The transcript is the newline-joined text of the session's segments
    (ordered by ``sequence_index``), optionally restricted to
    ``summary_request.segment_range``, which is applied as a Python slice
    ``segments[start:end]``. The API key from the request is only handed to
    ``call_llm``; it is not part of the row written to the database.

    Raises:
        HTTPException: 404 when the session is not found for this user;
            400 when the session has no segments or the resulting transcript
            is empty; 502 when the LLM call raises; 500 when the summary
            insert returns no rows.
    """
    supabase = get_supabase_client()

    # Verify session exists and user owns it
    session_check = (
        supabase.supabase.table("transcription_sessions")
        .select("id")
        .eq("id", session_id)
        .eq("user_id", user_id)
        .execute()
    )
    if not session_check.data:
        raise HTTPException(status_code=404, detail="Session not found")

    segments_result = (
        supabase.supabase.table("transcription_segments")
        .select("*")
        .eq("session_id", session_id)
        .order("sequence_index")
        .execute()
    )
    if not segments_result.data:
        raise HTTPException(status_code=400, detail="No segments found for this session")

    # Parse the optional range once; it is used both to slice the segments
    # and to record which range the summary covers. A None bound behaves
    # like an open end of the slice.
    segments = segments_result.data
    seg_start = None
    seg_end = None
    segment_range = summary_request.segment_range
    if segment_range and len(segment_range) == 2:
        seg_start, seg_end = segment_range
        segments = segments[seg_start:seg_end]

    # Build full transcript text from segments
    transcript = "\n".join(s["text"] for s in segments if s.get("text"))
    if not transcript.strip():
        raise HTTPException(status_code=400, detail="Transcript is empty — cannot generate summary")

    # Build prompt from template
    system_prompt, user_message = build_prompt(summary_request.summary_type, transcript)

    # Call the LLM client
    try:
        llm_result = await call_llm(
            provider=summary_request.provider,
            model=summary_request.model,
            api_key=summary_request.api_key,
            system_prompt=system_prompt,
            user_message=user_message,
        )
    except Exception as e:
        # NOTE(review): the exception text is logged and returned to the
        # client; confirm call_llm never embeds the API key in its errors.
        logger.error("LLM call failed: %s", e)
        raise HTTPException(status_code=502, detail=f"LLM generation failed: {str(e)}") from e

    # Build the prompt that was used (for audit trail)
    if summary_request.summary_type != "segment":
        prompt_used = f"{system_prompt}\n\n{user_message}"
    else:
        prompt_used = user_message

    # Save summary to database
    summary_data = {
        "session_id": session_id,
        "user_id": user_id,
        "summary_type": summary_request.summary_type,
        "content": llm_result.content,
        "prompt_used": prompt_used,
        "llm_provider": summary_request.provider,
        "llm_model": summary_request.model,
        "input_tokens": llm_result.input_tokens,
        "output_tokens": llm_result.output_tokens,
        "segment_range_start": seg_start,
        "segment_range_end": seg_end,
    }

    result = supabase.supabase.table("transcription_summaries").insert(summary_data).execute()
    if not result.data:
        raise HTTPException(status_code=500, detail="Failed to save summary")

    return result.data[0]
|
|
|
|
|
|
@router.get("/sessions/{session_id}/summaries", response_model=List[SummaryResponse])
async def list_summaries(
    session_id: str,
    user_id: str = Depends(get_user_id),
):
    """Return every stored summary of one of the caller's sessions.

    Raises:
        HTTPException: 404 when no session matches both ``session_id`` and
            ``user_id``.
    """
    client = get_supabase_client()

    # The session must belong to the caller before its summaries are read.
    owned = (
        client.supabase.table("transcription_sessions")
        .select("id")
        .eq("id", session_id)
        .eq("user_id", user_id)
        .execute()
    )
    if not owned.data:
        raise HTTPException(status_code=404, detail="Session not found")

    summary_rows = (
        client.supabase.table("transcription_summaries")
        .select("*")
        .eq("session_id", session_id)
        .execute()
    )
    return summary_rows.data
|
|
|
|
|
|
@router.post("/sessions/{session_id}/export")
async def export_session(
    session_id: str,
    export_format: ExportFormat,
    user_id: str = Depends(get_user_id),
):
    """Export one of the caller's sessions as an SRT, TXT or JSON download.

    The file is generated in memory and returned as an attachment. The
    filename is ``<sanitized title>_<first 10 chars of started_at>.<ext>``,
    falling back to the session id / ``export`` when the title or start
    time is missing. Summaries and canvas events are only loaded for the
    JSON format, which is the only one that includes them.

    Raises:
        HTTPException: 404 when no session matches both ``session_id`` and
            ``user_id``; 400 when the requested format is not one of
            ``srt``, ``txt`` or ``json``.
    """
    # Function-scope imports keep this block self-contained.
    from urllib.parse import quote

    from fastapi.responses import Response

    media_types = {
        "srt": "application/x-subrip",
        "txt": "text/plain",
        "json": "application/json",
    }

    supabase = get_supabase_client()

    # Verify ownership
    session_check = (
        supabase.supabase.table("transcription_sessions")
        .select("*")
        .eq("id", session_id)
        .eq("user_id", user_id)
        .execute()
    )
    if not session_check.data:
        raise HTTPException(status_code=404, detail="Session not found")
    session = session_check.data[0]

    # Reject unknown formats before doing any further database work.
    fmt = export_format.format.lower()
    if fmt not in media_types:
        raise HTTPException(status_code=400, detail=f"Unsupported format: {export_format.format}")

    segments_result = (
        supabase.supabase.table("transcription_segments")
        .select("*")
        .eq("session_id", session_id)
        .order("sequence_index")
        .execute()
    )
    segments = segments_result.data or []

    if fmt == "srt":
        content = generate_srt(segments)
    elif fmt == "txt":
        content = generate_txt(segments)
    else:
        summaries_result = (
            supabase.supabase.table("transcription_summaries")
            .select("*")
            .eq("session_id", session_id)
            .execute()
        )
        canvas_result = (
            supabase.supabase.table("canvas_events")
            .select("*")
            .eq("session_id", session_id)
            .order("timestamp")
            .execute()
        )
        content = generate_json_export(
            session,
            segments,
            summaries_result.data or [],
            canvas_result.data or [],
        )

    # `or` (rather than a .get default) also covers columns that are present
    # but NULL, which would otherwise crash the slice / sanitiser.
    title = session.get("title") or session_id
    date_part = str(session.get("started_at") or "export")[:10]
    filename = f"{sanitize_filename(title)}_{date_part}.{fmt}"

    # Header values must be ASCII-safe: give a plain fallback name plus the
    # percent-encoded UTF-8 form for clients that understand `filename*`.
    ascii_filename = filename.encode("ascii", "ignore").decode("ascii") or f"export.{fmt}"
    content_disposition = (
        f'attachment; filename="{ascii_filename}"; '
        f"filename*=UTF-8''{quote(filename, safe='')}"
    )

    # FileResponse serves a file from a filesystem path and cannot take an
    # in-memory buffer, so the generated bytes go out via a plain Response.
    return Response(
        content=content.encode("utf-8"),
        media_type=media_types[fmt],
        headers={"Content-Disposition": content_disposition},
    )
|