Skip to content

Instantly share code, notes, and snippets.

@Gabri3l
Forked from daidokoro/python_batch.py
Created April 11, 2019 22:02
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save Gabri3l/403f4384b463723d40186af422bd010d to your computer and use it in GitHub Desktop.
Save Gabri3l/403f4384b463723d40186af422bd010d to your computer and use it in GitHub Desktop.
Beam Stuff
'''
Apache Beam Python Batch Example
'''
# NOTE(review): `beam` is expected to be bound by `import apache_beam as beam`
# at module level; no such import is visible in this file — confirm.
class Batch(beam.DoFn):
    """Batch items from a PCollection into groups of (up to) ``n`` elements.

    Elements are buffered until ``n`` of them have arrived. The buffer is
    then handed to :meth:`_action`, and each item it returns is emitted
    downstream. Anything still buffered when a bundle ends is flushed by
    :meth:`finish_bundle`, so the last batch of a bundle may hold fewer than
    ``n`` elements.

    Args:
        n: Target batch size; must be at least 1.

    Raises:
        ValueError: If ``n`` is less than 1.
    """

    def __init__(self, n):
        super(Batch, self).__init__()
        if n < 1:
            # With n < 1 the "batch is full" check could never fire, and
            # everything would silently pile up until finish_bundle.
            raise ValueError("batch size n must be >= 1, got %r" % (n,))
        self._n = n
        self._batch = []

    def start_bundle(self):
        """Begin every bundle with an empty buffer."""
        self._batch = []

    def process(self, e):
        """Append ``e`` to the buffer and flush it once it holds ``n`` items.

        Yields:
            Each item returned by :meth:`_action` for a full batch; nothing
            while the batch is still filling.
        """
        self._batch.append(e)
        if len(self._batch) >= self._n:
            # Detach the full batch before yielding so the buffer is already
            # reset while downstream consumes the results.
            batch, self._batch = self._batch, []
            # Yield individual results from _action (yielding the whole
            # batch as a single element is an alternative).
            for result in self._action(batch):
                yield result

    def _action(self, elems):
        """Run the per-batch work and return the items to emit.

        Placeholder hook: replace with the real batch operation, e.g.
        updating the batch, pushing it to a database, or making a batch API
        call. This default prints the batch size and returns a shallow copy
        of ``elems``.
        """
        print("batch size: %s" % len(elems))
        return list(elems)

    def finish_bundle(self):
        """Flush a partially filled batch at the end of a bundle.

        Beam calls this at the end of every bundle, not once per step.
        Without it, elements that never reached a full batch of ``n`` would
        be dropped. Values emitted from ``finish_bundle`` must carry an
        explicit window, so each result is wrapped in a ``WindowedValue``.
        """
        from apache_beam.utils.windowed_value import WindowedValue
        from apache_beam import window

        if not self._batch:
            return
        batch, self._batch = self._batch, []
        for result in self._action(batch):
            # NOTE(review): every leftover result is placed in the global
            # window with timestamp -1, as in the original. This presumably
            # assumes global windowing upstream — revisit for windowed input.
            yield WindowedValue(result, -1, [window.GlobalWindow()])
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment