
@isaacarnault isaacarnault/.gitignore
Last active Sep 26, 2019

Data engineering using Spark - Scala
________ ________ ___ __ ___
|\_____ \|\ __ \|\ \|\ \ |\ \
\|___/ /\ \ \|\ \ \ \/ /|\ \ \
/ / /\ \ __ \ \ ___ \ \ \
/ /_/__\ \ \ \ \ \ \\ \ \ \ \
|\________\ \__\ \__\ \__\\ \__\ \__\
\|_______|\|__|\|__|\|__| \|__|\|__|
The ASCII art above may not render correctly on Windows; this gist was made on a Linux OS.

Data engineering using Spark-Scala - Hands-on

Tools used: Databricks, Zeppelin • Programming languages: Scala, Spark SQL

Project Status: Concept – Minimal or no implementation has been done yet, or the repository is only intended to be a limited example, demo, or proof-of-concept.

The following gist is intended for Data Engineers. It focuses on Spark and Scala programming.
If you want to handle batch and real-time data processing, this gist is definitely worth looking into.
We'll learn how to install and use Spark and Scala on a Linux system.
We'll learn the latest Spark 2.0 methods and updates to the MLlib library, working with Spark SQL and DataFrames. Please fork it if you find it relevant for your educational or professional path.

How this gist is structured

This gist is structured into 2 parts.

Part 1. Installation of JVM, Spark, Scala on a Linux OS

Related section: SCALA_SPARK_INSTALL

Part 2. Spark-Scala programming using Atom, Databricks, Zeppelin

Related sections: SPARK_SCALA_Programming, SPARK_SCALA_entry, SPARK_SCALA_intermediary

Notes related to Spark and Scala

Spark

Spark is one of the most powerful Big Data tools.
Spark runs programs up to 100x faster than Hadoop's MapReduce.
Spark can use data stored in Cassandra, Amazon S3, Hadoop's HDFS, etc.
MapReduce requires files to be stored in HDFS; Spark does not.
Spark can perform up to 100x faster than MapReduce because it keeps intermediate job data in memory, while MapReduce writes it to disk.

Data processing
MapReduce (Hadoop) writes most data to disk after each Map and Reduce operation.
Spark keeps most of the data in memory after each transformation.
At the core of Spark there are Resilient Distributed Datasets also known as RDDs.
An RDD has 4 main features:

  1. Distributed collection of data
  2. Fault-tolerant
  3. Parallel operations which are partitioned
  4. An RDD can use many data sources

RDDs are immutable, cacheable and lazily evaluated. There are 2 types of RDD operations:

  1. Transformations: recipes to follow
  2. Actions: performs recipe's instructions and returns a result
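The transformation/action split can be illustrated without a Spark cluster, using a plain-Scala lazy view as an analogy: in Spark, map or filter on an RDD are transformations (recipes), while collect or count are actions that actually run the job. This is only an analogy sketch, not Spark itself.

```scala
// Plain-Scala analogy for lazy transformations vs actions.
var evaluated = 0
// "Transformation": just a recipe, nothing is computed yet
val recipe = (1 to 5).view.map { n => evaluated += 1; n * 2 }
println(evaluated)        // still 0: the recipe has not run
// "Action": forcing the view runs the recipe and returns a result
val result = recipe.toList
println(result)           // List(2, 4, 6, 8, 10)
println(evaluated)        // 5: each element was evaluated exactly once
```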

Environment options for Scala and Spark

  1. Text editors, such as Sublime Text and Atom
  2. IDEs (Integrated Development Environments), such as IntelliJ and Eclipse
  3. Notebooks, such as Jupyter, Zeppelin and Databricks

Scala

Scala is a general purpose programming language.
Scala was designed by Martin Odersky (Ecole Polytechnique Fédérale de Lausanne).
Scala source code is intended to be compiled to Java bytecode to run on a Java Virtual Machine (JVM).
Java libraries can be used directly in Scala.
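As a minimal sketch of that interop, Java standard-library classes can be instantiated and called from Scala with no wrappers:

```scala
// Using plain Java standard-library classes directly from Scala.
import java.util.{ArrayList => JArrayList}
import java.time.LocalDate

val list = new JArrayList[String]()  // a java.util.ArrayList
list.add("Spark")
list.add("Scala")
println(list.size())                 // 2
val d = LocalDate.of(2019, 9, 26)    // java.time also works directly
println(d.getYear)                   // 2019
```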

Knowledge base

I've uploaded a .zip which contains useful slides on Machine Learning, Spark and Scala.

Storing

For storing datasets and granting access to them, I've used AWS.

Author

  • Isaac Arnault - AWS Cloud series - Related tags: #EC2 #TLS #AWSCLI #Linux

Spark - Scala

Questions - Set N°1

  1. What is 2 to the power of 5?
  2. What is the remainder of 180 divided by 7?
  3. Given variable pet_name = "Sammy", use string interpolation to print out "My dog's name is Sammy."
  4. Use Scala to find out if the letter sequence "xyz" is contained in: "sadfjshfjyuyxyzjkfuidkjklhasyysdfk"
  5. What is the difference between a value and a variable?
  6. Given the tuple (1,2,3,(4,5,6)) retrieve the number 6.


Solutions

  1. What is 2 to the power of 5?
  scala> math.pow(2,5)
  res39: Double = 32.0
  2. What is the remainder of 180 divided by 7?
  scala> 180%7
  res40: Int = 5
  3. Given variable pet_name = "Sammy", use string interpolation to print out "My dog's name is Sammy."
  scala> s"My dog's name is ${pet_name}"
  res42: String = My dog's name is Sammy

  scala> f"My dog's name is $pet_name"
  res43: String = My dog's name is Sammy
  4. Use Scala to find out if the letter sequence "xyz" is contained in: "sadfjshfjyuyxyzjkfuidkjklhasyysdfk"
  scala> val s = "sadfjshfjyuyxyzjkfuidkjklhasyysdfk"
  s: String = sadfjshfjyuyxyzjkfuidkjklhasyysdfk
  scala> s contains "xyz"
  res45: Boolean = true
  5. What is the difference between a value and a variable?

A value is an immutable storage unit, it can be assigned data when defined but can not be reassigned.

A variable is a mutable storage unit, data can be assigned at definition and reassigned later on.

  6. Given the tuple (1,2,3,(4,5,6)) retrieve the number 6.
  scala> val t = (1,2,3,(4,5,6))
  t: (Int, Int, Int, (Int, Int, Int)) = (1,2,3,(4,5,6))
  scala> t._4
  res49: (Int, Int, Int) = (4,5,6)
  scala> t._4._3
  res50: Int = 6
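The value/variable distinction from question 5 can be sketched in a few lines:

```scala
// val vs var, matching the answer above.
var counter = 0      // variable: may be reassigned
counter = counter + 1
val limit = 5        // value: fixed once assigned
// limit = 6         // would not compile: "reassignment to val"
println(counter)     // 1
println(limit)       // 5
```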

Questions - Set N°2

  1. Can you figure out what method you can use to find out if the list: List(1, 2, 3, 4, 5) contains the number 3?
  2. How can you add all the elements of the previous list?
  3. Create an Array of all the odd numbers from 0 to 15
  4. What are the unique elements in the list: List(2, 3, 1, 4, 5, 6, 6, 1, 2)?
  5. Create a mutable map that maps together Names to Ages. It should have the following key value pairs:
    Sammy, 3
    Frankie, 7
    John, 45
  6. Print out all the keys, then add the key value pair ("Mike",27)

Solutions

  1. Can you figure out what method you can use to find out if the list: List(1,2,3,4,5) contains the number 3?
scala> val li = List(1, 2, 3, 4, 5)
li: List[Int] = List(1, 2, 3, 4, 5)
scala> li.contains(3)
res42: Boolean = true
  2. How can you add all the elements of the previous list?
scala> li.sum
res43: Int = 15
  3. Create an Array of all the odd numbers from 0 to 15
scala> val odds = Array.range(1, 16, 2)
odds: Array[Int] = Array(1, 3, 5, 7, 9, 11, 13, 15)
  4. What are the unique elements in the list: List(2, 3, 1, 4, 5, 6, 6, 1, 2)?
scala> val mylist = List(2, 3, 1, 4, 5, 6, 6, 1, 2)
mylist: List[Int] = List(2, 3, 1, 4, 5, 6, 6, 1, 2)
scala> mylist.toSet
res68: scala.collection.immutable.Set[Int] = Set(5, 1, 6, 2, 3, 4)
  5. Create a mutable map that maps together Names to Ages. It should have the following key value pairs
Sammy, 3
Frankie, 7
John, 45
scala> val names = collection.mutable.Map(("Sammy",3),("Frankie",7),("John",45))
names: scala.collection.mutable.Map[String,Int] = Map(Sammy -> 3, Frankie -> 7, John -> 45)
  6. Print out all the keys
scala> names.keys
res44: Iterable[String] = Set(Sammy, Frankie, John)

Add the key value pair ("Mike",27)

scala> names += ("Mike"->27)
res46: names.type = Map(Sammy -> 3, Mike -> 27, Frankie -> 7, John -> 45)

We'll perform the installation on Linux.
I performed this setup on my Ubuntu 18.04.2 LTS.
To check your OS version, execute $ lsb_release -a in your Terminal.

1. Installation of JVM, Spark, Scala on a Linux System

Ctrl + Alt + t: to open a new Terminal on your Linux OS. I am using Ubuntu 18.04 LTS

Java

$ sudo apt update  
$ sudo apt install oracle-java8-installer  
$ sudo apt install oracle-java8-set-default  

To make sure Java is correctly installed, use $ java -version in your Command Line Interface.
If you already have Java installed, you can bypass this step.

🔴 See output

4.png

Scala

$ sudo apt-get remove scala-library scala  
$ sudo apt-get update  
$ sudo apt-get install scala

To make sure Scala is correctly installed use $ scala in your Command Line Interface.
If you already have Scala installed, you can bypass this step.

🔴 See output

3.png

Spark

$ sudo apt-get install git : answer "y" + Enter when prompted by the Command Line Interface
Go to https://spark.apache.org > Download > Step 3: Download Spark (click to download the .tgz file)
You can also download the Spark package directly at https://bit.ly/2KYLLZQ
$ cd Desktop : to go to your Desktop
$ sudo mkdir spark : to create a folder named spark
$ cd Downloads : to go to your Downloads folder
$ tar -xvf spark-2.4.3-bin-hadoop2.7.tgz : to extract the package
$ sudo mv spark-2.4.3-bin-hadoop2.7 ~/Desktop/spark
Verify that your spark folder was moved correctly:
$ cd Desktop/spark
$ ls
🔴 See output

11.png

We can now start using Spark:
$ cd Desktop/spark/bin
$ ./spark-shell

🔴 See output

12.png

atom text editor - installation

Go to https://atom.io/ > Click on "download .deb" to get the Debian package.
Once downloaded, right click the package and select "Open with Software install", then click on "Install".

🔴 See output

atom.png

Click on the Atom icon on your Applications to start it.

Then click on "Install a Package"

Install language-scala package.

🔴 See output

atom-install.png

Then search for "terminal" in the search bar of Atom.

Select "atom-ide-terminal" and proceed to installation.

Once both packages are installed, click on the cross "+" to open a new Terminal.

Now, you are ready to write your first Scala script. Click on File > New File

To check if everything works fine, type the following script:

println("Hello Scala!"). Note that saving the file as "one_script.scala" will make it loadable by the Scala interpreter.

🔴 See output

isaac-arnault-scala-3.png

We will save one_script.scala file in a directory on our Desktop which we'll name "scala".

At this point, we should have two directories "spark" and "scala" on our Desktop.


Execute a Scala script from your Atom terminal

To make our Scala script executable by Spark on Atom, we should first launch Spark in our Atom Terminal:

🔴 See output

isaac-arnault-scala-4.png

Then let's try to launch our script from the Terminal. Use the following command:

scala> :load filepath/one_script.scala

🔴 See output

isaac-arnault-scala-5.png

Output for if_else.scala, for_loops.scala, while_loops.scala.

if_else.scala

🔴 Script 1

isaac-arnault-spark-scala-1.png

🔴 Script 2

isaac-arnault-spark-scala-2.png

🔴 Script 3

isaac-arnault-spark-scala-3.png

🔴 Script 4

isaac-arnault-spark-scala-4.png

for_loops.scala

🔴 Script 1

isaac-arnault-spark-scala-5.png

🔴 Script 2

isaac-arnault-spark-scala-6.png

🔴 Script 3

isaac-arnault-spark-scala-7.png

🔴 Script 4

isaac-arnault-spark-scala-8.png

🔴 Script 5

isaac-arnault-spark-scala-8.png

🔴 Script 6

isaac-arnault-spark-scala-9.png

🔴 Script 7

isaac-arnault-spark-scala-10.png


while_loops.scala

🔴 Script 1

isaac-arnault-spark-scala-11.png

Entry level in Spark-Scala using Atom (text editor)

You can try the following commands in your Atom Terminal. Make sure Spark is launched beforehand.

Your first Scala script

scala> printf("%s is a string, %d is an integer, %f is a float", "Hello Scala!", 12, 34.254)

1. Arithmetic and Numbers

Two types of Numbers:

  1. Integers (Whole)
scala> 100
res0: Int = 100
  2. Doubles (Floating Point)
scala> 2.5
res1: Double = 2.5

Operations in Scala

Addition

scala> 1 + 1
res2: Int = 2

Subtraction

scala> 2 - 1
res3: Int = 1
Multiplication
scala> 2 * 5
res4: Int = 10

Division with Integers (Classic)

scala> 1 / 2
res5: Int = 0

Division with Doubles (True)

scala> 1.0/2
res6: Double = 0.5
scala> 1/2.0
res7: Double = 0.5

Exponents

scala> math.pow(4,2)
res36: Double = 16.0

Modulo (Remainder)

scala> 10 % 4
res8: Int = 2

scala> 9 % 4
res9: Int = 1

You can call back results

scala> res0
res10: Int = 100

Order of Operations with Parenthesis

scala> (1 + 2) * (3 + 4)
res11: Int = 21

scala> 1 + 2 * 3 + 4
res12: Int = 11

Convert 3 feet to meters using scala

scala> 3 * 0.3048
res13: Double = 0.9144000000000001

2. Booleans and Comparison operators

scala> true
res21: Boolean = true
scala> false
res22: Boolean = false
scala> 1 > 2
res23: Boolean = false
scala> 2 < 1
res24: Boolean = false
scala> 3 >= 1
res27: Boolean = true
scala> 3 <= 3
res28: Boolean = true
scala> 2 == 1
res25: Boolean = false
scala> 2 != 1
res26: Boolean = true

3. Strings and RegEx (Regular Expressions)

Printing

scala> println("hello")
hello

Concatenation

scala> val fairwell = "Good" + "Bye"
fairwell: String = GoodBye

Repeating

scala> val repeat = "Dance!"*5
repeat: String = Dance!Dance!Dance!Dance!Dance!

String Length

scala> val st = "hello"
st: String = hello
scala> st.length()
res14: Int = 5

Inserting Objects

scala> val name = "Jose"
name: String = Jose
String Interpolation
scala> val greet = s"Hello ${name}"
greet: String = Hello Jose

printf

scala> printf("A string: %s , an integer %d, a float %f","hi",10,2.5)
A string: hi , an integer 10, a float 2.500000
scala> printf("A string: %s , an integer %d, a float %1.2f","hi",10,2.5)
A string: hi , an integer 10, a float 2.50

'f' Interpolation

scala> val greet = f"Hello ${name}"
greet: String = Hello Jose
scala> val greet = f"Hello $name"
greet: String = Hello Jose

String Indexing

scala> f"First letter of name is $name%.1s"
res8: String = First letter of name is J

Regular Expressions

Index Location

scala> val st = "This is a long string"
st: String = This is a long string
scala> st.charAt(0)
res18: Char = T
scala> st.indexOf("a")
res19: Int = 8

Pattern matching

scala> val st = "term1 term2 term3"
st: String = term1 term2 term3
scala> st contains "term1"
res20: Boolean = true
scala> st matches "term1"
res11: Boolean = false
scala> st matches "term1 term2 term3"
res12: Boolean = true

Slicing

scala> val st = "hello"
st: String = hello
scala> st slice (0,2)
res2: String = he
scala> st slice (2,st.length)
res4: String = llo

4. Tuples

An ordered sequence of values, which can be of multiple data types

scala> val my_tup = (1,2,"hello",23.2,true)
my_tup: (Int, Int, String, Double, Boolean) = (1,2,hello,23.2,true)

Can also be nested

scala> (3,1,(2,3))
res46: (Int, Int, (Int, Int)) = (3,1,(2,3))

Accessing elements with ._n notation
Indexing starts at 1

scala> val greeting = my_tup._3
greeting: String = hello
scala> my_tup._1
res37: Int = 1

5. Values and variables

General Format

val <name>: <type> = <literal>
var <name>: <type> = <literal>

var or val determines whether it is a variable or a value.
Values (val) are Immutable.
Variables (var) can be reassigned.

scala> var myvar: Int = 10
myvar: Int = 10
scala> val myval: Double = 2.5
myval: Double = 2.5

Reassignments

Fails for different data type
scala> myvar = "hello"
<console>:12: error: type mismatch;
 found   : String("hello")
 required: Int
       myvar = "hello"

Works for same data type and being a var

scala> myvar = 200
myvar: Int = 200

Values can not be reassigned!

scala> myval = 2.5
<console>:12: error: reassignment to val
       myval = 2.5

Creating variables and values without types:

Scala can infer data types
scala> val c = 12
c: Int = 12
scala> val s = "my string"
s: String = my string

Valid Names

scala> val my_string = "Hello"
my_string: String = Hello

Not Recommended, but possible

scala> val `my.string` = "hello"
my.string: String = hello

Invalid Names

scala> val 2strings = "hello hello"
<console>:1: error: Invalid literal number
val 2strings = "hello hello"
scala> val my.string = "hello"
<console>:11: error: not found: value my
       val my.string = "hello"

6. Arrays

Collection of variables

Should I use a list or an array?
What is the difference?
Resource link:
http://stackoverflow.com/questions/2712877/difference-between-array-and-list-in-scala

Examples

scala> val arr = Array(1,2,3)
arr: Array[Int] = Array(1, 2, 3)
scala> val arr = Array("a","b","c")
arr: Array[String] = Array(a, b, c)

Range

Use the range() method to generate an array containing a sequence of increasing integers in a given range.

scala> Array.range(0,10)
res64: Array[Int] = Array(0, 1, 2, 3, 4, 5, 6, 7, 8, 9)
scala> Array.range(0,10,2)
res65: Array[Int] = Array(0, 2, 4, 6, 8)

Or just call Range with a capital R

scala> Range(0,5)
res1: scala.collection.immutable.Range = Range(0, 1, 2, 3, 4)
scala> Range(0,10,2)
res2: scala.collection.immutable.Range = Range(0, 2, 4, 6, 8) 

7. Lists

Lists are an immutable sequence of elements

Basic List of Numbers
scala> val evens = List(2,4,6,8,10,12)
evens: List[Int] = List(2, 4, 6, 8, 10, 12)

Indexing Items (Starts at zero)

scala> evens(0)
res0: Int = 2
scala> evens(1)
res1: Int = 4

Head and Tail

scala> evens.head
res3: Int = 2
scala> evens.tail
res4: List[Int] = List(4, 6, 8, 10, 12)
scala> evens
res5: List[Int] = List(2, 4, 6, 8, 10, 12)

Element Types
Any

scala> val my_list = List("a",2,true)
my_list: List[Any] = List(a, 2, true)

Nested

scala> val my_list = List(List(1,2,3),List(4,5,6))
my_list: List[List[Int]] = List(List(1, 2, 3), List(4, 5, 6))

Double and Int

scala> val my_list = List(1,2,3.0)
my_list: List[Double] = List(1.0, 2.0, 3.0)

List of Tuple Pairs

scala> val my_list = List(("a",1),("b",2),("c",3))
my_list: List[(String, Int)] = List((a,1), (b,2), (c,3))

List Operations

scala> val my_list = List(3,6,1,7,10)
my_list: List[Int] = List(3, 6, 1, 7, 10)
scala> my_list.sorted
res7: List[Int] = List(1, 3, 6, 7, 10)
scala> my_list.size
res8: Int = 5
scala> my_list.max
res9: Int = 10
scala> my_list.min
res39: Int = 1
scala> my_list.sum
res40: Int = 27
scala> my_list.product
res41: Int = 1260

Using drop for slicing

scala> val x = List(1,2,3,4)
x: List[Int] = List(1, 2, 3, 4)
scala> x.drop(2)
res3: List[Int] = List(3, 4)
scala> x.takeRight(3)
res4: List[Int] = List(2, 3, 4)
// Use Tab to explore the other methods.

8. Maps

(Key,Value) Pair Storage aka Hash Table or Dictionary

scala> val mymap = Map(("a",1),("b",2),("c",3))
mymap: scala.collection.immutable.Map[String,Int] = Map(a -> 1, b -> 2, c -> 3)

Lookups

scala> mymap("a")
res12: Int = 1

None if not present

scala> mymap get "b"
res15: Option[Int] = Some(2)
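Completing the example above: `get` wraps its result in an Option, so it returns Some(value) for a present key and None for a missing one, which is safer than mymap("missing") (that would throw).

```scala
// `get` returns an Option instead of throwing on a missing key.
val mymap = Map(("a",1),("b",2),("c",3))
println(mymap get "b")            // Some(2)
println(mymap get "z")            // None
println(mymap.getOrElse("z", 0))  // 0: a common way to supply a default
```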

Temp additions on immutable

scala> mymap + ("z"->99)
res19: scala.collection.immutable.Map[String,Int] = Map(a -> 1, b -> 2, c -> 3, z -> 99)

Mutable maps

scala> val mymutmap = collection.mutable.Map(("x",1),("y",2),("z",3))
mymutmap: scala.collection.mutable.Map[String,Int] = Map(z -> 3, y -> 2, x -> 1)

Permanent Additions

scala> mymutmap += ("new"->999)
res29: mymutmap.type = Map(z -> 3, y -> 2, x -> 1, new -> 999)

scala> mymutmap

res30: scala.collection.mutable.Map[String,Int] = Map(z -> 3, y -> 2, x -> 1, new -> 999)

A few useful methods

scala> mymap.keys
res34: Iterable[String] = Set(a, b, c)
scala> mymap.values
res35: Iterable[Int] = MapLike(1, 2, 3)

9. Sets

A Set is a collection that contains no duplicate elements.
There are two kinds of Sets: immutable and mutable.
Examples

scala> val s = Set()
s: scala.collection.immutable.Set[Nothing] = Set()
scala> val s = Set(1,2,3)
s: scala.collection.immutable.Set[Int] = Set(1, 2, 3)
scala> val s = Set(1,1,2,2,2,3,3,3)
s: scala.collection.immutable.Set[Int] = Set(1, 2, 3)

Mutable Sets

scala> val s = collection.mutable.Set(1,2,3)
s: scala.collection.mutable.Set[Int] = Set(1, 2, 3)
scala> s += 4
res50: s.type = Set(1, 2, 3, 4)
scala> s
res51: scala.collection.mutable.Set[Int] = Set(1, 2, 3, 4)
scala> s.add(5)
res52: Boolean = true
scala> s
res53: scala.collection.mutable.Set[Int] = Set(1, 5, 2, 3, 4)

Set Operations

scala> s.max
res54: Int = 5
scala> s.min
res55: Int = 1

Cast to Set

scala> val mylist = List(1,2,3,1,2,3)
mylist: List[Int] = List(1, 2, 3, 1, 2, 3)
scala> mylist.toSet
res59: scala.collection.immutable.Set[Int] = Set(1, 2, 3)

10. Loops

For loops
// Looping over a list

for(item <- List(1,2,3)){
  println(item)
}
// Looping over an array

for(item <- Array.range(0,5)){
  println(item)
}

// Looping a set

for(item <- Set(2,4,9,1,3)){
  println(item)
}
// Looping with If Else

// for loop and else if script

for(num <- Range(0,10)){
  if(num%2 == 0){
    println(s"$num is even")
  }else{
    println(s"$num is odd")
  }
}
// Looping from a list and retrieving one or more results

val names = List("Beerus", "Champa", "Karin", "Kaio", "Dende", "Dodoria")

for(name <- names){
  if(name.startsWith("D")){
    println(s"$name starts with a D")
  }
}
While loops
// Looping with a variable 

var x = 0

while(x < 5){ // boolean condition
  println(s"x is currently $x")
  println("x is still less than 5, adding 1 to x")
  x = x+1 // adding 1 to x and iterating until we reach 5
  println(s"x is currently $x")
}

11. if else

a.) Create an ifelse.scala file in Atom
b.) Launch your Spark session
c.) Load your file with the following command: scala> :load url-path/ifelse.scala

  1. COMPARISON operators - one condition
// if else - script 1
val x = "goku"

if(x.endsWith("b")){
  println("value of x ends with b")
}else{
  println("value of x does not end with b")
}
// if else - script 2
val person = "Champa"

if(person == "Beerus"){
  println("Your Majesty Beerus, this is your bento!")
}else{
  println("You can't have a bento, sorry!")
}
  2. LOGICAL operators - multiple conditions
// if else - script 3
// AND operator - if both conditions are true, program will return "true", if not it will return "false"

println((1 == 2) && (2 == 2)) // && means "AND"
println((4 == 4) && ("b" == "b"))
// OR operator - if one of both conditions is true, program will return "true", if not it will return "false"
println((4 == 3) || ("e" == "e")) // "||" means "OR"
// NOT operator - if condition is not "true", program will return "false"
print(!(1 == 1)) // "!" negates the condition, so the program returns "false" in this case

12. Other commands you can try

Reverse object

object Reverse {
    def apply (s: String): String =
    s.reverse
}
Reverse("Kiara")
🔴 See output

isaac-arnault-scala1.png

Arrays

Array (1, 2, 3, 4, 5, 6, 7, 8, 9)
res0(2)
🔴 See output

isaac-arnault-scala2.png

hashCode

case class Time(hours: Int = 0, minutes: Int = 0)
Time (9, 45).hashCode()
🔴 See output

isaac-arnault-scala-3.png

flatMap

List ("Goten", "Trunks", "Gotenks")
res0.map(lang => lang + "#")
res0.flatMap (lang => lang + "#")
🔴 See output

isaac-arnault-scala4.png

filter

List ("Goku", "Vegeta", "Broly", "Freezer")
res0.filter(lang => lang.contains("e"))
🔴 See output

isaac-arnault-scala-5.png

reduce

1 to 10
res0.reduce(_+_)
🔴 See output

isaac-arnault-scala-6.png

fold, foldLeft, foldRight

1 to 6
res0.fold(1)(_ * _) // equivalent to res0.product here
🔴 See output

isaac-arnault-scala-7.png

exists

1 to 18
res0.exists(num => num == 12)
🔴 See output

isaac-arnault-scala-7.png

copy

case class K9(name: String, technique: String) // defining K9, assumed to be a case class
val fighter1 = K9("Trunks", "Goten Trunks Fusion")
fighter1
val fighter2 = fighter1.copy()
fighter2
fighter1 == fighter2
🔴 See output

isaac-arnault-scala-9.png

Important:

This is the second part of the gist which is not done in Atom. Please use Databricks and Zeppelin instead.

Intermediary level in Spark-Scala using Databricks and Zeppelin (notebooks)

We will perform two installations before we start programming in Spark-Scala.

I recommend using Zeppelin if you want a secure environment on localhost for data security compliance.

I recommend using Databricks if you want to use a programming environment in the Cloud accessible from everywhere.

1. Installations

DataBricks (Cloud deployment)

To use the free version of Databricks, go to https://community.cloud.databricks.com, create a free account and log in.

🔴 See dashboard

isaac-arnault-spark-scala-15.png

Go to Clusters > Create cluster

🔴 See cluster configuration

isaac-arnault-spark-scala-16.png

Go to Workspace > Users > Create notebook

🔴 See notebook configuration

isaac-arnault-spark-scala-17.png

You are now ready to work with Databricks. Next, let's install Apache Zeppelin.

Apache Zeppelin installation (Localhost deployment)

If you haven't installed Apache Zeppelin yet, follow this gist I wrote a few months ago.

Once the software is extracted from the tarball and installed on your desktop, perform the following to launch it:

  • Open your terminal using Ctrl + Alt + t
  • Go to the bin folder of Apache Zeppelin
  • Execute $ ./zeppelin.sh in your terminal and wait a minute until the application starts
🔴 See on terminal

1.png

  • Use localhost:8080/ in your web browser to access Apache Zeppelin from your computer

  • Go to Notebook > Create New Note

🔴 See notebook configuration

isaac-arnault-spark-scala-20.png

Now you are ready to work with Apache Zeppelin.

🔴 See notebook

isaac-arnault-spark-scala-20.png

We are now ready to start programming.

2. Programming in Spark-Scala

We will launch our Spark-Scala commands from both environments Apache Zeppelin and Databricks so that you can see how they render on both.

Upload CitiGroup2006_2008.csv to your Databricks and Zeppelin environments before you proceed.

1. Loading a csv file as dataframe

import org.apache.spark.sql.SparkSession
val spark = SparkSession.builder().getOrCreate()
val df = spark.read.option("header","true").option("inferSchema","true").csv("/home/zaki/Desktop/gist/CitiGroup2006_2008.csv")

df.show()
🔵 See in Zeppelin

isaac-arnault-zeppelin-1.png

// OPTION 1
val df = sqlContext.read.format("csv")
  .option("header", "true")
  .option("inferSchema", "true")
  .load("/FileStore/tables/CitiGroup2006_2008.csv")
  
display(df)
🔴 See in Databricks - OPTION 1

11.png

// OPTION 2
val spark = SparkSession.builder().getOrCreate()
val df = spark.read.option("header","true").option("inferSchema","true").csv("/FileStore/tables/CitiGroup2006_2008.csv")

df.show()
🔴 See in Databricks - OPTION 2

16.png

2. Get column names

df.columns
🔵 See in Zeppelin

isaac-arnault-databricks-2.png

🔴 See in Databricks

isaac-arnault-zeppelin-2.png

3. Get basic statistics

df.describe().show()
🔵 See in Zeppelin

isaac-arnault-zeppelin-3.png

🔴 See in Databricks

isaac-arnault-databricks-3.png

4. Select one column and show some data

df.select("Volume").show()
🔵 See in Zeppelin

isaac-arnault-zeppelin-4.png

🔴 See in Databricks

isaac-arnault-databricks-4.png

5. Select two or more columns and show some data

df.select($"Date", $"Low", $"Volume").show(10)
🔵 See in Zeppelin

isaac-arnault-zeppelin-5.png

🔴 See in Databricks

isaac-arnault-databricks-5.png

6. Create a new column from a subtraction of two columns

df.withColumn("HighMinusOpen", df("High")-df("Open")).show(5)
🔵 See in Zeppelin

isaac-arnault-zeppelin-6.png

🔴 See in Databricks

isaac-arnault-databricks-6.png

7. Store operation performed in step 6 as a new dataframe

val df2 = df.withColumn("HighMinusOpen", df("High")-df("Open"))
df2.show(5)
🔵 See in Zeppelin

isaac-arnault-zeppelin-7.png

df.withColumn("HighMinusOpen", df("High")-df("Open")).show(5)
🔴 See in Databricks

isaac-arnault-databricks-7.png

8. Print our new dataframe Schema

df2.printSchema()
🔵 See in Zeppelin

isaac-arnault-zeppelin-8.png

🔴 See in Databricks

isaac-arnault-databricks-8.png

9. Rename one column of our dataframe and save it as a new dataframe

val df3 = df2.withColumnRenamed("HighMinusOpen","HmO")
df3.show()
🔵 See in Zeppelin

isaac-arnault-zeppelin-9.png

🔴 See in Databricks

isaac-arnault-databricks-9.png

10. Dataframe operations

Filtering on a column - Using Scala "$" instead of Spark SQL

import spark.implicits._
df.filter($"Open">480).show()
🔵 See in Zeppelin

isaac-arnault-zeppelin-10.png

🔴 See in Databricks

isaac-arnault-databricks-10.png

Filtering on a column - Using Spark SQL

import spark.implicits._
df.filter("Open>480").show()  
🔵 See in Zeppelin

14.png

🔴 See in Databricks

isaac-arnault-databricks-10-2.png

Filtering on multiple columns - Using Scala "$" instead of Spark SQL

import spark.implicits._
df.filter($"Low"<481 && $"High">484).show()
🔵 See in Zeppelin

isaac-arnault-zeppelin-10-3.png

🔴 See in Databricks

isaac-arnault-databricks-10-3.png

Filtering on multiple columns - Using Spark SQL

df.filter("Low<481 AND High>484").show()
🔵 See in Zeppelin

isaac-arnault-zeppelin-10-4.png

🔴 See in Databricks

isaac-arnault-databricks-10-4.png

Important: df.filter in Spark is a transformation, while .show() is an action.
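A plain-Scala analogy (no Spark needed) for this point: an Iterator's filter is lazy like a Spark transformation, and materializing with toList plays the role of an action such as .show() or .collect().

```scala
// Lazy filter (transformation) vs materialization (action), in plain Scala.
var rowsChecked = 0
val rows = Iterator(480, 481, 484, 490)
val filtered = rows.filter { v => rowsChecked += 1; v > 481 }
println(rowsChecked)          // 0: the filter is only a recipe so far
val result = filtered.toList  // the "action" actually scans the rows
println(result)               // List(484, 490)
println(rowsChecked)          // 4: every row was checked during the action
```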

Filtering on multiple columns and collect the results as an array - Using Scala

import spark.implicits._
val CH_compare = df.filter("Low<481 AND High>484").collect()
🔵 See in Zeppelin

isaac-arnault-zeppelin-10-5.png

🔴 See in Databricks

isaac-arnault-databricks-10-5.png

Filtering on multiple columns and check how many results are returned - Using Scala

import spark.implicits._
val CH_compare = df.filter("Low<481 AND High>484").count()
🔵 See in Zeppelin

isaac-arnault-zeppelin-10-6.png

🔴 See in Databricks

isaac-arnault-databricks-10-6.png

10.7 Filtering on a column for equality

import spark.implicits._
df.filter($"Low" === 484).show()
🔵 See in Zeppelin

isaac-arnault-databricks-10-7.png

🔴 See in Databricks

isaac-arnault-databricks-10-7.png

10.8 Dataframe operations - Correlation (Pearson)

import spark.implicits._
df.select(corr("Open","Close")).show()
🔵 See in Zeppelin

isaac-arnault-zeppelin-10-8.png

df.stat.corr("Open", "Close")
🔴 See in Databricks

isaac-arnault-databricks-10-8.png

11 Dataframes operations - Group By and Aggregate (New dataset)

Upload Sales.csv to your Databricks and Zeppelin environments before you proceed as follows.

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().getOrCreate()
val df_Sales = spark.read.option("header","true").option("inferSchema","true").csv("/home/zaki/Desktop/Sales.csv")

df_Sales.show()
🔵 See in Zeppelin

18.png

val df_Sales = sqlContext.read.format("csv")
  .option("header", "true")
  .option("inferSchema", "true")
  .load("/FileStore/tables/Sales.csv")

display(df_Sales)
🔴 See in Databricks

17.png

Dataframes operations - Group By and Aggregate (Count)

df_Sales.groupBy("Company").count().show()
🔵 See in Zeppelin

isaac-arnault-zeppelin-11-1.png

🔴 See in Databricks

isaac-arnault-databricks-11-1.png

Dataframes operations - Group By and Aggregate (Mean)

df_Sales.groupBy("Company").mean().show()
🔵 See in Zeppelin

isaac-arnault-zeppelin-11-2.png

🔴 See in Databricks

isaac-arnault-databricks-11-2.png

Dataframes operations - Group By (common statistics)

df_Sales.groupBy("Company").min().show()
df_Sales.groupBy("Company").max().show()
df_Sales.groupBy("Company").sum().show()
🔵 See in Zeppelin

isaac-arnault-zeppelin-11-3.png

🔴 See in Databricks

isaac-arnault-databricks-11-3.png

Dataframes operations - Aggregate (sum)

df_Sales.select(sum("Sales")).show()
🔵 See in Zeppelin

isaac-arnault-databricks-11-4.png

import sqlContext.implicits._
import org.apache.spark.sql.functions._

df_Sales.select(sum("Sales")).show()
🔴 See in Databricks

isaac-arnault-databricks-11-4.png

Dataframes operations - Aggregate (other useful operations)

df_Sales.select(countDistinct("Sales")).show()
df_Sales.select(sumDistinct("Sales")).show()
df_Sales.select(variance("Sales")).show()
df_Sales.select(stddev("Sales")).show() // standard deviation
df_Sales.select(collect_set("Sales")).show()
🔵 See in Zeppelin

isaac-arnault-zeppelin-11-5.png

import sqlContext.implicits._
import org.apache.spark.sql.functions._

df_Sales.select(countDistinct("Sales")).show()
df_Sales.select(sumDistinct("Sales")).show()
df_Sales.select(variance("Sales")).show()
df_Sales.select(stddev("Sales")).show() // standard deviation
df_Sales.select(collect_set("Sales")).show()
🔴 See in Databricks

isaac-arnault-databricks-11-5.png

Dataframes operations - Order By (using Spark SQL)

df_Sales.show()
df_Sales.orderBy("Sales").show()
🔵 See in Zeppelin

isaac-arnault-zeppelin-11-6.png

import sqlContext.implicits._
import org.apache.spark.sql.functions._

df_Sales.show()
df_Sales.orderBy("Sales").show()
🔴 See in Databricks

isaac-arnault-databricks-11-6.png

Dataframes operations - Missing data (New dataset)

Upload ContainsNull.csv to your Databricks and Zeppelin environments before you proceed.

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().getOrCreate()
val df_Null = spark.read.option("header","true").option("inferSchema","true").csv("/home/zaki/Desktop/ContainsNull.csv")

df_Null.show()
🔵 See in Zeppelin

isaac-arnault-zeppelin-11-7.png

val df_Null = sqlContext.read.format("csv")
  .option("header", "true")
  .option("inferSchema", "true")
  .load("/FileStore/tables/ContainsNull.csv")

display(df_Null)
🔴 See in Databricks

isaac-arnault-databricks-11-7.png

Dataframes operations - Missing data (New dataset)

Upload ContainsNull.csv to your Databricks and Zeppelin environments before you proceed as follows.

df_Sales.show()
df_Sales.orderBy($"Sales".desc).show(3)
🔵 See in Zeppelin

isaac-arnault-zeppelin-12.png

🔴 See in Databricks

15.png

Dataframes operations - Missing data (Drop)

df_Null.show()
df_Null.na.drop().show()
🔵 See in Zeppelin

isaac-arnault-zeppelin-12-1.png

🔴 See in Databricks

isaac-arnault-databricks-12-1.png
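na.drop() with no arguments removes every row containing any null. A couple of hedged variants (same df_Null as above, whose columns are assumed to be Id, Name and Sales):

```scala
// Drop only rows where a specific column is null.
df_Null.na.drop(Array("Sales")).show()
// Keep only rows that have at least 2 non-null values.
df_Null.na.drop(2).show()
```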

Dataframes operations - Missing data (Fill)

df_Null.show()
df_Null.na.fill(42).show()
🔵 See in Zeppelin

isaac-arnault-zeppelin-12-2.png

🔴 See in Databricks

isaac-arnault-databricks-12-2.png

Dataframes operations - Missing data (Fill a specific column, "Name", with a string)

df_Null.show()
val clean_dfX = df_Null.na.fill("Name X", Array("Name")) 
clean_dfX.show()
🔵 See in Zeppelin

isaac-arnault-zeppelin-12-3.png

🔴 See in Databricks

isaac-arnault-databricks-12-3.png

Dataframes operations - Missing data (Fill a specific column, "Sales", with an integer)

df_Null.show()
val clean_dfY = clean_dfX.na.fill(50, Array("Sales"))
clean_dfY.show()
🔵 See in Zeppelin

isaac-arnault-zeppelin-12-4.png

🔴 See in Databricks

isaac-arnault-databricks-12-4.png

Dataframes operations - Filling missing data in multiple columns simultaneously

df_Null.show()
val clean_dfZ = df_Null.na.fill(60, Array("Sales"))
clean_dfZ.na.fill("Name Z", Array("Name")).show()
🔵 See in Zeppelin

isaac-arnault-zeppelin-12-5.png

🔴 See in Databricks

isaac-arnault-databricks-12-5.png
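Instead of chaining two fill() calls, DataFrameNaFunctions also accepts a Map, so both columns can be filled in one call. The fill values below are illustrative:

```scala
// One call fills "Sales" with an integer and "Name" with a string.
df_Null.na.fill(Map("Sales" -> 60, "Name" -> "Name Z")).show()
```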

13 Dates and Timestamps

Let's use the CitiGroup2006_2008.csv dataset once again for this part of the gist.

val Date_Time = sqlContext.read.format("csv")
  .option("header", "true")
  .option("inferSchema", "true")
  .load("/home/zaki/Downloads/CitiGroup2006_2008.csv")

Date_Time.show()
🔵 See in Zeppelin

isaac-arnault-zeppelin-13.png

val Date_Time = sqlContext.read.format("csv")
  .option("header", "true")
  .option("inferSchema", "true")
  .load("/FileStore/tables/CitiGroup2006_2008.csv")

display(Date_Time)
🔴 See in Databricks

isaac-arnault-databricks-13.png

Dates and Timestamps - returns only the month from a timestamp

Date_Time.select(month(Date_Time("Date"))).show()
🔵 See in Zeppelin

isaac-arnault-zeppelin-13-1.png

import sqlContext.implicits._
import org.apache.spark.sql.functions._
Date_Time.select(month(Date_Time("Date"))).show()
🔴 See in Databricks

isaac-arnault-databricks-13-1.png

Dates and Timestamps - returns only the year from a timestamp

Date_Time.select(year(Date_Time("Date"))).show()
🔵 See in Zeppelin

isaac-arnault-zeppelin-13-2.png

import sqlContext.implicits._
import org.apache.spark.sql.functions._
Date_Time.select(year(Date_Time("Date"))).show()
🔴 See in Databricks

isaac-arnault-databricks-13-2.png
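month() and year() are only two of the date extraction functions in org.apache.spark.sql.functions. A sketch with a few more, applied to the same Date_Time dataframe:

```scala
// Extract day, month and year from the "Date" column in one select.
Date_Time.select(
  dayofmonth(Date_Time("Date")).as("day"),
  month(Date_Time("Date")).as("month"),
  year(Date_Time("Date")).as("year")
).show()
```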

Dates and Timestamps - returns a variable's average for each year

val new_df = Date_Time.withColumn("Year", year(Date_Time("Date")))
val df_avg = new_df.groupBy("Year").mean()
df_avg.select($"Year",$"avg(High)").show()
🔵 See in Zeppelin

isaac-arnault-zeppelin-13-3.png

val new_df = Date_Time.withColumn("Year", year(Date_Time("Date")))
val df_avg = new_df.groupBy("Year").mean()
df_avg.select($"Year",$"avg(High)").show()
🔴 See in Databricks

isaac-arnault-databricks-13-3.png
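Selecting the generated "avg(High)" column name afterwards is easy to mistype. An alternative sketch (same new_df as above, with functions._ imported) aliases the aggregate up front using agg():

```scala
// Alias the average so the result column has a predictable name.
new_df.groupBy("Year")
  .agg(avg("High").as("avg_High"))
  .orderBy("Year")
  .show()
```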

Dates and Timestamps - returns a variable's Min and Max for each year

val new_df = Date_Time.withColumn("Year", year(Date_Time("Date")))
val df_min = new_df.groupBy("Year").min()
val df_max = new_df.groupBy("Year").max()

df_min.select($"Year",$"min(Volume)").show()
df_max.select($"Year",$"max(Volume)").show()
🔵 See in Zeppelin

isaac-arnault-zeppelin-13-4.png

🔴 See in Databricks

isaac-arnault-databricks-13-4.png

##################

START HERE

##################

To get started with Spark-Scala, follow the sections below.

1. Entry level in Spark-Scala using Atom (text editor)

Check SPARK_SCALA_entry.md section of this gist.


2. Intermediary level in Spark-Scala using Databricks and Zeppelin

Check 2.1 SPARK_SCALA_intermediary.md and 2.2 SPARK_SCALA_project.md sections of this gist.


#### Datasets

Four datasets to be used in SPARK_SCALA_intermediary.md and SPARK_SCALA_project.md.

CitiGroup2006_2008.csv: https://bit.ly/2m1df8k
Sales.csv: https://bit.ly/2kVn9rV
ContainsNull.csv: https://bit.ly/2ktkXYp
Netflix2011_2016.csv: https://bit.ly/2kwqmy0


Free slides related to Machine Learning, Spark and Scala.
https://bit.ly/2zkcrP7

A Spark - Scala project using Zeppelin.

Use the Netflix2011_2016.csv file. See the datasets.md section of this gist.

The same commands can also be run in a Databricks notebook to see what they return.

Some of them will be performed in Scala, others in Spark SQL.


Create a new notebook in Zeppelin.

  1. Load the dataset, turn it into a dataframe and check the variables
val df = sqlContext.read.format("csv")
  .option("header", "true")
  .option("inferSchema", "true")
  .load("/home/zaki/Desktop/Netflix2011_2016.csv")

df.columns
🔵 See output

spark-scala-project-1.png

  2. Print the schema of the dataframe
df.printSchema
🔵 See output

spark-scala-project-2.png

  3. Read the first 5 rows of the dataframe
df.show(5)
🔵 See output

spark-scala-project3.png

  4. Get statistics about the dataframe
df.describe().show()
🔵 See output

spark-scala-project-4.png

  5. Get the ratio of two variables ("High", "Volume")
val df2 = df.withColumn("H_V Ratio", df("High")/df("Volume"))
df2.columns
df2.head(1)
🔵 See output

spark-scala-project-5.png

  6. Check which day had the peak "High"
import spark.implicits._
df.orderBy($"High".desc).show(1)
🔵 See output

spark-scala-project-6.png

  7. Get the mean of the "Volume" column
df.select(mean("Volume")).show()
🔵 See output

spark-scala-project-7.png

  8. Get the Max and Min of a specific variable ("Close")
df.select(max("Close")).show()
df.select(min("Close")).show()
🔵 See output

spark-scala-project-8.png

  9. Get the number of days where "Close" was lower than $550
import spark.implicits._
df.filter($"Close"<550).count()
df.filter("Close < 550").count()
🔵 See output

spark-scala-project-9.png
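A related sketch for range filters: Column.between() covers both bounds in one expression (same df and spark.implicits._ import as above; the bounds here are illustrative):

```scala
// Count days where Close fell between $100 and $550 inclusive.
df.filter($"Close".between(100, 550)).count()
```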

  10. Get the percentage of days where "High" was greater than $300
import spark.implicits._
(df.filter($"High">300).count()*1.0/ df.count() )*100
🔵 See output

spark-scala-project-10.png

  11. Get the Pearson correlation between "Low" and "Close"
df.select(corr("Low","Close")).show()
🔵 See output

spark-scala-project-11.png

  12. Get the max "High" per Year
import spark.implicits._
val year_df = df.withColumn("Year", year(df("Date")))
val year_max = year_df.select($"Year", $"High").groupBy("Year").max()
year_max.select($"Year",$"max(High)").show()
🔵 See output

spark-scala-project-12.png

  13. Get the average "Close" price for each Month
import spark.implicits._
val month_df = df.withColumn("Month", month(df("Date")))
val month_avg = month_df.select($"Month", $"Close").groupBy("Month").mean()
month_avg.select($"Month",$"avg(Close)").orderBy("Month").show()
🔵 See output

spark-scala-project-13.png
