Skip to content

Instantly share code, notes, and snippets.

@ronniejoshua
Last active September 18, 2021 03:46
Show Gist options
  • Save ronniejoshua/16350a8048ebe59aeba22ea684f80212 to your computer and use it in GitHub Desktop.
Save ronniejoshua/16350a8048ebe59aeba22ea684f80212 to your computer and use it in GitHub Desktop.
Loading data from MySQL Server (DB) to Google BigQuery Using Pandas
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
# Copyright 2021 Ronnie Joshua
#
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# Built-in (standard library) modules
import itertools
import os
import contextlib
import re
import functools as ft
from multiprocessing.pool import Pool
import time
import sys
from itertools import repeat
# External Libraries
from dotenv import load_dotenv
import mysql.connector
from mysql.connector import errorcode
import pandas as pd
import pandas_gbq
from google.oauth2 import service_account
# Load the variables declared in the .env file into the process environment.
load_dotenv()

# BigQuery target: service-account credentials (read from the key file whose
# path is stored in the "gcp_credentials" environment variable) plus the
# destination project and dataset identifiers.
gcp_key_path = os.environ.get("gcp_credentials")
gcp_bq_config = dict(
    gcp_credentials=service_account.Credentials.from_service_account_file(
        gcp_key_path,
    ),
    bq_project_id="sm-data-infra",
    bq_dataset_id="my_sql_db_crm",
)
# MySQL connection settings, later unpacked as keyword arguments for
# mysql.connector.connect(). Each credential comes from the environment and
# is None when the corresponding variable is not set.
mysql_config = dict(
    user=os.getenv("mysql_user"),
    password=os.getenv("mysql_password"),
    host=os.getenv("mysql_host"),
    database=os.getenv("mysql_dbname"),
    raise_on_warnings=True,
)
# Handle MySQL database connection with a context manager
@contextlib.contextmanager
def get_mysql_conn(mysql_config):
    """
    Open a MySQL connection and guarantee that it is closed on exit.

    Intended use is ``with get_mysql_conn(cfg) as cnx: ...``. The generator
    yields exactly one value, the connection object, and the ``finally``
    clause closes it whether the ``with`` body finishes normally or raises.

    Args:
        mysql_config: Mapping of keyword arguments forwarded unchanged to
            ``mysql.connector.connect``.

    Yields:
        The open connection returned by ``mysql.connector.connect``.

    Raises:
        mysql.connector.Error: Re-raised after a diagnostic line is printed,
            both when connecting fails and when the ``with`` body raises it.
    """
    # Bind the name up front so the ``finally`` clause is safe even when
    # connect() itself fails and no connection object was ever created.
    cnx = None
    try:
        cnx = mysql.connector.connect(**mysql_config)
        yield cnx
    except mysql.connector.Error as err:
        if err.errno == errorcode.ER_ACCESS_DENIED_ERROR:
            print("Something is wrong with your user name or password")
        elif err.errno == errorcode.ER_BAD_DB_ERROR:
            print("Database does not exist")
        else:
            print(f"ERROR MESSAGE: {err}")
        # Propagate the error: swallowing it would leave the generator without
        # a yield on a connect failure, and would hide a failed query from
        # the caller when the error came from the ``with`` body.
        raise
    finally:
        if cnx is not None:
            cnx.close()
def run_mysql_queries(query, mysql_config, gcp_bq_config):
    """
    Run one query against MySQL and replace the matching BigQuery table.

    The destination table name is the identifier following the first
    ``my_sql_db.`` reference in ``query``; the result set is written to
    ``<bq_dataset_id>.<table>`` with ``if_exists="replace"``.

    Args:
        query: SQL text that references a table as ``my_sql_db.<table>``.
        mysql_config: Connection settings passed to ``get_mysql_conn``.
        gcp_bq_config: Mapping providing ``bq_dataset_id``, ``bq_project_id``
            and ``gcp_credentials`` for the upload.

    Returns:
        The DataFrame read from MySQL.

    Raises:
        ValueError: If ``query`` contains no ``my_sql_db.<table>`` reference.
        RuntimeError: If the query finished without producing a DataFrame.
    """
    # Resolve the destination first so a malformed query fails before any
    # data is fetched. The dot is escaped so only a literal "my_sql_db."
    # prefix is accepted.
    table_match = re.search(r"my_sql_db\.(\w+)", query)
    if table_match is None:
        raise ValueError(
            f"Cannot derive a BigQuery table name from query: {query!r}"
        )
    bq_table_ref = table_match.group(1)

    df = None
    with get_mysql_conn(mysql_config) as cxn:
        df = pd.read_sql(query, cxn)
    if df is None:
        # If the connection helper reported a MySQL error and suppressed it,
        # ``df`` was never assigned; fail with an explicit message rather
        # than continuing to the upload.
        raise RuntimeError(f"Query returned no data: {query!r}")

    print(df.shape)
    pandas_gbq.to_gbq(
        df,
        f"{gcp_bq_config.get('bq_dataset_id')}.{bq_table_ref}",
        project_id=gcp_bq_config.get("bq_project_id"),
        if_exists="replace",
        credentials=gcp_bq_config.get("gcp_credentials"),
    )
    return df
def timer(func):
    """
    Decorator that reports how long each call to ``func`` takes.

    The wrapped function is called with the original arguments; after it
    returns, the elapsed wall-clock time is printed and the function's
    return value is passed through unchanged.
    """

    @ft.wraps(func)
    def wrapper_timer(*args, **kwargs):
        began = time.perf_counter()
        result = func(*args, **kwargs)
        run_time = time.perf_counter() - began
        print(f"Finished {func.__name__!r} in {run_time:.4f} secs")
        return result

    return wrapper_timer
# Source tables in the my_sql_db schema; each one is exported in full.
_TABLE_NAMES = (
    "Affiliates",
    "Brands",
    "Categories",
    "Countries",
    "Deposits",
    "Desks",
    "Leads",
    "Payment_providers",
    "Products",
    "Teams",
    "Users",
    "Withdrawals",
)

# One "SELECT *" statement per table, in the order listed above.
queries = [f""" SELECT * FROM my_sql_db.{table} """ for table in _TABLE_NAMES]
def run_multi_processing(queries, mysql_config, gcp_bq_config):
    """
    Export every query through a pool of worker processes.

    A pool with ``os.cpu_count()`` processes is created and
    ``run_mysql_queries`` is invoked once per query, each call receiving the
    same ``mysql_config`` and ``gcp_bq_config``. The call blocks until all
    tasks have finished; nothing is returned.
    """
    worker_count = os.cpu_count()
    with Pool(processes=worker_count) as pool:
        # One (query, mysql_config, gcp_bq_config) argument tuple per task.
        jobs = tuple(
            (query, mysql_config, gcp_bq_config) for query in queries
        )
        pool.starmap(run_mysql_queries, jobs)
# Script entry point: export every table listed in `queries` when this file is
# run directly. Nothing is started when the module is merely imported (which
# also matters for multiprocessing start methods that re-import the main
# module in each worker).
if __name__ == "__main__":
    run_multi_processing(queries, mysql_config, gcp_bq_config)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment