# api/run/initialization/sync_users_neo4j.py
# kcar b452c9f593
# Some checks failed
# api-ci-deploy / test-build-deploy (push) Has been cancelled
# test: add dev stack integration checks
# 2026-05-27 23:24:28 +01:00
#
# 239 lines
# 9.2 KiB
# Python

"""Sync Supabase profile users into the central Neo4j cc.users database.
This script is intentionally idempotent. It defaults to --dry-run so it can be
used safely during diagnostics. Use --apply to write/update Neo4j nodes.
"""
from __future__ import annotations
import argparse
import os
import sys
from pathlib import Path
from dataclasses import dataclass
from typing import Any
# Take this script's own directory off the import search path before the
# remaining imports run.
# NOTE(review): presumably this keeps sibling modules in this directory from
# shadowing installed packages — confirm the motivation.
SCRIPT_DIR = str(Path(__file__).resolve().parent)
try:
    sys.path.remove(SCRIPT_DIR)
except ValueError:
    # The directory was not on sys.path, so there is nothing to remove.
    pass
import requests
from dotenv import load_dotenv
from neo4j import GraphDatabase
@dataclass(frozen=True)
class UserRecord:
    """Immutable snapshot of one Supabase profile, shaped for the Neo4j sync.

    The field names are also the Cypher parameter names used when a record is
    written (the instance ``__dict__`` is passed straight to ``session.run``),
    so renaming a field means updating the queries as well.
    """

    # --- identity -----------------------------------------------------------
    uuid_string: str  # Supabase profile id, dashes kept
    user_email: str  # empty string when the profile has no email
    cc_username: str
    user_type: str  # e.g. 'teacher', 'student', or 'unknown'
    user_name: str
    display_name: str | None

    # --- institute membership (all None when the user has none) -------------
    institute_id: str | None
    institute_name: str | None
    institute_role: str | None
    institute_neo4j_uuid_string: str | None

    # --- storage locations --------------------------------------------------
    user_db_name: str
    institute_db_name: str | None
    node_storage_path: str
def load_environment(env_file: str | None) -> None:
    """Load environment variables via python-dotenv.

    Args:
        env_file: Path of an explicit dotenv file. When given, its values
            override variables that are already set. When falsy, dotenv's
            default lookup is used and existing variables are left untouched.
    """
    if not env_file:
        load_dotenv(override=False)
        return
    load_dotenv(env_file, override=True)
def supabase_headers() -> dict[str, str]:
    """Build the authentication headers for Supabase REST requests.

    The first non-empty value among ``SERVICE_ROLE_KEY`` and ``ANON_KEY``
    (checked in that order) is used for both headers.

    Returns:
        A dict with the ``apikey`` and ``Authorization`` headers.

    Raises:
        RuntimeError: If neither environment variable holds a value.
    """
    for env_name in ('SERVICE_ROLE_KEY', 'ANON_KEY'):
        token = os.getenv(env_name)
        if token:
            return {'apikey': token, 'Authorization': f'Bearer {token}'}
    raise RuntimeError('SERVICE_ROLE_KEY or ANON_KEY is required')
def fetch_table(table: str, select: str) -> list[dict[str, Any]]:
    """Read rows of one table through the Supabase REST endpoint.

    Args:
        table: Table name, appended to ``<SUPABASE_URL>/rest/v1/``.
        select: Column list sent as the ``select`` query parameter.

    Returns:
        The decoded JSON body of the response.

    Raises:
        RuntimeError: If ``SUPABASE_URL`` is unset, or the response status is
            anything other than 200 (the message carries the status code and
            the first 500 characters of the body).
    """
    supabase_url = os.getenv('SUPABASE_URL')
    if not supabase_url:
        raise RuntimeError('SUPABASE_URL is required')

    endpoint = f'{supabase_url.rstrip("/")}/rest/v1/{table}'
    # NOTE(review): a single GET is issued with no paging; if the server caps
    # rows per response, large tables could come back truncated — confirm.
    reply = requests.get(
        endpoint,
        headers=supabase_headers(),
        params={'select': select},
        timeout=30,
    )
    if reply.status_code == 200:
        return reply.json()
    raise RuntimeError(f'Supabase {table} query failed: {reply.status_code} {reply.text[:500]}')
def normalize_uuid(value: str | None) -> str | None:
    """Return *value* with every dash removed, or ``None`` for falsy input."""
    if not value:
        return None
    return ''.join(value.split('-'))
def build_records() -> list[UserRecord]:
    """Fetch profiles, memberships and institutes and join them into records.

    One ``UserRecord`` is produced per profile row. Missing profile values
    fall back as follows:

    * user type -> ``'unknown'``
    * username -> local part of the email, else the dash-less profile id
    * user name -> full name, display name, username, email, profile id
    * user database -> ``cc.users.<user_type>.<dash-less id>``
    * institute database -> ``cc.institutes.<institute uuid>`` when an
      institute is known, otherwise ``None``

    Returns:
        The records, in the order the profiles were returned.
    """
    profile_rows = fetch_table(
        'profiles',
        'id,email,username,full_name,display_name,user_type,metadata,user_db_name,school_db_name',
    )
    membership_rows = fetch_table('institute_memberships', 'profile_id,institute_id,role')
    institute_rows = fetch_table('institutes', 'id,name,urn,neo4j_uuid_string')

    # Index by key; when a key repeats, the later row replaces the earlier one.
    membership_for: dict[Any, dict[str, Any]] = {}
    for row in membership_rows:
        membership_for[row['profile_id']] = row
    institute_for: dict[Any, dict[str, Any]] = {}
    for row in institute_rows:
        institute_for[row['id']] = row

    def to_record(profile: dict[str, Any]) -> UserRecord:
        """Combine one profile row with its membership/institute, if any."""
        profile_id = profile['id']
        compact_id = normalize_uuid(profile_id)
        user_type = profile.get('user_type') or 'unknown'
        email = profile.get('email') or ''

        username = profile.get('username')
        if not username:
            username = email.split('@', 1)[0] if email else compact_id

        name_candidates = (
            profile.get('full_name'),
            profile.get('display_name'),
            username,
            email,
        )
        user_name = next((name for name in name_candidates if name), profile_id)

        membership = membership_for.get(profile_id, {})
        if membership:
            institute = institute_for.get(membership.get('institute_id'), {})
        else:
            institute = {}
        # Prefer the stored Neo4j uuid; otherwise derive one from the row id.
        institute_uuid = institute.get('neo4j_uuid_string') or normalize_uuid(institute.get('id'))

        user_db_name = profile.get('user_db_name') or f'cc.users.{user_type}.{compact_id}'
        institute_db_name = profile.get('school_db_name')
        if not institute_db_name:
            institute_db_name = f'cc.institutes.{institute_uuid}' if institute_uuid else None

        # An empty membership/institute dict yields None from .get(), which is
        # exactly the value wanted for users without an institute.
        return UserRecord(
            uuid_string=profile_id,
            user_email=email,
            cc_username=username or profile_id,
            user_type=user_type,
            user_name=user_name,
            display_name=profile.get('display_name'),
            institute_id=membership.get('institute_id'),
            institute_name=institute.get('name'),
            institute_role=membership.get('role'),
            institute_neo4j_uuid_string=institute_uuid,
            user_db_name=user_db_name,
            institute_db_name=institute_db_name,
            node_storage_path=f'neo4j://{user_db_name}/User/{profile_id}',
        )

    return [to_record(profile) for profile in profile_rows]
def neo4j_driver():
    """Create a Neo4j driver from environment configuration.

    Reads ``APP_BOLT_URL``, ``USER_NEO4J`` and ``PASSWORD_NEO4J``.

    Returns:
        The object returned by ``GraphDatabase.driver``.

    Raises:
        RuntimeError: If any of the three variables is unset or empty.
    """
    settings = [os.getenv(name) for name in ('APP_BOLT_URL', 'USER_NEO4J', 'PASSWORD_NEO4J')]
    if not all(settings):
        raise RuntimeError('APP_BOLT_URL, USER_NEO4J, and PASSWORD_NEO4J are required')
    bolt_url, user, secret = settings
    return GraphDatabase.driver(bolt_url, auth=(user, secret))
def ensure_schema(session) -> None:
    """Create the constraints and indexes the sync relies on, if missing.

    Every statement uses ``IF NOT EXISTS`` and is run and consumed in turn:
    the User uuid constraint, four User property indexes, then the Institute
    uuid constraint.

    Args:
        session: Object exposing ``run(statement)`` whose result has
            ``consume()``.
    """
    # (index name, indexed User property)
    user_indexes = (
        ('user_email_index', 'user_email'),
        ('user_username_index', 'cc_username'),
        ('user_type_index', 'user_type'),
        ('user_institute_index', 'institute_id'),
    )

    ddl = ['CREATE CONSTRAINT user_uuid_unique IF NOT EXISTS FOR (u:User) REQUIRE u.uuid_string IS UNIQUE']
    ddl.extend(
        f'CREATE INDEX {index_name} IF NOT EXISTS FOR (u:User) ON (u.{prop})'
        for index_name, prop in user_indexes
    )
    ddl.append(
        'CREATE CONSTRAINT institute_uuid_unique IF NOT EXISTS FOR (i:Institute) REQUIRE i.uuid_string IS UNIQUE'
    )

    for cypher in ddl:
        session.run(cypher).consume()
def merge_user(session, record: UserRecord) -> None:
    """Upsert one user node and, when known, its institute membership.

    The node is merged on ``(:User {uuid_string})`` alone and the role label
    is reconciled afterwards. Putting the role label inside the MERGE pattern
    would stop an existing node from matching once its ``user_type`` changed
    (or if it was first written without a role label), so MERGE would try to
    create a second node with the same ``uuid_string`` and violate the
    uniqueness constraint.

    Args:
        session: Object exposing ``run(query, **params)`` whose result has
            ``consume()``.
        record: Source of the Cypher parameters; its attribute dict is passed
            as the query parameters.
    """
    params = dict(vars(record))

    # Only these two user types carry an extra label; any other type is
    # stored as a plain :User node. The label comes from this fixed mapping,
    # never from record data, so interpolating it into the query is safe.
    role_label = {'teacher': 'Teacher', 'student': 'Student'}.get(record.user_type)
    set_role_label = f'SET u:{role_label}' if role_label else ''

    session.run(
        f"""
        MERGE (u:User {{uuid_string: $uuid_string}})
        REMOVE u:Teacher:Student
        {set_role_label}
        SET u.user_email = $user_email,
            u.cc_username = $cc_username,
            u.user_type = $user_type,
            u.user_name = $user_name,
            u.display_name = $display_name,
            u.institute_id = $institute_id,
            u.institute_name = $institute_name,
            u.institute_role = $institute_role,
            u.institute_neo4j_uuid_string = $institute_neo4j_uuid_string,
            u.user_db_name = $user_db_name,
            u.institute_db_name = $institute_db_name,
            u.node_storage_path = $node_storage_path,
            u.merged = true,
            u.source = 'supabase.profiles',
            u.synced_at = datetime()
        """,
        **params,
    ).consume()

    if not record.institute_neo4j_uuid_string:
        # No institute known for this user: nothing to link.
        return

    session.run(
        """
        MATCH (u:User {uuid_string: $uuid_string})
        MERGE (i:Institute {uuid_string: $institute_neo4j_uuid_string})
        SET i.supabase_id = $institute_id,
            i.name = $institute_name
        MERGE (u)-[r:MEMBER_OF]->(i)
        SET r.role = $institute_role,
            r.synced_at = datetime()
        """,
        **params,
    ).consume()
def verify(session) -> dict[str, Any]:
    """Collect node and relationship counts used to check the sync result.

    Args:
        session: Object exposing ``run(query)`` whose result has ``single()``
            returning a mapping with a ``'c'`` entry.

    Returns:
        Counts keyed by ``users``, ``teachers``, ``students``, ``bad_users``
        (User nodes missing a required property), ``duplicate_uuids``,
        ``memberships`` and ``institutes``.
    """

    def count(query: str) -> Any:
        """Run one counting query and return its single ``c`` value."""
        return session.run(query).single()['c']

    # The dict literal evaluates top to bottom, so queries run in this order.
    return {
        'users': count('MATCH (u:User) RETURN count(u) AS c'),
        'teachers': count("MATCH (u:User {user_type: 'teacher'}) RETURN count(u) AS c"),
        'students': count("MATCH (u:User {user_type: 'student'}) RETURN count(u) AS c"),
        'bad_users': count("""
            MATCH (u:User)
            WHERE u.uuid_string IS NULL OR u.user_email IS NULL OR u.cc_username IS NULL
               OR u.user_type IS NULL OR u.user_name IS NULL OR u.user_db_name IS NULL
            RETURN count(u) AS c
        """),
        'duplicate_uuids': count("""
            MATCH (u:User)
            WITH u.uuid_string AS uuid, count(*) AS c
            WHERE c > 1
            RETURN count(*) AS c
        """),
        'memberships': count('MATCH (:User)-[r:MEMBER_OF]->(:Institute) RETURN count(r) AS c'),
        'institutes': count('MATCH (i:Institute) RETURN count(i) AS c'),
    }
def _expected_counts(records: list[UserRecord], by_type: dict[str, int]) -> dict[str, int]:
    """Derive the post-sync counts that the given records should produce."""
    # A MEMBER_OF relationship (and its Institute node) is only written for
    # records that carry an institute uuid, so users without an institute
    # must not be counted as memberships.
    institute_uuids = [
        record.institute_neo4j_uuid_string
        for record in records
        if record.institute_neo4j_uuid_string
    ]
    return {
        'users': len(records),
        'teachers': by_type.get('teacher', 0),
        'students': by_type.get('student', 0),
        'bad_users': 0,
        'duplicate_uuids': 0,
        'memberships': len(institute_uuids),
        'institutes': len(set(institute_uuids)),
    }


def main() -> int:
    """Command-line entry point: dry-run by default, ``--apply`` to write.

    Options:
        --env-file: dotenv file to load (overrides existing variables).
        --database: Neo4j database to write to (default ``cc.users``).
        --apply: actually write to Neo4j; without it only a preview of the
            first five source records is printed.

    Returns:
        ``0`` on success.

    Raises:
        RuntimeError: If configuration is missing, a Supabase query fails, or
            a post-sync count differs from what the source records imply.
    """
    parser = argparse.ArgumentParser()
    parser.add_argument('--env-file', default=None)
    parser.add_argument('--database', default='cc.users')
    parser.add_argument('--apply', action='store_true', help='Write to Neo4j. Defaults to dry-run.')
    args = parser.parse_args()

    load_environment(args.env_file)
    records = build_records()

    # Tally records per user type for the summary line and the verification.
    by_type: dict[str, int] = {}
    for record in records:
        by_type[record.user_type] = by_type.get(record.user_type, 0) + 1
    print(f'Supabase source records: {len(records)} {by_type}')

    if not args.apply:
        for record in records[:5]:
            print(f'DRY RUN {record.uuid_string} {record.user_email} -> {record.user_db_name}')
        print('Dry run only. Re-run with --apply to write Neo4j cc.users.')
        return 0

    with neo4j_driver() as driver:
        with driver.session(database=args.database) as session:
            ensure_schema(session)
            for record in records:
                merge_user(session, record)
            result = verify(session)

    print(f'Neo4j verification: {result}')

    # Expected membership and institute counts come from the records, so the
    # check holds for any data set, not just one with a fixed institute count.
    for key, value in _expected_counts(records, by_type).items():
        if result.get(key) != value:
            raise RuntimeError(f'Verification failed for {key}: expected {value}, got {result.get(key)}')
    return 0
# Run the sync only when executed as a script; main()'s return value becomes
# the process exit status.
if __name__ == '__main__':
    sys.exit(main())