Skip to content

Instantly share code, notes, and snippets.

Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save searchs/7b1ddf8965740ccf878a858b07741718 to your computer and use it in GitHub Desktop.
Save searchs/7b1ddf8965740ccf878a858b07741718 to your computer and use it in GitHub Desktop.
Cheat sheet for Spark Dataframes (using Python)
# A simple cheat sheet of Spark Dataframe syntax
# Current for Spark 1.6.1
# import statements
from pyspark.sql import SQLContext
from pyspark.sql.types import *
from pyspark.sql.functions import *
#creating dataframes
df = sqlContext.createDataFrame([(1, 4), (2, 5), (3, 6)], ["A", "B"]) # from manual data
df = sqlContext.read.format('com.databricks.spark.csv') \
.options(delimiter=';',header='true', inferschema='true',mode="FAILFAST") \
.load('csv_file_name_or_*_reference')
# adding columns and keeping existing ones
df.withColumn('zero', F.lit(0)) # add constant column
df.withColumn('A_times_two', df.A * 2)
# selecting columns, and creating new ones
df.select(
'A' # most of the time it's sufficient to just use the column name
, col('A').alias('new_name_for_A') # in other cases the col method is nice for referring to columnswithout having to repeat the dataframe name
, ( col('B') > 0 ).alias('is_B_greater_than_zero')
, unix_timestamp('A','dd.MM.yyyy HH:mm:ss').alias('A_in_unix_time') # convert to unix time from text
)
# filtering
df.filter('A_in_unix_time > 946684800')
# grouping and aggregating
df.groupBy("A").agg(
first("B").alias("my first")
, last("B").alias("my last")
, sum("B").alias("my everything")
)
# pivoting
df.groupBy('A','B').pivot('C').agg(first('D')).orderBy(['A','B']) # first could be any aggregate function
# inspecting dataframes
display(df) # table in notebook at least
df.show() # text table
######################################### Date time manipulation ################################
# Casting to timestamp from string with format 2015-01-01 23:59:59
df.select( df.start_time.cast("timestamp").alias("start_time") )
# Get all records that have a start_time and end_time in the same day, and the difference between the end_time and start_time is less or equal to 1 hour.
condition = \
(to_date(df.start_time) == to_date(df.end_time)) & \
(df.start_time + expr("INTERVAL 1 HOUR") >= df.end_time)
df.filter(condition).show()
############### WRITING TO AMAZON REDSHIFT ###############
REDSHIFT_JDBC_URL = "jdbc:redshift://%s:5439/%s" % (REDSHIFT_SERVER,DATABASE)
df.write \
.format("com.databricks.spark.redshift") \
.option("url", REDSHIFT_JDBC_URL) \
.option("dbtable", TABLE_NAME) \
.option("tempdir", "s3n://%s:%s@%s" % (ACCESS_KEY,SECRET, S3_BUCKET_PATH)) \
.mode("overwrite") \
.save()
######################### REFERENCE #########################
# recast variable
df.select(df[c],df[c].cast('int'))
dtype_dict = {'Player' : StringType, 'Pos' : StringType, 'Tm' : StringType, 'Age' : IntegerType, 'G' : IntegerType, 'GS' : IntegerType, 'yr' : IntegerType}
df2 = df.fillna('0')
for c in df2.schema.names[6:]:
dtype = DoubleType if c not in dtype_dict.keys() else dtype_dict[c]
df2 = df2.withColumn(c,df2[c].cast(dtype()))
## UDF
from pyspark.sql.types import StringType
from pyspark.sql.functions import udf
maturity_udf = udf(lambda age: "adult" if age >=18 else "child", StringType())
df = sqlContext.createDataFrame([{'name': 'Alice', 'age': 1}])
df.withColumn("maturity", maturity_udf(df.age))
##
# aggregate functions
approxCountDistinct, avg, count, countDistinct, first, last, max, mean, min, sum, sumDistinct
# window functions
cumeDist, denseRank, lag, lead, ntile, percentRank, rank, rowNumber
# string functions
ascii, base64, concat, concat_ws, decode, encode, format_number, format_string, get_json_object, initcap, instr, length, levenshtein, locate, lower, lpad, ltrim, printf, regexp_extract, regexp_replace, repeat, reverse, rpad, rtrim, soundex, space, split, substring, substring_index, translate, trim, unbase64, upper
# null and nan functions
isNaN, isnotnull, isnull
# misc functions
array, bitwiseNOT, callUDF, coalesce, crc32, greatest, if, inputFileName, least, lit, md5, monotonicallyIncreasingId, nanvl, negate, not, rand, randn, sha, sha1, sparkPartitionId, struct, when
# datetime
current_date, current_timestamp, trunc, date_format
datediff, date_add, date_sub, add_months, last_day, next_day, months_between
year, month, dayofmonth, hour, minute, second
unix_timestamp, from_unixtime, to_date, quarter, day, dayofyear, weekofyear, from_utc_timestamp, to_utc_timestamp
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment