Last active
April 24, 2018 21:02
-
-
Save giacaglia/9bcb04778e9ff8196c6e619fca96fde1 to your computer and use it in GitHub Desktop.
Dataflow Daily Upload
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# 1 Define the Dataflow pipeline. The `with` context manager runs the
# pipeline and waits for completion on exit, so no explicit p.run() is
# needed (calling it as well would submit the job twice).
with beam.Pipeline() as p:
    # 2 Query all 'Product'-kind entities in the queried namespace.
    query = query_pb2.Query()
    query.kind.add().name = 'Product'
    # Parenthesized so the pipe expression can span multiple lines
    # (the original bare line break was a SyntaxError).
    entities = (
        p
        | 'Read From Datastore' >> ReadFromDatastore(
            'project_name', query, namespace='queried_namespace')
    )
    # 3 Format each Datastore entity into a CSV row string.
    # NOTE(review): transform_entity is defined elsewhere; presumably it
    # emits one comma-joined line per entity in column_order order — confirm.
    products = entities | 'Format Rows' >> beam.Map(transform_entity)
    # 4 Write the results to a single CSV file in GCS, with a header line.
    column_order = ['id', 'title', 'availability', 'condition', 'brand',
                    'category', 'description', 'image_link', 'name', 'price',
                    'retailer_id', 'item_group_id', 'link']
    header = ','.join(column_order)
    # Fix: the original referenced an undefined name `rows`; the formatted
    # PCollection is bound to `products`.
    _ = (
        products
        | 'Write to GCS' >> beam.io.WriteToText(
            file_path_prefix='gs://filename',
            file_name_suffix='.csv',
            num_shards=1,
            header=header)
    )
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment