@lassebenni
Last active April 30, 2020 11:13
Spark
  1. Connect to Spark by creating a SparkContext.

```python
from pyspark import SparkContext, SparkConf

conf = SparkConf().setAppName('somename').setMaster('local')
sc = SparkContext(conf=conf)
```

The appName parameter is a name for your application to show on the cluster UI. master is a Spark, Mesos or YARN cluster URL, or the special “local” string to run in local mode. In practice, when running on a cluster, you will not want to hardcode master in the program, but rather launch the application with spark-submit and receive it there. However, for local testing and unit tests, you can pass “local” to run Spark in-process.

  2. Creating an RDD. RDDs are distributed objects that contain data, mostly used for list-like collections. Spark revolves around the concept of a resilient distributed dataset (RDD), which is a fault-tolerant collection of elements that can be operated on in parallel. There are two ways to create RDDs: parallelizing an existing collection in your driver program, or referencing a dataset in an external storage system.

Local collection:

```python
data = [1, 2, 3, 4, 5]
distData = sc.parallelize(data)
distData.reduce(lambda a, b: a + b)  # 15
```

External collection:

PySpark can create distributed datasets from any storage source supported by Hadoop, including your local file system, HDFS, Cassandra, HBase, Amazon S3, etc. Spark supports text files, SequenceFiles, and any other Hadoop InputFormat.

Text file RDDs can be created using SparkContext’s textFile method. This method takes a URI for the file (either a local path on the machine, or a hdfs://, s3a://, etc URI) and reads it as a collection of lines. Here is an example invocation:

distFile = sc.textFile("data.txt")

  3. Entire directories of text files can be read using wholeTextFiles.

SparkContext.wholeTextFiles lets you read a directory containing multiple small text files, and returns each of them as (filename, content) pairs. This is in contrast with textFile, which would return one record per line in each file.

```python
files = sc.wholeTextFiles('*.txt')
files.collect()
```

  4. Using sample to take a part of the dataset, as in the sketch below.
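
A minimal sketch of sample, assuming an RDD of numbers; sample takes a withReplacement flag, a fraction, and an optional seed:

```python
rdd = sc.parallelize(range(100))

# Take roughly 10% of the elements, without replacement; the seed makes it reproducible.
subset = rdd.sample(False, 0.1, seed=42)
subset.collect()
```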

Things I learned

Using wildcards! All of Spark’s file-based input methods, including textFile, support running on directories, compressed files, and wildcards as well. For example, you can use textFile("/my/directory"), textFile("/my/directory/*.txt"), and textFile("/my/directory/*.gz").

RDDs support two types of operations: transformations, which create a new dataset from an existing one, and actions, which return a value to the driver program after running a computation on the dataset.
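
For example (a sketch reusing the data.txt file from earlier; map is a transformation, reduce is an action):

```python
lines = sc.textFile("data.txt")                         # nothing is read yet
line_lengths = lines.map(lambda s: len(s))              # transformation: lazily defines a new RDD
total_length = line_lengths.reduce(lambda a, b: a + b)  # action: triggers the actual computation
```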

SequenceFile is a way to store many small files in one larger file so that they use less of the HDFS NameNode's memory. The combined file may take more space, but processing is much quicker because MapReduce jobs don't have to spin up a separate task for each small file.
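
A small sketch of writing and reading a SequenceFile of key/value pairs (the output path here is just an illustration):

```python
pairs = sc.parallelize(range(1, 4)).map(lambda x: (x, "a" * x))
pairs.saveAsSequenceFile("/tmp/sequence_example")   # hypothetical output path

sorted(sc.sequenceFile("/tmp/sequence_example").collect())
# [(1, u'a'), (2, u'aa'), (3, u'aaa')]
```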

You can't expect Spark to retain knowledge about global variables or scope. The executor nodes (the nodes running Spark actions) work on copies of scoped variables and won't update the global state on the driver.

The variables within the closure sent to each executor are now copies and thus, when counter is referenced within the foreach function, it’s no longer the counter on the driver node. There is still a counter in the memory of the driver node but this is no longer visible to the executors! The executors only see the copy from the serialized closure. Thus, the final value of counter will still be zero since all operations on counter were referencing the value within the serialized closure.
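
The classic illustration is incrementing a driver-side counter inside foreach (a sketch; the final print shows 0 on the driver):

```python
counter = 0
rdd = sc.parallelize(range(10))

def increment_counter(x):
    global counter
    counter += x  # updates the executor's copy of counter, not the driver's

rdd.foreach(increment_counter)
print(counter)  # still 0: the driver's counter was never touched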

The same applies to print statements: the executors run their code and print output to their own stdout, not to the driver node's:

Another common idiom is attempting to print out the elements of an RDD using rdd.foreach(println) or rdd.map(println). On a single machine, this will generate the expected output and print all the RDD’s elements. However, in cluster mode, the output to stdout being called by the executors is now writing to the executor’s stdout instead, not the one on the driver, so stdout on the driver won’t show these! To print all elements on the driver, one can use the collect() method to first bring the RDD to the driver node thus: rdd.collect().foreach(println). Collect() can cause resource problems on the driver node:

This can cause the driver to run out of memory, though, because collect() fetches the entire RDD to a single machine.

Tuples can be operated on in RDDs with 'groupByKey' and 'mapValues'.

```python
rdd = sc.parallelize([("a", 1), ("b", 1), ("a", 1)])

sorted(rdd.groupByKey().mapValues(len).collect())
# [('a', 2), ('b', 1)]

sorted(rdd.groupByKey().mapValues(list).collect())
# [('a', [1, 1]), ('b', [1])]
```

.join() is for getting the matching key/value pairs as a new dataset. When called on datasets of type (K, V) and (K, W), it returns a dataset of (K, (V, W)) pairs with all pairs of elements for each key.
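
A short sketch of join on two pair RDDs (the data is illustrative; keys without a match are dropped):

```python
x = sc.parallelize([("a", 1), ("b", 4)])
y = sc.parallelize([("a", 2), ("a", 3)])

sorted(x.join(y).collect())
# [('a', (1, 2)), ('a', (1, 3))]
```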

Garbage Collection: The main point to remember here is that the cost of garbage collection is proportional to the number of Java objects, so using data structures with fewer objects (e.g. an array of Ints instead of a LinkedList) greatly lowers this cost.

Memory issues: Before trying other techniques, the first thing to try if GC is a problem is to use serialized caching.

Dynamically loading Spark properties. In some cases you may want to avoid hard-coding certain configurations in a SparkConf, for instance if you'd like to run the same application with different masters or different amounts of memory. Spark allows you to simply create an empty conf.
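
In PySpark that looks something like this (a minimal sketch):

```python
from pyspark import SparkContext, SparkConf

# An empty conf: master, app name, and other settings are supplied at submit time.
sc = SparkContext(conf=SparkConf())
```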

Then, you can supply configuration values at runtime:

```
./bin/spark-submit --name "My app" --master local[4] --conf spark.eventLog.enabled=false --conf "spark.executor.extraJavaOptions=-XX:+PrintGCDetails -XX:+PrintGCTimeStamps" myApp.jar
```

DataFrames are basically a Dataset (a derivative of the RDD) squeezed into rows and columns.

Dataset is a new interface added in Spark 1.6 that provides the benefits of RDDs (strong typing, ability to use powerful lambda functions) with the benefits of Spark SQL’s optimized execution engine.
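
A quick sketch of creating a DataFrame from a SparkSession (the data and column names are illustrative, chosen so the UDF example below produces lengths 5 and 3):

```python
from pyspark.sql import SparkSession

spark = SparkSession.builder.master("local").appName("df_example").getOrCreate()

# A tiny DataFrame with a 'name' column to run the UDF against.
df = spark.createDataFrame([("Alice", 1), ("Bob", 2)], ["name", "id"])
df.show()
```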

UDFs (user defined functions) can be used as SQL-like functions that run on values in DataFrames:

```python
from pyspark.sql.functions import udf
from pyspark.sql.types import IntegerType

slen = udf(lambda s: len(s), IntegerType())
df.select(slen(df.name).alias('slen')).collect()
# [Row(slen=5), Row(slen=3)]
```

This can be handy for creating your own transformations/filtering on the DataFrame.

Things I didn't understand

  • Why is groupByKey slower than reduceByKey?
  • What is an associative and commutative reduce function?
  • What does "Output will be partitioned with C{numPartitions} partitions, or the default parallelism level if C{numPartitions} is not specified" mean? The default partitioner is hash partitioning.
  • What does "Allows an aggregated value type that is different than the input value type, while avoiding unnecessary allocations" mean?

Things I want to do

  • Check what other builtin methods the operator module has.
  • Run a Spark cluster in Docker.
  • Run a Spark cluster in Azure.

To test Spark Jobs, we can combine PyTest and Spark.

Commonly used objects in Spark are the SparkContext and the SparkSession. For each Spark job we create one or the other. Since Spark 2.0.0 the SparkSession is preferred: it encapsulates the functionality of the various contexts (SparkContext, SQLContext, HiveContext) in a single point of entry. To reuse this SparkSession, we can define it as a fixture:

spark_settings.py

```python
import pytest
from pyspark.sql import SparkSession


@pytest.fixture(scope="session")
def get_spark(request):
    spark = SparkSession.builder \
        .master("local") \
        .appName("test_job") \
        .config("spark.some.config.option", "some-value") \
        .getOrCreate()
    request.addfinalizer(lambda: spark.stop())
    return spark
```

Here we use PyTest to declare an annotated fixture that is usable for all tests during the session (all collected test items). This creates a "spark" object that returns a SparkSession with the appropriate configuration. We can add a finalizer that runs a lambda after the fixture is used. In our case, this will be after all the collected tests have run, since we defined the scope to be "session". If we had no scope definition, the default would be to create and destroy the SparkSession after each function. That would be quite a waste of time and resources, and usually not necessary unless we wanted a different kind of SparkContext configuration for a specific module or function.

Now to make use of this session-wide fixture we can just import it in one of our test files:

test_some.py

```python
import pytest
from spark_settings import get_spark


def test_version(get_spark):
    assert get_spark.version == '2.3.0'
```

Voilà, the spark object contains the SparkSession and is injected into the "test_version" function by simply adding it to the function as a parameter. In the current setup the spark dependency is automatically injected into each function that imports the fixture. If you have a lot of files that will be importing fixtures, it can get messy. PyTest offers a single point of entry for fixtures through a file named conftest.py. In it, you can define any number of fixtures, or import them from other files. Then, PyTest supplies all these fixtures from conftest.py to your collected test functions. It's a convenient way to view all the available fixtures:

conftest.py

```python
import pytest
from spark_settings import get_spark


@pytest.fixture(scope="session", autouse=True)
def spark(request):
    return get_spark(request)
```

Notice the 'autouse=True' parameter added to the fixture. This will create the spark fixture automatically when PyTest starts collecting the test functions. Autouse can be used to set up or run certain code that will be available for each test function. You can create autouse fixtures that modify other fixtures before passing them along to test functions. In this case, ALL test functions in our test suite will have access to the spark fixture automatically, and the spark fixture will only be created once. Now in our tests:

test_some.py

```python
import pytest


def test_important(spark):
    assert spark.version == '2.0.0'
```

PyTest does a couple of things here. First it gathers the fixtures defined in conftest.py, and these will be available as dependencies for the test functions. The function "test_important" can access the spark object without explicitly importing either spark_settings or conftest.py. Neat. PyTest also loads possible plugins (like pep8 compliance), which we didn't declare this time. Furthermore, it's possible to run functionality before and after your test functions; these are called hooks. Finally, the conftest.py file will be used as the project root, meaning all submodules will be added to sys.path. This way your tests will be able to import modules without explicitly adding them to PYTHONPATH. For all these features and more, check out this Stackoverflow post.

Common practice is now to have a directory tree similar to:

```
.
├── my_code.py
└── tests
    ├── test_some.py
    ├── conftest.py
    └── fixtures
        └── spark.py
```

Here we have our functionality/code at the top-level directory. It also contains the folder 'tests', which has a test file, conftest.py for all the reasons listed above, and (personal taste) a folder containing all the fixtures. Note that you can import all your fixtures from whatever location you wish into your conftest.py. If your fixtures are specifically designed as test cases, you could consider putting them in a folder like shown above.

Running our tests is as simple as running pytest in the root folder of the project. PyTest will look for the conftest.py and start collecting tests to run.

```
Testing started at 10:13 AM ...
Launching py.test with arguments in /opt/project/spark-tests
============================= test session starts ==============================
platform linux2 -- Python 2.7.15, pytest-3.6.1, py-1.5.3, pluggy-0.6.0
spark version -- Spark 2.0.0 built for Hadoop 2.7.2 | Build flags: -Psparkr -Phadoop-2.7
-Phive -Phive-thriftserver -Pyarn -DzincPort=3036
rootdir: /opt/project/spark-tests, inifile:
plugins: spark-0.4.4
collected 1 item

tests/test_some.py                                                       [100%]

========================== 1 passed in 11.10 seconds ===========================
```

We can see some information about the platform we are running our tests on, the versions of Spark and Hadoop, the root directory of the files, and some plugins. Finally we see the file we collected, "tests/test_some.py", and that the test passes.

Pytest-Spark

There's a Python module on GitHub called pytest-spark. It's basically a wrapper around everything we talked about above, and it provides some extra functionality for logging. It might be interesting to take a look at, if only as a reference for your own Spark fixtures. The __init__.py contains the meat of the operation: https://github.com/malexer/pytest-spark/blob/master/pytest_spark/__init__.py
