Skip to content

Instantly share code, notes, and snippets.

@beltran
Last active November 14, 2017 12:13
Show Gist options
  • Star 1 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
Save beltran/3feac7b3c08929653b93b2d4ff793996 to your computer and use it in GitHub Desktop.
from cassandra.cluster import Cluster
from cassandra.concurrent import execute_concurrent, query_by_keys
from cassandra.policies import TokenAwarePolicy, DCAwareRoundRobinPolicy
from itertools import repeat, cycle
from collections import defaultdict
import time
import six
from bokeh.plotting import figure, output_file, show
# Connect with a token-aware policy (no replica shuffling) so that the
# IN-clause benchmark below can route each query to the replica that
# owns the keys it asks for.
cluster = Cluster(load_balancing_policy=TokenAwarePolicy(DCAwareRoundRobinPolicy(), shuffle_replicas=False))
session = cluster.connect()
# Keyspace used by every query in this benchmark script.
KEYSPACE = "query_per_range"
# Number of repetitions per data point; elapsed time is averaged over TIMES.
TIMES = 1
# Payload size of the blob column written for every row.
ROW_SIZE_IN_KB = 10
NUMBER_OF_BYTES = int(ROW_SIZE_IN_KB * 1024)
def create_schema():
    """Create the benchmark keyspace and table if they do not already exist.

    Idempotent: both statements use IF NOT EXISTS, so re-running the
    script against a populated cluster is safe.
    """
    keyspace_ddl = """
        CREATE KEYSPACE IF NOT EXISTS {}
        WITH replication = {{ 'class': 'SimpleStrategy', 'replication_factor': '1' }}
        """.format(KEYSPACE)
    session.execute(keyspace_ddl)

    table_ddl = """
        CREATE TABLE IF NOT EXISTS query_per_range.one_single_key (
            key_one text,
            col1 text,
            col2 text,
            col3 text,
            col4 text,
            col5 blob,
            PRIMARY KEY (key_one, col1)
        )
        """
    session.execute(table_ddl)
def load_some_data():
    """Insert 20,000 rows concurrently, one partition per key.

    Each row carries a NUMBER_OF_BYTES blob in col5 so the benchmark
    moves a realistic amount of data per row.
    """
    insert_cql = ("INSERT INTO query_per_range.one_single_key"
                  "(key_one, col1, col2, col3, col4, col5) VALUES (%s, %s, %s, %s, %s, %s)")
    payload = b'x' * NUMBER_OF_BYTES
    params = [(str(i), str(i), str(i), str(i), str(i), payload) for i in range(20000)]
    execute_concurrent(session, list(zip(repeat(insert_cql), params)))
def random_query_by_keys(session, keyspace, table, select_fields, keys):
    """Fetch rows for `keys` by splitting them into per-host IN-clause queries.

    Unlike a token-aware split, keys are dealt out round-robin over all
    hosts (key i goes to host i % len(hosts)), so most queries will NOT
    land on the replica owning their keys — this is the "random" baseline
    the benchmark compares against.

    Yields rows one at a time as each response future completes.
    NOTE(review): uses private driver APIs (_create_response_future,
    _query); these are internal to the cassandra-driver and may change.
    """
    select_query = "SELECT " + ",".join(select_fields) + " FROM {}.{} WHERE ".format(keyspace, table)
    cluster = session.cluster
    # First component of the partition key; assumes a single-column
    # partition key (true for one_single_key's PRIMARY KEY (key_one, col1)).
    partition_keys = cluster.metadata.keyspaces[keyspace].tables[table].partition_key
    partition_key_name = partition_keys[0].name
    # Sentinel meaning "no host chosen"; note nothing below ever maps a key
    # group to this sentinel, so the `host is no_valid_replica` branch
    # appears unreachable as written — presumably kept for parity with a
    # token-aware variant. TODO confirm.
    no_valid_replica = object()
    keys_per_host = defaultdict(list)
    keys_per_int = defaultdict(list)
    all_hosts = cluster.metadata.all_hosts()
    # Deal keys round-robin into len(all_hosts) buckets...
    for i, key in enumerate(keys):
        keys_per_int[i % len(all_hosts)].append(key)
    # ...then assign bucket i to host i.
    for i, host in enumerate(all_hosts):
        keys_per_host[host] = keys_per_int[i]
    response_futures = []
    for host, keys_in_host in six.iteritems(keys_per_host):
        # Build "<pk> IN (%s, %s, ...)" with one placeholder per key.
        primary_keys_query = partition_key_name + " IN "
        params_query = "(" + ",".join(["%s"] * len(keys_in_host)) + ")"
        statement = select_query + primary_keys_query + params_query
        response_future = session._create_response_future(statement, keys_in_host, trace=False,
        custom_payload=None, timeout=session.default_timeout)
        if host is no_valid_replica:
            # Fall back to the driver's normal routing (see NOTE above:
            # this branch is never taken with the current mapping).
            response_future.send_request()
        else:
            # Force the request onto the chosen host, bypassing routing.
            response_future._query(host)
        response_futures.append(response_future)
    # Block on each future in order and stream out the rows.
    for response_future in response_futures:
        results = response_future.result()
        for row in results:
            yield row
x_axis = (10, 100, 1000, 2000, 4000, 8000, 16000, 20000)
def generate_results_query_by_keys(TIMES=TIMES):
    """Benchmark the driver's token-aware query_by_keys over x_axis key counts.

    For each point, runs TIMES iterations, averages the elapsed time, and
    asserts every requested key came back exactly once.

    Returns:
        (x_axis, y_axis, tag): key counts, average seconds per run, and
        the legend label "replica_in_clauses".
    """
    y_axis = []
    for num_rows in x_axis:
        keys = [str(i) for i in range(num_rows)]
        this_set = set()
        before = time.time()
        for _ in range(TIMES):
            results = query_by_keys(session,
                                    "query_per_range", "one_single_key",
                                    ["key_one", "col1", "col2"], keys)
            # Collect distinct partition keys to verify coverage below.
            for row in results:
                this_set.add(row[0])
        after = time.time()
        # Fixed typo: "elpased" -> "elapsed".
        print("Time elapsed: {}".format(after - before))
        print("len(this_set): {}".format(len(this_set)))
        y_axis.append((after - before) / TIMES)
        # Sanity check: every key must have been returned.
        assert len(this_set) == num_rows
    return x_axis, y_axis, "replica_in_clauses"
def generate_results_random_in_clauses(TIMES=TIMES):
    """Benchmark random_query_by_keys (non-token-aware IN clauses) over x_axis.

    Same protocol as generate_results_query_by_keys: TIMES iterations per
    point, averaged elapsed time, coverage assertion.

    Returns:
        (x_axis, y_axis, tag): key counts, average seconds per run, and
        the legend label "random_in_clauses".
    """
    y_axis = []
    for num_rows in x_axis:
        keys = [str(i) for i in range(num_rows)]
        this_set = set()
        before = time.time()
        for _ in range(TIMES):
            results = random_query_by_keys(session,
                                           "query_per_range", "one_single_key",
                                           ["key_one", "col1", "col2"], keys)
            for row in results:
                this_set.add(row[0])
        after = time.time()
        # Fixed typo: "elpased" -> "elapsed".
        print("Time elapsed: {}".format(after - before))
        print("len(this_set): {}".format(len(this_set)))
        y_axis.append((after - before) / TIMES)
        # Sanity check: every key must have been returned.
        assert len(this_set) == num_rows
    return x_axis, y_axis, "random_in_clauses"
def generate_results_execute_concurrent(TIMES=TIMES):
    """Benchmark execute_concurrent with one prepared statement per key.

    Prepares a single-key SELECT once, then for each x_axis point fires
    num_rows concurrent executions per iteration, averaging the elapsed
    time over TIMES iterations.

    Returns:
        (x_axis, y_axis, tag): key counts, average seconds per run, and
        the legend label "execute_concurrent".
    """
    prepared = session.prepare(
        """SELECT key_one,col1,col2 FROM query_per_range.one_single_key WHERE key_one = ?""")
    y_axis = []
    for num_rows in x_axis:
        # One single-element param tuple per key, matching the one `?`.
        keys = [(str(i),) for i in range(num_rows)]
        new_set = set()
        before = time.time()
        for _ in range(TIMES):
            statements_and_params = zip(repeat(prepared), keys)
            results = execute_concurrent(session, statements_and_params)
            for execution_result in results:
                # Skip failed executions; only successful rows are counted.
                if execution_result.success:
                    for row in execution_result.result_or_exc:
                        new_set.add(row[0])
        after = time.time()
        # Fixed typo: "elpased" -> "elapsed".
        print("Time elapsed: {}".format(after - before))
        print("len(new_set): {}".format(len(new_set)))
        y_axis.append((after - before) / TIMES)
        # Sanity check: every key must have been returned.
        assert len(new_set) == num_rows
    return x_axis, y_axis, "execute_concurrent"
# --- Script driver: build schema/data, run both benchmarks, plot them. ---
create_schema()
load_some_data()

output_file("comparison.html")
# Fixed title typo: "execut_concurrent" -> "execute_concurrent".
p = figure(title="execute_concurrent vs IN clause",
           x_axis_label='Number of keys', y_axis_label='Seconds')

# Token-aware IN-clause benchmark (red series).
x, y, tag = generate_results_query_by_keys()
p.circle(x, y, legend=tag, fill_color="red", line_color="red", size=6)
p.line(x, y, legend=tag, line_color="red")

# One-query-per-key execute_concurrent benchmark (blue series).
x, y, tag = generate_results_execute_concurrent()
#x, y, tag = generate_results_random_in_clauses()
p.circle(x, y, legend=tag, fill_color="blue", line_color="blue")
p.line(x, y, legend=tag, line_color="blue")

# Moved after the glyphs: legend annotations are only created when glyphs
# carrying a `legend` kwarg are added, so assigning p.legend.location on an
# empty figure was a silent no-op.
p.legend.location = "top_left"

show(p)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment