Skip to content

Instantly share code, notes, and snippets.

What would you like to do?
Demonstrate strange batch size related problems.
# Demonstrate pyspark strange behaviour.
# Usage:
# spark-submit <spark-options> <input_file_specification> <batch_size> <result_file>
import pyspark
import json
import sys
def main():
    """Demonstrate pyspark batch-size behaviour.

    Usage:
        spark-submit <spark-options> script.py <input_files> <batch_size> <result_file>

    Reads newline-delimited JSON from ``input_files``, counts the records in
    each partition, and writes a JSON summary to ``result_file``.
    """
    # Argument parsing to get input files, batch_size and result_file.
    input_files = sys.argv[1]
    batch_size = int(sys.argv[2])
    result_file = sys.argv[3]
    # Create the context with the command-line specified batch size.
    # Note: 0 == automatic batch size, -1 == unlimited batch size.
    sc = pyspark.SparkContext(batchSize=batch_size)
    # Create an RDD with each line from the input files as an element.
    text_lines = sc.textFile(input_files)
    # Create an RDD that converts the JSON text lines into Python objects.
    records = text_lines.map(json.loads)
    # Now, persist the records so the (hypothetical) later calculations
    # would not re-read and re-parse the input.
    records.persist()
    # Determine the number of records in each partition, and collect the
    # result. This follows the pattern of the RDD.count method, but
    # skips the final sum so there is some detail on exactly how the
    # records are distributed across partitions.
    records_per_partition = records.mapPartitions(
        lambda partition: [sum(1 for _ in partition)]
    ).collect()
    # Normally, we'd do other calculations on the input data here.
    # And then unpersist.
    records.unpersist()
    # Stop the Spark context, we don't need it any more.
    sc.stop()
    # Now create the output.
    result = {
        'input_files': input_files,
        'batch_size': batch_size,
        'result': {
            'partitions_count': len(records_per_partition),
            'record_count': sum(records_per_partition),
            'records_per_partition': records_per_partition,
        },
    }
    # Print the result on the console.
    print(json.dumps(result, indent=2))
    # And finally output to the result file.
    with open(result_file, 'w') as f:
        f.write(json.dumps(result, indent=2))


if __name__ == '__main__':
    main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
You can’t perform that action at this time.