@ghukill
Last active November 30, 2017 19:25
RDD subsets with zipWithIndex()
def rdd_subset(rdd, chunk_size_limit=10000):

    '''
    Small function to split a PySpark RDD into subsets.

    Achieved by zipping the input RDD with .zipWithIndex(), accepting a
    chunk size not to exceed, and returning lazily evaluated RDDs with
    nearly evenly distributed subsets.

    Note: This can be quite inefficient, because each time one of the
    resulting subsets is used, the original RDD must be scanned again.
    With 100,000 rows and a 10k chunk size, expect 10-11 quick passes
    over the originating RDD.

    However, this can allow for more memory-efficient processing, e.g.
    saveAsNewAPIHadoopFile, which uses a Scala SerDeUtil that can wreak
    havoc with the Spark driver heap size. Using this, one can pass subsets
    of an RDD to a method like that, which you might not have much control
    over in the context of the pipeline.

    Args:
        rdd (pyspark RDD): input RDD that will be split on the index
            appended by zipWithIndex()
        chunk_size_limit (int): number of RDD rows not to exceed per chunk
            (specifically aimed at limiting chunk size for memory-intensive
            operations)

    Returns:
        (list): list of RDDs, each a subset of the original RDD
    '''

    # the ceiling division below needs a positive integer limit
    if chunk_size_limit < 1:
        raise ValueError('chunk_size_limit must be a positive integer')

    # zip with index: each element becomes (row, index)
    zrdd = rdd.zipWithIndex()

    # get count
    count = zrdd.count()

    # an empty RDD yields no subsets (and would divide by zero below)
    if count == 0:
        return []

    # determine number of chunks, using integer ceiling division so the
    # result is an int on both Python 2 and Python 3
    steps = -(-count // chunk_size_limit)

    # evenly distribute chunks, while not exceeding chunk_size_limit
    dist_chunk_size = -(-count // steps)

    # loop through steps, appending each subset to the list for return
    subsets = []
    for step in range(steps):

        # determine bounds (upper bound is exclusive)
        lower_bound = step * dist_chunk_size
        upper_bound = (step + 1) * dist_chunk_size

        # select subset; the bounds are bound as default arguments so each
        # lazily evaluated lambda keeps its own values, not the last loop's
        subset = zrdd.filter(
            lambda x, lo=lower_bound, hi=upper_bound: lo <= x[1] < hi
        ).map(lambda x: x[0])
        subsets.append(subset)

    # return
    return subsets
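
The chunk-bound arithmetic can be checked without a Spark cluster. Below is a minimal pure-Python sketch, assuming ceiling division for both the number of chunks and the per-chunk size. `chunk_bounds` is a hypothetical helper written for illustration; it is not part of PySpark or of the function above.

```python
def chunk_bounds(count, chunk_size_limit=10000):
    """Return (lower, upper) index bounds per chunk; upper is exclusive.

    Hypothetical helper: mirrors the index arithmetic only, no Spark needed.
    """
    if chunk_size_limit < 1:
        raise ValueError("chunk_size_limit must be a positive integer")
    if count == 0:
        return []
    # ceiling division: fewest chunks that keep every chunk within the limit
    steps = -(-count // chunk_size_limit)
    # ceiling division again: spread the rows as evenly as possible
    dist_chunk_size = -(-count // steps)
    return [
        (step * dist_chunk_size, min((step + 1) * dist_chunk_size, count))
        for step in range(steps)
    ]


bounds = chunk_bounds(100001, 10000)
print(len(bounds))                         # 11
print(max(hi - lo for lo, hi in bounds))   # 9091
```

With 100,001 rows and a 10,000-row limit, one extra row forces an eleventh chunk, and the rows are then spread so that no chunk holds more than 9,091.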