@setop
Forked from wesm/parquet-benchmark-20170210.py
Last active July 8, 2022 15:51
Parquet multithreaded benchmarks
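The script below generates roughly 128 MB of float64 data per case, writes it to Parquet with and without dictionary encoding and Snappy compression, and times pyarrow reads with one and two threads.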
import os
import time

import numpy as np
import pandas as pd
import pyarrow as pa
import pyarrow.parquet as pq

def generate_floats(n, pct_null, repeats=1):
    nunique = int(n / repeats)
    unique_values = np.random.randn(nunique)

    num_nulls = int(nunique * pct_null)
    null_indices = np.random.choice(nunique, size=num_nulls, replace=False)
    unique_values[null_indices] = np.nan

    return unique_values.repeat(repeats)

DATA_GENERATORS = {
    'float64': generate_floats
}

def generate_data(nrows, ncols, pct_null=0.1, repeats=1, dtype='float64'):
    datagen_func = DATA_GENERATORS[dtype]
    data = {
        'c' + str(i): datagen_func(nrows, pct_null, repeats)
        for i in range(ncols)
    }
    return pd.DataFrame(data)

def write_to_parquet(df, out_path, use_dictionary=True,
                     compression='SNAPPY'):
    arrow_table = pa.Table.from_pandas(df)
    if compression.lower() == 'uncompressed':
        compression = None
    pq.write_table(arrow_table, out_path, use_dictionary=use_dictionary,
                   compression=compression)

def read_pyarrow(path, nthreads=1):
    return pq.read_table(path, use_threads=nthreads > 1).to_pandas()

def get_timing(f, path, niter):
    start = time.clock_gettime(time.CLOCK_MONOTONIC)
    for i in range(niter):
        f(path)
    elapsed = time.clock_gettime(time.CLOCK_MONOTONIC) - start
    return elapsed

MEGABYTE = 1 << 20
DATA_SIZE = 128 * MEGABYTE
NCOLS = 8
NROWS = DATA_SIZE // NCOLS // np.dtype('float64').itemsize

cases = {
    'low_entropy_dict': {
        'pct_null': 0.1,
        'repeats': 1000,
        'use_dictionary': True
    },
    'low_entropy': {
        'pct_null': 0.1,
        'repeats': 1000,
        'use_dictionary': False
    },
    'high_entropy_dict': {
        'pct_null': 0.1,
        'repeats': 1,
        'use_dictionary': True
    },
    'high_entropy': {
        'pct_null': 0.1,
        'repeats': 1,
        'use_dictionary': False
    }
}

NITER = 3

readers = [
    ('pyarrow', lambda path: read_pyarrow(path)),
    ('pyarrow 2 threads', lambda path: read_pyarrow(path, nthreads=2)),
    # ('pyarrow 4 threads', lambda path: read_pyarrow(path, nthreads=4))
    # The current read_table API exposes only a boolean use_threads flag, so a
    # specific thread count cannot be requested per call:
    # https://arrow.apache.org/docs/python/generated/pyarrow.parquet.read_table.html
]
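
# Note (added; not part of the original benchmark): with the boolean
# use_threads flag, pyarrow reads on its global CPU thread pool. Assuming a
# reasonably recent pyarrow, pa.set_cpu_count() should cap that pool and can
# approximate the old per-call nthreads knob, e.g. for a 4-thread case:
#
#     pa.set_cpu_count(4)
#     pq.read_table(path, use_threads=True)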
COMPRESSIONS = ['UNCOMPRESSED', 'SNAPPY'] # , 'GZIP']

case_files = {}
for case, params in cases.items():
    for compression in COMPRESSIONS:
        path = '{0}_{1}.parquet'.format(case, compression or 'UNCOMPRESSED')
        # do not recreate existing files; generating them takes a long time
        if not os.path.exists(path):
            df = generate_data(NROWS, NCOLS, repeats=params['repeats'])
            write_to_parquet(df, path, compression=compression,
                             use_dictionary=params['use_dictionary'])
            df = None
        case_files[case, compression] = path

for case, params in cases.items():
    for compression in COMPRESSIONS:
        path = case_files[case, compression]
        compression = compression if compression != 'UNCOMPRESSED' else None

        # prime the file cache
        read_pyarrow(path)
        read_pyarrow(path)

        for reader_name, f in readers:
            elapsed = get_timing(f, path, NITER) / NITER
            # one row per combination: case, compression, reader, mean seconds per read
            result = case, compression, reader_name, elapsed
            print(*result)