Skip to content

Instantly share code, notes, and snippets.

@desertkun
Created November 22, 2017 14:20
Show Gist options
  • Star 1 You must be signed in to star a gist
  • Fork 1 You must be signed in to fork a gist
  • Save desertkun/2a91c297e311c50006e1ffea530b1a3f to your computer and use it in GitHub Desktop.
Save desertkun/2a91c297e311c50006e1ffea530b1a3f to your computer and use it in GitHub Desktop.
from tornado.gen import coroutine, Return, Task
import tormysql
import tormysql.cursor
# noinspection PyUnresolvedReferences
from pymysql import DatabaseError as DatabaseError
from pymysql import OperationalError as ConnectionError
from pymysql import IntegrityError as DuplicateError
from pymysql import IntegrityError as ConstraintsError
class DatabaseConnection(object):
def __init__(self, conn):
self.conn = conn
@coroutine
def autocommit(self, value):
yield self.conn.autocommit(value)
def close(self):
self.conn.close()
@coroutine
def commit(self):
"""
Commits a non-autocommit cursor.
Usage:
with (yield db.acquire(auto_commit=False)) as db:
row = yield db.get("SELECT ... FOR UPDATE");
...
yield db.execute("UPDATE ...");
yield db.commit()
"""
yield self.conn.commit()
def rollback(self):
return self.conn.rollback()
@coroutine
def execute(self, query, *args, **kwargs):
"""
Executes a mysql query.
Used for 'UPDATE' and 'DELETE'
"""
with self.conn.cursor() as cursor:
result = yield cursor.execute(query, args)
raise Return(result)
@coroutine
def get(self, query, *args, **kwargs):
"""
Returns one row from a mysql query (a dict).
Used for 'SELECT'.
"""
with self.conn.cursor() as cursor:
yield cursor.execute(query, args)
raise Return(cursor.fetchone())
@coroutine
def insert(self, query, *args, **kwargs):
"""
Inserts a new row into a mysql.
Returns LAST_INSERT_ID, so used only for 'INSERT' queries.
"""
with self.conn.cursor() as cursor:
yield cursor.execute(query, args)
raise Return(cursor.lastrowid)
@coroutine
def query(self, query, *args, **kwargs):
"""
Returns all rows from a mysql query (a list of dicts, each dict represents a row).
Used for 'SELECT'.
"""
with self.conn.cursor() as cursor:
yield cursor.execute(query, args)
raise Return(cursor.fetchall())
def __enter__(self):
return self
def __exit__(self, *exc_info):
del exc_info
self.conn.close()
class Database(object):
"""
Asynchronous MySQL database with connection pool.
"""
def __init__(self, host=None, database=None, user=None, password=None, *args, **kwargs):
self.pool = tormysql.ConnectionPool(
max_connections=256,
wait_connection_timeout=5,
idle_seconds=7200,
host=host,
db=database,
user=user,
passwd=password,
cursorclass=tormysql.cursor.DictCursor,
autocommit=True,
use_unicode=True,
charset="utf8",
**kwargs
)
@coroutine
def acquire(self, auto_commit=True):
"""
Acquires a new connection from pool. Acquired connection has context management, so
it can be used with 'with' statement, and few requests will happen in a single connection.
Usage:
with (yield db.acquire()) as db:
yield db.get("...")
yield db.insert("...")
"""
wrapper = DatabaseConnection((yield self.pool.Connection()))
yield wrapper.autocommit(auto_commit)
raise Return(wrapper)
@coroutine
def execute(self, query, *args, **kwargs):
"""
Executes a mysql query.
Used for 'UPDATE' and 'DELETE'.
Please use 'acquire' method if you would like to make few requests in a row.
"""
with (yield self.acquire()) as conn:
result = yield conn.execute(query, *args, **kwargs)
raise Return(result)
@coroutine
def get(self, query, *args, **kwargs):
"""
Returns one row from a mysql query (a dict).
Used for 'SELECT'.
Please use 'acquire' method if you would like to make few requests in a row.
"""
with (yield self.acquire()) as conn:
result = yield conn.get(query, *args, **kwargs)
raise Return(result)
@coroutine
def insert(self, query, *args, **kwargs):
"""
Inserts a new row into a mysql.
Returns LAST_INSERT_ID, so used only for 'INSERT' queries.
Please use 'acquire' method if you would like to make few requests in a row.
"""
with (yield self.acquire()) as conn:
result = yield conn.insert(query, *args, **kwargs)
raise Return(result)
@coroutine
def query(self, query, *args, **kwargs):
"""
Returns all rows from a mysql query (a list of dicts, each dict represents a row).
Used for 'SELECT'.
Please use 'acquire' method if you would like to make few requests in a row.
"""
with (yield self.acquire()) as conn:
result = yield conn.query(query, *args, **kwargs)
raise Return(result)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment