@jexp
Created October 9, 2014 19:51
neo4j batch import with groovy

How to use the Neo4j Batch-Inserter for your own CSV files

You might have data as CSV files from which you want to create nodes and relationships in your Neo4j graph database. It might be a lot of data, many tens of millions of lines, which is too much for LOAD CSV to handle transactionally.

Usually you can just fire up my batch-importer and prepare node and relationship files that adhere to its input format requirements.

Your Requirements

There are some things you probably want to do differently than the batch-importer does by default:

  • not create legacy indexes

  • not index properties at all that you just need for connecting data

  • create schema indexes

  • skip certain columns

  • rename properties from the column names

  • create your own labels based on the data in the row

  • convert column values into Neo4j types (e.g. split strings or parse JSON)

Batch Inserter API

This is where you, even as a non-Java developer, can write a few lines of code and make it happen. The Neo4j Batch-Inserter API is fast and simple; you get very far with just a few main methods:

  • inserter.createNode(properties, labels) → node-id

  • inserter.createRelationship(fromId, toId, type, properties) → rel-id

  • inserter.createDeferredSchemaIndex(label).on(property).create()
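As a minimal, hypothetical sketch of these calls (store directory, labels and properties are made up here; the real import script follows below), using them from Groovy could look like this:

@Grab('org.neo4j:neo4j:2.1.4')
import org.neo4j.graphdb.*
import org.neo4j.unsafe.batchinsert.BatchInserters

// hypothetical example only, not the import script itself
def inserter = BatchInserters.inserter("target/example.db")
try {
    def author  = inserter.createNode([name: "Jane Doe"], DynamicLabel.label("Author"))
    def article = inserter.createNode([title: "A Title"], DynamicLabel.label("Article"))
    inserter.createRelationship(author, article, DynamicRelationshipType.withName("WROTE"), [:])
    inserter.createDeferredSchemaIndex(DynamicLabel.label("Article")).on("title").create()
} finally {
    inserter.shutdown()   // the store is only usable after a clean shutdown
}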

Demo Data

For our demo we want to import articles, authors and citations from the Microsoft citation dataset.

The file looks like this:

articles.csv
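Assuming the columns the script below expects (author, title, date in yyyy-MM-dd format), it might look like this (made-up sample rows):

author,title,date
"Jane Doe","Graphs for Fun and Profit","2012-03-01"
"John Smith","A Survey of Citation Networks","2013-07-15"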

Setup with Groovy

To keep the "Java"ness of this exercise to a minimum, I chose Groovy, a dynamic JVM language that feels closer to Ruby and JavaScript than to Java. You can run Groovy programs as scripts, much like in other scripting languages. More importantly, it comes with a cool feature that allows us to run the whole thing without setting up a build configuration or installing dependencies manually.

If you want to, you can also do the same in JRuby, Jython, JavaScript (Rhino on Java 7, Nashorn on Java 8) or Lisp (Clojure). I would love to get variants of this program in those languages.

Make sure you have a Java Development Kit (JDK) and Groovy installed.

We need two dependencies, a CSV reader (GroovyCSV) and Neo4j, and can just declare them with a @Grab annotation and import the classes into scope. (Thanks to Stefan for the tip.)

link:import_articles.groovy[role=include]
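That pulls in the following lines at the top of import_articles.groovy (see the full listing below):

@Grab('com.xlson.groovycsv:groovycsv:1.0')
@Grab('org.neo4j:neo4j:2.1.4')
import static com.xlson.groovycsv.CsvParser.parseCsv
import org.neo4j.graphdb.*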

Then we create a batch-inserter instance, which we have to make sure to shut down at the end; otherwise our store will not be valid.
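In the script this boils down to the following skeleton (store path and config as in the full listing below):

batch = org.neo4j.unsafe.batchinsert.BatchInserters.inserter(store, config)
try {
    // ... create nodes and relationships ...
} finally {
    batch.shutdown()   // without a clean shutdown the store files are not usable
}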

The CSV reading is a simple one-liner. Here is a quick example; more details on the versatile configuration can be found in the GroovyCSV API docs.

csv = new File("articles.csv").newReader()
for (line in parseCsv(csv)) {
    println "Author: $line.author, Title: $line.title, Date: $line.date"
}

One trick we want to employ is keeping our authors unique by name: even if an author appears on multiple lines, we only want to create the node once and then keep it around for the next time that name is referenced.

Note
To keep the example simple we just use a Map; if you have to save memory you can look into a more efficient data structure and a double pass. My recommendation would be a sorted name array whose array index corresponds to the node id, so you can use Arrays.binarySearch(authors, name) on it to find the node id of the author (see the sketch below).
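A rough sketch of that double-pass idea, my own illustration rather than part of the original script: collect and sort the distinct names first, create the Author nodes in that order while remembering the id of the first one, then derive an author's node id by binary search.

// pass 1: collect the distinct author names and sort them (hypothetical sketch)
def names = new TreeSet<String>()
for (line in parseCsv(new File("articles.csv").newReader())) names << line.author
String[] authors = names as String[]

// pass 2: create one Author node per name, in sorted order
long firstId = -1
authors.each { name ->
    def id = batch.createNode([name: name], Labels.Author)
    if (firstId == -1) firstId = id
}

// node id of an author = id of the first author node + position in the sorted array
def authorId = { String name -> firstId + Arrays.binarySearch(authors, name) }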

We define two enums, one for labels and one for relationship-types.

link:import_articles.groovy[role=include]
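In the script these are:

enum Labels implements Label { Author, Article }
enum Types implements RelationshipType { WROTE }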

So when reading our data, we now check if we already know the author; if not, we create the Author-node and cache its node-id by name. Then we create the Article-node and connect both with a WROTE-relationship.

link:import_articles.groovy[role=include]
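The corresponding loop from import_articles.groovy (full listing below):

for (line in parseCsv(csv)) {
    name = line.author
    if (!authors[name]) {
        authors[name] = batch.createNode([name:name], Labels.Author)
    }
    date = format.parse(line.date).time
    article = batch.createNode([title:line.title, date:date], Labels.Article)
    batch.createRelationship(authors[name], article, Types.WROTE, NO_PROPS)
    trace()
}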

And that’s it.

I ran the data import with the data from the Kaggle Authorship Competition, which contains about 12M author→paper relationships.

The import used a slightly modified script that took care of the three files for authors, papers and their mapping. It loaded the roughly 11.2 million relationships, 1.9 million authors and 1.2 million papers in about 174 seconds, i.e. roughly 1.5 seconds per 100k rows.

groovy import_kaggle.groovy papers.db ~/Downloads/kaggle-author-paper

Total: 11,160,348 rows, 1,868,412 authors and 1,172,020 papers, in 174.122 seconds.

You can find the full script in this GitHub gist.

If you're interested in meeting me, more treats (full-day training, hackathon) or graph database fun in general, don't hesitate to join us on Oct 22, 2014 in San Francisco for this year's GraphConnect conference, for all things Neo4j.

import_articles.groovy

//tag::dependencies[]
@Grab('com.xlson.groovycsv:groovycsv:1.0')
@Grab('org.neo4j:neo4j:2.1.4')
import static com.xlson.groovycsv.CsvParser.parseCsv
import org.neo4j.graphdb.*
//end::dependencies[]
//tag::enums[]
enum Labels implements Label { Author, Article }
enum Types implements RelationshipType { WROTE }
//end::enums[]
// [keys,value].transpose().collectEntries()
def toMap(line) { line.columns.inject([:]){ m,k,v -> m.put(k,line.values[v]);m } }
def config = [
"use_memory_mapped_buffers": "true",
"neostore.nodestore.db.mapped_memory": "250M",
"neostore.relationshipstore.db.mapped_memory": "1G",
"neostore.propertystore.db.mapped_memory": "500M",
"neostore.propertystore.db.strings.mapped_memory": "500M",
"neostore.propertystore.db.arrays.mapped_memory": "0M",
"cache_type": "none",
"dump_config": "true"
]
def NO_PROPS=[:]
// cache
def authors = [:]
count = 0
time = System.currentTimeMillis()
def trace(output) {
    if (output || ++count % 100_000 == 0) {
        now = System.currentTimeMillis()
        println "$count rows ${(now-time)} ms"
        time = now
    }
}
//tag::main[]
store=args[0]
articles_file=new File(args[1])
println "Importing data from ${articles_file} into ${store}"
csv = articles_file.newReader()
batch = org.neo4j.unsafe.batchinsert.BatchInserters.inserter(store,config)
format = new java.text.SimpleDateFormat("yyyy-MM-dd")
try {
//tag::loop[]
    for (line in parseCsv(csv)) {
        name = line.author
        if (!authors[name]) {
            authors[name] = batch.createNode([name:name], Labels.Author)
        }
        date = format.parse(line.date).time
        article = batch.createNode([title:line.title, date:date], Labels.Article)
        batch.createRelationship(authors[name], article, Types.WROTE, NO_PROPS)
        trace()
    }
//end::loop[]
    batch.createDeferredConstraint(Labels.Author).assertPropertyIsUnique("name").create()
    batch.createDeferredSchemaIndex(Labels.Article).on("title").create()
} finally {
    csv.close()
    batch.shutdown()
    trace(true)
}
//end::main[]
import_kaggle.groovy

// script to import some data from the Kaggle Paper-Author challenge: https://www.kaggle.com/c/kdd-cup-2013-author-paper-identification-challenge/data
@Grab('com.xlson.groovycsv:groovycsv:1.0')
@Grab('org.neo4j:neo4j:2.1.4')
import static com.xlson.groovycsv.CsvParser.parseCsv
import org.neo4j.graphdb.*
enum Labels implements Label { Author, Paper }
enum Types implements RelationshipType { WROTE }
// [keys,value].transpose().collectEntries()
def toMap(line) { line.columns.inject([:]){ m,k,v -> m.put(k,line.values[v]);m } }
def config = [
"use_memory_mapped_buffers": "true",
"neostore.nodestore.db.mapped_memory": "250M",
"neostore.relationshipstore.db.mapped_memory": "1G",
"neostore.propertystore.db.mapped_memory": "500M",
"neostore.propertystore.db.strings.mapped_memory": "500M",
"neostore.propertystore.db.arrays.mapped_memory": "0M",
"cache_type": "none",
"dump_config": "true"
]
def NO_PROPS=[:]
// cache
def authors = [:]
def papers = [:]
count = 0
time = start = System.currentTimeMillis()
def trace(output) {
    if (output || ++count % 100_000 == 0) {
        now = System.currentTimeMillis()
        println "$count rows ${(now-time)} ms"
        time = now
    }
}
//tag::main[]
store=args[0]
dir=new File(args[1])
papers_file=new File(dir,"Paper.csv")
author_paper_file=new File(dir,"PaperAuthor.csv")
println "Importing data from ${papers_file} and ${author_paper_file} into ${store}"
batch = org.neo4j.unsafe.batchinsert.BatchInserters.inserter(store,config)
try {
    // create papers
    // Id,Title,Year,ConferenceId,JournalId,Keyword
    csv = papers_file.newReader()
    for (line in parseCsv(csv)) {
        if (line.values.size() < 3) continue
        paper = line.Id
        if (!papers[paper]) {
            papers[paper] = batch.createNode([id:paper, title:line.Title, year:line.Year], Labels.Paper)
        }
        trace()
    }
    csv.close()
    // create authors and wrote-relationships
    // PaperId,AuthorId,Name,Affiliation
    csv = author_paper_file.newReader()
    for (line in parseCsv(csv)) {
        if (line.values.size() < 3) continue
        author = line.AuthorId
        if (!authors[author]) {
            authors[author] = batch.createNode([id:author, name:line.Name], Labels.Author)
        }
        paper = line.PaperId
        if (papers[paper]) {
            batch.createRelationship(authors[author], papers[paper], Types.WROTE, NO_PROPS)
        }
        trace()
    }
    csv.close()
    batch.createDeferredConstraint(Labels.Author).assertPropertyIsUnique("name").create()
    batch.createDeferredSchemaIndex(Labels.Paper).on("title").create()
} finally {
    batch.shutdown()
    trace(true)
    println "Total $count Rows ${authors.size()} Authors and ${papers.size()} Papers took ${(now-start)/1000} seconds."
}