"""
Exemple of script to copy tables from Cloud SQL database to Bigquery (using SQL proxy) with airflow
Note: part of the code inside function get_proxy_connection_engine is from airflow.contrib.operators
Author: 4sushi
Date: 2021-06-30
"""
import os
from contextlib import contextmanager

import pandas as pd
from sqlalchemy import inspect
from airflow.hooks.base_hook import BaseHook
from airflow.contrib.operators.gcp_sql_operator import CloudSqlHook, CloudSqlDatabaseHook


@contextmanager
def get_proxy_connection_engine(db_user, db_password, db_name, gcp_project_id, gcp_location, gcp_instance,
                                db_type='mysql', db_host='127.0.0.1', db_port=3306):
    """Download the Cloud SQL proxy, run it, and yield a SQLAlchemy engine connected through it.

    Args:
        db_user(str): database user
        db_password(str): database password
        db_name(str): database name (schema)
        gcp_project_id(str): Google Cloud Platform project id
        gcp_location(str): Google Cloud Platform location
        gcp_instance(str): Google Cloud Platform instance name
        db_type(str): database type (mysql, postgres)
        db_host(str): database host name / IP
        db_port(int): database port

    Yields:
        sqlalchemy.engine.base.Engine: SQLAlchemy engine

    Examples:
        with get_proxy_connection_engine(db_user='user', db_password='', db_name='test',
                                         gcp_project_id='project-test', gcp_location='europe-west1',
                                         gcp_instance='instance-test', db_type='mysql', db_host='127.0.0.1',
                                         db_port=3306) as engine:
            connection = engine.connect()
            request = 'SELECT * FROM users'
            # Load table data into dataframe
            df = pd.read_sql(request, con=connection)
            connection.close()
    """
    def set_conf(db_user, db_password, db_name, gcp_project_id, gcp_location, gcp_instance,
                 db_type, db_host, db_port):
        os.environ['AIRFLOW_CONN_GOOGLE_CLOUD_SQL_DEFAULT'] = (
            "gcpcloudsql://{user}:{password}@{ip}:{port}/{database}?"
            "database_type={type}&"
            "project_id={project_id}&"
            "location={location}&"
            "instance={instance}&"
            "use_proxy=True&"
            "sql_proxy_use_tcp=True".format(
                user=db_user, password=db_password, ip=db_host, port=db_port, type=db_type,
                database=db_name, project_id=gcp_project_id, location=gcp_location, instance=gcp_instance
            )
        )

    set_conf(db_user, db_password, db_name, gcp_project_id, gcp_location, gcp_instance,
             db_type, db_host, db_port)
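    # For illustration (using the values from the docstring example above), the connection URI that
    # set_conf writes to the environment looks like:
    # gcpcloudsql://user:@127.0.0.1:3306/test?database_type=mysql&project_id=project-test&
    # location=europe-west1&instance=instance-test&use_proxy=True&sql_proxy_use_tcp=True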
    cloudsql_db_hook = CloudSqlDatabaseHook()
    cloudsql_db_hook.validate_ssl_certs()
    cloudsql_db_hook.create_connection()
    cloud_sql_proxy_runner = None
    try:
        cloudsql_db_hook.validate_socket_path_length()
        database_hook = cloudsql_db_hook.get_database_hook()
        try:
            try:
                if cloudsql_db_hook.use_proxy:
                    cloud_sql_proxy_runner = cloudsql_db_hook.get_sqlproxy_runner()
                    cloudsql_db_hook.free_reserved_port()
                    # There is very, very slim chance that the socket will
                    # be taken over here by another bind(0).
                    # It's quite unlikely to happen though!
                    cloud_sql_proxy_runner.start_proxy()
                yield database_hook.get_sqlalchemy_engine()
            finally:
                if cloud_sql_proxy_runner:
                    cloud_sql_proxy_runner.stop_proxy()
                    cloud_sql_proxy_runner = None
        finally:
            cloudsql_db_hook.cleanup_database_hook()
    finally:
        cloudsql_db_hook.delete_connection()
        cloudsql_db_hook = None


def export_mysql_tables_to_bq():
    """Export tables to BigQuery."""
    c = BaseHook.get_connection("ext_mysql_db")
    with get_proxy_connection_engine(db_user=c.login, db_password=c.password, db_name=c.schema,
                                     gcp_project_id='project', gcp_location='europe-west1',
                                     gcp_instance='instance', db_type='mysql', db_host=c.host,
                                     db_port=c.port) as engine:
        connection = engine.connect()
        inspector = inspect(engine)
        errors = []
        # Tables on the database
        tables_db = inspector.get_table_names(schema=c.schema)
        for table_name in tables_db:
            export_mysql_table_to_bq(connection, table_name, 'test', errors)
        connection.close()
    # Raise at the end so one failing table does not stop the export of the others
    if len(errors) > 0:
        raise Exception(errors)


def export_mysql_table_to_bq(connection, table_name, bq_dataset, errors):
    """Export a MySQL table to BigQuery. If the table exists in BigQuery, drop it and create a new one.

    Args:
        connection(sqlalchemy.engine.base.Connection): SQLAlchemy connection to the MySQL database
        table_name(str): MySQL and BigQuery table name (the same name is kept on both sides)
        bq_dataset(str): BigQuery dataset name
        errors(list[Exception]): list of exceptions, used to continue processing and export all tables
    """
    try:
        request = 'SELECT * FROM {}'.format(table_name)
        # Load table data into dataframe
        df = pd.read_sql(request, con=connection)
        table_id = '{}.{}'.format(bq_dataset, table_name)
        # Export dataframe to BigQuery (if the table exists, drop it and create a new one)
        df.to_gbq(table_id, chunksize=1000, progress_bar=False, if_exists='replace')
    except Exception as e:
        # Capture error to continue processing
        errors.append(e)


if __name__ == '__main__':
    export_mysql_tables_to_bq()
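

# A minimal sketch (not part of the original gist) of how export_mysql_tables_to_bq could be scheduled
# from an Airflow 1.10.x DAG with a PythonOperator. The dag_id, schedule_interval and start_date below
# are assumptions, not values from the original script.
#
# from datetime import datetime
# from airflow import DAG
# from airflow.operators.python_operator import PythonOperator
#
# dag = DAG(dag_id='export_mysql_to_bq', schedule_interval='@daily',
#           start_date=datetime(2021, 6, 30), catchup=False)
#
# export_task = PythonOperator(task_id='export_mysql_tables_to_bq',
#                              python_callable=export_mysql_tables_to_bq, dag=dag)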