Last active
June 22, 2023 14:31
-
-
Save romain9292/89ba7cbef0715d07c8198a5ff1347805 to your computer and use it in GitHub Desktop.
[MySQL to BigQuery using Pandas] Load MySQL tables into BigQuery, using pandas to auto-generate the schema #python #pandas #bigquery
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import os | |
import pandas as pd | |
import pandas_gbq as pd_gbq | |
import modin.pandas as pd_mod | |
from sqlalchemy import create_engine | |
from google.oauth2 import service_account | |
# Every setting below can be overridden through an environment variable of the
# name shown; when the variable is unset the original hard-coded value is used.
# This keeps real credentials out of the source file.

# Service account file for GCP connection
credentials = service_account.Credentials.from_service_account_file(
    os.environ.get('GCP_KEY_FILE', 'key.json')
)

# BigQuery Variables
PROJECT_ID = os.environ.get('BQ_PROJECT_ID', 'your_project_ID')
DATASET_ID = os.environ.get('BQ_DATASET_ID', 'your_data_set_ID')

# MySql Variables
MYSQL_USERNAME = os.environ.get('MYSQL_USERNAME', 'root')
MYSQL_PASSWORD = os.environ.get('MYSQL_PASSWORD', 'root')
MYSQL_HOST = os.environ.get('MYSQL_HOST', '127.0.0.1')
MYSQL_DATABASE = os.environ.get('MYSQL_DATABASE', 'your_database')

# Path for the dump directory (CSV copies of every table are written here)
DIRECTORY = os.environ.get('DUMP_DIRECTORY', 'dump')
def main():
    """Copy every base table of the MySQL database to BigQuery and to CSV.

    Connects to ``MYSQL_DATABASE`` on ``MYSQL_HOST``, lists its base tables
    from ``information_schema.tables`` and, for each table:

    1. reads the whole table into a DataFrame,
    2. uploads it to ``DATASET_ID.<table>`` in ``PROJECT_ID``, replacing any
       existing BigQuery table,
    3. writes it to ``DIRECTORY/<table>.csv`` with a header and no index.

    Raises:
        Exception: whatever ``create_engine(...).connect()`` raised when the
            MySQL connection cannot be opened. The error is printed and then
            re-raised, because nothing below can run without a connection.
    """
    # Function-scope import so the file-level import block stays untouched.
    from sqlalchemy import text

    con_uri = 'mysql+pymysql://{}:{}@{}/{}'.format(
        MYSQL_USERNAME,
        MYSQL_PASSWORD,
        MYSQL_HOST,
        MYSQL_DATABASE
    )
    # Log the target without leaking the password to stdout.
    print('Connection url {}'.format(
        'mysql+pymysql://{}:***@{}/{}'.format(
            MYSQL_USERNAME, MYSQL_HOST, MYSQL_DATABASE
        )
    ))

    try:
        connection = create_engine(con_uri, pool_recycle=3600).connect()
    except Exception as e:
        print('Error {}'.format(e))
        # Continuing would only fail later with a NameError on the connection.
        raise

    try:
        # Bound parameter instead of string formatting: no quoting problems
        # (double-quoted literals break under ANSI_QUOTES) and no injection
        # through the schema name.
        tables_query = text(
            'SELECT table_name '
            'FROM information_schema.tables '
            "WHERE TABLE_TYPE = 'BASE TABLE' "
            'AND TABLE_SCHEMA = :schema'
        )
        list_tables = pd.read_sql(
            tables_query,
            connection,
            params={'schema': MYSQL_DATABASE}
        )

        # Create the dump directory once rather than re-checking per table.
        if not os.path.exists(DIRECTORY):
            os.makedirs(DIRECTORY)

        # Read the single result column by position: the label's letter case
        # ('table_name' vs 'TABLE_NAME') depends on the MySQL server version.
        for table_name in list_tables.iloc[:, 0]:
            table_id = '{}.{}'.format(DATASET_ID, table_name)
            print('Loading Table {}'.format(table_id))

            df = pd_mod.read_sql_table(table_name, connection)

            # Pass the service-account credentials explicitly; otherwise
            # pandas-gbq falls back to default/user credentials and the key
            # file loaded at module level is never used.
            pd_gbq.to_gbq(df, table_id,
                          project_id=PROJECT_ID,
                          if_exists='replace',
                          chunksize=10000000,
                          progress_bar=True,
                          credentials=credentials)

            print('Exporter les tables en csv')
            df.to_csv('{}/{}.csv'.format(DIRECTORY, table_name),
                      index=False,
                      header=True)
    finally:
        # Always release the MySQL connection, even if a table fails midway.
        connection.close()
# Script entry point: run the export only when executed directly, not on import.
if __name__ == "__main__":
    main()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment