@andrewgross
Last active May 25, 2018 18:35
PySpark helpers to predict the number of partitions needed to get good output file sizes (100-300 MB for Parquet), plus a helper to estimate your average byte array size. A usage sketch follows the code.

import re
from math import ceil


def get_files_per_partition(df, partition_key, file_type="parquet", compression="snappy", byte_array_size=256):
    rows = df.count()
    print("Dataset has {} rows".format(rows))
    schema = df.schema
    num_partitions = 1
    if partition_key is not None:
        num_partitions = df.select([partition_key]).distinct().count()
        print("Dataset has {} distinct partition keys".format(num_partitions))
        # The partition key is encoded in the directory path rather than the data
        # files, so drop it before estimating the record size.
        _df = df.drop(partition_key)
        schema = _df.schema
    return get_num_files(rows, num_partitions, schema, file_type=file_type, compression=compression, byte_array_size=byte_array_size)


def get_num_files(rows, num_partitions, schema, file_type="parquet", compression="snappy", byte_array_size=256):
    record_size = _get_record_size(schema, file_type, byte_array_size)
    print("Average Record Size is {} bits".format(record_size))
    min_partitions_for_file_size = _get_min_partition_based_on_file_size(rows, record_size, num_partitions, compression)
    return max(1, min_partitions_for_file_size)


def _get_record_size(schema, file_type, byte_array_size):
    """
    Sum the approximate per-field sizes, in bits, for a single record of the schema.
    """
    size_mapping = get_size_mapping(file_type, byte_array_size)
    record_size = 0
    for field in schema:
        _type = field.dataType.typeName()
        # Raises a KeyError for field types missing from the mapping
        field_size = size_mapping[_type]
        record_size = record_size + field_size
    return record_size
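
# For illustration (hypothetical schema): a record of (id: long, name: string,
# created_at: timestamp) with the default 256-byte string estimate works out to
# 64 + (256 * 8) + 96 = 2208 bits.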


def get_average_byte_array_size(schema):
    """
    Calculate the average declared size of VARCHAR columns in a schema, using the
    HIVE_TYPE_STRING metadata attached to StringType fields.
    """
    average_size = 0.0
    string_fields = 0
    total_size = 0.0
    for field in schema.fields:
        if str(field.dataType) == 'StringType':
            size_string = field.metadata.get('HIVE_TYPE_STRING')
            if size_string:
                # Pull the declared length out of e.g. "varchar(64)"
                result = re.search(r'(?P<type>[^\(\)]+)\((?P<size>\d+)\)', size_string)
                if result:
                    size = result.groupdict().get('size', 0)
                    print("Found field {} with size {}".format(field.name, size))
                    total_size = total_size + int(size)
                    string_fields = string_fields + 1
    if string_fields > 0:
        average_size = total_size / string_fields
    return average_size
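
# For illustration (hypothetical column): a column read from a Hive table declared
# as `name VARCHAR(64)` typically carries {'HIVE_TYPE_STRING': 'varchar(64)'} in its
# field metadata, so the regex above extracts 64 as that column's declared size.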


def _get_min_partition_based_on_file_size(rows, record_size, num_partitions, compression):
    """
    Based on our data size, calculate how many partitions we need to get ~200MB output files
    """
    compression_ratio = get_compression_ratio(compression)
    avg_rows_per_partition = rows / num_partitions
    avg_partition_size = record_size * avg_rows_per_partition
    print("Average Partition Size is {} bits before compression".format(avg_partition_size))
    # Convert bits to bytes
    avg_partition_size = avg_partition_size / 8
    print("Average Partition Size is {} bytes before compression".format(avg_partition_size))
    # Convert bytes to MB
    avg_partition_size = avg_partition_size / (1024 * 1024)
    print("Average Partition Size is {} MB before compression".format(avg_partition_size))
    avg_partition_size = avg_partition_size * compression_ratio
    print("Average Partition Size is {} MB after compression".format(avg_partition_size))
    # Convert to the number of ~200MB chunks
    avg_partition_size = avg_partition_size / 200
    return int(ceil(avg_partition_size))
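
# Worked example (made-up numbers): 100,000,000 rows of 2,048-bit records under a
# single partition key value with snappy compression:
#   2048 bits * 100,000,000 rows / 8 = 25,600,000,000 bytes ~ 24,414 MB raw,
#   24,414 MB * 0.4 ~ 9,766 MB compressed, and 9,766 / 200 rounds up to 49 files.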


def get_size_mapping(file_type, byte_array_size):
    """
    Get a mapping from field type to its approximate size in bits for the given file type
    """
    if file_type == "parquet":
        # Convert the byte-array estimate to bits
        string_size = byte_array_size * 8
        PARQUET_SIZE_MAPPING = {
            "integer": 32,
            "long": 64,
            "boolean": 1,
            "float": 32,
            "double": 64,
            "decimal": 64,
            "string": string_size,
            "date": 32,  # Assume no date64
            "timestamp": 96,  # Assume legacy INT96 timestamps
        }
        return PARQUET_SIZE_MAPPING
    return {}


def get_compression_ratio(compression):
    """
    Return a floating point scalar for the size after compression.
    """
    if compression == "snappy":
        # Rough assumption used by these helpers: snappy output is ~40% of raw size
        return 0.4
    return 1.0
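
A minimal usage sketch, assuming a DataFrame df to be written out partitioned by a hypothetical event_date column; the DataFrame, column name, and S3 path are placeholders, and repartitioning before a partitionBy() write is just one way to act on the estimate:

# Refine the string-size guess from Hive metadata, falling back to the 256-byte default.
avg_string_size = get_average_byte_array_size(df.schema)
if not avg_string_size:
    avg_string_size = 256

num_files = get_files_per_partition(
    df,
    "event_date",
    file_type="parquet",
    compression="snappy",
    byte_array_size=avg_string_size,
)

# Spreading the rows over num_files shuffle partitions before a partitionBy() write
# should yield roughly num_files files of ~200 MB under each event_date directory.
(
    df.repartition(num_files)
    .write.partitionBy("event_date")
    .mode("overwrite")
    .parquet("s3://your-bucket/your-table/")
)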