Skip to content

Instantly share code, notes, and snippets.

@kmuthukk
Created December 27, 2022 21:26
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save kmuthukk/90a8ffbd6ced8337de8eaa5684d6a742 to your computer and use it in GitHub Desktop.
Save kmuthukk/90a8ffbd6ced8337de8eaa5684d6a742 to your computer and use it in GitHub Desktop.
Concurrent updates to single row table
File Edit Options Buffers Tools Python Help
# Dependencies:
# On CentOS you can install psycopg2 thus:
#
# sudo yum install postgresql-libs
# sudo yum install python-psycopg2
import psycopg2;
from multiprocessing.dummy import Pool as ThreadPool
# Number of UPDATE statements each worker thread issues (see update_data_worker).
num_updates=5000
def create_table():
    """Drop and recreate the ``users`` table.

    Opens a dedicated autocommit connection to the database at
    ``localhost:5433`` (dbname/user ``yugabyte``), drops any existing
    ``users`` table, and creates it again with columns ``id`` (text,
    primary key), ``ename`` (text) and ``sal`` (int).

    The cursor and connection are always closed before returning. Nothing
    is caught here, so any exception raised while connecting or executing
    the DDL propagates to the caller and the confirmation banner is not
    printed.
    """
    conn = psycopg2.connect("host=localhost dbname=yugabyte user=yugabyte port=5433")
    try:
        # Autocommit so each DDL statement takes effect immediately.
        conn.set_session(autocommit=True)
        cur = conn.cursor()
        try:
            cur.execute("""DROP TABLE IF EXISTS users""")
            # The statement text is kept exactly as in the original script.
            cur.execute("""CREATE TABLE IF NOT EXISTS users(
id text,
ename text,
sal int,
PRIMARY KEY(id))
""")
        finally:
            cur.close()
    finally:
        # Release the server-side session even when a statement fails.
        conn.close()
    print("Created users table")
    print("====================")
def load_data():
    """Insert the single seed row ``("user-0", "name-0", 100)`` into ``users``.

    Uses its own autocommit connection. The INSERT is best-effort: if it
    raises, the error is printed and the function returns normally.
    Failures while connecting or configuring the session are not caught
    and propagate to the caller.

    The cursor and connection are always closed before returning.
    """
    conn = psycopg2.connect("host=localhost dbname=yugabyte user=yugabyte port=5433")
    try:
        conn.set_session(autocommit=True)
        cur = conn.cursor()
        try:
            cur.execute("""INSERT INTO users (id, ename, sal) VALUES (%s, %s, %s)""",
                        ("user-0", "name-0", 100))
        except Exception as e:
            # Deliberately non-fatal; message text kept as in the original.
            print("Expception " + str(e))
        finally:
            cur.close()
    finally:
        # Release the connection whether or not the INSERT succeeded.
        conn.close()
def update_data_worker(thread_num):
    """Increment ``sal`` of row ``user-0`` ``num_updates`` times.

    Each worker opens its own autocommit connection, so every UPDATE is
    committed independently of the other workers. A failing UPDATE is
    printed and counted, and the loop continues with the next attempt.

    Args:
        thread_num: Identifier used only to label this worker's output.

    Returns:
        None. Progress and the error count are printed to stdout. The
        "Updated" line reports the number of attempts (``num_updates``),
        not the number of successful statements.
    """
    thread_id = str(thread_num)
    conn = psycopg2.connect("host=localhost dbname=yugabyte user=yugabyte port=5433")
    num_errors = 0
    try:
        conn.set_session(autocommit=True)
        cur = conn.cursor()
        try:
            print("Thread-" + thread_id + ": Updating %d time..." % (num_updates))
            try:
                for _ in range(num_updates):
                    try:
                        cur.execute("""UPDATE users SET sal = sal + 1 where id = %s""",
                                    ("user-0", ))
                    except Exception as e:
                        # Count the failure and keep going with the next update.
                        print("Exception: " + str(e))
                        num_errors += 1
            except Exception as e:
                # Safety net for failures raised outside the per-statement handler.
                print("Unexpected exception: " + str(e))
        finally:
            cur.close()
    finally:
        # Always release this worker's connection, even on an early failure.
        conn.close()
    print("Thread-" + thread_id + ": Updated %d times" % (num_updates))
    print("Thread-" + thread_id + ": Error Count: %d " % (num_errors))
def update_data(num_threads=8):
    """Run ``update_data_worker`` concurrently on a pool of threads.

    Args:
        num_threads: Size of the thread pool and number of worker
            invocations; worker ``i`` receives ``i`` as its thread number.
            Defaults to 8, the value the script previously hard-coded.

    Blocks until every worker has finished. The pool is closed and joined
    before returning so no worker threads are left behind, even if a
    worker raises.
    """
    pool = ThreadPool(num_threads)
    try:
        # map() blocks until all workers return; their results are all None.
        pool.map(update_data_worker, range(num_threads))
    finally:
        pool.close()
        pool.join()
# Main
# Runs at module level, so these calls execute whenever the file is run or imported.
# Step 1: drop and recreate the users table.
create_table()
# Step 2: insert the single row that all workers will update.
load_data()
# Step 3: launch the concurrent updates against that row.
update_data()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment