Skip to content

Instantly share code, notes, and snippets.

@voutilad
Created January 13, 2022 13:22
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save voutilad/e0670a94f643ef1ddd43f8d4a533d7e4 to your computer and use it in GitHub Desktop.
Save voutilad/e0670a94f643ef1ddd43f8d4a533d7e4 to your computer and use it in GitHub Desktop.
Example remote bulk import with neo4j-arrow
#!/usr/bin/env python
## When pointed at an import directory (see https://neo4j.com/docs/operations-manual/current/tools/neo4j-admin/neo4j-admin-import/)
## this script identifies the graph entities and builds the stream of nodes and relationships.
##
## Usage: <this script> [import directory]
##
## Exit status: 0 when the import job is reported complete; 1 on bad usage,
## when the file is imported instead of executed, or when the job does not
## reach COMPLETE within the timeout.
import neo4j_arrow as na
import pyimport as pi # see https://github.com/neo4j-field/neo4j-arrow/blob/bulk-import/src/python/pyimport.py
from time import time
from sys import argv, exit

# This file is a command-line script only; refuse to be imported.
if __name__ != '__main__':
    print('this is not a module!')
    exit(1)

# Exactly one argument is expected: the import directory.
if len(argv) != 2:
    print(f'usage: {argv[0]} [import directory]')
    exit(1)

import_dir = argv[-1]

# NOTE(review): credentials, host and port are hard-coded and TLS verification
# is disabled -- fine for a demo, confirm before using anywhere else.
client = na.Neo4jArrow('neo4j', 'password', ('voutila-arrow-test', 9999), tls=True, verifyTls=False)

nodes, rels = pi.load_dir(import_dir)

# Use a timestamped database name for now so repeated runs do not collide.
# NOTE(review): the name is built from the raw command-line argument, so any
# path separators in it end up in the database name -- confirm the server
# accepts that, or pass a bare directory name.
db = f'{import_dir}{int(time())}'
print(f'importing {len(nodes):,} nodes & {len(rels):,} rels into db {db}')

# Start the bulk import job; the returned ticket identifies it in every
# following call. The field names presumably match the ID / START_ID / END_ID
# columns of the neo4j-admin import format -- verify against the data files.
ticket = client.bulk_import(db, idField='ID', sourceField='START_ID', targetField='END_ID')

# Push the nodes to the server. Doesn't _need_ to be done first, but bulkimport API starts working on nodes
rows, nbytes = client.put_stream(ticket, nodes, metadata={'stream.type': 'node'})
print(f'streamed {rows:,} nodes ({round(nbytes / (1 << 20), 3):,} MiB)')

# Push the relationships to the server. This could be done concurrently with the node stream.
rows, nbytes = client.put_stream(ticket, rels, metadata={'stream.type': 'rels'})
print(f'streamed {rows:,} relationships ({round(nbytes / (1 << 20), 3):,} MiB)')

# At this point, it's just waiting for the server to finish the import.
timeout = 5 * 60
print(f'waiting {timeout}s for job completion')
if client.wait_for_job(ticket, status=na.JobStatus.COMPLETE, timeout=timeout):
    print('finished!')
else:
    print('timed out.')
    # Report the failure to the calling shell instead of exiting with 0.
    exit(1)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment