CALL apoc.periodic.iterate(
WITH apoc.coll.partition(collect(line),10000) AS batchesOfLines
UNWIND batchesOfLines as batch
RETURN batch",
"UNWIND {batch} AS user
MERGE (u:User {Email: user.Email})
SET u += apoc.map.clean(user,['Email'],null)",
{batchSize: 1, parallel: true})
Michael Hunger, @mesirii
When you’re writing a lot of data to the graph from your application or library, you want to be efficient.
These approaches are not very efficient:
hard coding values instead of using parameters
sending a single query / tx per individual update
sending many single queries within a single tx with individual updates
generating large, complex statements (hundreds of lines) and sending one of them per tx and update
sending HUGE numbers (millions) of updates in a single tx, which will cause out-of-memory issues
You want queries that are small enough, constant in their shape (for caching), and that use parameters.
Each query can update from a single property to a whole subgraph (100 nodes) but has to be the same in overall structure for caching.
To achieve that you just prefix your regular "single-update-query" with an UNWIND that turns a batch of data (up to 10k or 50k entries) into individual rows, which contain the information for each of the (more or less complex) updates.
You send in a {batch} parameter (up to 10k-50k) of data (hopefully a delta) as a list of maps, which are then applied in a compact query, which is also properly compiled and cached, as it has a fixed structure.
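On the client side, the batching described above could look roughly like the following Python sketch. It only splits a list of update maps into chunks and pairs each chunk with one constant, parameterised statement; the helper names (chunked, batched_statements), the Person label and the batch size are illustrative assumptions, and actually sending each pair to the server is left to whatever driver you use.

```python
from typing import Any, Dict, Iterator, List, Tuple

# Constant query text: only the {batch} parameter changes between calls,
# so the statement keeps one fixed shape. The label is a placeholder.
QUERY = "UNWIND {batch} AS row CREATE (n:Person) SET n += row"


def chunked(rows: List[Dict[str, Any]], size: int) -> Iterator[List[Dict[str, Any]]]:
    """Yield consecutive slices of rows with at most size entries each."""
    if size < 1:
        raise ValueError("size must be positive")
    for start in range(0, len(rows), size):
        yield rows[start:start + size]


def batched_statements(rows: List[Dict[str, Any]],
                       size: int = 10_000) -> Iterator[Tuple[str, Dict[str, Any]]]:
    """Pair the constant query with one parameter map per batch."""
    for batch in chunked(rows, size):
        yield QUERY, {"batch": batch}


rows = [{"name": f"user{i}", "age": 20 + i % 50} for i in range(25_000)]
statements = list(batched_statements(rows))
print([len(params["batch"]) for _, params in statements])  # [10000, 10000, 5000]
```

Each (query, parameters) pair would then be run in its own transaction, so no single transaction has to hold all 25,000 updates.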
Here are some examples:
{batch: [{name:"Alice",age:32},{name:"Bob",age:42}]}
UNWIND {batch} as row
CREATE (n:Label)
SET n += row
{batch: [{id:"alice@example.com",properties:{name:"Alice",age:32}},{id:"bob@example.com",properties:{name:"Bob",age:42}}]}
UNWIND {batch} as row
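// assuming the lookup property is called id
MERGE (n:Label {id: row.id})
// ON CREATE SET only sets properties for newly created nodes; use a plain SET to update on every run
ON CREATE SET n += row.properties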
{batch: [{from:"alice@example.com",to:"bob@example.com",properties:{since:2012}},{from:"alice@example.com",to:"charlie@example.com",properties:{since:2016}}]}
UNWIND {batch} as row
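// assuming the lookup property is called id
MATCH (from:Label {id: row.from})
MATCH (to:Label {id: row.to})
// MERGE only creates the relationship if it is missing; use CREATE (with a plain SET) to always add one
MERGE (from)-[rel:KNOWS]->(to)
ON CREATE SET rel += row.properties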
good for parent-child trees
Here we’re passing a single property created
Alternatively you could pass in no properties or a map of properties to be set/updated.
{batch: [{from:123,to:[44,12,128],created:"2016-01-13"}, {from:34,to:[23,35,2983],created:"2016-01-15"},...]}
UNWIND {batch} as row
MATCH (from) WHERE id(from) = row.from
MATCH (to) WHERE id(to) IN row.to // list of ids
// MERGE only creates the relationship if it is missing; use CREATE to always add one
MERGE (from)-[rel:FOO]->(to)
SET rel.created = row.created
There are some more tricks.
You can also send in a map where the keys are node or relationship ids (converted to strings); that’s more compact and also faster for the id lookup.
{ batch : {"1":334,"2":222,"3":3840, ... 100k} }
WITH {batch} as data, [k in keys({batch}) | toInt(k)] as ids
MATCH (n) WHERE id(n) IN ids
// single property value
SET n.count = data[toString(id(n))]
// or override all properties
SET n = data[toString(id(n))]
// or add all properties
SET n += data[toString(id(n))]
Sometimes you want to create data dynamically based on inputs, e.g. a node with a certain label.
As Cypher currently has no conditional WHEN or IF clause, and CASE WHEN is just an expression, you have to use a trick I came up with many years ago.
Fortunately there is FOREACH, which is meant to iterate over a list of items and execute update operations for each of them.
A list of 0 or 1 elements can then serve as a conditional of false and true, i.e. no iteration or one iteration.
General idea:
FOREACH (_ IN CASE WHEN predicate THEN [true] ELSE [] END |
... update operations ....
)
Note that the true value in that list could be anything, 42, "", null etc., as long as it is a single value so that we have a non-empty list.
You can achieve something similar with RANGE(1, CASE WHEN predicate THEN 1 ELSE 0 END), which will yield an empty list when the predicate is false.
Or if you fancy filter, then you can use: filter(_ IN [1] WHERE predicate)
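The same "loop over zero or one element" idea can be shown outside Cypher. The following Python sketch is only an analogy, not how Neo4j evaluates FOREACH; the function names and the sample rows are made up for illustration.

```python
def conditional_list(predicate: bool) -> list:
    """Return a one-element list when the predicate holds, else an empty list."""
    return [True] if predicate else []


def apply_updates(rows):
    """Run the 'update' (here: appending to a list) only for matching rows."""
    created = []
    for row in rows:
        # Zero iterations when the predicate is false, exactly one when it is true.
        for _ in conditional_list(row["type"] == "Person"):
            created.append(row["name"])
    return created


rows = [{"type": "Person", "name": "Alice"}, {"type": "Agency", "name": "Acme"}]
print(apply_updates(rows))  # ['Alice']
```

The loop body never looks at the loop variable; the list exists purely to decide whether the body runs.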
Here is a concrete example:
LOAD CSV FROM {url} AS row
MATCH (o:Organization {name:row.org})
FOREACH (_ IN case when row.type = 'Person' then [1] else [] end|
MERGE (p:Person {name:row.name})
CREATE (p)-[:WORKS_FOR]->(o)
)
FOREACH (_ IN case when row.type = 'Agency' then [1] else [] end|
MERGE (a:Agency {name:row.name})
CREATE (a)-[:WORKS_FOR]->(o)
)
Note that identifiers created within FOREACH are not accessible from the outside. You would have to re-match the value later on, or you have to move all your update operations into the FOREACH.
The APOC procedure library comes with a lot of useful procedures that can help you here. I want to highlight 3 of them:
create nodes / relationships with dynamic labels and properties
batched transactions / iteration of updates
functions for creating and manipulating maps to be set as properties
With apoc.create.node and apoc.create.relationship you can have dynamically computed node labels and relationship types as well as any map of properties.
labels is a string array
properties is just a map
UNWIND {batch} as row
CALL apoc.create.node(row.labels, row.properties) yield node
RETURN count(*)
There are also procedures in apoc.create.* for setting/updating/removing properties and labels with dynamic string keys.
UNWIND {batch} as row
MATCH (from) WHERE id(from) = row.from
MATCH (to:Label) where to.key = row.to
CALL apoc.create.relationship(from, row.type, row.properties, to) yield rel
RETURN count(*)
As mentioned at the beginning, huge transactions are a problem. You can update a million records with around 2G - 4G of heap, but it gets difficult with larger volumes. My biggest volume per single transaction was about 10M nodes / relationships with 32G heap.
That’s where apoc.periodic.iterate comes in.
The idea is simple, you have two Cypher statements, the first statement provides the data to operate on and can produce a huge (many millions) stream of data (nodes, rels, scalar values).
The second statement does the actual update work, it is called for each item, but a new transaction is created only for each batch of items.
(There is a new variant of this which will go into the next version of APOC that actually does an UNWIND variant of the second statement, so it executes only one inner statement per tx).
So for example your first statement returns 5 million nodes to update, with a computed value. The inner statement is executed once for each of those 5 M nodes. If your batch size is 10k then that happens in batches of 10k statements per transaction.
If your updates are independent of each other (think creation of nodes or updates of properties, or updates of independent subgraphs), then you can run this procedure with a parallel:true option which will use all your CPUs.
For example if you want to compute a score of many rated items and update this property in a batched fashion, this is what you would do:
call apoc.periodic.iterate('
MATCH (n:User)-[r1:LIKES]->(thing)<-[r2:RATED]-(m:User) WHERE id(n)<id(m) RETURN thing, avg( r1.rating + r2.rating ) as score
','
WITH {thing} as t SET t.score = {score}
', {batchSize:10000, parallel:true})
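To make the batching behaviour described above concrete, here is a small Python model of it: an outer stream provides items, an inner function is called once per item, and one "transaction" is counted per batch. This is a simplified sketch under my own assumptions, not APOC's implementation; the function names and the shape of the returned dict are illustrative only, and parallel execution is not modelled.

```python
from itertools import islice
from typing import Callable, Iterable, Iterator, List, TypeVar

T = TypeVar("T")


def batches(items: Iterable[T], batch_size: int) -> Iterator[List[T]]:
    """Group a (possibly huge) stream into lists of at most batch_size items."""
    iterator = iter(items)
    while True:
        batch = list(islice(iterator, batch_size))
        if not batch:
            return
        yield batch


def iterate(outer: Iterable[T], inner: Callable[[T], None], batch_size: int) -> dict:
    """Call inner once per item and count one 'transaction' per batch."""
    transactions = 0
    total = 0
    for batch in batches(outer, batch_size):
        for item in batch:      # the inner statement runs once per item
            inner(item)
            total += 1
        transactions += 1       # a real implementation would commit here
    return {"batches": transactions, "total": total}


scores = {}
result = iterate(
    ((thing, thing * 0.5) for thing in range(25)),      # outer: stream of (thing, score)
    lambda pair: scores.__setitem__(pair[0], pair[1]),  # inner: one update per item
    batch_size=10,
)
print(result)  # {'batches': 3, 'total': 25}
```

Because the outer stream is consumed lazily, only one batch of items is held in memory at a time, which is the point of splitting the work.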
While lists can be created and processed quite easily in Cypher with range, collect, unwind, reduce, extract, filter, size etc., maps have more limited means, especially for creation and modification.
The apoc.map.* package comes with a number of functions that make your life easier:
Creating Maps from other data:
RETURN apoc.map.fromPairs([["alice",38],["bob",42],...])
// {alice:38, bob: 42, ...}
RETURN apoc.map.fromLists(["alice","bob",...],[38,42])
// {alice:38, bob: 42, ...}
// groups nodes, relationships, maps by key, good for quick lookups by that key
RETURN apoc.map.groupBy([{name:"alice",gender:"female"},{name:"bob",gender:"male"}],"gender")
// {female:{name:"alice",gender:"female"}, male:{name:"bob",gender:"male"}}
RETURN apoc.map.groupByMulti([{name:"alice",gender:"female"},{name:"bob",gender:"male"},{name:"Jane",gender:"female"}],"gender")
// {female:[{name:"alice",gender:"female"},{name:"Jane",gender:"female"}], male:[{name:"bob",gender:"male"}]}
Updating Maps
RETURN apoc.map.merge({alice: 38},{bob:42})
// {alice:38, bob: 42}
RETURN apoc.map.setKey({alice:38},"bob",42)
// {alice:38, bob: 42}
RETURN apoc.map.removeKey({alice:38, bob: 42},"alice")
// {bob: 42}
RETURN apoc.map.removeKeys({alice:38, bob: 42},["alice","bob","charlie"])
// {}
// remove the given keys and values, good for data from load-csv/json/jdbc/xml
RETURN apoc.map.clean({name: "Alice", ssn:2324434, age:"n/a", location:""},["ssn"],["n/a",""])
// {name:"Alice"}
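For readers who think in terms of ordinary dictionaries, the following Python sketch mirrors what the clean, groupBy and groupByMulti examples above produce. These are rough plain-Python equivalents written for illustration, not the APOC code, and edge cases (null handling, duplicate keys) may differ from the real functions.

```python
from typing import Any, Dict, Iterable, List


def clean(mapping: Dict[str, Any], keys: Iterable[str], values: Iterable[Any]) -> Dict[str, Any]:
    """Drop entries whose key is in keys or whose value is in values."""
    drop_keys = set(keys)
    drop_values = list(values)  # a list, so unhashable values are tolerated
    return {k: v for k, v in mapping.items()
            if k not in drop_keys and v not in drop_values}


def group_by(maps: List[Dict[str, Any]], key: str) -> Dict[Any, Dict[str, Any]]:
    """Index maps by one property; in this sketch later entries overwrite earlier ones."""
    return {m[key]: m for m in maps}


def group_by_multi(maps: List[Dict[str, Any]], key: str) -> Dict[Any, List[Dict[str, Any]]]:
    """Index maps by one property, keeping every map that shares a key."""
    grouped: Dict[Any, List[Dict[str, Any]]] = {}
    for m in maps:
        grouped.setdefault(m[key], []).append(m)
    return grouped


print(clean({"name": "Alice", "ssn": 2324434, "age": "n/a", "location": ""},
            ["ssn"], ["n/a", ""]))  # {'name': 'Alice'}
```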
I used these approaches successfully for high volume update operations, and also in implementation of object graph mappers for bulk updates.
Of course you can combine these variants for more complex operations.
If you try them out and are successful, please let me know.
If you have any other tricks that helped you to achieve more write throughput with Cypher, please let me know too and I’ll update this post.
Follow me on Twitter for more tips like this.
LOAD CSV WITH HEADERS FROM "file:///data2.csv" AS row
WITH row WHERE ANY (k in keys(row) WHERE row[k] IS NULL)
RETURN row LIMIT 100;
OPTIONAL MATCH path = (x)<-[*..3]-() WHERE ID(x) = 65
UNWIND nodes(path) as node
UNWIND rels(path) as rel
WITH collect(distinct node) as nodes,collect(distinct rel) as rels
// todo release apoc.coll.flatten
// WITH apoc.coll.flatten(collect(nodes(path))) as nodes, apoc.coll.flatten(collect(relationships(path))) as rels
WITH apoc.coll.toSet([n in nodes WHERE n is not null
  | { id: id(n),label: labels(n),type:"",metadata: properties(n) } ]) as nodes,
  apoc.coll.toSet([r in rels WHERE r is not null
  | { id: id(r),source: id(startNode(r)),relation: type(r),target: id(endNode(r)), directed: "true" } ]) as rels
RETURN { graph: { type:"",label: "",directed: "true",nodes: nodes,edges: rels,
  metadata:{ countNodes: size(nodes),countEdges: size(rels) } } } as graph;
// length of the first part of a split is equivalent to index-of
RETURN length(split("European Union","pean")[0]);
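The split trick above is easy to check in another language. This Python sketch applies the same idea and compares it with the built-in str.find; the helper name index_of is mine.

```python
def index_of(text: str, needle: str) -> int:
    """Length of the part before the first occurrence of needle."""
    return len(text.split(needle)[0])


print(index_of("European Union", "pean"))  # 4
print("European Union".find("pean"))       # 4
print(index_of("European Union", "xyz"))   # 14
```

Note the last line: when the needle does not occur, the trick returns the length of the whole string rather than a "not found" marker, so sort orders built on it put non-matching values last.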
START u=node:node_auto_index("fullName:*jay*")
MATCH (u:User)
WITH distinct u
RETURN {firstName : u.firstName , lastName : u.lastName, fullName : u.fullName, profilePicture : u.profilePicture, id : u.id} as user
// length of the first part of a split is equivalent to index-of
ORDER BY length(split(toLower(u.fullName),"jay")[0]);
// all paths
MATCH p=(c:Organisation {duns_nbr:'216236900'})-[r:SHARES_HELD_BY*1..50]->(shb)
RETURN p
// longest path with sorting
MATCH p=(c:Organisation {duns_nbr:'216236900'})-[r:SHARES_HELD_BY*1..50]->(shb)
RETURN p
ORDER BY length(p) DESC
LIMIT 1
// longest path by checking end-node with size() aka get-degree
// (the end node of a longest chain has no further outgoing SHARES_HELD_BY relationship)
MATCH p=(c:Organisation {duns_nbr:'216236900'})-[r:SHARES_HELD_BY*1..50]->(shb)
WHERE SIZE((shb)-[:SHARES_HELD_BY]->())=0
RETURN p
// longest path by checking end-node with exists() which is cheaper for long chains
MATCH p=(c:Organisation {duns_nbr:'216236900'})-[r:SHARES_HELD_BY*1..50]->(shb)
WHERE not exists( (shb)-[:SHARES_HELD_BY]->() )
RETURN p
When GraphQL was published as part of Facebook’s React efforts, it made a big buzz as a straightforward means to declare what kind of projection of your domain data you need for a certain UI component. Using a JSON-like syntax you define which properties of your entity and related entities you want to be part of the data structure you get back from the server.
Here is an example from a StackOverflow query using the model from our previous blog posts on that topic.
{ question { title, author { name }, tags { name }, answers { text, author { name } } } }
Cypher itself, with its rich support for literal maps and collections and the very powerful collect aggregation function, already allows for returning complex JSON documents.
MATCH (u:User)-[:ASKED]->(q:Question)-[:TAGGED]->(t:Tag),
      (q)<-[:ANSWERS]-(a)<-[:PROVIDED]-(u2)
RETURN { title: q.title, author: u.name, tags: collect(t.name),
answers: collect({text: a.text, author: u2.name})} as question
This results in a document like this, which is similar to the original StackOverflow query API result.
{ "title": "neo4j cypher query to delete a middle node and connect all its parent node to child node",
  "author": "Soumya George",
  "tags": [ ... ],
  "answers": [
    { "text": "Some text",
      "author": "InverseFalcon" }
  ]
}
Some things are not as convenient as what we saw in GraphQL, so we thought it would be very helpful to add more syntactic sugar to the language.
Luckily my friend Andrés found some spare time to add two really neat features to Cypher in Neo4j 3.1, which we want to look into today.
Map Projections are very close to what you expect from a GraphQL query: you take a map or entity (node or relationship) and apply a map-like property selector to it.
The result of that projection is an (optionally nested) map of results.
Here is the example above rewritten using a map-projection.
MATCH (u:User)-[:ASKED]->(q:Question)-[:TAGGED]->(t:Tag),
      (q)<-[:ANSWERS]-(a)<-[:PROVIDED]-(u2)
RETURN q{ .title, author : u.name, tags: collect(t.name),
answers: collect( a {.text, author: u2.name})} as question
But there are some more things possible.
Within a map projection you can also add literal values or aggregations to the data that you extract from the entity.
entity { .property1, .property2, .*, literal: value, values: collect(numbers), variable}
Here is a full list of possible selectors:
syntax | description | example |
.property | property lookup | q{ .title } |
.* | all properties | u{ .* } |
variable | variable name as key, variable value as value | q{ tags } |
key: value | literal entry | q{ author: u.name } |
To demonstrate those options we could rewrite the statement to:
MATCH (u:User)-[:ASKED]->(q:Question)-[:TAGGED]->(t:Tag),
      (q)<-[:ANSWERS]-(a)<-[:PROVIDED]-(u2)
WITH q, u, collect(t.name) as tags, collect( a {.text, author: u2.name}) as answers
RETURN q{ .title, author : u{.*}, tags, answers } as question
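If it helps to see the idea outside Cypher, a map projection behaves much like building a new dictionary from a few selected keys plus some extra entries. The Python sketch below is only an analogy: the project helper, its parameters and the sample data are invented for illustration and are not part of any Neo4j API.

```python
def project(entity: dict, *properties: str, all_properties: bool = False, **extra) -> dict:
    """Build a new map from selected properties of entity plus extra entries."""
    result = dict(entity) if all_properties else {}
    for name in properties:
        result[name] = entity.get(name)  # in this sketch a missing property becomes None
    result.update(extra)
    return result


question = {"title": "How do I batch writes?", "score": 3}
user = {"name": "Alice", "reputation": 120}
tags = ["neo4j", "cypher"]

# Roughly: q{ .title, author: u{.*}, tags }
print(project(question, "title", author=project(user, all_properties=True), tags=tags))
```

The positional names play the role of the .property selectors, all_properties stands in for .*, and the keyword arguments correspond to literal entries and variables.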
To pull in information from related entities, the other new feature, Pattern Comprehensions, comes into play.
You’ve all (hopefully) used list comprehensions in Cypher; they borrow from Haskell’s syntax and look like this:
[value IN list WHERE predicate(value) | expression(value)]
As a concrete example, this returns the squares of the first 5 even numbers:
RETURN [x IN range(1,10) WHERE x % 2 = 0 | x * x] -> [4, 16, 36, 64, 100]
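Python’s list comprehensions come from the same family, so the example translates almost one to one. The only assumption worth flagging is the range bounds: Cypher’s range(1,10) includes both ends, while Python’s range excludes the upper bound.

```python
# Cypher's range(1,10) includes both ends, hence range(1, 11) in Python.
squares = [x * x for x in range(1, 11) if x % 2 == 0]
print(squares)  # [4, 16, 36, 64, 100]
```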
Now, you can use any kind of collection here, also collection of maps or nodes or even paths.
If you use a graph pattern as an expression, it actually yields a collection of paths.
That’s cool, because now you can use a list comprehension to do pattern matching and extract a related node without actually using MATCH and changing your cardinality.
So instead of:
MATCH (u:User)-[:ASKED]->(q:Question)
WHERE q.title CONTAINS "Neo4j"
RETURN u.name, collect(q.title) as questions
you could write:
MATCH (u:User)
RETURN u.name, [path IN (u)-[:ASKED]->(:Question)
WHERE (last(nodes(path))).title CONTAINS "Neo4j"
| (last(nodes(path))).title] as questions
Btw. this statement always returns a result, potentially an empty collection, so it’s the same as if you had used OPTIONAL MATCH in the previous statement.
Wow, that’s ugly. Why? Because you can’t introduce new variables, like q, in such a pattern expression.
Only clauses could introduce new variables.
Until now!
With Pattern Comprehensions you actually can introduce local variables in such a pattern and use them in the WHERE filter or expression at the end.
MATCH (u:User)
RETURN u.name,
[(u)-[:ASKED]->(q:Question) WHERE q.title CONTAINS "Neo4j" | q.title] as questions
Now let’s take a stab at our "GraphQL" query again, and see how we can rewrite it just starting from the Question node and moving all projections of attributes and patterns into the RETURN clause:
MATCH (q:Question)
RETURN q{.title,
author : [(q)<-[:ASKED]-(u) | u.name][0],
tags : [(q)-[:TAGGED]->(t) | t.name],
answers: [(q)<-[:ANSWERS]-(a)<-[:PROVIDED]-(u2) | a{ .text, author: u2.name } ] }
As pattern comprehensions always return a collection, we have to turn them into a single value as needed, e.g. with a [0] subscript as done for author above.
To combine attributes of two entities into one map you have to spell out the second entity’s attributes.
It would be nice to get support for combining maps in the future, then we could use
answers: [(q)<-[:ANSWERS]-(a)<-[:PROVIDED]-(u2) | a{ .text } + u2{ .name} ]
If you want to test these cool new features, please grab the recently released Neo4j 3.1.0-M07 Milestone and give it a try.
We’d love to get your feedback on these and other new features like the brand-new cypher-shell.
With a lot of thanks to Andrés for this and everyone in engineering for a really cool database,
Cheers, Michael
// Matching dynamic objects
MATCH (p:Person)-[:ACTED_IN]->(m:Movie)
WITH collect([p, m]) as pairs
UNWIND pairs as pair
// WONT WORK (an expression cannot be used as a node in a pattern)
// MATCH (pair[1])<-[:DIRECTED]-(p:Person)
// WORKS, alias expression with variable
WITH pair[0] as p0, pair[1] as p1
MATCH (p1)<-[:DIRECTED]-(p:Person)
RETURN p.name
We often get questions like, "My query is too slow, what can I do."
If the query looks like this:
load csv with headers from "salaries.csv" as row match (p:Person) where p.id = row.id set p.salary = toFloat(row.salary)
the best guess is:
You forgot to create a constraint or index on that label and property combination!
create constraint on (p:Person) assert p.id is unique; // or create index on :Person(id);
Currently, Neo4j, when asked to do a property lookup on a non-indexed property, has to do a full scan over all nodes with that label and compare the property with the provided value in a filter operation. And it does that for every check, so if you have a CSV with 1M lines, then that’s 1M x full scan + filter.
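The cost difference can be illustrated with a toy model in Python: a list stands in for all nodes with the label, and a dict built once stands in for an index. This is a deliberately simplified sketch with made-up sizes (1,000 nodes, 100 lookups) and says nothing about how Neo4j implements scans or indexes internally.

```python
from typing import Dict, List


def scan_lookup(people: List[dict], wanted_id: int, counter: List[int]) -> List[dict]:
    """Label scan + filter: every node is compared, since nothing says ids are unique."""
    matches = []
    for person in people:
        counter[0] += 1
        if person["id"] == wanted_id:
            matches.append(person)
    return matches


people = [{"id": i} for i in range(1, 1001)]
index: Dict[int, dict] = {p["id"]: p for p in people}  # built once, like an index

comparisons = [0]
found_by_scan = 0
found_by_index = 0
for wanted in range(1, 1001, 10):                      # 100 lookups
    found_by_scan += len(scan_lookup(people, wanted, comparisons))
    found_by_index += 1 if wanted in index else 0      # one hash lookup each

print(found_by_scan, found_by_index, comparisons[0])   # 100 100 100000
```

Both approaches find the same 100 entries, but the scan needs lookups x nodes comparisons, which is the multiplication that makes the unindexed query explode.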
Let’s look at some numbers and create an artificial dataset for that, in our case we’re only doing a read (i.e. the lookup) to not have the write + tx operation skew the times.
We create 1M People with an id
UNWIND range(1,1000000) AS id CREATE (:Person{id:id, age: id % 100});
+-------------------+
| No data returned. |
+-------------------+
Nodes created: 1000000
Properties set: 1000000
Labels added: 1000000
10723 ms
Then we try to look up 500k of those id’s …
unwind range(1,1000000,2) as id return count(*);
+----------+
| count(*) |
+----------+
| 500000   |
+----------+
1 row
68 ms
from the existing people:
UNWIND range(1,1000000,2) AS id MATCH (:Person{id:id}) RETURN count(*); ... didn't finish after several minutes ...
But we have no luck. Re-running this with a smaller number (100) shows us the query plan and the associated costs.
UNWIND range(1,1000000,10000) AS id MATCH (:Person{id:id}) RETURN count(*);
+----------+
| count(*) |
+----------+
| 100      |
+----------+
1 row
72957 ms

Compiler CYPHER 3.0
Planner COST
Runtime INTERPRETED

+--------------------+----------------+-----------+-----------+----------------+-------------------+
| Operator           | Estimated Rows | Rows      | DB Hits   | Variables      | Other             |
+--------------------+----------------+-----------+-----------+----------------+-------------------+
| +ProduceResults    |           1000 |         1 |         0 | count(*)       | count(*)          |
| |                  +----------------+-----------+-----------+----------------+-------------------+
| +EagerAggregation  |           1000 |         1 |         0 | count(*)       |                   |
| |                  +----------------+-----------+-----------+----------------+-------------------+
| +Filter            |        1000000 |       100 | 100000000 | anon[44], id   | anon[44].id == id |
| |                  +----------------+-----------+-----------+----------------+-------------------+
| +Apply             |        1000000 | 100000000 |         0 | id -- anon[44] |                   |
| |\                 +----------------+-----------+-----------+----------------+-------------------+
| | +NodeByLabelScan |       10000000 | 100000000 | 100000100 | anon[44]       | :Person           |
| |                  +----------------+-----------+-----------+----------------+-------------------+
| +Unwind            |             10 |       100 |         0 | id             |                   |
| |                  +----------------+-----------+-----------+----------------+-------------------+
| +EmptyRow          |              1 |         1 |         0 |                |                   |
+--------------------+----------------+-----------+-----------+----------------+-------------------+

Total database accesses: 200000100
create constraint on (p:Person) assert p.id is unique;
schema await
schema sample -a
unwind range(1,1000000,2) as id match (:Person{id:id}) return count(*);
unwind range(1,1000000,2) as id match (:Person{id:id}) return count(*);
/*
+----------+
| count(*) |
+----------+
| 500000   |
+----------+
1 row
7450 ms
*/
drop constraint on (p:Person) assert p.id is unique;
create index on :Person(id);
schema await
schema sample -a
unwind range(1,1000000,2) as id match (:Person{id:id}) return count(*);
unwind range(1,1000000,2) as id match (:Person{id:id}) return count(*);
/*
+----------+
| count(*) |
+----------+
| 500000   |
+----------+
1 row
7132 ms
*/
drop index on :Person(id);
match (p:Person) with collect([toString(p.id),p]) as pairs
call apoc.map.fromPairs(pairs) yield value as index
unwind range(1,1000000,2) as id
with index[toString(id)] as n
return count(*);
+----------+
| count(*) |
+----------+
| 500000   |
+----------+
1 row
4687 ms

match (p:Person) with collect(p) as people
call apoc.map.groupBy(people,'id') yield value as index
unwind range(1,1000000,2) as id
with index[toString(id)] as n
return count(*);
+----------+
| count(*) |
+----------+
| 500000   |
+----------+
1 row
3115 ms

with range(1,1000000,2) as ids
match (p:Person) where p.id IN ids
with collect(p) as people
call apoc.map.groupBy(people,'id') yield value as index
unwind range(1,1000000,2) as id
with index[toString(id)] as n
return count(*);
+----------+
| count(*) |
+----------+
| 500000   |
+----------+
1 row
2344 ms
with [id IN range(1,1000000,2) | {id:id}] as rows call apoc.map.groupBy(rows,'id') yield value as rowById with rowById,[id IN keys(rowById) | toInt(id)] as ids match (p:Person) where p.id IN ids with rowById[toString(p.id)] as row return count(*);
with [id IN range(1,1000000,2) | {id:toString(id)}] as rows
call apoc.map.groupBy(rows,'id') yield value as rowById
with rowById,keys(rowById) as ids
match (p:Person) where p.id IN ids
with rowById[p.id] as row
return count(*);
+----------+
| count(*) |
+----------+
| 500000   |
+----------+
1 row
4746 ms
with [id IN range(1,1000000,2) | {id:toString(id)}] as rows call apoc.map.groupBy(rows,'id') yield value as rowById match (p:Person) where p.id IN keys(rowById) with rowById[p.id] as row return count(*);
profile with range(1,1000) as ids match (p:Person) where p.id2 IN ids return count(*);
profile match (p:Person) where p.id2 IN range(1,1000) return count(*);
with [id IN range(1,1000000,2) | {id:toString(id)}] as rows call apoc.map.groupBy(rows,'id') yield value as rowById return count(*);
unwind range(1,1000000,2) as id with collect(id) as ids match (p:Person) where p.id IN ids return count(*);
with range(1,1000000,2) as ids
match (p:Person) where p.id IN ids
return count(*);
+----------+
| count(*) |
+----------+
| 500000   |
+----------+
1 row
1631 ms

with range(1,1000000,2) as ids
match (p:Person) where p.id IN ids
with collect([toString(p.id),p]) as pairs
call apoc.map.fromPairs(pairs) yield value as index
unwind range(1,1000000,2) as id
with index[toString(id)] as n
return count(*);
+----------+
| count(*) |
+----------+
| 500000   |
+----------+
1 row
3563 ms
with collect([toString(p.id),p]) as pairs call apoc.map.fromPairs(pairs) yield value as index unwind range(1,1000000,2) as id with index[toString(id)] as n return count(*);
unwind range(1,1000000,2) as id
with collect(id) as ids
match (p:Person) where p.id IN ids
return count(*);
+----------+
| count(*) |
+----------+
| 500000   |
+----------+
1 row
1660 ms
load csv with headers from "salaries.csv" as row match (p:Person) where p.id = row.id set p.salary = toFloat(row.salary)
rewrite to
load csv with headers from "salaries.csv" as row
with collect(distinct row.id) as ids, collect(row) as rows
match (p:Person) where p.id IN ids
WITH collect(p) as people, rows // this aggregation is probably the only issue
UNWIND rows as row
WITH head([p in people where p.id = row.id]) as p, row // and perhaps this "lookup"
SET p.salary = row.salary;
set p.salary = toFloat(row.salary)
load csv with headers from "salaries.csv" as row
with collect(row) as rows
call apoc.map.groupBy(rows,'id') yield value as rowById
match (p:Person) where p.id IN keys(rowById)
set p.salary = rowById[toString(p.id)].salary
WITH ["Andres","Eve","Rik","Mark","Sophia","Praveena","Michael","Stefan","Max","Zhen"] AS names
UNWIND names as name
call apoc.create.vNode(["Person"],{name:name}) yield node
WITH names, size(names) as len, apoc.map.groupBy(collect(node),"name") as nodes
UNWIND range(1,42) as idx
CALL apoc.create.vRelationship(nodes[names[toInt(rand()*len)]],"KNOWS",{},nodes[names[toInt(rand()*len)]]) yield rel
RETURN nodes,rel;