RDD subsets with zipWithIndex()
import math


def rdd_subset(rdd, chunk_size_limit=10000):
    '''
    Small method to create subsets of a PySpark RDD.

    Achieved by indexing the input RDD with .zipWithIndex(), accepting a
    chunk size not to exceed, and returning lazily evaluated RDDs with
    nearly evenly distributed subsets.

    Note: This can be quite inefficient, as each time an RDD from the
    resulting subsets is used, the original RDD must be scanned again.
    For example, 100,000 rows with a 10k chunk size limit means roughly
    10-11 passes over the originating RDD.

    However, this can allow for more memory-efficient processing, e.g.
    saveAsNewAPIHadoopFile, which uses a Scala SerDeUtil that can wreak
    havoc with Spark driver heap size. Using this, one can pass subsets
    of an RDD to a method like that, which you might not have much
    control over in the context of the pipeline.

    Args:
        rdd (pyspark RDD): input RDD that will be split into chunks by an
            appended index
        chunk_size_limit (int): number of RDD rows not to exceed per chunk
            (specifically aimed at limiting chunk size for memory-intensive
            operations)

    Returns:
        (list): list of RDDs, each a subset of the original RDD
    '''

    # empty list for subsets
    subsets = []

    # zip with index
    zrdd = rdd.zipWithIndex()

    # get count
    count = zrdd.count()

    # determine number of chunks, rounding up so range() gets an int
    steps = math.ceil(count / chunk_size_limit)

    # evenly distribute rows across chunks; rounding up here (rather than
    # adding 1 unconditionally) keeps each chunk at or under chunk_size_limit
    dist_chunk_size = math.ceil(count / steps)

    # loop through steps, appending subset to list for return
    for step in range(steps):

        # determine bounds
        lower_bound = step * dist_chunk_size
        upper_bound = (step + 1) * dist_chunk_size

        # select subset; bind the bounds as default arguments so each
        # lazily evaluated filter keeps its own values (a plain closure
        # over the loop variables would see only the final iteration's
        # bounds by the time the RDDs are evaluated)
        subset = zrdd.filter(
            lambda x, lb=lower_bound, ub=upper_bound: lb <= x[1] < ub
        ).map(lambda x: x[0])
        subsets.append(subset)

    # return
    return subsets
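
As a minimal usage sketch: this assumes a live SparkContext named sc, and the 100,000-row sample RDD and the per-chunk count() action are illustrative stand-ins for whatever memory-intensive operation you actually need to run per chunk.

# minimal usage sketch, assuming an existing SparkContext `sc`;
# the sample data and the per-chunk action are illustrative
rdd = sc.parallelize(range(100000))

# split into lazily evaluated chunks of at most 10,000 rows each
chunks = rdd_subset(rdd, chunk_size_limit=10000)

# act on one chunk at a time, so only one subset's rows are
# materialized per pass over the original RDD
for chunk in chunks:
    print(chunk.count())

Each iteration triggers a fresh scan of the zipped RDD, which is the inefficiency the docstring notes; the payoff is that a memory-hungry action like saveAsNewAPIHadoopFile only ever sees one bounded chunk at a time.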