Skip to content

Instantly share code, notes, and snippets.

@zzzeek
Last active March 7, 2018 01:24
Show Gist options
  • Save zzzeek/947337f44d70fac537be to your computer and use it in GitHub Desktop.
Save zzzeek/947337f44d70fac537be to your computer and use it in GitHub Desktop.
# -*- coding: utf-8 -*-
# gevent's monkey patching is applied first, ahead of the other imports, as
# gevent recommends, so that the standard-library modules imported below
# (notably threading and time, which the pool uses) are the patched,
# cooperative versions.
import gevent.monkey
gevent.monkey.patch_all()
import collections
import threading
import time
import random
import sys
import logging
# Default logging configuration; the 'foo' logger is the one the pool uses
# to report rollback/close failures, and it is set to emit everything from
# DEBUG upwards.
logging.basicConfig()
log = logging.getLogger('foo')
log.setLevel(logging.DEBUG)
# The database driver under test, aliased so the rest of the script only
# refers to it as ``dbapi``.
import pymysql as dbapi
class SimplePool(object):
    """A deliberately minimal connection pool for this test script.

    Connections are opened eagerly when the pool is built and kept in a
    deque.  ``get`` pops from the right-hand end of the deque;
    ``return_conn`` never re-uses the connection it is handed: it rolls it
    back, closes it, and appends a freshly opened replacement instead.
    """

    def __init__(self, size=50):
        """Open ``size`` connections up front.

        :param size: number of connections to pre-open.  Defaults to 50,
         the value that used to be hard-coded.
        """
        self.checkedin = collections.deque(
            self._connect() for _ in range(size)
        )
        # Separate locks guard checkout and checkin.
        self.checkout_lock = threading.Lock()
        self.checkin_lock = threading.Lock()

    def _connect(self):
        """Open and return a new driver connection to the test database."""
        return dbapi.connect(
            user="scott", passwd="tiger",
            host="localhost", db="test")

    def get(self):
        """Check a connection out, waiting until one is available.

        While the pool is empty this polls every 0.1 seconds.  The checkout
        lock is held for the whole wait, so only one caller polls at a time
        and the others queue on the lock.

        :return: the most recently checked-in connection.
        """
        with self.checkout_lock:
            while not self.checkedin:
                time.sleep(.1)
            return self.checkedin.pop()

    def return_conn(self, conn):
        """Dispose of ``conn`` and put a brand-new connection in the pool.

        Rollback and close are both best-effort: a driver error in either
        is logged and does not stop the replacement from being created.
        Only ``Exception`` subclasses are treated that way; signals derived
        from ``BaseException`` (``KeyboardInterrupt``, ``gevent.Timeout``,
        greenlet kills) propagate to the caller instead of being logged as
        rollback/close failures and swallowed.

        :param conn: the connection being handed back; it is not re-used.
        """
        try:
            conn.rollback()
        except Exception:
            log.error("Exception during rollback", exc_info=True)
        try:
            conn.close()
        except Exception:
            log.error("Exception during close", exc_info=True)
        # recycle to a new connection
        conn = self._connect()
        with self.checkin_lock:
            self.checkedin.append(conn)
def execute_sql(conn, sql, params=()):
    """Run one statement on ``conn`` and return the cursor's ``lastrowid``.

    :param conn: an open connection object providing ``cursor()``.
    :param sql: the SQL text to execute.
    :param params: parameters bound to ``sql``; empty by default.
    :return: the ``lastrowid`` reported by the cursor after execution.
    :raises: whatever ``cursor.execute`` raises.  The cursor is closed
     before the exception propagates, so a failed statement no longer
     leaves an open cursor behind.
    """
    cursor = conn.cursor()
    try:
        cursor.execute(sql, params)
        # Read the id while the cursor is still open.
        return cursor.lastrowid
    finally:
        cursor.close()
# Module-level pool shared by the schema setup below and by every worker;
# constructing it opens all of its connections immediately.
pool = SimplePool()
# Query that lists table_b rows whose a_id has no matching row in table_a
# (presumably meant to be run by hand to look for inconsistent data):
# SELECT * FROM table_b WHERE a_id not in
# (SELECT id FROM table_a) ORDER BY a_id DESC;
# Schema reset script: drops both tables if present and recreates them as
# InnoDB tables.  It is sent to the server as a single multi-statement
# string.  NOTE(review): this relies on the driver/connection accepting
# several statements in one execute() call -- confirm for the driver
# version in use.
PREPARE_SQL = """
DROP TABLE IF EXISTS table_b;
DROP TABLE IF EXISTS table_a;
CREATE TABLE table_a (
id INT NOT NULL AUTO_INCREMENT,
data VARCHAR (256) NOT NULL,
PRIMARY KEY (id)
) engine='InnoDB';
CREATE TABLE table_b (
id INT NOT NULL AUTO_INCREMENT,
a_id INT NOT NULL,
data VARCHAR (256) NOT NULL,
-- uncomment this to illustrate where the driver is attempting
-- to INSERT the row during ROLLBACK
-- FOREIGN KEY (a_id) REFERENCES table_a(id),
PRIMARY KEY (id)
) engine='InnoDB';
"""
# Reset the schema once, at import time, using a pooled connection, then
# hand that connection back to the pool before any worker starts.
connection = pool.get()
execute_sql(connection, PREPARE_SQL)
connection.commit()
pool.return_conn(connection)
print("Table prepared...")
def transaction_kill_worker():
    """Loop forever running a two-insert transaction under a short timeout.

    Each iteration checks a connection out of the module-level ``pool``,
    inserts a row into table_a, sleeps for a random time in [0, 0.2)
    seconds, then inserts a dependent row into table_b and commits.  The
    whole sequence runs inside a 0.1 second ``gevent.Timeout``, so the
    timeout frequently fires part-way through the transaction.

    One progress character is written to stdout per outcome:

    * ``.`` - the iteration finished without an exception escaping
    * ``$`` - the second insert or the commit failed and was rolled back
      (followed by ``.``, because the error is handled in place)
    * ``#`` - the timeout fired
    * ``@`` - any other exception

    The connection is handed back to the pool exactly once per iteration,
    in the ``finally`` clause.  Returning it both on the individual paths
    and again in ``finally`` would make the pool open two replacement
    connections for every one checked out.
    """
    while True:
        connection = None
        try:
            with gevent.Timeout(0.1):
                connection = pool.get()
                rowid = execute_sql(
                    connection,
                    "INSERT INTO table_a (data) VALUES (%s)", ("a",))
                gevent.sleep(random.random() * 0.2)

                try:
                    execute_sql(
                        connection,
                        "INSERT INTO table_b (a_id, data) VALUES (%s, %s)",
                        (rowid, "b",))
                    connection.commit()
                except Exception:
                    # Not logged; only a progress marker is printed.
                    connection.rollback()
                    sys.stdout.write("$")
        except gevent.Timeout:
            sys.stdout.write("#")
        except Exception:
            # Not logged; only a progress marker is printed.
            sys.stdout.write("@")
        else:
            sys.stdout.write(".")
        finally:
            # Single return point; connection is None only when the
            # checkout itself did not complete.
            if connection is not None:
                pool.return_conn(connection)
def main():
    """Spawn the worker greenlets, then keep the main greenlet alive.

    Fifty ``transaction_kill_worker`` greenlets are started.  The function
    never returns: after an initial 3 second sleep it sleeps in 5 second
    steps forever, yielding to the workers each time.
    """
    worker_count = 50
    for _ in range(worker_count):
        gevent.spawn(transaction_kill_worker)

    gevent.sleep(3)

    # Park here indefinitely; the workers do all the real work.
    while True:
        gevent.sleep(5)


if __name__ == "__main__":
    main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment