Created April 23, 2015 00:44
Notes from the Spark tutorial at CodeNeuro NYC




  • url: REDACTED
  • username: REDACTED
  • password: REDACTED
  • cluster: REDACTED
  1. Log in to the Databricks cloud
  2. Click through the filesystem to get to the IPython notebook
  3. Click the ipynb icon in the corner and clone the notebook
  4. Start running

Preflight check

  • RDD = Resilient Distributed Dataset
  • sc = Spark context; this is built into these notebooks and the Spark shell
    • MLlib and other Spark components subclass sc
# sc = "spark context"

data = xrange(1, 10001)
distributed_data = sc.parallelize(data)

# Apply the logic in the lambda to each value of the RDD
x = distributed_data.filter(lambda x: x < 10)

# Evaluate the transformations up to this point; `collect` is an action
x.collect()

Serialize things: Parquet


Shows how Spark works under the hood and how fault tolerance works

  • sc.textFile reads in a file AND separates by newlines
lines = sc.textFile("/mnt/paco/intro/error_log.txt") \
  .map(lambda x: x.split("\t"))
errors = lines.filter(lambda x: x[0] == "ERROR")
messages = errors.map(lambda x: x[1])
messages.filter(lambda x: x.find("mysql") > -1).count()

# At this point, the data is loaded into memory on the workers and you don't
# need to hit disk again, so this operation is very fast
messages.filter(lambda x: x.find("php") > -1).count()
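What each step in that pipeline produces can be sketched on a plain Python list (the log lines here are hypothetical stand-ins; no Spark required):

```python
# Hypothetical tab-separated log lines standing in for the text file
raw = ["ERROR\tmysql connection refused",
       "WARN\tdisk nearly full",
       "ERROR\tphp fatal error"]

lines = [x.split("\t") for x in raw]               # textFile + map(split)
errors = [x for x in lines if x[0] == "ERROR"]     # filter on the level field
messages = [x[1] for x in errors]                  # keep just the message text

# count() of messages mentioning mysql
print(len([m for m in messages if m.find("mysql") > -1]))  # 1
```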


EMR = Elastic MapReduce

WordCount example

  • monoid
    • binary associative operation
    • closure
    • identity
from operator import add
lines = sc.textFile("/mnt/paco/intro/")

# `flatMap` is like map, except it flattens the array (as if it ran itertools.chain(*lines))
# `reduceByKey`: Sums the amount we have for each keyword. Assumes we have (key, value)
wc = lines.flatMap(lambda x: x.split(' ')) \
  .map(lambda x: (x, 1)).reduceByKey(add)
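The flatMap/reduceByKey pipeline above can be sketched in plain Python, with an ordinary list standing in for the RDD (the sample lines are hypothetical):

```python
from itertools import chain
from operator import add

# Stand-in for the RDD: an ordinary list of lines
lines = ["to be or not", "to be"]

# flatMap: map then flatten, as if it ran itertools.chain(*mapped)
words = list(chain(*(line.split(' ') for line in lines)))

# reduceByKey(add): fold the 1s into a running sum per word key
wc = {}
for word in words:
    wc[word] = add(wc.get(word, 0), 1)

print(wc)  # {'to': 2, 'be': 2, 'or': 1, 'not': 1}
```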


clk = sc.textFile("/mnt/paco/intro/join/clk.tsv") \
 .map(lambda x: x.split("\t"))

This is what the output looks like:

> clk.collect()
[[u'2014-03-04', u'15dfb8e6cc4111e3a5bb600308919594', u'11'],
[u'2014-03-06', u'81da510acc4111e387f3600308919594', u'61']]
# Create (key, value) pairs from each row so we can join with other datasets
clk_pairs = clk.map(lambda x: (x[1], x))

> clk_pairs.collect()
[(u'15dfb8e6cc4111e3a5bb600308919594',
  [u'2014-03-04', u'15dfb8e6cc4111e3a5bb600308919594', u'11']),
 (u'81da510acc4111e387f3600308919594',
  [u'2014-03-06', u'81da510acc4111e387f3600308919594', u'61'])]
reg = sc.textFile("/mnt/paco/intro/join/reg.tsv") \
 .map(lambda x: x.split("\t"))
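The tutorial is building toward joining clk and reg on their shared key. A plain-Python sketch of that join-by-key, with hypothetical rows standing in for the (key, value) pairs:

```python
# Hypothetical (key, value) pairs, as produced by mapping each row to (row[1], row)
clk_pairs = [("u1", ["2014-03-04", "u1", "11"]),
             ("u2", ["2014-03-06", "u2", "61"])]
reg_pairs = [("u1", ["a", "u1", "Brooklyn"]),
             ("u2", ["b", "u2", "Queens"])]

# An inner join by key, like Spark's rdd.join(other):
# for each key present on both sides, emit (key, (left_value, right_value))
reg_by_key = dict(reg_pairs)
joined = [(k, (v, reg_by_key[k])) for k, v in clk_pairs if k in reg_by_key]

print(joined[0][0])  # u1
```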

--- break!! ---

Computational thinking

From CMU

  • Decomposition
  • Pattern Recognition
  • Abstraction
  • Algorithm Design


  • groupByKey: requires that the list of values grouped together for each key fits in memory, which may not be true. Usually you will want to use reduceByKey instead
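The difference can be sketched in plain Python (hypothetical data; no Spark): the groupByKey style materializes every value for a key before combining, while the reduceByKey style keeps only one running value per key.

```python
from operator import add

pairs = [("a", 1), ("b", 1), ("a", 1), ("a", 1)]

# groupByKey-style: materialize the full list of values per key, then sum
grouped = {}
for k, v in pairs:
    grouped.setdefault(k, []).append(v)  # may not fit in memory for hot keys
group_sums = {k: sum(vs) for k, vs in grouped.items()}

# reduceByKey-style: fold each value into a single running total per key
reduced = {}
for k, v in pairs:
    reduced[k] = add(reduced[k], v) if k in reduced else v

print(group_sums == reduced)  # True
```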


  • For reduce to be performed properly, the operation needs to be both associative and commutative.

  • reduceByKey only needs to be associative

    • Associative property: $x * (y * z) = (x * y) * z$
    • Commutative property: $a + b = b + a$
  • .take(n) - return an array with the first n elements
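A quick plain-Python illustration of why these properties matter: addition is associative and commutative, so any regrouping of a parallel reduce gives the same answer, while subtraction is neither.

```python
from functools import reduce
from operator import add, sub

data = [1, 2, 3, 4]

# Addition: associative and commutative, so regrouping doesn't change the result
left = reduce(add, data)               # ((1 + 2) + 3) + 4
regrouped = add(add(1, 2), add(3, 4))  # (1 + 2) + (3 + 4), as two workers might
print(left == regrouped)  # True

# Subtraction: not associative, so a parallel regrouping changes the result
left = reduce(sub, data)               # ((1 - 2) - 3) - 4 = -8
regrouped = sub(sub(1, 2), sub(3, 4))  # (1 - 2) - (3 - 4) = 0
print(left == regrouped)  # False
```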


Serialize to disk, in memory

Tachyon - distributed filesystem, all in memory, with no garbage collection


Broadcast variables: send a variable or array to ALL workers efficiently

MLlib in Spark

  • Spark is very conservative; the maintainers don't accept many of the PRs they receive because they cannot accept changes that break parallelism

For classification, you need a LabeledPoint type (you can subclass it)
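A minimal plain-Python sketch of the shape a LabeledPoint carries — a class label plus a feature vector. The names and data here are illustrative, not MLlib's actual API:

```python
from collections import namedtuple

# Illustrative stand-in for MLlib's LabeledPoint: a label plus a feature vector
Point = namedtuple("Point", ["label", "features"])

training_data = [
    Point(label=1.0, features=[0.0, 1.1, 0.1]),
    Point(label=0.0, features=[2.0, 1.0, -1.0]),
]

print(training_data[0].label)  # 1.0
```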


import org.apache.spark.graphx._
import org.apache.spark.rdd.RDD
case class Peep(name: String, age: Int)
val nodeArray = Array(
  (1L, Peep("Kim", 23)),
  (2L, Peep("Pat", 31)),
  (3L, Peep("Chris", 52)),
  (4L, Peep("Kelly", 39)),
  (5L, Peep("Leslie", 45))
)
val edgeArray = Array(
  Edge(2L, 1L, 7),
  Edge(2L, 4L, 2),
  Edge(3L, 2L, 4),
  Edge(3L, 5L, 3),
  Edge(4L, 1L, 1),
  Edge(5L, 3L, 9)
)

val nodeRDD: RDD[(Long, Peep)] = sc.parallelize(nodeArray)
val edgeRDD: RDD[Edge[Int]] = sc.parallelize(edgeArray)
val g: Graph[Peep, Int] = Graph(nodeRDD, edgeRDD)