Skip to content

Instantly share code, notes, and snippets.

@akshayapte
Last active March 30, 2019 07:44
Show Gist options
  • Save akshayapte/90a908a8e53d9cd52f52aac11eb8e80d to your computer and use it in GitHub Desktop.
Save akshayapte/90a908a8e53d9cd52f52aac11eb8e80d to your computer and use it in GitHub Desktop.
Get list of lists for tokenizing
from __future__ import absolute_import
from __future__ import print_function
import logging
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
class TestTokenizerDoFn(beam.DoFn):
    """Split each input string into words and tag it with a running id.

    Every output is keyed with ``None`` so that a downstream ``GroupByKey``
    collects all tokenized elements under one key.
    """

    def __init__(self):
        # Initialise the DoFn base class before adding our own state.
        # The explicit base-class call works on both Python 2 and 3.
        beam.DoFn.__init__(self)
        # Number of elements this instance has processed so far.
        # NOTE(review): the counter lives on the instance; if the runner uses
        # more than one instance of this DoFn, ids may repeat -- confirm
        # whether globally unique ids are required.
        self.id = 0

    def process(self, element):
        """Tokenize one element on whitespace.

        Args:
            element: the string to split.

        Yields:
            ``(None, (id, words))`` where ``id`` is this instance's running
            count (starting at 1) and ``words`` is ``element.split()``.
        """
        words = element.split()
        self.id += 1
        yield (None, (self.id, words))
class ConvertFn(beam.DoFn):
    """Wrap every item of an element in its own single-item list."""

    def process(self, elem):
        """Emit one list holding ``[item]`` for each item of *elem*.

        Args:
            elem: any iterable.

        Yields:
            A single list of one-item lists, in iteration order.
        """
        wrapped = []
        for item in elem:
            wrapped.append([item])
        yield wrapped
def test():
    """Build and run a small pipeline that tokenizes three sample messages.

    The messages are split into words by ``TestTokenizerDoFn``, grouped by
    key, and the grouped values of each result are printed.
    """
    sample_messages = [
        'Exception: Index out of bounds',
        'ValueError: int cannot be typecasted to str',
        'KeyError: Key 1 does not exist in dictionary',
    ]
    # The pipeline is executed when the ``with`` block exits.
    with beam.Pipeline(options=PipelineOptions()) as p:
        lines = p | 'Create' >> beam.Create(sample_messages)
        tokenized = lines | 'Split' >> beam.ParDo(TestTokenizerDoFn())
        grouped = tokenized | 'GroupByKey' >> beam.GroupByKey()
        # Print only the grouped values (index 1), not the key.
        counts = grouped | 'print' >> beam.ParDo(lambda kv: print(kv[1]))
        # Possible follow-up step, currently disabled:
        # | 'Process' >> beam.ParDo(ConvertFn())
    # Expected output: [['a', 'b', 'c'], ['d', 'e'], ['a', 'e', 'f']]
    # counts | 'Print' >> beam.ParDo(lambda x: print(x))
# Usage: python beam_pipeline.py
if __name__ == '__main__':
    # Set the root logger to INFO before running the pipeline.
    root_logger = logging.getLogger()
    root_logger.setLevel(logging.INFO)
    test()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment