@jtalmi
Last active November 16, 2017 18:38
Pyspark DataFrames codelab

PySpark DataFrames

A DataFrame is a distributed collection of data organized into named columns. Conceptually, it is equivalent to a table in a relational database, but with richer optimizations under the hood.

id name age
1201 phil 25
1202 barbara 28
1203 jon 39
1204 dirk 23
1205 sanjay 23

DataFrames are a high-level abstraction providing “declarative” transforms and computations. This allows Spark to optimize your job to run in a more efficient manner.

A DataFrame can be constructed from a wide array of sources, such as Hive tables, structured data files, external databases, lists of Rows or tuples, existing RDDs, or dictionaries.

Setup

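To begin using DataFrames, you need a SQLContext, which this codelab accesses as sc.sql. A SQLContext can be used to create DataFrames, register DataFrames as tables, execute SQL over tables, cache tables, and read parquet files.

from pyspark import SparkContext
from pyspark.sql import SQLContext
sc = SparkContext()
# Attach the SQLContext to sc so that it is reachable as sc.sql below
# (skip this line if your environment already provides sc.sql)
sc.sql = SQLContext(sc)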

Creating DataFrames

To begin, let's import the objects and functions we will need in the codelab:

from pyspark.sql import Row
from pyspark.sql.types import StructField, StructType, IntegerType, StringType, TimestampType, DateType
import pyspark.sql.functions as F
import numpy as np

As stated above, DataFrames can be created from a variety of sources. In your ETL jobs, you will read data into DataFrames from raw Hive tables (shops, transactions, subscriptions, etc.), but DataFrames can also be generated from lists of certain objects, RDDs, or pandas DataFrames. Since DataFrames are organized into columns, each object passed in must be a row made up of one value per column. You can't create a DataFrame directly out of a flat list like [1, 2, 3, 4], but you can create one out of a list of multi-valued objects, since each object is treated as a row, e.g. [('a', 1), ('b', 2)] or [('c', 3), ('d', 4)].
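
If you do start from a flat list, one possible workaround is to wrap each value in a one-element tuple so that it becomes a single-column row. The sketch below is plain Python and only prepares the data; the variable names and the second list of letters are illustrative assumptions.

```python
# Plain Python: reshape flat lists into row-shaped tuples.
flat = [1, 2, 3, 4]

# Each value becomes a row with exactly one column.
single_column_rows = [(x,) for x in flat]

# Two parallel lists can be zipped into two-column rows.
letters = ['a', 'b', 'c', 'd']
two_column_rows = list(zip(letters, flat))

print(single_column_rows)
print(two_column_rows)
```

Either list could then be passed to createDataFrame together with a matching list of column names.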

Let's start by creating the DataFrame from the top of the page.

names = [(1201, 'phil', 25), (1202, 'barbara', 28), (1203, 'jon', 39), (1204, 'dirk', 23), (1205, 'sanjay', 23)]
df = sc.sql.createDataFrame(names, ['id', 'name', 'age'])
df.show()
df.show(3)
df.collect()
  1. You created a list of tuples, with each tuple corresponding to a row in the DataFrame.
  2. You created the DataFrame, specifying the column names.
  3. You ran df.show(), which displays the DataFrame as a text table (the default number of rows is 20), and df.show(3), which limits the output to the first 3 rows.
  4. You ran df.collect(), which returns the DataFrame as a list of Rows that is displayed in the console.

Be careful using df.collect() with large DataFrames, as this method pulls the entire DataFrame into the driver's memory.

You can create spark DataFrames from RDDs as well.

rdd = sc.parallelize(names)
df = sc.sql.createDataFrame(rdd, ['id', 'name', 'age'])
df.collect()

The output of df.collect() is the list of the Row objects that constitute the DataFrame. Rows can be created standalone as well, but remember that DataFrames only take in lists of rows:

from pyspark.sql import Row
row = Row(id=1206, name='Alice', age = 41)
df2 = sc.sql.createDataFrame([row])
df2.collect()

A Row can also serve as a template for creating other rows with the same fields.

Person = Row('id', 'name', 'age')
alice = Person(1206, 'Alice', 41)

Finally, you can combine two DataFrames if they have the same number of columns.

df = df.union(df2)
df.collect()
df.show()

df.union() is column-name agnostic:

row2 = Row(id=1206, age=15, person_name='Alice')
df2 = sc.sql.createDataFrame([row2])
df = df.union(df2)
df.collect()
df.show()

Note the following:

  • The entire age column was converted to strings (to conform to the new row value ‘Alice’)
  • Alice's age was converted into a string (to conform to the df.name column)

We avoid these issues by creating DataFrames with schemas, which assign an object type and other properties to each column.
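
To see why positional matching causes this, here is a plain-Python sketch (not Spark code) of a union that lines values up by position and ignores names, which is how df.union() behaves. The helper union_by_position and the tuple-based rows are illustrative assumptions.

```python
# Plain-Python model of a positional union; illustrative only, not Spark code.

def union_by_position(left_cols, left_rows, right_rows):
    """Append right_rows to left_rows, keeping the left column names.

    Values are matched purely by position; the right-hand column
    names never enter the picture.
    """
    for row in right_rows:
        if len(row) != len(left_cols):
            raise ValueError("both sides need the same number of columns")
    return [dict(zip(left_cols, row)) for row in left_rows + right_rows]


left_cols = ["id", "name", "age"]
left_rows = [(1201, "phil", 25)]
# The right-hand side stores its values in a different order: id, age, name.
right_rows = [(1206, 15, "Alice")]

combined = union_by_position(left_cols, left_rows, right_rows)
for row in combined:
    print(row)
```

In this model Alice's age ends up under name and her name under age, which is the kind of mix-up a schema is meant to prevent.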

For more on creating DataFrames, please read the official PySpark documentation.

DataFrame schemas: StructType and StructField

Typically, each column of a DataFrame has a single data type. It helps to pre-define the type of each column by using schemas made up of StructTypes and StructFields.

A StructField specifies a column name, a data type, and a boolean indicating whether null values are allowed:

s1 = StructField("id", IntegerType(), False)
s2 = StructField("name", StringType(), False)

PySpark has its own data types, which must be used in DataFrame functions. The full list can be found in the pyspark.sql.types module.

A StructType consists of a list of StructFields which constitutes the schema of the DataFrame.

schema = StructType([s1, s2, StructField("age", IntegerType(), True)])

Let's say we don't always know the age of the user, so we allow nullables in the age column.

We can now recreate our original DataFrames using this schema:

df = sc.sql.createDataFrame(names, schema)
row = Row(id=1206, age = 41, name='Alice')
df2 = sc.sql.createDataFrame([row], schema)
df = df.union(df2)
df.collect()

Even though the order of the values was different, the schema ensured that the two DataFrames were combined properly.
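
As a rough mental model of what a schema buys you, the plain-Python sketch below (not Spark code) puts a record's values into schema order by name and checks types and nullability along the way. The SCHEMA list, the conform helper, and the user "Bo" with id 1207 are hypothetical and exist only for illustration.

```python
# Plain-Python model of applying a schema to a record; not Spark code.
# Each entry is (column name, expected Python type, nullable).
SCHEMA = [("id", int, False), ("name", str, False), ("age", int, True)]


def conform(record, schema=SCHEMA):
    """Return the record's values in schema order, checking type and nullability."""
    values = []
    for name, expected_type, nullable in schema:
        value = record.get(name)
        if value is None:
            if not nullable:
                raise ValueError(f"column {name!r} does not allow nulls")
        elif not isinstance(value, expected_type):
            raise TypeError(f"column {name!r} expects {expected_type.__name__}")
        values.append(value)
    return tuple(values)


# Fields given in a different order still land in the right columns.
alice = conform({"id": 1206, "age": 41, "name": "Alice"})
# A missing age is accepted because the age column is nullable.
unknown_age = conform({"id": 1207, "name": "Bo", "age": None})
print(alice)
print(unknown_age)
```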

Let's add some more rows using the Person row template.

Person = Row('id', 'name', 'age')
ids = [1407, 11, 4151, 182, 934, 4122, 3001, 1321, 5197]
names = ['rachel', 'bob', 'maria', 'bob', 'leah', 'beth', 'liam', 'anna', 'liana']
ages = [13, 22, 29, 19, 25, 19, 43, 24, 20]
users = [Person(i, n, a) for i, n, a in zip(ids, names, ages)]
df2 = sc.sql.createDataFrame(users, schema)
df = df.union(df2)
df.collect()

This looks about right. We will be using df throughout the codelab so create a backup which you can refer to if things go wrong.

backup = df

Let's say that this DataFrame constitutes a slice of our userbase. Let's learn some things about this dataset.

Navigating DataFrames

We want to know things about our data. Columns can be selected by the following:

df['name']
df.name

But these expressions return Column objects, which cannot be viewed in isolation.

Using the select() method allows you to select columns -- or transformations of columns -- and output a DataFrame. This means you can run DataFrame operations on slices or transformations of DataFrames, including collect() or show().

df.select('name')
df.select('age', 'name').collect()

You can also refer to columns by name using F.col(), as in F.col('age').

Notice how F.col('age') creates a placeholder without referencing a specific DataFrame. You can use F.col('age') in DataFrame operations like select(), and the matching column will be resolved against the DataFrame the operation runs on.

df.select(F.col('name')).collect()

Other functions output columns corresponding to a specific aggregation function.

df.select(F.min('age')).collect()
df.select(F.avg('age')).collect()
df.select(F.first('id')).collect()

These functions are used when you need to generate a column object.

For example, if you want a column holding the last id, the expression to use is F.last(df.id). Because it is an aggregate, it has to be evaluated in select() or agg() on its own, or over a window, rather than alongside ordinary columns. We will talk more about adding new columns to DataFrames below.

Grouping data

To run aggregations over the whole DataFrame or slices of it -- like count, sum, min, max, and average (mean) -- you can use groupBy, which groups rows together based on common values in one or more columns.

When there are no restrictions in the groupBy clause, you can calculate descriptive stats for the whole DataFrame.

df.groupBy().count().collect()
df.groupBy().avg('age').collect()
df.groupBy().avg('age', 'id').collect()

The first tells us there are 15 users in our DataFrame, but let's run the same command after grouping by age. df.groupBy('age') or df.groupBy(df.age) returns an object of a new class called GroupedData, on which you can run grouped data operations. This lets us count the number of users belonging to each age group:

sorted(df.groupBy(df.age).count().collect())

sorted() orders the collected rows by age from lowest to highest, so we can see there are 2 users at each of 19, 23, and 25 years of age.
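
The same count can be cross-checked without Spark. The sketch below is plain Python using collections.Counter on the 15 ages entered so far in this codelab (the original five users, Alice, and the nine added afterwards); it models what groupBy plus count computes rather than how Spark computes it.

```python
from collections import Counter

# Ages of the 15 users created above, in insertion order.
ages = [25, 28, 39, 23, 23, 41, 13, 22, 29, 19, 25, 19, 43, 24, 20]

# Group by age and count, then sort by age like sorted(...) does above.
counts = sorted(Counter(ages).items())

# Ages shared by more than one user.
repeated = [age for age, n in counts if n > 1]

print(counts)
print(repeated)
```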

We can check the min and max ages as well:

df.groupBy().min('age').collect()
df.groupBy().max('age').collect()

agg is a method used to compute aggregates and return the results as a DataFrame. Called directly on a DataFrame, df.agg(...) is shorthand for df.groupBy().agg(...).

df.agg({'age': 'avg'}).collect()

is the same as either of:

df.groupBy().avg('age').collect()
df.agg(F.avg('age')).collect()

The keys in the dictionary are the columns and the values are the aggregation functions. The key "*" stands for all columns and is typically paired with "count" to count rows.

df.agg({'age': 'avg', 'name': 'count'}).collect()

You can use agg after data has already been grouped

grouped_df = df.groupBy(df.name)
sorted(grouped_df.agg({"*": "count"}).collect())
sorted(grouped_df.agg(F.min(df.age)).collect())

Column operations

Columns are interoperable. This means that we can operate on them the same way we would on many other object types.

df.age + 100
df.select(df.age + 100).collect()
df.select(df.age + df.id).collect()
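# str(df.id) would stringify the Column object itself, so cast and concatenate instead
df.select(F.concat(df.name, df.id.cast('string'))).collect()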

There are a number of functions taking in columns and returning columns. We saw F.min(df.column) and F.avg(df.column) above, which returned columns populated by the minimum and average of df.column.

Others include F.first (taking the first value) and F.last (taking the last value). F.lit creates a column of the literal value specified in brackets.

F.lit(5) creates a column of 5s, while F.lit('dog') creates a column of the string dog.

As previously mentioned, F.col() refers to a column by name and is resolved against whichever DataFrame it is applied to.

These functions are useful for generating columns to add to DataFrames, or after grouping data together.

Filtering

We may want to filter the DataFrame.

df.where(df.age == 19).collect()

is the same as:

df.where("age = 19").collect()

Where, or its alias filter, takes in a column of booleans or a string of SQL. A condition like df.age == 19 or df.age > 25 creates a column of booleans, whereas "age = 19" is a SQL expression wrapped in quotes.

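F.when() and otherwise() are used to assign new values to slices of the DataFrame based on filtering conditions without actually filtering. F.when() takes a condition and the value to return when it holds; further when() calls can be chained for more conditions, and otherwise() supplies the value for rows that match none of them. Without otherwise(), unmatched rows get null.

df.select(F.when(df['age'] == 19, "nineteen")).collect()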
df.select(F.when(df['age'] == 19, "nineteen").otherwise("not_nineteen")).collect()

We can use this to add columns to the DataFrame and use the alias() function to rename them:

df.select('id', 'name', 'age', F.when(df['age'] == 19, "Nineteen").otherwise("not_nineteen").alias("is_nineteen")).collect()

This is useful when you need to demarcate certain features of an existing DataFrame, like marking whether a shop has made a sale or not in a given time frame.
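
The branching logic can be modelled in a few lines of plain Python. This is a sketch of the semantics only, not Spark code; the helper when_otherwise and the "minor" label are illustrative assumptions.

```python
# Plain-Python model of when/otherwise; illustrative only, not Spark code.

def when_otherwise(value, branches, otherwise=None):
    """Return the result of the first branch whose condition holds.

    branches is a list of (condition, result) pairs, checked in order.
    If nothing matches, the otherwise value (None by default) is returned.
    """
    for condition, result in branches:
        if condition(value):
            return result
    return otherwise


ages = [19, 25, 13]
is_19 = (lambda age: age == 19, "nineteen")
is_minor = (lambda age: age < 18, "minor")

with_default = [when_otherwise(a, [is_19], "not_nineteen") for a in ages]
without_default = [when_otherwise(a, [is_19]) for a in ages]
chained = [when_otherwise(a, [is_minor, is_19], "other") for a in ages]

print(with_default)
print(without_default)
print(chained)
```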

Altering DataFrames

The primary method for creating new columns or changing existing columns is using df.withColumn.

df2 = df.withColumn('new_column', df.age + 1000)
df2.collect()

The second argument must be a column, so remember to use column generators like F.lit and F.col.

# Fails: a plain Python value is not a Column
df.withColumn('number_5', 5).collect()
# Works: F.lit wraps the value in a Column
df.withColumn('number_5', F.lit(5)).collect()

Now we can apply column methods to our DataFrame, such as cast (also available as astype), which changes a column's data type:

df = df2.withColumn('age', df2.age.astype(IntegerType()))
df.collect()

We can add columns to existing DataFrames. For example, F.current_date() returns a column for the current date

from datetime import datetime
df = df.withColumn('date', F.current_date())
df.collect()

And use Spark's date functions to manipulate these columns.

df.withColumn('date', F.date_format(df.date, 'yyyy-MM')).collect()

Or we can truncate dates to a coarser unit such as the month, which makes it easier to group by month or year.

df = df.withColumn('date', F.trunc(df.date, 'month'))
df.collect()
df.groupBy('date').agg({'age': 'avg'}).collect()
df = df.drop(df.date)
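
To make the effect of truncation concrete, here is a plain-Python sketch (not Spark code) that snaps dates to the first of their month and then averages ages per month. The records and the helper trunc_to_month are hypothetical.

```python
from collections import defaultdict
from datetime import date


def trunc_to_month(d):
    """Snap a date to the first day of its month."""
    return d.replace(day=1)


# Hypothetical (date, age) records.
records = [
    (date(2017, 3, 4), 25),
    (date(2017, 3, 28), 23),
    (date(2017, 4, 15), 39),
]

# Group ages by truncated date, then average each group.
ages_by_month = defaultdict(list)
for day, age in records:
    ages_by_month[trunc_to_month(day)].append(age)

avg_age_by_month = {m: sum(a) / len(a) for m, a in ages_by_month.items()}
print(avg_age_by_month)
```

Both March dates collapse onto the same key, which is why grouping on the truncated column yields one row per month.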

We can also combine withColumn with F.when in order to create dummy variables, or other condition-based columns

Above we saw the command for assigning a dummy variable to a df using select:

df.select('id', 'name', 'age', F.when(df['age'] == 19, "Nineteen").otherwise("Not nineteen").alias("is_nineteen")).collect()

But using withColumn is much easier, especially for DataFrames with many columns, since the existing columns don't have to be listed:

df.withColumn('is_nineteen', F.when(df.age == 19, 1).otherwise(0)).collect()

Let's try a more challenging example. We just added a column for the current date, but let's introduce some variation by generating a bunch of random dates from the last year.

from datetime import date, timedelta
today = date.today()
# Create a list of (id, date) tuples, one per user
ids = [x[0] for x in df.select('id').collect()]
# int() keeps this working on Python 3, where timedelta rejects numpy integers
tuples = [(i, today + timedelta(days=int(np.random.randint(-365, 0)))) for i in ids]
df2 = sc.sql.createDataFrame(tuples, schema=StructType([StructField('id', IntegerType(), False), StructField('date', DateType(), False)]))
# The next line fails: df2.date does not belong to df
df.withColumn('date', df2.date)

This doesn't work: a column from one DataFrame can't be attached directly to another. We have to join the two DataFrames together as we would tables in a SQL database. The default is an inner join. Let's remove three entries from the list of tuples, recreate the DataFrame, and test out some joins. Note that the for loop in this case is only used to execute the removal three times.

for i in range(3):
    tuples.pop()  # drop the last entry
df2 = sc.sql.createDataFrame(tuples, schema=StructType([StructField('id', IntegerType(), False), StructField('date', DateType(), False)]))
len(df2.collect())

There are 12 rows in the new DataFrame compared to 15 in df. When we go with the default inner join, we will end up with only rows where id is matched in both DataFrames, so only 12 rows.

df.join(df2, 'id').collect()

An outer join is greedy in the sense that it will include all possible rows in both DataFrames

df.join(df2, 'id', 'outer').collect()

Other types of joins:

df3 = df.select('id', 'name', 'age').where(df.id < 2000).where(df.id > 500)
df3.collect()
df.join(df3, 'id', 'inner').collect()
df.join(df3, 'id', 'outer').collect()
df.join(df3, 'id', 'left_outer').collect()
df.join(df3, 'id', 'right_outer').collect()

In this case, inner is the same as right_outer, whereas outer is the same as left_outer, because every row of df3 also appears in df.
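
The relationship between the four join types can be sketched in plain Python. This is a simplified model, not Spark code: it assumes ids are unique on both sides, and the join helper and the four sample users are illustrative.

```python
# Plain-Python model of join types on a unique key; not Spark code.

def join(left, right, how="inner"):
    """Join two {id: value} dicts and return {id: (left_value, right_value)}.

    Missing sides are filled with None, mirroring the nulls that
    unmatched rows receive in an outer join.
    """
    if how == "inner":
        keys = left.keys() & right.keys()
    elif how == "outer":
        keys = left.keys() | right.keys()
    elif how == "left_outer":
        keys = set(left)
    elif how == "right_outer":
        keys = set(right)
    else:
        raise ValueError(f"unsupported join type: {how}")
    return {k: (left.get(k), right.get(k)) for k in sorted(keys)}


users = {1201: "phil", 11: "bob", 4151: "maria", 934: "leah"}
# Like df3 above: keep only ids strictly between 500 and 2000.
subset = {k: v for k, v in users.items() if 500 < k < 2000}

for how in ("inner", "outer", "left_outer", "right_outer"):
    print(how, join(users, subset, how))
```

Because subset contains only ids that also exist in users, the inner and right_outer results coincide, as do the outer and left_outer results.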

Use select to refine your output DataFrame so that there aren't column doubles.

df.join(df3, 'id', 'inner').select(df3.id, df.name, df.age).collect()

We can set a condition to specify the columns being joined on.

cond = [df.name == df3.name, df.age == df3.age]
df.join(df3, cond, 'outer').select(df.name, df3.age).collect()

We can join on multiple columns. Let's create a new DataFrame in which we remove one person's age.

rows = df3.collect()
rows[5] = Row(id=1206, name=u'Alice', age=None)
df4 = sc.sql.createDataFrame(rows, schema)
df.join(df4, ['name', 'age']).select(df.name, df.age).collect()

Compare that to the output of:

df.join(df3, ['name', 'age']).select(df.name, df.age).collect()

Since we're joining on name and age, the row with the null age was dropped in our inner join: in SQL semantics, a null never compares equal to anything, including another null.
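
The null behaviour follows from SQL's three-valued logic, which the plain-Python sketch below imitates (it is not Spark code). The helpers sql_equal and inner_join and the two-row tables are illustrative assumptions.

```python
# Plain-Python model of SQL equality with nulls; not Spark code.

def sql_equal(a, b):
    """Three-valued equality: None (unknown) if either side is null."""
    if a is None or b is None:
        return None
    return a == b


def inner_join(left, right, keys):
    """Keep pairs of rows whose key columns all compare as True."""
    return [
        (lrow, rrow)
        for lrow in left
        for rrow in right
        if all(sql_equal(lrow[k], rrow[k]) is True for k in keys)
    ]


left = [{"name": "Alice", "age": 41}, {"name": "phil", "age": 25}]
right = [{"name": "Alice", "age": None}, {"name": "phil", "age": 25}]

matches = inner_join(left, right, ["name", "age"])
matched_names = [lrow["name"] for lrow, _ in matches]
print(matched_names)
```

Only phil survives, because the comparison on Alice's age is unknown rather than true.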

Window functions

A window is a group of rows surrounding a given row defined by a pre-set rule. Examples of windows include "The 3 rows before and 3 rows after each row"

from pyspark.sql.window import Window
window = Window.rowsBetween(-3, 3)

or "All the rows before each row".

window = Window.rowsBetween(Window.unboundedPreceding, Window.currentRow)

DataFrame functions are executed on the window surrounding each row instead of the row itself. The result is a single object or value associated with each row.

window = Window.rowsBetween(Window.unboundedPreceding, Window.currentRow)
df.withColumn('window', F.sum(df.age).over(window)).collect()
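
A row-based frame can be modelled in plain Python to see what each row receives. This is a sketch of the semantics, not Spark code; the helper rows_between uses None to stand for an unbounded edge, and the list of ages is taken from the first five users.

```python
# Plain-Python model of a rowsBetween frame; illustrative only, not Spark code.

def rows_between(values, start, end, func):
    """Apply func to the frame of rows around each position.

    start and end are offsets relative to the current row (0 is the
    current row); None means the frame is unbounded on that side.
    """
    n = len(values)
    results = []
    for i in range(n):
        lo = 0 if start is None else max(0, i + start)
        hi = n - 1 if end is None else min(n - 1, i + end)
        frame = values[lo:hi + 1]
        results.append(func(frame) if frame else None)
    return results


ages = [25, 28, 39, 23, 23]

# All rows before each row, up to and including the current row.
running_sum = rows_between(ages, None, 0, sum)
# Up to 3 rows before and 3 rows after each row.
centered_sum = rows_between(ages, -3, 3, sum)

print(running_sum)
print(centered_sum)
```

Frames are clipped at the edges of the data, so the first and last rows see fewer neighbours than the middle ones.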

Windows are often used for ordered and partitioned data, such as time series with categorical variables like country of origin. For example, if you have a dataset of orders with timestamp and country-of-origin columns, you can use window functions to calculate the cumulative number of orders coming from each country as of each day. This is accomplished by partitioning the data by country, ordering by date, and summing up all the preceding rows. The window would be:

window = Window.partitionBy("country").orderBy("date").rowsBetween(Window.unboundedPreceding, Window.currentRow)

To partition by age, order by ID, and take a window of 1 row preceding and 1 row following, we use the following window:

window = Window.partitionBy("age").orderBy("id").rowsBetween(-1, 1) 
df.orderBy('age').select('age', F.min('id').over(window).alias('min_id')).collect()
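
Partitioning, ordering, and framing combine as in the following plain-Python sketch (not Spark code). It uses six of the users from this codelab; the helper min_id_over_window is an illustrative assumption.

```python
from collections import defaultdict

# Plain-Python model of partitionBy + orderBy + rowsBetween; not Spark code.

def min_id_over_window(rows, before=1, after=1):
    """rows are (id, age) pairs; returns (age, id, min_id) triples.

    Rows are partitioned by age, ordered by id within each partition,
    and min_id is the smallest id among the neighbouring rows.
    """
    partitions = defaultdict(list)
    for user_id, age in rows:
        partitions[age].append(user_id)

    result = []
    for age in sorted(partitions):
        ids = sorted(partitions[age])
        for i, user_id in enumerate(ids):
            frame = ids[max(0, i - before): i + after + 1]
            result.append((age, user_id, min(frame)))
    return result


rows = [(1204, 23), (1205, 23), (182, 19), (4122, 19), (1201, 25), (934, 25)]
for triple in min_id_over_window(rows):
    print(triple)
```

A frame never crosses a partition boundary, so each age group is handled independently.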

Another frame specification is rangeBetween, which builds the window from the values of the ordering column rather than from row positions. For example, in a table of orders with dollar amounts attached, if we order by amount and set rangeBetween(-10, 40), the window would include all the rows with amounts between [current order value - 10] and [current order value + 40].
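
The difference from a row-based frame is easiest to see with numbers. The plain-Python sketch below (not Spark code) sums every amount that falls within the value range around each order; the amounts and the helper sum_over_range are hypothetical.

```python
# Plain-Python model of a rangeBetween frame; illustrative only, not Spark code.

def sum_over_range(amounts, lower, upper):
    """For each amount v, sum all amounts a with v + lower <= a <= v + upper."""
    return [
        sum(a for a in amounts if v + lower <= a <= v + upper)
        for v in amounts
    ]


# Hypothetical order amounts in dollars.
amounts = [5, 12, 20, 50, 100]
totals = sum_over_range(amounts, -10, 40)
print(totals)
```

The frame for the 100-dollar order contains only itself, even though it has four rows before it, because membership depends on the amounts and not on row positions.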

https://databricks.com/blog/2015/07/15/introducing-window-functions-in-spark-sql.html

Spark SQL

Under the hood, DataFrame operations and SQL queries are executed by the same Spark SQL engine. You can run SQL directly against a DataFrame by registering it as a temporary table:

df.createOrReplaceTempView("table1")  # replaces registerTempTable, deprecated since Spark 2.0
df = sc.sql.sql("SELECT * FROM table1")
df.collect()