Last active
April 18, 2019 04:00
-
-
Save kmuthukk/e3a092f366f84b8078cbc4733903f9b1 to your computer and use it in GitHub Desktop.
Sample Python program that does parallel inserts into a user_actions table, then does both point reads and range reads against YugaByte DB's YSQL (PostgreSQL-compatible) API.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import psycopg2; | |
import time | |
import random | |
from multiprocessing.dummy import Pool as ThreadPool | |
# Load Phase params
num_write_threads = 4          # number of concurrent writer threads
num_users = 500                # users inserted by each writer thread
num_msgs = 50                  # messages (rows) inserted per user
table_name = "user_actions"

# Read Only Phase params
num_read_threads = 8           # number of concurrent reader threads
num_reads_per_thread = 50000   # point reads issued by each reader thread
# Each range read fetches num_msgs rows, so this keeps the number of rows read
# per thread equal to the point-read phase. Floor division is required: the
# value is passed to range(), which rejects the float that "/" yields on
# Python 3.
num_range_reads_per_thread = num_reads_per_thread // num_msgs
def create_table():
    """Drop and recreate the benchmark table, printing how long each step took.

    Opens its own autocommit connection to the database on localhost:5433 and
    always closes it before returning, even if one of the statements fails.
    """
    conn = psycopg2.connect("host=localhost dbname=postgres user=postgres port=5433")
    try:
        conn.set_session(autocommit=True)
        cur = conn.cursor()

        # Identifiers cannot be bound as query parameters, so the table name
        # (a module-level constant, not user input) is interpolated directly.
        start_time = time.time()
        cur.execute("""DROP TABLE IF EXISTS %s""" % (table_name))
        now_time = time.time()
        print("Dropped (if exists): %s table" % (table_name))
        print("Time: %s ms" % ((now_time - start_time) * 1000))
        print("====================")

        start_time = time.time()
        cur.execute("""
            CREATE TABLE IF NOT EXISTS %s(
              id text,
              msg_id integer,
              msg text,
              PRIMARY KEY(id, msg_id)
            )
            """ % (table_name))
        now_time = time.time()
        print("Created: " + table_name)
        print("Time: %s ms" % ((now_time - start_time) * 1000))
        print("====================")
    finally:
        # The original never closed the connection; release it explicitly.
        conn.close()
def load_data_slave(thread_num):
    """Insert num_users * num_msgs rows on a dedicated connection.

    Row ids have the form "u-<thread_num>-<user index>", so each writer thread
    works on its own set of keys. Every row is inserted with its own
    autocommit statement. Prints per-thread timing statistics when done.

    Args:
        thread_num: index of this writer thread; becomes part of every row id.
    """
    thread_id = str(thread_num)
    rows_to_insert = num_users * num_msgs

    conn = psycopg2.connect("host=localhost dbname=postgres user=postgres port=5433")
    try:
        conn.set_session(autocommit=True)
        cur = conn.cursor()

        print("Thread-" + thread_id + ": Inserting %d rows..." % (rows_to_insert))

        # The statement text is loop-invariant; only the bound values change.
        insert_sql = "INSERT INTO " + table_name + " (id, msg_id, msg) VALUES (%s, %s, %s)"

        start_time = time.time()
        for idx in range(num_users):
            user_id = "u-" + thread_id + "-" + str(idx)
            for jdx in range(num_msgs):
                cur.execute(insert_sql,
                            (user_id, jdx, "msg--" + str(idx) + "--" + str(jdx)))
        now_time = time.time()
    finally:
        # The original never closed the connection; release it explicitly.
        conn.close()

    elapsed = now_time - start_time
    print("Thread-" + thread_id + ": Inserted %d rows" % (rows_to_insert))
    print("Thread-" + thread_id + ": Time: %s ms" % (elapsed * 1000))
    print("Thread-" + thread_id + ": Inserts/sec: %s" % (rows_to_insert / elapsed))
    print("Thread-" + thread_id + ": Avg Time: %s ms" % (elapsed * 1000 / rows_to_insert))
def load_data():
    """Run the load phase on num_write_threads threads and print the totals.

    Blocks until every writer thread has finished, then reports the total
    number of rows inserted, the wall-clock time and the aggregate insert rate.
    """
    pool = ThreadPool(num_write_threads)
    try:
        t1 = time.time()
        # The workers return nothing useful, so the map result is discarded.
        pool.map(load_data_slave, range(num_write_threads))
        t2 = time.time()
    finally:
        # Shut the worker threads down instead of leaking the pool.
        pool.close()
        pool.join()

    total_rows = num_users * num_msgs * num_write_threads
    print("====================")
    print("Inserted %d rows" % (total_rows))
    print("Total Time: %s ms" % ((t2 - t1) * 1000))
    print("Inserts/sec: %s" % (total_rows / (t2 - t1)))
    print("====================")
def read_data_slave(thread_num):
    """Issue num_reads_per_thread random point reads and verify each result.

    Each read picks a random (writer thread, user, msg_id) combination, looks
    the row up by its full primary key and compares the stored message with
    the value the load phase wrote. On the first missing or mismatching row a
    diagnostic is printed and the thread stops without printing timings.

    Args:
        thread_num: index of this reader thread; used only in log output.
    """
    thread_id = str(thread_num)

    conn = psycopg2.connect("host=localhost dbname=postgres user=postgres port=5433")
    try:
        conn.set_session(autocommit=True)
        cur = conn.cursor()

        print("Thread-" + thread_id + ": Reading %d rows..." % (num_reads_per_thread))

        select_sql = "SELECT msg FROM " + table_name + " WHERE id=%s AND msg_id=%s"

        start_time = time.time()
        for _ in range(num_reads_per_thread):
            # construct the user & msg_id key to read using:
            # a random writer thread id + user id and a random msg_id combo.
            rand_thread_id = str(random.randint(0, num_write_threads - 1))
            rand_user_id = str(random.randint(0, num_users - 1))
            # msg_id is an integer column; bind it as an int, as the load
            # phase does, rather than as a quoted string.
            rand_msg_id = random.randint(0, num_msgs - 1)

            user_id = "u-" + rand_thread_id + "-" + rand_user_id
            expected_msg = "msg--" + rand_user_id + "--" + str(rand_msg_id)

            cur.execute(select_sql, (user_id, rand_msg_id))
            row = cur.fetchone()
            if row is None:
                # fetchone() returns None when no row matched; report it the
                # same way as a mismatch instead of failing on None[0].
                print("Expected: " + expected_msg + "; Found no row for id=" + user_id)
                return

            actual_msg = row[0]
            if expected_msg != actual_msg:
                print("Expected: " + expected_msg + "; Found=" + actual_msg)
                return
        now_time = time.time()
    finally:
        # Runs on the early returns too, so the connection is never leaked.
        conn.close()

    elapsed_ms = (now_time - start_time) * 1000
    print("Time: %s ms" % (elapsed_ms))
    print("Thread-" + thread_id + ": Avg Time: %s ms" % (elapsed_ms / (num_reads_per_thread)))
def read_data():
    """Run the point-read phase on num_read_threads threads and print totals.

    Blocks until every reader thread has finished, then reports the number of
    rows requested, the wall-clock time and the aggregate read rate. The row
    count is the number of reads scheduled; a thread that stops early on a
    verification failure is not subtracted.
    """
    pool = ThreadPool(num_read_threads)
    try:
        t1 = time.time()
        # The workers return nothing useful, so the map result is discarded.
        pool.map(read_data_slave, range(num_read_threads))
        t2 = time.time()
    finally:
        # Shut the worker threads down instead of leaking the pool.
        pool.close()
        pool.join()

    total_rows = num_read_threads * num_reads_per_thread
    print("====================")
    print("Rows read: %d rows" % (total_rows))
    print("Total Time: %s ms" % ((t2 - t1) * 1000))
    print("Reads/sec: %f" % (total_rows / (t2 - t1)))
    print("====================")
def range_read_data_slave(thread_num):
    """Issue random per-user range reads and sanity-check each result set.

    Each query selects all messages of one randomly chosen user, then checks
    that exactly num_msgs rows came back and that the first row holds the
    message written for msg_id 0. On the first failed check a diagnostic is
    printed and the thread stops without printing timings.

    Args:
        thread_num: index of this reader thread; used only in log output.
    """
    thread_id = str(thread_num)
    # range() only accepts integers, and the module-level count is a float
    # when it was computed with true division on Python 3.
    num_queries = int(num_range_reads_per_thread)

    conn = psycopg2.connect("host=localhost dbname=postgres user=postgres port=5433")
    try:
        conn.set_session(autocommit=True)
        cur = conn.cursor()

        print("Thread-" + thread_id + ": Performing %d Queries; Each fetching %d rows..."
              % (num_queries, num_msgs))

        # Bind the id as a query parameter instead of formatting it into the
        # SQL text, matching how the point-read and insert paths pass values.
        select_sql = "SELECT msg FROM " + table_name + " WHERE id = %s"

        start_time = time.time()
        for _ in range(num_queries):
            # construct the user key to read using:
            # a random writer thread id + a random user id.
            rand_thread_id = str(random.randint(0, num_write_threads - 1))
            rand_user_id = str(random.randint(0, num_users - 1))
            user_id = "u-" + rand_thread_id + "-" + rand_user_id

            cur.execute(select_sql, (user_id,))
            result = cur.fetchall()

            # check if number of rows is the expected one.
            if len(result) != num_msgs:
                print("Expected: " + str(num_msgs) + " rows; Found=" + str(len(result)))
                return

            # check first row for sanity
            # NOTE(review): the query has no ORDER BY, so this assumes rows
            # come back in msg_id order -- presumably a consequence of the
            # (id, msg_id) primary key; confirm.
            expected_msg = "msg--" + rand_user_id + "--0"
            if expected_msg != result[0][0]:
                print("Expected: " + expected_msg + "; Found=" + result[0][0])
                return
        now_time = time.time()
    finally:
        # Runs on the early returns too, so the connection is never leaked.
        conn.close()

    elapsed_ms = (now_time - start_time) * 1000
    print("Time: %s ms" % (elapsed_ms))
    print("Thread-" + thread_id + ": Avg Time: %s ms" % (elapsed_ms / (num_queries)))
def range_read_data():
    """Run the range-read phase on num_read_threads threads and print totals.

    Blocks until every reader thread has finished, then reports the number of
    queries and rows scheduled, the wall-clock time, and the aggregate query
    and row rates. A thread that stops early on a verification failure is not
    subtracted from the totals.
    """
    pool = ThreadPool(num_read_threads)
    try:
        t1 = time.time()
        # The workers return nothing useful, so the map result is discarded.
        pool.map(range_read_data_slave, range(num_read_threads))
        t2 = time.time()
    finally:
        # Shut the worker threads down instead of leaking the pool.
        pool.close()
        pool.join()

    total_queries = num_read_threads * num_range_reads_per_thread
    print("====================")
    print("Total Range Queries: %d" % (total_queries))
    print("Rows read: %d" % (total_queries * num_msgs))
    print("Total Time: %s ms" % ((t2 - t1) * 1000))
    print("Range Queries/sec: %f" % (total_queries / (t2 - t1)))
    print("Rows/sec: %f" % ((total_queries * num_msgs) / (t2 - t1)))
    print("====================")
# Main: run the benchmark phases in order -- recreate the table, load it,
# then run the point-read and range-read phases against the loaded data.
for run_phase in (create_table, load_data, read_data, range_read_data):
    run_phase()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Sample output against an RF=1 (single-node) cluster on my test/dev server: