Skip to content

Instantly share code, notes, and snippets.

@kmuthukk
Last active November 10, 2020 19:09
Show Gist options
  • Save kmuthukk/62df3a1884cd4ee466fad249faf972e1 to your computer and use it in GitHub Desktop.
Save kmuthukk/62df3a1884cd4ee466fad249faf972e1 to your computer and use it in GitHub Desktop.
# Dependencies:
# On CentOS you can install psycopg2 thus:
#
# sudo yum install postgresql-libs
# sudo yum install python-psycopg2
import psycopg2
import time
from multiprocessing.dummy import Pool as ThreadPool
# Number of rows that EACH loader thread inserts.
num_users = 10000000

# Number of concurrent loader threads (one database connection per thread).
num_threads = 3

# Connection string handed to psycopg2.connect() by every function below.
connect_string = "host=172.151.35.79 dbname=yugabyte user=yugabyte port=5433"
def create_table():
    """Drop any existing ``users`` table and create a fresh, empty one.

    Opens its own connection using the module-level ``connect_string`` in
    autocommit mode, so each DDL statement takes effect immediately. The
    cursor and connection are closed before returning, including when one
    of the statements fails.

    Raises:
        psycopg2.Error: if connecting or executing either statement fails.
    """
    conn = psycopg2.connect(connect_string)
    try:
        conn.set_session(autocommit=True)
        cur = conn.cursor()
        try:
            # Start from a clean slate so repeated runs do not hit
            # primary-key conflicts with rows from a previous load.
            cur.execute("""DROP TABLE IF EXISTS users""")
            cur.execute("""CREATE TABLE IF NOT EXISTS users(
id text,
ename text,
age int,
city text,
about_me text,
PRIMARY KEY(id, ename))
""")
            print("Created users table")
            print("====================")
            # Optional secondary index, left disabled:
            # cur.execute("""CREATE INDEX IF NOT EXISTS name_idx ON users(ename)""")
            # print("Created name_idx on table")
        finally:
            cur.close()
    finally:
        # psycopg2's ``with conn:`` only ends the transaction; it does not
        # close the connection, so close it explicitly.
        conn.close()
def load_data_slave(thread_num):
    """Insert ``num_users`` rows into ``users`` over a dedicated connection.

    Row ids embed ``thread_num``, so different worker threads generate
    distinct ids. Each insert is retried until it succeeds: on
    ``psycopg2.InterfaceError`` the connection and cursor are re-created
    after a 100 ms pause; on any other exception the same statement is
    simply retried after a 100 ms pause. Progress is printed every 10000
    successful inserts, and a final total is printed on exit.

    Args:
        thread_num: Index of this worker; used in log lines and row ids.

    Raises:
        psycopg2.Error: if the initial connection cannot be established.
    """
    thread_id = str(thread_num)
    conn = psycopg2.connect(connect_string)
    conn.set_session(autocommit=True)
    cur = conn.cursor()
    print("Thread-" + thread_id + ": Inserting %d rows..." % (num_users))

    num_inserts = 0
    try:
        for idx in range(num_users):
            inserted = False
            # NOTE(review): a non-transient failure (for example a duplicate
            # key) would be retried forever by this loop -- confirm that is
            # acceptable for the intended workload.
            while not inserted:
                try:
                    cur.execute("""INSERT INTO users (id, ename, age, city, about_me) VALUES (%s, %s, %s, %s, %s)""",
                                ("user-run3-"+thread_id+"-"+str(idx),
                                 "name--"+str(idx),
                                 20 + (idx % 50),
                                 "city--"+str(idx % 1000),
                                 "about-me-"+str(idx)+"blah-blah"+str(idx)))
                    inserted = True
                    num_inserts += 1
                except psycopg2.InterfaceError as exc:
                    # Fixed: the message used to reference an unbound name
                    # (``e``), which raised inside this handler and aborted
                    # the whole thread instead of reconnecting.
                    print("Unexpected InterfaceError: " + str(exc) + "; Will reconnect after 100ms..")
                    time.sleep(0.1)  # sleep 100 millisecs
                    conn = psycopg2.connect(connect_string)
                    conn.set_session(autocommit=True)
                    cur = conn.cursor()
                except Exception as e:
                    print("Unexpected exception: " + str(e) + "; Will retry after 100ms..")
                    time.sleep(0.1)  # sleep 100 millisecs
            if num_inserts % 10000 == 0:
                print("Thread-" + thread_id + ": Inserted %d rows" % (num_inserts))
    except Exception as e:
        # Anything that escapes the retry loop (e.g. a failed reconnect)
        # ends this thread's load; report it and fall through to the total.
        print("Unexpected exception: " + str(e))
    finally:
        # Release the (possibly re-created) connection in every case.
        conn.close()
    print("Thread-" + thread_id + ": Inserted %d rows" % (num_inserts))
def load_data():
    """Run ``load_data_slave`` on ``num_threads`` worker threads.

    Each worker is called with its index in ``range(num_threads)``. The call
    blocks until every worker has returned, then shuts the pool down so its
    threads do not linger.
    """
    pool = ThreadPool(num_threads)
    try:
        # map() blocks until all workers finish; the workers return nothing
        # useful, so the result list is discarded.
        pool.map(load_data_slave, range(num_threads))
    finally:
        # Explicit close()/join() rather than ``with`` so this also works on
        # interpreters where Pool is not a context manager.
        pool.close()
        pool.join()
# Main
# Order matters: create_table() drops any existing users table and recreates
# it empty, then load_data() runs the multi-threaded bulk insert into it.
# These calls run at import time as well as when executed as a script.
create_table()
load_data()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment