# api/run/initialization/gais_data.py
# 2025-11-14 14:47:19 +00:00
#
# 1043 lines
# 51 KiB
# Python

"""
GAIS Data Import Module for ClassroomCopilot
Handles import of publicly available school databases into Neo4j
Starting with Edubase All Data
"""
import os
import csv
import time
from typing import Dict, Any, List, Optional, Set, Tuple
from datetime import datetime
from modules.logger_tool import initialise_logger
from modules.database.services.neo4j_service import Neo4jService
from modules.database.tools.neo4j_session_tools import create_relationship
# Module-level logger used by the functions and methods below. The level and
# log path are read from the LOG_LEVEL and LOG_PATH environment variables
# (os.getenv returns None when they are unset). The trailing 'default' and True
# arguments are passed straight to initialise_logger; their meaning is defined
# in modules.logger_tool, which is not visible from this file.
logger = initialise_logger(__name__, os.getenv("LOG_LEVEL"), os.getenv("LOG_PATH"), 'default', True)
def create_node_direct(session, label, properties):
    """Run one CREATE statement and hand back the node from that same result.

    Args:
        session: Object exposing ``run(query, **params)``; the value it
            returns must offer ``single()``.
        label: Node label, interpolated verbatim into the Cypher text.
        properties: Mapping bound to the ``$properties`` query parameter.

    Returns:
        The value stored under ``"n"`` in the single result record, or
        ``None`` when no record came back or when any exception was raised
        (the exception is logged, not propagated).
    """
    try:
        cypher = f"""
        CREATE (n:{label} $properties)
        RETURN n
        """
        created = session.run(cypher, properties=properties).single()
        return created["n"] if created else None
    except Exception as e:
        logger.error(f"Error creating {label} node: {str(e)}")
        return None
class GAISDataImporter:
"""Handles import of publicly available school databases into Neo4j"""
def __init__(self):
    """Prepare the Neo4j service, paths and bookkeeping, then ensure the GAIS database exists."""
    self.neo4j_service = Neo4jService()
    # Use dedicated database for GAIS data
    self.db_name = os.getenv("NEO4J_GAIS_DATA", "gaisdata")
    # CSV files are expected in an "import" directory next to this module.
    self.import_dir = os.path.join(os.path.dirname(__file__), "import")

    # One set of already-seen keys per lookup label, used to avoid duplicates.
    tracked_labels = (
        'LocalAuthority',
        'EstablishmentType',
        'EstablishmentTypeGroup',
        'EstablishmentStatus',
        'PhaseOfEducation',
        'BoarderType',
        'GenderType',
        'ReligiousCharacter',
        'Diocese',
        'AdmissionsPolicy',
        'SpecialClasses',
        'TrustSchoolFlag',
        'FederationFlag',
        'Country',
        'County',
        'Town',
        'Locality',
        'GovernmentOfficeRegion',
        'DistrictAdministrative',
        'AdministrativeWard',
        'ParliamentaryConstituency',
        'UrbanRural',
        'Inspectorate',
        'QAB',
        'FurtherEducationType',
        'SixthForm',
    )
    self.created_nodes = {label: set() for label in tracked_labels}

    # Track created relationships
    self.created_relationships = set()

    # Batch processing
    self.batch_size = 1000
    self.current_batch = []

    # Ensure the GAIS database exists
    self._ensure_database_exists()
def _ensure_database_exists(self) -> None:
    """Create the GAIS database when it is not already present.

    Lists the databases known to the server and issues ``CREATE DATABASE``
    for ``self.db_name`` only when it is missing.

    Raises:
        Exception: Any error raised while listing or creating the database
            is logged and then re-raised unchanged.
    """
    try:
        # Administrative commands are served by the system database, so name
        # it explicitly instead of relying on the session's default database.
        with self.neo4j_service.driver.session(database="system") as session:
            # Neo4j normalises database names to lower case; compare
            # case-insensitively so a mixed-case NEO4J_GAIS_DATA value does
            # not look like a missing database.
            existing = {record["name"].lower() for record in session.run("SHOW DATABASES")}
            if self.db_name.lower() in existing:
                logger.info(f"Database '{self.db_name}' already exists")
                return

            logger.info(f"Creating database '{self.db_name}' for GAIS data...")
            # Backtick-quote the name so values containing '-' or '.' are
            # valid Cypher; IF NOT EXISTS keeps the call safe if the
            # database appears between the check and the create.
            quoted_name = "`" + self.db_name.replace("`", "``") + "`"
            session.run(f"CREATE DATABASE {quoted_name} IF NOT EXISTS")
            logger.info(f"Database '{self.db_name}' created successfully")
    except Exception as e:
        logger.error(f"Error ensuring database exists: {str(e)}")
        raise
def import_edubase_data(self, test_mode=False) -> Dict[str, Any]:
    """Import the Edubase "all data" CSV into Neo4j in two passes.

    The CSV is read once to collect node and relationship descriptions via
    ``_process_edubase_row``; afterwards every node is created, followed by
    every relationship.

    Args:
        test_mode: When true, stop collecting after the first 100 rows.

    Returns:
        A dict that always has ``success`` and ``message``. On success it
        also carries ``total_rows``, ``processing_time`` (seconds),
        ``nodes_created`` (per-label size of ``self.created_nodes``) and
        ``relationships_created``. Failures are reported through the dict
        instead of being raised.
    """
    logger.info("Starting Edubase data import...")
    edubase_file = os.path.join(self.import_dir, "edubasealldata20250828.csv")
    if not os.path.exists(edubase_file):
        return {
            "success": False,
            "message": f"Edubase file not found: {edubase_file}"
        }
    try:
        start_time = time.time()

        # Order matters: 'utf-8-sig' also strips a BOM that would otherwise
        # end up inside the first header name; 'cp1252' can still reject
        # undefined bytes; 'latin-1' accepts every byte and is therefore the
        # last resort (anything listed after it could never be reached).
        encodings_to_try = ['utf-8-sig', 'cp1252', 'latin-1']
        working_encoding = None
        for encoding in encodings_to_try:
            logger.info(f"Trying to read CSV with {encoding} encoding...")
            try:
                with open(edubase_file, 'r', encoding=encoding, newline='') as file:
                    # Decode the whole file, not just its first row: a bad
                    # byte further down would otherwise abort the real
                    # import after an encoding had already been "accepted".
                    while file.read(1024 * 1024):
                        pass
            except UnicodeDecodeError as e:
                logger.warning(f"Failed to read with {encoding} encoding: {str(e)}")
                continue
            except Exception as e:
                logger.warning(f"Unexpected error with {encoding} encoding: {str(e)}")
                continue
            logger.info(f"Successfully read CSV with {encoding} encoding")
            working_encoding = encoding
            break

        if working_encoding is None:
            return {
                "success": False,
                "message": "Failed to read CSV file with any supported encoding"
            }

        # Now read the file with the working encoding and collect all data
        all_nodes = []  # List of (label, properties) tuples
        all_relationships = []  # List of (rel_type, start_key, end_key) tuples
        # Track unique relationships to avoid duplicates during collection
        unique_relationships = set()

        # newline='' lets the csv module handle line endings itself, which
        # keeps quoted fields containing line breaks intact.
        with open(edubase_file, 'r', encoding=working_encoding, newline='') as file:
            csv_reader = csv.DictReader(file)
            if not csv_reader.fieldnames:
                return {
                    "success": False,
                    "message": f"Edubase file is empty: {edubase_file}"
                }

            # Process headers and create schema
            self._process_headers(csv_reader.fieldnames)

            # Process data rows and collect all nodes and relationships
            total_rows = 0
            for row in csv_reader:
                nodes, relationships = self._process_edubase_row(row)
                all_nodes.extend(nodes)
                # Only add relationships that haven't been seen before
                for rel in relationships:
                    if rel not in unique_relationships:
                        all_relationships.append(rel)
                        unique_relationships.add(rel)
                total_rows += 1
                # In test mode, only process first 100 rows
                if test_mode and total_rows >= 100:
                    break
                if total_rows % 1000 == 0:
                    logger.info(f"Collected data from {total_rows} rows...")

        logger.info(f"Collected {len(all_nodes)} nodes and {len(all_relationships)} unique relationships from {total_rows} rows")

        # Now create all nodes first
        logger.info("Creating all nodes...")
        node_map = self._create_all_nodes(all_nodes)

        # Then create all relationships
        logger.info("Creating all relationships...")
        relationships_created = self._create_all_relationships(all_relationships, node_map)
    except Exception as e:
        logger.error(f"Error importing Edubase data: {str(e)}")
        return {
            "success": False,
            "message": f"Error importing Edubase data: {str(e)}"
        }

    processing_time = time.time() - start_time
    logger.info("Edubase data import completed successfully!")
    logger.info(f"Total rows processed: {total_rows}")
    logger.info(f"Processing time: {processing_time:.2f} seconds")
    return {
        "success": True,
        "message": f"Successfully imported {total_rows} Edubase records",
        "total_rows": total_rows,
        "processing_time": processing_time,
        "nodes_created": {k: len(v) for k, v in self.created_nodes.items()},
        "relationships_created": relationships_created
    }
def import_edubase_data_simple(self, test_mode=False) -> Dict[str, Any]:
    """Import the Edubase CSV row by row, linking nodes as they are created.

    Each row with a URN gets its own transaction in which the Establishment
    node is created and ``_create_related_nodes_immediate`` adds the related
    nodes and relationships.

    Args:
        test_mode: When true, stop after 100 imported rows.

    Returns:
        A dict that always has ``success`` and ``message``. On success it
        also carries ``total_rows``, ``processing_time`` (seconds),
        ``nodes_created`` and ``relationships_created``. Failures are
        reported through the dict instead of being raised.
    """
    logger.info("Starting simple Edubase data import...")
    edubase_file = os.path.join(self.import_dir, "edubasealldata20250828.csv")
    if not os.path.exists(edubase_file):
        return {
            "success": False,
            "message": f"Edubase file not found: {edubase_file}"
        }
    try:
        start_time = time.time()

        # 'utf-8-sig' also strips a BOM that would otherwise end up inside
        # the first header name; 'cp1252' can still reject undefined bytes;
        # 'latin-1' accepts every byte, so it must come last.
        working_encoding = None
        for encoding in ('utf-8-sig', 'cp1252', 'latin-1'):
            try:
                with open(edubase_file, 'r', encoding=encoding, newline='') as file:
                    # Decode the whole file: checking only the first row
                    # would accept an encoding that fails further down.
                    while file.read(1024 * 1024):
                        pass
            except UnicodeDecodeError:
                continue
            working_encoding = encoding
            break

        if not working_encoding:
            return {
                "success": False,
                "message": "Could not determine file encoding"
            }
        logger.info(f"Using encoding: {working_encoding}")

        # Process the CSV file
        relationships_created = 0
        # NOTE(review): these per-label counters are returned but nothing in
        # this method updates them, so they are always 0. Filling them needs
        # _create_related_nodes_immediate to report what it created.
        nodes_created = {
            'LocalAuthority': 0, 'EstablishmentType': 0, 'EstablishmentTypeGroup': 0,
            'EstablishmentStatus': 0, 'PhaseOfEducation': 0, 'GenderType': 0,
            'ReligiousCharacter': 0, 'Diocese': 0, 'Country': 0, 'County': 0,
            'Town': 0, 'Locality': 0, 'GovernmentOfficeRegion': 0,
            'DistrictAdministrative': 0, 'SpecialClasses': 0, 'SixthForm': 0
        }
        create_est_query = """
        CREATE (e:Establishment $props)
        RETURN e
        """
        total_rows = 0

        # One session serves the whole import; each row still gets its own
        # transaction. newline='' lets the csv module handle line endings.
        with open(edubase_file, 'r', encoding=working_encoding, newline='') as file, \
                self.neo4j_service.driver.session(database=self.db_name) as session:
            csv_reader = csv.DictReader(file)
            if not csv_reader.fieldnames:
                return {
                    "success": False,
                    "message": f"Edubase file is empty: {edubase_file}"
                }

            for row in csv_reader:
                # Rows without a URN are skipped before any transaction is opened.
                establishment_props = self._extract_establishment_properties(row)
                if not establishment_props:
                    continue

                # Create establishment and related nodes with relationships in same transaction
                with session.begin_transaction() as tx:
                    est_result = tx.run(create_est_query, props=establishment_props)
                    establishment_node = est_result.single()["e"]
                    # Create related nodes and relationships immediately
                    relationships_created += self._create_related_nodes_immediate(tx, establishment_node, row)

                total_rows += 1
                if test_mode and total_rows >= 100:
                    break
                if total_rows % 1000 == 0:
                    logger.info(f"Processed {total_rows} rows...")

        processing_time = time.time() - start_time
        logger.info("Simple Edubase data import completed successfully!")
        logger.info(f"Total rows processed: {total_rows}")
        logger.info(f"Processing time: {processing_time:.2f} seconds")
        return {
            "success": True,
            "message": f"Successfully imported {total_rows} Edubase records",
            "total_rows": total_rows,
            "processing_time": processing_time,
            "nodes_created": nodes_created,
            "relationships_created": relationships_created
        }
    except Exception as e:
        logger.error(f"Error in simple Edubase data import: {str(e)}")
        return {
            "success": False,
            "message": f"Error in simple Edubase data import: {str(e)}"
        }
def _create_related_nodes_immediate(self, tx, establishment_node, row):
    """MERGE the lookup nodes for one CSV row and link them to its establishment.

    Every query runs on ``tx``, so the caller decides when the work is
    committed.

    Args:
        tx: Open transaction exposing ``run(query, **params)``.
        establishment_node: Mapping-like establishment node; only its
            ``'urn'`` value is read.
        row: CSV row keyed by Edubase column name.

    Returns:
        Number of establishment-to-lookup relationship queries that returned
        a record. Links inside the Locality -> Town -> County -> Country
        hierarchy are not counted.
    """
    urn = establishment_node['urn']
    relationships_created = 0

    def field(*columns):
        """Return the first non-empty, stripped cell among ``columns``."""
        for column in columns:
            # csv.DictReader stores None for cells missing from a short row.
            value = (row.get(column) or '').strip()
            if value:
                return value
        return ''

    def usable(value):
        """Tell whether a cell holds a real value rather than a placeholder."""
        return bool(value) and value != 'Not applicable'

    def link(label, rel_type, props):
        """MERGE one lookup node and its relationship from the establishment."""
        prop_map = ", ".join(f"{key}: ${key}" for key in props)
        query = f"""
        MERGE (n:{label} {{{prop_map}}})
        WITH n
        MATCH (e:Establishment) WHERE e.urn = $urn
        MERGE (e)-[r:{rel_type}]->(n)
        RETURN r
        """
        return 1 if tx.run(query, urn=urn, **props).single() else 0

    # Lookups identified by a "<column> (code)" / "<column> (name)" pair.
    coded_lookups = (
        ('LA', 'LocalAuthority', 'IS_CONTROLLED_BY_LOCAL_AUTHORITY'),
        ('TypeOfEstablishment', 'EstablishmentType', 'IS_ESTABLISHMENT_TYPE'),
        ('EstablishmentTypeGroup', 'EstablishmentTypeGroup', 'IS_ESTABLISHMENT_TYPE_GROUP'),
        ('EstablishmentStatus', 'EstablishmentStatus', 'CURRENT_ESTABLISHMENT_STATUS'),
        ('PhaseOfEducation', 'PhaseOfEducation', 'PROVIDES_PHASE_OF_EDUCATION'),
        ('Gender', 'GenderType', 'PROVIDES_FOR_GENDER_TYPE'),
        ('ReligiousCharacter', 'ReligiousCharacter', 'INCLUDES_RELIGIOUS_CHARACTER'),
        ('Diocese', 'Diocese', 'BELONGS_TO_DIOCESE'),
    )
    for column, label, rel_type in coded_lookups:
        code = field(f'{column} (code)')
        name = field(f'{column} (name)')
        if code and usable(name):
            relationships_created += link(label, rel_type, {'code': code, 'name': name})

    # Special Classes: the column handled elsewhere in this file is
    # 'SpecialClasses (name)'; the bare 'SpecialClasses' name is kept as a
    # fallback for files that use it.
    special_classes = field('SpecialClasses (name)', 'SpecialClasses')
    if usable(special_classes) and special_classes != '0':
        relationships_created += link('SpecialClasses', 'HAS_SPECIAL_CLASSES', {'name': special_classes})

    # Sixth Form: same situation, the column is 'OfficialSixthForm (name)'.
    sixth_form = field('OfficialSixthForm (name)', 'SixthForm')
    if usable(sixth_form):
        relationships_created += link('SixthForm', 'HAS_SIXTH_FORM', {'name': sixth_form})

    # Geographical hierarchy: Locality -> Town -> County -> Country
    country_name = field('Country (name)')
    if usable(country_name):
        country_query = """
        MERGE (c:Country {name: $name})
        RETURN c
        """
        tx.run(country_query, name=country_name)

    county_name = field('County (name)')
    if usable(county_name):
        county_query = """
        MERGE (co:County {name: $name})
        WITH co
        MATCH (c:Country) WHERE c.name = $country_name
        MERGE (co)-[r:IS_IN_COUNTRY]->(c)
        RETURN co
        """
        tx.run(county_query, name=county_name, country_name=country_name)

    town_name = field('Town')
    if usable(town_name):
        town_query = """
        MERGE (t:Town {name: $name})
        WITH t
        MATCH (co:County) WHERE co.name = $county_name
        MERGE (t)-[r:IS_IN_COUNTY]->(co)
        RETURN t
        """
        tx.run(town_query, name=town_name, county_name=county_name)

    locality_name = field('Locality')
    if usable(locality_name):
        locality_query = """
        MERGE (l:Locality {name: $name})
        WITH l
        MATCH (t:Town) WHERE t.name = $town_name
        MERGE (l)-[r:IS_IN_TOWN]->(t)
        RETURN l
        """
        tx.run(locality_query, name=locality_name, town_name=town_name)

    # Government Office Region
    gor_name = field('GOR (name)')
    if usable(gor_name):
        relationships_created += link('GovernmentOfficeRegion', 'IS_IN_GOVERNMENT_OFFICE_REGION', {'name': gor_name})

    # District Administrative
    district_name = field('DistrictAdministrative (name)')
    if usable(district_name):
        relationships_created += link('DistrictAdministrative', 'IS_IN_DISTRICT_ADMINISTRATIVE', {'name': district_name})

    # Establishment location relationship (the Locality node was merged above)
    if usable(locality_name):
        location_query = """
        MATCH (e:Establishment) WHERE e.urn = $urn
        MATCH (l:Locality) WHERE l.name = $locality_name
        MERGE (e)-[r:IS_LOCATED_IN_LOCALITY]->(l)
        RETURN r
        """
        if tx.run(location_query, urn=urn, locality_name=locality_name).single():
            relationships_created += 1

    return relationships_created
def _process_headers(self, fieldnames: List[str]) -> None:
    """Log how many CSV columns were found and rebuild ``self.column_groups``.

    ``fieldnames`` is only used for its length; the grouping stored on the
    instance is a fixed table of Edubase column names keyed by topic.
    """
    logger.info(f"Processing {len(fieldnames)} columns from Edubase data")

    def pair(base):
        """Return the '<base> (code)' / '<base> (name)' column pair."""
        return [f'{base} (code)', f'{base} (name)']

    # Group related columns
    groups = {
        'establishment': ['URN', 'EstablishmentNumber', 'EstablishmentName'],
        'local_authority': pair('LA'),
        'establishment_type': pair('TypeOfEstablishment'),
        'establishment_type_group': pair('EstablishmentTypeGroup'),
        'establishment_status': pair('EstablishmentStatus'),
        'phase_education': pair('PhaseOfEducation') + ['StatutoryLowAge', 'StatutoryHighAge'],
        'boarders': pair('Boarders'),
        'nursery': ['NurseryProvision (name)'],
        'sixth_form': pair('OfficialSixthForm'),
        'gender': pair('Gender'),
        'religious': pair('ReligiousCharacter') + ['ReligiousEthos (name)'],
        'diocese': pair('Diocese'),
        'admissions': pair('AdmissionsPolicy'),
        'capacity': ['SchoolCapacity'],
        'special_classes': pair('SpecialClasses'),
        'census': ['CensusDate', 'NumberOfPupils', 'NumberOfBoys', 'NumberOfGirls', 'PercentageFSM'],
        'trust': pair('TrustSchoolFlag') + pair('Trusts'),
        'sponsor': ['SchoolSponsorFlag (name)', 'SchoolSponsors (name)'],
        'federation': ['FederationFlag (name)'] + pair('Federations'),
        'ukprn': ['UKPRN'],
        'fe_type': ['FEHEIdentifier', 'FurtherEducationType (name)'],
        'dates': ['OpenDate', 'CloseDate', 'LastChangedDate'],
        'address': ['Street', 'Locality', 'Address3', 'Town', 'County (name)', 'Postcode'],
        'contact': ['SchoolWebsite', 'TelephoneNum'],
        'head_teacher': ['HeadTitle (name)', 'HeadFirstName', 'HeadLastName', 'HeadPreferredJobTitle'],
        'inspection': ['BSOInspectorateName (name)', 'InspectorateReport',
                       'DateOfLastInspectionVisit', 'NextInspectionVisit'],
        'special_provision': ['TeenMoth (name)', 'TeenMothPlaces', 'CCF (name)',
                              'SENPRU (name)', 'EBD (name)', 'PlacesPRU'],
        'ft_provision': ['FTProv (name)', 'EdByOther (name)', 'Section41Approved (name)'],
        # SEN1 (name) through SEN13 (name)
        'sen_provision': [f'SEN{index} (name)' for index in range(1, 14)],
        'resourced_provision': ['TypeOfResourcedProvision (name)', 'ResourcedProvisionOnRoll',
                                'ResourcedProvisionCapacity'],
        'sen_unit': ['SenUnitOnRoll', 'SenUnitCapacity'],
        'geography': (
            pair('GOR')
            + pair('DistrictAdministrative')
            + pair('AdministrativeWard')
            + pair('ParliamentaryConstituency')
            + pair('UrbanRural')
        ),
        'gss_codes': ['GSSLACode (name)', 'Easting', 'Northing', 'MSOA (name)', 'LSOA (name)'],
        'inspection_details': ['InspectorateName (name)', 'SENStat', 'SENNoStat'],
        'boarding': ['BoardingEstablishment (name)'],
        'props': ['PropsName'],
        'previous': pair('PreviousLA') + ['PreviousEstablishmentNumber'],
        'country': ['Country (name)'],
        'uprn': ['UPRN'],
        'site': ['SiteName'],
        'qab': pair('QABName') + pair('EstablishmentAccredited') + ['QABReport', 'AccreditationExpiryDate'],
        'ch_number': ['CHNumber'],
        'msoa_lsoa_codes': ['MSOA (code)', 'LSOA (code)'],
        'fsm': ['FSM'],
    }
    self.column_groups = groups
def _process_edubase_row(self, row: Dict[str, str]) -> Tuple[List[Tuple[str, Dict[str, Any]]], List[Tuple[str, str, str]]]:
    """Turn one Edubase CSV row into node and relationship descriptions.

    Returns:
        ``(nodes, relationships)`` where ``nodes`` holds ``(label, properties)``
        tuples and ``relationships`` holds ``(rel_type, start_key, end_key)``
        tuples. Both are empty when the row yields no establishment
        properties. An exception raised while processing is logged and
        whatever was collected up to that point is returned.
    """
    collected_nodes = []
    collected_links = []
    try:
        main_props = self._extract_establishment_properties(row)
        if not main_props:
            return collected_nodes, collected_links
        # The establishment itself comes first, followed by its lookup nodes.
        collected_nodes.append(('Establishment', main_props))
        self._create_related_nodes_and_relationships(row, collected_nodes, collected_links)
    except Exception as e:
        logger.error(f"Error processing row {row.get('URN', 'unknown')}: {str(e)}")
    return collected_nodes, collected_links
def _extract_establishment_properties(self, row: Dict[str, str]) -> Optional[Dict[str, Any]]:
    """Build the property map for the main Establishment node of one CSV row.

    Args:
        row: CSV row keyed by Edubase column name. Cells may be ``None``
            (csv.DictReader does that for short rows); they are treated as
            empty.

    Returns:
        ``None`` when the row has no URN. Otherwise a dict of properties in
        which entries that are ``None``, empty or ``'Not applicable'`` have
        been dropped.
    """
    def raw(column: str) -> str:
        # A missing trailing cell is None, not '': normalise it so .strip()
        # and the parsers always receive a string.
        return row.get(column) or ''

    urn = raw('URN').strip()
    if not urn:
        return None

    converters = {
        'text': str.strip,
        'date': self._parse_date,
        'int': self._parse_int,
        'float': self._parse_float,
    }
    # (property name, CSV column, converter kind), in output order.
    field_specs = (
        ('establishmentNumber', 'EstablishmentNumber', 'text'),
        ('establishmentName', 'EstablishmentName', 'text'),
        ('openDate', 'OpenDate', 'date'),
        ('closeDate', 'CloseDate', 'date'),
        ('lastChangedDate', 'LastChangedDate', 'date'),
        ('schoolCapacity', 'SchoolCapacity', 'int'),
        ('numberOfPupils', 'NumberOfPupils', 'int'),
        ('numberOfBoys', 'NumberOfBoys', 'int'),
        ('numberOfGirls', 'NumberOfGirls', 'int'),
        ('percentageFSM', 'PercentageFSM', 'float'),
        ('statutoryLowAge', 'StatutoryLowAge', 'int'),
        ('statutoryHighAge', 'StatutoryHighAge', 'int'),
        ('easting', 'Easting', 'int'),
        ('northing', 'Northing', 'int'),
        ('street', 'Street', 'text'),
        ('locality', 'Locality', 'text'),
        ('address3', 'Address3', 'text'),
        ('town', 'Town', 'text'),
        ('county', 'County (name)', 'text'),
        ('postcode', 'Postcode', 'text'),
        ('schoolWebsite', 'SchoolWebsite', 'text'),
        ('telephoneNum', 'TelephoneNum', 'text'),
        ('headTitle', 'HeadTitle (name)', 'text'),
        ('headFirstName', 'HeadFirstName', 'text'),
        ('headLastName', 'HeadLastName', 'text'),
        ('headPreferredJobTitle', 'HeadPreferredJobTitle', 'text'),
        ('censusDate', 'CensusDate', 'date'),
        ('teenMothPlaces', 'TeenMothPlaces', 'int'),
        ('placesPRU', 'PlacesPRU', 'int'),
        ('resourcedProvisionOnRoll', 'ResourcedProvisionOnRoll', 'int'),
        ('resourcedProvisionCapacity', 'ResourcedProvisionCapacity', 'int'),
        ('senUnitOnRoll', 'SenUnitOnRoll', 'int'),
        ('senUnitCapacity', 'SenUnitCapacity', 'int'),
        ('fsm', 'FSM', 'int'),
        ('ukprn', 'UKPRN', 'text'),
        ('uprn', 'UPRN', 'text'),
        ('chNumber', 'CHNumber', 'text'),
    )

    props = {'urn': urn}
    for prop_name, column, kind in field_specs:
        props[prop_name] = converters[kind](raw(column))

    # Remove empty/None values (a numeric 0 is kept: it is neither None nor '')
    return {k: v for k, v in props.items() if v is not None and v != '' and v != 'Not applicable'}
def _create_related_nodes_and_relationships(self, row: Dict[str, str], nodes: List[Tuple[str, Dict[str, Any]]], relationships: List[Tuple[str, str, str]]) -> None:
    """Append the lookup nodes and relationships implied by one CSV row.

    Nothing is written to the database here: ``nodes`` receives
    ``(label, properties)`` tuples and ``relationships`` receives
    ``(rel_type, start_key, end_key)`` tuples. A lookup node is appended only
    the first time its key is seen (tracked in ``self.created_nodes``);
    relationships are appended for every row.

    Args:
        row: CSV row keyed by Edubase column name. ``None`` cells are
            treated as empty.
        nodes: Output list, mutated in place.
        relationships: Output list, mutated in place.
    """
    def cell(column: str) -> str:
        # csv.DictReader stores None for cells missing from a short row.
        return (row.get(column) or '').strip()

    def usable(value: str) -> bool:
        return bool(value) and value != 'Not applicable'

    def register(label: str, key: str, props: Dict[str, Any]) -> None:
        """Queue a lookup node unless one with the same key was queued before."""
        seen = self.created_nodes[label]
        if key not in seen:
            nodes.append((label, props))
            seen.add(key)

    def link_coded(column: str, label: str, rel_type: str) -> None:
        """Handle a lookup described by '<column> (code)' and '<column> (name)'."""
        code = cell(f'{column} (code)')
        name = cell(f'{column} (name)')
        if not (code and usable(name)):
            return
        key = f"{code}_{name}"
        register(label, key, {'code': code, 'name': name})
        relationships.append((rel_type, urn, key))

    def link_named(column: str, label: str, rel_type: str) -> str:
        """Handle a name-only lookup; return the name, or '' when unusable."""
        name = cell(column)
        if not usable(name):
            return ''
        register(label, name, {'name': name})
        relationships.append((rel_type, urn, name))
        return name

    urn = cell('URN')
    if not urn:
        return

    # NOTE(review): keys are not namespaced by label, so a Town and a County
    # sharing a name produce identical keys. Changing the key format would
    # also require changes wherever these keys are resolved to nodes.
    for column, label, rel_type in (
        ('LA', 'LocalAuthority', 'IS_CONTROLLED_BY_LOCAL_AUTHORITY'),
        ('TypeOfEstablishment', 'EstablishmentType', 'IS_ESTABLISHMENT_TYPE'),
        ('EstablishmentTypeGroup', 'EstablishmentTypeGroup', 'IS_ESTABLISHMENT_TYPE_GROUP'),
        ('EstablishmentStatus', 'EstablishmentStatus', 'CURRENT_ESTABLISHMENT_STATUS'),
        ('PhaseOfEducation', 'PhaseOfEducation', 'PROVIDES_PHASE_OF_EDUCATION'),
        ('Gender', 'GenderType', 'PROVIDES_FOR_GENDER_TYPE'),
        ('ReligiousCharacter', 'ReligiousCharacter', 'INCLUDES_RELIGIOUS_CHARACTER'),
        ('Diocese', 'Diocese', 'UNDER_DIOCESE'),
        ('GOR', 'GovernmentOfficeRegion', 'OVERSEEN_BY_GOVERNMENT_OFFICE_REGION'),
        ('DistrictAdministrative', 'DistrictAdministrative', 'WITHIN_DISTRICT_ADMINISTRATIVE'),
    ):
        link_coded(column, label, rel_type)

    # Geography: each level links to the establishment and, when the level
    # above it is usable, to that parent as well.
    country_name = link_named('Country (name)', 'Country', 'LOCATED_IN_COUNTRY')

    county_name = link_named('County (name)', 'County', 'LOCATED_IN_COUNTY')
    if county_name and country_name:
        # County is in Country
        relationships.append(('PART_OF_COUNTRY', county_name, country_name))

    town_name = link_named('Town', 'Town', 'LOCATED_IN_TOWN')
    if town_name and county_name:
        # Town is in County
        relationships.append(('PART_OF_COUNTY', town_name, county_name))

    locality_name = link_named('Locality', 'Locality', 'LOCATED_IN_LOCALITY')
    if locality_name and town_name:
        # Locality is in Town
        relationships.append(('PART_OF_TOWN', locality_name, town_name))

    # Special Classes
    link_coded('SpecialClasses', 'SpecialClasses', 'PROVIDES_SPECIAL_CLASSES')
    # Further Education Type
    link_named('FurtherEducationType (name)', 'FurtherEducationType', 'PROVIDES_FURTHER_EDUCATION_TYPE')
    # Sixth Form
    link_coded('OfficialSixthForm', 'SixthForm', 'PROVIDES_SIXTH_FORM')
def _create_all_nodes(self, all_nodes: List[Tuple[str, Dict[str, Any]]]) -> Dict[str, Any]:
    """Create all nodes from the collected list of (label, properties) tuples.

    Args:
        all_nodes: (label, properties) tuples describing the nodes to create.

    Returns:
        Mapping of lookup key to the created node. Establishment nodes are
        keyed by their ``urn``; other nodes by ``"<code>_<name>"`` when both
        properties are present, by ``name`` when only that is present, and
        otherwise by the stringified node id. An empty dict is returned if
        the session as a whole fails.
    """
    node_map: Dict[str, Any] = {}
    try:
        with self.neo4j_service.driver.session(database=self.db_name) as session:
            for label, properties in all_nodes:
                try:
                    if label == 'Establishment':
                        # Establishments are only created when they carry a URN,
                        # because the URN is the key relationships refer to.
                        key = properties.get('urn')
                        if not key:
                            continue
                        node = create_node_direct(session, label, properties)
                        if node is None:
                            continue
                    else:
                        node = create_node_direct(session, label, properties)
                        # Compare against None explicitly: a graph entity's
                        # truthiness follows its property count, so a node
                        # without properties would otherwise look "missing".
                        if node is None:
                            continue
                        if 'code' in properties and 'name' in properties:
                            key = f"{properties['code']}_{properties['name']}"
                        elif 'name' in properties:
                            key = properties['name']
                        else:
                            key = str(node.id)
                    # NOTE(review): keys are not namespaced by label, so two nodes
                    # with different labels but the same key overwrite each other
                    # here - confirm whether that can happen in the input data.
                    node_map[key] = node
                    logger.debug(f"Created {label} node with key: {key}")
                except Exception as e:
                    # One bad node must not abort the rest of the batch.
                    logger.error(f"Failed to create {label} node: {str(e)}")
        return node_map
    except Exception as e:
        logger.error(f"Error creating all nodes: {str(e)}")
        return {}
def _create_all_relationships(self, all_relationships: List[Tuple[str, str, str]], node_map: Dict[str, Any]) -> int:
    """Create all relationships from the collected list of (rel_type, start_key, end_key) tuples.

    Both keys are looked up in ``node_map``. When both nodes are present they
    are re-matched in the database by their properties (``urn`` for
    Establishment nodes, otherwise ``code`` + ``name`` or ``name`` alone) and
    joined with ``MERGE``.

    Args:
        all_relationships: (rel_type, start_key, end_key) tuples to create.
        node_map: Mapping of node key to the node object returned at creation.

    Returns:
        Number of relationships for which the query returned a relationship,
        or 0 if the session as a whole failed.
    """

    def match_clause(node: Any, alias: str, role: str) -> Optional[Tuple[str, Dict[str, Any]]]:
        """Build the MATCH pattern and parameters identifying ``node``, or None if it has no usable property."""
        props = dict(node)
        label = list(node.labels)[0] if node.labels else 'Node'
        if label == 'Establishment' and 'urn' in props:
            # For establishments, use URN (unique identifier)
            fields = ('urn',)
        elif 'code' in props and 'name' in props:
            # Mirrors the combined code_name key used when the node was created
            fields = ('code', 'name')
        elif 'name' in props:
            fields = ('name',)
        else:
            logger.error(f"ERROR: No unique property found for {role} node {label}: {props}")
            return None
        # Parameter names are prefixed with the role so start/end never clash.
        constraints = ", ".join(f"{field}: ${role}_{field}" for field in fields)
        params = {f"{role}_{field}": props[field] for field in fields}
        return f"{alias}:{label} {{{constraints}}}", params

    relationships_created = 0
    try:
        with self.neo4j_service.driver.session(database=self.db_name) as session:
            for rel_type, start_key, end_key in all_relationships:
                start_node = node_map.get(start_key)
                end_node = node_map.get(end_key)
                if start_node is None or end_node is None:
                    if start_node is None:
                        logger.warning(f"Start node not found for relationship {rel_type} with key {start_key}")
                    if end_node is None:
                        logger.warning(f"End node not found for relationship {rel_type} with key {end_key}")
                    continue
                try:
                    # Use property-based matching instead of the deprecated ID() function
                    start = match_clause(start_node, 'n1', 'start')
                    if start is None:
                        continue
                    end = match_clause(end_node, 'n2', 'end')
                    if end is None:
                        continue
                    start_match, start_params = start
                    end_match, end_params = end
                    params = {**start_params, **end_params}
                    query = (
                        f"MATCH ({start_match}), ({end_match}) "
                        f"MERGE (n1)-[r:{rel_type}]->(n2) "
                        "RETURN r"
                    )
                    record = session.run(query, **params).single()
                    # Compare against None explicitly: a relationship without
                    # properties has length 0 and is therefore falsy, which would
                    # make every successful MERGE look like a failure.
                    if record is not None and record["r"] is not None:
                        self.created_relationships.add(f"{start_key}-{rel_type}-{end_key}")
                        relationships_created += 1
                        logger.debug(f"SUCCESS: Created relationship {rel_type} between {start_key} and {end_key}")
                    else:
                        logger.error(f"FAILED: Could not create relationship {rel_type} between {start_key} and {end_key}")
                except Exception as e:
                    # One failing relationship must not abort the rest of the batch.
                    logger.error(f"ERROR: Exception creating relationship {rel_type} between {start_key} and {end_key}: {str(e)}")
        return relationships_created
    except Exception as e:
        logger.error(f"Error creating all relationships: {str(e)}")
        return 0
def _parse_date(self, date_str: str) -> Optional[str]:
"""Parse date string to ISO format"""
if not date_str or date_str.strip() == '' or date_str.strip() == 'Not applicable':
return None
try:
# Handle DD-MM-YYYY format
if '-' in date_str:
parts = date_str.split('-')
if len(parts) == 3:
day, month, year = parts
if len(year) == 2:
year = f"20{year}" if int(year) < 50 else f"19{year}"
return f"{year}-{month.zfill(2)}-{day.zfill(2)}"
except:
pass
return date_str.strip()
def _parse_int(self, int_str: str) -> Optional[int]:
"""Parse integer string"""
if not int_str or int_str.strip() == '' or int_str.strip() == 'Not applicable':
return None
try:
return int(int_str.strip())
except:
return None
def _parse_float(self, float_str: str) -> Optional[float]:
"""Parse float string"""
if not float_str or float_str.strip() == '' or float_str.strip() == 'Not applicable':
return None
try:
return float(float_str.strip())
except:
return None
def import_gais_data() -> Dict[str, Any]:
    """Run the GAIS import into Neo4j and return the importer's result.

    Returns:
        The dict returned by ``import_edubase_data_simple``. If an exception
        escapes, a dict with ``"success": False`` and a ``"message"``
        describing the error is returned instead.
    """
    logger.info("Starting GAIS data import...")
    try:
        # test_mode=False: process all rows in the CSV file
        outcome = GAISDataImporter().import_edubase_data_simple(test_mode=False)
        if not outcome["success"]:
            logger.error(f"GAIS data import failed: {outcome['message']}")
        else:
            logger.info("GAIS data import completed successfully!")
        return outcome
    except Exception as exc:
        failure_text = f"Error in GAIS data import: {str(exc)}"
        logger.error(failure_text)
        return {"success": False, "message": failure_text}