@chongxiaoc
Created February 23, 2022 22:32
Failure log: test_spark_keras.py::SparkKerasTests::test_fit_model fails because the Horovod Spark driver cannot serialize the training function for its tasks (dill's save_cell raises ValueError: Cell is empty inside cloudpickle.dumps); both Gloo tasks then die reading the driver's reply (struct.error: unpack requires a buffer of 4 bytes), and the launcher terminates the job with a RuntimeError.
root@d50cea8f4afc:/horovod/test/integration# pytest -v -s test_spark_keras.py::SparkKerasTests::test_fit_model
============================================================================================== test session starts ==============================================================================================
platform linux -- Python 3.7.5, pytest-7.0.1, pluggy-1.0.0 -- /usr/bin/python
cachedir: .pytest_cache
rootdir: /horovod, configfile: setup.cfg
plugins: forked-1.4.0
collected 1 item
test_spark_keras.py::SparkKerasTests::test_fit_model 2022-02-23 22:31:47.221136: I tensorflow/core/platform/cpu_feature_guard.cc:151] This TensorFlow binary is optimized with oneAPI Deep Neural Network Library (oneDNN) to use the following CPU instructions in performance-critical operations: AVX2 FMA
To enable them in other operations, rebuild TensorFlow with the appropriate compiler flags.
Warning: Ignoring non-Spark config property: javax.jdo.option.ConnectionURL
22/02/23 22:31:49 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
22/02/23 22:31:49 WARN SparkConf: Note that spark.local.dir will be overridden by the value set by the cluster manager (via SPARK_LOCAL_DIRS in mesos/standalone/kubernetes and LOCAL_DIRS in YARN).
num_partitions=20
writing dataframes
train_data_path=file:///tmp/tmp39ovybxa/intermediate_train_data.0
val_data_path=file:///tmp/tmp39ovybxa/intermediate_val_data.0
train_partitions=20
train_rows=4
Checking whether extension tensorflow was built with MPI.
Extension tensorflow was NOT built with MPI.
Checking whether extension tensorflow was built with Gloo.
Extension tensorflow was built with Gloo.
Task service executes command: HOROVOD_HOSTNAME=d50cea8f4afc-77ee4f6a8044a4ce96805363de8cb66a HOROVOD_RANK=0 HOROVOD_SIZE=2 HOROVOD_LOCAL_RANK=0 HOROVOD_LOCAL_SIZE=2 HOROVOD_CROSS_RANK=0 HOROVOD_CROSS_SIZE=1 PYTHONUNBUFFERED=1 HOROVOD_GLOO_RENDEZVOUS_ADDR=172.17.0.2 HOROVOD_GLOO_RENDEZVOUS_PORT=15964 HOROVOD_CONTROLLER=gloo HOROVOD_CPU_OPERATIONS=gloo HOROVOD_GLOO_IFACE=eth0 NCCL_SOCKET_IFNAME=eth0,lo /usr/bin/python -m horovod.spark.task.gloo_exec_fn gASVOgAAAAAAAAB9lCiMAmxvlF2UjAkxMjcuMC4wLjGUTZldhpRhjARldGgwlF2UjAoxNzIuMTcuMC4ylE2ZXYaUYXUu gASVyQIAAAAAAACMI2hvcm92b2QucnVubmVyLmNvbW1vbi51dGlsLnNldHRpbmdzlIwIU2V0dGluZ3OUk5QpgZR9lCiMCG51bV9wcm9jlEsCjAd2ZXJib3NllEsCjAhzc2hfcG9ydJROjBFzc2hfaWRlbnRpdHlfZmlsZZROjA5leHRyYV9tcGlfYXJnc5ROjAh0Y3BfZmxhZ5ROjAxiaW5kaW5nX2FyZ3OUTowDa2V5lE6MDXN0YXJ0X3RpbWVvdXSUjCJob3Jvdm9kLnJ1bm5lci5jb21tb24udXRpbC50aW1lb3V0lIwHVGltZW91dJSTlCmBlH2UKIwIX3RpbWVvdXSUTVgCjAtfdGltZW91dF9hdJRHQdiFrg2PkNeMCF9tZXNzYWdllFgOAQAAVGltZWQgb3V0IHdhaXRpbmcgZm9yIHthY3Rpdml0eX0uIFBsZWFzZSBjaGVjayB0aGF0IHlvdSBoYXZlIGVub3VnaCByZXNvdXJjZXMgdG8gcnVuIGFsbCBIb3Jvdm9kIHByb2Nlc3Nlcy4gRWFjaCBIb3Jvdm9kIHByb2Nlc3MgcnVucyBpbiBhIFNwYXJrIHRhc2suIFlvdSBtYXkgbmVlZCB0byBpbmNyZWFzZSB0aGUgc3RhcnRfdGltZW91dCBwYXJhbWV0ZXIgdG8gYSBsYXJnZXIgdmFsdWUgaWYgeW91ciBTcGFyayByZXNvdXJjZXMgYXJlIGFsbG9jYXRlZCBvbi1kZW1hbmQulHVijA9vdXRwdXRfZmlsZW5hbWWUTowNcnVuX2Z1bmNfbW9kZZSIjARuaWNzlE6MB2VsYXN0aWOUiYwccHJlZml4X291dHB1dF93aXRoX3RpbWVzdGFtcJSJjAVob3N0c5SML2Q1MGNlYThmNGFmYy03N2VlNGY2YTgwNDRhNGNlOTY4MDUzNjNkZThjYjY2YToylHViLg==
Task service executes command: HOROVOD_HOSTNAME=d50cea8f4afc-77ee4f6a8044a4ce96805363de8cb66a HOROVOD_RANK=1 HOROVOD_SIZE=2 HOROVOD_LOCAL_RANK=1 HOROVOD_LOCAL_SIZE=2 HOROVOD_CROSS_RANK=0 HOROVOD_CROSS_SIZE=1 PYTHONUNBUFFERED=1 HOROVOD_GLOO_RENDEZVOUS_ADDR=172.17.0.2 HOROVOD_GLOO_RENDEZVOUS_PORT=15964 HOROVOD_CONTROLLER=gloo HOROVOD_CPU_OPERATIONS=gloo HOROVOD_GLOO_IFACE=eth0 NCCL_SOCKET_IFNAME=eth0,lo /usr/bin/python -m horovod.spark.task.gloo_exec_fn gASVOgAAAAAAAAB9lCiMAmxvlF2UjAkxMjcuMC4wLjGUTZldhpRhjARldGgwlF2UjAoxNzIuMTcuMC4ylE2ZXYaUYXUu gASVyQIAAAAAAACMI2hvcm92b2QucnVubmVyLmNvbW1vbi51dGlsLnNldHRpbmdzlIwIU2V0dGluZ3OUk5QpgZR9lCiMCG51bV9wcm9jlEsCjAd2ZXJib3NllEsCjAhzc2hfcG9ydJROjBFzc2hfaWRlbnRpdHlfZmlsZZROjA5leHRyYV9tcGlfYXJnc5ROjAh0Y3BfZmxhZ5ROjAxiaW5kaW5nX2FyZ3OUTowDa2V5lE6MDXN0YXJ0X3RpbWVvdXSUjCJob3Jvdm9kLnJ1bm5lci5jb21tb24udXRpbC50aW1lb3V0lIwHVGltZW91dJSTlCmBlH2UKIwIX3RpbWVvdXSUTVgCjAtfdGltZW91dF9hdJRHQdiFrg2PkNeMCF9tZXNzYWdllFgOAQAAVGltZWQgb3V0IHdhaXRpbmcgZm9yIHthY3Rpdml0eX0uIFBsZWFzZSBjaGVjayB0aGF0IHlvdSBoYXZlIGVub3VnaCByZXNvdXJjZXMgdG8gcnVuIGFsbCBIb3Jvdm9kIHByb2Nlc3Nlcy4gRWFjaCBIb3Jvdm9kIHByb2Nlc3MgcnVucyBpbiBhIFNwYXJrIHRhc2suIFlvdSBtYXkgbmVlZCB0byBpbmNyZWFzZSB0aGUgc3RhcnRfdGltZW91dCBwYXJhbWV0ZXIgdG8gYSBsYXJnZXIgdmFsdWUgaWYgeW91ciBTcGFyayByZXNvdXJjZXMgYXJlIGFsbG9jYXRlZCBvbi1kZW1hbmQulHVijA9vdXRwdXRfZmlsZW5hbWWUTowNcnVuX2Z1bmNfbW9kZZSIjARuaWNzlE6MB2VsYXN0aWOUiYwccHJlZml4X291dHB1dF93aXRoX3RpbWVzdGFtcJSJjAVob3N0c5SML2Q1MGNlYThmNGFmYy03N2VlNGY2YTgwNDRhNGNlOTY4MDUzNjNkZThjYjY2YToylHViLg==
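For reference, the two opaque arguments passed to horovod.spark.task.gloo_exec_fn above are base64-encoded pickles; the rank tracebacks further down show them being decoded with codec.loads_base64. A sketch of that decoding, assuming the helper is a thin base64 + cloudpickle wrapper (which its usage here suggests, but this stand-in is not taken from the Horovod source):

    import base64
    import cloudpickle

    # Hypothetical stand-in for Horovod's codec.loads_base64 helper.
    def loads_base64(encoded):
        return cloudpickle.loads(base64.b64decode(encoded))

    # The first argument decodes to the driver's NIC -> address map (the 'lo',
    # '127.0.0.1', 'eth0', '172.17.0.2' strings are legible inside the base64);
    # the second decodes to a Settings object.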
----------------------------------------
Exception happened during processing of request from ('127.0.0.1', 47300)
Traceback (most recent call last):
File "/usr/lib/python3.7/socketserver.py", line 650, in process_request_thread
self.finish_request(request, client_address)
File "/usr/lib/python3.7/socketserver.py", line 360, in finish_request
self.RequestHandlerClass(request, client_address, self)
File "/usr/lib/python3.7/socketserver.py", line 720, in __init__
self.handle()
File "/usr/local/lib/python3.7/dist-packages/horovod/runner/common/util/network.py", line 131, in handle
server._wire.write(resp, self.wfile)
File "/usr/local/lib/python3.7/dist-packages/horovod/runner/common/util/network.py", line 75, in write
message = cloudpickle.dumps(obj, protocol=pickle.HIGHEST_PROTOCOL)
File "/usr/local/lib/python3.7/dist-packages/cloudpickle/cloudpickle_fast.py", line 102, in dumps
cp.dump(obj)
File "/usr/local/lib/python3.7/dist-packages/cloudpickle/cloudpickle_fast.py", line 602, in dump
return Pickler.dump(self, obj)
File "/usr/lib/python3.7/pickle.py", line 437, in dump
self.save(obj)
File "/usr/lib/python3.7/pickle.py", line 549, in save
self.save_reduce(obj=obj, *rv)
File "/usr/lib/python3.7/pickle.py", line 662, in save_reduce
save(state)
File "/usr/lib/python3.7/pickle.py", line 504, in save
f(self, obj) # Call unbound method with explicit self
File "/usr/local/lib/python3.7/dist-packages/dill/_dill.py", line 990, in save_module_dict
StockPickler.save_dict(pickler, obj)
File "/usr/lib/python3.7/pickle.py", line 859, in save_dict
self._batch_setitems(obj.items())
File "/usr/lib/python3.7/pickle.py", line 885, in _batch_setitems
save(v)
File "/usr/lib/python3.7/pickle.py", line 504, in save
f(self, obj) # Call unbound method with explicit self
File "/usr/local/lib/python3.7/dist-packages/cloudpickle/cloudpickle_fast.py", line 784, in save_function
*self._dynamic_function_reduce(obj), obj=obj
File "/usr/local/lib/python3.7/dist-packages/cloudpickle/cloudpickle_fast.py", line 721, in _save_reduce_pickle5
dictitems=dictitems, obj=obj
File "/usr/lib/python3.7/pickle.py", line 638, in save_reduce
save(args)
File "/usr/lib/python3.7/pickle.py", line 504, in save
f(self, obj) # Call unbound method with explicit self
File "/usr/lib/python3.7/pickle.py", line 789, in save_tuple
save(element)
File "/usr/lib/python3.7/pickle.py", line 504, in save
f(self, obj) # Call unbound method with explicit self
File "/usr/lib/python3.7/pickle.py", line 789, in save_tuple
save(element)
File "/usr/lib/python3.7/pickle.py", line 504, in save
f(self, obj) # Call unbound method with explicit self
File "/usr/local/lib/python3.7/dist-packages/dill/_dill.py", line 1226, in save_cell
f = obj.cell_contents
ValueError: Cell is empty
----------------------------------------
----------------------------------------
[The same 'ValueError: Cell is empty' traceback repeats for the requests from ('127.0.0.1', 47302) and ('127.0.0.1', 47304); only the client port differs.]
----------------------------------------
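The root cause is visible in the last two frames: once dill is imported it registers its handlers in the shared pickle.Pickler.dispatch table, so cloudpickle's function reducer routes closure cells through dill's save_cell, and dill/_dill.py:1226 reads obj.cell_contents without guarding against empty cells. A standalone sketch of the trigger follows; this is an assumption about the mechanism based on the traceback, not code from the test, and on dill builds that handle empty cells the dump simply succeeds:

    import dill         # importing dill patches the shared pickle.Pickler.dispatch table
    import cloudpickle

    def outer():
        if False:
            captured = None   # dead store: makes 'captured' a closure cell that is never filled
        def inner():
            return captured   # inner() closes over the empty cell
        return inner

    fn = outer()
    # fn.__closure__[0].cell_contents raises ValueError: Cell is empty; with an
    # affected dill on the dispatch path, cloudpickle.dumps(fn) fails the same way.
    cloudpickle.dumps(fn)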
[0]<stderr>:Traceback (most recent call last):
[0]<stderr>: File "/usr/lib/python3.7/runpy.py", line 193, in _run_module_as_main
[0]<stderr>: "__main__", mod_spec)
[0]<stderr>: File "/usr/lib/python3.7/runpy.py", line 85, in _run_code
[0]<stderr>: exec(code, run_globals)
[0]<stderr>: File "/usr/local/lib/python3.7/dist-packages/horovod/spark/task/gloo_exec_fn.py", line 30, in <module>
[0]<stderr>: main(codec.loads_base64(sys.argv[1]), codec.loads_base64(sys.argv[2]))
[0]<stderr>: File "/usr/local/lib/python3.7/dist-packages/horovod/spark/task/gloo_exec_fn.py", line 23, in main
[0]<stderr>: task_exec(driver_addresses, settings, 'HOROVOD_RANK', 'HOROVOD_LOCAL_RANK')
[0]<stderr>: File "/usr/local/lib/python3.7/dist-packages/horovod/spark/task/__init__.py", line 60, in task_exec
[0]<stderr>: fn, args, kwargs = driver_client.code()
[0]<stderr>: File "/usr/local/lib/python3.7/dist-packages/horovod/spark/driver/driver_service.py", line 245, in code
[0]<stderr>: resp = self._send(CodeRequest())
[0]<stderr>: File "/usr/local/lib/python3.7/dist-packages/horovod/runner/common/util/network.py", line 303, in _send
[0]<stderr>: return self._send_one(addr, req, stream)
[0]<stderr>: File "/usr/local/lib/python3.7/dist-packages/horovod/runner/common/util/network.py", line 279, in _send_one
[0]<stderr>: resp = self._wire.read(rfile)
[0]<stderr>: File "/usr/local/lib/python3.7/dist-packages/horovod/runner/common/util/network.py", line 95, in read
[0]<stderr>: message_len = struct.unpack('i', rfile.read(4))[0]
[0]<stderr>:struct.error: unpack requires a buffer of 4 bytes
----------------------------------------
[Three more copies of the identical 'ValueError: Cell is empty' traceback follow, for the requests from ('127.0.0.1', 47320), ('127.0.0.1', 47322) and ('127.0.0.1', 47324).]
----------------------------------------
[1]<stderr>:Traceback (most recent call last):
[1]<stderr>: File "/usr/lib/python3.7/runpy.py", line 193, in _run_module_as_main
[1]<stderr>: "__main__", mod_spec)
[1]<stderr>: File "/usr/lib/python3.7/runpy.py", line 85, in _run_code
[1]<stderr>: exec(code, run_globals)
[1]<stderr>: File "/usr/local/lib/python3.7/dist-packages/horovod/spark/task/gloo_exec_fn.py", line 30, in <module>
[1]<stderr>: main(codec.loads_base64(sys.argv[1]), codec.loads_base64(sys.argv[2]))
[1]<stderr>: File "/usr/local/lib/python3.7/dist-packages/horovod/spark/task/gloo_exec_fn.py", line 23, in main
[1]<stderr>: task_exec(driver_addresses, settings, 'HOROVOD_RANK', 'HOROVOD_LOCAL_RANK')
[1]<stderr>: File "/usr/local/lib/python3.7/dist-packages/horovod/spark/task/__init__.py", line 60, in task_exec
[1]<stderr>: fn, args, kwargs = driver_client.code()
[1]<stderr>: File "/usr/local/lib/python3.7/dist-packages/horovod/spark/driver/driver_service.py", line 245, in code
[1]<stderr>: resp = self._send(CodeRequest())
[1]<stderr>: File "/usr/local/lib/python3.7/dist-packages/horovod/runner/common/util/network.py", line 303, in _send
[1]<stderr>: return self._send_one(addr, req, stream)
[1]<stderr>: File "/usr/local/lib/python3.7/dist-packages/horovod/runner/common/util/network.py", line 279, in _send_one
[1]<stderr>: resp = self._wire.read(rfile)
[1]<stderr>: File "/usr/local/lib/python3.7/dist-packages/horovod/runner/common/util/network.py", line 95, in read
[1]<stderr>: message_len = struct.unpack('i', rfile.read(4))[0]
[1]<stderr>:struct.error: unpack requires a buffer of 4 bytes
Exception in thread Thread-3:
Traceback (most recent call last):
File "/usr/lib/python3.7/threading.py", line 926, in _bootstrap_inner
self.run()
File "/usr/lib/python3.7/threading.py", line 870, in run
self._target(*self._args, **self._kwargs)
File "/usr/local/lib/python3.7/dist-packages/horovod/spark/runner.py", line 141, in run_spark
result = procs.mapPartitionsWithIndex(mapper).collect()
File "/usr/local/lib/python3.7/dist-packages/pyspark/rdd.py", line 816, in collect
sock_info = self.ctx._jvm.PythonRDD.collectAndServe(self._jrdd.rdd())
File "/usr/local/lib/python3.7/dist-packages/py4j/java_gateway.py", line 1257, in __call__
answer, self.gateway_client, self.target_id, self.name)
File "/usr/local/lib/python3.7/dist-packages/pyspark/sql/utils.py", line 63, in deco
return f(*a, **kw)
File "/usr/local/lib/python3.7/dist-packages/py4j/protocol.py", line 328, in get_return_value
format(target_id, ".", name), value)
py4j.protocol.Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.collectAndServe.
: org.apache.spark.SparkException: Job 3 cancelled part of cancelled job group horovod.spark.run.0
        at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1925)
        at org.apache.spark.scheduler.DAGScheduler.handleJobCancellation(DAGScheduler.scala:1860)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleJobGroupCancelled$1.apply$mcVI$sp(DAGScheduler.scala:928)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleJobGroupCancelled$1.apply(DAGScheduler.scala:928)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleJobGroupCancelled$1.apply(DAGScheduler.scala:928)
        at scala.collection.mutable.HashSet.foreach(HashSet.scala:78)
        at org.apache.spark.scheduler.DAGScheduler.handleJobGroupCancelled(DAGScheduler.scala:928)
        at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2115)
        at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2095)
        at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2084)
        at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
        at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:759)
        at org.apache.spark.SparkContext.runJob(SparkContext.scala:2067)
        at org.apache.spark.SparkContext.runJob(SparkContext.scala:2088)
        at org.apache.spark.SparkContext.runJob(SparkContext.scala:2107)
        at org.apache.spark.SparkContext.runJob(SparkContext.scala:2132)
        at org.apache.spark.rdd.RDD$$anonfun$collect$1.apply(RDD.scala:990)
        at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
        at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
        at org.apache.spark.rdd.RDD.withScope(RDD.scala:385)
        at org.apache.spark.rdd.RDD.collect(RDD.scala:989)
        at org.apache.spark.api.python.PythonRDD$.collectAndServe(PythonRDD.scala:166)
        at org.apache.spark.api.python.PythonRDD.collectAndServe(PythonRDD.scala)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
        at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
        at py4j.Gateway.invoke(Gateway.java:282)
        at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
        at py4j.commands.CallCommand.execute(CallCommand.java:79)
        at py4j.GatewayConnection.run(GatewayConnection.java:238)
        at java.lang.Thread.run(Thread.java:748)
FAILED
=================================================================================================== FAILURES ====================================================================================================
________________________________________________________________________________________ SparkKerasTests.test_fit_model _________________________________________________________________________________________
self = <test_spark_keras.SparkKerasTests testMethod=test_fit_model>

    def test_fit_model(self):
        model = create_xor_model()
        optimizer = tf.keras.optimizers.SGD(lr=0.1)
        loss = 'binary_crossentropy'
        with spark_session('test_fit_model') as spark:
            df = create_xor_data(spark)
            with local_store() as store:
                keras_estimator = hvd.KerasEstimator(
                    num_proc=2,
                    store=store,
                    model=model,
                    optimizer=optimizer,
                    loss=loss,
                    feature_cols=['features'],
                    label_cols=['y'],
                    batch_size=1,
                    epochs=3,
                    verbose=2)
>               keras_model = keras_estimator.fit(df)
test_spark_keras.py:103:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
/usr/local/lib/python3.7/dist-packages/horovod/spark/common/estimator.py:35: in fit
    return super(HorovodEstimator, self).fit(df, params)
/usr/local/lib/python3.7/dist-packages/pyspark/ml/base.py:132: in fit
    return self._fit(dataset)
/usr/local/lib/python3.7/dist-packages/horovod/spark/common/estimator.py:81: in _fit
    backend, train_rows, val_rows, metadata, avg_row_size, dataset_idx)
/usr/local/lib/python3.7/dist-packages/horovod/spark/keras/estimator.py:281: in _fit_on_prepared_data
    env=self.getBackendEnv())
/usr/local/lib/python3.7/dist-packages/horovod/spark/common/backend.py:85: in run
    **self._kwargs)
/usr/local/lib/python3.7/dist-packages/horovod/spark/runner.py:287: in run
    _launch_job(use_mpi, use_gloo, settings, driver, env, stdout, stderr, executable)
/usr/local/lib/python3.7/dist-packages/horovod/spark/runner.py:157: in _launch_job
    settings.verbose)
/usr/local/lib/python3.7/dist-packages/horovod/runner/launch.py:709: in run_controller
    gloo_run()
/usr/local/lib/python3.7/dist-packages/horovod/spark/runner.py:154: in <lambda>
    run_controller(use_gloo, lambda: gloo_run(executable, settings, nics, driver, env, stdout, stderr),
/usr/local/lib/python3.7/dist-packages/horovod/spark/gloo_run.py:68: in gloo_run
    launch_gloo(command, exec_command, settings, nics, {}, server_ip)
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
command = ('/usr/bin/python', '-m', 'horovod.spark.task.gloo_exec_fn', 'gASVOgAAAAAAAAB9lCiMAmxvlF2UjAkxMjcuMC4wLjGUTZldhpRhjARl...4X291dHB1dF93aXRoX3RpbWVzdGFtcJSJjAVob3N0c5SML2Q1MGNlYThmNGFmYy03N2VlNGY2YTgwNDRhNGNlOTY4MDUzNjNkZThjYjY2YToylHViLg==')
exec_command = <function _exec_command_fn.<locals>._exec_command at 0x7f0aa438e200>, settings = <horovod.runner.common.util.settings.Settings object at 0x7f0aec7b95d0>, nics = {'eth0', 'lo'}, env = {}
server_ip = '172.17.0.2'
    def launch_gloo(command, exec_command, settings, nics, env, server_ip):
        """
        Launches the given command multiple times using gloo.
        Each command is launched via exec_command.

        :param command: command to launch
        :param exec_command: means to execute a single command
        :param settings: settings for the distribution
        :param nics: common interfaces
        :param env: environment to use
        :param server_ip: ip to use for rendezvous server
        """
        # Make the output directory if it does not exist
        if settings.output_filename:
            _mkdir_p(settings.output_filename)

        # start global rendezvous server and get port that it is listening on
        rendezvous = RendezvousServer(settings.verbose)

        # allocate processes into slots
        hosts = parse_hosts(settings.hosts)
        host_alloc_plan = get_host_assignments(hosts, settings.num_proc)

        # start global rendezvous server and get port that it is listening on
        global_rendezv_port = rendezvous.start()
        rendezvous.init(host_alloc_plan)
        run_command = get_run_command(command, server_ip, nics, global_rendezv_port)

        slot_info_to_command = _slot_info_to_command_fn(run_command, env)
        event = register_shutdown_event()
        args_list = [[slot_info_to_command(slot_info), slot_info, [event]]
                     for slot_info in host_alloc_plan]

        # If an error occurs in one thread, entire process will be terminated.
        # Otherwise, threads will keep running.
        res = threads.execute_function_multithreaded(exec_command,
                                                     args_list,
                                                     block_until_all_done=True)

        for name, value in sorted(res.items(), key=lambda item: item[1][1]):
            exit_code, timestamp = value
            if exit_code != 0:
                raise RuntimeError('Horovod detected that one or more processes exited with non-zero '
                                   'status, thus causing the job to be terminated. The first process '
                                   'to do so was:\nProcess name: {name}\nExit code: {code}\n'
>                                  .format(name=name, code=exit_code))
E RuntimeError: Horovod detected that one or more processes exited with non-zero status, thus causing the job to be terminated. The first process to do so was:
E Process name: 1
E Exit code: 1
/usr/local/lib/python3.7/dist-packages/horovod/runner/gloo_run.py:285: RuntimeError