Skip to content

Instantly share code, notes, and snippets.

@kmuthukk
Last active November 25, 2019 04:01
Show Gist options
  • Save kmuthukk/19e8003dab6febda9a0396e1c88d35e3 to your computer and use it in GitHub Desktop.
Save kmuthukk/19e8003dab6febda9a0396e1c88d35e3 to your computer and use it in GitHub Desktop.
# Dependencies:
# On CentOS you can install psycopg2 thus:
#
# sudo yum install postgresql-libs
# sudo yum install python-psycopg2
import psycopg2;
import time;
from multiprocessing.dummy import Pool as ThreadPool
# This test measures raw throughput only, so the application code does not
# handle conflicts between concurrent transactions or retry them. Instead,
# each thread works on its own disjoint set of accounts.
num_threads = 4  # number of worker threads
num_accounts_per_thread = 100  # accounts reserved for each thread
num_txns_per_thread = 20000  # transfers each thread attempts
num_accounts = num_threads * num_accounts_per_thread  # total rows seeded into accounts
num_txns = num_threads * num_txns_per_thread  # total transfers across all threads
# Address of the database server; the commented-out line is the local alternative.
# host="localhost"
host="172.151.20.146"
def create_table():
    """Drop and recreate the ``accounts`` and ``transactions`` tables.

    Connects to the server at the module-level ``host`` with autocommit
    enabled, drops both tables if they exist, recreates them, and installs
    the ``pgcrypto`` extension that provides ``gen_random_uuid()`` for the
    default value of ``transactions.id``. Progress is printed to stdout.

    The connection is closed on exit even when a statement fails; any
    database error propagates to the caller.
    """
    conn = psycopg2.connect("host="+host+" dbname=postgres user=postgres port=5433")
    try:
        conn.set_session(autocommit=True)
        cur = conn.cursor()

        cur.execute("""DROP TABLE IF EXISTS accounts""")
        print("Dropped accounts if exists")
        cur.execute("""DROP TABLE IF EXISTS transactions""")
        print("Dropped transactions if exists")

        cur.execute(
            "CREATE TABLE accounts(\n"
            "id INTEGER PRIMARY KEY NOT NULL,\n"
            "balance BIGINT NOT NULL);\n"
        )
        print("Created accounts")
        print("====================")

        print("Install pgcrypto extension for gen_random_uuid()")
        cur.execute("""CREATE EXTENSION IF NOT EXISTS pgcrypto""")
        cur.execute(
            "CREATE TABLE transactions(\n"
            "id UUID PRIMARY KEY NOT NULL DEFAULT gen_random_uuid(),\n"
            "from_account_id INTEGER NOT NULL,\n"
            "to_account_id INTEGER NOT NULL,\n"
            "amount BIGINT NOT NULL,\n"
            "timestamp TIMESTAMP NOT NULL DEFAULT NOW())\n"
        )
        print("Created transactions")
        print("====================")
    finally:
        # Release the server-side session regardless of how setup ended.
        conn.close()
def seed_accounts_table():
    """Insert ``num_accounts`` rows into ``accounts``, each with balance 5000.

    Account ids run from 0 to ``num_accounts - 1``. Rows are inserted one at a
    time on an autocommit connection. Seeding is best-effort: the first
    failing insert stops the loop, the error is printed, and the number of
    rows inserted so far is reported.

    The connection is closed on exit in every case.
    """
    conn = psycopg2.connect("host="+host+" dbname=postgres user=postgres port=5433")
    try:
        conn.set_session(autocommit=True)
        cur = conn.cursor()
        print("Seeding accounts table with %d rows..." % (num_accounts))
        num_inserts = 0
        try:
            for idx in range(num_accounts):
                cur.execute("INSERT INTO accounts (id, balance) VALUES (%s, 5000)", (idx, ))
                num_inserts += 1
        except Exception as e:
            # Deliberately non-fatal: report and fall through to the summary.
            print("Unexpected exception: " + str(e))
        print("Inserted %d rows into accounts table" % (num_inserts))
    finally:
        conn.close()
def run_transactions_slave(thread_num):
    """Run this thread's share of money-transfer transactions.

    Executes ``num_txns_per_thread`` transactions on a dedicated connection.
    Each one debits 100 from one account, credits the next account (wrapping
    within this thread's own block of ``num_accounts_per_thread`` accounts),
    and inserts a row into ``transactions``. Average latency is printed after
    every 1000 attempted transactions, and success/exception totals are
    printed at the end.

    Args:
        thread_num: Zero-based worker index; selects which block of account
            ids this worker touches.

    A failed transaction is counted, reported and rolled back, and the loop
    continues. If the ROLLBACK itself fails the error propagates, since the
    connection is no longer usable. The connection is always closed on exit.
    """
    thread_id = str(thread_num)
    conn = psycopg2.connect("host="+host+" dbname=postgres user=postgres port=5433")
    try:
        # Transactions are delimited with explicit BEGIN/COMMIT below, so keep
        # the driver from opening its own implicit transaction around them.
        conn.set_session(autocommit=True)
        cur = conn.cursor()
        print("Thread-" + thread_id + ": Starting..")

        num_success = 0
        num_exceptions = 0
        num_rows_to_print_stats = 1000
        amount = 100
        # First account id owned by this thread; constant for the whole run.
        base_acct_id = thread_num * num_accounts_per_thread

        t1 = time.time()
        for idx in range(num_txns_per_thread):
            from_id = base_acct_id + (idx % num_accounts_per_thread)
            to_id = base_acct_id + ((idx + 1) % num_accounts_per_thread)
            try:
                cur.execute("""BEGIN""")
                cur.execute("""UPDATE accounts SET balance = balance - %s WHERE id = %s""",
                            (amount, from_id))
                cur.execute("""UPDATE accounts SET balance = balance + %s WHERE id = %s""",
                            (amount, to_id))
                cur.execute("""INSERT INTO transactions (from_account_id, to_account_id, amount) VALUES (%s, %s, %s)""",
                            (from_id, to_id, amount))
                cur.execute("""COMMIT""")
                num_success += 1
            except Exception as e:
                num_exceptions += 1
                # Report the original failure first so it is not lost if the
                # rollback below raises as well.
                print("Unexpected exception: " + str(e))
                cur.execute("""ROLLBACK""")

            # Kept outside the try so a failure on a window boundary still
            # resets the timer; otherwise the next window would span twice as
            # many transactions while dividing by the same count.
            if (idx + 1) % num_rows_to_print_stats == 0:
                t2 = time.time()
                avg_latency_ms = (t2 - t1) * 1000.0 / num_rows_to_print_stats
                print("Thread-" + thread_id + ": Txns Completed %d; Avg Latency (ms) %f " % (idx + 1, avg_latency_ms))
                t1 = time.time()

        print("Thread-" + thread_id + ": Successful Txns: %d" % (num_success))
        print("Thread-" + thread_id + ": Exceptions %d " % (num_exceptions))
    finally:
        conn.close()
def run_transactions():
    """Run ``run_transactions_slave`` concurrently, once per worker thread.

    Starts a pool of ``num_threads`` threads, hands each a distinct thread
    number from ``range(num_threads)``, and blocks until every worker has
    finished. The pool is shut down and its threads joined before returning,
    including when a worker raises.
    """
    pool = ThreadPool(num_threads)
    try:
        pool.map(run_transactions_slave, range(num_threads))
    finally:
        # close() stops new work being submitted; join() waits for the
        # worker threads to exit so none are left behind.
        pool.close()
        pool.join()
# Main
# Schema creation and seeding run before the clock starts, so only the
# concurrent transaction workload is timed.
create_table()
seed_accounts_table()

start_time = time.time()
run_transactions()
delta = time.time() - start_time

# Overall summary: total transactions, wall-clock seconds, transactions/sec.
print(
    "Total TXNS: %d, Time: %s secs; TPS=%f"
    % (num_txns, delta, num_txns / delta)
)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment