Skip to content

Instantly share code, notes, and snippets.

@sontek
Last active June 30, 2020 18:22
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 1 You must be signed in to fork a gist
  • Save sontek/af82d281e618074ca8a6 to your computer and use it in GitHub Desktop.
Cassandra benchmark
import time
import uuid
import yappi
import timeit
import logging
import random
from threading import Event
from cassandra import ConsistencyLevel
from cassandra.cluster import Cluster
from cassandra.query import tuple_factory
from cassandra.query import BatchStatement
# Benchmark switches: profile the timed query with yappi, (re)create the
# schema, and/or (re)load int_table before running the timed query.
YAPPI = False
ROW_COUNT = 2000000
RUN_CREATES = False
RUN_INSERTS = False

logging.basicConfig()

# Contact point(s) for the cluster under test: the local loopback node.
nodes = ['127.0.0.1']
c = Cluster(nodes, protocol_version=3)
s = c.connect()
# Return result rows as plain tuples.
s.row_factory = tuple_factory
# CQL statements executed in order when RUN_CREATES is set: drop and recreate
# the test_perf keyspace (SimpleStrategy, a single replica), switch to it, and
# create int_table.
create_scripts = [
    "DROP KEYSPACE IF EXISTS test_perf",
    "CREATE KEYSPACE test_perf WITH replication = {'class': 'SimpleStrategy', 'replication_factor': 1};",
    "USE test_perf;",
    # Composite partition key (primary_id, part_id); id1..id4 are clustering
    # columns, id5..id8 are regular columns.
    """CREATE TABLE int_table (
primary_id bigint,
part_id bigint,
id1 bigint,
id2 bigint,
id3 bigint,
id4 bigint,
id5 bigint,
id6 bigint,
id7 bigint,
id8 bigint,
PRIMARY KEY((primary_id, part_id),
id1, id2, id3, id4)
);"""
]
class PagedResultHandler(object):
    """Collect every page of an asynchronous query result into ``rows``.

    Each page delivered to ``handle_page`` is appended to ``rows`` and the next
    page is requested while ``future.has_more_pages`` is true. Once the last
    page has arrived, ``finished_event`` is set. If the query fails, or
    requesting the next page raises, the exception is stored in ``error`` and
    ``finished_event`` is still set, so a waiter never blocks forever.

    :param future: the future returned by ``Session.execute_async``.
    """

    def __init__(self, future):
        self.error = None
        self.finished_event = Event()
        # Must exist before registering callbacks: add_callbacks() invokes the
        # callback immediately if the future has already completed.
        self.rows = []
        self.future = future
        self.future.add_callbacks(
            callback=self.handle_page,
            errback=self.handle_error)

    def handle_page(self, rows):
        """Append one page of rows, then fetch the next page or finish."""
        try:
            self.rows.extend(rows)
            more = self.future.has_more_pages
            if more:
                self.future.start_fetching_next_page()
        except Exception as exc:
            # Report failures to the waiter rather than leaving the event unset.
            self.handle_error(exc)
            return
        if not more:
            self.finished_event.set()

    def handle_error(self, exc):
        """Record the failure and release anyone waiting on ``finished_event``."""
        self.error = exc
        self.finished_event.set()
def run_int_query(primary_id=2, part_count=24):
    """Read all rows for one ``primary_id`` from ``int_table`` concurrently.

    The partition key is ``(primary_id, part_id)``, so one async query is
    issued per ``part_id`` in ``range(part_count)``. Each query's pages are
    gathered by a ``PagedResultHandler``. The total row count is printed and
    the rows are returned.

    :param primary_id: first partition-key component to read (default 2).
    :param part_count: number of ``part_id`` values to read, 0..part_count-1
        (default 24).
    :returns: list of all rows collected across the partitions.
    :raises Exception: the first error reported by any partition query, so a
        failed read is not mistaken for a smaller, faster result.
    """
    # Prepare once: the statement is identical for every partition, and this
    # runs inside the timed region of the benchmark.
    prepared = s.prepare("""
SELECT *
FROM int_table
WHERE primary_id = ?
AND part_id = ?
""")
    handlers = [
        PagedResultHandler(s.execute_async(prepared, [primary_id, part_id]))
        for part_id in range(part_count)
    ]
    rows = []
    for handler in handlers:
        handler.finished_event.wait()
        if handler.error is not None:
            raise handler.error
        rows.extend(handler.rows)
    print('Amount of int rows: %s' % len(rows))
    return rows
# Example: time repeated runs of the query instead of a single one.
# for i in range(30):
#     print(timeit.timeit('run_int_query()', setup='from __main__ import run_int_query', number=1))
def rid(start=10000000, end=20000000):
    """Return a random integer N such that ``start <= N <= end``.

    Both bounds are inclusive. Draws from the module-level ``random`` generator.
    """
    upper_exclusive = end + 1
    return random.randrange(start, upper_exclusive)
def run_int_inserts(row_count=None, batch_size=10000):
    """Insert ``row_count`` random rows into ``int_table`` using batches.

    Rows go into ``BatchStatement``s at QUORUM consistency. Each batch holds
    ``batch_size`` rows; any trailing partial batch is also executed, so
    exactly ``row_count`` rows are written.

    Each row gets ``primary_id`` in 0..4 and ``part_id`` in 0..23; every other
    column gets a random value from ``rid()``.

    :param row_count: number of rows to insert; defaults to ``ROW_COUNT``.
    :param batch_size: rows per batch (default 10000).
    :raises ValueError: if ``batch_size`` is less than 1.

    NOTE(review): a 10,000-row multi-partition batch is likely far above
    Cassandra's default batch size fail threshold (cassandra.yaml). Lower
    ``batch_size`` if the server rejects the batches.
    """
    if row_count is None:
        row_count = ROW_COUNT
    if batch_size < 1:
        raise ValueError('batch_size must be >= 1')
    insert_data = s.prepare("""
INSERT INTO int_table (primary_id, part_id, id1, id2, id3, id4, id5, id6, id7, id8)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)""")
    batch = BatchStatement(consistency_level=ConsistencyLevel.QUORUM)
    for n in range(1, row_count + 1):
        primary_id = rid(start=0, end=4)
        # rid() is inclusive on both ends: 0..23 gives the 24 part_ids that
        # run_int_query reads (range(0, 24)).
        part_id = rid(start=0, end=23)
        batch.add(
            insert_data,
            (primary_id, part_id, rid(), rid(), rid(), rid(), rid(), rid(), rid(), rid())
        )
        if n % batch_size == 0:
            s.execute(batch)
            batch = BatchStatement(consistency_level=ConsistencyLevel.QUORUM)
    # Flush the trailing partial batch, if any.
    if row_count % batch_size:
        s.execute(batch)
if __name__ == '__main__':
    try:
        if RUN_CREATES:
            for script in create_scripts:
                s.execute(script)
        s.set_keyspace('test_perf')
        if RUN_INSERTS:
            run_int_inserts()
        if YAPPI:
            yappi.set_clock_type('cpu')
            yappi.start(builtins=True)
        # perf_counter() is monotonic and high-resolution. time.time() follows
        # the wall clock, which can be adjusted mid-measurement.
        start = time.perf_counter()
        run_int_query()
        end = time.perf_counter()
        if YAPPI:
            # Stop profiling so the report covers only the timed query.
            yappi.stop()
        duration = int(1000 * (end - start))
        print("int query took %sms" % duration)
        if YAPPI:
            stats = yappi.get_func_stats()
            stats.save('callgrind.out', type='callgrind')
            print('checkout callgrind.out')
    finally:
        # Release the driver's connections and background threads.
        c.shutdown()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment