Skip to content

Instantly share code, notes, and snippets.

@micheleberardi
Last active February 19, 2022 22:56
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save micheleberardi/019d2f707f04dbc41d4b23ad71a98adf to your computer and use it in GitHub Desktop.
Save micheleberardi/019d2f707f04dbc41d4b23ad71a98adf to your computer and use it in GitHub Desktop.
Google Retail import into BigQuery
from modules import defines
import ndjson
from google.cloud import bigquery
import pymysql
# Module-level MySQL connection, opened as soon as this file is imported or run.
# The host, credentials and database name are placeholder literals.
# NOTE(review): nothing in the visible code reads `database` (select_exe opens
# its own connection) -- confirm whether other modules rely on it.
database = pymysql.connect(host='IP', user='USER', passwd='PWD', db='DB_NAME',use_unicode=True, cursorclass=pymysql.cursors.DictCursor, charset="utf8")
def select_exe(query_select, params=None):
    """Run a query on a fresh MySQL connection and return every row.

    Args:
        query_select: SQL text to execute.
        params: Optional values for the query's placeholders, passed to
            ``cursor.execute`` so the driver does the escaping. Prefer this
            over formatting values into the SQL string.

    Returns:
        The rows from ``cursor.fetchall()`` (dicts, because the connection
        uses ``DictCursor``). The result is empty when nothing matches.
        ``False`` if executing or fetching raised an exception.
    """
    mydb = pymysql.connect(host='IP', user='USER', passwd='PWD', db='DB_NAME',use_unicode=True, cursorclass=pymysql.cursors.DictCursor, charset="utf8")
    try:
        # The cursor is closed on leaving the ``with`` block.
        with mydb.cursor() as cursor:
            cursor.execute(query_select, params)
            data_db = cursor.fetchall()
        mydb.commit()
        # Preview of the first row. Guarded so that an empty result is
        # returned as-is instead of raising IndexError and being reported
        # as a failure.
        if data_db:
            print(data_db[0])
        return data_db
    except Exception as e:
        # Keep the "False on failure" contract, but report the reason.
        print("select_exe failed:", e)
        return False
    finally:
        # Always release the connection, on success and on failure.
        mydb.close()
def create_dict(result_select):
    """Turn catalog rows into a list of product dicts.

    Args:
        result_select: Iterable of mappings, each providing the keys
            ``sku``, ``description``, ``categories`` and ``price``.

    Returns:
        A list with one dict per row, holding ``id``, ``categories`` (a
        one-element list), ``title`` and ``priceInfo``. The row's price is
        converted with ``float`` and used for ``price``, ``originalPrice``
        and ``cost`` alike; the currency is always ``"USD"``.
    """
    def build_product(record):
        # Read all four columns before converting anything.
        product_id, title = record['sku'], record['description']
        category, amount = record['categories'], record['price']
        # NOTE(review): priceInfo is a list of single-key dicts rather than
        # one dict -- confirm this matches the destination table schema.
        return {
            "id": product_id,
            "categories": [category],
            "title": title,
            "priceInfo": [
                {"currencyCode": "USD"},
                {"price": float(amount)},
                {"originalPrice": float(amount)},
                {"cost": float(amount)},
            ],
        }

    return [build_product(record) for record in result_select]
if __name__ == '__main__':
    # Export the catalog from MySQL to an NDJSON file, then load that file
    # into a BigQuery table.

    # Query that selects the catalog rows.
    query = "select * from ..............."
    # Run the query.
    result_select = select_exe(query)
    # select_exe() returns False on failure. Stop here with a clear message
    # rather than failing later; the load below uses WRITE_TRUNCATE, which
    # overwrites the table's existing data.
    if not result_select:
        raise SystemExit("Catalog query failed or returned no rows; nothing to import.")
    # Build the list of product dicts.
    # NOTE(review): this calls modules.defines.create_dict, not the
    # create_dict defined in this file -- confirm which one is intended.
    cr = defines.create_dict(result_select)
    # Save the result as newline-delimited JSON.
    filename = 'data.json'
    with open(filename, 'w') as f:
        ndjson.dump(cr, f)
    # Load the file into BigQuery.
    client = bigquery.Client(project="PROJECT_ID")
    dataset_id = 'DATASET_ID'
    table_id = 'TABLE_ID'
    # Client.dataset() is deprecated; build the reference directly, using
    # the client's project as client.dataset() did by default.
    dataset_ref = bigquery.DatasetReference(client.project, dataset_id)
    table_ref = dataset_ref.table(table_id)
    job_config = bigquery.LoadJobConfig()
    job_config.write_disposition = bigquery.WriteDisposition.WRITE_TRUNCATE
    job_config.source_format = bigquery.SourceFormat.NEWLINE_DELIMITED_JSON
    with open(filename, "rb") as source_file:
        job = client.load_table_from_file(source_file, table_ref, job_config=job_config)
    print(job.job_id)
    job.result()  # Waits for the table load to complete; raises if the job failed.
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment