@dapangmao dapangmao/chap3.md
Last active Jan 21, 2019
Chapter 3. Programming with RDDs

This chapter introduces Spark’s core abstraction for working with data, the Resilient Distributed Dataset (RDD). An RDD is simply a distributed collection of elements. In Spark all work is expressed as either creating new RDDs, transforming existing RDDs, or calling operations on RDDs to compute a result. Under the hood, Spark automatically distributes the data contained in RDDs across your cluster and parallelizes the operations you perform on them.

Both Data Scientists and Engineers should read this chapter, as RDDs are the core concept in Spark. We highly recommend that you try some of these examples in an interactive shell (see Introduction to Spark’s Python and Scala Shells). In addition, all code in this chapter is available in the book’s GitHub repository.

RDD Basics

An RDD in Spark is simply a distributed collection of objects. Each RDD is split into multiple partitions, which may be computed on different nodes of the cluster. RDDs can contain any type of Python, Java or Scala objects, including user-defined classes.

Users create RDDs in two ways: by loading an external dataset, or by distributing a collection of objects in their driver program. We have already seen loading a text file as an RDD of strings using SparkContext.textFile():

Example 3-1. Creating an RDD of strings with textFile() in Python

>>> lines = sc.textFile("README.md")

Once created, RDDs offer two types of operations: transformations and actions. Transformations construct a new RDD from a previous one. For example, one transformation we saw before is filtering data that matches a predicate. In our text file example, we can use this to create a new RDD holding just the strings that contain “Python”:

>>> pythonLines = lines.filter(lambda line: "Python" in line)

Actions, on the other hand, compute a result based on an RDD, and either return it to the driver program or save it to an external storage system (e.g., HDFS). One example of an action we called earlier is first(), which returns the first element in an RDD:

>>> pythonLines.first()
u'## Interactive Python Shell'

The difference between transformations and actions is due to the way Spark computes RDDs. Although you can define new RDDs any time, Spark only computes them in a lazy fashion, the first time they are used in an action. This approach might seem unusual at first, but makes a lot of sense when working with big data. For instance, consider the example above, where we defined a text file and then filtered the lines with “Python”. If Spark were to load and store all the lines in the file as soon as we wrote lines = sc.textFile(...), it would waste a lot of storage space, given that we then immediately filter out many lines. Instead, once Spark sees the whole chain of transformations, it can compute just the data needed for its result. In fact, for the first() action, Spark only scans the file until it finds the first matching line; it doesn’t even read the whole file.

Finally, Spark’s RDDs are by default recomputed each time you run an action on them. If you would like to reuse an RDD in multiple actions, you can ask Spark to persist it using RDD.persist(). After computing it the first time, Spark will store the RDD contents in memory (partitioned across the machines in your cluster), and reuse them in future actions. Persisting RDDs on disk instead of in memory is also possible. The behavior of not persisting by default may again seem unusual, but it makes a lot of sense for big datasets: if you will not reuse the RDD, there’s no reason to waste storage space when Spark could instead stream through the data once and just compute the result.[8]
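The recompute-by-default behavior can be modeled without a cluster. The sketch below is plain Python, not a Spark API: the hypothetical LocalRDD class counts how many times its underlying data is actually computed, with and without persist().

```python
# A local sketch (not Spark) of why persist() matters: model an RDD as a
# function that recomputes its data every time an "action" asks for it,
# unless it has been marked for caching.
class LocalRDD(object):
    def __init__(self, compute):
        self.compute = compute        # function producing the data
        self.should_cache = False
        self.cached = None
        self.computations = 0         # how many times we really computed

    def persist(self):
        self.should_cache = True      # just a marker, like RDD.persist()
        return self

    def _materialize(self):
        if self.cached is not None:
            return self.cached
        self.computations += 1
        data = list(self.compute())
        if self.should_cache:         # cache after the first computation
            self.cached = data
        return data

    def count(self):
        return len(self._materialize())

    def first(self):
        return self._materialize()[0]

lines = LocalRDD(lambda: iter(["## Interactive Python Shell", "x"]))
lines.count()
lines.first()
assert lines.computations == 2   # no persist(): every action recomputes

cached = LocalRDD(lambda: iter(["a", "b"])).persist()
cached.count()
cached.first()
assert cached.computations == 1  # persisted: computed once, then reused
```

This mirrors the text above: without persist(), each action recomputes from scratch; with it, Spark computes once and reuses the result.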

In practice, you will often use persist to load a subset of your data into memory and query it repeatedly. For example, if we knew that we wanted to compute multiple results about the README lines that contain “Python”, we could write:

>>> pythonLines.persist()

>>> pythonLines.count()
2

>>> pythonLines.first()
u'## Interactive Python Shell'

To summarize, every Spark program and shell session will work as follows:

  1. Create some input RDDs from external data.
  2. Transform them to define new RDDs using transformations like filter().
  3. Ask Spark to persist() any intermediate RDDs that will need to be reused.
  4. Launch actions such as count() and first() to kick off a parallel computation, which is then optimized and executed by Spark.

In the rest of this chapter, we’ll go through each of these steps in detail, and cover some of the most common RDD operations in Spark.
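The four-step flow above can be mimicked locally with plain-Python generators, which share Spark’s lazy flavor: nothing is computed when the pipeline is defined, and a terminal step (the analogue of an action) pulls only as much data as it needs. This is a conceptual sketch, not Spark code.

```python
# Step 1: an "input RDD" as a generator function; `visited` records
# which lines were actually read from the source.
visited = []

def read_lines():
    for line in ["## Interactive Python Shell", "a line", "more Python"]:
        visited.append(line)
        yield line

# Step 2: a transformation, defined but not yet executed.
python_lines = (l for l in read_lines() if "Python" in l)
assert visited == []                 # lazy: nothing computed yet

# Step 4: an "action" triggers computation.
first = next(python_lines)
assert first == "## Interactive Python Shell"
# Only one line was scanned -- like first(), which stops at the first match.
assert visited == ["## Interactive Python Shell"]
```

(Step 3, persisting, has no generator analogue here; it is covered in the persist() discussion above.)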

Creating RDDs

Spark provides two ways to create RDDs: loading an external dataset and parallelizing a collection in your driver program.

The simplest way to create RDDs is to take an existing in-memory collection and pass it to SparkContext’s parallelize method. This approach is very useful when learning Spark, since you can quickly create your own RDDs in the shell and perform operations on them. Keep in mind, however, that outside of prototyping and testing this is not widely used, since it requires that you have your entire dataset in memory on one machine.

Example 3-2. Python parallelize example

lines = sc.parallelize(["pandas", "i like pandas"])

Example 3-3. Scala parallelize example

val lines = sc.parallelize(List("pandas", "i like pandas"))

Example 3-4. Java parallelize example

JavaRDD<String> lines = sc.parallelize(Arrays.asList("pandas", "i like pandas"));

A more common way to create RDDs is to load data from external storage. Loading external datasets is covered in detail in Chapter 5. However, we already saw one method that loads a text file as an RDD of strings, SparkContext.textFile:

Example 3-5. Python textFile example

lines = sc.textFile("/path/to/README.md")

Example 3-6. Scala textFile example

val lines = sc.textFile("/path/to/README.md")

Example 3-7. Java textFile example

JavaRDD<String> lines = sc.textFile("/path/to/README.md");

RDD Operations

RDDs support two types of operations: transformations and actions. Transformations are operations on RDDs that return a new RDD, such as map and filter. Actions are operations that return a result to the driver program or write it to storage, and that kick off a computation, such as count and first. Spark treats transformations and actions very differently, so understanding which type of operation you are performing is important. If you are ever confused whether a given function is a transformation or an action, you can look at its return type: transformations return RDDs, whereas actions return some other data type.

Transformations

Transformations are operations on RDDs that return a new RDD. As discussed in the lazy evaluation section, transformed RDDs are computed lazily, only when you use them in an action. Many transformations are element-wise, that is they work on one element at a time, but this is not true for all transformations.

As an example, suppose that we have a log file, log.txt, with a number of messages, and we want to select only the error messages. We can use the filter transformation seen before. This time though, we’ll show a filter in all three of Spark’s language APIs:

Example 3-8. Python filter example

inputRDD = sc.textFile("log.txt")
errorsRDD = inputRDD.filter(lambda x: "error" in x)

Example 3-9. Scala filter example

val inputRDD = sc.textFile("log.txt")
val errorsRDD = inputRDD.filter(line => line.contains("error"))

Example 3-10. Java filter example

JavaRDD<String> inputRDD = sc.textFile("log.txt");
JavaRDD<String> errorsRDD = inputRDD.filter(
  new Function<String, Boolean>() {
    public Boolean call(String x) { return x.contains("error"); }
  });

Note that the filter operation does not mutate the existing inputRDD. Instead, it returns a pointer to an entirely new RDD. inputRDD can still be re-used later in the program, for instance, to search for other words. In fact, let’s use inputRDD again to search for lines with the word “warning” in them. Then, we’ll use another transformation, union, to print out the number of lines that contained either “error” or “warning”. We show Python here, but the union() function is identical in all three languages:

Example 3-11. Python union example

errorsRDD = inputRDD.filter(lambda x: "error" in x)
warningsRDD = inputRDD.filter(lambda x: "warning" in x)
badLinesRDD = errorsRDD.union(warningsRDD)

union is a bit different than filter, in that it operates on two RDDs instead of one. Transformations can actually operate on any number of input RDDs.

Finally, as you derive new RDDs from each other using transformations, Spark keeps track of the set of dependencies between different RDDs, called the lineage graph. It uses this information to compute each RDD on demand and to recover lost data if part of a persistent RDD is lost. We will show a lineage graph for this example in Figure 3-1.

Actions

We’ve seen how to create RDDs from each other with transformations, but at some point, we’ll want to actually do something with our dataset. Actions are the second type of RDD operation. They are the operations that return a final value to the driver program or write data to an external storage system. Actions force the evaluation of the transformations required for the RDD they are called on, since they are required to actually produce output.

Continuing the log example from the previous section, we might want to print out some information about the badLinesRDD. To do that, we’ll use two actions, count(), which returns the count as a number, and take(), which collects a number of elements from the RDD.

Example 3-12. Python error count example using actions

print "Input had " + str(badLinesRDD.count()) + " concerning lines"
print "Here are 10 examples:"
for line in badLinesRDD.take(10):
    print line

Example 3-13. Scala error count example using actions

println("Input had " + badLinesRDD.count() + " concerning lines")
println("Here are 10 examples:")
badLinesRDD.take(10).foreach(println)

Example 3-14. Java error count example using actions

System.out.println("Input had " + badLinesRDD.count() + " concerning lines");
System.out.println("Here are 10 examples:")
for (String line: badLinesRDD.take(10)) {
  System.out.println(line);
}

In this example, we used take() to retrieve a small number of elements in the RDD at the driver program. We then iterate over them locally to print out information at the driver. RDDs also have a collect() function to retrieve the entire RDD. This can be useful if your program filters RDDs down to a very small size and you’d like to deal with it locally. Keep in mind that your entire dataset must fit in memory on a single machine to use collect() on it, so collect() shouldn’t be used on large datasets.

In most cases RDDs can’t just be collect()‘ed to the driver because they are too large. In these cases, it’s common to write data out to a distributed storage system such as HDFS or Amazon S3. The contents of an RDD can be saved using the saveAsTextFile action, saveAsSequenceFile, or any of a number of actions for various built-in formats. We will cover the different options for exporting data later on in Chapter 5.

The image below presents the lineage graph for this entire example, starting with our inputRDD and ending with the two actions. It is important to note that each time we call a new action, the entire RDD must be computed “from scratch”. To avoid this inefficiency, users can persist intermediate results, as we will cover in Persistence (Caching).

Figure 3-1. RDD lineage graph created during log analysis:

         inputRDD
        /        \
    filter      filter
       |           |
   errorsRDD  warningsRDD
        \        /
          union
            |
       badLinesRDD
        /        \
     count       take

Lazy Evaluation

Transformations on RDDs are lazily evaluated, meaning that Spark will not begin to execute until it sees an action. This can be somewhat counter-intuitive for new users, but may be familiar for those who have used functional languages such as Haskell or LINQ-like data processing frameworks.

Lazy evaluation means that when we call a transformation on an RDD (for instance calling map), the operation is not immediately performed. Instead, Spark internally records meta-data to indicate this operation has been requested. Rather than thinking of an RDD as containing specific data, it is best to think of each RDD as consisting of instructions on how to compute the data that we build up through transformations. Loading data into an RDD is lazily evaluated in the same way transformations are. So when we call sc.textFile the data is not loaded until it is necessary. Like with transformations, the operation (in this case reading the data) can occur multiple times.

  • Tip

    Although transformations are lazy, you can force Spark to execute them at any time by running an action, such as count(). This is an easy way to test out just part of your program.

Spark uses lazy evaluation to reduce the number of passes it has to take over our data by grouping operations together. In MapReduce systems like Hadoop, developers often have to spend a lot of time considering how to group together operations to minimize the number of MapReduce passes. In Spark, there is no substantial benefit to writing a single complex map instead of chaining together many simple operations. Thus, users are free to organize their program into smaller, more manageable operations.
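The single-pass behavior can be illustrated with a chain of plain-Python generator stages (a local analogue, not Spark): even with a filter and a map chained together, each input element is examined exactly once.

```python
# Count how many times the pipeline touches each input element.
touches = {"count": 0}

def touch(x):
    touches["count"] += 1
    return x

data = [1, 2, 3, 4]
# Two chained stages: keep even elements, then square them.
pipeline = (x * x for x in (touch(y) for y in data) if x % 2 == 0)
result = list(pipeline)

assert result == [4, 16]          # squares of the even elements
assert touches["count"] == 4      # one pass: each element examined once
```

Chaining many small stages costs no extra passes over the data, which is why there is no need to hand-fuse them into one complex operation.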

Passing Functions to Spark

Most of Spark’s transformations, and some of its actions, depend on passing in functions that are used by Spark to compute data. Each of the core languages has a slightly different mechanism for passing functions to Spark.

Python

In Python, we have three options for passing functions into Spark. For shorter functions, we can pass in lambda expressions, as we did in the example at the start of this chapter. We can also pass in top-level functions, or locally defined functions.

Example 3-15. Passing a lambda in Python

word = rdd.filter(lambda s: "error" in s)

Passing a top-level Python function.

def containsError(s):
    return "error" in s
word = rdd.filter(containsError)

One issue to watch out for when passing functions is that if you pass functions that are members of an object, or references to fields in an object (e.g., self.field), this results in sending the entire object, which can be much larger than just the bit of information you need. Sometimes this can also cause your program to fail, if your class contains objects that Python can’t figure out how to pickle.

Example 3-16. Passing a function with field references (don’t do this!)

class SearchFunctions(object):
  def __init__(self, query):
      self.query = query
  def isMatch(self, s):
      return self.query in s
  def getMatchesFunctionReference(self, rdd):
      # Problem: references all of "self" in "self.isMatch"
      return rdd.filter(self.isMatch)
  def getMatchesMemberReference(self, rdd):
      # Problem: references all of "self" in "self.query"
      return rdd.filter(lambda x: self.query in x)

Instead, just extract the fields you need from your object into a local variable and pass that in, like we do below:

Example 3-17. Python function passing without field references

class WordFunctions(object):
  ...
  def getMatchesNoReference(self, rdd):
      # Safe: extract only the field we need into a local variable
      query = self.query
      return rdd.filter(lambda x: query in x)
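The difference between the two styles can be inspected with plain Python (the closure introspection below is for illustration only; in Spark it is the pickler that actually trips over the whole-object reference):

```python
# A lambda that reads self.query closes over the whole object; one that
# reads a local variable closes over just the string.
class SearchFunctions(object):
    def __init__(self, query):
        self.query = query

    def bad_filter_fn(self):
        return lambda x: self.query in x      # closure captures self

    def good_filter_fn(self):
        query = self.query                    # extract the field first
        return lambda x: query in x           # closure captures only a str

sf = SearchFunctions("error")
bad_cells = [c.cell_contents for c in sf.bad_filter_fn().__closure__]
good_cells = [c.cell_contents for c in sf.good_filter_fn().__closure__]

assert any(isinstance(c, SearchFunctions) for c in bad_cells)  # whole object
assert all(isinstance(c, str) for c in good_cells)             # just the field
```

Shipping the lambda from good_filter_fn only requires serializing a short string, not everything the enclosing object happens to hold.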

Scala

In Scala, we can pass in functions defined inline or references to methods or static functions as we do for Scala’s other functional APIs. Some other considerations come into play though, namely that the function we pass and the data referenced in it needs to be Serializable (implementing Java’s Serializable interface). Furthermore, like in Python, passing a method or field of an object includes a reference to that whole object, though this is less obvious because we are not forced to write these references with self. Like how we did with Python, we can instead extract out the fields we need as local variables and avoid needing to pass the whole object containing them.

Example 3-18. Scala function passing

class SearchFunctions(val query: String) {
  def isMatch(s: String): Boolean = {
    s.contains(query)
  }
  def getMatchesFunctionReference(rdd: RDD[String]): RDD[Boolean] = {
    // Problem: "isMatch" means "this.isMatch", so we pass all of "this"
    rdd.map(isMatch)
  }
  def getMatchesFieldReference(rdd: RDD[String]): RDD[Array[String]] = {
    // Problem: "query" means "this.query", so we pass all of "this"
    rdd.map(x => x.split(query))
  }
  def getMatchesNoReference(rdd: RDD[String]): RDD[Array[String]] = {
    // Safe: extract just the field we need into a local variable
    val query_ = this.query
    rdd.map(x => x.split(query_))
  }
}

If you see “NotSerializableException” errors in Scala, a reference to a method or field in a non-serializable class is usually the problem. Note that passing in local variables or functions that are members of a top-level object is always safe.

Java

In Java, functions are specified as objects that implement one of Spark’s function interfaces from the org.apache.spark.api.java.function package. There are a number of different interfaces based on the return type of the function. We show the most basic function interfaces below, and cover a number of other function interfaces for when we need to return special types of data in the section on converting between RDD types.

Table 3-1. Standard Java function interfaces

| Function name | Method to implement | Usage |
| --- | --- | --- |
| Function<T, R> | R call(T) | Take in one input and return one output, for use with things like map and filter. |
| Function2<T1, T2, R> | R call(T1, T2) | Take in two inputs and return one output, for use with things like aggregate or fold. |
| FlatMapFunction<T, R> | Iterable<R> call(T) | Take in one input and return zero or more outputs, for use with things like flatMap. |

We can either define our function classes in-line as anonymous inner classes, or make a named class:

Example 3-19. Java function passing with anonymous inner class

JavaRDD<String> errors = lines.filter(new Function<String, Boolean>() {
  public Boolean call(String x) { return x.contains("error"); }
});

Example 3-20. Java function passing with named class

class ContainsError implements Function<String, Boolean> {
  public Boolean call(String x) { return x.contains("error"); }
}

JavaRDD<String> errors = lines.filter(new ContainsError());

The style to choose is a personal preference, but we find that top-level named functions are often cleaner for organizing large programs. One other benefit of top-level functions is that you can give them constructor parameters:

Example 3-21. Java function class with parameters

class Contains implements Function<String, Boolean> {
  private String query;
  public Contains(String query) { this.query = query; }
  public Boolean call(String x) { return x.contains(query); }
}

JavaRDD<String> errors = lines.filter(new Contains("error"));

In Java 8, you can also use lambda expressions to concisely implement the Function interfaces. Since Java 8 is still relatively new as of the writing of this book, our examples use the more verbose syntax for defining classes in previous versions of Java. However, with lambda expressions, our search example would look like this:

Example 3-22. Java function passing with lambda expression in Java 8

JavaRDD<String> errors = lines.filter(s -> s.contains("error"));

If you are interested in using Java 8’s lambda expression, refer to Oracle’s documentation and the Databricks blog post on how to use lambdas with Spark.

  • Tip

Both anonymous inner classes and lambda expressions can reference any final variables in the method enclosing them, so you can pass these variables to Spark just like in Python and Scala.

Common Transformations and Actions

In this chapter, we tour the most common transformations and actions in Spark. Additional operations are available on RDDs containing certain types of data (for example, statistical functions on RDDs of numbers, and key-value operations such as aggregating data by key on RDDs of key-value pairs). We cover converting between RDD types and these special operations in later sections.

Basic RDDs

We will begin by evaluating what operations we can do on all RDDs regardless of the data. These transformations and actions are available on all RDD classes.

Transformations
Element-wise transformations

The two most common transformations you will likely be performing on basic RDDs are map and filter. The map transformation takes in a function and applies it to each element in the RDD, with the result of the function becoming the new value of each element in the resulting RDD. The filter transformation takes in a function and returns an RDD that only has elements that pass the filter function.

We can use map to do any number of things, from fetching the website associated with each URL in our collection to just squaring the numbers. In Scala and Python you can use the standard anonymous function notation or pass in a named function, and in Java you should use Spark’s Function classes from org.apache.spark.api.java.function or Java 8 lambdas.

It is useful to note that the return type of map does not have to be the same as its input type, so if we had an RDD of customer IDs and our map function fetched the corresponding customer records, the type of our input RDD would be RDD[CustomerID] and the type of the resulting RDD would be RDD[CustomerRecord].

Let’s look at a basic example of map, which squares all of the numbers in an RDD:

Example 3-23. Python squaring the values in an RDD

nums = sc.parallelize([1, 2, 3, 4])
squared = nums.map(lambda x: x * x).collect()
for num in squared:
    print "%i " % (num)

Example 3-24. Scala squaring the values in an RDD

val input = sc.parallelize(List(1, 2, 3, 4))
val result = input.map(x => x*x)
println(result.collect().mkString(","))

Example 3-25. Java squaring the values in an RDD

JavaRDD<Integer> rdd = sc.parallelize(Arrays.asList(1, 2, 3, 4));
JavaRDD<Integer> result = rdd.map(new Function<Integer, Integer>() {
  public Integer call(Integer x) { return x*x; }
});
System.out.println(StringUtils.join(result.collect(), ","));

Sometimes we want to produce multiple output elements for each input element. The operation to do this is called flatMap. Like with map, the function we provide to flatMap is called individually for each element in our input RDD. Instead of returning a single element, we return an iterator with our return values. Rather than producing an RDD of iterators, we get back an RDD which consists of the elements from all of the iterators. A simple example of flatMap is splitting up an input string into words, as shown below.

Example 3-26. Python flatMap example, splitting lines into words

lines = sc.parallelize(["hello world", "hi"])
words = lines.flatMap(lambda line: line.split(" "))
words.first()  # returns "hello"

Example 3-27. Scala flatMap example, splitting lines into multiple words

val lines = sc.parallelize(List("hello world", "hi"))
val words = lines.flatMap(line => line.split(" "))
words.first()  // returns "hello"

Example 3-28. Java flatMap example, splitting lines into multiple words

JavaRDD<String> lines = sc.parallelize(Arrays.asList("hello world", "hi"));
JavaRDD<String> words = lines.flatMap(new FlatMapFunction<String, String>() {
  public Iterable<String> call(String line) {
    return Arrays.asList(line.split(" "));
  }
});
words.first();  // returns "hello"
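The contract flatMap follows can be modeled in a few lines of plain Python (the helper name local_flat_map is hypothetical, not a Spark API): apply the function to each element, then concatenate the returned iterables into one flat result rather than an RDD of lists.

```python
# flatMap, modeled locally: f returns an iterable per element, and the
# outputs are flattened into a single sequence.
def local_flat_map(f, elements):
    return [out for element in elements for out in f(element)]

lines = ["hello world", "hi"]
words = local_flat_map(lambda line: line.split(" "), lines)

# Flattened, not nested: ["hello", "world", "hi"],
# not [["hello", "world"], ["hi"]].
assert words == ["hello", "world", "hi"]
```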

Pseudo Set Operations

Figure 3-3. Some simple set operations (image to be redone)

RDDs support many of the operations of mathematical sets, such as union and intersection, even when the RDDs themselves are not properly sets.

The set property most frequently missing from our RDDs is the uniqueness of elements. If we only want unique elements we can use the RDD.distinct() transformation to produce a new RDD with only distinct items. Note that distinct() is expensive, however, as it requires shuffling all the data over the network to ensure that we only receive one copy of each element.

The simplest set operation is union(other), which gives back an RDD consisting of the data from both sources. This can be useful in a number of use cases, such as processing log files from many sources. Unlike the mathematical union(), if there are duplicates in the input RDDs, the result of Spark’s union() will contain duplicates (which we can fix if desired with distinct()).

Spark also provides an intersection(other) method, which returns only elements in both RDDs. intersection() also removes all duplicates (including duplicates from a single RDD) while running. While intersection and union are two very similar operations, the performance of intersection is much worse, since it requires a shuffle over the network to identify common elements.

Sometimes we need to remove some data from consideration. The subtract(other) function takes in another RDD and returns an RDD that only has values present in the first RDD and not the second RDD.

We can also compute a Cartesian product between two RDDs. The cartesian(other) transformation returns all possible pairs of (a, b) where a is in the source RDD and b is in the other RDD. The Cartesian product can be useful when we wish to consider the similarity between all possible pairs, such as computing each user’s expected interest in each offer. We can also take the Cartesian product of an RDD with itself, which can be useful for tasks like computing user similarity.
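The duplicate behavior described above can be pinned down with local models in plain Python (the local_* helper names are hypothetical, not Spark APIs): union keeps duplicates, while intersection, subtract, and distinct-style operations remove them.

```python
from itertools import product

# Local models of the pseudo set operations on plain lists.
def local_union(a, b):        return a + b                 # keeps duplicates
def local_intersection(a, b): return sorted(set(a) & set(b))
def local_subtract(a, b):     return sorted(set(a) - set(b))
def local_cartesian(a, b):    return list(product(a, b))   # all (a, b) pairs

a, b = [1, 2, 3, 3], [3, 4, 5]
assert local_union(a, b) == [1, 2, 3, 3, 3, 4, 5]  # duplicates survive
assert local_intersection(a, b) == [3]             # duplicates removed
assert local_subtract(a, b) == [1, 2]
assert len(local_cartesian([1, 2], b)) == 6        # 2 x 3 pairs
```

In Spark, intersection, subtract, and cartesian additionally require a shuffle over the network, which is the performance cost these local models cannot show.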

Figure 3-4. Cartesian product between two RDDs

The tables below summarize common single-RDD and multi-RDD transformations.

Table 3-2. Basic RDD transformations on an RDD containing {1, 2, 3, 3}

| Function name | Purpose | Example | Result |
| --- | --- | --- | --- |
| map | Apply a function to each element in the RDD and return an RDD of the result | rdd.map(x => x + 1) | {2, 3, 4, 4} |
| flatMap | Apply a function to each element in the RDD and return an RDD of the contents of the iterators returned. Often used to extract words. | rdd.flatMap(x => x.to(3)) | {1, 2, 3, 2, 3, 3, 3} |
| filter | Return an RDD consisting of only elements that pass the condition passed to filter | rdd.filter(x => x != 1) | {2, 3, 3} |
| distinct | Remove duplicates | rdd.distinct() | {1, 2, 3} |
| sample(withReplacement, fraction, [seed]) | Sample an RDD | rdd.sample(false, 0.5) | Non-deterministic |

Table 3-3. Two-RDD transformations on RDDs containing {1, 2, 3} and {3, 4, 5}

| Function name | Purpose | Example | Result |
| --- | --- | --- | --- |
| union | Produce an RDD containing elements from both RDDs | rdd.union(other) | {1, 2, 3, 3, 4, 5} |
| intersection | RDD containing only elements found in both RDDs | rdd.intersection(other) | {3} |
| subtract | Remove the contents of one RDD (e.g., remove training data) | rdd.subtract(other) | {1, 2} |
| cartesian | Cartesian product with the other RDD | rdd.cartesian(other) | {(1, 3), (1, 4), … (3, 5)} |

As you can see there are a wide variety of transformations available on all RDDs regardless of our specific underlying data. We can transform our data element-wise, obtain distinct elements, and do a variety of set operations.

Actions

The most common action on basic RDDs you will likely use is reduce. reduce takes in a function that operates on two elements of the type in your RDD and returns a new element of the same type. A simple example of such a function is +, which we can use to sum our RDD. With reduce we can easily sum the elements of our RDD, count the number of elements, and perform other types of aggregations.

Example 3-29. Python reduce example

sum = rdd.reduce(lambda x, y: x + y)

Example 3-30. Scala reduce example

val sum = rdd.reduce((x, y) => x + y)

Example 3-31. Java reduce example

Integer sum = rdd.reduce(new Function2<Integer, Integer, Integer>() {
  public Integer call(Integer x, Integer y) { return x + y;}
});

Similar to reduce is fold, which also takes a function with the same signature as needed for reduce, but in addition takes a “zero value” to be used for the initial call on each partition. The zero value you provide should be the identity element for your operation; that is, applying it multiple times with your function should not change the value (e.g., 0 for +, 1 for *, or an empty list for concatenation).

  • Tip

You can minimize object creation in fold by modifying and returning the first of the two parameters in-place. However, you should not modify the second parameter.
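The per-partition role of the zero value can be sketched locally in plain Python (local_fold is a hypothetical helper, and partitions are modeled as a list of lists): the zero seeds the fold on each partition and again when merging partition results, which is exactly why it must be the identity for the operation.

```python
from functools import reduce

# fold, modeled locally: fold each partition from the zero value, then
# fold the per-partition results, again from the zero value.
def local_fold(partitions, zero, op):
    per_partition = [reduce(op, part, zero) for part in partitions]
    return reduce(op, per_partition, zero)

partitions = [[1, 2], [3, 3]]   # {1, 2, 3, 3} split across 2 partitions
assert local_fold(partitions, 0, lambda x, y: x + y) == 9

# A non-identity zero is applied once per partition and once combining,
# so it silently skews the result:
assert local_fold(partitions, 1, lambda x, y: x + y) == 12
```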

Fold and reduce both require that the return type of our result be the same as the type of the RDD we are operating over. This works well for operations like sum, but sometimes we want to return a different type. For example, when computing a running average, we need a different return type. We could implement this by first using a map to transform every element into a pair of the element and the number 1, so that the reduce function can work on pairs.

The aggregate function frees us from the constraint of having to return the same type as the RDD we are working on. With aggregate, like fold, we supply an initial zero value of the type we want to return. We then supply a function to combine the elements from our RDD with the accumulator. Finally, we need to supply a second function to merge two accumulators, given that each node accumulates its own results locally.

We can use aggregate to compute the average of an RDD, avoiding a map before the fold.

Example 3-32. Python aggregate example

sumCount = nums.aggregate((0, 0),
               (lambda acc, value: (acc[0] + value, acc[1] + 1)),
               (lambda acc1, acc2: (acc1[0] + acc2[0], acc1[1] + acc2[1])))
return sumCount[0] / float(sumCount[1])


Example 3-33. Scala aggregate example

val result = input.aggregate((0, 0))(
               (x, y) => (x._1 + y, x._2 + 1),
               (x, y) => (x._1 + y._1, x._2 + y._2))
val avg = result._1 / result._2.toDouble

Example 3-34. Java aggregate example

class AvgCount {
  public AvgCount(int total, int num) {
    this.total = total;
    this.num = num;
  }
  public int total;
  public int num;
  public double avg() {
    return total / (double) num;
  }
}
Function2<AvgCount, Integer, AvgCount> addAndCount =
  new Function2<AvgCount, Integer, AvgCount>() {
    public AvgCount call(AvgCount a, Integer x) {
      a.total += x;
      a.num += 1;
      return a;
  }
};
Function2<AvgCount, AvgCount, AvgCount> combine =
  new Function2<AvgCount, AvgCount, AvgCount>() {
  public AvgCount call(AvgCount a, AvgCount b) {
    a.total += b.total;
    a.num += b.num;
    return a;
  }
};
AvgCount initial = new AvgCount(0, 0);
AvgCount result = rdd.aggregate(initial, addAndCount, combine);
System.out.println(result.avg());
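The two-function contract the three examples above share can be made concrete with a local model in plain Python (local_aggregate is a hypothetical helper; partitions are modeled as a list of lists): seqOp folds elements into a per-partition accumulator, and combOp merges the accumulators.

```python
from functools import reduce

# aggregate, modeled locally: seq_op builds one accumulator per
# partition starting from zero; comb_op merges the accumulators.
def local_aggregate(partitions, zero, seq_op, comb_op):
    accs = [reduce(seq_op, part, zero) for part in partitions]
    return reduce(comb_op, accs)

partitions = [[1, 2], [3, 4]]
sum_count = local_aggregate(
    partitions, (0, 0),
    lambda acc, x: (acc[0] + x, acc[1] + 1),     # fold element into (sum, count)
    lambda a, b: (a[0] + b[0], a[1] + b[1]))     # merge two (sum, count) pairs

assert sum_count == (10, 4)
assert sum_count[0] / float(sum_count[1]) == 2.5
```

The two functions have different signatures precisely because each node accumulates locally before the partial results are merged.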

Some actions on RDDs return some or all of the data to our driver program in the form of a regular collection or value.

The simplest and most common operation that returns data to our driver program is collect(), which returns the entire RDD’s contents. collect suffers from the restriction that all of your data must fit on a single machine, as it all needs to be copied to the driver.

take(n) returns n elements from the RDD and attempts to minimize the number of partitions it accesses, so it may return a biased collection. It’s important to note that these operations do not return the elements in the order you might expect.

These operations are useful for unit tests and quick debugging, but may introduce bottlenecks when dealing with large amounts of data.

If there is an ordering defined on our data, we can also extract the top elements from an RDD using top. top will use the default ordering on the data, but we can supply our own comparison function to extract the top elements.

Sometimes we need a sample of our data in our driver program. The takeSample(withReplacement, num, seed) function allows us to take a sample of our data either with or without replacement. For more control, we can create a sampled RDD and collect it, as we will discuss in the Sampling your data section in the Simple Optimizations chapter.
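The difference between the two sampling modes can be illustrated with Python's standard library (a plain-Python analogy, not the Spark API):

```python
import random

data = [1, 2, 3, 3]
random.seed(17)  # fixing a seed, like takeSample's optional seed argument

# Without replacement: each position in the data is used at most once.
without_replacement = random.sample(data, 2)

# With replacement: the same position may be drawn more than once.
with_replacement = [random.choice(data) for _ in range(2)]

print(without_replacement, with_replacement)
```

In both cases the sample size is fixed at 2, but only the with-replacement draw can pick the same underlying element twice.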

The other standard operations on a basic RDD all behave pretty much as you would expect from their names. count() returns a count of the elements, and countByValue() returns a map of each unique value to its count. See Table 3-4 for more actions.

Table 3-4. Basic actions on an RDD containing {1, 2, 3, 3}

| Function name | Purpose | Example | Result |
| --- | --- | --- | --- |
| collect() | Return all elements from the RDD | rdd.collect() | {1, 2, 3, 3} |
| count() | Number of elements in the RDD | rdd.count() | 4 |
| take(num) | Return num elements from the RDD | rdd.take(2) | {1, 2} |
| top(num) | Return the top num elements from the RDD | rdd.top(2) | {3, 3} |
| takeOrdered(num)(ordering) | Return num elements based on the provided ordering | rdd.takeOrdered(2)(myOrdering) | {3, 3} |
| takeSample(withReplacement, num, [seed]) | Return num elements at random | rdd.takeSample(false, 1) | nondeterministic |
| reduce(func) | Combine the elements of the RDD together in parallel (e.g., sum) | rdd.reduce((x, y) => x + y) | 9 |
| fold(zero)(func) | Same as reduce but with the provided zero value | rdd.fold(0)((x, y) => x + y) | 9 |
| aggregate(zeroValue)(seqOp, combOp) | Similar to reduce but used to return a different type | rdd.aggregate((0, 0))((x, y) => (x._1 + y, x._2 + 1), (x, y) => (x._1 + y._1, x._2 + y._2)) | (9, 4) |
| foreach(func) | Apply the provided function to each element of the RDD | rdd.foreach(func) | nothing |
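The contrast between take(2) and top(2) in the table is worth spelling out: take follows partition order, while top uses the ordering on the data. In plain Python terms (on the same {1, 2, 3, 3} dataset, not the Spark API):

```python
data = [1, 2, 3, 3]  # the RDD contents from Table 3-4

# take(2): the first elements encountered, in partition order
take_2 = data[:2]

# top(2): the largest elements by the default ordering
top_2 = sorted(data, reverse=True)[:2]

print(take_2)  # [1, 2]
print(top_2)   # [3, 3]
```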

Converting Between RDD Types

We don’t have to do anything special to get back the correct templated/generic type of RDD (that is, our RDD of Strings can become an RDD of Integers just by calling map with the correct function). Some functions are only available on certain types of RDDs, such as average on numeric RDDs and join on key-value pair RDDs. We will cover these special functions for numeric data in (to come) and pair RDDs in Chapter 4.

In Scala and Java, these methods aren’t defined on the standard RDD class, so to access this additional functionality we have to make sure we get the correct specialized class.

Scala

In Scala the conversion between such RDDs (like from RDD[Double] and RDD[Numeric] to DoubleRDD) is handled automatically using implicit conversions. As mentioned in Standard Imports, we need to add import org.apache.spark.SparkContext._ for these conversions to work. You can see the implicit conversions listed in the SparkContext object in the Spark source code at ./core/src/main/scala/org/apache/spark/SparkContext.scala. These implicits also allow RDDs of Scala types to be written out to HDFS and similar storage systems.

Implicits, while quite powerful, can sometimes be confusing. If you call a function, say stats(), on an RDD, you might look at the Scaladocs for the RDD class and notice there is no stats() function. The call manages to succeed because of the implicit conversion between RDD[Numeric] and DoubleRDDFunctions. When looking for functions on your RDD in the Scaladocs, make sure to look at the functions available in the other RDD classes.

Java

In Java the conversion between the specialized types of RDDs is a bit more explicit. This has the benefit of giving you a greater understanding of what exactly is going on, but can be a bit more cumbersome.

To allow Spark to determine the correct return type, instead of always using the Function class we will need to use specialized versions. If we want to create a DoubleRDD from an RDD of type T, rather than using Function<T, Double> we use DoubleFunction. The special Java functions table shows the specialized functions and their uses.

We also need to call different functions on our RDD (so we can't just create a DoubleFunction and pass it to map). When we want a DoubleRDD back, instead of calling map we call mapToDouble, and the same pattern is followed for all of the other functions.

We can modify our previous example where we squared an RDD of numbers to produce a JavaDoubleRDD. This gives us access to the additional DoubleRDD-specific functions like mean() and stats().

Example 3-35. Java create DoubleRDD example

JavaDoubleRDD result = rdd.mapToDouble(
  new DoubleFunction<Integer>() {
    public double call(Integer x) {
      return (double) x * x;
    }
  });
System.out.println(result.mean());

Python

The Python API is structured a bit differently than the Java and Scala APIs. As in Scala, we don't need to do anything explicit to access the functions that are only available on double or pair RDDs. In Python, all of the functions are implemented on the base RDD class, and they will simply fail at runtime if the data's type doesn't work.
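This is ordinary Python duck typing. The sketch below uses only the standard library (not PySpark) to illustrate the same behavior: a numeric operation is always available to call, and it only raises an error when actually invoked on incompatible data.

```python
import statistics

# Numeric data: the operation works.
squares = [x * x for x in [1, 2, 3, 3]]
print(statistics.mean(squares))  # 5.75

# Non-numeric data: the same call fails, but only at runtime.
try:
    statistics.mean(["a", "b"])
except TypeError as err:
    print("runtime failure:", err)
```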

Persistence (Caching)

As discussed earlier, Spark RDDs are lazily evaluated, and sometimes we may wish to use the same RDD multiple times. If we do this naively, Spark will recompute the RDD and all of its dependencies each time we call an action on the RDD. This can be especially expensive for iterative algorithms, which look at the data many times. Another trivial example would be doing a count and then writing out the same RDD.

Example 3-36. Scala double execute example

val result = input.map(x => x*x)
println(result.count())
println(result.collect().mkString(","))

To avoid computing an RDD multiple times, we can ask Spark to persist the data. When we ask Spark to persist an RDD, the nodes that compute the RDD store their partitions. If a node that has data persisted on it fails, Spark will recompute the lost partitions of the data when needed. We can also replicate our data on multiple nodes if we want to be able to handle node failure without slowdown.

Spark has many levels of persistence to choose from based on what our goals are. In Scala and Java, the default persist() will store the data in the JVM heap as unserialized objects. In Python, we always serialize the data that persist() stores, so the default is instead stored in the JVM heap as pickled objects. When we write data out to disk or off-heap storage, that data is also always serialized.

  • Tip

Off-heap caching is experimental and uses Tachyon. If you are interested in off-heap caching with Spark, take a look at the running Spark on Tachyon guide.

Example 3-37. Scala persist example

import org.apache.spark.storage.StorageLevel
val result = input.map(x => x*x)
result.persist(StorageLevel.MEMORY_ONLY)
println(result.count())
println(result.collect().mkString(","))

  • Tip

You will note that we called persist() on the RDD before the first action. The persist() call on its own doesn't force evaluation.

If you attempt to cache more data than will fit in memory, Spark will automatically evict old partitions using a Least Recently Used (LRU) cache policy. For the memory-only storage levels, it will recompute these partitions the next time they are accessed, while for the memory-and-disk ones, it will write them out to disk. In either case, this means that you don't have to worry about your job breaking if you ask Spark to cache too much data. However, caching unnecessary data can lead to eviction of useful data and more recomputation time.

Finally, RDDs come with a method called unpersist() that lets you manually remove them from the cache.
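The interplay of lazy evaluation, persist(), and unpersist() can be mimicked with a toy class. FakeRDD below is invented for illustration and is not the Spark API; it just counts how often the deferred work actually runs.

```python
class FakeRDD:
    """Toy stand-in for a lazy RDD (not the Spark API)."""

    def __init__(self, compute):
        self._compute = compute   # deferred computation, like a lazy RDD
        self._persist = False
        self._cache = None
        self.computations = 0     # how many times the work was actually done

    def persist(self):
        self._persist = True      # mark for caching; does NOT compute anything
        return self

    def unpersist(self):
        self._cache = None        # manually drop the cached data

    def _materialize(self):
        if self._cache is not None:
            return self._cache
        self.computations += 1
        data = self._compute()
        if self._persist:
            self._cache = data
        return data

    def count(self):              # an "action"
        return len(self._materialize())

    def collect(self):            # another "action"
        return list(self._materialize())

plain = FakeRDD(lambda: [x * x for x in [1, 2, 3, 3]])
plain.count(); plain.collect()
print(plain.computations)   # 2: recomputed for each action

cached = FakeRDD(lambda: [x * x for x in [1, 2, 3, 3]]).persist()
cached.count(); cached.collect()
print(cached.computations)  # 1: computed once, then served from the cache
```

After unpersist(), the next action on cached would recompute the data, bringing its counter to 2, which is the behavior the text describes for evicted partitions as well.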

Conclusion

In this chapter, we have covered the RDD execution model and a large number of common operations on RDDs. If you have gotten here, congratulations—you’ve learned all the core concepts of working in Spark. In the next chapter, we’ll cover a special set of operations available on RDDs of key-value pairs, which are the most common way to aggregate or group together data in parallel. After that, we discuss input and output from a variety of data sources, and more advanced topics in working with SparkContext.

@dbiswa4

commented Jul 1, 2015

Very informative...

I have a question about below transformation. I understand flapMap() operation, but what x.to(3) mean?

rdd.flatMap(x => x.to(3))
{1, 2, 3, 2, 3, 3, 3}

@elangovankrishna

commented Feb 19, 2016

I have the very same question as dbiswa4 does.

@dungdm

commented Apr 11, 2016

x.to(3) means [x, x + 1, x + 2... 3]

@rleena

commented Jul 25, 2017

Nicely explained. Thank you.

@kumgaurav

commented Sep 2, 2017

what is python equivalent of this
rdd.flatMap(x => x.to(3)) is not working for python.
