Skip to content

Instantly share code, notes, and snippets.

@Jeffwan
Last active April 8, 2021 22:38
Show Gist options
  • Save Jeffwan/237652be53a5cfbe8dbccbfe2e3c468e to your computer and use it in GitHub Desktop.
Save Jeffwan/237652be53a5cfbe8dbccbfe2e3c468e to your computer and use it in GitHub Desktop.
raydp-spark-remote.py
import os
import ray
import raydp
# Names of the environment variables that carry the Ray head service endpoint.
# NOTE(review): the *_SERVICE_HOST / *_SERVICE_PORT_<NAME> pattern looks like
# Kubernetes service-discovery variables — confirm against the deployment.
HEAD_SERVICE_IP_ENV = "EXAMPLE_CLUSTER_RAY_HEAD_SERVICE_HOST"
HEAD_SERVICE_CLIENT_PORT_ENV = "EXAMPLE_CLUSTER_RAY_HEAD_SERVICE_PORT_CLIENT"

# Resolve host then port (a missing variable raises KeyError) and attach this
# process to the remote cluster through the Ray Client as "<host>:<port>".
head_service_ip, client_port = (
    os.environ[env_name]
    for env_name in (HEAD_SERVICE_IP_ENV, HEAD_SERVICE_CLIENT_PORT_ENV)
)
ray.util.connect(":".join((head_service_ip, client_port)))
@ray.remote
class SparkCluster:
    """Ray actor that owns a RayDP Spark session and runs a word-count demo.

    The session is not created in the constructor; call ``start()`` first,
    then ``run()``, and finally ``stop()`` to shut the Spark cluster down.
    """

    def __init__(self):
        # No Spark session exists until start() is called.
        self.spark = None

    def start(self):
        """Create the Spark session with ``raydp.init_spark``.

        The session is named 'word_count' and requests 2 executors, each with
        1 core and '1G' of memory.
        """
        self.spark = raydp.init_spark('word_count',
                                      num_executors=2,
                                      executor_cores=1,
                                      executor_memory='1G')

    def run(self):
        """Build a small one-column DataFrame of words and show per-word counts.

        Both the input DataFrame and the ``groupBy('word').count()`` result are
        printed with ``show()`` from inside the actor process.

        Raises:
            RuntimeError: if no session is active (start() not yet called, or
                stop() already called).
        """
        if self.spark is None:
            # Fail with a clear message instead of AttributeError on None.
            raise RuntimeError("Spark session not started; call start() first")
        df = self.spark.createDataFrame(
            [('look',), ('spark',), ('tutorial',), ('spark',), ('look',), ('python',)],
            ['word'],
        )
        df.show()
        word_count = df.groupBy('word').count()
        word_count.show()

    def stop(self):
        """Stop the RayDP Spark cluster and drop the stale session reference."""
        raydp.stop_spark()
        # Clear the handle so a later run() reports the missing session clearly.
        self.spark = None
# Driver: create the actor, start Spark, run the demo, and tear Spark down.
spark = SparkCluster.remote()
ray.get(spark.start.remote())
try:
    ray.get(spark.run.remote())
finally:
    # Stop Spark even if run() fails, so the RayDP Spark cluster is not left
    # running on the Ray cluster after the script exits with an error.
    ray.get(spark.stop.remote())
@Jeffwan
Copy link
Author

Jeffwan commented Apr 8, 2021

  1. With ray.util.connect(f"{head_service_ip}:{client_port}"), I get:
java.lang.NoSuchMethodError: 'io.ray.api.call.ActorCreator io.ray.api.call.ActorCreator.setJvmOptions(java.lang.String)'
  2. With ray.init(address="auto"), I get the following errors, and the error above sometimes appears as well, so the behavior is not consistent.
(base) ray@example-cluster-ray-head-xnk64:~$ python3 ray-spark-remote.py
2021-04-08 15:17:00,980	INFO worker.py:654 -- Connecting to existing Ray cluster at address: 192.168.13.21:6379
(pid=None, ip=192.168.13.23) Setting default log level to "WARN".
(pid=None, ip=192.168.13.23) To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
Traceback (most recent call last):
  File "ray-spark-remote.py", line 35, in <module>
    ray.get(spark.start.remote())
  File "/home/ray/anaconda3/lib/python3.7/site-packages/ray/_private/client_mode_hook.py", line 48, in wrapper
    return func(*args, **kwargs)
  File "/home/ray/anaconda3/lib/python3.7/site-packages/ray/worker.py", line 1447, in get
    raise value.as_instanceof_cause()
ray.exceptions.RayTaskError(Py4JError): ray::SparkCluster.start() (pid=244, ip=192.168.13.23)
  File "python/ray/_raylet.pyx", line 505, in ray._raylet.execute_task
  File "python/ray/_raylet.pyx", line 449, in ray._raylet.execute_task.function_executor
  File "/home/ray/anaconda3/lib/python3.7/site-packages/ray/_private/function_manager.py", line 566, in actor_method_executor
    return method(__ray_actor, *args, **kwargs)
  File "ray-spark-remote.py", line 22, in start
  File "/home/ray/anaconda3/lib/python3.7/site-packages/raydp/context.py", line 122, in init_spark
    return _global_spark_context.get_or_create_session()
  File "/home/ray/anaconda3/lib/python3.7/site-packages/raydp/context.py", line 68, in get_or_create_session
    spark_cluster = self._get_or_create_spark_cluster()
  File "/home/ray/anaconda3/lib/python3.7/site-packages/raydp/context.py", line 62, in _get_or_create_spark_cluster
    self._spark_cluster = SparkCluster()
  File "/home/ray/anaconda3/lib/python3.7/site-packages/raydp/spark/ray_cluster.py", line 32, in __init__
    self._set_up_master(None, None)
  File "/home/ray/anaconda3/lib/python3.7/site-packages/raydp/spark/ray_cluster.py", line 38, in _set_up_master
    self._app_master_bridge.start_up()
  File "/home/ray/anaconda3/lib/python3.7/site-packages/raydp/spark/ray_cluster_master.py", line 57, in start_up
    self._create_app_master(extra_classpath)
  File "/home/ray/anaconda3/lib/python3.7/site-packages/raydp/spark/ray_cluster_master.py", line 166, in _create_app_master
    self._app_master_java_bridge.startUpAppMaster(extra_classpath)
  File "/home/ray/anaconda3/lib/python3.7/site-packages/py4j/java_gateway.py", line 1305, in __call__
    answer, self.gateway_client, self.target_id, self.name)
  File "/home/ray/anaconda3/lib/python3.7/site-packages/py4j/protocol.py", line 336, in get_return_value
    format(target_id, ".", name))
py4j.protocol.Py4JError: An error occurred while calling o0.startUpAppMaster
(pid=None, ip=192.168.13.23) ERROR:root:Exception while sending command.
(pid=None, ip=192.168.13.23) Traceback (most recent call last):
(pid=None, ip=192.168.13.23)   File "/home/ray/anaconda3/lib/python3.7/site-packages/py4j/java_gateway.py", line 1207, in send_command
(pid=None, ip=192.168.13.23)     raise Py4JNetworkError("Answer from Java side is empty")
(pid=None, ip=192.168.13.23) py4j.protocol.Py4JNetworkError: Answer from Java side is empty
(pid=None, ip=192.168.13.23)
(pid=None, ip=192.168.13.23) During handling of the above exception, another exception occurred:
(pid=None, ip=192.168.13.23)
(pid=None, ip=192.168.13.23) Traceback (most recent call last):
(pid=None, ip=192.168.13.23)   File "/home/ray/anaconda3/lib/python3.7/site-packages/py4j/java_gateway.py", line 1033, in send_command
(pid=None, ip=192.168.13.23)     response = connection.send_command(command)
(pid=None, ip=192.168.13.23)   File "/home/ray/anaconda3/lib/python3.7/site-packages/py4j/java_gateway.py", line 1212, in send_command
(pid=None, ip=192.168.13.23)     "Error while receiving", e, proto.ERROR_ON_RECEIVE)
(pid=None, ip=192.168.13.23) py4j.protocol.Py4JNetworkError: Error while receiving

@Jeffwan
Copy link
Author

Jeffwan commented Apr 8, 2021

If I stick to this code, I get the following errors:

(base) ray@example-cluster-ray-head-xnk64:~$ python3 ray-spark-remote.py
(pid=None, ip=192.168.13.23) Setting default log level to "WARN".
(pid=None, ip=192.168.13.23) To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
Traceback (most recent call last):
  File "ray-spark-remote.py", line 34, in <module>
    ray.get(spark.start.remote())
  File "/home/ray/anaconda3/lib/python3.7/site-packages/ray/_private/client_mode_hook.py", line 47, in wrapper
    return getattr(ray, func.__name__)(*args, **kwargs)
  File "/home/ray/anaconda3/lib/python3.7/site-packages/ray/util/client/api.py", line 35, in get
    return self.worker.get(vals, timeout=timeout)
  File "/home/ray/anaconda3/lib/python3.7/site-packages/ray/util/client/worker.py", line 198, in get
    res = self._get(obj_ref, op_timeout)
  File "/home/ray/anaconda3/lib/python3.7/site-packages/ray/util/client/worker.py", line 221, in _get
    raise err
types.RayTaskError(Py4JError): ray::SparkCluster.start() (pid=350, ip=192.168.13.23)
  File "python/ray/_raylet.pyx", line 505, in ray._raylet.execute_task
  File "python/ray/_raylet.pyx", line 449, in ray._raylet.execute_task.function_executor
  File "/home/ray/anaconda3/lib/python3.7/site-packages/ray/_private/function_manager.py", line 566, in actor_method_executor
    return method(__ray_actor, *args, **kwargs)
  File "ray-spark-remote.py", line 21, in start
  File "/home/ray/anaconda3/lib/python3.7/site-packages/raydp/context.py", line 122, in init_spark
    return _global_spark_context.get_or_create_session()
  File "/home/ray/anaconda3/lib/python3.7/site-packages/raydp/context.py", line 68, in get_or_create_session
    spark_cluster = self._get_or_create_spark_cluster()
  File "/home/ray/anaconda3/lib/python3.7/site-packages/raydp/context.py", line 62, in _get_or_create_spark_cluster
    self._spark_cluster = SparkCluster()
  File "/home/ray/anaconda3/lib/python3.7/site-packages/raydp/spark/ray_cluster.py", line 32, in __init__
    self._set_up_master(None, None)
  File "/home/ray/anaconda3/lib/python3.7/site-packages/raydp/spark/ray_cluster.py", line 38, in _set_up_master
    self._app_master_bridge.start_up()
  File "/home/ray/anaconda3/lib/python3.7/site-packages/raydp/spark/ray_cluster_master.py", line 57, in start_up
    self._create_app_master(extra_classpath)
  File "/home/ray/anaconda3/lib/python3.7/site-packages/raydp/spark/ray_cluster_master.py", line 166, in _create_app_master
    self._app_master_java_bridge.startUpAppMaster(extra_classpath)
  File "/home/ray/anaconda3/lib/python3.7/site-packages/py4j/java_gateway.py", line 1305, in __call__
    answer, self.gateway_client, self.target_id, self.name)
  File "/home/ray/anaconda3/lib/python3.7/site-packages/py4j/protocol.py", line 336, in get_return_value
    format(target_id, ".", name))
py4j.protocol.Py4JError: An error occurred while calling o0.startUpAppMaster

@Jeffwan
Copy link
Author

Jeffwan commented Apr 8, 2021

In conclusion, I should use ray.util.connect(f"{head_service_ip}:{client_port}") with the @ray.remote actor version, and use ray.init(address="auto") with the original code.

Does XGBoost side need any changes?

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment