Skip to content

Instantly share code, notes, and snippets.

@GEOFBOT

GEOFBOT/error.py Secret

Last active October 18, 2016 05:26
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save GEOFBOT/d670f567f8c886572c8715a6058f8b34 to your computer and use it in GitHub Desktop.
Save GEOFBOT/d670f567f8c886572c8715a6058f8b34 to your computer and use it in GitHub Desktop.
Program that triggers a Flink exception
#!/usr/bin/python2
# Causes the following exception:
# Caused by: java.lang.RuntimeException: External process for task MapPartition (PythonMapPartition) terminated prematurely due to an error.
# Traceback (most recent call last):
# File "/tmp/flink-dist-cache-ee5156a3-e2c6-4103-abd2-3932c5ced7d2/b63444d2c919ccb476a333cafb371165/flink/plan.py", line 102, in <module>
# env.execute(local=True)
# File "/tmp/flink-dist-cache-ee5156a3-e2c6-4103-abd2-3932c5ced7d2/b63444d2c919ccb476a333cafb371165/flink/flink/plan/Environment.py", line 198, in execute
# operator._go()
# File "/tmp/flink-dist-cache-ee5156a3-e2c6-4103-abd2-3932c5ced7d2/b63444d2c919ccb476a333cafb371165/flink/flink/functions/Function.py", line 63, in _go
# self._receive_broadcast_variables()
# File "/tmp/flink-dist-cache-ee5156a3-e2c6-4103-abd2-3932c5ced7d2/b63444d2c919ccb476a333cafb371165/flink/flink/functions/Function.py", line 75, in _receive_broadcast_variables
# serializer_data = _get_deserializer(con.read_secondary, self._env._types)
# File "/tmp/flink-dist-cache-ee5156a3-e2c6-4103-abd2-3932c5ced7d2/b63444d2c919ccb476a333cafb371165/flink/flink/connection/Iterator.py", line 257, in _get_deserializer
# return TupleDeserializer([_get_deserializer(read, custom_types) for _ in range(ord(type))])
# File "/tmp/flink-dist-cache-ee5156a3-e2c6-4103-abd2-3932c5ced7d2/b63444d2c919ccb476a333cafb371165/flink/flink/connection/Iterator.py", line 257, in _get_deserializer
# return TupleDeserializer([_get_deserializer(read, custom_types) for _ in range(ord(type))])
# File "/tmp/flink-dist-cache-ee5156a3-e2c6-4103-abd2-3932c5ced7d2/b63444d2c919ccb476a333cafb371165/flink/flink/connection/Iterator.py", line 257, in _get_deserializer
# return TupleDeserializer([_get_deserializer(read, custom_types) for _ in range(ord(type))])
# File "/tmp/flink-dist-cache-ee5156a3-e2c6-4103-abd2-3932c5ced7d2/b63444d2c919ccb476a333cafb371165/flink/flink/connection/Iterator.py", line 257, in _get_deserializer
# return TupleDeserializer([_get_deserializer(read, custom_types) for _ in range(ord(type))])
# File "/tmp/flink-dist-cache-ee5156a3-e2c6-4103-abd2-3932c5ced7d2/b63444d2c919ccb476a333cafb371165/flink/flink/connection/Iterator.py", line 257, in _get_deserializer
# return TupleDeserializer([_get_deserializer(read, custom_types) for _ in range(ord(type))])
# File "/tmp/flink-dist-cache-ee5156a3-e2c6-4103-abd2-3932c5ced7d2/b63444d2c919ccb476a333cafb371165/flink/flink/connection/Iterator.py", line 280, in _get_deserializer
# raise Exception("Unable to find deserializer for type ID " + str(ord(type)))
# Exception: Unable to find deserializer for type ID 0
from flink.functions.GroupReduceFunction import GroupReduceFunction
from flink.plan.Environment import get_environment
def parse_and_normalize(line):
    """
    Parse a line of whitespace-separated numbers into a tuple of floats.

    Despite the name, no normalization is applied: the values are returned
    exactly as parsed.

    :param line: text holding numbers separated by any whitespace (spaces or
        tabs); leading and trailing whitespace is ignored.
    :return: tuple of floats, one per token; an empty tuple for a blank line.
    :raises ValueError: if a token cannot be converted to ``float``.
    """
    # str.split() with no argument splits on runs of any whitespace and
    # discards leading/trailing whitespace, so a separate strip() is not
    # needed.
    #
    # NOTE (Xiang): the zero-mean / unit-norm whitening used for the vector u
    # does not work here (double-checked against a pre-normalized matrix),
    # so it is intentionally not applied.
    return tuple(float(token) for token in line.split())
class SExploderGroupReducer(GroupReduceFunction):
    """
    Group reducer that explodes indexed text rows into individual entries.

    Each incoming record is read as ``(row, text)``; every number parsed from
    ``text`` is emitted as a ``(row, position, value)`` tuple.
    """

    def reduce(self, iterator, collector):
        """
        Emit one ``(row, position, value)`` tuple per number in each record.

        :param iterator: iterable of indexable records; item 0 is used as the
            row identifier and item 1 as the text line to parse.
        :param collector: object whose ``collect`` method receives each
            emitted tuple.
        """
        for record in iterator:
            row, text = record[0], record[1]
            # position is the 0-based index of the value within its line.
            for position, value in enumerate(parse_and_normalize(text)):
                collector.collect((row, position, value))
def get_top_v(u, s):
    """
    Join ``u`` with ``s`` and pair each matched ``s`` entry with a ``u`` value.

    The join is built with ``where(0).equal_to(0)`` (key selectors on field 0
    of ``u`` and field 0 of ``s``). For every matched pair the result element
    is ``(s_el[1], u_el[1])``.

    :param u: data set supporting ``join``; its elements are indexed at 0
        (join key) and 1 (value carried into the result).
    :param s: data set to join against; its elements are indexed at 0
        (join key) and 1 (becomes the first field of the result).
    :return: whatever ``using`` returns for the configured join.
    """
    return (
        u.join(s)
        .where(0)
        .equal_to(0)
        .using(lambda u_el, s_el: (s_el[1], u_el[1]))
    )
if __name__ == "__main__":
    # Minimal plan that reproduces the exception quoted at the top of the
    # file. Statement order is kept as in the original reproduction.
    env = get_environment()

    # Five identical rows of five whitespace-separated numbers.
    raw_data = env.from_elements(
        '0.0 1.0 2.0 3.0 4.0',
        '0.0 1.0 2.0 3.0 4.0',
        '0.0 1.0 2.0 3.0 4.0',
        '0.0 1.0 2.0 3.0 4.0',
        '0.0 1.0 2.0 3.0 4.0'
    )

    # Convert each line to a tuple: (row number, vec pos, value)
    S = raw_data \
        .zip_with_index() \
        .reduce_group(SExploderGroupReducer()) \
        .name('SExploderGroupReducer')

    # Single pass; the loop is kept so the plan has the same shape as the
    # original reproduction.
    for _ in range(1):
        u_old = env.from_elements((0, 0), (1, 1), (2, 2), (3, 3), (4, 4))

        # Open an iteration over u that runs once.
        u_old_it = u_old.iterate(1)

        v = get_top_v(u_old_it, S)

        # Join v (field 0) against S (field 1) to build the next u as
        # (s_el[0], v_el[1]) pairs.
        u_new = v \
            .join(S).where(0).equal_to(1) \
            .using(lambda v_el, s_el: (s_el[0], v_el[1]))

        # Close the iteration with the freshly computed u.
        u_new_final = u_old_it.close_with(u_new)
        u_new_final.output()

        # Compute v once more from the result of the iteration.
        v_final = get_top_v(u_new_final, S)
        v_final.output()

    env.execute(local=True)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment