Notes from the Spark tutorial at CodeNeuro NYC

Slides: http://training.databricks.com/workshop/dbc_intro.pdf

Steps:

  • url: REDACTED
  • username: REDACTED
  • password: REDACTED
  • cluster: REDACTED
  1. Log in to Databricks Cloud
  2. Click through the filesystem to get to the IPython notebook
  3. Click the corner of the notebook and clone it
  4. Start running

preflight check

  • RDD = Resilient Distributed Dataset
  • sc = Spark context. It is predefined in these notebooks and in the Spark shell
    • MLlib and the other Spark libraries build on sc
# sc = "spark context"

data = xrange(1, 10001)
distributed_data = sc.parallelize(data)

# Keep only the values for which the lambda returns True (a lazy transformation)
x = distributed_data.filter(lambda x: x < 10)

# collect() is an action: it runs the pending transformations and returns the results
x.collect()
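
The split between lazy transformations (`filter`) and actions (`collect`) can be sketched without a cluster. This is a plain Python 3 analogy, not Spark itself: a lazy `filter` object stands in for the RDD and `list()` plays the role of `collect()`. The names `evaluated` and `is_small` are made up for illustration.

```python
# Plain-Python analogy for lazy transformations; no Spark required.
evaluated = []  # records which elements the predicate actually touched


def is_small(x):
    evaluated.append(x)
    return x < 10


data = range(1, 10001)

# Like a transformation: this only builds a recipe, nothing runs yet.
lazy = filter(is_small, data)
touched_before = len(evaluated)

# Like an action: this forces the whole pipeline to run.
result = list(lazy)
touched_after = len(evaluated)
```

Nothing is evaluated until `list()` is called, which is roughly why chaining many transformations in Spark is cheap until an action shows up.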

Serialize things: Parquet

log_example.ipynb

Shows how Spark works under the hood and how the fault tolerance works

  • sc.textFile reads in a file and splits it on newlines, so each line becomes one element of the RDD
lines = sc.textFile("/mnt/paco/intro/error_log.txt") \
  .map(lambda x: x.split("\t"))
errors = lines.filter(lambda x: x[0] == "ERROR")
messages = errors.map(lambda x: x[1])

# Ask Spark to keep messages in memory on the workers once it has been computed
messages.cache()
messages.filter(lambda x: x.find("mysql") > -1).count()

# At this point the data is cached in memory on the workers, so this second
# pass does not need to hit disk again and is very fast
messages.filter(lambda x: x.find("php") > -1).count()
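
To see what each step of the log pipeline does to the data, here is a rough plain Python 3 walk-through on a tiny in-memory log. The four log lines are invented for illustration and are not the contents of `error_log.txt`; list comprehensions stand in for the RDD calls.

```python
# Hypothetical tab-separated log lines, made up for this sketch.
raw_lines = [
    "ERROR\tmysql connection refused",
    "INFO\tserver started",
    "ERROR\tphp fatal error in index.php",
    "ERROR\tmysql query timed out",
]

rows = [line.split("\t") for line in raw_lines]      # like .map(split)
errors = [row for row in rows if row[0] == "ERROR"]  # like .filter on the level
messages = [row[1] for row in errors]                # like .map to the message

# Two separate counts over the same in-memory list of messages.
mysql_count = sum(1 for m in messages if m.find("mysql") > -1)
php_count = sum(1 for m in messages if m.find("php") > -1)
```

Here `messages` is an ordinary list, so the second count reuses it for free. In Spark the equivalent reuse depends on the RDD being kept in memory.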

wc_example

EMR = Elastic MapReduce

WordCount example

  • monoid
    • binary associative operation
    • closure
    • identity
from operator import add
lines = sc.textFile("/mnt/paco/intro/README.md")
lines.take(2)

# `flatMap` is like map, except it flattens the results one level (as if it ran itertools.chain(*results))
# `reduceByKey` sums the counts for each word. It assumes (key, value) pairs
wc = lines.flatMap(lambda x: x.split(' ')) \
  .map(lambda x: (x, 1)).reduceByKey(add)
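
The same three steps can be traced in plain Python 3 to see what `flatMap`, `map`, and `reduceByKey(add)` each contribute. This is a sketch of the semantics only, not how Spark executes it; the two input lines are hypothetical.

```python
from itertools import chain
from operator import add

lines = ["to be or", "not to be"]  # hypothetical input lines

# flatMap: map each line to a list of words, then flatten one level.
words = list(chain.from_iterable(line.split(" ") for line in lines))

# map: pair every word with a count of 1.
pairs = [(word, 1) for word in words]

# reduceByKey(add): fold the values that share a key with `add`.
counts = {}
for key, value in pairs:
    counts[key] = add(counts[key], value) if key in counts else value
```

Integer addition fits the monoid checklist above: adding two ints gives an int (closure), grouping does not matter (associativity), and 0 is the identity.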

join_example

clk = sc.textFile("/mnt/paco/intro/join/clk.tsv") \
 .map(lambda x: x.split("\t"))

This is what the output looks like:

> clk.collect()
Out[9]:
[[u'2014-03-04', u'15dfb8e6cc4111e3a5bb600308919594', u'11'],
[u'2014-03-06', u'81da510acc4111e387f3600308919594', u'61']]
# Create key-value pairs of each so we can join with others
clk_pairs = clk.map(lambda x: (x[1], x))
clk_pairs.collect()
Out[6]: 
[(u'15dfb8e6cc4111e3a5bb600308919594',
  [u'2014-03-04', u'15dfb8e6cc4111e3a5bb600308919594', u'11']),
 (u'81da510acc4111e387f3600308919594',
  [u'2014-03-06', u'81da510acc4111e387f3600308919594', u'61'])]
reg = sc.textFile("/mnt/paco/intro/join/reg.tsv") \
 .map(lambda x: x.split("\t"))
reg.collect()
Out[12]: 
[[u'2014-03-02',
  u'15dfb8e6cc4111e3a5bb600308919594',
  u'1',
  u'33.6599436237',
  u'-117.958125229'],
 [u'2014-03-04',
  u'81da510acc4111e387f3600308919594',
  u'2',
  u'33.8570099635',
  u'-117.855744398']]
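# Key the registrations by the same ID column, then join them with the clicks
reg_pairs = reg.map(lambda x: (x[1], x))
reg_pairs.join(clk_pairs).collect()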
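
The shape of a pair-RDD join result is easier to see on a toy example. Below is a minimal plain Python 3 sketch of inner-join semantics, assuming both sides are lists of `(key, value)` tuples. `inner_join` is a hypothetical helper, and the short keys `u1`, `u2`, `u3` are stand-ins for the long IDs above.

```python
def inner_join(left, right):
    """Return (key, (left_value, right_value)) for every matching key pair."""
    index = {}
    for key, value in right:
        index.setdefault(key, []).append(value)
    return [(key, (lv, rv)) for key, lv in left for rv in index.get(key, [])]


# Hypothetical rows, keyed by the ID in column 1.
reg_pairs = [
    ("u1", ["2014-03-02", "u1", "1"]),
    ("u2", ["2014-03-04", "u2", "2"]),
]
clk_pairs = [
    ("u1", ["2014-03-04", "u1", "11"]),
    ("u3", ["2014-03-06", "u3", "61"]),
]

joined = inner_join(reg_pairs, clk_pairs)
```

Only `u1` appears on both sides, so it is the only key in the result. Spark does not promise any particular ordering of the joined rows.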

--- break!! ---

Computational thinking

From CMU

  • Decomposition
  • Pattern Recognition
  • Abstraction
  • Algorithm Design

RDD/Transformations

  • groupByKey: requires that the list of values grouped under each key fits in memory, which may not be true. You will usually want to use reduceByKey instead
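
A rough way to picture the memory difference, in plain Python 3 with hypothetical data: the grouping approach holds every value for a key at once, while the reducing approach stores a single running value per key.

```python
from operator import add

pairs = [("a", 1), ("b", 1), ("a", 1), ("a", 1)]  # hypothetical (key, value) data

# groupByKey-style: every value for a key sits in one list at the same time.
grouped = {}
for key, value in pairs:
    grouped.setdefault(key, []).append(value)
sums_from_groups = {key: sum(values) for key, values in grouped.items()}

# reduceByKey-style: only one running value per key is ever stored.
reduced = {}
for key, value in pairs:
    reduced[key] = add(reduced[key], value) if key in reduced else value

largest_group = max(len(values) for values in grouped.values())
```

Both routes give the same sums, but `largest_group` grows with the data for a popular key, whereas `reduced` stays at one value per key.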

Actions

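  • For reduce to work properly, the function needs to be both associative and commutative.

  • reduceByKey was presented as needing only associativity (the Spark API docs ask for a function that is both associative and commutative)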

    • Associative property: $x * (y * z) = (x * y) * z$
    • Commutative property: $a + b = b + a$
  • .take(n): returns an array with the first n elements
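
Why the operator's algebra matters can be checked with a small plain Python 3 simulation of a partitioned reduce: each chunk is reduced on its own and the partial results are then reduced together. `partitioned_reduce` is a hypothetical helper written for this sketch, not a Spark function.

```python
from functools import reduce
from operator import add, sub


def partitioned_reduce(op, data, num_partitions):
    """Reduce each chunk separately, then reduce the partial results."""
    size = -(-len(data) // num_partitions)  # ceiling division
    chunks = [data[i:i + size] for i in range(0, len(data), size)]
    partials = [reduce(op, chunk) for chunk in chunks]
    return reduce(op, partials)


data = [1, 2, 3, 4, 5, 6]

# Addition is associative: the partitioning does not change the answer.
add_one = partitioned_reduce(add, data, 1)
add_three = partitioned_reduce(add, data, 3)

# Subtraction is not associative: the answer depends on the partitioning.
sub_one = partitioned_reduce(sub, data, 1)
sub_three = partitioned_reduce(sub, data, 3)
```

The chunks here are always combined in their original order, so this sketch only exercises associativity. Commutativity matters as well when partial results can arrive in any order.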

Persistence

RDDs can be persisted in memory or serialized to disk

Tachyon - distributed filesystem, all in memory, with no garbage collection

Broadcasting

Send a variable or array to ALL workers efficiently

MLlib in Spark

  • The Spark project is very conservative: it turns down many of the PRs it receives because it cannot accept changes that break parallelism

For classification, you need a LabeledPoint type (can subclass it)

GraphX

import org.apache.spark.graphx._
import org.apache.spark.rdd.RDD
 
case class Peep(name: String, age: Int)
 
val nodeArray = Array(
  (1L, Peep("Kim", 23)),
  (2L, Peep("Pat", 31)),
  (3L, Peep("Chris", 52)),
  (4L, Peep("Kelly", 39)),
  (5L, Peep("Leslie", 45))
  )
val edgeArray = Array(
  Edge(2L, 1L, 7),
  Edge(2L, 4L, 2),
  Edge(3L, 2L, 4),
  Edge(3L, 5L, 3),
  Edge(4L, 1L, 1),
  Edge(5L, 3L, 9)
  )

val nodeRDD: RDD[(Long, Peep)] = sc.parallelize(nodeArray)
val edgeRDD: RDD[Edge[Int]] = sc.parallelize(edgeArray)
val g: Graph[Peep, Int] = Graph(nodeRDD, edgeRDD)