Skip to content

Instantly share code, notes, and snippets.

@kaugm
Last active April 11, 2023 14:32
Show Gist options
  • Save kaugm/631df214718229a61f9240061af9be9a to your computer and use it in GitHub Desktop.
Save kaugm/631df214718229a61f9240061af9be9a to your computer and use it in GitHub Desktop.
MySQL Connection Class Object
class MySQL_Connection:
"""Connection to MySQL database
Arguments:
HOST: <class 'str'> Full hostname to connect to
DATABASE: <class 'str'> Database within host
"""
# Get Database Credentials From Environment Variables
DB_USERNAME = os.environ.get('SPOT_DB_USERNAME')
DB_PASSWORD = os.environ.get('SPOT_DB_PASSWORD')
# If either DB_USERNAME or DB_PASSWORD are not set, exit script
if not (DB_USERNAME and DB_PASSWORD):
print(f"Please set environment variables for database username and password. If your shell is Zsh, add the lines below to the file {str(Path.home())}/.zshrc\nexport SPOT_DB_USERNAME='<database_username>'\nexport SPOT_DB_PASSWORD='<database_password>'\n")
os._exit(1)
def __init__(self, HOST:str, DATABASE:str):
self.HOST = HOST
self.DATABASE = DATABASE
# Create Connection and Get Connection and Cursor Objects
self.connection, self.cursor = self._new_connection()
def _new_connection(self):
"""Connects to database and returns connection and cursor object"""
try:
con = mysql.connect(host=self.HOST, database=self.DATABASE, user=self.DB_USERNAME, password=self.DB_PASSWORD)
return (con, con.cursor())
except mysql.errors.InterfaceError:
print(f"Error: Cannot connect to database: Malformed host. Are you connected to the VPN?. Exiting..'")
os._exit(1)
except mysql.errors.ProgrammingError:
# WARNING: Delete this code block for certain users, they might get an error and removing this except block fixes it
# Also can be a malformed database host or table name, but users of the script won't have access to change this.
print(f"Error: Cannot connect to database: Malformed username, and/or password. Exiting..'")
os._exit(1)
except Error as e:
print(f'Error: {e} \n\nExiting..')
os._exit(1)
def query(self, query:str, chunk=False):
"""Executes SQL query and fetches results
Arguments:
query: <class 'str'> SQL query to execute
chunk: <class 'bool'> If SQL query should be chunked (default is False, if more than 250 rows are expected to be returned)
Returns:
response: <class 'list'> SQL response
"""
try:
if not chunk:
self.cursor.execute(query)
response = self.cursor.fetchall()
# DEBUGGING - If no response is getting returned
# if len(response) == 0:
# print(f"Error: No results returned from SQL query:\n{query}\n Exiting...")
# os._exit(1)
return response
# If SQL Query is to be chunked
else:
START = 0
LIMIT = 250
# Modify SQL Query to count number of rows in response
count_results_query = f"SELECT COUNT(*) FROM{query.split('FROM')[1]}"
self.cursor.execute(count_results_query)
count = self.cursor.fetchall()
# Create Response object through repetitive SQL queries
response = []
while START < count[0][0]:
# Notify User
print(f"{Colors.BOLD}Chunking SQL Query: {START} -> {START + LIMIT}, please wait...\n{Colors.ENDC}")
chunked_query = f"{query} LIMIT {START},{LIMIT}"
self.cursor.execute(chunked_query)
response += self.cursor.fetchall()
START += LIMIT
return response
except mysql.errors.ProgrammingError as e:
print(f"Error in SQL query: {query}\n\n{e}\nExiting...")
os._exit(1)
def close(self):
"""Close connection to Database"""
self.connection.close()
@kaugm
Copy link
Author

kaugm commented Jan 6, 2023

CORE = Connection('mysql.example.com', 'core_db')
response = CORE.query('SELECT * FROM table', chunk=True)
CORE.close()

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment