# Gist kmuthukk/fa9fd54278c91aba14979fd8ba8d4973 by @kmuthukk
# (last active May 16, 2019).
#
# A parallel program to test simple join performance with YugaByte's
# PostgreSQL-compatible YSQL API.
# Dependencies:
# On CentOS you can install psycopg2 thus:
#
# sudo yum install postgresql-libs
# sudo yum install python-psycopg2
#
import psycopg2
import time
import random
from multiprocessing.dummy import Pool as ThreadPool
# ---- Benchmark configuration -------------------------------------------
# YSQL server to connect to (every connection in this script uses this host).
host = "10.150.0.27"

# Writer side: the emp table is loaded by this many threads, 1000 rows each.
num_threads = 8
num_users = 1000 * num_threads

# Number of rows in the dept table; emp.dept_id is spread over this range.
num_depts = 1000

# Reader side: concurrent join-lookup threads and lookups per thread.
num_read_threads = 4
num_reads_per_thread = 1000
def create_tables():
    """Drop and recreate the ``emp`` and ``dept`` tables, printing timings.

    Connects to ``host`` on port 5433 as user ``postgres`` in autocommit
    mode. Any existing ``emp`` and ``dept`` tables are dropped first.
    The connection is always closed before returning, even if a statement
    raises.
    """
    conn = psycopg2.connect("host=" + host + " dbname=postgres user=postgres port=5433")
    try:
        conn.set_session(autocommit=True)
        cur = conn.cursor()

        start_time = time.time()
        cur.execute("""DROP TABLE IF EXISTS emp""")
        print("Dropped (if exists): emp table")
        cur.execute("""DROP TABLE IF EXISTS dept""")
        print("Dropped (if exists): dept table")
        now_time = time.time()
        print("Time: %s ms ---" % ((now_time - start_time) * 1000))

        start_time = time.time()
        cur.execute("""
            CREATE TABLE IF NOT EXISTS emp(
                eid int,
                ename text,
                salary float,
                dept_id int,
                PRIMARY KEY(eid)
            )""")
        now_time = time.time()
        print("Created emp table")
        print("Time: %s ms ---" % ((now_time - start_time) * 1000))

        start_time = time.time()
        cur.execute("""
            CREATE TABLE IF NOT EXISTS dept(
                dept_id int,
                dept_name text,
                PRIMARY KEY(dept_id)
            )""")
        now_time = time.time()
        print("Created dept table")
        print("Time: %s ms ---" % ((now_time - start_time) * 1000))
    finally:
        # The original never closed the connection, leaking it on every run.
        conn.close()
def load_data_slave(thread_num):
    """Insert this thread's contiguous slice of rows into the ``emp`` table.

    With ``k = num_users // num_threads``, thread ``thread_num`` inserts
    ``eid`` values ``thread_num * k`` through ``thread_num * k + k - 1``.
    Each row is a separate autocommitted INSERT. When finished, the thread
    prints its elapsed time, inserts/sec and average latency.

    Args:
        thread_num: Index of this writer thread in ``range(num_threads)``.
    """
    thread_id = str(thread_num)
    # Integer division: ``num_users / num_threads`` is a float on Python 3,
    # which makes ``range()`` below raise TypeError. If num_users is not a
    # multiple of num_threads, the remainder rows are not inserted.
    num_users_per_thread = num_users // num_threads
    conn = psycopg2.connect("host=" + host + " dbname=postgres user=postgres port=5433")
    try:
        conn.set_session(autocommit=True)
        cur = conn.cursor()
        print("Thread-" + thread_id + ": ==================")
        print("Thread-" + thread_id + ": Inserting %d rows..." % (num_users_per_thread))
        start_time = time.time()
        for idx in range(num_users_per_thread):
            eid = (thread_num * num_users_per_thread) + idx
            ename = "name-" + str(eid)
            salary = 50000 + ((eid % 100) * 1000)
            dept_id = eid % num_depts
            cur.execute("""INSERT INTO emp(eid, ename, salary, dept_id) VALUES (%s, %s, %s, %s)""",
                        (eid, ename, salary, dept_id))
        now_time = time.time()
        print("Thread-" + thread_id + ": Inserted %d rows" % (num_users_per_thread))
        print("Thread-" + thread_id + ": Time: %s ms ---" % ((now_time - start_time) * 1000))
        print("Thread-" + thread_id + ": Inserts/sec: %s ---" % ((num_users_per_thread) / (now_time - start_time)))
        print("Thread-" + thread_id + ": Avg Time: %s ms ---" % ((now_time - start_time) * 1000 / (num_users_per_thread)))
    finally:
        # Release the per-thread connection even if an INSERT fails.
        conn.close()
def populate_emp_table():
    """Load the ``emp`` table in parallel using ``num_threads`` worker threads.

    Each worker runs ``load_data_slave`` with its own thread index. The pool
    is closed and joined afterwards so that no worker threads outlive the
    call.
    """
    pool = ThreadPool(num_threads)
    try:
        pool.map(load_data_slave, range(num_threads))
    finally:
        # The original never shut the pool down, leaking its worker threads.
        pool.close()
        pool.join()
def populate_dept_table():
    """Insert ``num_depts`` rows into ``dept`` and print throughput stats.

    Row ``i`` has ``dept_id = i`` and ``dept_name = "dname-<i>"``. Each row is
    a separate autocommitted INSERT on a single connection. The connection is
    closed before returning, even on error.
    """
    conn = psycopg2.connect("host=" + host + " dbname=postgres user=postgres port=5433")
    try:
        conn.set_session(autocommit=True)
        cur = conn.cursor()
        print("Inserting %d rows in dept table" % (num_depts))
        start_time = time.time()
        for idx in range(num_depts):
            dept_name = "dname-" + str(idx)
            cur.execute("""INSERT INTO dept(dept_id, dept_name) VALUES (%s, %s)""",
                        (idx, dept_name))
        now_time = time.time()
        print("Inserted %d rows in dept" % (num_depts))
        print("Time: %s ms ---" % ((now_time - start_time) * 1000))
        print("Inserts/sec: %s ---" % ((num_depts) / (now_time - start_time)))
        print("Avg Time: %s ms ---" % ((now_time - start_time) * 1000 / (num_depts)))
    finally:
        # The original never closed the connection.
        conn.close()
def read_data_slave(thread_num):
    """Run ``num_reads_per_thread`` random emp/dept join lookups and verify them.

    Each iteration picks a random ``eid`` in ``[0, num_users - 1]``. It then
    executes a prepared statement that joins ``emp`` to ``dept`` on
    ``dept_id`` for that ``eid``. The returned row is checked against the
    values written by ``load_data_slave`` and ``populate_dept_table``.
    Missing or mismatching rows are printed rather than raised, so one bad
    row does not abort the whole thread pool.

    Args:
        thread_num: Index of this reader thread; used only in log output.
    """
    thread_id = str(thread_num)
    conn = psycopg2.connect("host=" + host + " dbname=postgres user=postgres port=5433")
    try:
        conn.set_session(autocommit=True)
        cur = conn.cursor()
        print("Thread-" + thread_id + ": Reading %d rows..." % (num_reads_per_thread))
        start_time = time.time()
        # Prepared statements are session-scoped in PostgreSQL. Each thread
        # has its own connection, so the name "myplan" does not collide.
        cur.execute("PREPARE myplan AS select eid, ename, salary, dept_name from emp, dept where eid=$1 and emp.dept_id = dept.dept_id")
        for _ in range(num_reads_per_thread):
            rand_user_id = random.randint(0, num_users - 1)
            cur.execute("EXECUTE myplan(%s)", (rand_user_id,))
            row = cur.fetchone()
            if row is None:
                # Indexing a None row would raise TypeError and abort pool.map.
                print("No row found for " + str(rand_user_id))
                continue
            eid, ename, salary, dept_name = row
            if ((eid != rand_user_id) or
                    (ename != "name-" + str(rand_user_id)) or
                    (salary != (50000 + ((eid % 100) * 1000))) or
                    (dept_name != "dname-" + str(rand_user_id % num_depts))):
                print("Incorrect data for " + str(rand_user_id))
                print("eid=" + str(eid))
                # str() so a NULL column prints as "None" instead of raising.
                print("ename=" + str(ename))
                print("salary=" + str(salary))
                print("dept_name=" + str(dept_name))
        now_time = time.time()
        # Prefix with the thread id, like every other per-thread log line,
        # so interleaved output from concurrent readers can be attributed.
        print("Thread-" + thread_id + ": Time: %s ms" % ((now_time - start_time) * 1000))
        print("Thread-" + thread_id + ": Avg Time: %s ms" % ((now_time - start_time) * 1000 / (num_reads_per_thread)))
    finally:
        conn.close()
def test_reads_with_simple_join():
    """Run ``read_data_slave`` concurrently on ``num_read_threads`` threads.

    The thread pool is closed and joined afterwards so that no worker
    threads outlive the call.
    """
    pool = ThreadPool(num_read_threads)
    try:
        pool.map(read_data_slave, range(num_read_threads))
    finally:
        # The original never shut the pool down, leaking its worker threads.
        pool.close()
        pool.join()
# Benchmark driver. Steps run in order: recreate the schema, load dept,
# load emp in parallel, then run the concurrent join-read test.
for _benchmark_step in (create_tables,
                        populate_dept_table,
                        populate_emp_table,
                        test_reads_with_simple_join):
    _benchmark_step()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment