Created
October 18, 2019 07:17
-
-
Save drmalex07/6ea0b443b4369160ef059e070d0b7243 to your computer and use it in GitHub Desktop.
An example of programmatically submitting a Spark application. #spark #spark-submit #sparklauncher
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import java.io.File; | |
import java.io.IOException; | |
import org.apache.logging.log4j.LogManager; | |
import org.apache.logging.log4j.Logger; | |
import org.apache.spark.launcher.SparkAppHandle; | |
import org.apache.spark.launcher.SparkLauncher; | |
/** | |
* An example of submitting a Spark application in a programmatic manner | |
*/ | |
public class SubmitSparkApplication | |
{ | |
private static final Logger logger = LogManager.getFormatterLogger(SubmitSparkApplication.class); | |
private static final boolean waitForCompletion = true; | |
public static void main(String[] args) throws IOException, InterruptedException | |
{ | |
SparkLauncher launcher = new SparkLauncher() | |
.setVerbose(true) | |
.setMaster("yarn") | |
.setDeployMode("cluster") | |
.setConf("spark.driver.memory", "1g") | |
.setConf("spark.executor.memory", "1g") | |
.setConf("spark.executor.cores", "2") | |
.setConf("spark.executor.instances", "2") | |
.addJar("hdfs:///user/user/jars/log4j-api-2.7.jar") | |
.addJar("hdfs:///user/user/jars/log4j-core-2.7.jar") | |
.setAppResource("hdfs:///user/user/apps/hello-spark-0.0.1.jar") | |
.setMainClass("acme.hello_spark.ComputePi") | |
.addAppArgs("3", "2500000") | |
.redirectError(new File("submit.err.log")) | |
.redirectOutput(new File("submit.out.log")); | |
SparkAppHandle handle = launcher.startApplication(); | |
logger.info("Started application; handle=%s", handle); | |
// Poll until application gets submitted | |
while (handle.getAppId() == null) { | |
logger.info("Waiting for application to be submitted: status=%s", handle.getState()); | |
Thread.sleep(1500L); | |
} | |
logger.info("Submitted as {}", handle.getAppId()); | |
if (waitForCompletion) { | |
while (!handle.getState().isFinal()) { | |
logger.info("%s: status=%s", handle.getAppId(), handle.getState()); | |
Thread.sleep(1500L); | |
} | |
logger.info("Finished as %s", handle.getState()); | |
} else { | |
handle.disconnect(); | |
} | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment