Skip to content

Instantly share code, notes, and snippets.

@cj2001
Created February 9, 2021 23:43
Show Gist options
  • Save cj2001/30f993b482994c6908db04915115d688 to your computer and use it in GitHub Desktop.
Save cj2001/30f993b482994c6908db04915115d688 to your computer and use it in GitHub Desktop.
Neo4j Python connection class
class Neo4jConnection:
def __init__(self, uri, user, pwd):
self.__uri = uri
self.__user = user
self.__pwd = pwd
self.__driver = None
try:
self.__driver = GraphDatabase.driver(self.__uri, auth=(self.__user, self.__pwd))
except Exception as e:
print("Failed to create the driver:", e)
def close(self):
if self.__driver is not None:
self.__driver.close()
def query(self, query, parameters=None, db=None):
assert self.__driver is not None, "Driver not initialized!"
session = None
response = None
try:
session = self.__driver.session(database=db) if db is not None else self.__driver.session()
response = list(session.run(query, parameters))
except Exception as e:
print("Query failed:", e)
finally:
if session is not None:
session.close()
return response
conn = Neo4jConnection(uri="bolt://52.87.205.91:7687",
user="neo4j",
pwd="difficulties-pushup-gaps")
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment