Skip to content

Instantly share code, notes, and snippets.

@boseoladipo
Created July 16, 2020 11:02
Show Gist options
  • Save boseoladipo/95c1a06f73c80778a5746c56cf04ae56 to your computer and use it in GitHub Desktop.
Save boseoladipo/95c1a06f73c80778a5746c56cf04ae56 to your computer and use it in GitHub Desktop.
Cloud function to start dataflow template
import logging
from string import Template
import google.auth
from google.cloud import error_reporting
from googleapiclient.discovery import build
# Error Reporting client created once at import time; main() calls
# client.report_exception() from its except handler.
client = error_reporting.Client(service="start_dataflow_job")
def main(data, context):
    """Launch a Dataflow template job from a Cloud Pub/Sub message.

    Reads the launch settings from the message attributes and calls the
    Dataflow ``projects.locations.templates.launch`` API. Any failure is
    reported through the module-level Error Reporting ``client`` and logged;
    it is not re-raised, so this function returns normally either way.

    Args:
        data (dict): Event payload. ``data['attributes']`` must provide
            ``PROJECT``, ``JOB``, ``BUCKET``, ``TEMPLATE`` and ``ZONE``.
            ``LOCATION`` is optional and defaults to ``europe-west1``.
        context (google.cloud.functions.Context): Metadata for the event.
            Not used by this function.
    """
    try:
        # Treat a null attributes field like an empty one so the check below
        # produces a readable message instead of an opaque TypeError.
        attributes = data['attributes'] or {}

        # Name every missing attribute at once; a bare KeyError would log
        # only something like "'ZONE'".
        required = ('PROJECT', 'JOB', 'BUCKET', 'TEMPLATE', 'ZONE')
        missing = [key for key in required if key not in attributes]
        if missing:
            raise ValueError(
                f"Pub/Sub message is missing attributes: {', '.join(missing)}"
            )

        template_path = (
            f"gs://{attributes['BUCKET']}/templates/{attributes['TEMPLATE']}"
        )
        launch_body = {
            'jobName': attributes['JOB'],
            'environment': {'zone': attributes['ZONE']},
        }

        dataflow = build('dataflow', 'v1b3')
        request = dataflow.projects().locations().templates().launch(
            projectId=attributes['PROJECT'],
            gcsPath=template_path,
            # The region used to be fixed; the old value stays the default.
            location=attributes.get('LOCATION', 'europe-west1'),
            body=launch_body,
        )
        response = request.execute()
        print(response)
    except Exception as error:
        # Top-level boundary of the function: report and log, do not re-raise.
        client.report_exception()
        logging.error('%s', error, exc_info=True)
if __name__ == "__main__":
    # 'data' and 'context' are placeholder strings rather than a real
    # Pub/Sub event. NOTE(review): main() indexes data['attributes'], so pass
    # a dict with an 'attributes' mapping for a meaningful local run.
    main(data="data", context="context")
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment