Dataflow Daily Upload
# Dataflow pipeline that exports Datastore 'Product' entities to a CSV file on GCS.
import apache_beam as beam
from apache_beam.io.gcp.datastore.v1.datastoreio import ReadFromDatastore
from google.cloud.proto.datastore.v1 import query_pb2

# 1 Define the Dataflow pipeline; the with block runs it on exit,
#   so no explicit p.run() is needed.
with beam.Pipeline() as p:
    # 2 Query all Product entities in the queried_namespace
    query = query_pb2.Query()
    query.kind.add().name = 'Product'
    entities = (p | 'Read From Datastore' >>
                ReadFromDatastore('project_name', query,
                                  namespace='queried_namespace'))

    # 3 Format each entity as a CSV row (transform_entity is assumed to be
    #   defined before this pipeline; see the sketch below)
    products = entities | 'Format Rows' >> beam.Map(transform_entity)

    # 4 Write the results to a single CSV file on GCS
    column_order = ['id', 'title', 'availability', 'condition', 'brand',
                    'category', 'description', 'image_link', 'name', 'price',
                    'retailer_id', 'item_group_id', 'link']
    header = ','.join(column_order)
    products | 'Write to GCS' >> beam.io.WriteToText(
        file_path_prefix='gs://filename', file_name_suffix='.csv',
        num_shards=1, header=header)
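The pipeline references transform_entity without defining it. Below is a minimal sketch of what it might look like, assuming each entity stores its fields as string properties on the Datastore entity protobuf; the column list mirrors column_order above, and the string_value access is an assumption about the entity schema. It would need to be defined before the pipeline that uses it.

def transform_entity(entity):
    """Turn one Datastore entity protobuf into a single CSV line.

    Sketch only: assumes every field is stored as a string property.
    """
    columns = ['id', 'title', 'availability', 'condition', 'brand',
               'category', 'description', 'image_link', 'name', 'price',
               'retailer_id', 'item_group_id', 'link']
    values = []
    for column in columns:
        if column in entity.properties:
            value = entity.properties[column].string_value
        else:
            value = ''
        # Quote each field so commas inside values don't break the CSV.
        values.append('"%s"' % value.replace('"', '""'))
    return ','.join(values)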
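Since the title says this is a daily Dataflow upload, the pipeline would also need Google Cloud options to run on the Dataflow service rather than locally. A minimal sketch, assuming the same hypothetical project id and bucket as above; the option names (runner, project, temp_location, staging_location, job_name) are standard Beam PipelineOptions.

from apache_beam.options.pipeline_options import PipelineOptions

options = PipelineOptions(
    runner='DataflowRunner',                    # run on the Dataflow service
    project='project_name',                     # hypothetical GCP project id
    temp_location='gs://filename/temp',         # hypothetical bucket paths
    staging_location='gs://filename/staging',
    job_name='daily-product-upload')

# Pass the options when constructing the pipeline:
# with beam.Pipeline(options=options) as p: ...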