feat(transcription): add Supabase schema and API endpoints for CIS

This commit is contained in:
kcar 2026-05-20 21:03:00 +00:00
parent cf9bdca813
commit b47c7c252d
10 changed files with 818 additions and 17 deletions

View File

@ -0,0 +1 @@
# Transcription module for Classroom Copilot

View File

@ -0,0 +1,53 @@
"""Pluggable LLM client for transcription summaries.
Phase 1: Stub implementation returns TODO string.
Phase 3: Wire up Anthropic, OpenAI, and Ollama providers.
"""
import os
from typing import Optional
async def call_llm(
provider: str,
model: str,
api_key: str,
system_prompt: str,
user_message: str,
) -> str:
"""Call an LLM to generate a summary.
Phase 1 stub returns a TODO string.
Phase 3 will implement actual provider routing.
Args:
provider: 'anthropic', 'openai', 'ollama', 'openrouter', 'google'
model: Model name (e.g. 'claude-sonnet-4-6', 'gpt-4o', 'llama3')
api_key: User's API key (from localStorage, passed per-request)
system_prompt: System prompt template (already filled with transcript)
user_message: User message content
Returns:
LLM-generated summary text
"""
# Phase 1 stub — TODO: implement in Phase 3
return f"[TODO: Implement LLM call for provider={provider}, model={model}]"
async def call_anthropic(api_key: str, model: str, system_prompt: str, user_message: str) -> str:
"""Call Anthropic Claude API."""
# Phase 3 implementation placeholder
return f"[TODO: Anthropic call — model={model}]"
async def call_openai(api_key: str, model: str, system_prompt: str, user_message: str) -> str:
"""Call OpenAI API."""
# Phase 3 implementation placeholder
return f"[TODO: OpenAI call — model={model}]"
async def call_ollama(api_key: str, model: str, system_prompt: str, user_message: str) -> str:
"""Call local Ollama instance."""
# Phase 3 implementation placeholder
ollama_url = os.getenv("OLLAMA_URL", "https://ollama.kevlarai.com")
return f"[TODO: Ollama call — url={ollama_url}, model={model}]"

View File

@ -0,0 +1,208 @@
"""Pydantic models for the Transcription system."""
from pydantic import BaseModel, Field
from typing import Optional, List
from datetime import datetime
# --- Session Models ---
class TranscriptionSessionCreate(BaseModel):
user_id: str
title: Optional[str] = None
canvas_type: str = "teaching-canvas"
class TranscriptionSessionUpdate(BaseModel):
title: Optional[str] = None
ended_at: Optional[datetime] = None
timetable_period_id: Optional[str] = None
timetable_event_type: Optional[str] = None
timetable_event_label: Optional[str] = None
auto_tagged: Optional[bool] = None
llm_provider: Optional[str] = None
llm_model: Optional[str] = None
class TranscriptionSessionResponse(BaseModel):
id: str
user_id: str
title: Optional[str] = None
canvas_type: str
started_at: datetime
ended_at: Optional[datetime] = None
duration_seconds: Optional[int] = None
timetable_period_id: Optional[str] = None
timetable_event_type: Optional[str] = None
timetable_event_label: Optional[str] = None
auto_tagged: bool = False
llm_provider: Optional[str] = None
llm_model: Optional[str] = None
word_count: int = 0
segment_count: int = 0
metadata: dict = {}
created_at: datetime
updated_at: datetime
class Config:
from_attributes = True
class SessionListResponse(BaseModel):
sessions: List[TranscriptionSessionResponse]
total: int
page: int
page_size: int
# --- Segment Models ---
class TranscriptionSegmentCreate(BaseModel):
session_id: str
sequence_index: int
text: str
start_seconds: float
end_seconds: float
is_final: bool = True
speaker_label: Optional[str] = None
keyword_matches: Optional[List[str]] = None
class TranscriptionSegmentResponse(BaseModel):
id: str
session_id: str
sequence_index: int
text: str
start_seconds: float
end_seconds: float
is_final: bool = True
speaker_label: Optional[str] = None
keyword_matches: Optional[List[str]] = None
created_at: datetime
class Config:
from_attributes = True
# --- Canvas Event Models ---
class CanvasEventCreate(BaseModel):
session_id: Optional[str] = None
user_id: str
timestamp: Optional[datetime] = None
session_elapsed_seconds: Optional[float] = None
event_type: str
event_payload: dict = {}
canvas_snapshot_url: Optional[str] = None
tldraw_page_id: Optional[str] = None
tldraw_shape_ids: Optional[List[str]] = None
class CanvasEventResponse(BaseModel):
id: str
session_id: Optional[str] = None
user_id: str
timestamp: datetime
session_elapsed_seconds: Optional[float] = None
event_type: str
event_payload: dict = {}
canvas_snapshot_url: Optional[str] = None
tldraw_page_id: Optional[str] = None
tldraw_shape_ids: Optional[List[str]] = None
class Config:
from_attributes = True
# --- Summary Models ---
class SummaryGenerateRequest(BaseModel):
summary_type: str # full_lesson, questions_asked, teaching_style, key_moments, segment
provider: str # anthropic, openai, ollama, openrouter, google
model: str
api_key: str # from frontend user settings, passed per-request
segment_range: Optional[List[Optional[int]]] = None # [start, end], null = all
include_canvas_snapshots: bool = False
class SummaryResponse(BaseModel):
id: str
session_id: str
user_id: str
summary_type: str
content: str
prompt_used: Optional[str] = None
llm_provider: str
llm_model: str
input_tokens: Optional[int] = None
output_tokens: Optional[int] = None
segment_range_start: Optional[int] = None
segment_range_end: Optional[int] = None
canvas_snapshot_urls: Optional[List[str]] = None
created_at: datetime
class Config:
from_attributes = True
# --- Keyword Watch Models ---
class KeywordWatchCreate(BaseModel):
user_id: str
keyword: str
match_type: str = "contains" # contains, exact, starts_with, regex
action: str = "log" # log, alert, canvas_shape, webhook
class KeywordWatchResponse(BaseModel):
id: str
user_id: str
keyword: str
match_type: str = "contains"
action: str = "log"
is_active: bool = True
created_at: datetime
class Config:
from_attributes = True
# --- Keyword Event Models ---
class KeywordEventCreate(BaseModel):
session_id: str
segment_id: Optional[str] = None
keyword_watch_id: Optional[str] = None
keyword_text: str
matched_in_text: str
session_elapsed_seconds: Optional[float] = None
class KeywordEventResponse(BaseModel):
id: str
session_id: str
segment_id: Optional[str] = None
keyword_watch_id: Optional[str] = None
keyword_text: str
matched_in_text: str
session_elapsed_seconds: Optional[float] = None
created_at: datetime
class Config:
from_attributes = True
# --- Export Models ---
class ExportFormat(BaseModel):
format: str # srt, txt, json
# --- Timetable Models ---
class CurrentPeriodResponse(BaseModel):
period_id: Optional[str] = None
event_type: Optional[str] = None
event_label: Optional[str] = None
start_time: Optional[datetime] = None
end_time: Optional[datetime] = None

View File

@ -0,0 +1,51 @@
"""LLM prompt templates for transcription summaries."""
FULL_LESSON = """You are an expert educational analyst. Below is a transcript of a lesson. Provide a structured summary including:
1. Main topics covered (with estimated time on each)
2. Key teaching moments
3. Notable observations about pacing and engagement
4. Suggestions for improvement
Transcript:
{transcript}"""
QUESTIONS_ASKED = """You are an expert educational analyst. Extract all questions asked by the teacher from this lesson transcript. For each question:
1. Quote the exact question
2. Categorize by type: open/closed
3. Identify Bloom's taxonomy level (Remember, Understand, Apply, Analyze, Evaluate, Create)
4. Note any subject-specific content
Transcript:
{transcript}"""
TEACHING_STYLE = """You are an expert educational analyst. Analyse this lesson transcript for teaching style. Comment on:
1. Pacing was the lesson well-paced? Where did it drag or rush?
2. Questioning technique variety, depth, follow-up
3. Explanation clarity were concepts explained effectively?
4. Student engagement indicators (changes in tone, pauses for responses)
5. Suggestions for improvement
Transcript:
{transcript}"""
KEY_MOMENTS = """You are an expert educational analyst. Identify the most significant moments in this lesson:
1. Topic transitions (with timestamps)
2. Student interactions (marked by change in tone or pause)
3. Key explanations that seemed to land well
4. Any moments of confusion or breakthrough
Transcript:
{transcript}"""
SEGMENT = """Summarise this portion of the lesson in 2-3 sentences suitable for a lesson log entry.
Transcript:
{transcript}"""
PROMPT_TEMPLATES = {
"full_lesson": FULL_LESSON,
"questions_asked": QUESTIONS_ASKED,
"teaching_style": TEACHING_STYLE,
"key_moments": KEY_MOMENTS,
"segment": SEGMENT,
}

View File

@ -162,3 +162,13 @@ async def process_worker_timetable(file_content, user_node_data, worker_node_dat
finally: finally:
logging.info(f"Closing driver for {worker_node_data['worker_db_name']}") logging.info(f"Closing driver for {worker_node_data['worker_db_name']}")
driver.close_driver(neo_driver) driver.close_driver(neo_driver)
@router.get("/current-period")
async def get_current_period(user_id: str = ""):
# Phase 1: return stub — TODO: implement Neo4j query in Phase 2
return {
"period_id": None,
"event_type": None,
"event_label": None,
"start_time": None,
"end_time": None,
}

View File

@ -0,0 +1,63 @@
"""Canvas events router — batch write and query canvas event logs."""
from fastapi import APIRouter, Depends, HTTPException, Query
from typing import List, Optional
from datetime import datetime
from modules.auth.supabase_bearer import SupabaseBearer
from modules.transcription.models import CanvasEventCreate, CanvasEventResponse
router = APIRouter()
def get_supabase_client():
"""Get Supabase service role client."""
from modules.database.supabase.utils.client import SupabaseServiceRoleClient
return SupabaseServiceRoleClient()
def get_user_id(credentials=Depends(SupabaseBearer())) -> str:
"""Extract user_id from Supabase JWT token."""
return credentials.get("sub", credentials.get("user_id", ""))
@router.post("/canvas-events")
async def batch_write_canvas_events(
events: List[CanvasEventCreate],
user_id: str = Depends(get_user_id),
):
"""Batch write canvas events."""
supabase = get_supabase_client()
# Filter events to only this user's
user_events = [e for e in events if e.user_id == user_id]
if not user_events:
return {"message": "No events to write", "count": 0}
event_data = [e.model_dump() for e in user_events]
result = supabase.supabase.table("canvas_events").insert(event_data).execute()
return {"message": f"Wrote {len(event_data)} events", "count": len(event_data)}
@router.get("/canvas-events", response_model=List[CanvasEventResponse])
async def get_canvas_events(
session_id: Optional[str] = Query(None),
user_id: str = Depends(get_user_id),
limit: int = Query(100, ge=1, le=1000),
):
"""Get canvas events for a session or user."""
supabase = get_supabase_client()
query = supabase.supabase.table("canvas_events").select("*").eq("user_id", user_id)
if session_id:
query = query.eq("session_id", session_id)
query = query.order("timestamp", desc=True).limit(limit)
result = query.execute()
return result.data

View File

@ -0,0 +1,102 @@
"""Keyword watches router — CRUD for keyword watch rules and events."""
from fastapi import APIRouter, Depends, HTTPException
from typing import List
from modules.auth.supabase_bearer import SupabaseBearer
from modules.transcription.models import (
KeywordWatchCreate,
KeywordWatchResponse,
KeywordEventCreate,
KeywordEventResponse,
)
router = APIRouter()
def get_supabase_client():
"""Get Supabase service role client."""
from modules.database.supabase.utils.client import SupabaseServiceRoleClient
return SupabaseServiceRoleClient()
def get_user_id(credentials=Depends(SupabaseBearer())) -> str:
"""Extract user_id from Supabase JWT token."""
return credentials.get("sub", credentials.get("user_id", ""))
@router.get("/keywords", response_model=List[KeywordWatchResponse])
async def list_keyword_watches(
user_id: str = Depends(get_user_id),
):
"""List user's keyword watches."""
supabase = get_supabase_client()
result = supabase.supabase.table("keyword_watches").select("*").eq("user_id", user_id).execute()
return result.data
@router.post("/keywords", response_model=KeywordWatchResponse)
async def create_keyword_watch(
watch: KeywordWatchCreate,
user_id: str = Depends(get_user_id),
):
"""Create a keyword watch."""
supabase = get_supabase_client()
data = {
"user_id": user_id,
"keyword": watch.keyword.lower(), # Store lowercase for case-insensitive matching
"match_type": watch.match_type,
"action": watch.action,
}
result = supabase.supabase.table("keyword_watches").insert(data).execute()
if not result.data:
raise HTTPException(status_code=500, detail="Failed to create keyword watch")
return result.data[0]
@router.delete("/keywords/{watch_id}")
async def delete_keyword_watch(
watch_id: str,
user_id: str = Depends(get_user_id),
):
"""Delete a keyword watch."""
supabase = get_supabase_client()
# Verify ownership
existing = supabase.supabase.table("keyword_watches").select("*").eq("id", watch_id).eq("user_id", user_id).execute()
if not existing.data:
raise HTTPException(status_code=404, detail="Keyword watch not found")
supabase.supabase.table("keyword_watches").delete().eq("id", watch_id).execute()
return {"message": "Keyword watch deleted"}
@router.post("/keywords/events")
async def log_keyword_event(
event: KeywordEventCreate,
user_id: str = Depends(get_user_id),
):
"""Log a keyword event (triggered when a watch matches)."""
supabase = get_supabase_client()
# Verify session ownership
session_check = supabase.supabase.table("transcription_sessions").select("id").eq("id", event.session_id).eq("user_id", user_id).execute()
if not session_check.data:
raise HTTPException(status_code=404, detail="Session not found")
data = event.model_dump()
result = supabase.supabase.table("keyword_events").insert(data).execute()
if not result.data:
raise HTTPException(status_code=500, detail="Failed to log keyword event")
return result.data[0]

View File

@ -0,0 +1,293 @@
"""Transcription sessions router — CRUD endpoints for transcription sessions and segments."""
from fastapi import APIRouter, Depends, HTTPException, Query
from typing import Optional, List
from datetime import datetime
from modules.auth.supabase_bearer import SupabaseBearer
from modules.transcription.models import (
TranscriptionSessionCreate,
TranscriptionSessionUpdate,
TranscriptionSessionResponse,
SessionListResponse,
TranscriptionSegmentCreate,
TranscriptionSegmentResponse,
SummaryGenerateRequest,
SummaryResponse,
ExportFormat,
)
router = APIRouter()
def get_supabase_client():
"""Get Supabase service role client."""
from modules.database.supabase.utils.client import SupabaseServiceRoleClient
return SupabaseServiceRoleClient()
def get_user_id(credentials=Depends(SupabaseBearer())) -> str:
"""Extract user_id from Supabase JWT token."""
return credentials.get("sub", credentials.get("user_id", ""))
@router.post("/sessions", response_model=TranscriptionSessionResponse)
async def create_session(
session_data: TranscriptionSessionCreate,
user_id: str = Depends(get_user_id),
):
"""Create a new transcription session."""
supabase = get_supabase_client()
data = {
"user_id": user_id,
"title": session_data.title,
"canvas_type": session_data.canvas_type,
}
result = supabase.supabase.table("transcription_sessions").insert(data).execute()
if not result.data:
raise HTTPException(status_code=500, detail="Failed to create session")
return result.data[0]
@router.patch("/sessions/{session_id}", response_model=TranscriptionSessionResponse)
async def update_session(
session_id: str,
update_data: TranscriptionSessionUpdate,
user_id: str = Depends(get_user_id),
):
"""Update a transcription session (end, tag, title)."""
supabase = get_supabase_client()
# Verify ownership
existing = supabase.supabase.table("transcription_sessions").select("*").eq("id", session_id).eq("user_id", user_id).execute()
if not existing.data:
raise HTTPException(status_code=404, detail="Session not found")
# Build update dict (only non-None fields)
updates = {k: v for k, v in update_data.model_dump().items() if v is not None}
updates["updated_at"] = datetime.utcnow().isoformat()
result = supabase.supabase.table("transcription_sessions").update(updates).eq("id", session_id).execute()
if not result.data:
raise HTTPException(status_code=500, detail="Failed to update session")
return result.data[0]
@router.get("/sessions", response_model=SessionListResponse)
async def list_sessions(
user_id: str = Depends(get_user_id),
page: int = Query(1, ge=1),
page_size: int = Query(20, ge=1, le=100),
timetable_period_id: Optional[str] = None,
):
"""List transcription sessions for the current user (paginated)."""
supabase = get_supabase_client()
query = supabase.supabase.table("transcription_sessions").select("*", count="exact").eq("user_id", user_id)
if timetable_period_id:
query = query.eq("timetable_period_id", timetable_period_id)
query = query.order("started_at", desc=True).range((page - 1) * page_size, page * page_size - 1)
result = query.execute()
return SessionListResponse(
sessions=result.data,
total=result.count or 0,
page=page,
page_size=page_size,
)
@router.get("/sessions/{session_id}", response_model=dict)
async def get_session(
session_id: str,
user_id: str = Depends(get_user_id),
):
"""Get a session with its segments and summaries."""
supabase = get_supabase_client()
# Get session
session_result = supabase.supabase.table("transcription_sessions").select("*").eq("id", session_id).eq("user_id", user_id).execute()
if not session_result.data:
raise HTTPException(status_code=404, detail="Session not found")
# Get segments
segments_result = supabase.supabase.table("transcription_segments").select("*").eq("session_id", session_id).order("sequence_index").execute()
# Get summaries
summaries_result = supabase.supabase.table("transcription_summaries").select("*").eq("session_id", session_id).execute()
return {
"session": session_result.data[0],
"segments": segments_result.data,
"summaries": summaries_result.data,
}
@router.delete("/sessions/{session_id}")
async def delete_session(
session_id: str,
user_id: str = Depends(get_user_id),
):
"""Soft delete a transcription session."""
supabase = get_supabase_client()
# Verify ownership
existing = supabase.supabase.table("transcription_sessions").select("*").eq("id", session_id).eq("user_id", user_id).execute()
if not existing.data:
raise HTTPException(status_code=404, detail="Session not found")
# Soft delete: set ended_at and mark metadata
result = supabase.supabase.table("transcription_sessions").update({
"ended_at": datetime.utcnow().isoformat(),
"metadata": {"deleted": True},
}).eq("id", session_id).execute()
return {"message": "Session deleted"}
@router.post("/sessions/{session_id}/segments")
async def upsert_segments(
session_id: str,
segments: List[TranscriptionSegmentCreate],
user_id: str = Depends(get_user_id),
):
"""Batch upsert segments for a session."""
supabase = get_supabase_client()
# Verify session exists and user owns it
session_check = supabase.supabase.table("transcription_sessions").select("id").eq("id", session_id).eq("user_id", user_id).execute()
if not session_check.data:
raise HTTPException(status_code=404, detail="Session not found")
# Batch insert segments
segment_data = [s.model_dump() for s in segments]
if segment_data:
result = supabase.supabase.table("transcription_segments").insert(segment_data).execute()
# Update segment count on session
supabase.supabase.table("transcription_sessions").update({
"segment_count": len(segment_data),
}).eq("id", session_id).execute()
return {"message": f"Upserted {len(segment_data)} segments", "count": len(segment_data)}
@router.get("/sessions/{session_id}/segments", response_model=List[TranscriptionSegmentResponse])
async def list_segments(
session_id: str,
user_id: str = Depends(get_user_id),
):
"""List all segments for a session."""
supabase = get_supabase_client()
# Verify ownership
session_check = supabase.supabase.table("transcription_sessions").select("id").eq("id", session_id).eq("user_id", user_id).execute()
if not session_check.data:
raise HTTPException(status_code=404, detail="Session not found")
result = supabase.supabase.table("transcription_segments").select("*").eq("session_id", session_id).order("sequence_index").execute()
return result.data
@router.post("/sessions/{session_id}/summaries", response_model=SummaryResponse)
async def generate_summary(
session_id: str,
summary_request: SummaryGenerateRequest,
user_id: str = Depends(get_user_id),
):
"""Generate a summary for a session (Phase 1 stub)."""
supabase = get_supabase_client()
# Verify session exists and user owns it
session_check = supabase.supabase.table("transcription_sessions").select("id").eq("id", session_id).eq("user_id", user_id).execute()
if not session_check.data:
raise HTTPException(status_code=404, detail="Session not found")
# Phase 1 stub: TODO implement LLM call in Phase 3
content = "[TODO: Generate summary via LLM — provider={}, model={}]".format(
summary_request.provider, summary_request.model
)
# Save summary to database
summary_data = {
"session_id": session_id,
"user_id": user_id,
"summary_type": summary_request.summary_type,
"content": content,
"llm_provider": summary_request.provider,
"llm_model": summary_request.model,
}
result = supabase.supabase.table("transcription_summaries").insert(summary_data).execute()
if not result.data:
raise HTTPException(status_code=500, detail="Failed to save summary")
return result.data[0]
@router.get("/sessions/{session_id}/summaries", response_model=List[SummaryResponse])
async def list_summaries(
session_id: str,
user_id: str = Depends(get_user_id),
):
"""List summaries for a session."""
supabase = get_supabase_client()
# Verify ownership
session_check = supabase.supabase.table("transcription_sessions").select("id").eq("id", session_id).eq("user_id", user_id).execute()
if not session_check.data:
raise HTTPException(status_code=404, detail="Session not found")
result = supabase.supabase.table("transcription_summaries").select("*").eq("session_id", session_id).execute()
return result.data
@router.post("/sessions/{session_id}/export")
async def export_session(
session_id: str,
export_format: ExportFormat,
user_id: str = Depends(get_user_id),
):
"""Export session as SRT, TXT, or JSON (Phase 1 stub)."""
supabase = get_supabase_client()
# Verify ownership
session_check = supabase.supabase.table("transcription_sessions").select("id").eq("id", session_id).eq("user_id", user_id).execute()
if not session_check.data:
raise HTTPException(status_code=404, detail="Session not found")
# Get segments
segments_result = supabase.supabase.table("transcription_segments").select("*").eq("session_id", session_id).order("sequence_index").execute()
segments = segments_result.data
if export_format.format == "srt":
# Phase 1 stub — implement in Phase 3
return {"format": "srt", "content": "[TODO: Generate SRT from segments]"}
elif export_format.format == "txt":
text = "\n".join(s["text"] for s in segments)
return {"format": "txt", "content": text}
elif export_format.format == "json":
return {"format": "json", "content": {"segments": segments}}
else:
raise HTTPException(status_code=400, detail=f"Unsupported format: {export_format.format}")

View File

@ -1,24 +1,31 @@
from fastapi import APIRouter, Request """DEPRECATED - utterance.py
This module is deprecated. All transcription data now goes through
the new sessions router at /transcribe/sessions.
The old endpoints are kept for backwards compatibility but will be
removed in a future release. New integrations should use:
POST /transcribe/sessions - create session
POST /transcribe/sessions/{id}/segments - batch upsert segments
"""
from fastapi import APIRouter, Request, HTTPException
import os import os
import queue
from dotenv import load_dotenv
import json import json
load_dotenv() router = APIRouter(deprecated=True)
router = APIRouter()
@router.post("/handle_whisper_live_eos_utterance/{user_id}") @router.post("/handle_whisper_live_eos_utterance/{user_id}")
async def handle_whisper_live_eos_utterance(user_id: str, request: Request): async def handle_whisper_live_eos_utterance(user_id: str, request: Request):
"""DEPRECATED - Use POST /transcribe/sessions instead."""
data = await request.json() data = await request.json()
utterance = data.get("utterance")
print(f"Utterance: {utterance}") # Log deprecation warning
start = data.get("start") print(f"[DEPRECATED] /handle_whisper_live_eos_utterance called for user {user_id}. "
end = data.get("end") f"Please migrate to POST /transcribe/sessions/{{id}}/segments")
print(f"Start: {start}")
print(f"End: {end}") # Keep writing to flat file for backwards compat
eos = data.get("eos")
print(f"Eos: {eos}")
user_dir = f"../../data/users/{user_id}/transcripts" user_dir = f"../../data/users/{user_id}/transcripts"
if not os.path.exists(user_dir): if not os.path.exists(user_dir):
os.makedirs(user_dir) os.makedirs(user_dir)
@ -27,16 +34,21 @@ async def handle_whisper_live_eos_utterance(user_id: str, request: Request):
with open(log_file, "a") as f: with open(log_file, "a") as f:
f.write(json.dumps(data) + "\n") f.write(json.dumps(data) + "\n")
return {"message": "Utterance logged successfully"} return {"message": "Utterance logged (deprecated - migrate to sessions API)", "_deprecated": True}
@router.get("/get_utterances/{user_id}") @router.get("/get_utterances/{user_id}")
async def get_utterances(user_id: str): async def get_utterances(user_id: str):
"""DEPRECATED - Use GET /transcribe/sessions/{id}/segments instead."""
print(f"[DEPRECATED] /get_utterances called for user {user_id}. "
f"Please migrate to GET /transcribe/sessions/{{id}}/segments")
user_dir = f"../../data/users/{user_id}/transcripts" user_dir = f"../../data/users/{user_id}/transcripts"
log_file = os.path.join(user_dir, "utterances.log") log_file = os.path.join(user_dir, "utterances.log")
if not os.path.exists(log_file): if not os.path.exists(log_file):
return {"utterances": []} return {"utterances": [], "_deprecated": True}
with open(log_file, "r") as f: with open(log_file, "r") as f:
utterances = [json.loads(line) for line in f] utterances = [json.loads(line) for line in f]
return {"utterances": utterances} return {"utterances": utterances, "_deprecated": True}

View File

@ -26,6 +26,9 @@ from routers.dev.test_analysis import router as test_analysis_router
from routers.queue_management import router as queue_management_router from routers.queue_management import router as queue_management_router
from routers.maintenance.redis_admin import router as redis_admin_router from routers.maintenance.redis_admin import router as redis_admin_router
from routers import provisioning as provisioning_router from routers import provisioning as provisioning_router
from routers.transcribe.sessions import router as sessions_router
from routers.transcribe.canvas_events import router as canvas_events_router
from routers.transcribe.keywords import router as keywords_router
def register_routes(app: FastAPI): def register_routes(app: FastAPI):
logger.info("Starting to register routes...") logger.info("Starting to register routes...")
@ -101,5 +104,10 @@ def register_routes(app: FastAPI):
# Provisioning Routes # Provisioning Routes
app.include_router(provisioning_router.router) app.include_router(provisioning_router.router)
# Transcription Routes (CIS Phase 1)
app.include_router(sessions_router, prefix="/transcribe", tags=["Transcription Sessions"])
app.include_router(canvas_events_router, prefix="/transcribe", tags=["Transcription Canvas Events"])
app.include_router(keywords_router, prefix="/transcribe", tags=["Transcription Keywords"])
# Test Routes # Test Routes
app.include_router(timetable_test.router, prefix="/tests", tags=["Tests"]) app.include_router(timetable_test.router, prefix="/tests", tags=["Tests"])