Skip to content

Instantly share code, notes, and snippets.

@AbdouSeck
Last active May 18, 2020 18:29
Show Gist options
  • Save AbdouSeck/40ea3d0d332340f67ce9ed57ec250444 to your computer and use it in GitHub Desktop.
Save AbdouSeck/40ea3d0d332340f67ce9ed57ec250444 to your computer and use it in GitHub Desktop.
GCP Cloud Function to Load Data to BQ
#!/bin/bash
# Deploy the Storage-to-BigQuery loader as a GCP Cloud Function.
# The function is triggered whenever an object is finalized in GS_BUCKET.

# Constants
GS_BUCKET="our-so-very-unique-bucket"
FUNC_NAME="storage-2-bq-loader"
EVENT="google.storage.object.finalize"

# Print which step failed and abort the whole deployment.
function leave() {
    echo "${1} failed"
    exit 1
}

# Change into the directory with the source code.
# BUGFIX: the original cd was unchecked — if the path was wrong, the
# script would keep going and deploy from whatever directory it was in.
cd /path/to/function/directory || leave cd

# Start the deployment
echo "$(date)" "Deploying the data loader GCP Function"
gcloud functions deploy --runtime python37 --env-vars-file env.yaml \
    --trigger-bucket "${GS_BUCKET}" --trigger-event "${EVENT}" \
    "${FUNC_NAME}" || leave gcloud

# If you got here, then you should be good to go
echo "$(date)" "Function ${FUNC_NAME} deployed"
exit 0
"""
Load data from Storage to BigQuery
"""
import os
from google.cloud import bigquery
from google.cloud.exceptions import (
Conflict, NotFound
)
import utils
def hello_gcs(event, context):
    """Triggered by a change to a Cloud Storage bucket.

    Ensures the destination dataset and table exist, then kicks off a
    BigQuery load job for the finalized object and blocks until it is done.

    Args:
        event (dict): Event payload; must contain 'bucket' and 'name'.
        context (google.cloud.functions.Context): Metadata for the event.
    """
    project = os.getenv('PROJECT_ID', '')
    # "bucket/object-name" — a gs:// prefix is added when the load job starts.
    gs_path = os.path.join(
        event['bucket'],
        event['name']
    )
    bfname = os.path.basename(event['name'])
    # File name is expected to look like "<dataset>_<table>_<suffix...>":
    # rsplit('_', 2) keeps any underscores inside the dataset portion intact.
    ds, tbl, *_ = bfname.rsplit('_', 2)
    course_id = utils.ds_to_course(ds)
    client = bigquery.Client(project)
    dref = bigquery.Dataset(f"{project}.{ds}")
    dref.description = utils.DS_DESC.format(id=course_id)
    # SCHEMA rows are [name, type, mode, description] lists, splatted
    # straight into SchemaField.
    schema = [bigquery.SchemaField(*f) for f in utils.SCHEMA['fields']]
    load_config = utils.make_load_config(schema)
    tref = dref.table(tbl)
    # NOTE(review): Dataset.table() returns a TableReference; assigning a
    # description here may not propagate to the created table — confirm.
    tref.description = utils.TBL_DESC.format(id=course_id, t=tbl)
    # Create the dataset if it does not exist; a Conflict means another
    # concurrent invocation created it first, which is fine.
    try:
        client.get_dataset(dref)
    except NotFound:
        try:
            client.create_dataset(dref)
        except Conflict:
            pass
    # Same create-if-missing dance for the destination table.
    try:
        client.get_table(tref)
    except NotFound:
        try:
            client.create_table(tref)
        except Conflict:
            pass
    job = client.load_table_from_uri(
        source_uris='gs://{p}'.format(p=gs_path),
        destination=tref,
        job_config=load_config
    )
    # Poll until the load job finishes; raises RuntimeError on job errors.
    utils.wait_for_job(job)
    return True
import os
from google.cloud import error_reporting
def main(event, context):
    """Cloud Function entry point: run the loader, reporting failures.

    BUGFIX: the second parameter was named ``data`` while the body passed
    ``context`` to ``hello_gcs`` — a guaranteed NameError on every
    invocation. GCP calls background functions positionally, so renaming
    the parameter to ``context`` is backward-compatible.

    Args:
        event (dict): Event payload.
        context (google.cloud.functions.Context): Metadata for the event.
    """
    reporter = error_reporting.Client(os.getenv('PROJECT_ID', ''))
    try:
        hello_gcs(event, context)
    except Exception:
        # Best-effort: report to Error Reporting instead of crashing the
        # function (preserves the original swallow-and-report behavior).
        reporter.report_exception()
# Function dependencies, for example:
# package>=version
google-cloud-bigquery
google-cloud-error-reporting
import time
from google.cloud import bigquery
# Description templates: `id` is the course ID, `t` the table name.
DS_DESC = "Dataset for Entrance and Exit tables for course {id}"
TBL_DESC = "Survey responses for {t} surveys of {id}"
# Destination table schema. Each row is [name, type, mode, description]
# and is splatted into bigquery.SchemaField(*row) by the caller.
SCHEMA = {
    "fields": [
        [
            "USER_ID",
            "STRING",
            "REQUIRED",
            "Learner's hashed ID: A 32-character string."
        ],
        [
            "COURSE_ID",
            "STRING",
            "NULLABLE",
            "The edX course ID associated with this survey"
        ],
        [
            "SURVEY_ID",
            "STRING",
            "NULLABLE",
            "The ID of the Qualtrics survey whose data are in this table"
        ],
        [
            "VARIABLE_NAME",
            "STRING",
            "NULLABLE",
            "One of the variable names given to surveys during creation on the Qualtrics platform"
        ],
        [
            "VALUE",
            "STRING",
            "NULLABLE",
            "This is the coded value associated with the choice of the survey taker"
        ],
        [
            "VALUE_LABEL",
            "STRING",
            "NULLABLE",
            "This is the label that perhaps the survey will see when making selections"
        ],
        [
            "QUESTIONTEXT",
            "STRING",
            "NULLABLE",
            "This is the text of the question asked to the survey taker. It excludes any leading stems."
        ],
        [
            "STEM",
            "STRING",
            "NULLABLE",
            "This is the leading text/preamble to some questions. Not all questions have stems."
        ],
        [
            "LOAD_DATE",
            "DATETIME",
            "NULLABLE",
            "This is the date and time on which a particular record was generated and loaded"
        ]
    ]
}
def ds_to_course(ds_id):
    """Translate a BigQuery dataset ID back into a course ID.

    Double underscores become '/', then any remaining single
    underscores become '.' — the reverse of the dataset-naming scheme.
    """
    with_slashes = ds_id.replace('__', '/')
    return with_slashes.replace('_', '.')
def wait_for_job(job):
    """Block until the given BigQuery job reaches the DONE state.

    Polls ``job.reload()`` every 50ms.

    :param job: A job object exposing ``reload()``, ``state``,
        ``error_result`` and ``errors``.
    :raises RuntimeError: with ``job.errors`` if the job finished
        with an error result.
    :return: None once the job is DONE without errors.
    """
    finished = False
    while not finished:
        job.reload()
        finished = job.state == 'DONE'
        if not finished:
            time.sleep(0.05)
    if job.error_result:
        raise RuntimeError(job.errors)
def make_load_config(schema, format_='CSV', delimiter='\t'):
    """Build a ``bigquery.LoadJobConfig`` for delimited-text loads.

    :param schema: List of ``bigquery.SchemaField`` for the destination.
    :param format_: Source format passed to BigQuery (default 'CSV').
    :param delimiter: Field delimiter in the source files (default tab).
    :return: A configured ``LoadJobConfig`` allowing quoted newlines
        and jagged rows.
    """
    config = bigquery.LoadJobConfig()
    settings = {
        'schema': schema,
        'source_format': format_,
        'field_delimiter': delimiter,
        'allow_quoted_newlines': True,
        'allow_jagged_rows': True,
    }
    for attr, value in settings.items():
        setattr(config, attr, value)
    return config
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment