Created
December 5, 2014 22:23
-
-
Save zzzeek/d196fa91c40cb515365e to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# -*- coding: utf-8 -*-
# gevent's monkey patching has to run before the other imports below so
# that they see the patched (cooperative) standard library.
import gevent.monkey
gevent.monkey.patch_all()

import collections
import threading
import time
import random
import sys
import logging

logging.basicConfig()
log = logging.getLogger('foo')
log.setLevel(logging.DEBUG)

# Driver under test: swap the two lines below to run the same script
# against pymysql instead of mysql-connector.
#import pymysql as dbapi
from mysql import connector as dbapi
class SimplePool(object):
    """Minimal connection pool that recycles every returned connection.

    The pool is filled eagerly at construction time.  A connection handed
    back through ``return_conn`` is never reused: it is rolled back,
    closed, and replaced by a freshly opened one.
    """

    def __init__(self, size=50):
        """Open ``size`` connections up front (defaults to 50)."""
        self.checkedin = collections.deque([
            self._connect() for _ in range(size)
        ])
        self.checkout_lock = threading.Lock()
        self.checkin_lock = threading.Lock()

    def _connect(self):
        """Open and return a new DB-API connection to the test database."""
        return dbapi.connect(
            user="scott", passwd="tiger",
            host="localhost", db="test")

    def get(self):
        """Check a connection out, polling every 0.1s while none is free.

        The most recently checked-in connection is handed out first.
        """
        with self.checkout_lock:
            while not self.checkedin:
                time.sleep(.1)
            return self.checkedin.pop()

    def return_conn(self, conn):
        """Discard ``conn`` and check a brand new connection in instead.

        Rollback and close are best effort: driver errors are logged and
        ignored.  Only ``Exception`` is caught, so ``gevent.Timeout``,
        ``KeyboardInterrupt`` and other ``BaseException`` signals reach
        the caller instead of being logged as a rollback/close failure.
        """
        try:
            conn.rollback()
        except Exception:
            log.error("Exception during rollback", exc_info=True)
        try:
            conn.close()
        except Exception:
            log.error("Exception during close", exc_info=True)
        # recycle to a new connection
        conn = self._connect()
        with self.checkin_lock:
            self.checkedin.append(conn)
def verify_connection_id(conn):
    """Return the server-side connection id of ``conn``, or None.

    Runs ``select connection_id()`` on a fresh cursor.  None is returned
    when the query raises an ``Exception`` or yields no row.  Anything
    that is not an ``Exception`` subclass (``gevent.Timeout``,
    ``KeyboardInterrupt``, ...) propagates to the caller.  The cursor is
    always closed.
    """
    cursor = conn.cursor()
    try:
        cursor.execute("select connection_id()")
        row = cursor.fetchone()
        if row is None:
            return None
        return row[0]
    except Exception:
        return None
    finally:
        cursor.close()
def execute_sql(conn, sql, params=()):
    """Execute ``sql`` with ``params`` on ``conn`` and return lastrowid.

    A new cursor is opened for the statement and is closed even when
    ``execute`` raises; the exception itself is not handled here.
    """
    cursor = conn.cursor()
    try:
        cursor.execute(sql, params)
        # Evaluated before the ``finally`` clause closes the cursor.
        return cursor.lastrowid
    finally:
        cursor.close()
# Module-level pool shared by every worker greenlet.
pool = SimplePool()

# Query for spotting table_b rows whose table_a parent is missing:
#   SELECT * FROM table_b WHERE a_id not in
#   (SELECT id FROM table_a) ORDER BY a_id DESC;

# Statements that (re)create the two test tables; table_b is dropped
# first because it is the one that references table_a.
PREPARE_SQL = [
    "DROP TABLE IF EXISTS table_b",
    "DROP TABLE IF EXISTS table_a",
    """CREATE TABLE table_a (
        id INT NOT NULL AUTO_INCREMENT,
        data VARCHAR (256) NOT NULL,
        PRIMARY KEY (id)
    ) engine='InnoDB'""",
    """CREATE TABLE table_b (
        id INT NOT NULL AUTO_INCREMENT,
        a_id INT NOT NULL,
        data VARCHAR (256) NOT NULL,
        -- uncomment this to illustrate where the driver is attempting
        -- to INSERT the row during ROLLBACK
        -- FOREIGN KEY (a_id) REFERENCES table_a(id),
        PRIMARY KEY (id)
    ) engine='InnoDB'
    """]

# Build the schema on one pooled connection before any worker starts.
connection = pool.get()
for sql in PREPARE_SQL:
    execute_sql(connection, sql)
connection.commit()
pool.return_conn(connection)
print("Table prepared...")
def transaction_kill_worker():
    """Run two-INSERT transactions forever under a 0.1s gevent timeout.

    Each iteration checks a connection out of ``pool``, inserts a row
    into table_a, sleeps for a random time of up to 0.2s, inserts a
    table_b row that references the new table_a id, and commits.  The
    timeout can fire at any point in that sequence.

    One character is written to stdout per event:
      ``.``  the iteration finished without an exception escaping
      ``$``  the table_b INSERT/commit raised and was rolled back
      ``#``  the gevent timeout fired
      ``@``  any other exception

    The connection is handed back to the pool exactly once per
    iteration: the local reference is cleared after every successful
    ``return_conn`` so the ``finally`` clause only acts when no earlier
    hand-back completed.  (Returning the same connection twice makes the
    pool open an extra connection each time.)
    """
    while True:
        connection = None
        try:
            with gevent.Timeout(0.1):
                connection = pool.get()
                rowid = execute_sql(
                    connection,
                    "INSERT INTO table_a (data) VALUES (%s)", ("a",))
                gevent.sleep(random.random() * 0.2)
                try:
                    execute_sql(
                        connection,
                        "INSERT INTO table_b (a_id, data) VALUES (%s, %s)",
                        (rowid, "b",))

                    # this version prevents the commit from
                    # proceeding on a bad connection
                    # if verify_connection_id(connection):
                    #     connection.commit()

                    # this version does not. It will commit the
                    # row for table_b without the table_a being present.
                    connection.commit()

                    pool.return_conn(connection)
                    connection = None
                except Exception:
                    connection.rollback()
                    pool.return_conn(connection)
                    connection = None
                    sys.stdout.write("$")
        except gevent.Timeout:
            # try to return the connection anyway
            if connection is not None:
                pool.return_conn(connection)
                connection = None
            sys.stdout.write("#")
        except Exception:
            # logger.exception(e)
            sys.stdout.write("@")
        else:
            sys.stdout.write(".")
        finally:
            # Last resort: only reached with a live reference when none
            # of the hand-backs above completed.
            if connection is not None:
                pool.return_conn(connection)
def main():
    """Spawn 50 worker greenlets and block until all of them have ended.

    The workers loop forever, so in normal operation this never returns.
    Waiting on the greenlets (rather than sleeping in an endless loop)
    lets the script exit if every worker has died.
    """
    workers = [gevent.spawn(transaction_kill_worker) for _ in range(50)]
    gevent.joinall(workers)


if __name__ == "__main__":
    main()
Unrelated, but fyi - you can clean up your main block by using a gevent.pool.Group to wait on.
import gevent.pool


def main():
    """Spawn 50 workers into a gevent Group and wait for the group."""
    group = gevent.pool.Group()
    for _ in range(50):
        group.spawn(transaction_kill_worker)
    group.join()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
This reproduces with both pymysql and mysql-connector. Some discussion about pymysql is at PyMySQL/PyMySQL#275. I'm also appealing to openstack-dev for clues at http://lists.openstack.org/pipermail/openstack-dev/2014-December/052344.html.