@KobaKhit
Last active August 27, 2020 23:59
Repartition skewed PySpark dataframes: assign each row to a fixed-size group via a synthetic partition index, then write the result with partitionBy.
from pyspark.sql.functions import monotonically_increasing_id, row_number
from pyspark.sql import Window
from functools import reduce
def partitionIt(size, num):
    '''
    Create a list of partition indices where each index is repeated `num`
    times, so the number of groups is ceiling(size/num).
    Args:
        size (int): total number of rows/elements
        num (int) : number of elements per partition
    Return:
        a list of partition indices `[0,0,0,0,1,1,1,1, ...]` of length `size`
    '''
    out = []
    last = 0
    index = 0
    while last < size:
        out.append([index] * num)  # one group of `num` identical indices
        last += num
        index += 1
    # flatten the groups and trim the tail so the list has exactly `size` entries
    return reduce(lambda x, y: x + y, out)[:size]
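# quick sanity check of the helper (illustrative numbers, not part of the original gist):
# 7 elements in groups of 3 -> ceiling(7/3) = 3 groups, trimmed back to 7 entries
assert partitionIt(7, 3) == [0, 0, 0, 1, 1, 1, 2]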
# set up
df = ...
n = 1000  # desired number of records per partition

# compute the partition index for every row and convert the list to a spark dataframe
index = partitionIt(df.count(), n)
b = spark.createDataFrame([(i,) for i in index], ['Index'])

# add a sequential row index to both dataframes and join them to get the final result
df = df.withColumn("row_idx", row_number().over(Window.orderBy(monotonically_increasing_id())))
b = b.withColumn("row_idx", row_number().over(Window.orderBy(monotonically_increasing_id())))
df = df.join(b, df.row_idx == b.row_idx, 'left').drop("row_idx")
# save, writing one folder per Index value under the output path
df.write.partitionBy('Index').save('')  # fill in the output path
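# note: Window.orderBy without partitionBy moves all rows to a single
# partition to compute row_number (Spark logs a "No Partition Defined for
# Window operation" warning), so this zip-join can be slow on very large
# dataframes; the payoff is deterministic, evenly sized output partitions.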