Scala findings

RDD

  • many kinds of RDDs exist: HadoopRDD, FilteredRDD, PythonRDD, EsSpark, GeoRDD ...
  • look at the concrete RDD type with type(myRDD) (PySpark) or myRDD.getClass (Scala)
  • look at the lineage (DAG) with rdd.toDebugString (see the sketch below)
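
A minimal sketch, assuming a running spark-shell (so a SparkSession named spark is in scope), of inspecting an RDD's class and lineage:

// build a small RDD with a couple of transformations
val rdd = spark.sparkContext.parallelize(1 to 100)
  .map(_ * 2)
  .filter(_ % 4 == 0)

println(rdd.getClass)       // concrete RDD class (a MapPartitionsRDD here)
println(rdd.toDebugString)  // lineage / DAG of parent RDDs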

Performance

Tasks

  • the number of executor threads should be about 2 times the number of physical cores
  • e.g. spark-shell --master local[8]
  • the bracketed value is the number of threads, i.e. the number of tasks that can run concurrently (checked in the example below)
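
A quick check, assuming the shell was started with local[8] as above, that the bracketed value drives how many tasks can run at once:

// spark-shell --master local[8]
println(spark.sparkContext.master)              // local[8]
println(spark.sparkContext.defaultParallelism)  // 8 threads -> up to 8 tasks in parallel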

Memory

  • minimum executor memory: 8G
  • maximum executor memory: 40G (example below)
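
A hedged sketch of setting executor memory in code; the value is just the 8G minimum mentioned above, and in practice --executor-memory on spark-submit is the usual way to set it:

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .appName("memory-sizing-example")        // hypothetical app name
  .config("spark.executor.memory", "8g")   // between the 8G minimum and 40G maximum above
  .getOrCreate()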

Spill to disk

  • the local disk can be configured in spark-env.sh
  • spark.local.dir can be a comma-separated list of locations
  • it cannot point to HDFS
  • it should point to locally mounted SSD drives (sketch below)
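
A minimal sketch pointing spark.local.dir at two SSD-backed directories; the paths are hypothetical, and on YARN this setting is overridden by the node manager's local dirs:

import org.apache.spark.sql.SparkSession

// /ssd1/spark and /ssd2/spark are hypothetical SSD mount points
val spark = SparkSession.builder()
  .appName("spill-dirs-example")
  .config("spark.local.dir", "/ssd1/spark,/ssd2/spark")  // comma-separated, never HDFS
  .getOrCreate()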

Spark Local Mode

  • all in one process
  • the driver and the executors run in the same JVM
  • the local dirs are given by SPARK_LOCAL_DIRS (Standalone, Mesos)

Spark Standalone Mode

  • a master
  • n workers
  • the local dirs are given by SPARK_LOCAL_DIRS (Standalone, Mesos)
  • a driver

Spark Yarn Mode

  • one resource manager (for the whole cluster)
  • one node manager per node
  • the local dirs are given by LOCAL_DIRS (YARN)

Client Mode

  • the driver runs on the client machine (e.g. your laptop)

Cluster Mode

  • the driver runs inside a container on the cluster (the application master on YARN)

Caching

  • rdd.cache() == rdd.persist(MEMORY_ONLY) -> deserialized in memory
  • rdd.persist(MEMORY_ONLY_SER) -> serialized in memory
  • rdd.persist(MEMORY_ONLY_2) -> in memory, replicated on two executors
  • rdd.persist(DISK_ONLY) -> serialized on disk
  • rdd.persist(MEMORY_AND_DISK) -> in memory, spilled to disk (serialized) if it does not fit
  • rdd.persist(MEMORY_AND_DISK_SER) -> serialized in memory, spilled to disk if it does not fit (example below)
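
A short sketch of using these storage levels from a spark-shell session:

import org.apache.spark.storage.StorageLevel

val rdd = spark.sparkContext.parallelize(1 to 1000000)

rdd.cache()                                    // same as persist(StorageLevel.MEMORY_ONLY)
rdd.count()                                    // an action materializes the cache
rdd.unpersist()                                // drop it before changing the storage level
rdd.persist(StorageLevel.MEMORY_AND_DISK_SER)
rdd.count()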

Garbage collection

  • the default is the parallel GC, which is fine in the general case
  • when using Spark Streaming, prefer the concurrent (CMS) GC
  • G1 GC is also a good choice in the general case... (see the sketch below)
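
A hedged sketch of enabling G1 on the executors via extra JVM options (the flag is a standard HotSpot option; driver-side options would have to go on the spark-submit command line instead):

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .appName("gc-example")
  // switch the executor JVMs from the default parallel collector to G1
  .config("spark.executor.extraJavaOptions", "-XX:+UseG1GC")
  .getOrCreate()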

Narrow and wide dependency

  • narrow means each parent partition is used by at most one partition of the later rdd (no shuffle)
  • wide means a parent partition is used by many partitions of the later rdd (requires a shuffle); illustrated below
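
A small contrast between a narrow dependency (map) and a wide one (reduceByKey, which shuffles), assuming a spark-shell session:

val words = spark.sparkContext.parallelize(Seq("a", "b", "a", "c", "b", "a"))

val pairs  = words.map(w => (w, 1))     // narrow: each parent partition feeds one child partition
val counts = pairs.reduceByKey(_ + _)   // wide: data is shuffled across partitions

println(counts.toDebugString)           // the lineage shows a ShuffledRDD stage boundary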

PySpark

  • CPython is used by default, but PyPy can be used instead and speeds things up a lot
  • Py4J is used to communicate between the Python driver program and the JVM

Spark Streaming

  • uses DStreams
  • the batch interval cannot realistically go below about 0.5 seconds
  • has windowed operations (see the windowed sketch below)
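
A minimal DStream sketch with a window (the socket host and port are hypothetical); the 1-second batch interval respects the ~0.5 s lower bound mentioned above:

import org.apache.spark.streaming.{Seconds, StreamingContext}

val ssc = new StreamingContext(spark.sparkContext, Seconds(1))

// hypothetical text source on localhost:9999 (e.g. fed by `nc -lk 9999`)
val lines  = ssc.socketTextStream("localhost", 9999)
val counts = lines.flatMap(_.split(" "))
  .map((_, 1))
  .reduceByKeyAndWindow(_ + _, Seconds(30), Seconds(10))  // 30 s window, sliding every 10 s

counts.print()
ssc.start()
ssc.awaitTermination()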

RDD

  • they are type safe
  • they are not optimized by Spark
  • meaning the lambda functions and the order of transformations/actions are executed as written
  • the code is harder to read: it says how to do it instead of what you are doing (see the sketch below)
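
A short word count in the RDD style (the input path is hypothetical); the lambdas are opaque to Spark, so nothing is reordered or optimized:

// "how to do it", step by step
val lines = spark.sparkContext.textFile("/tmp/some_text_file")  // hypothetical path
val counts = lines
  .flatMap(_.split(" "))
  .map(word => (word, 1))      // Spark cannot look inside these lambdas
  .reduceByKey(_ + _)
counts.take(10).foreach(println)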

DATAFRAME

  • the code is easier to read
  • they are optimized (the DAG goes through Catalyst)
  • they are not type safe
  • they are collections of Row objects, not typed
  • you may want to turn a dataframe into an RDD in the last step of a pipeline
  • a dataframe is a dataset of type Row (example below)
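
The same word count in the DataFrame style: declarative and optimized by Catalyst, but the rows come back as untyped Row objects (the input path is again hypothetical):

import org.apache.spark.sql.functions.{col, explode, split}

val df = spark.read.text("/tmp/some_text_file")   // one string column named "value"
val counts = df
  .select(explode(split(col("value"), " ")).alias("word"))
  .groupBy("word")
  .count()

counts.explain()   // Catalyst shows the optimized plan
counts.head()      // an untyped Row: fields are not checked at compile time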

DATASETS

  • you only need to apply a Scala case class to a dataframe
  • typed dataframes
  • this allows type integrity to be checked at compile time
  • this does not apply when using SQL strings;
  • so Scala code is preferable to SQL strings when you want compile-time errors
  • they allow the map & flatMap operations of RDDs
  • they use Tungsten encoders instead of plain JVM objects
  • a dataset is a dataframe of type X (any kind of case class with typed fields); sketch below
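
A tiny typed sketch (the WordCount case class is just for illustration): once the data is a Dataset of a case class, map/flatMap work on compile-time-checked fields instead of Row:

case class WordCount(word: String, count: Long)   // illustrative case class

import spark.implicits._
val ds = Seq(WordCount("spark", 3L), WordCount("scala", 2L)).toDS()

// wc.word is checked at compile time, unlike a SQL string or Row access
val upper = ds.map(wc => wc.copy(word = wc.word.toUpperCase))
upper.show()
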
//
// Read a multiline csv such as test.csv:
//
// Remarks:
// - when multiline, the read is not distributed
// - the java library used is univocity
// - see spark/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVOptions.scala
//
// Example:
// toto,titi,tata
// a,b,2010-01-01
// c,"my
// name
// is
// nicolas",
// d,e,
// f,"ee,",
//
val df = spark.read
  .option("wholeFile", true)   // apparently an earlier name of the option; "multiline" is the one that counts
  .option("multiline", true)
  .option("header", true)
  .option("inferSchema", "true")
  .option("dateFormat", "yyyy-MM-dd")
  .option("timestampFormat", "yyyy-MM-dd HH:mm:ss")
  .csv("test.csv")
df.show()
//
// Load a typed dataframe
//
import org.apache.spark.sql.types._

val struct = StructType(
  StructField("name", StringType, true) ::
  StructField("age", LongType, false) ::
  StructField("dt", TimestampType, false) ::
  StructField("d", DateType, false) :: Nil)
val dfStruct = spark.read.schema(struct).csv("test_ds.csv")
dfStruct.printSchema
//root
// |-- name: string (nullable = true)
// |-- age: long (nullable = true)
// |-- dt: timestamp (nullable = true)
// |-- d: date (nullable = true)
dfStruct.show
//+-------+---+-------------------+----------+
//| name|age| dt| d|
//+-------+---+-------------------+----------+
//|nicolas| 15|2009-01-23 18:15:05|2009-01-23|
//| gaelle| 89|2009-01-23 18:15:05|2009-01-23|
//+-------+---+-------------------+----------+
//
// Create a dataset
//
// Remarks:
// - way easier to load from structured dataframes
// - nullable columns must be mapped to Option[...] fields in the case class
//0. load a dataframe
//1. create a class
case class Person(name: String, age: Option[Long] = None, dt: Option[java.sql.Timestamp] = None, d: Option[java.sql.Date] = None)
//2. name dataframe columns accordingly
//3. map dataframe to dataset
import spark.implicits._   // already in scope in the spark-shell; needed in compiled code
val ds = dfStruct.as[Person]
// done.
//
// Read An Xml file to a dataframe
//
//bin/spark-shell --packages com.databricks:spark-xml_2.11:0.4.1
val df = spark.read.format("com.databricks.spark.xml")
.option("rowTag", "Term")
.load("fredesc2017.xml")
//
// Split an array column to multiple columns
//
// This also writes a csv in the word2vec text format
// (that csv needs a header line with the number of rows and the number of columns)
// import gensim
// sentences = [['first', 'sentence'], ['second', 'sentence']]
// model = gensim.models.Word2Vec(sentences, min_count=1)
// load: a = model.wv.load_word2vec_format("/tmp/model.csv/part-00000-a9db1475-a20c-4b58-b752-bde64ff34083-c000.csv",binary=False)
// save as binary: a.save_word2vec_format("/tmp/model.bon",binary=True)
//
import org.apache.spark.sql.SaveMode
import org.apache.spark.sql.functions.col
import spark.implicits._

// `a` is a dataframe with a "word" column and an array column "vector" (assumed to hold 100 components)
// column 0 keeps the word, columns f1..f100 get the vector components (getItem is 0-based)
val exprs = (0 until 101).map(i => if (i == 0) col("word") else $"vector".getItem(i - 1).alias(s"f$i"))
a.select(exprs: _*).write.format("csv").option("delimiter", " ").mode(SaveMode.Overwrite).save("/tmp/model.csv")