api/modules/upload_validation.py
kcar c69451fba2
Some checks failed
api-ci-deploy / test-build-deploy (push) Has been cancelled
[verified] add upload size and MIME guards
(cherry picked from commit f5e05376f637f55b73e474cac8199529682ca398)
2026-06-08 01:18:39 +00:00

100 lines
3.6 KiB
Python

"""Upload boundary validation shared by file-upload endpoints.
E3 hardening: keep user-facing upload routes from buffering arbitrary data and
from accepting arbitrary MIME/types into Supabase storage.
"""
from __future__ import annotations
import os
from typing import Iterable, Optional
from fastapi import HTTPException, UploadFile
# Conservative defaults: Classroom Copilot uploads are user documents/images.
# Exam scan uploads already have their own 50 MB PDF-only guard in routers.exam.batches.
MAX_UPLOAD_BYTES = int(os.getenv("CC_UPLOAD_MAX_BYTES", str(25 * 1024 * 1024)))
UPLOAD_CHUNK_BYTES = 1024 * 1024
ALLOWED_UPLOAD_MIME_TYPES = frozenset(
mt.strip().lower()
for mt in os.getenv(
"CC_UPLOAD_ALLOWED_MIME_TYPES",
",".join(
[
"application/pdf",
"image/png",
"image/jpeg",
"image/webp",
"image/gif",
"text/plain",
"text/csv",
"text/markdown",
"application/msword",
"application/vnd.openxmlformats-officedocument.wordprocessingml.document",
"application/vnd.ms-powerpoint",
"application/vnd.openxmlformats-officedocument.presentationml.presentation",
"application/vnd.ms-excel",
"application/vnd.openxmlformats-officedocument.spreadsheetml.sheet",
]
),
).split(",")
if mt.strip()
)
_PDF_MIME_TYPES = {"application/pdf", "application/x-pdf"}
def allowed_upload_mime_types_csv() -> str:
    """Return the configured upload MIME allowlist as one display string.

    The entries of ``ALLOWED_UPLOAD_MIME_TYPES`` are sorted so the output is
    stable between calls, then joined with ``", "``.
    """
    ordered = sorted(ALLOWED_UPLOAD_MIME_TYPES)
    separator = ", "
    return separator.join(ordered)
def _declared_mime(upload: UploadFile) -> str:
return (upload.content_type or "application/octet-stream").split(";", 1)[0].strip().lower()
def validate_upload_mime(
    upload: UploadFile,
    *,
    allowed_mime_types: Optional[Iterable[str]] = None,
) -> str:
    """Validate the client-declared MIME type of *upload* against an allowlist.

    Only the declared content type is checked here; the payload itself is not
    inspected.

    Args:
        upload: The incoming upload whose declared content type is checked.
        allowed_mime_types: Allowlist for this call. ``None`` selects the
            module-wide ``ALLOWED_UPLOAD_MIME_TYPES``. An explicitly empty
            iterable is honoured and rejects every upload; it does not fall
            back to the module default.

    Returns:
        The normalised (parameter-free, trimmed, lower-cased) declared type.

    Raises:
        HTTPException: Status 415 when the declared type is not allowed.
    """
    declared = _declared_mime(upload)
    # Test against None, not truthiness: ``allowed_mime_types or DEFAULT``
    # would silently widen an empty per-call allowlist to the global default,
    # which is the wrong direction for a guard to fail.
    candidates = ALLOWED_UPLOAD_MIME_TYPES if allowed_mime_types is None else allowed_mime_types
    # Normalise the same way the module-level allowlist is parsed (trim,
    # lower-case, drop blanks) so " Image/PNG " still matches "image/png".
    allowed = {mt.strip().lower() for mt in candidates if mt.strip()}
    if declared not in allowed:
        raise HTTPException(
            status_code=415,
            detail=(
                f"Unsupported upload type '{declared}'. Allowed MIME types: "
                f"{', '.join(sorted(allowed))}"
            ),
        )
    return declared
async def read_upload_bytes(
    upload: UploadFile,
    *,
    max_bytes: int = MAX_UPLOAD_BYTES,
    allowed_mime_types: Optional[Iterable[str]] = None,
) -> tuple[bytes, str]:
    """Check the declared MIME type, then read *upload* under a size ceiling.

    The MIME check runs first, so a disallowed type is rejected before any
    payload is read. The body is then pulled in ``UPLOAD_CHUNK_BYTES`` pieces
    and reading stops as soon as the running size passes *max_bytes*.

    Args:
        upload: The incoming upload to validate and read.
        max_bytes: Largest accepted payload size in bytes (inclusive).
        allowed_mime_types: Per-call allowlist, forwarded unchanged to
            ``validate_upload_mime``.

    Returns:
        ``(payload, mime_type)``: the complete body and the normalised
        declared MIME type.

    Raises:
        HTTPException: Status 413 when the payload is larger than *max_bytes*;
            whatever ``validate_upload_mime`` raises for a rejected type.
    """
    mime_type = validate_upload_mime(upload, allowed_mime_types=allowed_mime_types)

    pieces: list[bytes] = []
    budget = max_bytes  # bytes still permitted; below zero means over the limit
    while True:
        piece = await upload.read(UPLOAD_CHUNK_BYTES)
        if not piece:
            break  # an empty read marks end of stream
        budget -= len(piece)
        if budget < 0:
            raise HTTPException(status_code=413, detail=f"Upload exceeds max size ({max_bytes} bytes)")
        pieces.append(piece)

    payload = b"".join(pieces)
    return payload, mime_type
async def read_pdf_upload_bytes(upload: UploadFile, *, max_bytes: int = MAX_UPLOAD_BYTES) -> bytes:
    """Read an upload that must be a PDF and return its raw bytes.

    The read is delegated to ``read_upload_bytes`` with the allowlist narrowed
    to ``_PDF_MIME_TYPES``, so the size ceiling and declared-type check apply.
    The payload must then be non-empty and begin with the ``%PDF-`` marker.
    Nothing beyond that prefix is parsed.

    Args:
        upload: The incoming upload.
        max_bytes: Largest accepted payload size in bytes.

    Returns:
        The complete PDF payload.

    Raises:
        HTTPException: Status 400 for an empty payload; status 415 when the
            payload does not start with ``%PDF-``; plus anything raised by
            ``read_upload_bytes``.
    """
    payload, _ = await read_upload_bytes(
        upload,
        max_bytes=max_bytes,
        allowed_mime_types=_PDF_MIME_TYPES,
    )
    if len(payload) == 0:
        raise HTTPException(status_code=400, detail="Uploaded PDF is empty")

    pdf_marker = b"%PDF-"
    if payload[: len(pdf_marker)] != pdf_marker:
        raise HTTPException(status_code=415, detail="Uploaded file is not a valid PDF")
    return payload