Sample Python program that performs parallel inserts into a user_actions table, then does both point reads and range reads against YugaByte DB's YSQL (PostgreSQL-compatible) API.
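Before running the script below, a quick connectivity check can confirm the YSQL endpoint is reachable. This is a minimal, standalone sketch (not part of the script), assuming the same localhost:5433 postgres endpoint the script connects to:

import psycopg2

# Connect to YugaByte DB's YSQL endpoint (PostgreSQL wire protocol, default port 5433).
conn = psycopg2.connect("host=localhost dbname=postgres user=postgres port=5433")
cur = conn.cursor()
cur.execute("SELECT version()")
print(cur.fetchone()[0])
conn.close()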
import psycopg2
import time
import random
from multiprocessing.dummy import Pool as ThreadPool

# Load Phase params
num_write_threads = 4
num_users = 500
num_msgs = 50
table_name = "user_actions"

# Read Only Phase params
num_read_threads = 8
num_reads_per_thread = 50000
# Integer division so this stays usable as a range() bound under Python 3 as well.
num_range_reads_per_thread = num_reads_per_thread // num_msgs
def create_table():
    conn = psycopg2.connect("host=localhost dbname=postgres user=postgres port=5433")
    conn.set_session(autocommit=True)
    cur = conn.cursor()

    start_time = time.time()
    cur.execute("""DROP TABLE IF EXISTS %s""" % (table_name))
    now_time = time.time()
    print("Dropped (if exists): %s table" % (table_name))
    print("Time: %s ms" % ((now_time - start_time) * 1000))
    print("====================")

    start_time = time.time()
    cur.execute("""
        CREATE TABLE IF NOT EXISTS %s(
            id text,
            msg_id integer,
            msg text,
            PRIMARY KEY(id, msg_id)
        )
        """ % (table_name))
    now_time = time.time()
    print("Created: " + table_name)
    print("Time: %s ms" % ((now_time - start_time) * 1000))
    print("====================")
def load_data_slave(thread_num):
    thread_id = str(thread_num)
    conn = psycopg2.connect("host=localhost dbname=postgres user=postgres port=5433")
    conn.set_session(autocommit=True)
    cur = conn.cursor()
    print("Thread-" + thread_id + ": Inserting %d rows..." % (num_users * num_msgs))
    start_time = time.time()
    for idx in range(num_users):
        for jdx in range(num_msgs):
            cur.execute("""INSERT INTO """ + table_name + """ (id, msg_id, msg) VALUES (%s, %s, %s)""",
                        ("u-" + thread_id + "-" + str(idx),
                         jdx,
                         "msg--" + str(idx) + "--" + str(jdx)))
    now_time = time.time()
    print("Thread-" + thread_id + ": Inserted %d rows" % (num_msgs * num_users))
    print("Thread-" + thread_id + ": Time: %s ms" % ((now_time - start_time) * 1000))
    print("Thread-" + thread_id + ": Inserts/sec: %s" % ((num_msgs * num_users) / (now_time - start_time)))
    print("Thread-" + thread_id + ": Avg Time: %s ms" % ((now_time - start_time) * 1000 / (num_msgs * num_users)))
def load_data():
    pool = ThreadPool(num_write_threads)
    t1 = time.time()
    results = pool.map(load_data_slave, range(num_write_threads))
    t2 = time.time()
    total_rows = num_users * num_msgs * num_write_threads
    print("====================")
    print("Inserted %d rows" % (total_rows))
    print("Total Time: %s ms" % ((t2 - t1) * 1000))
    print("Inserts/sec: %s" % (total_rows / (t2 - t1)))
    print("====================")
def read_data_slave(thread_num):
    thread_id = str(thread_num)
    conn = psycopg2.connect("host=localhost dbname=postgres user=postgres port=5433")
    conn.set_session(autocommit=True)
    cur = conn.cursor()
    print("Thread-" + thread_id + ": Reading %d rows..." % (num_reads_per_thread))
    start_time = time.time()
    for i in range(num_reads_per_thread):
        # Construct the (id, msg_id) key to read using a random writer thread id,
        # a random user id, and a random msg_id.
        rand_thread_id = str(random.randint(0, num_write_threads - 1))
        rand_user_id = str(random.randint(0, num_users - 1))
        rand_msg_id = str(random.randint(0, num_msgs - 1))
        user_id = "u-" + rand_thread_id + "-" + rand_user_id
        expected_msg = "msg--" + rand_user_id + "--" + rand_msg_id
        cur.execute("SELECT msg FROM " + table_name + " WHERE id=%s AND msg_id=%s",
                    (user_id, rand_msg_id))
        actual_msg = cur.fetchone()[0]
        if (expected_msg != actual_msg):
            print("Expected: " + expected_msg + "; Found=" + actual_msg)
            return
    now_time = time.time()
    print("Time: %s ms" % ((now_time - start_time) * 1000))
    print("Thread-" + thread_id + ": Avg Time: %s ms" % ((now_time - start_time) * 1000 / (num_reads_per_thread)))
def read_data():
    pool = ThreadPool(num_read_threads)
    t1 = time.time()
    results = pool.map(read_data_slave, range(num_read_threads))
    t2 = time.time()
    total_rows = (num_read_threads * num_reads_per_thread)
    print("====================")
    print("Rows read: %d rows" % (total_rows))
    print("Total Time: %s ms" % ((t2 - t1) * 1000))
    print("Reads/sec: %f" % (total_rows / (t2 - t1)))
    print("====================")
def range_read_data_slave(thread_num):
    thread_id = str(thread_num)
    conn = psycopg2.connect("host=localhost dbname=postgres user=postgres port=5433")
    conn.set_session(autocommit=True)
    cur = conn.cursor()
    print("Thread-" + thread_id + ": Performing %d Queries; Each fetching %d rows..."
          % (num_range_reads_per_thread, num_msgs))
    start_time = time.time()
    for i in range(num_range_reads_per_thread):
        # Construct the id key to scan using a random writer thread id and a random
        # user id; the range read fetches all msg_id rows for that user.
        rand_thread_id = str(random.randint(0, num_write_threads - 1))
        rand_user_id = str(random.randint(0, num_users - 1))
        user_id = "u-" + rand_thread_id + "-" + rand_user_id
        # Use a bind parameter for the key, as in the point-read path.
        cur.execute("SELECT msg FROM " + table_name + " WHERE id = %s", (user_id,))
        result = cur.fetchall()
        # Check that the number of rows is the expected one.
        if (len(result) != num_msgs):
            print("Expected: " + str(num_msgs) + " rows; Found=" + str(len(result)))
            return
        # Check the first row for sanity.
        expected_msg = "msg--" + rand_user_id + "--0"
        if (expected_msg != result[0][0]):
            print("Expected: " + expected_msg + "; Found=" + result[0][0])
            return
    now_time = time.time()
    print("Time: %s ms" % ((now_time - start_time) * 1000))
    print("Thread-" + thread_id + ": Avg Time: %s ms" % ((now_time - start_time) * 1000 / (num_range_reads_per_thread)))
def range_read_data():
    pool = ThreadPool(num_read_threads)
    t1 = time.time()
    results = pool.map(range_read_data_slave, range(num_read_threads))
    t2 = time.time()
    total_queries = (num_read_threads * num_range_reads_per_thread)
    print("====================")
    print("Total Range Queries: %d" % (total_queries))
    print("Rows read: %d" % (total_queries * num_msgs))
    print("Total Time: %s ms" % ((t2 - t1) * 1000))
    print("Range Queries/sec: %f" % (total_queries / (t2 - t1)))
    print("Rows/sec: %f" % ((total_queries * num_msgs) / (t2 - t1)))
    print("====================")
# Main
create_table()
load_data()
read_data()
range_read_data()
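As a quick post-load sanity check (separate from the script above), the row count in user_actions can be compared against num_write_threads * num_users * num_msgs, which is 100,000 with the defaults. A minimal sketch, assuming the same connection settings as the script:

import psycopg2

conn = psycopg2.connect("host=localhost dbname=postgres user=postgres port=5433")
cur = conn.cursor()
cur.execute("SELECT count(*) FROM user_actions")
# With the default parameters: 4 write threads * 500 users * 50 msgs = 100000 rows.
print("Row count: %d" % cur.fetchone()[0])
conn.close()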
Sample output against an RF=1 (single-node) cluster on my test/dev server:

Dropped (if exists): user_actions table
Time: 523.843050003 ms
====================
Created: user_actions
Time: 1389.10984993 ms
====================
Thread-3: Inserting 25000 rows...
Thread-2: Inserting 25000 rows...
Thread-0: Inserting 25000 rows...
Thread-1: Inserting 25000 rows...
Thread-1: Inserted 25000 rows
Thread-1: Time: 22323.1110573 ms
Thread-1: Inserts/sec: 1119.91558595
Thread-1: Avg Time: 0.892924442291 ms
Thread-2: Inserted 25000 rows
Thread-2: Time: 22380.5470467 ms
Thread-2: Inserts/sec: 1117.04150698
Thread-2: Avg Time: 0.895221881866 ms
Thread-3: Inserted 25000 rows
Thread-3: Time: 22501.3201237 ms
Thread-3: Inserts/sec: 1111.04592364
Thread-3: Avg Time: 0.900052804947 ms
Thread-0: Inserted 25000 rows
Thread-0: Time: 22499.350071 ms
Thread-0: Inserts/sec: 1111.1432073
Thread-0: Avg Time: 0.899974002838 ms
====================
Inserted 100000 rows
Total Time: 23193.669796 ms
Inserts/sec: 4311.52124177
====================
Thread-0: Reading 50000 rows...
Thread-6: Reading 50000 rows...
Thread-2: Reading 50000 rows...
Thread-7: Reading 50000 rows...
Thread-5: Reading 50000 rows...
Thread-3: Reading 50000 rows...
Thread-1: Reading 50000 rows...
Thread-4: Reading 50000 rows...
Time: 53948.8079548 ms
Thread-4: Avg Time: 1.0789761591 ms
Time: 55183.4158897 ms
Thread-1: Avg Time: 1.10366831779 ms
Time: 55354.7050953 ms
Thread-0: Avg Time: 1.10709410191 ms
Time: 55472.7220535 ms
Thread-7: Avg Time: 1.10945444107 ms
Time: 56185.0209236 ms
Thread-2: Avg Time: 1.12370041847 ms
Time: 56226.6068459 ms
Thread-3: Avg Time: 1.12453213692 ms
Time: 56302.6809692 ms
Thread-6: Avg Time: 1.12605361938 ms
Time: 56289.1960144 ms
Thread-5: Avg Time: 1.12578392029 ms
====================
Rows read: 400000 rows
Total Time: 56353.8150787 ms
Reads/sec: 7098.011012
====================
Thread-1: Performing 1000 Queries; Each fetching 50 rows...
Thread-4: Performing 1000 Queries; Each fetching 50 rows...
Thread-0: Performing 1000 Queries; Each fetching 50 rows...
Thread-5: Performing 1000 Queries; Each fetching 50 rows...
Thread-3: Performing 1000 Queries; Each fetching 50 rows...
Thread-2: Performing 1000 Queries; Each fetching 50 rows...
Thread-7: Performing 1000 Queries; Each fetching 50 rows...
Thread-6: Performing 1000 Queries; Each fetching 50 rows...
Time: 2978.17802429 ms
Thread-5: Avg Time: 2.97817802429 ms
Time: 3032.84597397 ms
Thread-1: Avg Time: 3.03284597397 ms
Time: 3020.91193199 ms
Thread-4: Avg Time: 3.02091193199 ms
Time: 2977.14304924 ms
Thread-6: Avg Time: 2.97714304924 ms
Time: 3009.21916962 ms
Thread-0: Avg Time: 3.00921916962 ms
Time: 3023.33593369 ms
Thread-3: Avg Time: 3.02333593369 ms
Time: 3038.13195229 ms
Thread-2: Avg Time: 3.03813195229 ms
Time: 3063.72213364 ms
Thread-7: Avg Time: 3.06372213364 ms
====================
Total Range Queries: 8000
Rows read: 400000
Total Time: 3168.6091423 ms
Range Queries/sec: 2524.767064
Rows/sec: 126238.353181
====================
