api/modules/database/tools/neontology/graphconnection.py
2025-07-11 13:52:19 +00:00

253 lines
8.1 KiB
Python

from dotenv import load_dotenv, find_dotenv
load_dotenv(find_dotenv())
import os
import modules.logger_tool as logger
log_name = 'api_modules_database_tools_neontology_graphconnection'
log_dir = os.getenv("LOG_PATH", "/logs") # Default path as fallback
logging = logger.get_logger(
name=log_name,
log_level=os.getenv("LOG_LEVEL", "DEBUG"),
log_path=log_dir,
log_file=log_name,
runtime=True,
log_format='default'
)
from typing import Any, Dict, List, Optional
from neo4j import GraphDatabase, Neo4jDriver
from neo4j import Record as Neo4jRecord
from neo4j import Result as Neo4jResult
from neo4j import Transaction as Neo4jTransaction
from .result import NeontologyResult, neo4j_records_to_neontology_records
class GraphConnection(object):
"""Class for managing connections to Neo4j."""
_instance = None
def __new__(
cls,
neo4j_uri: Optional[str] = None,
neo4j_username: Optional[str] = None,
neo4j_password: Optional[str] = None,
) -> "GraphConnection":
"""Make sure we only have a single connection to the GraphDatabase.
This connection then gets used by all instances.
Args:
neo4j_uri (Optional[str], optional): Neo4j URI to connect to. Defaults to None.
neo4j_username (Optional[str], optional): Neo4j username. Defaults to None.
neo4j_password (Optional[str], optional): Neo4j password. Defaults to None.
Returns:
GraphConnection: Instance of the connection
"""
if cls._instance is None:
cls._instance = object.__new__(cls)
if GraphConnection._instance:
try:
driver = GraphConnection._instance.driver = GraphDatabase.driver( # type: ignore
neo4j_uri, auth=(neo4j_username, neo4j_password)
)
driver.verify_connectivity()
from .utils import get_node_types, get_rels_by_type
# capture all possible types of node and relationship
cls.global_nodes = get_node_types()
cls.global_rels = get_rels_by_type()
except Exception as error:
logging.error(
"Error: connection not established. Have you run init_neontology? {}".format(
error
)
)
GraphConnection._instance = None
else:
GraphConnection._instance = None
return cls._instance
def __del__(self) -> None:
"""Close the driver gracefully when the class gets deleted."""
self.driver.close()
def __init__(
self,
neo4j_uri: Optional[str] = None,
neo4j_username: Optional[str] = None,
neo4j_password: Optional[str] = None,
) -> None:
if self._instance:
self.driver: Neo4jDriver = self._instance.driver
def run_transaction_single(
self, tx: Neo4jTransaction, query: str, params: Dict[str, Any]
) -> Optional[Neo4jRecord]:
"""Run a transaction which is expected to return a single result.
Args:
tx (Neo4jTransaction): Neo4j Transaction object
query (str): cypher query to run
params (Dict[str, Any]): Parameters to pass to the query
Returns:
Optional[Neo4jRecord]: The result
"""
return tx.run(query, **params).single()
def run_transaction_many(
self, tx: Neo4jTransaction, query: str, params: Dict[str, Any]
) -> List[Neo4jRecord]:
"""Run a transation which is expected to return multiple nodes.
Args:
tx (Neo4jTransaction): Neo4j Transaction object
query (str): cypher query to run
params (Dict[str, Any]): parameters to pass the query
Returns:
List[Neo4jRecord]: a list of the results
"""
return [record for record in tx.run(query, **params)]
def cypher_write(self, cypher: str, params: Dict[str, Any] = {}) -> None:
"""Execute a write transaction.
Args:
cypher (str): cypher query
params (Dict[str, Any]): parameters to pass to the query
"""
with self.driver.session() as session:
session.execute_write(self.run_transaction_single, cypher, params)
def cypher_write_single(self, cypher: str, params: Dict[str, Any] = {}) -> None:
"""Execute a write transaction.
Args:
cypher (str): cypher query
params (Dict[str, Any]): parameters to pass to the query
"""
with self.driver.session() as session:
return session.execute_write(self.run_transaction_single, cypher, params)
def cypher_write_many(self, cypher: str, params: Dict[str, Any] = {}) -> None:
"""Execute a write transaction.
Args:
cypher (str): cypher query
params (Dict[str, Any]): parameters to pass to the query
"""
with self.driver.session() as session:
return session.execute_write(self.run_transaction_many, cypher, params)
def cypher_read(
self, cypher: str, params: Dict[str, Any] = {}
) -> Optional[Neo4jRecord]:
"""Run a cypher read only query which is expected to return a single result.
Args:
cypher (str): cypher query string
params (Dict[str, Any]): parameters to pass to the query
Returns:
Neo4jRecord: the resulting Neo4j 'Record', or None
"""
with self.driver.session() as session:
return session.execute_read(self.run_transaction_single, cypher, params)
def cypher_read_many(
self, cypher: str, params: Dict[str, Any] = {}
) -> List[Neo4jRecord]:
"""Run a cypher read query which will return multiple records.
Args:
cypher (str): cypher string to run
params (Dict[str, Any]): parameters to pass to the query
Returns:
List[Neo4jRecord]: A list of Neo4j 'Records' returned by the query.
"""
with self.driver.session() as session:
return session.execute_read(self.run_transaction_many, cypher, params)
def apply_constraint(self, label: str, property: str) -> None:
cypher = f"""
CREATE CONSTRAINT IF NOT EXISTS
FOR (n:{label})
REQUIRE n.{property} IS UNIQUE
"""
self.cypher_write(cypher)
def evaluate_query_single(self, cypher, params={}):
result = self.driver.execute_query(
cypher, parameters_=params, result_transformer_=Neo4jResult.single
)
if result:
return result.value()
else:
return None
def evaluate_query(self, cypher, params={}):
result = self.driver.execute_query(cypher, parameters_=params)
neo4j_records = result.records
neontology_records = neo4j_records_to_neontology_records(
neo4j_records, self.global_nodes, self.global_rels
)
return NeontologyResult(
records=neo4j_records, neontology_records=neontology_records
)
def init_neontology(
neo4j_uri: Optional[str] = None,
neo4j_username: Optional[str] = None,
neo4j_password: Optional[str] = None,
) -> None:
"""Initialise neontology.
If connection properties are explicitly passed in, use these.
If not, attempt to load from enviornment variables (optionally in a .env file.)
Args:
neo4j_uri (Optional[str], optional): Neo4j URI to connect to. Defaults to None.
neo4j_username (Optional[str], optional): Neo4j username. Defaults to None.
neo4j_password (Optional[str], optional): Neo4j password. Defaults to None.
"""
# try to load environment variables from .env file
load_dotenv()
if neo4j_uri is None:
neo4j_uri = os.getenv("NEO4J_URI")
if neo4j_password is None:
neo4j_password = os.getenv("PASSWORD_NEO4J")
if neo4j_username is None:
neo4j_username = os.getenv("USER_NEO4J")
GraphConnection(neo4j_uri, neo4j_username, neo4j_password)
def close_neontology():
GraphConnection().__del__()