Skip to content

Instantly share code, notes, and snippets.

@GEOFBOT
Last active October 18, 2016 02:05
Show Gist options
  • Save GEOFBOT/abb7f81030aab160e6908093ebaa3b4a to your computer and use it in GitHub Desktop.
Save GEOFBOT/abb7f81030aab160e6908093ebaa3b4a to your computer and use it in GitHub Desktop.
Flink file that causes issues with a modified version of Flink featuring bulk iterations in the Python API
# Barebones test file to check for issues
import math
from flink.functions.Aggregation import Sum
from flink.functions.GroupReduceFunction import GroupReduceFunction
from flink.plan.Environment import get_environment
class NormalizeVectorGroupReducer(GroupReduceFunction):
    """
    Shifts every value of a vector by the sum of all of its values.

    Each input element is read as ``val[0]`` (index) and ``val[1]`` (value);
    one ``(index, value - total)`` pair is emitted per input element.
    MODIFIED VERSION: divisions taken out because the data is fake and we
    don't want / by 0 errors, so the offset is the plain sum of the values
    and no scaling is applied.
    """

    def reduce(self, iterator, collector):
        """
        Emit ``(index, value - total)`` for every element of the group.

        :param iterator: iterable over the elements of the group
        :param collector: receives each result through ``collect``
        """
        # The group is traversed twice (once to sum, once to emit), so the
        # iterator has to be materialized first.
        data = list(iterator)
        total = sum(val[1] for val in data)
        for val in data:
            collector.collect((val[0], val[1] - total))
class MagnitudeGroupReducer(GroupReduceFunction):
    """
    Reduces a group to the magnitude of the vector formed by its values.
    """

    def reduce(self, iterator, collector):
        """
        Collect a single ``(0, magnitude)`` pair for the whole group.

        The magnitude is the square root of the sum of the squares of
        ``element[1]`` over every element yielded by ``iterator``.
        """
        squared_total = sum(element[1] ** 2 for element in iterator)
        collector.collect((0, math.sqrt(squared_total)))
if __name__ == '__main__':
    # Driver: assemble the iterative data flow and execute it locally.
    env = get_environment()

    # Fixed 3-tuples (presumably matrix entries, given the operator names
    # below) and the starting data set 1, 2, 3 combined with indices.
    entries = env.from_elements((0, 0, 1), (0, 1, 2), (0, 2, 3))
    initial = env.from_elements(1, 2, 3).zip_with_index()

    # Open the iteration; 10 is presumably the maximum number of rounds.
    looped = initial.iterate(10)

    # Join the iterated data with the entries on field 0 of both sides,
    # multiply, then sum field 1 per key (field 0).
    partial = (
        looped
        .join(entries).where(0).equal_to(0)
        .using(lambda vec_el, mat_el: (mat_el[1], mat_el[2] * vec_el[1]))
        .name('VectorMatrix')
        .group_by(0)
        .aggregate(Sum, 1)
    )

    # Join the sums back against field 1 of the entries, multiply, sum
    # field 2 per key and keep only fields 0 and 2 of each result.
    updated = (
        partial
        .join(entries).where(0).equal_to(1)
        .using(lambda sum_el, mat_el: (mat_el[0], mat_el[1], mat_el[2] * sum_el[1]))
        .name('MatrixVector')
        .group_by(0)
        .aggregate(Sum, 2)
        .map(lambda row: (row[0], row[2]))
    )

    # Without the following reduce_group step, we get "Error while creating
    # the data flow plan for the program: Unknown operator or data set
    # type: null" ???
    updated = updated.reduce_group(NormalizeVectorGroupReducer()).name('NormalizeVector')

    # Difference between the previous and the updated values, reduced to a
    # single magnitude; only a magnitude greater than 0 passes the filter.
    change = (
        updated
        .join(looped).where(0).equal_to(0)
        .using(lambda fresh, previous: (fresh[0], previous[1] - fresh[1]))
        .reduce_group(MagnitudeGroupReducer())
        .name('MagnitudeGroupReducer')
        .filter(lambda item: item[1] > 0)
    )

    # Close the iteration with the updated data and the filtered change set,
    # then emit the result and run the plan in local mode.
    result = looped.close_with(updated, change)
    result.output()
    env.execute(local=True)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment