@mwiewior, forked from ottomata/spark-amm.sh. Created July 9, 2020.
spark + ammonite: a launcher script plus Ammonite predef that start an Ammonite REPL with a Spark session wired up.

spark-amm.sh
#!/usr/bin/env bash
# Launch Ammonite with a predef that wires up Spark, Hive, and Hadoop config.
export SPARK_HOME="${SPARK_HOME:-/usr/lib/spark2}"
export SPARK_CONF_DIR="${SPARK_CONF_DIR:-${SPARK_HOME}/conf}"
source "${SPARK_HOME}/bin/load-spark-env.sh"
export HIVE_CONF_DIR="${SPARK_CONF_DIR}"
export HADOOP_CONF_DIR=/etc/hadoop/conf

AMMONITE=~/bin/amm                           # amm binary release 2.11-1.6.7
SPARK_PREDEF=~/ammonite/spark.predef.scala   # the predef below

# --class-based is required by ammonite-spark; quote "$@" so extra
# arguments (including ones containing spaces) are passed through intact.
"$AMMONITE" --class-based --predef "$SPARK_PREDEF" "$@"
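Running the script (e.g. ./spark-amm.sh) drops you into an Ammonite REPL with the predef below already evaluated; any extra arguments are forwarded to amm itself.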
spark.predef.scala

import ammonite.ops._

// Stage 1: put every jar from the local Spark distribution on the REPL
// classpath, then fetch the ammonite-spark integration from Ivy.
ls.rec! Path(sys.env.getOrElse("SPARK_HOME", "/usr/lib/spark2") + "/jars") |? {
  _.segments.toList.last.endsWith(".jar")
} |! {
  interp.load.cp(_)
}

// Import the ammonite-spark dependency.
import $ivy.`sh.almond::ammonite-spark:0.3.0`

// A bare `@` splits the predef into compilation stages: code after it is
// compiled only once the classpath changes above have taken effect.
@
import java.nio.charset.StandardCharsets
import java.util.Properties
import java.io._
import scala.collection.JavaConverters._
import org.apache.spark.sql._
import org.apache.spark.SparkConf
// Helpers mirroring (private) methods in Spark's org.apache.spark.util.Utils,
// copied here so the predef can read spark-defaults.conf itself.
object Utils {

  /**
   * Load default Spark properties from the given file. If no file is provided,
   * use the common defaults file. This mutates state in the given SparkConf and
   * in this JVM's system properties if the config specified in the file is not
   * already set. Return the path of the properties file used.
   */
  def loadDefaultSparkProperties(conf: SparkConf, filePath: String = null): String = {
    val path = Option(filePath).getOrElse(getDefaultPropertiesFile())
    Option(path).foreach { confFile =>
      getPropertiesFromFile(confFile).filter { case (k, _) =>
        k.startsWith("spark.")
      }.foreach { case (k, v) =>
        conf.setIfMissing(k, v)
        sys.props.getOrElseUpdate(k, v)
      }
    }
    path
  }

  /** Load properties present in the given file. */
  def getPropertiesFromFile(filename: String): Map[String, String] = {
    val file = new File(filename)
    require(file.exists(), s"Properties file $file does not exist")
    require(file.isFile(), s"Properties file $file is not a normal file")
    val inReader = new InputStreamReader(new FileInputStream(file), StandardCharsets.UTF_8)
    try {
      val properties = new Properties()
      properties.load(inReader)
      properties.stringPropertyNames().asScala.map(
        k => (k, properties.getProperty(k).trim)).toMap
    } catch {
      case e: IOException =>
        throw new RuntimeException(s"Failed when loading Spark properties from $filename", e)
    } finally {
      inReader.close()
    }
  }

  /** Return the path of the default Spark properties file. */
  def getDefaultPropertiesFile(env: Map[String, String] = sys.env): String = {
    env.get("SPARK_CONF_DIR")
      .orElse(env.get("SPARK_HOME").map { t => s"$t${File.separator}conf" })
      .map { t => new File(s"$t${File.separator}spark-defaults.conf") }
      .filter(_.isFile)
      .map(_.getAbsolutePath)
      .orNull
  }
}
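// For example (paths and keys here are hypothetical): with
// SPARK_CONF_DIR=/etc/spark2/conf and a spark-defaults.conf there containing
// `spark.executor.memory 4g`, the call below returns
// "/etc/spark2/conf/spark-defaults.conf" and sets spark.executor.memory on
// the SparkConf, unless that key was already set.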
// Seed a SparkConf from spark-defaults.conf so the REPL session picks up
// the cluster's configured defaults.
val sparkConf = new SparkConf()
Utils.loadDefaultSparkProperties(sparkConf)

// Create the Ammonite-aware Spark session; ammonite-spark's builder makes
// classes defined in the REPL visible to executors.
val spark = {
  AmmoniteSparkSession.builder()
    .config(sparkConf)
    .master("local[*]")
    .enableHiveSupport()
    .getOrCreate()
}
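With both files in place, the REPL starts with a `spark` session in scope. A minimal smoke test, assuming nothing beyond the session itself:

// Build a small local DataFrame, register it, and query it with SQL.
val df = spark.range(5).toDF("n")
df.createOrReplaceTempView("numbers")
spark.sql("SELECT sum(n) AS total FROM numbers").show()

// Hive support is enabled, so existing warehouse tables (if any) are also
// reachable, e.g.: spark.sql("SHOW DATABASES").show()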