"""Auth + data-access plumbing for the /api/exam/ router. Per the audit (spec S1/E1): the exam API calls Supabase **as the user** so the RLS in 72-exam-marker.sql is actually enforced — it does NOT use the service role for user-facing reads/writes the way files.py / classes_router.py do. The bearer already attaches the raw JWT as payload["_access_token"] (supabase_bearer.py) precisely for this. Institute resolution is the one wrinkle: institute_memberships and profiles are RLS deny-all to a normal authenticated user (E4), so we cannot read them as-user. Instead we call public.user_institute_ids() — a SECURITY DEFINER function (71-class-management.sql) that PostgREST exposes as an RPC — which returns the caller's institute ids regardless of those table policies. This is the same function the RLS policies themselves key off, so the API's view of "which institutes is this user in" is guaranteed consistent with what RLS will allow. """ from __future__ import annotations import os from typing import Any, Dict, List, Optional from fastapi import Depends, HTTPException from modules.auth.supabase_bearer import SupabaseBearer from modules.database.supabase.utils.client import ( SupabaseAnonClient, SupabaseServiceRoleClient, ) from modules.logger_tool import initialise_logger logger = initialise_logger(__name__, os.getenv("LOG_LEVEL"), os.getenv("LOG_PATH"), "default", True) auth = SupabaseBearer() class ExamContext: """The per-request handle every exam endpoint works through. Bundles the caller's id, an as-user Supabase client (RLS-enforced), and the set of institute ids the caller belongs to (for R5.5 institute validation on writes). """ def __init__(self, user_id: str, access_token: str, supabase: Any, institute_ids: List[str]): self.user_id = user_id self.access_token = access_token self.supabase = supabase self.institute_ids = institute_ids def resolve_institute(self, requested: Optional[str]) -> str: """Validate a client-supplied institute_id, or pick the sole membership. R5.5: a client-supplied institute_id is never trusted as the authz signal — it must be one the caller actually belongs to. RLS would reject a bad value at write time anyway; resolving here turns that into a clean 400/403 instead of an opaque DB error. """ if requested: if requested not in self.institute_ids: raise HTTPException(status_code=403, detail="Not a member of the requested institute") return requested if len(self.institute_ids) == 1: return self.institute_ids[0] if not self.institute_ids: raise HTTPException(status_code=403, detail="Caller has no institute membership") raise HTTPException( status_code=400, detail="institute_id is required when the caller belongs to multiple institutes", ) def _extract_institute_ids(rpc_data: Any) -> List[str]: """Normalise the user_institute_ids() RPC result to a list of uuid strings. A `returns setof uuid` function comes back from PostgREST as a JSON array of scalars, but tolerate the `[{"user_institute_ids": "..."}]` shape too in case of driver quirks. """ out: List[str] = [] for row in rpc_data or []: if isinstance(row, dict): val = row.get("user_institute_ids") or next(iter(row.values()), None) else: val = row if val: out.append(str(val)) return out async def get_exam_context(payload: Dict[str, Any] = Depends(auth)) -> ExamContext: user_id = payload.get("sub") or payload.get("user_id") access_token = payload.get("_access_token") if not user_id or not access_token: raise HTTPException(status_code=401, detail="Invalid token payload") supabase = SupabaseAnonClient.for_user(access_token).supabase try: res = supabase.rpc("user_institute_ids").execute() institute_ids = _extract_institute_ids(getattr(res, "data", None)) except Exception as exc: logger.error(f"Failed to resolve institute memberships: {exc}") raise HTTPException(status_code=502, detail="Could not resolve institute membership") return ExamContext(user_id, access_token, supabase, institute_ids) def resolve_student_names(student_ids: List[str]) -> Dict[str, str]: """Map profile id → display name for roster students (batch-creation denormalisation). Documented service-role exception (S1, mirrors lookup_exam_code): `profiles` has no as-user SELECT policy (E4), so the roster's display names can't be read as-the-user. The caller's right to the roster itself is already enforced as-user (class_students.cs_read requires the caller to teach/admin the class); this only resolves names for ids already authorised, and the result is denormalised onto student_submissions so later reads need no profiles access. """ if not student_ids: return {} try: sb = SupabaseServiceRoleClient().supabase res = ( sb.table("profiles") .select("id, full_name, display_name, email") .in_("id", list(student_ids)) .execute() ) out: Dict[str, str] = {} for p in getattr(res, "data", None) or []: out[p["id"]] = p.get("full_name") or p.get("display_name") or p.get("email") or "" return out except Exception as exc: logger.warning(f"student name resolution failed: {exc}") return {} def lookup_exam_code(exam_id: str) -> Optional[str]: """Resolve eb_exams.exam_code for a catalogue paper (denormalised onto the template). Documented service-role exception (S1): eb_exams is shared exam-board reference data with no as-user SELECT policy (E4), so a normal user cannot read it. This is a read of public catalogue metadata only — not user-scoped data — and is used solely to keep the Neo4j join key (exam_code) correct on the template row. """ try: sb = SupabaseServiceRoleClient().supabase res = sb.table("eb_exams").select("exam_code").eq("id", exam_id).limit(1).execute() rows = getattr(res, "data", None) or [] return rows[0].get("exam_code") if rows else None except Exception as exc: logger.warning(f"exam_code lookup failed for exam_id={exam_id}: {exc}") return None