pyspark
class pyspark.sql.DataFrame
people = spark.read.parquet("...")
department = spark.read.parquet("...")
people.filter(people.age > 30) \
.join(department, people.deptId == department.id) \
.groupBy(department.name, "gender") \
.agg({"salary": "avg", "age": "max"})
# agg -> aggregate on entire dataframe without groups
df.agg({"age": "max"}).collect()
from pyspark.sql import functions as F
df.agg(F.min(df.age)).collect()
# persists the dataframe with the default storage level (MEMORY_AND_DISK). Useful when:
# 1) reusing the data in an iterative loop (ML algorithms); 2) reusing the RDD/DataFrame multiple times in a single application, job, or notebook;
# 3) the upfront cost to regenerate the RDD is high (reading from HDFS, or after a complex chain of map(), filter(), ...)
df.cache()
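# persist() is the general form of cache(): it lets you pick the storage level
# explicitly, and unpersist() releases the cached data. A minimal sketch
# (assumes an existing DataFrame df):
from pyspark import StorageLevel
df.persist(StorageLevel.MEMORY_ONLY)  # keep partitions in memory only, recompute on eviction
df.count()                            # first action materializes the cache
df.count()                            # later actions reuse the cached partitions
df.unpersist()                        # free the cached partitions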
# returns new dataframe that has exactly numPartitions partitions
df1 = df.coalesce(numPartitions=1)
df1.rdd.getNumPartitions() # output: 1
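# For contrast (not covered above): repartition() can increase or decrease the
# number of partitions but always performs a full shuffle, while coalesce()
# only merges existing partitions when reducing the count.
df8 = df.repartition(8)
df8.rdd.getNumPartitions()   # output: 8
df.repartition("age")        # hash-partitioning by a column is also supported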
df.collect() # returns all records as list of Row
df.count() # number of rows in dataframe
df.describe().show() # basic statistics (count, mean, stddev, min, max) for numeric and string columns
df.distinct().count() # distinct returns a new dataframe containing distinct rows in this dataframe
# drop() returns new df that drops specified col
df.drop('age').collect()
df.drop(df.age).collect()
df.join(df2, df.name == df2.name, 'inner').drop(df.name).collect()
=> [Row(age=5, height=85, name='Bob')]
df.join(df2, df.name == df2.name, 'inner').drop(df2.name).collect()
=> [Row(age=5, name='Bob', height=85)]
df.join(df2, 'name', 'inner').drop('age', 'height').collect()
=> [Row(name='Bob')]
df.dropDuplicates() # returns new df with duplicate rows removed, optionally only considering certain cols
from pyspark.sql import Row
df = sc.parallelize([
    Row(name='Alice', age=5, height=80),
    Row(name='Alice', age=5, height=80),
    Row(name='Alice', age=10, height=80)]).toDF()
df.dropDuplicates().show() # a row is a duplicate only if all columns (name, age, height) match
df.dropDuplicates(['name', 'height']).show() # a row is a duplicate if name and height match
df.dropna(how='any', thresh=None, subset=None) #subset optional list of col names to consider
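# dropna() variants on a small throwaway DataFrame (the names and values here
# are illustrative, not from the original notes):
df_nulls = spark.createDataFrame(
    [(1, None), (None, 'x'), (3, 'y')], 'a: int, b: string')
df_nulls.dropna(how='any').show()      # drop rows containing any null -> only (3, 'y')
df_nulls.dropna(how='all').show()      # drop rows where every value is null
df_nulls.dropna(thresh=2).show()       # keep rows with at least 2 non-null values
df_nulls.dropna(subset=['a']).show()   # only check column 'a' for nulls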
df.dtypes # return list of tuples (colname, datatype)
=> [('age', 'int'), ('name', 'string')]
df.na #returns a DataFrameNaFunctions for handling missing values
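# df.na also exposes fill() and drop(); a quick sketch:
df.na.fill(0)                               # replace nulls in numeric columns with 0
df.na.fill({'age': 0, 'name': 'unknown'})   # per-column replacement values
df.na.drop(subset=['age'])                  # equivalent to df.dropna(subset=['age'])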
# sort() and orderBy()
df.orderBy() # returns new df sorted by specified columns
df.sort(df.age.desc()).collect()
df.sort("age", ascending=False).collect()
df.orderBy(df.age.desc()).collect()
from pyspark.sql.functions import *
df.sort(asc("age")).collect()
df.orderBy(desc("age"), "name").collect()
df.orderBy(["age","name"], ascending=[0,1]).collect()
df.printSchema()
# replace(to_replace, value, subset). subset - optional list of col names to consider.
# replaces occurrences of to_replace with value; when to_replace is a dict, it maps old -> new and value is ignored
df4.na.replace(10, 20).show()
df4.na.replace('Alice', None).show()
df4.na.replace({'Alice': None}).show()
df4.na.replace(['Alice','Bob'],['A','B'],'name').show()
# take(num) -> returns the first num rows as a list of Row
df.take(2)
# toDF(*cols) -> returns a new DataFrame with the specified column names
df.toDF('f1', 'f2').collect()
=> [Row(f1=2, f2='Alice'), Row(f1=5, f2='Bob')]
# write => DataFrameWriter -> interface for saving the content of a non-streaming DataFrame to external storage
# writeStream => DataStreamWriter -> interface for saving a streaming DataFrame to external storage (sketch below)
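# Hedged sketch of writeStream with a file sink; the input path, reuse of df.schema,
# and the checkpoint/output locations are hypothetical placeholders.
stream_df = spark.readStream.format('json').schema(df.schema).load('/tmp/stream_input')
query = (stream_df.writeStream
         .format('parquet')
         .option('path', '/tmp/stream_output')
         .option('checkpointLocation', '/tmp/stream_checkpoint')
         .outputMode('append')
         .start())
# query.stop() terminates the streaming query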
class pyspark.sql.DataFrameWriter(df)
# bucketBy(numBuckets, col, *cols) -> buckets the output by the given columns. If specified, the output is laid out
# on the file system similarly to Hive's bucketing scheme
(df.write.format('parquet')
    .bucketBy(100, 'year', 'month')  # extra bucketing columns are passed positionally via *cols
    .mode("overwrite")
    .saveAsTable('bucketed_table'))
import os, tempfile
df.write.format('json').save(os.path.join(tempfile.mkdtemp(), 'data'))
jdbc(url, table, mode=None, properties=None) # saves content of DataFrame to external db via JDBC
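# Hedged example of jdbc(); the URL, table name and credentials are placeholders.
(df.write.jdbc(url='jdbc:postgresql://localhost:5432/mydb',
               table='people',
               mode='append',
               properties={'user': 'spark', 'password': 'secret',
                           'driver': 'org.postgresql.Driver'}))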
df.write.parquet(os.path.join(tempfile.mkdtemp(), 'data'))
# parquet(path, mode=None, partitionBy=None, compression=None)
# compression: one of 'snappy', 'gzip', 'lzo'
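# partitionBy and compression used together (the output path is a placeholder):
df.write.parquet('/tmp/people_by_age', mode='overwrite',
                 partitionBy='age', compression='snappy')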