Skip to content

Instantly share code, notes, and snippets.

@jatinkrmalik
Created August 13, 2018 09:07
Show Gist options
  • Save jatinkrmalik/e7041fc123fd16c310bef73cb185222c to your computer and use it in GitHub Desktop.
Save jatinkrmalik/e7041fc123fd16c310bef73cb185222c to your computer and use it in GitHub Desktop.
Redshift and Mysql from a single DB class in python
import psycopg2
import pymysql
class Database(object):
def __init__(self, host_name, user_name, database_name, password, port):
self.host_name = host_name
self.user_name = user_name
self.database_name = database_name
self.password = password
self.port = port
def connection_manager(self):
raise NotImplementedError
def execute(self, query):
raise NotImplementedError
class RedShift(Database):
def __init__(self, host_name, user_name, database_name, password, port=5439):
super().__init__(host_name, user_name, database_name, password, port)
def connection_manager(self, f):
"""
Wrap function to setup and tear down a Postgres connection while
providing a cursor object to make queries with.
"""
def wrapper(*args, **kwargs):
try:
self.connection = psycopg2.connect(dbname=self.database_name,
host=self.host_name,
port=self.port,
user=self.user_name,
password=self.password)
self.cursor = self.connection.cursor()
result = f(*args, **kwargs)
finally:
self.cursor.close()
self.connection.close()
return result
return wrapper
@connection_manager
def execute(self, query):
self.cursor.execute(query)
return self.cursor.fetchall()
class MySQL(Database):
def __init__(self, host_name, user_name, database_name, password, port=3306):
super().__init__(host_name, user_name, database_name, password, port)
def connection_manager(self, f):
"""
Wrap function to setup and tear down a Postgres connection while
providing a cursor object to make queries with.
"""
def wrapper(*args, **kwargs):
try:
self.connection = pymysql.connect(db=self.database_name,
host=self.host_name,
port=self.port,
user=self.user_name,
password=self.password,
charset='utf8mb4',
cursorclass=pymysql.cursors.DictCursor)
self.cursor = self.connection.cursor()
result = f(*args, **kwargs)
self.connection.commit()
finally:
self.connection.close()
return result
return wrapper
@connection_manager
def execute(self, query):
self.cursor.execute(query)
return self.cursor.fetchall()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment