api/routers/exam/dependencies.py
CC Worker f52c3267ca feat(exam): /api/exam template CRUD router (as-user RLS, E1 fix)
S4-5: new routers/exam/ package mounted at /api/exam (R5.1/E5, not under
/database/). Template CRUD with hybrid persistence (R5.2):

- POST/GET/GET{id}/PUT{id}/DELETE{id} /templates + PATCH /questions/{qid}
- Calls Supabase AS THE USER via SupabaseAnonClient.for_user (E1 fix), so the
  RLS in 72-exam-marker.sql is enforced; no service-role for user-facing ops.
- Institute resolved/validated via the user_institute_ids() SECURITY DEFINER
  RPC (institute_memberships is deny-all as-user per E4); client-supplied
  institute_id is validated, never trusted (R5.5).
- Ownership pre-checked before writes (E2); out-of-scope ids read back as 404
  under RLS (IDOR-safe). Soft-delete archives, never hard-deletes.
- PUT full-replace preserves client UUIDs as Neo4j join keys (spec §2).
- eb_exams.exam_code denormalised via a documented service-role catalogue
  lookup (eb_exams is shared reference data, deny-all as-user per E4).

Unit tests cover auth, CRUD, ownership/IDOR, institute validation, soft-delete.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
2026-06-06 17:49:58 +00:00

119 lines
5.1 KiB
Python

"""Auth + data-access plumbing for the /api/exam/ router.
Per the audit (spec S1/E1): the exam API calls Supabase **as the user** so the RLS in
72-exam-marker.sql is actually enforced — it does NOT use the service role for user-facing
reads/writes the way files.py / classes_router.py do. The bearer already attaches the raw
JWT as payload["_access_token"] (supabase_bearer.py) precisely for this.
Institute resolution is the one wrinkle: institute_memberships and profiles are RLS
deny-all to a normal authenticated user (E4), so we cannot read them as-user. Instead we
call public.user_institute_ids() — a SECURITY DEFINER function (71-class-management.sql) that
PostgREST exposes as an RPC — which returns the caller's institute ids regardless of those
table policies. This is the same function the RLS policies themselves key off, so the API's
view of "which institutes is this user in" is guaranteed consistent with what RLS will allow.
"""
from __future__ import annotations
import os
from typing import Any, Dict, List, Optional
from fastapi import Depends, HTTPException
from modules.auth.supabase_bearer import SupabaseBearer
from modules.database.supabase.utils.client import (
SupabaseAnonClient,
SupabaseServiceRoleClient,
)
from modules.logger_tool import initialise_logger
logger = initialise_logger(__name__, os.getenv("LOG_LEVEL"), os.getenv("LOG_PATH"), "default", True)
auth = SupabaseBearer()
class ExamContext:
"""The per-request handle every exam endpoint works through.
Bundles the caller's id, an as-user Supabase client (RLS-enforced), and the set of
institute ids the caller belongs to (for R5.5 institute validation on writes).
"""
def __init__(self, user_id: str, access_token: str, supabase: Any, institute_ids: List[str]):
self.user_id = user_id
self.access_token = access_token
self.supabase = supabase
self.institute_ids = institute_ids
def resolve_institute(self, requested: Optional[str]) -> str:
"""Validate a client-supplied institute_id, or pick the sole membership.
R5.5: a client-supplied institute_id is never trusted as the authz signal — it must
be one the caller actually belongs to. RLS would reject a bad value at write time
anyway; resolving here turns that into a clean 400/403 instead of an opaque DB error.
"""
if requested:
if requested not in self.institute_ids:
raise HTTPException(status_code=403, detail="Not a member of the requested institute")
return requested
if len(self.institute_ids) == 1:
return self.institute_ids[0]
if not self.institute_ids:
raise HTTPException(status_code=403, detail="Caller has no institute membership")
raise HTTPException(
status_code=400,
detail="institute_id is required when the caller belongs to multiple institutes",
)
def _extract_institute_ids(rpc_data: Any) -> List[str]:
"""Normalise the user_institute_ids() RPC result to a list of uuid strings.
A `returns setof uuid` function comes back from PostgREST as a JSON array of scalars,
but tolerate the `[{"user_institute_ids": "..."}]` shape too in case of driver quirks.
"""
out: List[str] = []
for row in rpc_data or []:
if isinstance(row, dict):
val = row.get("user_institute_ids") or next(iter(row.values()), None)
else:
val = row
if val:
out.append(str(val))
return out
async def get_exam_context(payload: Dict[str, Any] = Depends(auth)) -> ExamContext:
user_id = payload.get("sub") or payload.get("user_id")
access_token = payload.get("_access_token")
if not user_id or not access_token:
raise HTTPException(status_code=401, detail="Invalid token payload")
supabase = SupabaseAnonClient.for_user(access_token).supabase
try:
res = supabase.rpc("user_institute_ids").execute()
institute_ids = _extract_institute_ids(getattr(res, "data", None))
except Exception as exc:
logger.error(f"Failed to resolve institute memberships: {exc}")
raise HTTPException(status_code=502, detail="Could not resolve institute membership")
return ExamContext(user_id, access_token, supabase, institute_ids)
def lookup_exam_code(exam_id: str) -> Optional[str]:
"""Resolve eb_exams.exam_code for a catalogue paper (denormalised onto the template).
Documented service-role exception (S1): eb_exams is shared exam-board reference data with
no as-user SELECT policy (E4), so a normal user cannot read it. This is a read of public
catalogue metadata only — not user-scoped data — and is used solely to keep the Neo4j join
key (exam_code) correct on the template row.
"""
try:
sb = SupabaseServiceRoleClient().supabase
res = sb.table("eb_exams").select("exam_code").eq("id", exam_id).limit(1).execute()
rows = getattr(res, "data", None) or []
return rows[0].get("exam_code") if rows else None
except Exception as exc:
logger.warning(f"exam_code lookup failed for exam_id={exam_id}: {exc}")
return None