""" GAIS Data Import Module — loads Edubase school data into Supabase gais_schools table. Run via: python3 main.py --mode gais-data Prereq: apply PLANNING/migrations/001_gais_seed.sql to Supabase first. """ import os import csv import time from typing import Dict, Any, List, Optional from modules.logger_tool import initialise_logger logger = initialise_logger(__name__, os.getenv("LOG_LEVEL"), os.getenv("LOG_PATH"), 'default', True) IMPORT_DIR = os.path.join(os.path.dirname(__file__), "import") EDUBASE_FILE = os.path.join(IMPORT_DIR, "edubasealldata20250828.csv") BATCH_SIZE = 500 def _parse_date(val: str) -> Optional[str]: """Convert DD-MM-YYYY or DD/MM/YYYY to YYYY-MM-DD, return None if empty.""" val = val.strip() if not val: return None for sep in ("-", "/"): if sep in val: parts = val.split(sep) if len(parts) == 3 and len(parts[2]) == 4: return f"{parts[2]}-{parts[1].zfill(2)}-{parts[0].zfill(2)}" return None def _parse_int(val: str) -> Optional[int]: val = val.strip() if not val: return None try: return int(val) except ValueError: return None def _row_to_school(row: Dict[str, str]) -> Dict[str, Any]: return { "urn": row.get("URN", "").strip(), "name": row.get("EstablishmentName", "").strip(), "status": row.get("EstablishmentStatus (name)", "").strip() or None, "phase": row.get("PhaseOfEducation (name)", "").strip() or None, "type": row.get("TypeOfEstablishment (name)", "").strip() or None, "type_group": row.get("EstablishmentTypeGroup (name)", "").strip() or None, "street": row.get("Street", "").strip() or None, "locality": row.get("Locality", "").strip() or None, "town": row.get("Town", "").strip() or None, "county": row.get("County (name)", "").strip() or None, "postcode": row.get("Postcode", "").strip() or None, "website": row.get("SchoolWebsite", "").strip() or None, "telephone": row.get("TelephoneNum", "").strip() or None, "head_title": row.get("HeadTitle (name)", "").strip() or None, "head_first_name": row.get("HeadFirstName", "").strip() or None, "head_last_name": row.get("HeadLastName", "").strip() or None, "la_code": row.get("LA (code)", "").strip() or None, "la_name": row.get("LA (name)", "").strip() or None, "number_of_pupils": _parse_int(row.get("NumberOfPupils", "")), "open_date": _parse_date(row.get("OpenDate", "")), "close_date": _parse_date(row.get("CloseDate", "")), "gender": row.get("Gender (name)", "").strip() or None, "religious_character": row.get("ReligiousCharacter (name)", "").strip() or None, "region": row.get("GOR (name)", "").strip() or None, } def _upsert_batch(supabase_client, table: str, batch: List[Dict[str, Any]], retries: int = 3) -> bool: for attempt in range(retries): try: supabase_client.table(table).upsert(batch, on_conflict="urn" if table == "gais_schools" else "code").execute() return True except Exception as e: logger.warning(f"Upsert attempt {attempt + 1} failed for {table}: {e}") time.sleep(2 ** attempt) logger.error(f"All {retries} upsert attempts failed for {table}") return False def import_gais_data() -> Dict[str, Any]: """Load Edubase CSV into Supabase gais_schools + gais_local_authorities tables. Two-pass: collect LAs and schools, insert LAs first (FK parent), then schools. """ from modules.database.supabase.utils.client import SupabaseServiceRoleClient sb = SupabaseServiceRoleClient() client = sb.supabase if not os.path.exists(EDUBASE_FILE): return {"success": False, "message": f"CSV not found: {EDUBASE_FILE}"} logger.info(f"Loading GAIS data from {EDUBASE_FILE}") la_map: Dict[str, str] = {} # code → name schools: List[Dict[str, Any]] = [] # Pass 1: read everything into memory with open(EDUBASE_FILE, encoding="cp1252", errors="replace") as f: reader = csv.DictReader(f) for row in reader: urn = row.get("URN", "").strip() if not urn: continue la_code = row.get("LA (code)", "").strip() la_name = row.get("LA (name)", "").strip() if la_code and la_code not in la_map: la_map[la_code] = la_name schools.append(_row_to_school(row)) logger.info(f"Read {len(schools)} schools and {len(la_map)} LAs from CSV") # Insert LAs first (FK parent) la_batch = [{"code": code, "name": name} for code, name in la_map.items()] la_inserted = 0 for i in range(0, len(la_batch), BATCH_SIZE): chunk = la_batch[i:i + BATCH_SIZE] if _upsert_batch(client, "gais_local_authorities", chunk): la_inserted += len(chunk) logger.info(f"Inserted {la_inserted} local authorities") # Insert schools in batches total_schools = 0 total_errors = 0 for i in range(0, len(schools), BATCH_SIZE): batch = schools[i:i + BATCH_SIZE] if _upsert_batch(client, "gais_schools", batch): total_schools += len(batch) if total_schools % 5000 == 0: logger.info(f"Inserted {total_schools} schools so far...") else: total_errors += len(batch) logger.info(f"GAIS import complete: {total_schools} schools, {la_inserted} LAs, {total_errors} errors") return { "success": total_errors == 0, "schools_inserted": total_schools, "las_inserted": la_inserted, "errors": total_errors, }