Slides: http://training.databricks.com/workshop/dbc_intro.pdf
Steps:
- url: REDACTED
- username: REDACTED
- password: REDACTED
- cluster: REDACTED
- Log in to the Databricks cloud
- Click through the filesystem to get to the IPython notebook
- Click the .ipynb corner and clone the notebook
- Start running
- RDD = Resilient Distributed Dataset
sc = SparkContext. This is built into these notebooks and into the Spark shell.
- MLlib and other Spark libraries build on sc
# sc = "spark context"
data = xrange(1, 10001)
distributed_data = sc.parallelize(data)
# Apply the logic in the lambda to the values of the data (a transformation, evaluated lazily)
x = distributed_data.filter(lambda x: x < 10)
# collect() is an action: it triggers evaluation of the transformations up to this point
x.collect()
Serialize things: Parquet
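A minimal sketch of writing and reading Parquet from PySpark, assuming the SQLContext / DataFrame API is available; the /tmp path and the tiny DataFrame are made up for illustration:
from pyspark.sql import SQLContext

sqlContext = SQLContext(sc)

# Build a small DataFrame and serialize it to Parquet (path is hypothetical)
df = sqlContext.createDataFrame([(1, "a"), (2, "b")], ["id", "val"])
df.write.parquet("/tmp/example.parquet")

# Read it back; Parquet preserves the schema
sqlContext.read.parquet("/tmp/example.parquet").show()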
Shows how Spark works under the hood and how the fault tolerance works
sc.textFile reads in a file and splits it by newlines
# Read the log file and split each line on tabs
lines = sc.textFile("/mnt/paco/intro/error_log.txt") \
    .map(lambda x: x.split("\t"))
# Keep only the ERROR rows and pull out the message column
errors = lines.filter(lambda x: x[0] == "ERROR")
messages = errors.map(lambda x: x[1])
messages.filter(lambda x: x.find("mysql") > -1).count()
# At this point, the data is loaded into memory on the workers and you don't
# need to hit disk again, so this operation is very fast
messages.filter(lambda x: x.find("php") > -1).count()
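The in-memory behavior described in the comment above normally comes from an explicit cache() call, which these notes don't show; a sketch of what that step would look like (an assumption, not something in the original):
# Hypothetical: cache the messages RDD so repeated filters reuse the in-memory data
messages.cache()
messages.filter(lambda x: x.find("mysql") > -1).count()  # first count materializes the cache
messages.filter(lambda x: x.find("php") > -1).count()    # later counts read from memory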
EMR = Elastic MapReduce
WordCount example
- monoid
- binary associative operation
- closure
- identity
from operator import add
lines = sc.textFile("/mnt/paco/intro/README.md")
lines.take(2)
# `flatMap` is like map, except it flattens the result (as if it ran itertools.chain(*lines))
# `reduceByKey`: sums the counts for each word. Assumes the RDD holds (key, value) pairs
wc = lines.flatMap(lambda x: x.split(' ')) \
    .map(lambda x: (x, 1)).reduceByKey(add)
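A quick way to inspect the result (takeOrdered is a standard RDD action; the choice of 5 is arbitrary):
# Top 5 most frequent words, sorted by descending count
wc.takeOrdered(5, key=lambda kv: -kv[1])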
clk = sc.textFile("/mnt/paco/intro/join/clk.tsv") \
    .map(lambda x: x.split("\t"))
This is what the output looks like:
> clk.collect()
Out[9]:
[[u'2014-03-04', u'15dfb8e6cc4111e3a5bb600308919594', u'11'],
[u'2014-03-06', u'81da510acc4111e387f3600308919594', u'61']]
# Key each record by its id so we can join it with other datasets
clk_pairs = clk.map(lambda x: (x[1], x))
clk_pairs.collect()
Out[6]:
[(u'15dfb8e6cc4111e3a5bb600308919594',
[u'2014-03-04', u'15dfb8e6cc4111e3a5bb600308919594', u'11']),
(u'81da510acc4111e387f3600308919594',
[u'2014-03-06', u'81da510acc4111e387f3600308919594', u'61'])]
reg = sc.textFile("/mnt/paco/intro/join/reg.tsv") \
    .map(lambda x: x.split("\t"))
reg.collect()
Out[12]:
[[u'2014-03-02',
u'15dfb8e6cc4111e3a5bb600308919594',
u'1',
u'33.6599436237',
u'-117.958125229'],
[u'2014-03-04',
u'81da510acc4111e387f3600308919594',
u'2',
u'33.8570099635',
u'-117.855744398']]
# reg_pairs was not defined in the notes; key it the same way as clk_pairs
reg_pairs = reg.map(lambda x: (x[1], x))
reg_pairs.join(clk_pairs).collect()
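Both pair RDDs are keyed on the shared id, so the join produces a pair RDD of the form (id, (reg_record, clk_record)), with one entry per id that appears in both datasets.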
--- break!! ---
From CMU
- Decomposition
- Pattern Recognition
- Abstraction
- Algorithm Design
groupByKey: requires that the list of values grouped together for a key fits in memory, which may not be true. Usually you will want to use reduceByKey instead.
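A minimal sketch contrasting the two on the word-count pairs from above (reuses lines and the add import from the WordCount example; both give the same counts, but the shuffle behavior differs):
pairs = lines.flatMap(lambda x: x.split(' ')).map(lambda x: (x, 1))

# groupByKey materializes every value for a key before summing
grouped_counts = pairs.groupByKey().mapValues(sum)

# reduceByKey combines values on each partition first, so far less data is shuffled
reduced_counts = pairs.reduceByKey(add)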
- For reduce to be performed properly, the operation needs to be both associative and commutative.
- reduceByKey only needs to be associative.
- Associative property: $x * (y * z) = (x * y) * z$
- Commutative property: $a + b = b + a$
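A small sketch of why this matters: addition satisfies both properties, so reduce gives a deterministic answer regardless of partitioning, while subtraction satisfies neither (operator.sub is used here purely for illustration):
from operator import add, sub

nums = sc.parallelize(range(1, 5), 2)

nums.reduce(add)  # always 10: addition is associative and commutative
nums.reduce(sub)  # result depends on partitioning: subtraction is neither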
.take(n): returns an array with the first n elements
Serialize to disk, or keep in memory (persistence / storage levels)
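A minimal sketch of the standard API for this, assuming the messages RDD from the error-log example; StorageLevel is a real PySpark class, and the particular level chosen here is just an example:
from pyspark import StorageLevel

# Keep messages in memory, spilling serialized copies to disk if it doesn't fit
messages.persist(StorageLevel.MEMORY_AND_DISK_SER)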
Tachyon - distributed filesystem, all in memory, with no garbage collection
Broadcast variables: send a variable or array to ALL workers efficiently
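A minimal sketch using sc.broadcast (the lookup table and data here are made up for illustration):
# Ship a small lookup table to every worker once, instead of with every task
lookup = sc.broadcast({"a": 1, "b": 2, "c": 3})
sc.parallelize(["a", "b", "c", "a"]).map(lambda k: lookup.value[k]).collect()
# -> [1, 2, 3, 1]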
- The Spark maintainers are very conservative: they reject many of the PRs they receive because they cannot accept changes that break parallelism
For classification, you need a LabeledPoint type (you can subclass it)
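A minimal sketch of constructing one with PySpark's MLlib (the label and feature values are arbitrary):
from pyspark.mllib.regression import LabeledPoint

# label first, then the feature vector
point = LabeledPoint(1.0, [0.0, 1.5, 2.0])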
GraphX example (Scala):
import org.apache.spark.graphx._
import org.apache.spark.rdd.RDD

// Vertex attribute type
case class Peep(name: String, age: Int)

// Vertices are (VertexId, attribute) pairs
val nodeArray = Array(
  (1L, Peep("Kim", 23)),
  (2L, Peep("Pat", 31)),
  (3L, Peep("Chris", 52)),
  (4L, Peep("Kelly", 39)),
  (5L, Peep("Leslie", 45))
)
// Edges are (srcId, dstId, attribute); here the attribute is an Int
val edgeArray = Array(
  Edge(2L, 1L, 7),
  Edge(2L, 4L, 2),
  Edge(3L, 2L, 4),
  Edge(3L, 5L, 3),
  Edge(4L, 1L, 1),
  Edge(5L, 3L, 9)
)

// Parallelize both arrays into RDDs and build the property graph
val nodeRDD: RDD[(Long, Peep)] = sc.parallelize(nodeArray)
val edgeRDD: RDD[Edge[Int]] = sc.parallelize(edgeArray)
val g: Graph[Peep, Int] = Graph(nodeRDD, edgeRDD)