Skip to content

Instantly share code, notes, and snippets.

@kmuthukk
Last active April 18, 2019 03:11
Show Gist options
  • Save kmuthukk/72ca6f15b9d2418bc05fcf42d55c329c to your computer and use it in GitHub Desktop.
Save kmuthukk/72ca6f15b9d2418bc05fcf42d55c329c to your computer and use it in GitHub Desktop.
sample python program to do single row inserts in parallel for YugaByte DB's YSQL (postgres compatible API)
# Standard library imports first, third-party second (PEP 8 grouping).
import time
from multiprocessing.dummy import Pool as ThreadPool  # thread-based Pool (I/O-bound workload)

import psycopg2  # PostgreSQL driver; YugaByte YSQL speaks the postgres wire protocol

# Load-test tunables.
num_threads = 8               # concurrent worker threads
num_users = 1000              # distinct user ids inserted per thread
num_msgs = 50                 # messages inserted per user
table_name = "user_actions"   # target table; dropped and recreated on every run
def create_table():
    """Drop ``table_name`` if it exists, then recreate it.

    Connects to the local YSQL endpoint (port 5433), runs the DDL in
    autocommit mode, and prints the wall-clock time of each statement.
    The connection is closed before returning (the original leaked it).
    """
    conn = psycopg2.connect("host=localhost dbname=postgres user=postgres port=5433")
    try:
        conn.set_session(autocommit=True)  # DDL: no explicit transaction wanted
        cur = conn.cursor()

        # NOTE: the table identifier is %-interpolated into the SQL text.
        # Identifiers cannot be bind parameters; this is safe only because
        # table_name is a module-level constant, never user input.
        start_time = time.time()
        cur.execute("""DROP TABLE IF EXISTS %s""" % (table_name))
        now_time = time.time()
        print("Dropped (if exists): %s table" % (table_name))
        print("Time: %s ms ---" % ((now_time - start_time) * 1000))

        start_time = time.time()
        cur.execute("""
          CREATE TABLE IF NOT EXISTS %s(
            id text,
            msg_id integer,
            msg text,
            PRIMARY KEY(id, msg_id)
          )
          """ % (table_name))
        now_time = time.time()
        print("Created: " + table_name)
        print("Time: %s ms ---" % ((now_time - start_time) * 1000))
        cur.close()
    finally:
        conn.close()  # fix: original never closed the connection
def load_data_slave(thread_num):
    """Worker: insert ``num_users * num_msgs`` single rows and print stats.

    Each worker opens its own autocommit connection (psycopg2 connections
    are not safely shareable across the thread pool), inserts one row per
    (user, msg) pair with parameterized SQL, then reports throughput and
    average per-insert latency. The connection is closed before returning
    (the original leaked it).

    thread_num -- integer worker index; embedded in the row ids so the
                  threads never collide on the (id, msg_id) primary key.
    """
    thread_id = str(thread_num)
    conn = psycopg2.connect("host=localhost dbname=postgres user=postgres port=5433")
    try:
        conn.set_session(autocommit=True)  # each INSERT commits on its own
        cur = conn.cursor()
        print("Thread-" + thread_id + ": ==================")
        print("Thread-" + thread_id + ": Inserting %d rows..." % (num_users*num_msgs))
        start_time = time.time()
        for idx in range(num_users):
            for jdx in range(num_msgs):
                # Values go through bind parameters; only the (constant)
                # table identifier is spliced into the statement text.
                cur.execute("""INSERT INTO """ + table_name + """ (id, msg_id, msg) VALUES (%s, %s, %s)""",
                            ("u-"+thread_id+"-"+str(idx),
                             jdx,
                             "msg--"+str(idx)+"--"+str(jdx)))
        now_time = time.time()
        print("Thread-" + thread_id + ": Inserted %d rows" % (num_msgs * num_users))
        print("Thread-" + thread_id + ": Time: %s ms ---" % ((now_time - start_time) * 1000))
        print("Thread-" + thread_id + ": Inserts/sec: %s ---" % ((num_msgs * num_users) / (now_time - start_time)))
        print("Thread-" + thread_id + ": Avg Time: %s ms ---" % ((now_time - start_time) * 1000 / (num_msgs * num_users)))
        cur.close()
    finally:
        conn.close()  # fix: original never closed the connection
# Driver: recreate the table, fan the insert workload out over the thread
# pool, then report aggregate throughput.
create_table()

pool = ThreadPool(num_threads)
t1 = time.time()
# map() blocks until every worker finishes, so t2 - t1 covers all inserts.
results = pool.map(load_data_slave, range(num_threads))
pool.close()   # fix: original never shut the pool down
pool.join()
t2 = time.time()

total_rows = num_users * num_msgs * num_threads
print("====================")
print("Inserted %d rows" % (total_rows))
print("Time: %s ms ---" % ((t2 - t1) * 1000))
print("Inserts/sec: %s ---" % (total_rows / (t2 - t1)))
print("====================")
@kmuthukk
Copy link
Author

kmuthukk commented Apr 17, 2019

Above program does no batching.

With RF=1, on 4-core setup, on YB latest (1.2.5), seeing about:

With 8 concurrent threads:

Inserted 400000 rows
Time: 69929.1889668 ms ---
Inserts/sec: 5720.07206018 ---
Avg Latency: ~1.37ms

With 4 concurrent threads:

Inserted 200000 rows
Time: 46358.5669994 ms ---
Inserts/sec: 4314.19720119 ---
Avg Latency: 0.9ms

With 1 concurrent thread:

Inserted 50000 rows
Time: 29329.2810917 ms ---
Inserts/sec: 1704.78096083 ---
Avg Latency: 0.57ms

With 16 concurrent threads:

Inserted 800000 rows
Time: 123391.185999 ms ---
Inserts/sec: 6483.44526008 ---
Avg Latency: 2.42ms

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment