Skip to content

Instantly share code, notes, and snippets.

@jgimbel
Created May 13, 2015 21:02
Show Gist options
  • Save jgimbel/8e52a00122878916df36 to your computer and use it in GitHub Desktop.
Save jgimbel/8e52a00122878916df36 to your computer and use it in GitHub Desktop.
java_gateway.lauch_gateway
def launch_gateway():
if "PYSPARK_GATEWAY_PORT" in os.environ:
gateway_port = int(os.environ["PYSPARK_GATEWAY_PORT"])
else:
SPARK_HOME = os.environ["SPARK_HOME"]
# Launch the Py4j gateway using Spark's run command so that we pick up the
# proper classpath and settings from spark-env.sh
on_windows = platform.system() == "Windows"
script = "./bin/spark-submit.cmd" if on_windows else "./bin/spark-submit"
submit_args = os.environ.get("PYSPARK_SUBMIT_ARGS", "pyspark-shell")
command = [os.path.join(SPARK_HOME, script)] + shlex.split(submit_args)
# Start a socket that will be used by PythonGatewayServer to communicate its port to us
callback_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
callback_socket.bind(('127.0.0.1', 0))
callback_socket.listen(1)
callback_host, callback_port = callback_socket.getsockname()
env = dict(os.environ)
env['_PYSPARK_DRIVER_CALLBACK_HOST'] = callback_host
env['_PYSPARK_DRIVER_CALLBACK_PORT'] = str(callback_port)
# Launch the Java gateway.
# We open a pipe to stdin so that the Java gateway can die when the pipe is broken
if not on_windows:
# Don't send ctrl-c / SIGINT to the Java gateway:
def preexec_func():
signal.signal(signal.SIGINT, signal.SIG_IGN)
proc = Popen(command, stdin=PIPE, preexec_fn=preexec_func, env=env)
else:
# preexec_fn not supported on Windows
proc = Popen(command, stdin=PIPE, env=env)
gateway_port = None
# We use select() here in order to avoid blocking indefinitely if the subprocess dies
# before connecting
while gateway_port is None and proc.poll() is None:
timeout = 1 # (seconds)
readable, _, _ = select.select([callback_socket], [], [], timeout)
if callback_socket in readable:
gateway_connection = callback_socket.accept()[0]
# Determine which ephemeral port the server started on:
gateway_port = read_int(gateway_connection.makefile(mode="rb"))
gateway_connection.close()
callback_socket.close()
if gateway_port is None:
raise Exception("Java gateway process exited before sending the driver its port number")
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment