Skip to content

Instantly share code, notes, and snippets.

@kmuthukk
Last active August 21, 2020 02:25
Show Gist options
  • Save kmuthukk/1d79e3268f8d03da8652dadd3fb8923f to your computer and use it in GitHub Desktop.
Save kmuthukk/1d79e3268f8d03da8652dadd3fb8923f to your computer and use it in GitHub Desktop.
ycql_multiple_indexes.py
# pip install yb-cassandra-driver
from cassandra.cluster import Cluster
import time
import random
from multiprocessing.dummy import Pool as ThreadPool
from functools import partial
# Load-phase parameters: load_data() starts num_write_threads workers and each
# worker inserts num_users rows, so every run writes
# num_write_threads * num_users rows in total.
num_write_threads, num_users = 2, 1000

# Contact point of the YCQL node to connect to. To benchmark a remote node,
# substitute its address here (e.g. '172.151.43.112').
cluster = Cluster(['127.0.0.1'])
session = cluster.connect()
def create_table(num_indexes):
start_time = time.time()
session.execute("""CREATE KEYSPACE IF NOT EXISTS k""");
session.execute("""USE k""");
session.execute("""DROP TABLE IF EXISTS users""");
now_time = time.time()
print("Dropped (if exists): users table")
print("Time: %s ms" % ((now_time - start_time) * 1000))
print("====================")
start_time = time.time()
session.execute("""
CREATE TABLE IF NOT EXISTS users(
id text,
name0 text,
name1 text,
name2 text,
name3 text,
name4 text,
age int,
PRIMARY KEY(id)
) WITH TRANSACTIONS = {'enabled' : true}
""")
now_time = time.time()
print("Created users table")
print("Time: %s ms" % ((now_time - start_time) * 1000))
print("====================")
start_time = time.time()
for idx in range(num_indexes):
column = "name"+str(idx)
session.execute("""CREATE INDEX IF NOT EXISTS {}_idx ON users({})""".format(column, column))
print("Create {}_idx".format(column))
now_time = time.time()
print("Created {} indexes on table".format(num_indexes))
print("Time: %s ms" % ((now_time - start_time) * 1000))
print("====================")
def load_data_slave(num_indexes, thread_num):
    """Worker body: insert ``num_users`` rows into ``users``, one execute() per row.

    Row ``i`` written by worker ``thread_num`` has:
      * ``id``    = ``user-<thread_num>-<i>``, which is unique across workers;
      * ``nameK`` = ``nameK-<i>`` for K in 0..4;
      * ``age``   = 20 + (i % 50), i.e. cycling through 20..69.

    ``num_indexes`` is not used here. It is accepted because ``load_data``
    binds it with ``functools.partial``.
    """
    insert_stmt = session.prepare("""INSERT INTO users (id, name0, name1, name2, name3, name4, age) VALUES (?, ?, ?, ?, ?, ?, ?)""")
    id_prefix = "user-" + str(thread_num) + "-"
    for row in range(num_users):
        row_tag = str(row)
        name_values = tuple("name" + str(col) + "-" + row_tag for col in range(5))
        values = (id_prefix + row_tag,) + name_values + (20 + row % 50,)
        session.execute(insert_stmt, values)
def load_data(num_indexes):
    """Run ``load_data_slave`` on ``num_write_threads`` concurrent threads and wait for all.

    Each worker is called as ``load_data_slave(num_indexes, thread_num)`` with
    a distinct ``thread_num`` from ``range(num_write_threads)``. The call
    returns once every worker has finished. An exception raised inside a
    worker is re-raised from ``pool.map``.

    Args:
        num_indexes: forwarded unchanged to every ``load_data_slave`` call.
    """
    pool = ThreadPool(num_write_threads)
    try:
        pool.map(partial(load_data_slave, num_indexes), range(num_write_threads))
    finally:
        # Shut the pool down so its worker threads do not linger after each
        # benchmark iteration (an unclosed Pool also triggers a ResourceWarning).
        pool.close()
        pool.join()
# Run the benchmark once for each index count in [0..5].
for num_indexes in range(0, 6):
    print("Running test with {} index(es)".format(num_indexes))
    create_table(num_indexes)
    print("Waiting 60 seconds for leader balancing to happen before starting experiment")
    time.sleep(60)
    start_time = time.time()
    load_data(num_indexes)
    elapsed_s = time.time() - start_time
    total_rows = num_users * num_write_threads
    # The num_write_threads workers insert concurrently, and each one issues
    # its rows one session.execute() at a time (assuming execute() blocks until
    # the insert completes, as the driver's synchronous API does). The mean
    # time per insert is therefore elapsed * writers / rows. Dividing by
    # total_rows alone would understate latency by a factor of num_write_threads.
    avg_latency_ms = elapsed_s * 1000 * num_write_threads / total_rows
    print("Num Indexes={}: Rows={}; Inserts/sec: {}; Avg Latency (ms): {}"
          .format(num_indexes,
                  total_rows,
                  round(total_rows / elapsed_s, 2),
                  round(avg_latency_ms, 2)))
    print("====================")
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment