Skip to content

Instantly share code, notes, and snippets.

@iSignal
Created August 29, 2019 04:28
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save iSignal/84470e26e13a131b0ec8aedc614d293e to your computer and use it in GitHub Desktop.
Save iSignal/84470e26e13a131b0ec8aedc614d293e to your computer and use it in GitHub Desktop.
# pip install yb-cassandra-driver
from cassandra.cluster import Cluster
import time
import random
from multiprocessing.dummy import Pool as ThreadPool
# Load Phase params
# Number of worker threads that issue updates concurrently (size of the thread pool).
num_write_threads=40
# Number of UPDATE statements each worker thread executes.
num_keys=200
# Connect to a node on localhost; `session` is the single connection object
# used by every function in this script, including all worker threads.
cluster = Cluster(['127.0.0.1'])
session = cluster.connect()
# Default request timeout for statements run on this session
# (NOTE(review): the driver documents this value as seconds -- confirm).
session.default_timeout = 60
def create_table():
    """Recreate the schema used by the list-append load test.

    Using the module-level ``session`` this runs two timed phases:

    1. ``CREATE KEYSPACE IF NOT EXISTS k``, ``USE k`` and
       ``DROP TABLE IF EXISTS test_list_update``.
    2. ``CREATE TYPE IF NOT EXISTS rangeType`` followed by
       ``CREATE TABLE IF NOT EXISTS test_list_update`` (a ``uuid`` primary key
       plus a ``LIST<FROZEN<rangeType>>`` column, with transactions enabled).

    After each phase the elapsed wall-clock time is printed in milliseconds.
    Returns ``None``; any exception raised by ``session.execute`` propagates.
    """
    # Phase 1: make sure the keyspace exists and start from a clean table.
    start_time = time.time()
    session.execute("""CREATE KEYSPACE IF NOT EXISTS k""")
    session.execute("""USE k""")
    session.execute("""DROP TABLE IF EXISTS test_list_update""")
    now_time = time.time()
    print("Dropped (if exists): test_list_update")
    print("Time: %s ms" % ((now_time - start_time) * 1000))
    print("====================")

    # Phase 2: the user-defined type must exist before the table that uses it.
    start_time = time.time()
    session.execute("""
CREATE TYPE IF NOT EXISTS rangeType(
startid int, endid int
)
""")
    session.execute("""
CREATE TABLE IF NOT EXISTS test_list_update(
id uuid PRIMARY KEY,
ranges LIST<FROZEN<rangeType>>
) WITH TRANSACTIONS = {'enabled' : true}
""")
    now_time = time.time()
    print("Created table")
    print("Time: %s ms" % ((now_time - start_time) * 1000))
    print("====================")
def load_data_slave(thread_num):
    """Worker body: append range entries to the shared row ``num_keys`` times.

    Each iteration executes one UPDATE on the module-level ``session`` that
    appends two ``rangeType`` values to the ``ranges`` list of the row with
    the fixed id ``F8B9FF57-5DFB-4DB3-B2FA-5EDC99A626DC``. Both values use
    ``thread_num`` as ``startid``; their ``endid`` values are
    ``thread_num * 10000 + idx`` and ``thread_num * 10000 + idx + 1``.

    Args:
        thread_num: Integer index of this worker; used in the log prefix and
            in the generated range values.

    Prints progress and timing statistics; returns ``None``.
    """
    tag = "Thread-" + str(thread_num)
    # Statement template; the four %d slots are (startid, endid) for each of
    # the two list elements appended per UPDATE.
    statement = """UPDATE test_list_update SET ranges = ranges + [{startid:%d, endid:%d}, {startid:%d, endid:%d}] WHERE id=F8B9FF57-5DFB-4DB3-B2FA-5EDC99A626DC"""

    print(tag + ": Inserting %d rows..." % (num_keys))
    began = time.time()
    for offset in range(num_keys):
        first_end = thread_num * 10000 + offset
        session.execute(
            statement % (thread_num, first_end, thread_num, first_end + 1))
    elapsed = time.time() - began
    elapsed_ms = elapsed * 1000

    # Two list entries are appended per statement, hence num_keys*2 below,
    # while the rate and average are computed per statement (num_keys).
    print(tag + ": Inserted %d list entries" % (num_keys*2))
    print(tag + ": Time: %s ms" % (elapsed_ms))
    print(tag + ": Inserts/sec: %s" % (num_keys / elapsed))
    print(tag + ": Avg Time: %s ms" % (elapsed_ms / (num_keys)))
def load_data():
    """Run the concurrent load phase and check the resulting list length.

    Starts ``num_write_threads`` worker threads, each running
    ``load_data_slave`` with its index, and waits for all of them. It then
    prints the aggregate timing, reads the table back through the
    module-level ``session`` and compares the length of the first row's
    ``ranges`` list with the expected number of appended entries
    (``num_keys * num_write_threads * 2``). A mismatch is reported by
    printing ``ERROR``; no exception is raised for it.

    Returns ``None``. Exceptions raised by a worker or by ``session.execute``
    propagate to the caller.
    """
    pool = ThreadPool(num_write_threads)
    t1 = time.time()
    try:
        # map() blocks until every worker has returned; its result (a list of
        # None values) is not needed.
        pool.map(load_data_slave, range(num_write_threads))
        t2 = time.time()
    finally:
        # Always release the worker threads, even if a worker raised.
        pool.close()
        pool.join()

    total_rows = num_keys * num_write_threads * 2
    print("====================")
    print("Inserted %d list entries" % (total_rows))
    print("Total Time: %s ms" % ((t2 - t1) * 1000))
    print("Inserts/sec: %s" % (total_rows / (t2 - t1)))
    print("====================")

    rows = list(session.execute("""SELECT * FROM test_list_update"""))
    # No row at all, or a row whose list column is empty/None, counts as zero
    # entries instead of raising IndexError/TypeError.
    ranges = rows[0].ranges if rows else None
    num_ranges = len(ranges) if ranges else 0
    print("Got back " + str(num_ranges) + " ranges")
    if num_ranges != total_rows:
        print("ERROR")
# Main
# Runs at import/execution time: first (re)create the schema, then run the
# concurrent load phase, which also reads the table back and prints the result.
create_table()
load_data()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment