Skip to content

Instantly share code, notes, and snippets.

@Odame
Last active October 23, 2018 05:34
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save Odame/8c1e390c0acffca452a231a9ccf29472 to your computer and use it in GitHub Desktop.
Save Odame/8c1e390c0acffca452a231a9ccf29472 to your computer and use it in GitHub Desktop.
Service for accessing a MySQL database in a Flask application, where all db calls in a single request must use a single db connection
# coding: utf-8
"""
Service for accessing a MySQL database in a flask application, where all db calls in a single request must use a single db connection.
Uses the Borg pattern, with state stored in a thread local (g), in NOT order to leak state across multiple threads
"""
import logging as log
import MySQLdb as mDb
from functools import wraps
from flask import g
import os
HOST = os.environ.get('db_host')
USER = os.environ.get('db_user')
PASSWORD = os.environ.get('db_user_password')
DATABASE = os.environ.get('db_name')
class DatabaseService(object):
"""
Wraps functions for accessing the database itself.
All access to the database should be through an instance of this class
This class uses the Borg or Monostate pattern.
The state of all instances of this class is stored in a thread local (in this case, Flask's g object) as '__database_service_state'.
This encapsulates the use of a singleton patter, since objects of the class can be instantiate the usual way
"""
def __init__(self):
if not getattr(g, '__database_service_state', None): # check if we have a cached state already
# this is where we initialize an entirely new connection for this instance
try:
self._conn = mDb.connect(HOST, USER, PASSWORD, DATABASE)
except MySQLError as error:
log.error('Error in class %s :--- %s', self.__class__.__name__, error)
raise error
self._cursor = self._conn.cursor(mDb.cursors.DictCursor)
# by default, this service will be in autocommit
# All database changes will be persisted immediately!!
self._conn.autocommit(True)
self.is_in_transaction = False
# cache the state so that other instances can use it
setattr(g, '__database_service_state', self.__dict__)
else:
# oh yeah, there is a 'cached' state connection, ...
self.__dict__ = getattr(g, '__database_service_state') # so we use that one as this instances' state
def run_prepared_query(self, sql, bound_values, mode='SELECT', dim='single'):
"""Light interface for CRUD operations
Arguments:
sql {str} -- The SQL query to be executed
bound_values { tuple | list | dict} -- The parameters to be passed to the underlying connection client
mode {str} -- 'SELECT' | 'UPDATE' | 'INSERT' | 'DELETE'. To specify the type of database operation to be performed
dim {str} -- 'single' | 'multi' to determine if :bound_values: is a single set of parameters or a plural set
Returns:
If :mode: is 'SELECT' returns SQL result as a tuple of dict,
else None
"""
# multidimensional bound_values (possible for for C, U and D operations),
# in which case query is executed once,
# but with multiple sets of values
if dim == 'multi':
self._cursor.executemany(sql, bound_values) # todo
else: # single dimensional bound values
self._cursor.execute(sql, bound_values)
# for R operations, return the entire result set
if mode == 'SELECT':
return self._cursor.fetchall()
# but for C, U, D we return nothing
return None
def start_transact(self):
""" Put the db service in a transaction state """
self._conn.begin() # disable autocommit on the database connection
self.is_in_transaction = True
def rollback(self):
""" Roll back the pending operations of an ongoing transaction.
The service immediately leaves the transaction context
"""
self._conn.rollback() # this call will also turn on autocommit
self.is_in_transaction = False # we are no more in a transaction
def commit(self):
""" Commit the pending operations of an ongoing transaction.
The service immediately leaves the transaction context
"""
self._conn.commit() # this will also turn on autocommit
self.is_in_transaction = False # we are no more in a transaction
def get_last_id(self):
""" Get the id of the row that was INSERTED/UPDATED in the last operation """
# could have used connection.insert_id() too, but ....., dont want to continue the argument
return self._cursor.lastrowid
def get_last_inserted_ids(self):
"""
Get a list of the ids of the multiple data that was inserted in the last multiple insert operation.
This list will coincide with the order of the list of items that were inserted into a datatable
during the last multiple inserts operation
"""
# https://dev.mysql.com/doc/refman/5.6/en/information-functions.html#function_last-insert-id
first_inserted_item_id = self.get_last_id()
ids = [first_inserted_item_id]
for i in range(1, self.get_rowcount()):
ids.append(i + first_inserted_item_id)
return ids
def get_rowcount(self):
""" Get the number of rows that were selected/affected by the last operation """
return self._cursor.rowcount
@staticmethod
def transactional(func):
""" Decorator to run a function in the context of an SQL database transaction.
If code is already running in a transaction context,
then this will join that transaction.
Usage example:
@DatabaseService.transactional
def foo(a, b):
# do a bunch of database stuff
pass
# call the function normally
foo(3, b=4) # --> This function will be run in a transaction
"""
@wraps(func)
def wrapper(*args, **kwargs):
return DatabaseService.run_in_transaction(func, *args, **kwargs)
return wrapper
@staticmethod
def run_in_transaction(func, *args, **kwargs):
# type: (func) -> None
""" Run a function in a transaction.
If code is already running in a transaction context, this will join that transaction.
Usage Example:
def foo(a, b=5):
# do a lot of database stuff...
pass
# pass the function to be run as a parameter
DatabaseService.run_in_transaction(foo, 3, b=4)
Arguments:
func {func} -- The function to be run
*args and **kwargs will be passed to the function func
Returns:
Whatever func returns
"""
# NB: All instances of DatabaseService share the same state under the hood.
# A change in the transactions state of one affects all other instances
db_service = DatabaseService()
if db_service.is_in_transaction:
# we are already in a transaction
return func(*args, **kwargs) # this will be run in the 'context' of the already existing transaction
# any exceptions that occur in this function call, will cause that transaction to be rolled back
# we are not in a transaction !
error_occurred = True # we assume that an error may occur. :)
db_service.start_transact() # set the DatabaseService to be in transaction mode (autocommit is off)
try:
return_value = func(*args, **kwargs)
error_occurred = False # func is done running so we are sure there was no error
finally:
# IN transaction mode, autocommit is turned off and the caller has to explicitly call
# commit() or rollback() manually before any changes are persisted or reverted
# on the database server
if error_occurred:
db_service.rollback()
else:
db_service.commit()
return return_value
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment