Last active
September 18, 2021 03:46
-
-
Save ronniejoshua/16350a8048ebe59aeba22ea684f80212 to your computer and use it in GitHub Desktop.
Loading data from MySQL Server (DB) to Google BigQuery Using Pandas
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python3 | |
# -*- coding: utf-8 -*- | |
# Copyright 2021 Ronnie Joshua | |
# | |
# | |
# Licensed under the Apache License, Version 2.0 (the "License"); | |
# you may not use this file except in compliance with the License. | |
# You may obtain a copy of the License at | |
# | |
# https://www.apache.org/licenses/LICENSE-2.0 | |
# | |
# Unless required by applicable law or agreed to in writing, software | |
# distributed under the License is distributed on an "AS IS" BASIS, | |
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | |
# See the License for the specific language governing permissions and | |
# limitations under the License. | |
# Built in Libraries | |
import itertools | |
import os | |
import contextlib | |
import re | |
import functools as ft | |
from multiprocessing.pool import Pool | |
import time | |
import sys | |
from itertools import repeat | |
# External Libraries | |
from dotenv import load_dotenv | |
import mysql.connector | |
from mysql.connector import errorcode | |
import pandas as pd | |
import pandas_gbq | |
from google.oauth2 import service_account | |
# Loading Environment variables declared in .env file
load_dotenv()

# GCP Big Query Credentials
gcp_key_path = os.getenv("gcp_credentials")
if not gcp_key_path:
    # Fail fast with an actionable message; otherwise the credentials loader
    # would be handed None and fail with a much less obvious error.
    raise RuntimeError(
        "Environment variable 'gcp_credentials' is not set; it must hold the "
        "path to a GCP service-account key file (e.g. via the .env file)."
    )

gcp_bq_config = {
    "gcp_credentials": service_account.Credentials.from_service_account_file(
        gcp_key_path,
    ),
    "bq_project_id": "sm-data-infra",
    "bq_dataset_id": "my_sql_db_crm",
}

# MYSQL Connection configuration.
# Values come from the environment; any variable that is unset becomes None.
mysql_config = {
    "user": os.getenv("mysql_user"),
    "password": os.getenv("mysql_password"),
    "host": os.getenv("mysql_host"),
    "database": os.getenv("mysql_dbname"),
    "raise_on_warnings": True,
}
# Handle MySQL database connection with a context manager
@contextlib.contextmanager
def get_mysql_conn(mysql_config):
    """Yield an open MySQL connection and close it when the block exits.

    ``mysql_config`` is passed as keyword arguments to
    ``mysql.connector.connect`` (the module-level ``mysql_config`` is built
    from environment variables).

    A ``mysql.connector.Error`` raised while connecting, or raised inside the
    ``with`` body, is reported with a short diagnostic message and then
    re-raised, so the caller sees the failure instead of carrying on.

    Args:
        mysql_config: keyword arguments for ``mysql.connector.connect``.

    Yields:
        The open connection object.

    Raises:
        mysql.connector.Error: re-raised after the diagnostic is printed.
    """
    cnx = None
    try:
        cnx = mysql.connector.connect(**mysql_config)
        yield cnx
    except mysql.connector.Error as err:
        if err.errno == errorcode.ER_ACCESS_DENIED_ERROR:
            print("Something is wrong with your user name or password")
        elif err.errno == errorcode.ER_BAD_DB_ERROR:
            print("Database does not exist")
        else:
            print(f"ERROR MESSAGE: {err}")
        # Re-raise: if the generator swallowed an exception thrown from the
        # with-body, @contextmanager would suppress it and the caller would
        # continue with unassigned variables.
        raise
    finally:
        # cnx is still None when connect() itself failed.
        if cnx is not None:
            cnx.close()
def run_mysql_queries(query, mysql_config, gcp_bq_config):
    """Run ``query`` against MySQL and replace the matching BigQuery table.

    The destination table name is the first ``my_sql_db.<table>`` reference
    found in ``query``. The result is written to
    ``<bq_dataset_id>.<table>`` in project ``bq_project_id``, replacing any
    existing table.

    Args:
        query: SQL text containing a ``my_sql_db.<table>`` reference.
        mysql_config: keyword arguments for the MySQL connection.
        gcp_bq_config: dict with ``gcp_credentials``, ``bq_project_id`` and
            ``bq_dataset_id`` keys.

    Returns:
        The DataFrame read from MySQL and uploaded to BigQuery.

    Raises:
        ValueError: if ``query`` contains no ``my_sql_db.<table>`` reference.
    """
    # Resolve the destination first so a malformed query fails before any
    # database work. The dot is escaped so it only matches a literal ".".
    match = re.search(r"my_sql_db\.(\w+)", query)
    if match is None:
        raise ValueError(
            f"Cannot find a 'my_sql_db.<table>' reference in query: {query!r}"
        )
    bq_table_ref = match.group(1)

    with get_mysql_conn(mysql_config) as cxn:
        df = pd.read_sql(query, cxn)
    # The MySQL connection is already closed here, so it is not held open
    # for the duration of the BigQuery upload.
    print(df.shape)
    pandas_gbq.to_gbq(
        df,
        f"{gcp_bq_config.get('bq_dataset_id')}.{bq_table_ref}",
        project_id=gcp_bq_config.get("bq_project_id"),
        if_exists="replace",
        credentials=gcp_bq_config.get("gcp_credentials"),
    )
    return df
def timer(func):
    """Decorator that reports how long each call to ``func`` took.

    Elapsed time is measured with ``time.perf_counter`` and printed after the
    call returns; the wrapped function's return value is passed through
    unchanged. If ``func`` raises, nothing is printed.
    """

    def timed(*args, **kwargs):
        started = time.perf_counter()
        result = func(*args, **kwargs)
        run_time = time.perf_counter() - started
        print(f"Finished {func.__name__!r} in {run_time:.4f} secs")
        return result

    # Copy __name__, __doc__, __wrapped__, ... from func onto the wrapper.
    return ft.update_wrapper(timed, func)
# One full-table extract per MySQL table. The name after "my_sql_db." is also
# what run_mysql_queries extracts as the BigQuery destination table name.
queries = [
    f" SELECT * FROM my_sql_db.{table} "
    for table in (
        "Affiliates",
        "Brands",
        "Categories",
        "Countries",
        "Deposits",
        "Desks",
        "Leads",
        "Payment_providers",
        "Products",
        "Teams",
        "Users",
        "Withdrawals",
    )
]
def _load_table(query, mysql_config, gcp_bq_config):
    """Worker: export one query's table and return only the DataFrame shape.

    Returning the shape instead of the DataFrame avoids pickling every full
    table back to the parent process, where the results are not used.
    """
    return run_mysql_queries(query, mysql_config, gcp_bq_config).shape


def run_multi_processing(queries, mysql_config, gcp_bq_config):
    """Export every query in ``queries`` to BigQuery using a process pool.

    Each query is handled by ``run_mysql_queries`` in a worker process.

    Args:
        queries: iterable of SQL strings, each referencing
            ``my_sql_db.<table>``.
        mysql_config: MySQL connection kwargs, passed to every worker.
        gcp_bq_config: BigQuery settings and credentials, passed to every
            worker.
    """
    queries = list(queries)
    if not queries:
        return
    # No point starting more worker processes than there are queries.
    processes = min(len(queries), os.cpu_count() or 1)
    with Pool(processes=processes) as pool:
        pool.starmap(
            _load_table,
            zip(queries, repeat(mysql_config), repeat(gcp_bq_config)),
        )
if __name__ == "__main__":
    # Script entry point: export every table listed in `queries`.
    run_multi_processing(
        queries=queries,
        mysql_config=mysql_config,
        gcp_bq_config=gcp_bq_config,
    )
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment