"""Sync Supabase profile users into the central Neo4j cc.users database. This script is intentionally idempotent. It defaults to --dry-run so it can be used safely during diagnostics. Use --apply to write/update Neo4j nodes. """ from __future__ import annotations import argparse import os import sys from pathlib import Path from dataclasses import dataclass from typing import Any SCRIPT_DIR = str(Path(__file__).resolve().parent) if SCRIPT_DIR in sys.path: sys.path.remove(SCRIPT_DIR) import requests from dotenv import load_dotenv from neo4j import GraphDatabase @dataclass(frozen=True) class UserRecord: uuid_string: str user_email: str cc_username: str user_type: str user_name: str display_name: str | None institute_id: str | None institute_name: str | None institute_role: str | None institute_neo4j_uuid_string: str | None user_db_name: str institute_db_name: str | None node_storage_path: str def load_environment(env_file: str | None) -> None: if env_file: load_dotenv(env_file, override=True) else: load_dotenv(override=False) def supabase_headers() -> dict[str, str]: key = os.getenv('SERVICE_ROLE_KEY') or os.getenv('ANON_KEY') if not key: raise RuntimeError('SERVICE_ROLE_KEY or ANON_KEY is required') return {'apikey': key, 'Authorization': f'Bearer {key}'} def fetch_table(table: str, select: str) -> list[dict[str, Any]]: base = os.getenv('SUPABASE_URL') if not base: raise RuntimeError('SUPABASE_URL is required') response = requests.get( f'{base.rstrip("/")}/rest/v1/{table}', headers=supabase_headers(), params={'select': select}, timeout=30, ) if response.status_code != 200: raise RuntimeError(f'Supabase {table} query failed: {response.status_code} {response.text[:500]}') return response.json() def normalize_uuid(value: str | None) -> str | None: return value.replace('-', '') if value else None def build_records() -> list[UserRecord]: profiles = fetch_table('profiles', 'id,email,username,full_name,display_name,user_type,metadata,user_db_name,school_db_name') memberships = fetch_table('institute_memberships', 'profile_id,institute_id,role') institutes = fetch_table('institutes', 'id,name,urn,neo4j_uuid_string') membership_by_profile = {m['profile_id']: m for m in memberships} institute_by_id = {i['id']: i for i in institutes} records: list[UserRecord] = [] for profile in profiles: profile_id = profile['id'] user_type = profile.get('user_type') or 'unknown' uuid_no_dash = normalize_uuid(profile_id) email = profile.get('email') or '' username = profile.get('username') or (email.split('@', 1)[0] if email else uuid_no_dash) user_name = profile.get('full_name') or profile.get('display_name') or username or email or profile_id membership = membership_by_profile.get(profile_id, {}) institute = institute_by_id.get(membership.get('institute_id'), {}) if membership else {} institute_uuid = institute.get('neo4j_uuid_string') or normalize_uuid(institute.get('id')) user_db_name = profile.get('user_db_name') or f'cc.users.{user_type}.{uuid_no_dash}' institute_db_name = profile.get('school_db_name') or (f'cc.institutes.{institute_uuid}' if institute_uuid else None) records.append(UserRecord( uuid_string=profile_id, user_email=email, cc_username=username or profile_id, user_type=user_type, user_name=user_name, display_name=profile.get('display_name'), institute_id=membership.get('institute_id') if membership else None, institute_name=institute.get('name') if institute else None, institute_role=membership.get('role') if membership else None, institute_neo4j_uuid_string=institute_uuid, user_db_name=user_db_name, institute_db_name=institute_db_name, node_storage_path=f'neo4j://{user_db_name}/User/{profile_id}', )) return records def neo4j_driver(): url = os.getenv('APP_BOLT_URL') username = os.getenv('USER_NEO4J') password = os.getenv('PASSWORD_NEO4J') if not url or not username or not password: raise RuntimeError('APP_BOLT_URL, USER_NEO4J, and PASSWORD_NEO4J are required') return GraphDatabase.driver(url, auth=(username, password)) def ensure_schema(session) -> None: statements = [ 'CREATE CONSTRAINT user_uuid_unique IF NOT EXISTS FOR (u:User) REQUIRE u.uuid_string IS UNIQUE', 'CREATE INDEX user_email_index IF NOT EXISTS FOR (u:User) ON (u.user_email)', 'CREATE INDEX user_username_index IF NOT EXISTS FOR (u:User) ON (u.cc_username)', 'CREATE INDEX user_type_index IF NOT EXISTS FOR (u:User) ON (u.user_type)', 'CREATE INDEX user_institute_index IF NOT EXISTS FOR (u:User) ON (u.institute_id)', 'CREATE CONSTRAINT institute_uuid_unique IF NOT EXISTS FOR (i:Institute) REQUIRE i.uuid_string IS UNIQUE', ] for statement in statements: session.run(statement).consume() def merge_user(session, record: UserRecord) -> None: labels = ':User' if record.user_type == 'teacher': labels = ':User:Teacher' elif record.user_type == 'student': labels = ':User:Student' session.run( f""" MERGE (u{labels} {{uuid_string: $uuid_string}}) SET u.user_email = $user_email, u.cc_username = $cc_username, u.user_type = $user_type, u.user_name = $user_name, u.display_name = $display_name, u.institute_id = $institute_id, u.institute_name = $institute_name, u.institute_role = $institute_role, u.institute_neo4j_uuid_string = $institute_neo4j_uuid_string, u.user_db_name = $user_db_name, u.institute_db_name = $institute_db_name, u.node_storage_path = $node_storage_path, u.merged = true, u.source = 'supabase.profiles', u.synced_at = datetime() """, **record.__dict__, ).consume() if record.institute_neo4j_uuid_string: session.run( """ MATCH (u:User {uuid_string: $uuid_string}) MERGE (i:Institute {uuid_string: $institute_neo4j_uuid_string}) SET i.supabase_id = $institute_id, i.name = $institute_name MERGE (u)-[r:MEMBER_OF]->(i) SET r.role = $institute_role, r.synced_at = datetime() """, **record.__dict__, ).consume() def verify(session) -> dict[str, Any]: result: dict[str, Any] = {} result['users'] = session.run('MATCH (u:User) RETURN count(u) AS c').single()['c'] result['teachers'] = session.run("MATCH (u:User {user_type: 'teacher'}) RETURN count(u) AS c").single()['c'] result['students'] = session.run("MATCH (u:User {user_type: 'student'}) RETURN count(u) AS c").single()['c'] result['bad_users'] = session.run(""" MATCH (u:User) WHERE u.uuid_string IS NULL OR u.user_email IS NULL OR u.cc_username IS NULL OR u.user_type IS NULL OR u.user_name IS NULL OR u.user_db_name IS NULL RETURN count(u) AS c """).single()['c'] result['duplicate_uuids'] = session.run(""" MATCH (u:User) WITH u.uuid_string AS uuid, count(*) AS c WHERE c > 1 RETURN count(*) AS c """).single()['c'] result['memberships'] = session.run('MATCH (:User)-[r:MEMBER_OF]->(:Institute) RETURN count(r) AS c').single()['c'] result['institutes'] = session.run('MATCH (i:Institute) RETURN count(i) AS c').single()['c'] return result def main() -> int: parser = argparse.ArgumentParser() parser.add_argument('--env-file', default=None) parser.add_argument('--database', default='cc.users') parser.add_argument('--apply', action='store_true', help='Write to Neo4j. Defaults to dry-run.') args = parser.parse_args() load_environment(args.env_file) records = build_records() by_type: dict[str, int] = {} for record in records: by_type[record.user_type] = by_type.get(record.user_type, 0) + 1 print(f'Supabase source records: {len(records)} {by_type}') if not args.apply: for record in records[:5]: print(f'DRY RUN {record.uuid_string} {record.user_email} -> {record.user_db_name}') print('Dry run only. Re-run with --apply to write Neo4j cc.users.') return 0 with neo4j_driver() as driver: with driver.session(database=args.database) as session: ensure_schema(session) for record in records: merge_user(session, record) result = verify(session) print(f'Neo4j verification: {result}') expected = { 'users': len(records), 'teachers': by_type.get('teacher', 0), 'students': by_type.get('student', 0), 'bad_users': 0, 'duplicate_uuids': 0, 'memberships': len(records), 'institutes': 2, } for key, value in expected.items(): if result.get(key) != value: raise RuntimeError(f'Verification failed for {key}: expected {value}, got {result.get(key)}') return 0 if __name__ == '__main__': raise SystemExit(main())