Skip to content

Instantly share code, notes, and snippets.

@kmuthukk
Last active April 29, 2019 01:25
Show Gist options
  • Save kmuthukk/310a2199c8d0f8d2647510a1b47442f6 to your computer and use it in GitHub Desktop.
Save kmuthukk/310a2199c8d0f8d2647510a1b47442f6 to your computer and use it in GitHub Desktop.
Demo of Serializable Isolation in YugaByte DB: Concurrent Transactions That Flip Color of Items
import psycopg2
from threading import Thread,Semaphore
from multiprocessing.dummy import Pool as ThreadPool
#
# The test uses the classic example of a set of black and white items to
# illustrate the need, in some situations, for transaction support with
# serializable isolation level.
#
# In this test two concurrent transactions -- one that flips white items
# to black, and another that flips black items to white -- are executed.
# A checker then verifies that afterwards the bag contains items of only
# one color (i.e. all black or all white).
#
#
# The program repeats the checking/flipping sequence using three threads
# (1 checker thread, and 2 threads that switch the colors), and uses
# two barriers to coordinate the checking phase & flipping phase.
# Test Params:
# Number of check/flip iterations each thread runs.
num_iters=600
# Number of rows (items) inserted into the items table.
num_items=10
# When True, the color-flipping transactions run at SERIALIZABLE isolation.
# If this flag is set to False, they use READ COMMITTED instead, and the
# test will often fail (as expected) the correctness check in many
# iterations.
serializable_isolation=True
# Barrier implementation taken from:
# https://stackoverflow.com/questions/26622745/implementing-barrier-in-python2-7
class Barrier:
    """Single-use barrier for a fixed number of threads, built on semaphores.

    Every caller of wait() blocks until ``n`` callers have arrived; then all
    of them are let through.  The arrival count is never reset, so once the
    barrier has opened it stays open: create a new instance for each
    rendezvous instead of reusing one.
    """

    def __init__(self, n):
        """Create a barrier that opens after ``n`` calls to wait()."""
        self.n = n
        self.count = 0
        self.mutex = Semaphore(1)    # protects self.count
        self.barrier = Semaphore(0)  # turnstile, locked until the n-th arrival

    def wait(self):
        """Block until ``n`` threads have called wait(), then return."""
        with self.mutex:
            self.count = self.count + 1
            # Decide while still holding the mutex, so that exactly one
            # thread (the n-th arrival) sees the final count and opens the
            # turnstile.  Reading self.count after releasing the mutex lets
            # several late arrivals all observe count == n.
            is_last_arrival = self.count == self.n
        if is_last_arrival:
            self.barrier.release()
        # Turnstile: take the permit and immediately pass it on, so each
        # waiting thread wakes the next one.
        self.barrier.acquire()
        self.barrier.release()
def create_table():
    """Drop the ``items`` table if it exists and create it again, empty.

    Connects to the database on localhost:5433 in autocommit mode, so each
    DDL statement takes effect immediately.  Progress is printed to stdout.
    """
    conn = psycopg2.connect("host=localhost dbname=postgres user=postgres port=5433")
    try:
        conn.set_session(autocommit=True)
        cur = conn.cursor()
        cur.execute("""DROP TABLE IF EXISTS items""")
        print("Dropped (if exists): items table")
        print("====================")
        cur.execute("""
            CREATE TABLE IF NOT EXISTS items (
              id integer,
              color text,
              PRIMARY KEY(id)
            )
            """)
        print("Created items table.")
    finally:
        # Release the server connection even when a statement fails.
        conn.close()
def init_rows():
    """Insert ``num_items`` rows, with ids 0..num_items-1, all colored 'black'.

    Uses an autocommit connection, so every INSERT is committed on its own.
    Progress is printed to stdout.
    """
    conn = psycopg2.connect("host=localhost dbname=postgres user=postgres port=5433")
    try:
        conn.set_session(autocommit=True)
        cur = conn.cursor()
        print("Populating rows")
        for idx in range(num_items):
            cur.execute("""INSERT INTO items(id, color) VALUES (%s, %s)""", (idx, 'black'))
        print("====================")
    finally:
        # Release the server connection even when an INSERT fails.
        conn.close()
def switch_colors_slave(thread_num):
    """Repeatedly flip every item of one color to the other color.

    Threads with an even ``thread_num`` turn 'black' items 'white'; odd ones
    turn 'white' items 'black'.  On each of the ``num_iters`` iterations the
    thread passes both phase barriers and then runs a single UPDATE inside an
    explicit transaction, at SERIALIZABLE isolation when
    ``serializable_isolation`` is true and READ COMMITTED otherwise.

    A failed transaction is rolled back and counted either as a conflict
    (the error text mentions a higher priority transaction) or as an
    unexpected update error, which is also printed.  A one-line summary of
    the counters is printed after the last iteration.

    Args:
        thread_num: Worker number; selects the flip direction and labels
            the log output.
    """
    conn = psycopg2.connect("host=localhost dbname=postgres user=postgres port=5433")
    try:
        thread_id = str(thread_num)
        cur = conn.cursor()
        if thread_num % 2 == 0:
            old, new = 'black', 'white'
        else:
            old, new = 'white', 'black'
        # The isolation level does not change between iterations, so pick
        # the BEGIN statement once.
        if serializable_isolation:
            begin_stmt = """BEGIN TRANSACTION ISOLATION LEVEL SERIALIZABLE"""
        else:
            begin_stmt = """BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED"""
        successful_txns = 0
        txn_conflicts = 0
        num_update_errors = 0
        for _ in range(num_iters):
            phase1_barrier.wait()
            # nothing in phase1
            phase2_barrier.wait()
            try:
                cur.execute(begin_stmt)
                cur.execute("""UPDATE items SET color=%s WHERE color=%s""", (new, old))
                cur.execute("""COMMIT""")
                successful_txns += 1
            except Exception as excp:
                cur.execute("ROLLBACK")
                if "Conflicts with higher priority transaction" in str(excp):
                    # These can happen.
                    txn_conflicts += 1
                else:
                    # Unexpected errors
                    print("switch_colors_slave: Thread-" + thread_id + "; Exception: " + str(excp))
                    num_update_errors += 1
        print("SWITCH-Thread-{}: Successful Txns: {}; Txn Conflicts: {}; UPDATE Failures: {}; Iters: {}"
              .format(thread_id, successful_txns, txn_conflicts, num_update_errors, num_iters))
    finally:
        # Release the server connection however the loop ends.
        conn.close()
def checker():
    """Verify the outcome of each flipping round and set up the next one.

    Runs ``num_iters`` iterations on an autocommit connection.  In each one
    it passes the phase-1 barrier, counts the distinct colors in ``items``
    (anything other than exactly one is recorded as a correctness error),
    rewrites the table so even ids are 'black' and odd ids are 'white', and
    then passes the phase-2 barrier.  A summary line is printed after the
    last iteration.

    NOTE: the check at the top of an iteration looks at the state left by
    the previous iteration's flips (or the table's initial contents on the
    first iteration), so the flips of the final iteration are not checked.
    """
    global phase1_barrier
    global phase2_barrier
    conn = psycopg2.connect("host=localhost dbname=postgres user=postgres port=5433")
    try:
        cur = conn.cursor()
        conn.set_session(autocommit=True)
        successful_txns = 0
        num_correctness_errors = 0
        for _ in range(num_iters):
            phase1_barrier.wait()
            # A Barrier never resets its count, so install a fresh one for
            # the next iteration.  The flipping threads cannot reach their
            # next phase-1 wait before passing phase 2, which needs this
            # thread, so they pick up the new object.
            phase1_barrier = Barrier(3)  # reset the barrier.
            cur.execute("SELECT COUNT(DISTINCT(color)) FROM items")
            result = cur.fetchone()
            # we expect to find items of only 1 color.
            if result[0] != 1:
                num_correctness_errors += 1
            # reset some rows to black, some rows to white.
            try:
                cur.execute("UPDATE items SET color=CASE WHEN (id % 2 = 0) THEN 'black' ELSE 'white' END")
                successful_txns += 1
            except Exception as excp:
                cur.execute("ROLLBACK")
                print("checker(): Exception: " + str(excp))
            phase2_barrier.wait()
            phase2_barrier = Barrier(3)  # reset the barrier
        print("checker(): Successful Resets: {}; Correctness Errors: {}; Iters={}"
              .format(successful_txns, num_correctness_errors, num_iters))
    finally:
        # Release the server connection however the loop ends.
        conn.close()
def worker(thread_num):
    """Thread entry point: thread 0 is the checker, all others flip colors."""
    if thread_num != 0:
        switch_colors_slave(thread_num)
        return
    checker()
def launch_workers():
    """Create the two phase barriers and run the three worker threads.

    Installs fresh 3-party barriers in the ``phase1_barrier`` and
    ``phase2_barrier`` globals, then runs ``worker(0)``, ``worker(1)`` and
    ``worker(2)`` on a 3-thread pool and waits for all of them to finish.
    """
    global phase1_barrier
    global phase2_barrier
    phase1_barrier = Barrier(3)
    phase2_barrier = Barrier(3)
    pool = ThreadPool(3)
    pool.map(worker, range(3))
    # All tasks are done at this point, so the pool threads are idle and can
    # be shut down cleanly.  This is deliberately not in a ``finally``: if a
    # worker raised, its peers may still be blocked on a barrier and join()
    # would never return.
    pool.close()
    pool.join()
# Main
# These calls run whenever the module is executed or imported (there is no
# __main__ guard): rebuild the items table, populate it, then run the
# checker and the two color-flipping threads.
create_table()
init_rows()
launch_workers()
@kmuthukk
Copy link
Author

With the serializable_isolation variable in the test set to True, the checker thread finds no correctness errors, as seen below.

01:56 $ python ~/notes/ysql_serializable_test2.py
Dropped (if exists): items table
====================
Created items table.
Populating rows
====================
switch_colors_slave: Thread-2; Exception: Error during commit: Operation expired: Transaction expired

checker(): Successful Resets: 600; Correctness Errors: 0; Iters=600
SWITCH-Thread-1: Successful Txns: 291; Txn Conflicts: 309; UPDATE Failures: 0; Iters: 600
SWITCH-Thread-2: Successful Txns: 309; Txn Conflicts: 290; UPDATE Failures: 1; Iters: 600

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment