Created
May 13, 2015 21:02
-
-
Save jgimbel/8e52a00122878916df36 to your computer and use it in GitHub Desktop.
java_gateway.launch_gateway
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
def _read_reported_port(callback_socket):
    """Accept one connection on *callback_socket* and read the port it reports.

    The connection and its file wrapper are always closed, even when
    ``read_int`` raises (e.g. on a truncated message).
    """
    connection = callback_socket.accept()[0]
    try:
        stream = connection.makefile(mode="rb")
        try:
            # Determine which ephemeral port the server started on:
            return read_int(stream)
        finally:
            stream.close()
    finally:
        connection.close()


def launch_gateway():
    """Determine the Py4J gateway port, launching ``spark-submit`` if needed.

    If the ``PYSPARK_GATEWAY_PORT`` environment variable is set, its value is
    used as the gateway port and nothing is launched. Otherwise
    ``$SPARK_HOME/bin/spark-submit`` (``spark-submit.cmd`` on Windows) is
    started with the arguments in ``PYSPARK_SUBMIT_ARGS`` (default
    ``"pyspark-shell"``). The child is told, via the
    ``_PYSPARK_DRIVER_CALLBACK_HOST``/``_PYSPARK_DRIVER_CALLBACK_PORT``
    environment variables, where to connect back and report its port.

    Raises:
        KeyError: the gateway has to be launched but ``SPARK_HOME`` is unset.
        Exception: the child process exited before reporting its port.
    """
    if "PYSPARK_GATEWAY_PORT" in os.environ:
        gateway_port = int(os.environ["PYSPARK_GATEWAY_PORT"])
    else:
        spark_home = os.environ["SPARK_HOME"]
        # Launch the Py4j gateway using Spark's run command so that we pick up the
        # proper classpath and settings from spark-env.sh
        on_windows = platform.system() == "Windows"
        script = "./bin/spark-submit.cmd" if on_windows else "./bin/spark-submit"
        submit_args = os.environ.get("PYSPARK_SUBMIT_ARGS", "pyspark-shell")
        command = [os.path.join(spark_home, script)] + shlex.split(submit_args)

        # Start a socket that will be used by PythonGatewayServer to communicate
        # its port to us. It is closed in the `finally` below on every path,
        # including when Popen or the port read raises.
        callback_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        try:
            callback_socket.bind(('127.0.0.1', 0))  # port 0: let the OS pick a free port
            callback_socket.listen(1)
            callback_host, callback_port = callback_socket.getsockname()
            env = dict(os.environ)
            env['_PYSPARK_DRIVER_CALLBACK_HOST'] = callback_host
            env['_PYSPARK_DRIVER_CALLBACK_PORT'] = str(callback_port)

            # Launch the Java gateway.
            # We open a pipe to stdin so that the Java gateway can die when the pipe is broken
            if not on_windows:
                # Don't send ctrl-c / SIGINT to the Java gateway:
                def preexec_func():
                    signal.signal(signal.SIGINT, signal.SIG_IGN)
                proc = Popen(command, stdin=PIPE, preexec_fn=preexec_func, env=env)
            else:
                # preexec_fn not supported on Windows
                proc = Popen(command, stdin=PIPE, env=env)

            gateway_port = None
            # We use select() here in order to avoid blocking indefinitely if the
            # subprocess dies before connecting
            while gateway_port is None and proc.poll() is None:
                timeout = 1  # (seconds)
                readable, _, _ = select.select([callback_socket], [], [], timeout)
                if callback_socket in readable:
                    gateway_port = _read_reported_port(callback_socket)
        finally:
            callback_socket.close()

        if gateway_port is None:
            raise Exception("Java gateway process exited before sending the driver its port number")
    # NOTE(review): gateway_port is neither returned nor used in the code shown
    # here; presumably the rest of the original function connects to it — confirm.
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment