-
-
Save GEOFBOT/d670f567f8c886572c8715a6058f8b34 to your computer and use it in GitHub Desktop.
Flink program that causes an exception
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/python2
# Causes the following exception:
# Caused by: java.lang.RuntimeException: External process for task MapPartition (PythonMapPartition) terminated prematurely due to an error.
# Traceback (most recent call last):
#   File "/tmp/flink-dist-cache-ee5156a3-e2c6-4103-abd2-3932c5ced7d2/b63444d2c919ccb476a333cafb371165/flink/plan.py", line 102, in <module>
#     env.execute(local=True)
#   File "/tmp/flink-dist-cache-ee5156a3-e2c6-4103-abd2-3932c5ced7d2/b63444d2c919ccb476a333cafb371165/flink/flink/plan/Environment.py", line 198, in execute
#     operator._go()
#   File "/tmp/flink-dist-cache-ee5156a3-e2c6-4103-abd2-3932c5ced7d2/b63444d2c919ccb476a333cafb371165/flink/flink/functions/Function.py", line 63, in _go
#     self._receive_broadcast_variables()
#   File "/tmp/flink-dist-cache-ee5156a3-e2c6-4103-abd2-3932c5ced7d2/b63444d2c919ccb476a333cafb371165/flink/flink/functions/Function.py", line 75, in _receive_broadcast_variables
#     serializer_data = _get_deserializer(con.read_secondary, self._env._types)
#   File "/tmp/flink-dist-cache-ee5156a3-e2c6-4103-abd2-3932c5ced7d2/b63444d2c919ccb476a333cafb371165/flink/flink/connection/Iterator.py", line 257, in _get_deserializer
#     return TupleDeserializer([_get_deserializer(read, custom_types) for _ in range(ord(type))])
#   [the previous frame repeats four more times as _get_deserializer recurses]
#   File "/tmp/flink-dist-cache-ee5156a3-e2c6-4103-abd2-3932c5ced7d2/b63444d2c919ccb476a333cafb371165/flink/flink/connection/Iterator.py", line 280, in _get_deserializer
#     raise Exception("Unable to find deserializer for type ID " + str(ord(type)))
# Exception: Unable to find deserializer for type ID 0
from flink.functions.GroupReduceFunction import GroupReduceFunction | |
from flink.plan.Environment import get_environment | |
def parse_and_normalize(line):
    """
    Parse a whitespace-delimited line of text into a tuple of floats.

    NOTE: despite the name, no normalization is applied — the whitening
    steps were disabled by the original author (see below).  The previous
    inline comments were also stale: the code splits on ANY whitespace
    (not only tabs) and returns a plain tuple, not a numpy array.

    :param line: string of numbers separated by whitespace (e.g. "0.0 1.0 2.0")
    :return: tuple of floats; an empty/blank line yields the empty tuple
    """
    x = tuple(map(float, line.strip().split()))
    # Comment by Xiang (kept for the record): the following normalization
    # commands work for vector u, but do not work here.  Double-checked
    # against a pre-normalized matrix.
    # x -= x.mean()     # 0-mean
    # x /= sla.norm(x)  # unit norm
    return x
class SExploderGroupReducer(GroupReduceFunction):
    """Explode (row_index, text_line) records into (row, position, value) triples."""

    def reduce(self, iterator, collector):
        # Each incoming record is a 2-tuple produced by zip_with_index():
        # element 0 is the row number, element 1 is the raw text line.
        for record in iterator:
            row_idx = record[0]
            values = parse_and_normalize(record[1])
            # Emit one output tuple per vector component.
            for col_idx, value in enumerate(values):
                collector.collect((row_idx, col_idx, value))
def get_top_v(u, s):
    """
    Join dataset *u* with dataset *s* on their first field and project
    each matched pair to (s_element[1], u_element[1]).

    :param u: Flink DataSet keyed on field 0
    :param s: Flink DataSet keyed on field 0
    :return: the joined-and-projected DataSet
    """
    joined = u.join(s).where(0).equal_to(0)
    return joined.using(lambda u_row, s_row: (s_row[1], u_row[1]))
if __name__ == "__main__":
    env = get_environment()

    # Toy input: five identical whitespace-delimited rows of a 5-wide matrix.
    raw_data = env.from_elements(
        '0.0 1.0 2.0 3.0 4.0',
        '0.0 1.0 2.0 3.0 4.0',
        '0.0 1.0 2.0 3.0 4.0',
        '0.0 1.0 2.0 3.0 4.0',
        '0.0 1.0 2.0 3.0 4.0'
    )

    # Explode every line into (row number, vec pos, value) tuples.
    S = raw_data \
        .zip_with_index() \
        .reduce_group(SExploderGroupReducer()) \
        .name('SExploderGroupReducer')

    # NOTE(review): this loop body runs exactly once (range(1)); kept to
    # preserve the original plan construction verbatim.
    for _round in range(1):
        # Initial u vector as (index, value) pairs.
        u_old = env.from_elements((0, 0), (1, 1), (2, 2), (3, 3), (4, 4))
        u_old_it = u_old.iterate(1)

        # One power-iteration-style step: v from u, then u from v.
        v = get_top_v(u_old_it, S)
        u_new = v \
            .join(S).where(0).equal_to(1) \
            .using(lambda v_el, s_el: (s_el[0], v_el[1]))

        u_new_final = u_old_it.close_with(u_new)
        u_new_final.output()

        v_final = get_top_v(u_new_final, S)
        v_final.output()

    env.execute(local=True)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment