diff --git a/routers/database/tools/school_router.py b/routers/database/tools/school_router.py
index 8cf92b1..be580ff 100644
--- a/routers/database/tools/school_router.py
+++ b/routers/database/tools/school_router.py
@@ -1,10 +1,11 @@
 """
-School Router — school status, role, and admin-editable info.
+School Router — school status, search, register, and admin-editable info.
 """
 import os
 import json
-from typing import Dict, Any, Optional
-from fastapi import APIRouter, Depends
+import uuid as uuid_lib
+from typing import Dict, Any, Optional, List
+from fastapi import APIRouter, Depends, Query
 from pydantic import BaseModel
 from modules.logger_tool import initialise_logger
 from modules.auth.supabase_bearer import SupabaseBearer
@@ -192,3 +193,169 @@ async def update_school_info(
     except Exception as e:
         logger.error(f"School info update failed: {e}")
         return {"status": "error", "message": str(e)}
+
+
+# ─── GAIS school search ───────────────────────────────────────────────────────
+
+@router.get("/search")
+async def search_schools(
+    q: str = Query(..., min_length=2, description="School name, URN, or postcode"),
+    limit: int = Query(20, ge=1, le=100),
+    status: str = Query("Open", description="Filter by establishment status"),
+    credentials: dict = Depends(SupabaseBearer()),
+) -> Dict[str, Any]:
+    """
+    Search GAIS school reference data by name, URN, or postcode.
+
+    An all-digit query is treated as a URN and matched exactly; any other
+    query is matched case-insensitively as a substring of the school name
+    or of the postcode. An empty `status` disables the status filter.
+
+    Returns {"status": "ok", "schools": [...], "count": n} on success and
+    {"status": "error", "message": "..."} when the query is blank or the
+    lookup raises.
+    """
+    q_stripped = q.strip()
+    # min_length is validated before stripping, so a whitespace-only query
+    # would otherwise become ilike '%%' and match every school.
+    if len(q_stripped) < 2:
+        return {"status": "error", "message": "Query must contain at least 2 non-space characters"}
+
+    try:
+        sb = _get_sb()
+        query = sb.supabase.table("gais_schools").select(
+            "urn,name,status,phase,type,street,locality,town,county,postcode,"
+            "website,telephone,head_title,head_first_name,head_last_name,"
+            "la_code,la_name,number_of_pupils,gender,religious_character,region"
+        )
+        if status:
+            query = query.eq("status", status)
+
+        if q_stripped.isdigit():
+            # URN exact match
+            query = query.eq("urn", q_stripped)
+        else:
+            # Name or postcode substring match. The pattern is double-quoted,
+            # with backslash and quote escaped, so commas, dots or brackets
+            # typed by the user cannot change the shape of the or-filter.
+            escaped = q_stripped.replace("\\", "\\\\").replace('"', '\\"')
+            pattern = f'"%{escaped}%"'
+            query = query.or_(f"name.ilike.{pattern},postcode.ilike.{pattern}")
+
+        result = query.limit(limit).execute()
+        schools = result.data or []
+        return {"status": "ok", "schools": schools, "count": len(schools)}
+    except Exception as e:
+        logger.error(f"School search failed: {e}")
+        return {"status": "error", "message": str(e)}
+
+
+# ─── School registration (onboarding) ────────────────────────────────────────
+
+class SchoolRegisterBody(BaseModel):
+    # Request payload for POST /register.
+    urn: str  # looked up in / written to institutes.urn
+    name: str
+    address: Optional[Dict[str, Any]] = None  # stored as {} when omitted
+    website: Optional[str] = None  # stored as "" when omitted
+    headteacher: Optional[str] = None  # copied into metadata when set
+
+@router.post("/register")
+async def register_school(
+    body: SchoolRegisterBody,
+    credentials: dict = Depends(SupabaseBearer()),
+) -> Dict[str, Any]:
+    """
+    Onboarding: create an institute record from a GAIS-selected school,
+    provision the Neo4j database, and link the calling user as school_admin.
+
+    Returns {"status": "already_exists", "school_id": ...} when an institute
+    with this URN is already stored, {"status": "ok", "school_id": ...,
+    "neo4j_uuid": ...} after creating one (neo4j_uuid is None if provisioning
+    failed), and {"status": "error", "message": "..."} otherwise.
+    """
+    user_id = credentials.get("sub", "")
+    if not user_id:
+        # Without a subject claim the membership and profile writes below
+        # would be keyed on an empty id.
+        return {"status": "error", "message": "Authenticated user id missing"}
+
+    try:
+        sb = _get_sb()
+
+        # Prevent duplicate registration
+        existing = sb.supabase.table("institutes").select("id").eq("urn", body.urn).execute()
+        if existing.data:
+            school_id = existing.data[0]["id"]
+            # Still link user if not already linked.
+            # SECURITY(review): this makes any authenticated caller a
+            # school_admin of an institute someone else registered; confirm
+            # that is intended or gate it behind an invite/approval step.
+            _link_school_admin(sb, user_id, school_id)
+            return {"status": "already_exists", "school_id": str(school_id)}
+
+        # Build metadata
+        meta: Dict[str, Any] = {}
+        if body.headteacher:
+            meta["headteacher"] = body.headteacher
+
+        institute_data: Dict[str, Any] = {
+            "name": body.name,
+            "urn": body.urn,
+            "status": "active",
+            "address": body.address or {},
+            "website": body.website or "",
+            "metadata": meta,
+        }
+
+        ins_result = sb.supabase.table("institutes").insert(institute_data).execute()
+        if not ins_result.data:
+            return {"status": "error", "message": "Failed to create institute record"}
+
+        school_id = ins_result.data[0]["id"]
+
+        # Provision Neo4j institute database. Best effort: a failure is
+        # logged and reported as neo4j_uuid=None, the institute row stays.
+        try:
+            from modules.database.services.provisioning_service import ProvisioningService
+            ps = ProvisioningService()
+            ps.ensure_school(school_id)
+            # Reload institute to get the neo4j_uuid_string set by provisioning
+            inst = sb.supabase.table("institutes").select("neo4j_uuid_string").eq("id", school_id).single().execute()
+            neo4j_uuid = (inst.data or {}).get("neo4j_uuid_string")
+        except Exception as prov_err:
+            logger.warning(f"Neo4j provisioning failed for {school_id}: {prov_err}")
+            neo4j_uuid = None
+
+        # Link user as school_admin
+        _link_school_admin(sb, user_id, school_id)
+
+        return {
+            "status": "ok",
+            "school_id": str(school_id),
+            "neo4j_uuid": neo4j_uuid,
+        }
+
+    except Exception as e:
+        logger.error(f"School registration failed: {e}")
+        return {"status": "error", "message": str(e)}
+
+
+def _ensure_membership(sb: SupabaseServiceRoleClient, user_id: str, school_id: str, role: str) -> None:
+    """Insert an institute_memberships row for (user_id, school_id) unless one exists.
+
+    An existing row is left untouched, so `role` only applies to new rows.
+    """
+    existing = sb.supabase.table("institute_memberships").select("id").eq("profile_id", user_id).eq("institute_id", school_id).execute()
+    if not existing.data:
+        sb.supabase.table("institute_memberships").insert({
+            "profile_id": user_id,
+            "institute_id": school_id,
+            "role": role,
+        }).execute()
+
+
+def _link_school_admin(sb: SupabaseServiceRoleClient, user_id: str, school_id: str) -> None:
+    """Make user_id a school_admin member of school_id and point their profile at it."""
+    _ensure_membership(sb, user_id, school_id, "school_admin")
+    sb.supabase.table("profiles").update({"school_id": str(school_id)}).eq("id", user_id).execute()
diff --git a/run/initialization/gais_data.py b/run/initialization/gais_data.py
index da9b081..71379df 100644
--- a/run/initialization/gais_data.py
+++ b/run/initialization/gais_data.py
@@ -1,1042 +1,158 @@
 """
-GAIS Data Import Module for ClassroomCopilot
-Handles import of publicly available school databases into Neo4j
-Starting with Edubase All Data
+GAIS Data Import Module — loads Edubase school data into Supabase gais_schools table.
+Run via: python3 main.py --mode gais-data
+Prereq: apply PLANNING/migrations/001_gais_seed.sql to Supabase first.
 """
 import os
 import csv
 import time
-from typing import Dict, Any, List, Optional, Set, Tuple
 from datetime import datetime
+from typing import Dict, Any, List, Optional
 from modules.logger_tool import initialise_logger
-from modules.database.services.neo4j_service import Neo4jService
-from modules.database.tools.neo4j_session_tools import create_relationship
 
 logger = initialise_logger(__name__, os.getenv("LOG_LEVEL"), os.getenv("LOG_PATH"), 'default', True)
 
-def create_node_direct(session, label, properties):
-    """Create a node and return it directly without using transaction ID lookup"""
-    try:
-        query = f"""
-        CREATE (n:{label} $properties)
-        RETURN n
-        """
-        result = session.run(query, properties=properties)
-        record = result.single()
-        if record:
-            return record["n"]
+IMPORT_DIR = os.path.join(os.path.dirname(__file__), "import")
+EDUBASE_FILE = os.path.join(IMPORT_DIR, "edubasealldata20250828.csv")
+
+BATCH_SIZE = 500
+
+
+def _parse_date(val: Optional[str]) -> Optional[str]:
+    """Convert DD-MM-YYYY or DD/MM/YYYY to YYYY-MM-DD.
+
+    Returns None for None, blank input, or text that is not a real calendar
+    date in one of those two layouts (e.g. 31-02-2020, ab-cd-2020), so only
+    valid ISO dates are ever produced.
+    """
+    val = (val or "").strip()
+    if not val:
         return None
-    except Exception as e:
-        logger.error(f"Error creating {label} node: {str(e)}")
+    for fmt in ("%d-%m-%Y", "%d/%m/%Y"):
+        try:
+            return datetime.strptime(val, fmt).date().isoformat()
+        except ValueError:
+            continue
+    return None
+
+
+def _parse_int(val: Optional[str]) -> Optional[int]:
+    """Return val as an int, or None when it is None, blank, or not an integer literal."""
+    val = (val or "").strip()
+    if not val:
+        return None
+    try:
+        return int(val)
+    except ValueError:
         return None
 
-class GAISDataImporter:
-    """Handles import of publicly available school databases into Neo4j"""
-
-    def __init__(self):
-        self.neo4j_service = Neo4jService()
-        self.db_name = os.getenv("NEO4J_GAIS_DATA", "gaisdata") # Use dedicated database for GAIS data
-        self.import_dir = os.path.join(os.path.dirname(__file__), "import")
-
-        # Track created nodes to avoid duplicates
-        self.created_nodes = {
-            'LocalAuthority': set(),
-            'EstablishmentType': set(),
-            'EstablishmentTypeGroup': set(),
-            'EstablishmentStatus': set(),
-            'PhaseOfEducation': set(),
-            'BoarderType': set(),
-            'GenderType': set(),
-            'ReligiousCharacter': set(),
-            'Diocese': set(),
-            'AdmissionsPolicy': set(),
-            'SpecialClasses': set(),
-            'TrustSchoolFlag': set(),
-            'FederationFlag': set(),
-            'Country': set(),
-            'County': set(),
-            'Town': set(),
-            'Locality': set(),
-            'GovernmentOfficeRegion': set(),
-            'DistrictAdministrative': set(),
-            'AdministrativeWard': set(),
-            'ParliamentaryConstituency': set(),
-            'UrbanRural': set(),
-            'Inspectorate': set(),
-            'QAB': set(),
-            'FurtherEducationType': set(),
-            'SixthForm': set()
-        }
-
-        # Track created relationships
-        self.created_relationships = set()
-
-        # Batch processing
-        self.batch_size = 1000
-        self.current_batch = []
-
-        # Ensure the GAIS database exists
-        self._ensure_database_exists()
-
-    def _ensure_database_exists(self) -> None:
-        """Ensure the GAIS database exists, create if it doesn't"""
+
+def _row_to_school(row: Dict[str, str]) -> Dict[str, Any]:
+    """Map one Edubase CSV row to a record for the gais_schools table.
+
+    urn and name fall back to "" when missing; every other text column is
+    None when blank. A None cell (for example the fill value csv.DictReader
+    uses for short rows) is treated as blank instead of raising.
+    """
+    def text(column: str) -> str:
+        return (row.get(column) or "").strip()
+
+    return {
+        "urn": text("URN"),
+        "name": text("EstablishmentName"),
+        "status": text("EstablishmentStatus (name)") or None,
+        "phase": text("PhaseOfEducation (name)") or None,
+        "type": text("TypeOfEstablishment (name)") or None,
+        "type_group": text("EstablishmentTypeGroup (name)") or None,
+        "street": text("Street") or None,
+        "locality": text("Locality") or None,
+        "town": text("Town") or None,
+        "county": text("County (name)") or None,
+        "postcode": text("Postcode") or None,
+        "website": text("SchoolWebsite") or None,
+        "telephone": text("TelephoneNum") or None,
+        "head_title": text("HeadTitle (name)") or None,
+        "head_first_name": text("HeadFirstName") or None,
+        "head_last_name": text("HeadLastName") or None,
+        "la_code": text("LA (code)") or None,
+        "la_name": text("LA (name)") or None,
+        "number_of_pupils": _parse_int(row.get("NumberOfPupils")),
+        "open_date": _parse_date(row.get("OpenDate")),
+        "close_date": _parse_date(row.get("CloseDate")),
+        "gender": text("Gender (name)") or None,
+        "religious_character": text("ReligiousCharacter (name)") or None,
+        "region": text("GOR (name)") or None,
+    }
+
+
+def _upsert_batch(supabase_client, table: str, batch: List[Dict[str, Any]], retries: int = 3) -> bool:
+    for attempt in range(retries):
         try:
-            # Check if database exists
-            with self.neo4j_service.driver.session() as session:
-                result = session.run("SHOW DATABASES")
-                databases = [record["name"] for record in result]
-
-                if self.db_name not in databases:
-                    logger.info(f"Creating database '{self.db_name}' for GAIS data...")
-                    # Create the database
-                    session.run(f"CREATE DATABASE {self.db_name}")
-                    logger.info(f"Database '{self.db_name}' created successfully")
-                else:
-                    logger.info(f"Database '{self.db_name}' already exists")
-
+            supabase_client.table(table).upsert(batch, on_conflict="urn" if table == "gais_schools" else "code").execute()
+            return True
         except Exception as e:
-            logger.error(f"Error ensuring database exists: {str(e)}")
-            raise
-
-    def import_edubase_data(self, test_mode=False) -> Dict[str, Any]:
-        """Import Edubase All Data into Neo4j"""
-        logger.info("Starting Edubase data import...")
-
-        edubase_file = os.path.join(self.import_dir, "edubasealldata20250828.csv")
-
-        if not os.path.exists(edubase_file):
-            return {
-                "success": False,
-                "message": f"Edubase file not found: {edubase_file}"
-            }
-
-        try:
-            start_time = time.time()
-
-            # Try different encodings to handle potential encoding issues
-            encodings_to_try = ['utf-8', 'latin-1', 'cp1252', 'iso-8859-1']
-            working_encoding = None
-
-            for encoding in encodings_to_try:
-                try:
-                    logger.info(f"Trying to read CSV with {encoding}
encoding...") - with open(edubase_file, 'r', encoding=encoding) as file: - csv_reader = csv.DictReader(file) - # Test reading the first row to verify encoding works - first_row = next(csv_reader) - logger.info(f"Successfully read CSV with {encoding} encoding") - working_encoding = encoding - break - except UnicodeDecodeError as e: - logger.warning(f"Failed to read with {encoding} encoding: {str(e)}") - continue - except Exception as e: - logger.warning(f"Unexpected error with {encoding} encoding: {str(e)}") - continue - - if working_encoding is None: - return { - "success": False, - "message": "Failed to read CSV file with any supported encoding" - } - - # Now read the file with the working encoding and collect all data - all_nodes = [] # List of (label, properties) tuples - all_relationships = [] # List of (rel_type, start_key, end_key) tuples - - # Track unique relationships to avoid duplicates during collection - unique_relationships = set() - - with open(edubase_file, 'r', encoding=working_encoding) as file: - csv_reader = csv.DictReader(file) - - # Process headers and create schema - self._process_headers(csv_reader.fieldnames) - - # Process data rows and collect all nodes and relationships - total_rows = 0 - for row in csv_reader: - nodes, relationships = self._process_edubase_row(row) - all_nodes.extend(nodes) - - # Only add relationships that haven't been seen before - for rel in relationships: - if rel not in unique_relationships: - all_relationships.append(rel) - unique_relationships.add(rel) - - total_rows += 1 - - # In test mode, only process first 100 rows - if test_mode and total_rows >= 100: - break - - if total_rows % 1000 == 0: - logger.info(f"Collected data from {total_rows} rows...") - - logger.info(f"Collected {len(all_nodes)} nodes and {len(all_relationships)} unique relationships from {total_rows} rows") - - # Now create all nodes first - logger.info("Creating all nodes...") - node_map = self._create_all_nodes(all_nodes) - - # Then create all 
relationships - logger.info("Creating all relationships...") - relationships_created = self._create_all_relationships(all_relationships, node_map) - - except Exception as e: - logger.error(f"Error importing Edubase data: {str(e)}") - return { - "success": False, - "message": f"Error importing Edubase data: {str(e)}" - } - - end_time = time.time() - processing_time = end_time - start_time - - logger.info(f"Edubase data import completed successfully!") - logger.info(f"Total rows processed: {total_rows}") - logger.info(f"Processing time: {processing_time:.2f} seconds") - - return { - "success": True, - "message": f"Successfully imported {total_rows} Edubase records", - "total_rows": total_rows, - "processing_time": processing_time, - "nodes_created": {k: len(v) for k, v in self.created_nodes.items()}, - "relationships_created": relationships_created - } - - def import_edubase_data_simple(self, test_mode=False) -> Dict[str, Any]: - """Simple import approach - create relationships immediately when nodes are created""" - logger.info("Starting simple Edubase data import...") - - edubase_file = os.path.join(self.import_dir, "edubasealldata20250828.csv") - - if not os.path.exists(edubase_file): - return { - "success": False, - "message": f"Edubase file not found: {edubase_file}" - } - - try: - start_time = time.time() - - # Try different encodings to handle potential encoding issues - encodings_to_try = ['utf-8', 'latin-1', 'cp1252', 'iso-8859-1'] - working_encoding = None - - for encoding in encodings_to_try: - try: - with open(edubase_file, 'r', encoding=encoding) as file: - csv_reader = csv.DictReader(file) - # Just read the first row to test - next(csv_reader) - working_encoding = encoding - break - except UnicodeDecodeError: - continue - - if not working_encoding: - return { - "success": False, - "message": "Could not determine file encoding" - } - - logger.info(f"Using encoding: {working_encoding}") - - # Process the CSV file - relationships_created = 0 - 
nodes_created = { - 'LocalAuthority': 0, 'EstablishmentType': 0, 'EstablishmentTypeGroup': 0, - 'EstablishmentStatus': 0, 'PhaseOfEducation': 0, 'GenderType': 0, - 'ReligiousCharacter': 0, 'Diocese': 0, 'Country': 0, 'County': 0, - 'Town': 0, 'Locality': 0, 'GovernmentOfficeRegion': 0, - 'DistrictAdministrative': 0, 'SpecialClasses': 0, 'SixthForm': 0 - } - - with open(edubase_file, 'r', encoding=working_encoding) as file: - csv_reader = csv.DictReader(file) - - total_rows = 0 - for row in csv_reader: - # Create establishment and related nodes with relationships in same transaction - with self.neo4j_service.driver.session(database=self.db_name) as session: - with session.begin_transaction() as tx: - # Create establishment node - establishment_props = self._extract_establishment_properties(row) - if not establishment_props: - continue - - # Create establishment - create_est_query = """ - CREATE (e:Establishment $props) - RETURN e - """ - est_result = tx.run(create_est_query, props=establishment_props) - establishment_node = est_result.single()["e"] - - # Create related nodes and relationships immediately - rel_count = self._create_related_nodes_immediate(tx, establishment_node, row) - relationships_created += rel_count - - total_rows += 1 - if test_mode and total_rows >= 100: - break - - if total_rows % 1000 == 0: - logger.info(f"Processed {total_rows} rows...") - - processing_time = time.time() - start_time - - logger.info(f"Simple Edubase data import completed successfully!") - logger.info(f"Total rows processed: {total_rows}") - logger.info(f"Processing time: {processing_time:.2f} seconds") - - return { - "success": True, - "message": f"Successfully imported {total_rows} Edubase records", - "total_rows": total_rows, - "processing_time": processing_time, - "nodes_created": nodes_created, - "relationships_created": relationships_created - } - - except Exception as e: - logger.error(f"Error in simple Edubase data import: {str(e)}") - return { - "success": False, - 
"message": f"Error in simple Edubase data import: {str(e)}" - } - - def _create_related_nodes_immediate(self, tx, establishment_node, row): - """Create related nodes and relationships immediately in the same transaction""" - relationships_created = 0 - - # Local Authority - la_code = row.get('LA (code)', '').strip() - la_name = row.get('LA (name)', '').strip() - if la_code and la_name and la_name != 'Not applicable': - # Create or find local authority - la_query = """ - MERGE (la:LocalAuthority {code: $code, name: $name}) - WITH la - MATCH (e:Establishment) WHERE e.urn = $urn - MERGE (e)-[r:IS_CONTROLLED_BY_LOCAL_AUTHORITY]->(la) - RETURN r - """ - result = tx.run(la_query, code=la_code, name=la_name, urn=establishment_node['urn']) - if result.single(): - relationships_created += 1 - - # Establishment Type - type_code = row.get('TypeOfEstablishment (code)', '').strip() - type_name = row.get('TypeOfEstablishment (name)', '').strip() - if type_code and type_name and type_name != 'Not applicable': - type_query = """ - MERGE (et:EstablishmentType {code: $code, name: $name}) - WITH et - MATCH (e:Establishment) WHERE e.urn = $urn - MERGE (e)-[r:IS_ESTABLISHMENT_TYPE]->(et) - RETURN r - """ - result = tx.run(type_query, code=type_code, name=type_name, urn=establishment_node['urn']) - if result.single(): - relationships_created += 1 - - # Establishment Type Group - type_group_code = row.get('EstablishmentTypeGroup (code)', '').strip() - type_group_name = row.get('EstablishmentTypeGroup (name)', '').strip() - if type_group_code and type_group_name and type_group_name != 'Not applicable': - type_group_query = """ - MERGE (etg:EstablishmentTypeGroup {code: $code, name: $name}) - WITH etg - MATCH (e:Establishment) WHERE e.urn = $urn - MERGE (e)-[r:IS_ESTABLISHMENT_TYPE_GROUP]->(etg) - RETURN r - """ - result = tx.run(type_group_query, code=type_group_code, name=type_group_name, urn=establishment_node['urn']) - if result.single(): - relationships_created += 1 - - # 
Establishment Status - status_code = row.get('EstablishmentStatus (code)', '').strip() - status_name = row.get('EstablishmentStatus (name)', '').strip() - if status_code and status_name and status_name != 'Not applicable': - status_query = """ - MERGE (es:EstablishmentStatus {code: $code, name: $name}) - WITH es - MATCH (e:Establishment) WHERE e.urn = $urn - MERGE (e)-[r:CURRENT_ESTABLISHMENT_STATUS]->(es) - RETURN r - """ - result = tx.run(status_query, code=status_code, name=status_name, urn=establishment_node['urn']) - if result.single(): - relationships_created += 1 - - # Phase of Education - phase_code = row.get('PhaseOfEducation (code)', '').strip() - phase_name = row.get('PhaseOfEducation (name)', '').strip() - if phase_code and phase_name and phase_name != 'Not applicable': - phase_query = """ - MERGE (poe:PhaseOfEducation {code: $code, name: $name}) - WITH poe - MATCH (e:Establishment) WHERE e.urn = $urn - MERGE (e)-[r:PROVIDES_PHASE_OF_EDUCATION]->(poe) - RETURN r - """ - result = tx.run(phase_query, code=phase_code, name=phase_name, urn=establishment_node['urn']) - if result.single(): - relationships_created += 1 - - # Gender Type - gender_code = row.get('Gender (code)', '').strip() - gender_name = row.get('Gender (name)', '').strip() - if gender_code and gender_name and gender_name != 'Not applicable': - gender_query = """ - MERGE (gt:GenderType {code: $code, name: $name}) - WITH gt - MATCH (e:Establishment) WHERE e.urn = $urn - MERGE (e)-[r:PROVIDES_FOR_GENDER_TYPE]->(gt) - RETURN r - """ - result = tx.run(gender_query, code=gender_code, name=gender_name, urn=establishment_node['urn']) - if result.single(): - relationships_created += 1 - - # Religious Character - religious_code = row.get('ReligiousCharacter (code)', '').strip() - religious_name = row.get('ReligiousCharacter (name)', '').strip() - if religious_code and religious_name and religious_name != 'Not applicable': - religious_query = """ - MERGE (rc:ReligiousCharacter {code: $code, name: 
$name}) - WITH rc - MATCH (e:Establishment) WHERE e.urn = $urn - MERGE (e)-[r:INCLUDES_RELIGIOUS_CHARACTER]->(rc) - RETURN r - """ - result = tx.run(religious_query, code=religious_code, name=religious_name, urn=establishment_node['urn']) - if result.single(): - relationships_created += 1 - - # Diocese - diocese_code = row.get('Diocese (code)', '').strip() - diocese_name = row.get('Diocese (name)', '').strip() - if diocese_code and diocese_name and diocese_name != 'Not applicable': - diocese_query = """ - MERGE (d:Diocese {code: $code, name: $name}) - WITH d - MATCH (e:Establishment) WHERE e.urn = $urn - MERGE (e)-[r:BELONGS_TO_DIOCESE]->(d) - RETURN r - """ - result = tx.run(diocese_query, code=diocese_code, name=diocese_name, urn=establishment_node['urn']) - if result.single(): - relationships_created += 1 - - # Special Classes - special_classes = row.get('SpecialClasses', '').strip() - if special_classes and special_classes != 'Not applicable' and special_classes != '0': - special_query = """ - MERGE (sc:SpecialClasses {name: $name}) - WITH sc - MATCH (e:Establishment) WHERE e.urn = $urn - MERGE (e)-[r:HAS_SPECIAL_CLASSES]->(sc) - RETURN r - """ - result = tx.run(special_query, name=special_classes, urn=establishment_node['urn']) - if result.single(): - relationships_created += 1 - - # Sixth Form - sixth_form = row.get('SixthForm', '').strip() - if sixth_form and sixth_form != 'Not applicable': - sixth_form_query = """ - MERGE (sf:SixthForm {name: $name}) - WITH sf - MATCH (e:Establishment) WHERE e.urn = $urn - MERGE (e)-[r:HAS_SIXTH_FORM]->(sf) - RETURN r - """ - result = tx.run(sixth_form_query, name=sixth_form, urn=establishment_node['urn']) - if result.single(): - relationships_created += 1 - - # Geographical hierarchy: Locality -> Town -> County -> Country - # Country - country_name = row.get('Country (name)', '').strip() - if country_name and country_name != 'Not applicable': - country_query = """ - MERGE (c:Country {name: $name}) - RETURN c - """ - 
tx.run(country_query, name=country_name) - - # County - county_name = row.get('County (name)', '').strip() - if county_name and county_name != 'Not applicable': - county_query = """ - MERGE (co:County {name: $name}) - WITH co - MATCH (c:Country) WHERE c.name = $country_name - MERGE (co)-[r:IS_IN_COUNTRY]->(c) - RETURN co - """ - tx.run(county_query, name=county_name, country_name=country_name) - - # Town - town_name = row.get('Town', '').strip() - if town_name and town_name != 'Not applicable': - town_query = """ - MERGE (t:Town {name: $name}) - WITH t - MATCH (co:County) WHERE co.name = $county_name - MERGE (t)-[r:IS_IN_COUNTY]->(co) - RETURN t - """ - tx.run(town_query, name=town_name, county_name=county_name) - - # Locality - locality_name = row.get('Locality', '').strip() - if locality_name and locality_name != 'Not applicable': - locality_query = """ - MERGE (l:Locality {name: $name}) - WITH l - MATCH (t:Town) WHERE t.name = $town_name - MERGE (l)-[r:IS_IN_TOWN]->(t) - RETURN l - """ - tx.run(locality_query, name=locality_name, town_name=town_name) - - # Government Office Region - gor_name = row.get('GOR (name)', '').strip() - if gor_name and gor_name != 'Not applicable': - gor_query = """ - MERGE (gor:GovernmentOfficeRegion {name: $name}) - WITH gor - MATCH (e:Establishment) WHERE e.urn = $urn - MERGE (e)-[r:IS_IN_GOVERNMENT_OFFICE_REGION]->(gor) - RETURN r - """ - result = tx.run(gor_query, name=gor_name, urn=establishment_node['urn']) - if result.single(): - relationships_created += 1 - - # District Administrative - district_name = row.get('DistrictAdministrative (name)', '').strip() - if district_name and district_name != 'Not applicable': - district_query = """ - MERGE (da:DistrictAdministrative {name: $name}) - WITH da - MATCH (e:Establishment) WHERE e.urn = $urn - MERGE (e)-[r:IS_IN_DISTRICT_ADMINISTRATIVE]->(da) - RETURN r - """ - result = tx.run(district_query, name=district_name, urn=establishment_node['urn']) - if result.single(): - 
relationships_created += 1 - - # Establishment location relationships - if locality_name and locality_name != 'Not applicable': - location_query = """ - MATCH (e:Establishment) WHERE e.urn = $urn - MATCH (l:Locality) WHERE l.name = $locality_name - MERGE (e)-[r:IS_LOCATED_IN_LOCALITY]->(l) - RETURN r - """ - result = tx.run(location_query, urn=establishment_node['urn'], locality_name=locality_name) - if result.single(): - relationships_created += 1 - - return relationships_created - - def _process_headers(self, fieldnames: List[str]) -> None: - """Process CSV headers to understand the data structure""" - logger.info(f"Processing {len(fieldnames)} columns from Edubase data") - - # Group related columns - self.column_groups = { - 'establishment': ['URN', 'EstablishmentNumber', 'EstablishmentName'], - 'local_authority': ['LA (code)', 'LA (name)'], - 'establishment_type': ['TypeOfEstablishment (code)', 'TypeOfEstablishment (name)'], - 'establishment_type_group': ['EstablishmentTypeGroup (code)', 'EstablishmentTypeGroup (name)'], - 'establishment_status': ['EstablishmentStatus (code)', 'EstablishmentStatus (name)'], - 'phase_education': ['PhaseOfEducation (code)', 'PhaseOfEducation (name)', 'StatutoryLowAge', 'StatutoryHighAge'], - 'boarders': ['Boarders (code)', 'Boarders (name)'], - 'nursery': ['NurseryProvision (name)'], - 'sixth_form': ['OfficialSixthForm (code)', 'OfficialSixthForm (name)'], - 'gender': ['Gender (code)', 'Gender (name)'], - 'religious': ['ReligiousCharacter (code)', 'ReligiousCharacter (name)', 'ReligiousEthos (name)'], - 'diocese': ['Diocese (code)', 'Diocese (name)'], - 'admissions': ['AdmissionsPolicy (code)', 'AdmissionsPolicy (name)'], - 'capacity': ['SchoolCapacity'], - 'special_classes': ['SpecialClasses (code)', 'SpecialClasses (name)'], - 'census': ['CensusDate', 'NumberOfPupils', 'NumberOfBoys', 'NumberOfGirls', 'PercentageFSM'], - 'trust': ['TrustSchoolFlag (code)', 'TrustSchoolFlag (name)', 'Trusts (code)', 'Trusts (name)'], - 
'sponsor': ['SchoolSponsorFlag (name)', 'SchoolSponsors (name)'], - 'federation': ['FederationFlag (name)', 'Federations (code)', 'Federations (name)'], - 'ukprn': ['UKPRN'], - 'fe_type': ['FEHEIdentifier', 'FurtherEducationType (name)'], - 'dates': ['OpenDate', 'CloseDate', 'LastChangedDate'], - 'address': ['Street', 'Locality', 'Address3', 'Town', 'County (name)', 'Postcode'], - 'contact': ['SchoolWebsite', 'TelephoneNum'], - 'head_teacher': ['HeadTitle (name)', 'HeadFirstName', 'HeadLastName', 'HeadPreferredJobTitle'], - 'inspection': ['BSOInspectorateName (name)', 'InspectorateReport', 'DateOfLastInspectionVisit', 'NextInspectionVisit'], - 'special_provision': ['TeenMoth (name)', 'TeenMothPlaces', 'CCF (name)', 'SENPRU (name)', 'EBD (name)', 'PlacesPRU'], - 'ft_provision': ['FTProv (name)', 'EdByOther (name)', 'Section41Approved (name)'], - 'sen_provision': ['SEN1 (name)', 'SEN2 (name)', 'SEN3 (name)', 'SEN4 (name)', 'SEN5 (name)', - 'SEN6 (name)', 'SEN7 (name)', 'SEN8 (name)', 'SEN9 (name)', 'SEN10 (name)', - 'SEN11 (name)', 'SEN12 (name)', 'SEN13 (name)'], - 'resourced_provision': ['TypeOfResourcedProvision (name)', 'ResourcedProvisionOnRoll', 'ResourcedProvisionCapacity'], - 'sen_unit': ['SenUnitOnRoll', 'SenUnitCapacity'], - 'geography': ['GOR (code)', 'GOR (name)', 'DistrictAdministrative (code)', 'DistrictAdministrative (name)', - 'AdministrativeWard (code)', 'AdministrativeWard (name)', 'ParliamentaryConstituency (code)', - 'ParliamentaryConstituency (name)', 'UrbanRural (code)', 'UrbanRural (name)'], - 'gss_codes': ['GSSLACode (name)', 'Easting', 'Northing', 'MSOA (name)', 'LSOA (name)'], - 'inspection_details': ['InspectorateName (name)', 'SENStat', 'SENNoStat'], - 'boarding': ['BoardingEstablishment (name)'], - 'props': ['PropsName'], - 'previous': ['PreviousLA (code)', 'PreviousLA (name)', 'PreviousEstablishmentNumber'], - 'country': ['Country (name)'], - 'uprn': ['UPRN'], - 'site': ['SiteName'], - 'qab': ['QABName (code)', 'QABName (name)', 
'EstablishmentAccredited (code)', 'EstablishmentAccredited (name)', - 'QABReport', 'AccreditationExpiryDate'], - 'ch_number': ['CHNumber'], - 'msoa_lsoa_codes': ['MSOA (code)', 'LSOA (code)'], - 'fsm': ['FSM'] - } - - def _process_edubase_row(self, row: Dict[str, str]) -> Tuple[List[Tuple[str, Dict[str, Any]]], List[Tuple[str, str, str]]]: - """Process a single Edubase data row and return a tuple of (nodes, relationships)""" - nodes = [] - relationships = [] - - try: - # Create main establishment node - establishment_props = self._extract_establishment_properties(row) - if establishment_props: - nodes.append(('Establishment', establishment_props)) - - # Create related nodes and relationships - self._create_related_nodes_and_relationships(row, nodes, relationships) - - except Exception as e: - logger.error(f"Error processing row {row.get('URN', 'unknown')}: {str(e)}") - - return nodes, relationships - - def _extract_establishment_properties(self, row: Dict[str, str]) -> Optional[Dict[str, Any]]: - """Extract properties for the main establishment node""" - urn = row.get('URN', '').strip() - if not urn: - return None - - props = { - 'urn': urn, - 'establishmentNumber': row.get('EstablishmentNumber', '').strip(), - 'establishmentName': row.get('EstablishmentName', '').strip(), - 'openDate': self._parse_date(row.get('OpenDate', '')), - 'closeDate': self._parse_date(row.get('CloseDate', '')), - 'lastChangedDate': self._parse_date(row.get('LastChangedDate', '')), - 'schoolCapacity': self._parse_int(row.get('SchoolCapacity', '')), - 'numberOfPupils': self._parse_int(row.get('NumberOfPupils', '')), - 'numberOfBoys': self._parse_int(row.get('NumberOfBoys', '')), - 'numberOfGirls': self._parse_int(row.get('NumberOfGirls', '')), - 'percentageFSM': self._parse_float(row.get('PercentageFSM', '')), - 'statutoryLowAge': self._parse_int(row.get('StatutoryLowAge', '')), - 'statutoryHighAge': self._parse_int(row.get('StatutoryHighAge', '')), - 'easting': 
self._parse_int(row.get('Easting', '')), - 'northing': self._parse_int(row.get('Northing', '')), - 'street': row.get('Street', '').strip(), - 'locality': row.get('Locality', '').strip(), - 'address3': row.get('Address3', '').strip(), - 'town': row.get('Town', '').strip(), - 'county': row.get('County (name)', '').strip(), - 'postcode': row.get('Postcode', '').strip(), - 'schoolWebsite': row.get('SchoolWebsite', '').strip(), - 'telephoneNum': row.get('TelephoneNum', '').strip(), - 'headTitle': row.get('HeadTitle (name)', '').strip(), - 'headFirstName': row.get('HeadFirstName', '').strip(), - 'headLastName': row.get('HeadLastName', '').strip(), - 'headPreferredJobTitle': row.get('HeadPreferredJobTitle', '').strip(), - 'censusDate': self._parse_date(row.get('CensusDate', '')), - 'teenMothPlaces': self._parse_int(row.get('TeenMothPlaces', '')), - 'placesPRU': self._parse_int(row.get('PlacesPRU', '')), - 'resourcedProvisionOnRoll': self._parse_int(row.get('ResourcedProvisionOnRoll', '')), - 'resourcedProvisionCapacity': self._parse_int(row.get('ResourcedProvisionCapacity', '')), - 'senUnitOnRoll': self._parse_int(row.get('SenUnitOnRoll', '')), - 'senUnitCapacity': self._parse_int(row.get('SenUnitCapacity', '')), - 'fsm': self._parse_int(row.get('FSM', '')), - 'ukprn': row.get('UKPRN', '').strip(), - 'uprn': row.get('UPRN', '').strip(), - 'chNumber': row.get('CHNumber', '').strip() - } - - # Remove empty/None values - props = {k: v for k, v in props.items() if v is not None and v != '' and v != 'Not applicable'} - - return props - - def _create_related_nodes_and_relationships(self, row: Dict[str, str], nodes: List[Tuple[str, Dict[str, Any]]], relationships: List[Tuple[str, str, str]]) -> None: - """Create related nodes and relationships for an establishment""" - urn = row.get('URN', '').strip() - if not urn: - return - - # Local Authority - la_code = row.get('LA (code)', '').strip() - la_name = row.get('LA (name)', '').strip() - if la_code and la_name and la_name != 'Not 
applicable': - la_key = f"{la_code}_{la_name}" - if la_key not in self.created_nodes['LocalAuthority']: - nodes.append(('LocalAuthority', {'code': la_code, 'name': la_name})) - self.created_nodes['LocalAuthority'].add(la_key) - relationships.append(('IS_CONTROLLED_BY_LOCAL_AUTHORITY', urn, la_key)) - - # Establishment Type - type_code = row.get('TypeOfEstablishment (code)', '').strip() - type_name = row.get('TypeOfEstablishment (name)', '').strip() - if type_code and type_name and type_name != 'Not applicable': - type_key = f"{type_code}_{type_name}" - if type_key not in self.created_nodes['EstablishmentType']: - nodes.append(('EstablishmentType', {'code': type_code, 'name': type_name})) - self.created_nodes['EstablishmentType'].add(type_key) - relationships.append(('IS_ESTABLISHMENT_TYPE', urn, type_key)) - - # Establishment Type Group - group_code = row.get('EstablishmentTypeGroup (code)', '').strip() - group_name = row.get('EstablishmentTypeGroup (name)', '').strip() - if group_code and group_name and group_name != 'Not applicable': - group_key = f"{group_code}_{group_name}" - if group_key not in self.created_nodes['EstablishmentTypeGroup']: - nodes.append(('EstablishmentTypeGroup', {'code': group_code, 'name': group_name})) - self.created_nodes['EstablishmentTypeGroup'].add(group_key) - relationships.append(('IS_ESTABLISHMENT_TYPE_GROUP', urn, group_key)) - - # Establishment Status - status_code = row.get('EstablishmentStatus (code)', '').strip() - status_name = row.get('EstablishmentStatus (name)', '').strip() - if status_code and status_name and status_name != 'Not applicable': - status_key = f"{status_code}_{status_name}" - if status_key not in self.created_nodes['EstablishmentStatus']: - nodes.append(('EstablishmentStatus', {'code': status_code, 'name': status_name})) - self.created_nodes['EstablishmentStatus'].add(status_key) - relationships.append(('CURRENT_ESTABLISHMENT_STATUS', urn, status_key)) - - # Phase of Education - phase_code = 
row.get('PhaseOfEducation (code)', '').strip() - phase_name = row.get('PhaseOfEducation (name)', '').strip() - if phase_code and phase_name and phase_name != 'Not applicable': - phase_key = f"{phase_code}_{phase_name}" - if phase_key not in self.created_nodes['PhaseOfEducation']: - nodes.append(('PhaseOfEducation', {'code': phase_code, 'name': phase_name})) - self.created_nodes['PhaseOfEducation'].add(phase_key) - relationships.append(('PROVIDES_PHASE_OF_EDUCATION', urn, phase_key)) - - # Gender - gender_code = row.get('Gender (code)', '').strip() - gender_name = row.get('Gender (name)', '').strip() - if gender_code and gender_name and gender_name != 'Not applicable': - gender_key = f"{gender_code}_{gender_name}" - if gender_key not in self.created_nodes['GenderType']: - nodes.append(('GenderType', {'code': gender_code, 'name': gender_name})) - self.created_nodes['GenderType'].add(gender_key) - relationships.append(('PROVIDES_FOR_GENDER_TYPE', urn, gender_key)) - - # Religious Character - rel_code = row.get('ReligiousCharacter (code)', '').strip() - rel_name = row.get('ReligiousCharacter (name)', '').strip() - if rel_code and rel_name and rel_name != 'Not applicable': - rel_key = f"{rel_code}_{rel_name}" - if rel_key not in self.created_nodes['ReligiousCharacter']: - nodes.append(('ReligiousCharacter', {'code': rel_code, 'name': rel_name})) - self.created_nodes['ReligiousCharacter'].add(rel_key) - relationships.append(('INCLUDES_RELIGIOUS_CHARACTER', urn, rel_key)) - - # Diocese - diocese_code = row.get('Diocese (code)', '').strip() - diocese_name = row.get('Diocese (name)', '').strip() - if diocese_code and diocese_name and diocese_name != 'Not applicable': - diocese_key = f"{diocese_code}_{diocese_name}" - if diocese_key not in self.created_nodes['Diocese']: - nodes.append(('Diocese', {'code': diocese_code, 'name': diocese_name})) - self.created_nodes['Diocese'].add(diocese_key) - relationships.append(('UNDER_DIOCESE', urn, diocese_key)) - - # Government Office 
Region - gor_code = row.get('GOR (code)', '').strip() - gor_name = row.get('GOR (name)', '').strip() - if gor_code and gor_name and gor_name != 'Not applicable': - gor_key = f"{gor_code}_{gor_name}" - if gor_key not in self.created_nodes['GovernmentOfficeRegion']: - nodes.append(('GovernmentOfficeRegion', {'code': gor_code, 'name': gor_name})) - self.created_nodes['GovernmentOfficeRegion'].add(gor_key) - relationships.append(('OVERSEEN_BY_GOVERNMENT_OFFICE_REGION', urn, gor_key)) - - # District Administrative - district_code = row.get('DistrictAdministrative (code)', '').strip() - district_name = row.get('DistrictAdministrative (name)', '').strip() - if district_code and district_name and district_name != 'Not applicable': - district_key = f"{district_code}_{district_name}" - if district_key not in self.created_nodes['DistrictAdministrative']: - nodes.append(('DistrictAdministrative', {'code': district_code, 'name': district_name})) - self.created_nodes['DistrictAdministrative'].add(district_key) - relationships.append(('WITHIN_DISTRICT_ADMINISTRATIVE', urn, district_key)) - - # Country - country_name = row.get('Country (name)', '').strip() - if country_name and country_name != 'Not applicable': - if country_name not in self.created_nodes['Country']: - nodes.append(('Country', {'name': country_name})) - self.created_nodes['Country'].add(country_name) - relationships.append(('LOCATED_IN_COUNTRY', urn, country_name)) - - # County - county_name = row.get('County (name)', '').strip() - if county_name and county_name != 'Not applicable': - if county_name not in self.created_nodes['County']: - nodes.append(('County', {'name': county_name})) - self.created_nodes['County'].add(county_name) - relationships.append(('LOCATED_IN_COUNTY', urn, county_name)) - - # County is in Country - if country_name and country_name != 'Not applicable': - relationships.append(('PART_OF_COUNTRY', county_name, country_name)) - - # Town - town_name = row.get('Town', '').strip() - if town_name 
and town_name != 'Not applicable': - if town_name not in self.created_nodes['Town']: - nodes.append(('Town', {'name': town_name})) - self.created_nodes['Town'].add(town_name) - relationships.append(('LOCATED_IN_TOWN', urn, town_name)) - - # Town is in County - if county_name and county_name != 'Not applicable': - relationships.append(('PART_OF_COUNTY', town_name, county_name)) - - # Locality - locality_name = row.get('Locality', '').strip() - if locality_name and locality_name != 'Not applicable': - if locality_name not in self.created_nodes['Locality']: - nodes.append(('Locality', {'name': locality_name})) - self.created_nodes['Locality'].add(locality_name) - relationships.append(('LOCATED_IN_LOCALITY', urn, locality_name)) - - # Locality is in Town - if town_name and town_name != 'Not applicable': - relationships.append(('PART_OF_TOWN', locality_name, town_name)) - - # Special Classes - special_classes_code = row.get('SpecialClasses (code)', '').strip() - special_classes_name = row.get('SpecialClasses (name)', '').strip() - if special_classes_code and special_classes_name and special_classes_name != 'Not applicable': - special_classes_key = f"{special_classes_code}_{special_classes_name}" - if special_classes_key not in self.created_nodes['SpecialClasses']: - nodes.append(('SpecialClasses', {'code': special_classes_code, 'name': special_classes_name})) - self.created_nodes['SpecialClasses'].add(special_classes_key) - relationships.append(('PROVIDES_SPECIAL_CLASSES', urn, special_classes_key)) - - # Further Education Type - fe_type_name = row.get('FurtherEducationType (name)', '').strip() - if fe_type_name and fe_type_name != 'Not applicable': - if fe_type_name not in self.created_nodes['FurtherEducationType']: - nodes.append(('FurtherEducationType', {'name': fe_type_name})) - self.created_nodes['FurtherEducationType'].add(fe_type_name) - relationships.append(('PROVIDES_FURTHER_EDUCATION_TYPE', urn, fe_type_name)) - - # Sixth Form - sixth_form_code = 
row.get('OfficialSixthForm (code)', '').strip() - sixth_form_name = row.get('OfficialSixthForm (name)', '').strip() - if sixth_form_code and sixth_form_name and sixth_form_name != 'Not applicable': - sixth_form_key = f"{sixth_form_code}_{sixth_form_name}" - if sixth_form_key not in self.created_nodes['SixthForm']: - nodes.append(('SixthForm', {'code': sixth_form_code, 'name': sixth_form_name})) - self.created_nodes['SixthForm'].add(sixth_form_key) - relationships.append(('PROVIDES_SIXTH_FORM', urn, sixth_form_key)) - - def _create_all_nodes(self, all_nodes: List[Tuple[str, Dict[str, Any]]]) -> Dict[str, Any]: - """Create all nodes from the collected list of (label, properties) tuples""" - node_map = {} - try: - with self.neo4j_service.driver.session(database=self.db_name) as session: - for label, properties in all_nodes: - try: - if label == 'Establishment': - # For establishments, use URN as key - key = properties.get('urn') - if key: - node = create_node_direct(session, label, properties) - if node: - node_map[key] = node - logger.debug(f"Created {label} node with key: {key}") - else: - # For other nodes, create and store for later relationship creation - node = create_node_direct(session, label, properties) - if node: - # Create a key for this node - if 'code' in properties and 'name' in properties: - key = f"{properties['code']}_{properties['name']}" - elif 'name' in properties: - key = properties['name'] - else: - key = str(node.id) - node_map[key] = node - logger.debug(f"Created {label} node with key: {key}") - except Exception as e: - logger.error(f"Failed to create {label} node: {str(e)}") - return node_map - except Exception as e: - logger.error(f"Error creating all nodes: {str(e)}") - return {} - - def _create_all_relationships(self, all_relationships: List[Tuple[str, str, str]], node_map: Dict[str, Any]) -> int: - """Create all relationships from the collected list of (rel_type, start_key, end_key) tuples""" - relationships_created = 0 - try: - with 
self.neo4j_service.driver.session(database=self.db_name) as session: - for rel_type, start_key, end_key in all_relationships: - start_node = node_map.get(start_key) - end_node = node_map.get(end_key) - - if start_node and end_node: - try: - # Use property-based matching instead of deprecated ID() function - # This is the recommended approach for Neo4j 5.x - - # Get properties from the nodes - start_props = dict(start_node) - end_props = dict(end_node) - - # Get labels - start_label = list(start_node.labels)[0] if start_node.labels else 'Node' - end_label = list(end_node.labels)[0] if end_node.labels else 'Node' - - # Create a unique property-based query - # For establishments, use URN (unique identifier) - if start_label == 'Establishment' and 'urn' in start_props: - start_match = f"n1:{start_label} {{urn: $start_urn}}" - start_params = {'start_urn': start_props['urn']} - else: - # For other nodes, use the combined code_name key format - # This matches how we create the nodes in the first place - if 'code' in start_props and 'name' in start_props: - start_match = f"n1:{start_label} {{code: $start_code, name: $start_name}}" - start_params = {'start_code': start_props['code'], 'start_name': start_props['name']} - elif 'name' in start_props: - start_match = f"n1:{start_label} {{name: $start_name}}" - start_params = {'start_name': start_props['name']} - else: - print(f"ERROR: No unique property found for start node {start_label}: {start_props}") - continue - - # Same for end node - if end_label == 'Establishment' and 'urn' in end_props: - end_match = f"n2:{end_label} {{urn: $end_urn}}" - end_params = {'end_urn': end_props['urn']} - else: - # For other nodes, use the combined code_name key format - if 'code' in end_props and 'name' in end_props: - end_match = f"n2:{end_label} {{code: $end_code, name: $end_name}}" - end_params = {'end_code': end_props['code'], 'end_name': end_props['name']} - elif 'name' in end_props: - end_match = f"n2:{end_label} {{name: $end_name}}" - 
end_params = {'end_name': end_props['name']} - else: - print(f"ERROR: No unique property found for end node {end_label}: {end_props}") - continue - - # Combine parameters - params = {**start_params, **end_params} - - # Create relationship using property-based matching - query = f""" - MATCH ({start_match}), ({end_match}) - MERGE (n1)-[r:{rel_type}]->(n2) - RETURN r - """ - - result = session.run(query, **params) - record = result.single() - - if record and record["r"]: - self.created_relationships.add(f"{start_key}-{rel_type}-{end_key}") - relationships_created += 1 - print(f"SUCCESS: Created relationship {rel_type} between {start_key} and {end_key}") - else: - print(f"FAILED: Could not create relationship {rel_type} between {start_key} and {end_key}") - - except Exception as e: - print(f"ERROR: Exception creating relationship {rel_type} between {start_key} and {end_key}: {str(e)}") - else: - if not start_node: - logger.warning(f"Start node not found for relationship {rel_type} with key {start_key}") - if not end_node: - logger.warning(f"End node not found for relationship {rel_type} with key {end_key}") - return relationships_created - except Exception as e: - logger.error(f"Error creating all relationships: {str(e)}") - return 0 - - def _parse_date(self, date_str: str) -> Optional[str]: - """Parse date string to ISO format""" - if not date_str or date_str.strip() == '' or date_str.strip() == 'Not applicable': - return None - - try: - # Handle DD-MM-YYYY format - if '-' in date_str: - parts = date_str.split('-') - if len(parts) == 3: - day, month, year = parts - if len(year) == 2: - year = f"20{year}" if int(year) < 50 else f"19{year}" - return f"{year}-{month.zfill(2)}-{day.zfill(2)}" - except: - pass - - return date_str.strip() - - def _parse_int(self, int_str: str) -> Optional[int]: - """Parse integer string""" - if not int_str or int_str.strip() == '' or int_str.strip() == 'Not applicable': - return None - - try: - return int(int_str.strip()) - except: - 
return None - - def _parse_float(self, float_str: str) -> Optional[float]: - """Parse float string""" - if not float_str or float_str.strip() == '' or float_str.strip() == 'Not applicable': - return None - - try: - return float(float_str.strip()) - except: - return None + logger.warning(f"Upsert attempt {attempt + 1} failed for {table}: {e}") + time.sleep(2 ** attempt) + logger.error(f"All {retries} upsert attempts failed for {table}") + return False + def import_gais_data() -> Dict[str, Any]: - """Import GAIS data into Neo4j database""" - logger.info("Starting GAIS data import...") - - try: - importer = GAISDataImporter() - # Process all rows in the CSV file - result = importer.import_edubase_data_simple(test_mode=False) - - if result["success"]: - logger.info("GAIS data import completed successfully!") + """Load Edubase CSV into Supabase gais_schools + gais_local_authorities tables. + Two-pass: collect LAs and schools, insert LAs first (FK parent), then schools. + """ + from modules.database.supabase.utils.client import SupabaseServiceRoleClient + sb = SupabaseServiceRoleClient() + client = sb.supabase + + if not os.path.exists(EDUBASE_FILE): + return {"success": False, "message": f"CSV not found: {EDUBASE_FILE}"} + + logger.info(f"Loading GAIS data from {EDUBASE_FILE}") + + la_map: Dict[str, str] = {} # code → name + schools: List[Dict[str, Any]] = [] + + # Pass 1: read everything into memory + with open(EDUBASE_FILE, encoding="cp1252", errors="replace") as f: + reader = csv.DictReader(f) + for row in reader: + urn = row.get("URN", "").strip() + if not urn: + continue + la_code = row.get("LA (code)", "").strip() + la_name = row.get("LA (name)", "").strip() + if la_code and la_code not in la_map: + la_map[la_code] = la_name + schools.append(_row_to_school(row)) + + logger.info(f"Read {len(schools)} schools and {len(la_map)} LAs from CSV") + + # Insert LAs first (FK parent) + la_batch = [{"code": code, "name": name} for code, name in la_map.items()] + la_inserted 
= 0 + for i in range(0, len(la_batch), BATCH_SIZE): + chunk = la_batch[i:i + BATCH_SIZE] + if _upsert_batch(client, "gais_local_authorities", chunk): + la_inserted += len(chunk) + logger.info(f"Inserted {la_inserted} local authorities") + + # Insert schools in batches + total_schools = 0 + total_errors = 0 + for i in range(0, len(schools), BATCH_SIZE): + batch = schools[i:i + BATCH_SIZE] + if _upsert_batch(client, "gais_schools", batch): + total_schools += len(batch) + if total_schools % 5000 == 0: + logger.info(f"Inserted {total_schools} schools so far...") else: - logger.error(f"GAIS data import failed: {result['message']}") - - return result - - except Exception as e: - logger.error(f"Error in GAIS data import: {str(e)}") - return { - "success": False, - "message": f"Error in GAIS data import: {str(e)}" - } + total_errors += len(batch) + + logger.info(f"GAIS import complete: {total_schools} schools, {la_inserted} LAs, {total_errors} errors") + return { + "success": total_errors == 0, + "schools_inserted": total_schools, + "las_inserted": la_inserted, + "errors": total_errors, + }