Skip to content

Instantly share code, notes, and snippets.

@liquidgenius
Last active November 22, 2019 16:26
Show Gist options
  • Save liquidgenius/7a9be27dead40c42fe565fbaa330c7d5 to your computer and use it in GitHub Desktop.
Save liquidgenius/7a9be27dead40c42fe565fbaa330c7d5 to your computer and use it in GitHub Desktop.
A minimal Python class for uniform API to both Snowflake and MySQL databases.
import pymysql
import pandas as pd
from snowflake.sqlalchemy import URL as SFURL
from sqlalchemy import create_engine
from sqlalchemy.engine.url import URL
class OmniDB:
""" A minimal Python class for uniform API to both Snowflake and MySQL databases.
TODO: Error handling
TODO: Logging"""
def __init__(self, mysql_settings, sf_settings, persist=True):
""" Both mysql_settings and sf_settings are passed into the class as dictionaries.
:param mysql_settings: dict: A dictionary of mysql settings.
:param sf_settings: dict: A dictionary of snowflake settings.
:param persist: bool: A flag that ensures queries use a persistent connection; default to True.
"""
self.mysql_settings = mysql_settings
self.sf_settings = sf_settings
self.mysql_connection = None
self.sf_connection = None
if persist:
self._omnidb_open_connections()
def _omnidb_open_connections(self):
""" Opens connections to both Snowflake and MySQL.
:return: None: Registers connections with the instance.
"""
# open connections
self.mysql_connection = self._omnidb_mysql_connection(**self.mysql_settings)
self.sf_connection = self._omnidb_sf_connection(**self.sf_settings)
return None
@staticmethod
def _omnidb_mysql_connection(host=None, port=None, user=None, password=None, database=None):
""" Provision a connection to the MySQL database. Explode self.mysql_settings to populate parameters.
:param host: str: The ip of the host server.
:param port: int: The port of the host server.
:param user: str: The username credential.
:param password: str: The password credential.
:param database: str: The name of the MySQL database.
:return: pymysql.Connection: A valid connection to the MySQL database
"""
result = create_engine(URL(
drivername='mysql+pymysql',
host=host,
port=port,
username=user,
password=password,
database=database
))
return result
@staticmethod
def _omnidb_sf_connection(account=None, user=None, password=None, database=None, schema=None,
warehouse=None, role=None):
""" Provision a connection to the MySQL database. Explode self.mysql_settings to populate parameters.
:param account: str: The Snowflake account id.
:param user: str: The username credential.
:param password: str: The password credential.
:param database: str: The database name.
:param schema: str: The database schema.
:param warehouse: str: The warehouse name.
:param role: str: The role name.
:return: Connection: A valid connection to the Snowflake database
"""
result = create_engine(SFURL(
account=account,
user=user,
password=password,
database=database,
schema=schema,
warehouse=warehouse,
role=role,
))
return result
def omnidb_execute_query(self, database, query, connection_type='persistent'):
""" A uniform API for execution of queries against MySQL or Snowflake databases.
:param database: str: The type of database connection. Must be 'mysql' or 'sf'.
:param query: str: The SQL query to execute.
:param connection_type: Utilize a 'single' or 'persistent' database query, defaults to 'persistent' to maintain
the session.
:return: pd.DataFrame: The query result formatted as a Pandas DataFrame.
"""
# assertions
assert database == 'mysql' or database == 'sf', "Please provide 'sf' or 'mysql' as the value for database."
assert connection_type == 'persistent' or connection_type == 'single', 'Please provide a valid ' \
'connection_type; persistent or single.'
# create a mysql connection
if database == 'mysql':
if connection_type == 'single':
connection = self._omnidb_mysql_connection(**self.mysql_settings)
if connection_type == 'persistent':
connection = self.mysql_connection
# create a snowflake connection
if database == 'sf':
if connection_type == 'single':
connection = self._omnidb_sf_connection(**self.sf_settings)
if connection_type == 'persistent':
connection = self.sf_connection
# Execute the query
result = pd.read_sql_query(query, connection)
# Close the single connection
if connection_type == 'single':
connection.close()
connection.dispose()
# Print the results
self.omnidb_print_result(query, result)
return result
@staticmethod
def omnidb_print_result(query, result):
""" Prints the query and results to the console.
:param query: str: The query that was executed.
:param result: Pandas.DataFrame: The result of the query.
:return: None
"""
print(f'QUERY: {query}')
if isinstance(result, pd.DataFrame):
if 'status' in result.columns:
print(f"RESULT: {len(result)} records found\n{result.loc[0, 'status']}\n")
else:
print(f"RESULT:\n {result.to_string(justify='left', index=False)}\n")
else:
print(f'RESULT: 0 records found\n{result}\n')
return None
@liquidgenius
Copy link
Author

liquidgenius commented Nov 22, 2019

Snowflake Usage
result = self.omnidb_execute_query('sf', query)

MySQL Usage
result = self.omnidb_execute_query('mysql', query)

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment