Skip to content

Instantly share code, notes, and snippets.

@zzzeek
Created December 5, 2014 22:23
Show Gist options
  • Save zzzeek/d196fa91c40cb515365e to your computer and use it in GitHub Desktop.
Save zzzeek/d196fa91c40cb515365e to your computer and use it in GitHub Desktop.
# -*- coding: utf-8 -*-
import gevent.monkey
gevent.monkey.patch_all()
import collections
import threading
import time
import random
import sys
import logging
logging.basicConfig()
log = logging.getLogger('foo')
log.setLevel(logging.DEBUG)
#import pymysql as dbapi
from mysql import connector as dbapi
class SimplePool(object):
def __init__(self):
self.checkedin = collections.deque([
self._connect() for i in range(50)
])
self.checkout_lock = threading.Lock()
self.checkin_lock = threading.Lock()
def _connect(self):
return dbapi.connect(
user="scott", passwd="tiger",
host="localhost", db="test")
def get(self):
with self.checkout_lock:
while not self.checkedin:
time.sleep(.1)
return self.checkedin.pop()
def return_conn(self, conn):
try:
conn.rollback()
except:
log.error("Exception during rollback", exc_info=True)
try:
conn.close()
except:
log.error("Exception during close", exc_info=True)
# recycle to a new connection
conn = self._connect()
with self.checkin_lock:
self.checkedin.append(conn)
def verify_connection_id(conn):
cursor = conn.cursor()
try:
cursor.execute("select connection_id()")
row = cursor.fetchone()
return row[0]
except:
return None
finally:
cursor.close()
def execute_sql(conn, sql, params=()):
cursor = conn.cursor()
cursor.execute(sql, params)
lastrowid = cursor.lastrowid
cursor.close()
return lastrowid
pool = SimplePool()
# SELECT * FROM table_b WHERE a_id not in
# (SELECT id FROM table_a) ORDER BY a_id DESC;
PREPARE_SQL = [
"DROP TABLE IF EXISTS table_b",
"DROP TABLE IF EXISTS table_a",
"""CREATE TABLE table_a (
id INT NOT NULL AUTO_INCREMENT,
data VARCHAR (256) NOT NULL,
PRIMARY KEY (id)
) engine='InnoDB'""",
"""CREATE TABLE table_b (
id INT NOT NULL AUTO_INCREMENT,
a_id INT NOT NULL,
data VARCHAR (256) NOT NULL,
-- uncomment this to illustrate where the driver is attempting
-- to INSERT the row during ROLLBACK
-- FOREIGN KEY (a_id) REFERENCES table_a(id),
PRIMARY KEY (id)
) engine='InnoDB'
"""]
connection = pool.get()
for sql in PREPARE_SQL:
execute_sql(connection, sql)
connection.commit()
pool.return_conn(connection)
print("Table prepared...")
def transaction_kill_worker():
while True:
try:
connection = None
with gevent.Timeout(0.1):
connection = pool.get()
rowid = execute_sql(
connection,
"INSERT INTO table_a (data) VALUES (%s)", ("a",))
gevent.sleep(random.random() * 0.2)
try:
execute_sql(
connection,
"INSERT INTO table_b (a_id, data) VALUES (%s, %s)",
(rowid, "b",))
# this version prevents the commit from
# proceeding on a bad connection
# if verify_connection_id(connection):
# connection.commit()
# this version does not. It will commit the
# row for table_b without the table_a being present.
connection.commit()
pool.return_conn(connection)
except Exception:
connection.rollback()
pool.return_conn(connection)
sys.stdout.write("$")
except gevent.Timeout:
# try to return the connection anyway
if connection is not None:
pool.return_conn(connection)
sys.stdout.write("#")
except Exception:
# logger.exception(e)
sys.stdout.write("@")
else:
sys.stdout.write(".")
finally:
if connection is not None:
pool.return_conn(connection)
def main():
for i in range(50):
gevent.spawn(transaction_kill_worker)
gevent.sleep(3)
while True:
gevent.sleep(5)
if __name__ == "__main__":
main()
@zzzeek
Copy link
Author

zzzeek commented Dec 6, 2014

this is with both pymysql and mysql-connector. Some discussion about pymysql is at PyMySQL/PyMySQL#275. I'm also appealing to openstack-dev for clues at http://lists.openstack.org/pipermail/openstack-dev/2014-December/052344.html.

@dropwhile
Copy link

Unrelated, but fyi - you can clean up your main block by using a gevent.pool.Group to wait on.

import gevent.pool

def main():
    group = gevent.pool.Group()
    for i in range(50):
        group.spawn(transaction_kill_worker)
    group.join()

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment