api/run/initialization/gais_data.py
kcar e42cd09dea feat(phase-b): GAIS Supabase loader + school search/register endpoints
- gais_data.py: rewrite to load Edubase CSV into Supabase gais_schools +
  gais_local_authorities via two-pass batch upsert (LAs first for FK integrity)
- school_router.py: add GET /school/search (trigram ilike on name, URN exact),
  POST /school/register (create institute + Neo4j provision + membership link)
- Encoding: handles Windows-1252 (cp1252) Edubase CSV format

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-05-26 01:51:45 +01:00

143 lines
5.6 KiB
Python

"""
GAIS Data Import Module — loads Edubase school data into Supabase gais_schools table.
Run via: python3 main.py --mode gais-data
Prereq: apply PLANNING/migrations/001_gais_seed.sql to Supabase first.
"""
import csv
import os
import time
from datetime import date
from typing import Dict, Any, List, Optional

from modules.logger_tool import initialise_logger
# Module-level logger; level and output path are read from the LOG_LEVEL and
# LOG_PATH environment variables.
# NOTE(review): the meaning of the trailing 'default' / True arguments is
# defined by modules.logger_tool.initialise_logger — confirm there.
logger = initialise_logger(__name__, os.getenv("LOG_LEVEL"), os.getenv("LOG_PATH"), 'default', True)
# Directory named "import" that sits next to this module.
IMPORT_DIR = os.path.join(os.path.dirname(__file__), "import")
# Edubase CSV extract to load (the file name suggests a 2025-08-28 snapshot —
# update it when a newer extract is dropped into IMPORT_DIR).
EDUBASE_FILE = os.path.join(IMPORT_DIR, "edubasealldata20250828.csv")
# Number of rows sent to Supabase per upsert request.
BATCH_SIZE = 500
def _parse_date(val: str) -> Optional[str]:
"""Convert DD-MM-YYYY or DD/MM/YYYY to YYYY-MM-DD, return None if empty."""
val = val.strip()
if not val:
return None
for sep in ("-", "/"):
if sep in val:
parts = val.split(sep)
if len(parts) == 3 and len(parts[2]) == 4:
return f"{parts[2]}-{parts[1].zfill(2)}-{parts[0].zfill(2)}"
return None
def _parse_int(val: str) -> Optional[int]:
val = val.strip()
if not val:
return None
try:
return int(val)
except ValueError:
return None
def _cell_text(row: Dict[str, Any], column: str) -> str:
    """Return the stripped text of *column*, or "" when it is absent or None."""
    value = row.get(column)
    # csv.DictReader stores None for columns missing from a short row, so the
    # usual row.get(column, "") default does not protect against None.
    return value.strip() if isinstance(value, str) else ""


def _row_to_school(row: Dict[str, Optional[str]]) -> Dict[str, Any]:
    """Map one Edubase CSV row onto the column layout of ``gais_schools``.

    Args:
        row: A ``csv.DictReader`` row keyed by the Edubase column headers.
            Missing or ``None`` cells are treated as empty.

    Returns:
        A dict ready for upsert. ``urn`` and ``name`` are always strings
        (possibly empty); every other text field is ``None`` when blank;
        ``number_of_pupils`` is an int or ``None``; ``open_date`` and
        ``close_date`` are whatever ``_parse_date`` returns.
    """

    def optional(column: str) -> Optional[str]:
        # Blank cells are stored as NULL rather than as empty strings.
        return _cell_text(row, column) or None

    return {
        "urn": _cell_text(row, "URN"),
        "name": _cell_text(row, "EstablishmentName"),
        "status": optional("EstablishmentStatus (name)"),
        "phase": optional("PhaseOfEducation (name)"),
        "type": optional("TypeOfEstablishment (name)"),
        "type_group": optional("EstablishmentTypeGroup (name)"),
        "street": optional("Street"),
        "locality": optional("Locality"),
        "town": optional("Town"),
        "county": optional("County (name)"),
        "postcode": optional("Postcode"),
        "website": optional("SchoolWebsite"),
        "telephone": optional("TelephoneNum"),
        "head_title": optional("HeadTitle (name)"),
        "head_first_name": optional("HeadFirstName"),
        "head_last_name": optional("HeadLastName"),
        "la_code": optional("LA (code)"),
        "la_name": optional("LA (name)"),
        # The parsers always receive a str here, never None.
        "number_of_pupils": _parse_int(_cell_text(row, "NumberOfPupils")),
        "open_date": _parse_date(_cell_text(row, "OpenDate")),
        "close_date": _parse_date(_cell_text(row, "CloseDate")),
        "gender": optional("Gender (name)"),
        "religious_character": optional("ReligiousCharacter (name)"),
        "region": optional("GOR (name)"),
    }
def _upsert_batch(supabase_client, table: str, batch: List[Dict[str, Any]], retries: int = 3) -> bool:
for attempt in range(retries):
try:
supabase_client.table(table).upsert(batch, on_conflict="urn" if table == "gais_schools" else "code").execute()
return True
except Exception as e:
logger.warning(f"Upsert attempt {attempt + 1} failed for {table}: {e}")
time.sleep(2 ** attempt)
logger.error(f"All {retries} upsert attempts failed for {table}")
return False
def import_gais_data() -> Dict[str, Any]:
    """Load the Edubase CSV into Supabase gais_schools + gais_local_authorities.

    Two-pass: the CSV is read fully into memory, collecting local authorities
    and schools; LAs are upserted first (FK parent), then schools, both in
    chunks of ``BATCH_SIZE``.

    Returns:
        ``{"success": False, "message": ...}`` when the CSV file is missing.
        Otherwise a dict with ``success`` (``True`` only when every LA and
        school batch was written), ``schools_inserted``, ``las_inserted``,
        ``errors`` (school rows in failed batches) and ``la_errors`` (LA rows
        in failed batches).
    """
    # Check the input first so a missing file does not need a Supabase client.
    if not os.path.exists(EDUBASE_FILE):
        return {"success": False, "message": f"CSV not found: {EDUBASE_FILE}"}

    from modules.database.supabase.utils.client import SupabaseServiceRoleClient

    client = SupabaseServiceRoleClient().supabase

    logger.info(f"Loading GAIS data from {EDUBASE_FILE}")

    la_map: Dict[str, str] = {}  # code -> name (first name seen wins)
    # Keyed by URN: an upsert batch containing the same conflict key twice is
    # rejected by Postgres, so a repeated URN keeps only its last row.
    schools_by_urn: Dict[str, Dict[str, Any]] = {}
    duplicate_urns = 0

    # Pass 1: read everything into memory.
    # newline="" lets the csv module handle newlines inside quoted fields.
    with open(EDUBASE_FILE, encoding="cp1252", errors="replace", newline="") as f:
        for row in csv.DictReader(f):
            # DictReader yields None for columns missing from a short row.
            urn = (row.get("URN") or "").strip()
            if not urn:
                continue
            la_code = (row.get("LA (code)") or "").strip()
            la_name = (row.get("LA (name)") or "").strip()
            if la_code and la_code not in la_map:
                la_map[la_code] = la_name
            if urn in schools_by_urn:
                duplicate_urns += 1
            schools_by_urn[urn] = _row_to_school(row)

    schools = list(schools_by_urn.values())
    logger.info(f"Read {len(schools)} schools and {len(la_map)} LAs from CSV")
    if duplicate_urns:
        logger.warning(f"Skipped {duplicate_urns} duplicate URN rows (last occurrence kept)")

    # Pass 2a: insert LAs first (FK parent).
    la_batch = [{"code": code, "name": name} for code, name in la_map.items()]
    la_inserted = 0
    la_errors = 0
    for i in range(0, len(la_batch), BATCH_SIZE):
        chunk = la_batch[i:i + BATCH_SIZE]
        if _upsert_batch(client, "gais_local_authorities", chunk):
            la_inserted += len(chunk)
        else:
            la_errors += len(chunk)
    logger.info(f"Inserted {la_inserted} local authorities")
    if la_errors:
        logger.error(f"Failed to insert {la_errors} local authorities")

    # Pass 2b: insert schools in batches.
    total_schools = 0
    total_errors = 0
    for i in range(0, len(schools), BATCH_SIZE):
        batch = schools[i:i + BATCH_SIZE]
        if _upsert_batch(client, "gais_schools", batch):
            total_schools += len(batch)
            if total_schools % 5000 == 0:
                logger.info(f"Inserted {total_schools} schools so far...")
        else:
            total_errors += len(batch)

    logger.info(f"GAIS import complete: {total_schools} schools, {la_inserted} LAs, {total_errors} errors")
    return {
        # An LA failure is a failed import too, even if all schools went in.
        "success": total_errors == 0 and la_errors == 0,
        "schools_inserted": total_schools,
        "las_inserted": la_inserted,
        "errors": total_errors,
        "la_errors": la_errors,
    }