Skip to content

Instantly share code, notes, and snippets.

@GEOFBOT
Created July 17, 2016 13:23
Show Gist options
  • Save GEOFBOT/dbdc30120fb4d71383d9e3eff5f93c1f to your computer and use it in GitHub Desktop.
Save GEOFBOT/dbdc30120fb4d71383d9e3eff5f93c1f to your computer and use it in GitHub Desktop.
# modified from http://www.willmcginnis.com/2015/11/08/getting-started-with-python-and-apache-flink/
from flink.plan.Environment import get_environment
from flink.plan.Constants import INT, STRING, WriteMode
from flink.functions.GroupReduceFunction import GroupReduceFunction
class Adder(GroupReduceFunction):
    """Sum the counts of all ``(count, word)`` records in one group.

    Applied via ``reduce_group(..., combinable=True)`` after ``group_by(1)``,
    so the records of a group share the same value in field 1 (the word).
    Because it is marked combinable, its output has the same ``(count, word)``
    shape as its input, allowing partial results to be reduced again.
    """

    def reduce(self, iterator, collector):
        """Emit one ``(total_count, word)`` tuple for the current group.

        :param iterator: iterator over the group's ``(count, word)`` tuples
        :param collector: sink that receives the aggregated tuple
        """
        # Builtin next() works on Python 2 and 3; the .next() method is
        # Python 2-only.
        count, word = next(iterator)
        # The first record was consumed above; add the counts of the rest.
        count += sum(x[0] for x in iterator)
        collector.collect((count, word))
if __name__ == "__main__":
    import sys

    # Where the results are written. Defaults to the original author's local
    # path; pass a different file URI as the first script argument to
    # override it (e.g. `pyflink.sh wordcount.py - file:///tmp/out.txt`).
    output_file = (sys.argv[1] if len(sys.argv) > 1
                   else 'file:///home/gmon/flink-examples/out.txt')
    print('logging results to: %s' % (output_file, ))

    env = get_environment()
    # The whole sentence is a single element of the data set.
    data = env.from_elements("there is a dog and a cat and another dog")

    # Split into (1, word) pairs, group by the word (field 1), sum the
    # counts with Adder, then format each (count, word) as a text line.
    data \
        .flat_map(lambda x, c: [(1, word) for word in x.lower().split()]) \
        .group_by(1) \
        .reduce_group(Adder(), combinable=True) \
        .map(lambda y: 'Word: %s Count: %s' % (y[1], y[0])) \
        .write_text(output_file, write_mode=WriteMode.OVERWRITE)

    env.execute(local=True)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment