Skip to content

Instantly share code, notes, and snippets.

Embed
What would you like to do?
Demonstrate strange batch size related problems.
# Demonstrate pyspark strange behaviour.
#
# Usage:
#
# spark-submit <spark-options> pyspark_batch_size.py <input_file_specification> <batch_size> <result_file>
#
import pyspark
import json
import sys
def main():
    """Demonstrate PySpark batch-size behaviour on a set of JSON-lines files.

    Reads three positional command-line arguments:
      1. input file specification (handed to ``SparkContext.textFile``)
      2. batch size (int; 0 means automatic, -1 means unlimited)
      3. path of the JSON report file to write

    Counts the records in every partition, prints a summary dict, and
    writes the same summary as indented JSON to the report file.
    """
    # Positional CLI arguments: input spec, batch size, output path.
    in_spec = sys.argv[1]
    size = int(sys.argv[2])
    out_path = sys.argv[3]

    # batchSize semantics: 0 == automatic, -1 == unlimited.
    ctx = pyspark.SparkContext(batchSize=size)

    # One RDD element per input line, each decoded from JSON into a
    # Python object; persisted so later stages reuse the parsed data.
    parsed = ctx.textFile(in_spec).map(json.loads)
    parsed.persist()

    # Count records per partition. Mirrors RDD.count but omits the
    # final sum so the per-partition detail survives into the report.
    per_part = parsed.mapPartitions(lambda part: [sum(1 for _ in part)]).collect()

    # Normally further computation on the parsed data would go here.
    parsed.unpersist()
    ctx.stop()  # the Spark context is no longer needed

    report = {
        'input_files': in_spec,
        'batch_size': size,
        'result': {
            'partitions_count': len(per_part),
            'record_count': sum(per_part),
            'records_per_partition': per_part,
        }
    }

    # Echo the summary to the console, then persist it as JSON.
    print(report)
    with open(out_path, 'w') as handle:
        handle.write(json.dumps(report, indent=2))
# Run the demo only when executed as a script, not when imported.
if __name__ == '__main__':
    main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
You can’t perform that action at this time.