#!/usr/bin/env python
# -*- encoding: utf-8 -*-
# Copyright 2018, Anis KHLIF
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from __future__ import unicode_literals, absolute_import, print_function
import logging
import random
import functools
import copy
import json
from tqdm import tqdm
from uuid import uuid4
from time import time, sleep
import numpy as np
import matplotlib.pyplot as plt
import docker
from py2neo import Graph, Subgraph, Node, Relationship
from py2neo.types import remote
CLIENT = docker.from_env()
GRAPH = None
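
# Start a throwaway Neo4j container (removed when killed, thanks to
# remove=True) with the HTTP and Bolt ports published and the password
# set to "test" for the duration of the benchmark.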
def run_neo4j():
    return CLIENT.containers.run(
        "neo4j",
        ports={"7474/tcp": 7474, "7687/tcp": 7687},
        environment=[
            "NEO4J_dbms_memory_pagecache_size=512m",
            "NEO4J_dbms_memory_heap_maxSize=512m",
            "NEO4J_AUTH=neo4j/test"
        ],
        detach=True,
        remove=True,
        name="neo4j_benchmark"
    )
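
# Node/relationship factories. __primarylabel__ and __primarykey__ tell
# py2neo which label and property to match on when graph.merge() is called.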
def make_node(label, uid):
    new_node = Node(label, id_field=uid)
    new_node.__primarylabel__ = label
    new_node.__primarykey__ = "id_field"
    return new_node
def make_rel(label, start_node, end_node, **attrs):
    r = Relationship(start_node, label, end_node, **attrs)
    r.__this_id__ = uuid4()
    return r
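
# Topology builders: "linear" gives each node one outgoing relationship to a
# random other node, "fully_connected" links every pair of nodes once.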
def make_rels_linear(nodes, rel_labels):
    if len(nodes) == 1:
        return []
    rels = []
    for n in nodes:
        n2 = random.choice(nodes)
        while n2 == n:
            n2 = random.choice(nodes)
        rel = make_rel(random.choice(rel_labels), n, n2)
        rels.append(rel)
    return rels
def make_rels_fully_connected(nodes, rel_labels):
    if len(nodes) == 1:
        return []
    rels = []
    for i, n in enumerate(nodes):
        for n2 in nodes[:i]:
            rel = make_rel(random.choice(rel_labels), n, n2)
            rels.append(rel)
    return rels
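
# Infinite generator of random Subgraphs. A fraction of the nodes
# (fraction_bound) is reused from previously generated graphs, so later
# transactions touch nodes that may already exist in the database.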
def graph_generator(nb_nodes_min, nb_nodes_max, topology="linear",
                    fraction_bound=0.3, nb_labels=5, nb_rel_labels=5):
    nodes_store = {}
    rel_store = {}
    node_2_rels = {}
    labels = ["label_{}".format(i) for i in range(nb_labels)]
    rel_labels = ["rel_{}".format(i) for i in range(nb_rel_labels)]
    while True:
        nnodes = random.randint(nb_nodes_min, nb_nodes_max)
        n_known = int(fraction_bound*nnodes)
        keys = list(nodes_store.keys())
        random.shuffle(keys)
        nodes_reused = keys[:n_known]
        all_nodes = []
        all_nodes.extend([nodes_store[n] for n in nodes_reused])
        while len(all_nodes) < nnodes:
            new_node = make_node(random.choice(labels), str(uuid4()))
            nodes_store[new_node["id_field"]] = new_node
            all_nodes.append(new_node)
        if topology == "linear":
            rels = make_rels_linear(all_nodes, rel_labels)
        elif topology == "isolated":
            rels = []
        elif topology == "fully_connected":
            rels = make_rels_fully_connected(all_nodes, rel_labels)
            # keep a random prefix so the graphs are "more or less" connected
            n = random.randint(0, len(rels)-1)
            rels = rels[:n]
        yield Subgraph(all_nodes, rels)
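
# Decorator factory: wraps a transaction function and returns the dicts in
# which its (elapsed time, statement counts) results are accumulated, keyed
# by node count, relationship count and their sum.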
def store_query_time(query_fct):
    per_node_nb = {}
    per_rel_nb = {}
    per_sum = {}
    @functools.wraps(query_fct)
    def __inner__(subgraph):
        nn = len(subgraph.nodes())
        nr = len(subgraph.relationships())
        tot_time, nb_statements = query_fct(subgraph)
        per_node_nb.setdefault(nn, [])
        per_rel_nb.setdefault(nr, [])
        per_sum.setdefault(nn+nr, [])
        per_node_nb[nn].append((tot_time, nb_statements))
        per_rel_nb[nr].append((tot_time, nb_statements))
        per_sum[nn+nr].append((tot_time, nb_statements))
    return per_node_nb, per_rel_nb, per_sum, __inner__
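
# Count how many Cypher statements each transaction is expected to translate
# into: bound nodes become MATCHes, unbound ones CREATE/MERGE statements.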
def get_nb_create_statements(subgraph):
    nodes = list(subgraph.nodes())
    statements = {"CREATE_NODE": 0, "MATCH": 0, "CREATE_REL": 0}
    for i, node in enumerate(nodes):
        if remote(node):
            statements["MATCH"] += 1
        else:
            statements["CREATE_NODE"] += 1
    for i, relationship in enumerate(subgraph.relationships()):
        # only relationships not yet bound to the database need to be created
        if not remote(relationship):
            statements["CREATE_REL"] += 1
    return statements
def get_nb_merge_statements(subgraph):
    statements = {"MERGE_NODE": 0, "MATCH": 0, "MERGE_REL": 0, "SET": 0}
    nodes = list(subgraph.nodes())
    for i, node in enumerate(nodes):
        remote_node = remote(node)
        if remote_node:
            statements["MATCH"] += 1
        else:
            statements["MERGE_NODE"] += 1
            statements["SET"] += 1
    for i, relationship in enumerate(subgraph.relationships()):
        if not remote(relationship):
            statements["MERGE_REL"] += 1
    return statements
def transaction_create(subgraph):
    global GRAPH
    statements = get_nb_create_statements(subgraph)
    start = time()
    GRAPH.create(subgraph)
    return time()-start, statements
def transaction_merge(subgraph):
    global GRAPH
    statements = get_nb_merge_statements(subgraph)
    start = time()
    GRAPH.merge(subgraph)
    return time()-start, statements
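
# Generate nb_rep random subgraphs for the requested topology, push them all
# through the timed transaction function in shuffled order, then wipe the
# database. Returns the timing dicts produced by store_query_time.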
def benchmark(query_fct, node_min=5, node_max=80, nb_rep=20, topology="linear"):
    print("======== RUNNING BENCHMARK: {} ========".format(query_fct.__name__))
    assert database_empty()
    if topology == "fully_connected":
        assert node_min == node_max
    per_node_nb, per_rel_nb, per_sum_nb, query_fct = store_query_time(
        query_fct)
    gen = graph_generator(node_min, node_max, topology=topology)
    graphs = []
    for _ in range(nb_rep):
        g = next(gen)
        nn = len(g.nodes())
        nr = len(g.relationships())
        if topology == "linear":
            assert nn == nr
        elif topology == "isolated":
            assert nr == 0
        graphs.append(g)
    random.shuffle(graphs)
    for g in tqdm(graphs):
        query_fct(g)
    delete_all()
    return per_node_nb, per_rel_nb, per_sum_nb
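
# Keep retrying until the Neo4j instance inside the container accepts
# connections (it takes a few seconds to boot).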
def init_graph():
    global GRAPH
    while True:
        try:
            GRAPH = Graph("http://neo4j:test@localhost:7474/db/data")
            break
        except Exception as e:
            print(e)
            sleep(2)
def database_empty():
    global GRAPH
    res = GRAPH.run("MATCH (n) RETURN count(*) as c")
    return res.next()["c"] == 0
def delete_all():
    print("Deleting all nodes...")
    global GRAPH
    GRAPH.run("MATCH (n) DETACH DELETE n;")
def create_constraints(nb_labels=5):
    print("Creating constraints...")
    global GRAPH
    labels = ["label_{}".format(i) for i in range(nb_labels)]
    for l in labels:
        GRAPH.schema.create_uniqueness_constraint(l, "id_field")
def drop_constraints(nb_labels=5):
    print("Dropping all constraints...")
    global GRAPH
    labels = ["label_{}".format(i) for i in range(nb_labels)]
    for l in labels:
        GRAPH.schema.drop_uniqueness_constraint(l, "id_field")
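
# Log formatter that rebuilds readable Cypher from the bolt driver's "C: RUN"
# log records by substituting the logged parameter dict into the query text.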
class CypherFormatter(logging.Formatter):
    def format(self, record):
        msg = record.getMessage()
        if "RUN" not in msg:
            return ""
        msg = msg[len("C: RUN u'"):]
        try:
            c = msg.index("LIMIT 1'")
        except ValueError:
            return ""
        part1 = msg[:c+len("LIMIT 1")]
        part2 = msg[c+len("LIMIT 1'"):]
        dc = json.loads(part2.strip().replace("u'", "'").replace("'", '"'))
        for k in dc:
            v = dc[k]
            if isinstance(v, dict):
                # render nested parameter dicts as JSON before formatting
                dc[k] = json.dumps(v).replace("u'", "'")
        part1 = "\n".join(part1.split("\\n"))
        return part1.format(**dc).replace("u'", "'")
def run_benchmark_linear(nb_rep=50):
    print("~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~")
    print("                     RUNNING LINEAR BENCHMARK                     ")
    print("~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~")
    results = []
    results.append(benchmark(transaction_create, node_min=5, node_max=80,
                             nb_rep=nb_rep, topology="linear"))
    create_constraints()
    results.append(benchmark(transaction_create, node_min=5, node_max=80,
                             nb_rep=nb_rep, topology="linear"))
    drop_constraints()
    results.append(benchmark(transaction_merge, node_min=5, node_max=80,
                             nb_rep=nb_rep, topology="linear"))
    create_constraints()
    results.append(benchmark(transaction_merge, node_min=5, node_max=80,
                             nb_rep=nb_rep, topology="linear"))
    drop_constraints()
    return results
def run_benchmark_fully_connected(nb_rep=50):
    print("~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~")
    print("                RUNNING FULLY CONNECTED BENCHMARK                 ")
    print("~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~")
    results = []
    results.append(benchmark(transaction_create, node_min=15, node_max=15,
                             nb_rep=nb_rep, topology="fully_connected"))
    create_constraints()
    results.append(benchmark(transaction_create, node_min=15, node_max=15,
                             nb_rep=nb_rep, topology="fully_connected"))
    drop_constraints()
    results.append(benchmark(transaction_merge, node_min=15, node_max=15,
                             nb_rep=nb_rep, topology="fully_connected"))
    create_constraints()
    results.append(benchmark(transaction_merge, node_min=15, node_max=15,
                             nb_rep=nb_rep, topology="fully_connected"))
    drop_constraints()
    return results
def run_benchmark_isolated(nb_rep=50):
    print("~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~")
    print("                    RUNNING ISOLATED BENCHMARK                    ")
    print("~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~")
    results = []
    results.append(benchmark(transaction_create, node_min=5, node_max=80,
                             nb_rep=nb_rep, topology="isolated"))
    create_constraints()
    results.append(benchmark(transaction_create, node_min=5, node_max=80,
                             nb_rep=nb_rep, topology="isolated"))
    drop_constraints()
    results.append(benchmark(transaction_merge, node_min=5, node_max=80,
                             nb_rep=nb_rep, topology="isolated"))
    create_constraints()
    results.append(benchmark(transaction_merge, node_min=5, node_max=80,
                             nb_rep=nb_rep, topology="isolated"))
    drop_constraints()
    return results
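
# Plotting helpers: each run_benchmark_* call returns four result sets in the
# order (create, create+constraint, merge, merge+constraint); they are plotted
# in pairs below as average query time against graph size.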
def setup_axis(what="nodes"):
    plt.xlabel("Number of {}".format(what))
    plt.ylabel("Average query time (seconds)")
    plt.yscale("linear")
def plot_results_linear(results, colors):
    labels = ["no constraint", "unique constraint"]
    fig, ax = plt.subplots()
    # set the title after creating the figure, otherwise it is applied to the
    # previous figure and lost
    plt.title("Query times for create operation in linear topology")
    setup_axis()
    for i, (per_node_nb, per_rel_nb, per_sum_nb) in enumerate(results[:2]):
        x, y = zip(*[(nn, np.mean([t[0] for t in ts]))
                     for nn, ts in per_node_nb.iteritems()])
        ax.scatter(x, y, c=colors[i],
                   label=labels[i])
    ax.legend()
    ax.grid(True)
    plt.savefig("results/linear_create_nodes.png")
    fig, ax = plt.subplots()
    plt.title("Query times for merge operation in linear topology")
    setup_axis()
    for i, (per_node_nb, per_rel_nb, per_sum_nb) in enumerate(results[2:]):
        x, y = zip(*[(nn, np.mean([t[0] for t in ts]))
                     for nn, ts in per_node_nb.iteritems()])
        ax.scatter(x, y, c=colors[i],
                   label=labels[i])
    ax.legend(scatterpoints=1)
    ax.grid(True)
    plt.savefig("results/linear_merge_nodes.png")
def plot_results_isolated(results, colors):
    labels = ["no constraint", "unique constraint"]
    fig, ax = plt.subplots()
    plt.title("Query times for create operation in isolated topology")
    plt.xlabel("Number of nodes")
    plt.ylabel("Average query time (seconds)")
    plt.yscale("linear")
    for i, (per_node_nb, per_rel_nb, per_sum_nb) in enumerate(results[:2]):
        x, y = zip(*[(nn, np.mean([t[0] for t in ts]))
                     for nn, ts in per_node_nb.iteritems()])
        ax.scatter(x, y, c=colors[i], label=labels[i])
    ax.legend(scatterpoints=1)
    ax.grid(True)
    plt.savefig("results/isolated_create_nodes.png")
    fig, ax = plt.subplots()
    plt.title("Query times for merge operation in isolated topology")
    plt.xlabel("Number of nodes")
    plt.ylabel("Average query time (seconds)")
    plt.yscale("linear")
    for i, (per_node_nb, per_rel_nb, per_sum_nb) in enumerate(results[2:]):
        x, y = zip(*[(nn, np.mean([t[0] for t in ts]))
                     for nn, ts in per_node_nb.iteritems()])
        ax.scatter(x, y, c=colors[i], label=labels[i])
    ax.legend(scatterpoints=1)
    ax.grid(True)
    plt.savefig("results/isolated_merge_nodes.png")
def plot_results_fully_connected(results, colors):
    labels = ["no constraint", "unique constraint"]
    fig, ax = plt.subplots()
    plt.title("Query times for create operation in more or less connected "
              "topology with 15 nodes")
    setup_axis("relationships")
    for i, (per_node_nb, per_rel_nb, per_sum_nb) in enumerate(results[:2]):
        x, y = zip(*[(nn, np.mean([t[0] for t in ts]))
                     for nn, ts in per_rel_nb.iteritems()])
        ax.scatter(x, y, c=colors[i],
                   label=labels[i])
    ax.legend(scatterpoints=1)
    ax.grid(True)
    plt.savefig("results/fc_create_relationships.png")
    fig, ax = plt.subplots()
    plt.title("Query times for merge operation in more or less connected "
              "topology with 15 nodes")
    setup_axis("relationships")
    for i, (per_node_nb, per_rel_nb, per_sum_nb) in enumerate(results[2:]):
        x, y = zip(*[(nn, np.mean([t[0] for t in ts]))
                     for nn, ts in per_rel_nb.iteritems()])
        ax.scatter(x, y, c=colors[i],
                   label=labels[i])
    ax.legend(scatterpoints=1)
    ax.grid(True)
    plt.savefig("results/fc_merge_relationships.png")
if __name__ == "__main__":
    logging.basicConfig(level=logging.INFO)
    # divert the bolt driver's logs to a file, rendered as plain Cypher
    bolt_logger = logging.getLogger("neo4j.bolt")
    bolt_logger.handlers = []
    fh = logging.FileHandler("queries", mode="w")
    fh.setLevel(logging.INFO)
    fh.setFormatter(CypherFormatter())
    bolt_logger.addHandler(fh)
    print("Running neo4j...")
    container = run_neo4j()
    try:
        print("Getting a graph...")
        init_graph()
        print("Starting benchmark...")
        # NOTE: plots are written to a results/ directory, which must exist
        results = {}
        colors = ["blue", "red", "green", "black"]
        results["linear"] = run_benchmark_linear()
        plot_results_linear(results["linear"], colors)
        results["isolated"] = run_benchmark_isolated()
        plot_results_isolated(results["isolated"], colors)
        results["fully_connected"] = run_benchmark_fully_connected()
        plot_results_fully_connected(results["fully_connected"], colors)
    finally:
        # stop (and, with remove=True, delete) the container even if the
        # benchmark fails midway
        container.kill()