Last active
June 30, 2020 18:22
-
-
Save sontek/af82d281e618074ca8a6 to your computer and use it in GitHub Desktop.
Cassandra benchmark
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import time | |
import uuid | |
import yappi | |
import timeit | |
import logging | |
import random | |
from threading import Event | |
from cassandra import ConsistencyLevel | |
from cassandra.cluster import Cluster | |
from cassandra.query import tuple_factory | |
from cassandra.query import BatchStatement | |
# --- Benchmark switches ----------------------------------------------------
# YAPPI: when true, the query phase is profiled with yappi and the result is
# written to 'callgrind.out'.
YAPPI = False
# ROW_COUNT: row-count setting read by run_int_inserts().
ROW_COUNT = 2000000
# RUN_CREATES: when true, every statement in create_scripts is executed first
# (this drops and recreates the test_perf keyspace).
RUN_CREATES = False
# RUN_INSERTS: when true, int_table is populated before the query is timed.
RUN_INSERTS = False

logging.basicConfig()

# Contact point(s) for the cluster.  Written in canonical dotted-quad form:
# the previous '127.0.0.01' carried a leading-zero octet, which strict IPv4
# parsers reject and which does not string-match the address a node reports.
nodes = ['127.0.0.1']
c = Cluster(nodes, protocol_version=3)
s = c.connect()
# Hand rows back as plain tuples.
s.row_factory = tuple_factory
# CQL statements run in order (only when RUN_CREATES is true) to rebuild the
# benchmark schema from scratch: drop the keyspace, recreate it, switch to it,
# then create the single benchmark table.
create_scripts = [
    "DROP KEYSPACE IF EXISTS test_perf",
    "CREATE KEYSPACE test_perf WITH replication = {'class': 'SimpleStrategy', 'replication_factor': 1};",
    "USE test_perf;",
    # (primary_id, part_id) is the composite partition key; id1..id4 are the
    # clustering columns; id5..id8 are regular columns.
    """CREATE TABLE int_table (
primary_id bigint,
part_id bigint,
id1 bigint,
id2 bigint,
id3 bigint,
id4 bigint,
id5 bigint,
id6 bigint,
id7 bigint,
id8 bigint,
PRIMARY KEY((primary_id, part_id),
id1, id2, id3, id4)
);"""
]
class PagedResultHandler(object):
    """Accumulate every page of an asynchronous query result.

    The handler registers itself on ``future`` and keeps requesting the next
    page until none are left.  Callers block on ``finished_event`` and then
    inspect ``error`` and ``rows``.

    Attributes:
        error: the exception passed to the errback, or ``None`` on success.
        finished_event: ``threading.Event`` set once the last page has been
            received or an error has been reported.
        future: the response future being consumed.
        rows: list of all rows received so far, in arrival order.
    """

    def __init__(self, future):
        """Start consuming ``future``.

        Args:
            future: an object exposing ``add_callbacks(callback, errback)``,
                ``has_more_pages`` and ``start_fetching_next_page()``.
        """
        self.error = None
        self.finished_event = Event()
        # All state must exist *before* the callbacks are registered: the
        # driver may invoke a callback immediately from add_callbacks() when
        # the result has already arrived, and handle_page() touches self.rows.
        self.rows = []
        self.future = future
        self.future.add_callbacks(
            callback=self.handle_page,
            errback=self.handle_error)

    def handle_page(self, rows):
        """Store one page of ``rows`` and fetch the next page, if any."""
        self.rows.extend(rows)
        if self.future.has_more_pages:
            self.future.start_fetching_next_page()
        else:
            # Last page received: release anyone waiting on the result.
            self.finished_event.set()

    def handle_error(self, exc):
        """Record ``exc`` and release waiters; no further pages are fetched."""
        self.error = exc
        self.finished_event.set()
def run_int_query():
    """Read int_table partitions (primary_id=2, part_id=0..23) concurrently.

    One asynchronous query is issued per partition; each is drained page by
    page through a PagedResultHandler.  Once all queries have finished, the
    total number of rows fetched is printed.

    Raises:
        Exception: the first error reported by any of the queries.  A failed
            query would otherwise contribute zero rows silently and make the
            benchmark figures meaningless.
    """
    # Prepare once: the statement is identical for every partition, so
    # re-preparing it on each loop iteration is wasted work.
    prepared = s.prepare("""
SELECT *
FROM int_table
WHERE primary_id = ?
AND part_id = ?
""")
    # only pull primary 2 out of the db
    # NOTE(review): this covers part_id 0..23, while run_int_inserts() draws
    # part_id with an inclusive randint(0, 24) -- partition 24 is never read.
    # Confirm whether that is intended.
    handlers = [
        PagedResultHandler(s.execute_async(prepared, [2, part_id]))
        for part_id in range(0, 24)
    ]
    rows = []
    for handler in handlers:
        handler.finished_event.wait()
        if handler.error is not None:
            raise handler.error
        rows += handler.rows
    print('Amount of int rows: %s' % len(rows))
# Disabled alternative: time 30 individual runs of the query with timeit.
#run_int_query()
#for i in range(30):
#    print(timeit.timeit('run_int_query()', setup='from __main__ import run_int_query', number=1))
def rid(start=10000000, end=20000000):
    """Return a random integer N with ``start <= N <= end`` (both inclusive)."""
    upper_exclusive = end + 1
    return random.randrange(start, upper_exclusive)
def run_int_inserts(row_count=None, batch_size=10000):
    """Populate int_table with random rows, sent in batches.

    Each row gets a random primary_id in 0..4, a random part_id in 0..24 and
    eight random values from rid() for id1..id8.

    Args:
        row_count: number of rows to insert; defaults to the module-level
            ROW_COUNT (looked up at call time).
        batch_size: number of rows per BatchStatement.
            NOTE(review): large multi-partition batches may exceed the
            server's batch-size thresholds -- lower this if batches are
            rejected.

    Raises:
        ValueError: if ``batch_size`` is smaller than 1.
    """
    if row_count is None:
        row_count = ROW_COUNT
    if batch_size < 1:
        raise ValueError('batch_size must be at least 1')

    insert_data = s.prepare("""
INSERT INTO int_table (primary_id, part_id, id1, id2, id3, id4, id5, id6, id7, id8)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)""")
    batch = BatchStatement(consistency_level=ConsistencyLevel.QUORUM)
    pending = 0
    for _ in range(row_count):
        primary_id = rid(start=0, end=4)
        part_id = rid(start=0, end=24)
        batch.add(
            insert_data,
            (primary_id, part_id, rid(), rid(), rid(), rid(), rid(), rid(), rid(), rid())
        )
        pending += 1
        # Flush *after* adding, so the iteration that triggers a flush still
        # contributes its row.
        if pending >= batch_size:
            s.execute(batch)
            batch = BatchStatement(consistency_level=ConsistencyLevel.QUORUM)
            pending = 0
    # Send whatever is left over; without this the final partial batch
    # would be dropped.
    if pending:
        s.execute(batch)
# Script entry point: optionally rebuild the schema and load data, then time
# one run of run_int_query(), optionally under the yappi profiler.
if __name__ == '__main__':
    try:
        if RUN_CREATES:
            for script in create_scripts:
                s.execute(script)
        s.set_keyspace('test_perf')
        if RUN_INSERTS:
            run_int_inserts()
        if YAPPI:
            yappi.set_clock_type('cpu')
            yappi.start(builtins=True)
        # timeit.default_timer is the clock timeit itself uses for
        # benchmarking; prefer it to time.time() for measuring an interval.
        start = timeit.default_timer()
        run_int_query()
        end = timeit.default_timer()
        duration = int(1000 * (end - start))
        print("int query took %sms" % duration)
        if YAPPI:
            stats = yappi.get_func_stats()
            stats.save('callgrind.out', type='callgrind')
            print('checkout callgrind.out')
    finally:
        # Always release the driver's connections, even when the benchmark
        # fails part-way through.
        c.shutdown()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment