Skip to content

Instantly share code, notes, and snippets.

@Jeffwan
Last active April 8, 2021 22:38
Show Gist options
  • Save Jeffwan/237652be53a5cfbe8dbccbfe2e3c468e to your computer and use it in GitHub Desktop.
Save Jeffwan/237652be53a5cfbe8dbccbfe2e3c468e to your computer and use it in GitHub Desktop.
raydp-spark-remote.py
import os
import ray
import raydp
# Names of the environment variables that carry the Ray head service's
# host and Ray Client port (presumably injected for the
# `example-cluster-ray-head` service -- confirm against the deployment).
HEAD_SERVICE_IP_ENV = "EXAMPLE_CLUSTER_RAY_HEAD_SERVICE_HOST"
HEAD_SERVICE_CLIENT_PORT_ENV = "EXAMPLE_CLUSTER_RAY_HEAD_SERVICE_PORT_CLIENT"

# Look both variables up in order; a missing one raises KeyError here,
# before any connection attempt is made.
head_service_ip, client_port = (
    os.environ[env_name]
    for env_name in (HEAD_SERVICE_IP_ENV, HEAD_SERVICE_CLIENT_PORT_ENV)
)

# Attach this process to the remote cluster through Ray Client ("host:port").
ray.util.connect(f"{head_service_ip}:{client_port}")
@ray.remote
class SparkCluster:
    """Ray actor that owns a RayDP Spark session and runs a word-count demo.

    The driver creates the actor and calls ``start`` -> ``run`` -> ``stop``
    through ``.remote()``. Each method therefore executes inside the actor's
    worker process rather than in the Ray Client driver.
    """

    def __init__(self):
        # No Spark session exists until start() is called.
        self.spark = None

    def start(self, app_name='word_count', num_executors=2,
              executor_cores=1, executor_memory='1G'):
        """Create the Spark session with ``raydp.init_spark``.

        All arguments are passed straight through to ``raydp.init_spark``.
        The defaults reproduce the original hard-coded configuration, so
        existing ``start.remote()`` calls behave exactly as before.
        """
        self.spark = raydp.init_spark(app_name,
                                      num_executors=num_executors,
                                      executor_cores=executor_cores,
                                      executor_memory=executor_memory)

    def run(self):
        """Build a small DataFrame of words, show it, then show per-word counts.

        Raises:
            RuntimeError: if called before start() or after stop(). Without
                this guard the call would fail with an opaque AttributeError
                on ``None``.
        """
        if self.spark is None:
            raise RuntimeError("Spark session not started; call start() first")
        words = [('look',), ('spark',), ('tutorial',), ('spark',),
                 ('look',), ('python',)]
        df = self.spark.createDataFrame(words, ['word'])
        df.show()
        word_count = df.groupBy('word').count()
        word_count.show()

    def stop(self):
        """Stop the RayDP Spark session and drop this actor's reference to it."""
        raydp.stop_spark()
        # Clear the stale handle so a later run() fails clearly instead of
        # using a stopped session.
        self.spark = None
# Drive the actor: start Spark, run the job, and always stop Spark afterwards.
spark = SparkCluster.remote()
ray.get(spark.start.remote())
try:
    ray.get(spark.run.remote())
finally:
    # Stop Spark even if run() raised, so the session started above is not
    # left running inside the actor.
    ray.get(spark.stop.remote())
@Jeffwan
Copy link
Author

Jeffwan commented Apr 8, 2021

If I run this code as-is, I get the following error:

(base) ray@example-cluster-ray-head-xnk64:~$ python3 ray-spark-remote.py
(pid=None, ip=192.168.13.23) Setting default log level to "WARN".
(pid=None, ip=192.168.13.23) To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
Traceback (most recent call last):
  File "ray-spark-remote.py", line 34, in <module>
    ray.get(spark.start.remote())
  File "/home/ray/anaconda3/lib/python3.7/site-packages/ray/_private/client_mode_hook.py", line 47, in wrapper
    return getattr(ray, func.__name__)(*args, **kwargs)
  File "/home/ray/anaconda3/lib/python3.7/site-packages/ray/util/client/api.py", line 35, in get
    return self.worker.get(vals, timeout=timeout)
  File "/home/ray/anaconda3/lib/python3.7/site-packages/ray/util/client/worker.py", line 198, in get
    res = self._get(obj_ref, op_timeout)
  File "/home/ray/anaconda3/lib/python3.7/site-packages/ray/util/client/worker.py", line 221, in _get
    raise err
types.RayTaskError(Py4JError): ray::SparkCluster.start() (pid=350, ip=192.168.13.23)
  File "python/ray/_raylet.pyx", line 505, in ray._raylet.execute_task
  File "python/ray/_raylet.pyx", line 449, in ray._raylet.execute_task.function_executor
  File "/home/ray/anaconda3/lib/python3.7/site-packages/ray/_private/function_manager.py", line 566, in actor_method_executor
    return method(__ray_actor, *args, **kwargs)
  File "ray-spark-remote.py", line 21, in start
  File "/home/ray/anaconda3/lib/python3.7/site-packages/raydp/context.py", line 122, in init_spark
    return _global_spark_context.get_or_create_session()
  File "/home/ray/anaconda3/lib/python3.7/site-packages/raydp/context.py", line 68, in get_or_create_session
    spark_cluster = self._get_or_create_spark_cluster()
  File "/home/ray/anaconda3/lib/python3.7/site-packages/raydp/context.py", line 62, in _get_or_create_spark_cluster
    self._spark_cluster = SparkCluster()
  File "/home/ray/anaconda3/lib/python3.7/site-packages/raydp/spark/ray_cluster.py", line 32, in __init__
    self._set_up_master(None, None)
  File "/home/ray/anaconda3/lib/python3.7/site-packages/raydp/spark/ray_cluster.py", line 38, in _set_up_master
    self._app_master_bridge.start_up()
  File "/home/ray/anaconda3/lib/python3.7/site-packages/raydp/spark/ray_cluster_master.py", line 57, in start_up
    self._create_app_master(extra_classpath)
  File "/home/ray/anaconda3/lib/python3.7/site-packages/raydp/spark/ray_cluster_master.py", line 166, in _create_app_master
    self._app_master_java_bridge.startUpAppMaster(extra_classpath)
  File "/home/ray/anaconda3/lib/python3.7/site-packages/py4j/java_gateway.py", line 1305, in __call__
    answer, self.gateway_client, self.target_id, self.name)
  File "/home/ray/anaconda3/lib/python3.7/site-packages/py4j/protocol.py", line 336, in get_return_value
    format(target_id, ".", name))
py4j.protocol.Py4JError: An error occurred while calling o0.startUpAppMaster

@Jeffwan
Copy link
Author

Jeffwan commented Apr 8, 2021

In conclusion, I should use `ray.util.connect(f"{head_service_ip}:{client_port}")` together with `@ray.remote`, and use `ray.init(address="auto")` with the original code.

Does the XGBoost side need any changes?

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment