Running Spark SQL in standalone mode

This gist collects the pieces needed to run Spark SQL against a standalone Spark cluster: the Maven dependency, the cluster setup steps and spark-env.sh settings, and driver-side SparkSession configuration in both Java and Scala (plus an embedded "local" variant for tests).
<!-- pom.xml: the only dependency needed; spark-core is pulled in transitively.
     The _2.11 suffix must match the Scala binary version of the Spark build. -->
<properties>
  <spark.version>2.3.0</spark.version>
</properties>

<dependency>
  <groupId>org.apache.spark</groupId>
  <artifactId>spark-sql_2.11</artifactId>
  <version>${spark.version}</version>
</dependency>
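Before wiring up a cluster, it is worth confirming that the dependency resolves at all. A minimal local-mode smoke test is sketched below; the class name and query are illustrative and not part of the original gist.

import org.apache.spark.sql.SparkSession;

public class SmokeTest {
    public static void main(String[] args) {
        // Runs entirely in-process ("local" master); no standalone cluster needed.
        final SparkSession session = SparkSession.builder().
            appName("smoke-test").
            master("local[1]").
            getOrCreate();

        session.sql("SELECT 1 AS one").show();
        session.stop();
    }
}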
#
# Standalone cluster setup:
# 1) Unpack the Spark archive to $SPARK_HOME
# 2) cp conf/spark-env.sh.template conf/spark-env.sh
# 3) Edit conf/spark-env.sh (settings below)
# 4) Make sure spark-env.sh is identical on the master and all worker nodes
#
# 5) To actually start the Spark master and worker(s), run from the $SPARK_HOME directory:
# .. on the master
./sbin/start-master.sh
# .. on each worker, pointing at the master URL configured below
# (Spark 2.x also ships an equivalent wrapper: ./sbin/start-slave.sh <master-url>)
./bin/spark-class org.apache.spark.deploy.worker.Worker spark://127.0.0.1:8091
# conf/spark-env.sh: options for the daemons used in standalone deploy mode.
# (In Spark 2.x, SPARK_MASTER_HOST is the preferred name for the deprecated SPARK_MASTER_IP.)
export SPARK_MASTER_IP=127.0.0.1
export SPARK_MASTER_PORT=8091
export SPARK_MASTER_WEBUI_PORT=8090     # master web UI: http://127.0.0.1:8090
export SPARK_WORKER_CORES=4
export SPARK_WORKER_MEMORY=24g
// SparkDriver.java: a driver that connects to the standalone master
// configured above, using the SparkEnvCfg constants defined below.
import org.apache.spark.sql.SparkSession;

public class SparkDriver {
    public static void main(String[] args) {
        final SparkSession session = SparkSession.builder().
            appName("MySparkApp" + System.currentTimeMillis()).
            master(SparkEnvCfg.sparkMasterUrl()).
            config(SparkEnvCfg.SPARK_EXECUTOR_MEMORY, "1g").
            config(SparkEnvCfg.SPARK_SERIALIZER, SparkEnvCfg.KRYO).
            config(SparkEnvCfg.SPARK_SQL_SHUFFLE_PARTITIONS, "2").
            config(SparkEnvCfg.SPARK_EXECUTOR_EXTRA_JAVA_OPTIONS, SparkEnvCfg.JMXREMOTE_ENABLED).
            getOrCreate();

        // TODO use session to create and manipulate datasets
    }
}
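One way the TODO could be filled in: build a tiny in-memory Dataset, register it as a temp view, and query it with SQL. The sketch below is illustrative and not part of the original gist; the class name and data are made up.

import java.util.Arrays;

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

public class SparkDriverUsage {
    public static void main(String[] args) {
        final SparkSession session = SparkSession.builder().
            appName("MySparkAppUsage").
            master(SparkEnvCfg.sparkMasterUrl()).
            getOrCreate();

        // A four-row, single-column ("n") DataFrame built from local data.
        final Dataset<Row> numbers = session.
            createDataset(Arrays.asList(1, 2, 3, 4), Encoders.INT()).
            toDF("n");
        numbers.createOrReplaceTempView("numbers");

        // Executed by Spark SQL on the standalone cluster.
        session.sql("SELECT SUM(n) AS total FROM numbers").show();

        session.stop();
    }
}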
// SparkDriver.scala: an embedded variant that uses the in-process "local"
// master, convenient for unit tests. SparkCtxCfg is defined below.
import java.io.File

import org.apache.spark.sql.SparkSession

object SparkDriver {
  def sparkSession(name: String): SparkSession = {
    SparkSession.builder().
      appName(name).
      master("local").
      config(SparkCtxCfg.SPARK_EXECUTOR_MEMORY, "1g").
      config(SparkCtxCfg.SPARK_SERIALIZER, SparkCtxCfg.KRYO).
      config(SparkCtxCfg.SPARK_SQL_SHUFFLE_PARTITIONS, "2").
      config(SparkCtxCfg.SPARK_WAREHOUSE_DIR, "target/spark-warehouse").
      config(SparkCtxCfg.SPARK_JARS, SparkCtxCfg.toAbsolutePaths("", "")).
      config(SparkCtxCfg.SPARK_DRIVER_HOST, "localhost").
      config(SparkCtxCfg.SPARK_DRIVER_PORT, "31000").
      getOrCreate()
  }
}
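// Hypothetical usage sketch (not in the original gist): the embedded session
// needs no running cluster, so a trivial query verifies the configuration:
//
//   val session = SparkDriver.sparkSession("embedded-test")
//   session.sql("SELECT 1 AS one").show()
//   session.stop()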
// Spark property keys plus small helpers for resolving them from -D system
// properties and for turning a comma-separated jar list into absolute paths.
object SparkCtxCfg {
  val SPARK_EXECUTOR_MEMORY = "spark.executor.memory"
  val SPARK_SERIALIZER = "spark.serializer"
  val ALLOW_MULTIPLE_CONTEXTS = "spark.driver.allowMultipleContexts"
  val SPARK_JARS = "spark.jars"
  val SPARK_WAREHOUSE_DIR = "spark.sql.warehouse.dir"
  val KRYO = "org.apache.spark.serializer.KryoSerializer"
  val SPARK_SQL_SHUFFLE_PARTITIONS = "spark.sql.shuffle.partitions"
  val DEFAULT_SPARK_MASTER_URL = "spark://127.0.0.1:7077"
  val SPARK_DRIVER_HOST = "spark.driver.host"
  val SPARK_DRIVER_PORT = "spark.driver.port"

  /** A -D system property value, or the given default if the property is unset. */
  def envProperty(name: String, otherwise: String): String = {
    val prop = System.getProperty(name)
    if (prop == null) otherwise else prop
  }

  def availableProcessors(): String =
    Integer.toString(Runtime.getRuntime.availableProcessors())

  /** Prefix every jar in a comma-separated list with the base directory. */
  def toAbsolutePaths(jarsString: String, baseDir: String): String = {
    if (jarsString == null || jarsString.isEmpty) {
      return ""
    }

    val libDir = if (baseDir.endsWith(File.separator)) baseDir
                 else baseDir + File.separator
    toAbsolutePaths(libDir, jarsString.split(",")).mkString(",")
  }

  private def toAbsolutePaths(libDir: String, jarFileNames: Array[String]): Array[String] =
    jarFileNames.map(jar => libDir + jar)
}
// SparkEnvCfg.java: the Java counterpart of SparkCtxCfg. Note that the
// default master URL matches the SPARK_MASTER_PORT=8091 set in spark-env.sh.
public final class SparkEnvCfg {
    public static final String SPARK_EXECUTOR_MEMORY = "spark.executor.memory";
    public static final String SPARK_SERIALIZER = "spark.serializer";
    public static final String SPARK_EXECUTOR_EXTRA_JAVA_OPTIONS = "spark.executor.extraJavaOptions";
    public static final String KRYO = "org.apache.spark.serializer.KryoSerializer";
    public static final String SPARK_SQL_SHUFFLE_PARTITIONS = "spark.sql.shuffle.partitions";

    // Enables unauthenticated remote JMX on each executor, e.g. for profiling.
    public static final String JMXREMOTE_ENABLED =
        "-Dcom.sun.management.jmxremote" +
        " -Dcom.sun.management.jmxremote.authenticate=false" +
        " -Dcom.sun.management.jmxremote.ssl=false" +
        " -Dcom.sun.management.jmxremote.port=" + executorJmxPort();

    public static String sparkMasterUrl() {
        return envProperty("spark.master.url", "spark://127.0.0.1:8091");
    }

    public static String cacheBatchSize() {
        return envProperty("spark.cache.batch", "100000");
    }

    public static String executorMemory() {
        return envProperty("spark.executor.memory", "24g");
    }

    private static String executorJmxPort() {
        return envProperty("spark.executor.jmx", "20000");
    }

    /** A -D system property value, or the given default if the property is unset. */
    public static String envProperty(String name, String otherwise) {
        final String prop = System.getProperty(name);
        return prop == null ? otherwise : prop;
    }
}
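Every default above can be overridden at launch time with a -D system property. A small hypothetical demo of that lookup order (none of this is in the original gist; the master URL is made up):

public class SparkEnvCfgDemo {
    public static void main(String[] args) {
        // Equivalent to launching with -Dspark.master.url=spark://10.0.0.5:7077
        System.setProperty("spark.master.url", "spark://10.0.0.5:7077");

        System.out.println(SparkEnvCfg.sparkMasterUrl());  // spark://10.0.0.5:7077 (the override)
        System.out.println(SparkEnvCfg.executorMemory());  // 24g (the hard-coded default)
    }
}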