Cheat sheet for Spark Dataframes (using Python)
# A simple cheat sheet of Spark Dataframe syntax
# Current for Spark 1.6.1
# import statements
#from pyspark.sql import SQLContext
from pyspark.sql.types import StringType  # needed for the cast-to-string example below
from pyspark.sql import functions as F
#SparkContext available as sc, HiveContext available as sqlContext.
#creating dataframes
df = sqlContext.createDataFrame([(1, 4, 5), (1, 5, 4), (2, 5, 3), (2, 3, 1), (3, 6, 2), (3, 1, 3)], ["A", "B", "C"])  # from manual data
df.show()
df = sqlContext.read.format('com.databricks.spark.csv') \
    .options(delimiter=';', header='true', inferschema='true', mode='FAILFAST') \
    .load('csv_file_name_or_*_reference')
df.filter(df.A > 2).select(df.B, df.C + 1).show()
#or equivalently
sqlContext.registerDataFrameAsTable(df, "table1")
sqlContext.sql('select B, C + 1 from table1 where A > 2').show()
df.select(df.A, df.B).where(df.A > 2).show()
df[df.A > 2].select(df.A, df.B).show()
df.select(df.A, df.B).where(df.A > 2).explain()
df[df.A > 2].select(df.A, df.B).explain()
df.cache()  # cache the df; later actions reuse the cached data (try running count() twice after this)
df.count()
df.count()
# adding columns while keeping the existing ones; F.lit(0) returns a constant column
df.withColumn('zero', F.lit(0))
df.withColumn('A_times_two', df.A * 2)
# cast column A to string (StringType is imported above)
df = df.withColumn('A', df['A'].cast(StringType()))
# selecting columns, and creating new ones
df.select(
    'A'  # most of the time it's sufficient to just use the column name
    , F.col('A').alias('new_name_for_A')  # in other cases F.col is handy for referring to columns without having to repeat the dataframe name
    , (F.col('B') > 0).alias('is_B_greater_than_zero')
    , F.unix_timestamp('A', 'dd.MM.yyyy HH:mm:ss').alias('A_in_unix_time')  # convert text to unix time
)
# filtering
df.filter('A_in_unix_time > 946684800')
# grouping and aggregating (first row or last row or sum in the group)
df.groupBy("A").agg(
first("B").alias("my first")
, last("B").alias("my last")
, sum("B").alias("my everything")
)
# grouping and sorting ('count' is the name of the created column)
temp = df.groupBy('A').count().sort(F.desc('count'))
# top 10 (take returns a list of Row objects to the driver)
temp.take(10)
# aggregation
df.agg({'A': 'max', 'B': 'min'}).show()
#or
df.agg(F.max(df.A), F.min(df.B)).show()
# pivoting
df.groupBy('A', 'B').pivot('C').agg(F.first('D')).orderBy(['A', 'B'])  # F.first could be any aggregate function; 'D' stands for whatever column you aggregate
# inspecting dataframes
df.show() # text table
########### pandas integration
df.toPandas()  # collects the whole DataFrame to the driver, so only use it on small results
######################################### Date time manipulation ################################
# Casting to timestamp from string with format 2015-01-01 23:59:59
df.select(df.start_time.cast("timestamp").alias("start_time"))
# Get all records whose start_time and end_time fall on the same day and where end_time is at most 1 hour after start_time.
condition = \
    (F.to_date(df.start_time) == F.to_date(df.end_time)) & \
    (df.start_time + F.expr("INTERVAL 1 HOUR") >= df.end_time)
df.filter(condition).show()
############### WRITING TO AMAZON REDSHIFT ###############
REDSHIFT_JDBC_URL = "jdbc:redshift://%s:5439/%s" % (REDSHIFT_SERVER, DATABASE)
df.write \
    .format("com.databricks.spark.redshift") \
    .option("url", REDSHIFT_JDBC_URL) \
    .option("dbtable", TABLE_NAME) \
    .option("tempdir", "s3n://%s:%s@%s" % (ACCESS_KEY, SECRET, S3_BUCKET_PATH)) \
    .mode("overwrite") \
    .save()
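# hedged sketch, not from the original gist: the same spark-redshift connector can read a table back,
# reusing REDSHIFT_JDBC_URL, TABLE_NAME and the S3 temp dir from the write above
df_from_redshift = sqlContext.read \
    .format("com.databricks.spark.redshift") \
    .option("url", REDSHIFT_JDBC_URL) \
    .option("dbtable", TABLE_NAME) \
    .option("tempdir", "s3n://%s:%s@%s" % (ACCESS_KEY, SECRET, S3_BUCKET_PATH)) \
    .load()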
######################### REFERENCE #########################
# aggregate functions
approxCountDistinct, avg, count, countDistinct, first, last, max, mean, min, sum, sumDistinct
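# a minimal sketch of a few of these, run against the toy df defined above (the aliases are just illustrative)
df.agg(
    F.countDistinct('A').alias('distinct_As')
    , F.avg('B').alias('avg_B')
    , F.min('C').alias('min_C')
).show()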
# window functions
cumeDist, denseRank, lag, lead, ntile, percentRank, rank, rowNumber
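# a minimal window-function sketch (assumes the toy df above; in Spark 1.6 window functions need a HiveContext, which sqlContext already is here)
from pyspark.sql.window import Window
w = Window.partitionBy('A').orderBy(F.desc('B'))
df.withColumn('rank_in_A', F.rank().over(w)).show()  # rank of each row within its A group, highest B first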
# string functions
ascii, base64, concat, concat_ws, decode, encode, format_number, format_string, get_json_object, initcap, instr, length, levenshtein, locate, lower, lpad, ltrim, printf, regexp_extract, regexp_replace, repeat, reverse, rpad, rtrim, soundex, space, split, substring, substring_index, translate, trim, unbase64, upper
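# a minimal sketch of a few string functions, assuming column A was cast to string as above (the aliases are just illustrative)
df.select(
    F.concat_ws('-', 'A', 'A').alias('A_twice')
    , F.lower(F.col('A')).alias('A_lower')
    , F.regexp_extract('A', r'(\d+)', 1).alias('first_digits')
).show()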
# null and nan functions
isNaN, isnotnull, isnull
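# a minimal sketch: flag missing values with F.isnan / Column.isNull and replace them with fillna (run against the toy df above)
df.select('B', F.isnan('B').alias('B_is_nan'), F.col('B').isNull().alias('B_is_null')).show()
df.fillna(0, subset=['B', 'C'])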
# misc functions
array, bitwiseNOT, callUDF, coalesce, crc32, greatest, if, inputFileName, least, lit, md5, monotonicallyIncreasingId, nanvl, negate, not, rand, randn, sha, sha1, sparkPartitionId, struct, when
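# a minimal sketch of conditional logic with when/otherwise and coalesce (column names come from the toy df above)
df.select(
    F.when(df.A > 2, 'big').otherwise('small').alias('A_size')
    , F.coalesce(df.B, F.lit(0)).alias('B_or_zero')
).show()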
# datetime
current_date, current_timestamp, trunc, date_format
datediff, date_add, date_sub, add_months, last_day, next_day, months_between
year, month, dayofmonth, hour, minute, second
unix_timestamp, from_unixtime, to_date, quarter, day, dayofyear, weekofyear, from_utc_timestamp, to_utc_timestamp
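# a minimal sketch of date arithmetic, assuming the timestamp column start_time used in the section above
df.select(
    F.to_date(df.start_time).alias('start_date')
    , F.date_add(df.start_time, 7).alias('one_week_later')
    , F.datediff(F.current_date(), df.start_time).alias('days_ago')
).show()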