Skip to content

Instantly share code, notes, and snippets.

@alreadytaikeune
Last active April 25, 2018 15:03
Show Gist options
  • Save alreadytaikeune/6be006f0a338502524552a9765e79af6 to your computer and use it in GitHub Desktop.
Save alreadytaikeune/6be006f0a338502524552a9765e79af6 to your computer and use it in GitHub Desktop.
#!/usr/bin/env python
# -*- encoding: utf-8 -*-
# Copyright 2018, Anis KHLIF
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from __future__ import unicode_literals, absolute_import, print_function
import logging
import random
import functools
import copy
import json
from tqdm import tqdm
from uuid import uuid4
from time import time, sleep
import numpy as np
import matplotlib.pyplot as plt
import docker
from py2neo import Graph, Subgraph, Node, Relationship
from py2neo.types import remote
# Docker client bound to the local daemon; used to launch the neo4j container.
CLIENT = docker.from_env()
# Shared py2neo Graph handle; populated by init_graph() once neo4j is up.
GRAPH = None
def run_neo4j():
    """Launch a detached neo4j Docker container for the benchmark.

    Exposes the HTTP (7474) and Bolt (7687) ports, caps page-cache and
    heap memory at 512m, and sets the neo4j/test credentials.  Returns
    the container handle so the caller can kill it when done.
    """
    options = {
        "ports": {"7474/tcp": 7474, "7687/tcp": 7687},
        "environment": [
            "NEO4J_dbms_memory_pagecache_size=512m",
            "NEO4J_dbms_memory_heap_maxSize=512m",
            "NEO4J_AUTH=neo4j/test",
        ],
        "detach": True,
        "remove": True,
        "name": "neo4j_benchmark",
    }
    return CLIENT.containers.run("neo4j", **options)
def make_node(label, uid):
    """Build a py2neo Node carrying merge metadata.

    The ``__primarylabel__``/``__primarykey__`` markers let py2neo's
    merge resolve the node by its ``id_field`` property.
    """
    node = Node(label, id_field=uid)
    node.__primarylabel__ = label
    node.__primarykey__ = "id_field"
    return node
def make_rel(label, start_node, end_node, **attrs):
    """Build a Relationship tagged with a unique internal id."""
    rel = Relationship(start_node, label, end_node, **attrs)
    rel.__this_id__ = uuid4()
    return rel
def make_rels_linear(nodes, rel_labels):
    """Give every node one outgoing relationship to a distinct random node.

    Returns an empty list for a single-node input, where no distinct
    partner exists.
    """
    if len(nodes) == 1:
        return []
    rels = []
    for source in nodes:
        target = source
        # re-draw until the partner differs from the source node
        while target == source:
            target = random.choice(nodes)
        rels.append(make_rel(random.choice(rel_labels), source, target))
    return rels
def make_rels_fully_connected(nodes, rel_labels):
    """Connect every unordered pair of nodes with one random-label rel."""
    if len(nodes) == 1:
        return []
    return [
        make_rel(random.choice(rel_labels), nodes[i], nodes[j])
        for i in range(len(nodes))
        for j in range(i)
    ]
def graph_generator(nb_nodes_min, nb_nodes_max, topology="linear",
                    fraction_bound=0.3, nb_labels=5, nb_rel_labels=5):
    """Yield random py2neo Subgraphs forever.

    Each subgraph has between ``nb_nodes_min`` and ``nb_nodes_max``
    nodes; roughly ``fraction_bound`` of them are reused from earlier
    yields (so they may already be bound in the database).  Topology is
    one of "linear", "isolated" or "fully_connected".
    """
    nodes_store = {}
    labels = ["label_{}".format(i) for i in range(nb_labels)]
    rel_labels = ["rel_{}".format(i) for i in range(nb_rel_labels)]
    while True:
        nnodes = random.randint(nb_nodes_min, nb_nodes_max)
        n_known = int(fraction_bound * nnodes)
        # A shallow list copy is all we need to shuffle the key order;
        # deepcopy of dict.keys() fails on Python 3 (it is a view object).
        keys = list(nodes_store.keys())
        random.shuffle(keys)
        all_nodes = [nodes_store[k] for k in keys[:n_known]]
        while len(all_nodes) < nnodes:
            new_node = make_node(random.choice(labels), str(uuid4()))
            nodes_store[new_node["id_field"]] = new_node
            all_nodes.append(new_node)
        if topology == "linear":
            rels = make_rels_linear(all_nodes, rel_labels)
        elif topology == "isolated":
            rels = []
        elif topology == "fully_connected":
            rels = make_rels_fully_connected(all_nodes, rel_labels)
        else:
            # previously an unknown topology left `rels` unbound (NameError)
            raise ValueError("unknown topology: {}".format(topology))
        # Keep a random prefix of the relationships.  Guard the empty
        # case: randint(0, -1) raises ValueError, which made the
        # "isolated" topology crash here.
        if rels:
            rels = rels[:random.randint(0, len(rels) - 1)]
        yield Subgraph(all_nodes, rels)
def store_query_time(query_fct):
    """Wrap *query_fct* so every call's timing is recorded.

    Returns three dicts -- keyed by node count, relationship count and
    their sum, each mapping to a list of (elapsed, statements) tuples --
    plus the wrapped function that fills them as it is called.
    """
    by_nodes = {}
    by_rels = {}
    by_total = {}

    @functools.wraps(query_fct)
    def __inner__(subgraph):
        n_nodes = len(subgraph.nodes())
        n_rels = len(subgraph.relationships())
        elapsed, statements = query_fct(subgraph)
        sample = (elapsed, statements)
        for store, key in ((by_nodes, n_nodes),
                           (by_rels, n_rels),
                           (by_total, n_nodes + n_rels)):
            store.setdefault(key, []).append(sample)

    return by_nodes, by_rels, by_total, __inner__
def get_nb_create_statements(subgraph):
    """Count the Cypher statements a ``Graph.create`` of *subgraph* implies.

    Nodes already bound to the remote graph only need a MATCH; unbound
    nodes need a CREATE.  Only *unbound* relationships need a CREATE --
    the original tested ``if remote(relationship)``, which is backwards
    and inconsistent with get_nb_merge_statements.
    """
    statements = {"CREATE_NODE": 0, "MATCH": 0, "CREATE_REL": 0}
    for node in subgraph.nodes():
        if remote(node):
            statements["MATCH"] += 1
        else:
            statements["CREATE_NODE"] += 1
    for relationship in subgraph.relationships():
        if not remote(relationship):
            statements["CREATE_REL"] += 1
    return statements
def get_nb_merge_statements(subgraph):
    """Count the Cypher statements a ``Graph.merge`` of *subgraph* implies.

    Remote-bound nodes need a MATCH; unbound nodes need a MERGE plus a
    SET of their properties.  Unbound relationships each need a MERGE.
    """
    counts = {"MERGE_NODE": 0, "MATCH": 0, "MERGE_REL": 0, "SET": 0}
    for node in subgraph.nodes():
        if remote(node):
            counts["MATCH"] += 1
        else:
            counts["MERGE_NODE"] += 1
            counts["SET"] += 1
    for relationship in subgraph.relationships():
        if not remote(relationship):
            counts["MERGE_REL"] += 1
    return counts
def transaction_create(subgraph):
    """Run ``Graph.create`` on *subgraph*.

    Returns (elapsed seconds, expected statement counts).
    """
    statements = get_nb_create_statements(subgraph)
    started = time()
    GRAPH.create(subgraph)
    return time() - started, statements
def transaction_merge(subgraph):
    """Run ``Graph.merge`` on *subgraph*.

    Returns (elapsed seconds, expected statement counts).
    """
    statements = get_nb_merge_statements(subgraph)
    started = time()
    GRAPH.merge(subgraph)
    return time() - started, statements
def benchmark(query_fct, node_min=5, node_max=80, nb_rep=20, topology="linear"):
    """Time *query_fct* over ``nb_rep`` random subgraphs.

    Generates the graphs first, shuffles them, then feeds each one to
    the timed wrapper.  The database is wiped afterwards.  Returns the
    (per_node, per_rel, per_sum) timing dicts.

    Requires an empty database; fully_connected runs must use a fixed
    node count (node_min == node_max).
    """
    print("======== RUNNING BENCHMARK: {} ========".format(query_fct.__name__))
    assert database_empty()
    if topology == "fully_connected":
        assert node_min == node_max
    per_node_nb, per_rel_nb, per_sum_nb, query_fct = store_query_time(
        query_fct)
    gen = graph_generator(node_min, node_max, topology=topology)
    graphs = []
    for _ in range(nb_rep):
        # next(gen) works on Python 2 and 3; gen.next() is Python-2-only
        g = next(gen)
        nn = len(g.nodes())
        nr = len(g.relationships())
        # sanity-check the generator against the requested topology
        if topology == "linear":
            assert nn == nr
        elif topology == "isolated":
            assert nr == 0
        graphs.append(g)
    random.shuffle(graphs)
    for g in tqdm(graphs):
        query_fct(g)
    delete_all()
    return per_node_nb, per_rel_nb, per_sum_nb
def init_graph():
    """Poll until the neo4j HTTP endpoint accepts connections.

    Retries every two seconds; the container may still be booting, so
    any connection error is printed and swallowed.
    """
    global GRAPH
    connected = False
    while not connected:
        try:
            GRAPH = Graph("http://neo4j:test@localhost:7474/db/data")
            connected = True
        except Exception as exc:  # best-effort retry while neo4j boots
            print(exc)
            sleep(2)
def database_empty():
    """Return True when the graph database holds zero nodes."""
    cursor = GRAPH.run("MATCH (n) RETURN count(*) as c")
    return cursor.next()["c"] == 0
def delete_all():
    """Detach-delete every node (and thus every relationship)."""
    print("Deleting all nodes...")
    GRAPH.run("MATCH (n) DETACH DELETE n;")
def create_constraints(nb_labels=5):
    """Add a uniqueness constraint on id_field for each benchmark label."""
    print("Creating constraints...")
    for i in range(nb_labels):
        GRAPH.schema.create_uniqueness_constraint(
            "label_{}".format(i), "id_field")
def drop_constraints(nb_labels=5):
    """Remove the uniqueness constraints created by create_constraints."""
    print("Dropping all constraints...")
    for i in range(nb_labels):
        GRAPH.schema.drop_uniqueness_constraint(
            "label_{}".format(i), "id_field")
class CypherFormatter(logging.Formatter):
    """Render py2neo bolt "C: RUN ..." log records as readable Cypher.

    Non-RUN records format to an empty string.  The JSON parameter dict
    trailing the statement is parsed and substituted back into the
    query's ``{placeholder}`` slots.
    """

    def format(self, record):
        msg = record.getMessage()
        if "RUN" not in msg:
            return ""
        msg = msg[len("C: RUN u'"):]
        try:
            cut = msg.index("LIMIT 1'")
        except ValueError:
            # statement does not follow the expected shape; skip it
            return ""
        query = msg[:cut + len("LIMIT 1")]
        raw_params = msg[cut + len("LIMIT 1'"):]
        params = json.loads(
            raw_params.strip().replace("u'", "'").replace("'", '"'))
        # JSON-encode nested dict parameters so str.format inserts valid
        # Cypher map syntax.  (The original loop read `[k]` instead of
        # the value and never stored the result back, making it a no-op.)
        for key in params:
            value = params[key]
            if isinstance(value, dict):
                params[key] = json.dumps(value).replace("u'", "'")
        query = "\n".join(query.split("\\n"))
        return query.format(**params).replace("u'", "'")
def run_benchmark_linear(nb_rep=50):
    """Benchmark create and merge on linear graphs.

    Each operation runs twice: without constraints, then with the
    uniqueness constraints in place.  Returns the four timing tuples in
    order: create/none, create/unique, merge/none, merge/unique.
    """
    print("~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~")
    print("                      RUNNING LINEAR BENCHMARK                    ")
    print("~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~")
    results = []
    for operation in (transaction_create, transaction_merge):
        results.append(benchmark(operation, node_min=5, node_max=80,
                                 nb_rep=nb_rep, topology="linear"))
        create_constraints()
        results.append(benchmark(operation, node_min=5, node_max=80,
                                 nb_rep=nb_rep, topology="linear"))
        drop_constraints()
    return results
def run_benchmark_fully_connected(nb_rep=50):
    """Benchmark create and merge on 15-node fully connected graphs.

    Each operation runs twice: without constraints, then with the
    uniqueness constraints in place.  Returns the four timing tuples in
    order: create/none, create/unique, merge/none, merge/unique.
    """
    print("~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~")
    print("                  RUNNING FULLY CONNECTED BENCHMARK               ")
    print("~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~")
    results = []
    for operation in (transaction_create, transaction_merge):
        results.append(benchmark(operation, node_min=15, node_max=15,
                                 nb_rep=nb_rep, topology="fully_connected"))
        create_constraints()
        results.append(benchmark(operation, node_min=15, node_max=15,
                                 nb_rep=nb_rep, topology="fully_connected"))
        drop_constraints()
    return results
def run_benchmark_isolated(nb_rep=50):
    """Benchmark create and merge on relationship-free graphs.

    Each operation runs twice: without constraints, then with the
    uniqueness constraints in place.  Returns the four timing tuples in
    order: create/none, create/unique, merge/none, merge/unique.
    """
    print("~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~")
    print("                      RUNNING ISOLATED BENCHMARK                  ")
    print("~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~")
    results = []
    for operation in (transaction_create, transaction_merge):
        results.append(benchmark(operation, node_min=5, node_max=80,
                                 nb_rep=nb_rep, topology="isolated"))
        create_constraints()
        results.append(benchmark(operation, node_min=5, node_max=80,
                                 nb_rep=nb_rep, topology="isolated"))
        drop_constraints()
    return results
def setup_axis(what="nodes"):
    """Label the current pyplot axes for a query-time scatter plot."""
    plt.yscale("linear")
    plt.xlabel("Number of {}".format(what))
    plt.ylabel("Average query time (seconds)")
def plot_results_linear(results, colors):
    """Scatter-plot mean query time vs node count for the linear runs.

    *results* is the 4-tuple list from run_benchmark_linear: entries
    0-1 are the create runs, 2-3 the merge runs (no constraint /
    unique constraint).  Figures are saved under results/.
    """
    labels = ["no constraint", "unique constraint"]

    fig, ax = plt.subplots()
    # title must be set AFTER subplots(): calling plt.title() first
    # attaches it to the previously-current figure and is lost
    plt.title("Query times for create operation in linear topology")
    setup_axis()
    for i, (per_node_nb, _per_rel, _per_sum) in enumerate(results[:2]):
        # .items() works on Python 2 and 3; iteritems() is Python-2-only
        x, y = zip(*[(nn, np.mean([t[0] for t in ts]))
                     for nn, ts in per_node_nb.items()])
        ax.scatter(x, y, c=colors[i], label=labels[i])
    ax.legend(scatterpoints=1)
    ax.grid(True)
    plt.savefig("results/linear_create_nodes.png")

    fig, ax = plt.subplots()
    plt.title("Query times for merge operation in linear topology")
    setup_axis()
    for i, (per_node_nb, _per_rel, _per_sum) in enumerate(results[2:]):
        x, y = zip(*[(nn, np.mean([t[0] for t in ts]))
                     for nn, ts in per_node_nb.items()])
        ax.scatter(x, y, c=colors[i], label=labels[i])
    ax.legend(scatterpoints=1)
    ax.grid(True)
    plt.savefig("results/linear_merge_nodes.png")
def plot_results_isolated(results, colors):
    """Scatter-plot mean query time vs node count for the isolated runs.

    *results* is the 4-tuple list from run_benchmark_isolated: entries
    0-1 are the create runs, 2-3 the merge runs (no constraint /
    unique constraint).  Figures are saved under results/.
    """
    labels = ["no constraint", "unique constraint"]

    fig, ax = plt.subplots()
    # title must be set AFTER subplots(): calling plt.title() first
    # attaches it to the previously-current figure and is lost
    plt.title("Query times for create operation in isolated topology")
    # reuse the shared axis setup instead of duplicating the label code
    setup_axis()
    for i, (per_node_nb, _per_rel, _per_sum) in enumerate(results[:2]):
        # .items() works on Python 2 and 3; iteritems() is Python-2-only
        x, y = zip(*[(nn, np.mean([t[0] for t in ts]))
                     for nn, ts in per_node_nb.items()])
        ax.scatter(x, y, c=colors[i], label=labels[i])
    ax.legend(scatterpoints=1)
    ax.grid(True)
    plt.savefig("results/isolated_create_nodes.png")

    fig, ax = plt.subplots()
    plt.title("Query times for merge operation in isolated topology")
    setup_axis()
    for i, (per_node_nb, _per_rel, _per_sum) in enumerate(results[2:]):
        x, y = zip(*[(nn, np.mean([t[0] for t in ts]))
                     for nn, ts in per_node_nb.items()])
        ax.scatter(x, y, c=colors[i], label=labels[i])
    ax.legend(scatterpoints=1)
    ax.grid(True)
    plt.savefig("results/isolated_merge_nodes.png")
def plot_results_fully_connected(results, colors):
    """Scatter-plot mean query time vs relationship count (15 nodes).

    *results* is the 4-tuple list from run_benchmark_fully_connected:
    entries 0-1 are the create runs, 2-3 the merge runs (no constraint /
    unique constraint).  Figures are saved under results/.
    """
    labels = ["no constraint", "unique constraint"]

    fig, ax = plt.subplots()
    # title must be set AFTER subplots(): calling plt.title() first
    # attaches it to the previously-current figure and is lost
    plt.title("Query times for create operation in more or less connected topology with 15 nodes")
    setup_axis("relationships")
    for i, (_per_node, per_rel_nb, _per_sum) in enumerate(results[:2]):
        # .items() works on Python 2 and 3; iteritems() is Python-2-only
        x, y = zip(*[(nn, np.mean([t[0] for t in ts]))
                     for nn, ts in per_rel_nb.items()])
        ax.scatter(x, y, c=colors[i], label=labels[i])
    ax.legend(scatterpoints=1)
    ax.grid(True)
    plt.savefig("results/fc_create_relationships.png")

    fig, ax = plt.subplots()
    plt.title("Query times for merge operation in more or less connected topology with 15 nodes")
    setup_axis("relationships")
    for i, (_per_node, per_rel_nb, _per_sum) in enumerate(results[2:]):
        x, y = zip(*[(nn, np.mean([t[0] for t in ts]))
                     for nn, ts in per_rel_nb.items()])
        ax.scatter(x, y, c=colors[i], label=labels[i])
    ax.legend(scatterpoints=1)
    ax.grid(True)
    plt.savefig("results/fc_merge_relationships.png")
if __name__ == "__main__":
    # Route py2neo's bolt logging through CypherFormatter so the raw
    # Cypher of every executed query lands in the "queries" file.
    logging.basicConfig(level=logging.INFO)
    bolt_logger = logging.getLogger("neo4j.bolt")
    bolt_logger.handlers = []
    fh = logging.FileHandler("queries", mode="w")
    fh.setLevel(logging.INFO)
    fh.setFormatter(CypherFormatter())
    bolt_logger.addHandler(fh)
    print("Running neo4j...")
    container = run_neo4j()
    try:
        print("Getting a graph...")
        init_graph()
        print("Starting benchmark...")
        results = {}
        colors = ["blue", "red", "green", "black"]
        results["linear"] = run_benchmark_linear()
        plot_results_linear(results["linear"], colors)
        results["isolated"] = run_benchmark_isolated()
        plot_results_isolated(results["isolated"], colors)
        results["fully_connected"] = run_benchmark_fully_connected()
        plot_results_fully_connected(results["fully_connected"], colors)
    finally:
        # always tear down the container, even if a benchmark raises --
        # otherwise the named container keeps running and blocks reruns
        container.kill()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment