Last active
May 18, 2020 18:29
-
-
Save AbdouSeck/40ea3d0d332340f67ce9ed57ec250444 to your computer and use it in GitHub Desktop.
GCP Cloud Function to load data from Cloud Storage into BigQuery (BQ)
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
env.yaml |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/bin/bash
# Deploy the Storage -> BigQuery loader as a background Cloud Function that
# runs whenever an object is finalized in GS_BUCKET.

# Define constants and a helper that aborts the script
GS_BUCKET="our-so-very-unique-bucket"
FUNC_NAME="storage-2-bq-loader"
# Python function Cloud Functions should invoke. Without --entry-point gcloud
# defaults to the resource name ("storage-2-bq-loader"), which is not a valid
# Python identifier and matches no function in the source.
ENTRY_POINT="main"
EVENT="google.storage.object.finalize"
SRC_DIR="/path/to/function/directory"

# Report which step failed and exit with a non-zero status.
function leave() {
    echo "${1} failed" >&2
    exit 1
}

# Change into the directory with the source code; if this fails, deploying
# from the current directory would upload the wrong files.
cd "${SRC_DIR}" || leave cd

# Start the deployment.
# NOTE(review): python37 is a retired runtime; bump once the code is tested
# on a supported Python version.
echo "$(date)" "Deploying the data loader GCP Function"
gcloud functions deploy "${FUNC_NAME}" \
    --runtime python37 \
    --entry-point "${ENTRY_POINT}" \
    --env-vars-file env.yaml \
    --trigger-resource "${GS_BUCKET}" \
    --trigger-event "${EVENT}" || leave gcloud

# If you got here, then you should be good to go
echo "$(date)" "Function ${FUNC_NAME} deployed"
exit
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
""" | |
Load data from Storage to BigQuery | |
""" | |
import os | |
from google.cloud import bigquery | |
from google.cloud.exceptions import ( | |
Conflict, NotFound | |
) | |
import utils | |
def hello_gcs(event, context):
    """Load a finalized Cloud Storage object into a BigQuery table.

    The dataset and table names are taken from the object's base name, which
    is split from the right on '_' at most twice, i.e.
    ``<dataset>_<table>[_<rest>]``. The dataset and table are created when
    they do not exist yet. Then the object is loaded with the schema and load
    settings from ``utils``.

    Args:
        event (dict): Event payload; the 'bucket' and 'name' keys are read.
        context (google.cloud.functions.Context): Metadata for the event
            (unused).

    Returns:
        bool: True once the load job has completed without errors.

    Raises:
        ValueError: If the object's base name contains no '_'.
        RuntimeError: Raised by utils.wait_for_job if the load job fails.
    """
    # An unset PROJECT_ID would give '' as an explicit (empty) project;
    # None lets the client fall back to the environment's default project.
    client = bigquery.Client(os.getenv('PROJECT_ID') or None)
    project = client.project

    bucket = event['bucket']
    name = event['name']
    # Build the URI explicitly: os.path.join discards the bucket when the
    # object name starts with '/', yielding an invalid 'gs:///...' URI.
    source_uri = f"gs://{bucket}/{name}"

    parts = os.path.basename(name).rsplit('_', 2)
    if len(parts) < 2:
        raise ValueError(
            f"Cannot derive dataset and table from object name {name!r}"
        )
    ds, tbl = parts[0], parts[1]
    course_id = utils.ds_to_course(ds)

    dataset = bigquery.Dataset(f"{project}.{ds}")
    dataset.description = utils.DS_DESC.format(id=course_id)

    schema = [bigquery.SchemaField(*f) for f in utils.SCHEMA['fields']]
    load_config = utils.make_load_config(schema)

    # Use a real Table (not a TableReference) so that the description and
    # schema are actually sent when the table has to be created.
    table = bigquery.Table(f"{project}.{ds}.{tbl}", schema=schema)
    table.description = utils.TBL_DESC.format(id=course_id, t=tbl)

    # Create the dataset/table only when missing. Conflict ("already
    # exists") is ignored, e.g. when another invocation created it first.
    try:
        client.get_dataset(dataset)
    except NotFound:
        try:
            client.create_dataset(dataset)
        except Conflict:
            pass

    try:
        client.get_table(table)
    except NotFound:
        try:
            client.create_table(table)
        except Conflict:
            pass

    job = client.load_table_from_uri(
        source_uris=source_uri,
        destination=table,
        job_config=load_config,
    )
    utils.wait_for_job(job)
    return True
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import os | |
from google.cloud import error_reporting | |
def main(event, data):
    """
    Cloud Functions entry point: run hello_gcs and report any failure.

    Exceptions raised by hello_gcs are sent to Error Reporting and are not
    re-raised, so the invocation itself completes normally.

    NOTE(review): hello_gcs must be defined in (or imported into) the same
    module as this function; that is not visible in this snippet.

    Args:
        event (dict): Event payload.
        data (google.cloud.functions.Context): Metadata for the event. It is
            passed through to hello_gcs as its ``context`` argument.
    """
    # '' would be used as an explicit (empty) project ID; None lets the
    # client discover the default project instead.
    reporter = error_reporting.Client(os.getenv('PROJECT_ID') or None)
    try:
        # The original referenced an undefined name `context` here, which
        # raised NameError on every invocation; forward the real argument.
        hello_gcs(event, data)
    except Exception:
        reporter.report_exception()
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# Function dependencies, for example:
# package>=version
google-cloud-bigquery
# Required by the entry point that imports google.cloud.error_reporting
google-cloud-error-reporting
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import time | |
from google.cloud import bigquery | |
# Description templates: {id} is filled with the course ID and {t} with
# the table (survey) name.
DS_DESC = "Dataset for Entrance and Exit tables for course {id}"
TBL_DESC = "Survey responses for {t} surveys of {id}"

# Table schema. Each entry is [name, field_type, mode, description], in the
# positional order accepted by bigquery.SchemaField.
SCHEMA = {
    "fields": [
        ["USER_ID", "STRING", "REQUIRED",
         "Learner's hashed ID: A 32-character string."],
        ["COURSE_ID", "STRING", "NULLABLE",
         "The edX course ID associated with this survey"],
        ["SURVEY_ID", "STRING", "NULLABLE",
         "The ID of the Qualtrics survey whose data are in this table"],
        ["VARIABLE_NAME", "STRING", "NULLABLE",
         "One of the variable names given to surveys during creation "
         "on the Qualtrics platform"],
        ["VALUE", "STRING", "NULLABLE",
         "This is the coded value associated with the choice of the "
         "survey taker"],
        # NOTE(review): "the survey will see" probably means "the survey
        # taker will see"; the text is kept as-is because it is stored as
        # the BigQuery column description.
        ["VALUE_LABEL", "STRING", "NULLABLE",
         "This is the label that perhaps the survey will see when "
         "making selections"],
        ["QUESTIONTEXT", "STRING", "NULLABLE",
         "This is the text of the question asked to the survey taker. "
         "It excludes any leading stems."],
        ["STEM", "STRING", "NULLABLE",
         "This is the leading text/preamble to some questions. "
         "Not all questions have stems."],
        ["LOAD_DATE", "DATETIME", "NULLABLE",
         "This is the date and time on which a particular record was "
         "generated and loaded"],
    ]
}
def ds_to_course(ds_id):
    """
    Map a BigQuery dataset ID back to its course ID.

    Each double underscore becomes '/', and every remaining single
    underscore becomes '.'.
    """
    segments = ds_id.split('__')
    return '/'.join(segment.replace('_', '.') for segment in segments)
def wait_for_job(job, poll_interval=0.05, max_interval=2.0, timeout=None):
    """
    Block until a BigQuery job reaches the DONE state.

    The job is reloaded repeatedly. The pause between reloads starts at
    ``poll_interval`` seconds and doubles up to ``max_interval``, so a long
    job does not trigger an API request every 50 ms.

    :param job: Job object exposing ``reload()``, ``state``,
        ``error_result`` and ``errors`` (e.g. the LoadJob returned by
        client.load_table_from_uri).
    :param poll_interval: Initial pause between reloads, in seconds.
    :param max_interval: Upper bound on the pause between reloads.
    :param timeout: Optional maximum total wait in seconds; ``None``
        waits indefinitely.
    :return: None, once the job is DONE without errors.
    :raises RuntimeError: If the finished job reports an error; the
        exception argument is ``job.errors``.
    :raises TimeoutError: If ``timeout`` elapses before the job is DONE.
    """
    cap = max(poll_interval, max_interval)
    deadline = None if timeout is None else time.monotonic() + timeout
    delay = poll_interval
    while True:
        job.reload()
        if job.state == 'DONE':
            if job.error_result:
                raise RuntimeError(job.errors)
            return
        pause = delay
        if deadline is not None:
            remaining = deadline - time.monotonic()
            if remaining <= 0:
                raise TimeoutError(
                    f"Job not DONE after {timeout} seconds "
                    f"(last state: {job.state!r})"
                )
            # Never sleep past the deadline.
            pause = min(pause, remaining)
        time.sleep(pause)
        delay = min(delay * 2, cap)
def make_load_config(schema, format_='CSV', delimiter='\t'):
    """
    Build a bigquery.LoadJobConfig for delimited text files.

    :param schema: List of bigquery.SchemaField describing the columns.
    :param format_: BigQuery source format (default 'CSV').
    :param delimiter: Field delimiter (default tab).
    :return: A LoadJobConfig that also allows quoted newlines and
        jagged rows.
    """
    return bigquery.LoadJobConfig(
        schema=schema,
        source_format=format_,
        field_delimiter=delimiter,
        allow_quoted_newlines=True,
        allow_jagged_rows=True,
    )
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment