@shriyanka
Created August 2, 2016 05:44
# Assumes a current Apache Beam Python SDK and the google-cloud-datastore client.
import time

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
from google.cloud import datastore


class PrintQuery(beam.DoFn):
    def process(self, element):
        # Yield the language of each user entity.
        yield element.get("language")


def datastoreQuery():
    client = datastore.Client(project="projectidhere")
    # 'User' is my user table, with over 2 lakh (200,000) entries.
    query = client.query(kind='User').fetch()
    pipeline_options = PipelineOptions()
    start_time = time.time()
    p = beam.Pipeline(options=pipeline_options)
    (p
     | beam.Create(query)
     | beam.ParDo(PrintQuery())
     | beam.Map(lambda x: (x, 1))
     | beam.combiners.Count.PerKey()  # Count the frequency of each language.
     | beam.io.WriteToText("languageFrequency"))  # Write the counts to a file.
    # Block until the pipeline finishes so the elapsed time is meaningful.
    p.run().wait_until_finish()
    print("--- %s seconds ---" % (time.time() - start_time))
    return 200
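
For checking the pipeline's output on a small sample, the same per-language count can be sketched in plain Python without Beam or Datastore. This is only an illustration of what the Map and Count.PerKey steps above compute; `language_frequency` and `sample_users` are hypothetical names and data, not part of the gist.

```python
from collections import Counter


def language_frequency(users):
    """Count how many user records report each language."""
    # Same idea as mapping each language to (language, 1) and counting per key.
    return Counter(user.get("language") for user in users)


# Hypothetical records standing in for Datastore 'User' entities.
sample_users = [
    {"language": "en"},
    {"language": "hi"},
    {"language": "en"},
    {},  # An entity with no language property is counted under None.
]

counts = language_frequency(sample_users)
for language, count in sorted(counts.items(), key=lambda kv: str(kv[0])):
    print((language, count))
```

Comparing these (language, count) pairs with the contents of the languageFrequency output for the same sample is a quick sanity check before running against the full User kind.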