Skip to content

Instantly share code, notes, and snippets.

@wesm
Created February 10, 2017 18:07
Show Gist options
  • Save wesm/2108100781481d342fa129b648fdc4ae to your computer and use it in GitHub Desktop.
Save wesm/2108100781481d342fa129b648fdc4ae to your computer and use it in GitHub Desktop.
Parquet multithreaded benchmarks
import gc
import os
import time
import numpy as np
import pandas as pd
from pyarrow.compat import guid
import pyarrow as pa
import pyarrow.parquet as pq
import snappy
def generate_floats(n, pct_null, repeats=1):
    """Return exactly ``n`` float64 values, some of which are NaN.

    ``ceil(n / repeats)`` unique standard-normal values are drawn. A
    ``pct_null`` fraction of those unique values is replaced by NaN. Each
    value is then repeated ``repeats`` times consecutively, and the result
    is trimmed to ``n`` elements. This trimming means the length is always
    ``n``, even when ``n`` is not a multiple of ``repeats``.

    Parameters
    ----------
    n : int or float
        Number of values to return. A float (e.g. the result of true
        division) is truncated to an int.
    pct_null : float
        Fraction (0..1) of the *unique* values to replace with NaN.
    repeats : int, default 1
        How many consecutive copies of each unique value to emit. Larger
        values give lower-entropy data.

    Returns
    -------
    numpy.ndarray
        1-D float64 array of length ``int(n)``.
    """
    n = int(n)
    # Ceiling division, so repeating never yields fewer than n values. The
    # surplus (fewer than `repeats` values) is trimmed off at the end.
    nunique = -(-n // repeats)
    unique_values = np.random.randn(nunique)
    num_nulls = int(nunique * pct_null)
    if num_nulls:
        null_indices = np.random.choice(nunique, size=num_nulls,
                                        replace=False)
        unique_values[null_indices] = np.nan
    return unique_values.repeat(repeats)[:n]
# Registry of column generators, keyed by dtype name. Each generator is
# called as gen(n, pct_null, repeats) and returns one column's values.
DATA_GENERATORS = dict(float64=generate_floats)
def generate_data(nrows, ncols, pct_null=0.1, repeats=1, dtype='float64'):
    """Build a synthetic DataFrame with columns named ``c0`` .. ``c{ncols-1}``.

    Every column is produced independently by
    ``DATA_GENERATORS[dtype](nrows, pct_null, repeats)``.

    Parameters
    ----------
    nrows : int
        Row count handed to the column generator.
    ncols : int
        Number of columns to generate.
    pct_null : float, default 0.1
        Null fraction handed to the column generator.
    repeats : int, default 1
        Value-repetition factor handed to the column generator.
    dtype : str, default 'float64'
        Key into ``DATA_GENERATORS`` selecting the column generator.

    Returns
    -------
    pandas.DataFrame

    Raises
    ------
    KeyError
        If ``dtype`` has no registered generator.
    """
    try:
        datagen_func = DATA_GENERATORS[dtype]
    except KeyError:
        raise KeyError('unsupported dtype {0!r}; expected one of {1}'
                       .format(dtype, sorted(DATA_GENERATORS))) from None
    data = {
        'c' + str(i): datagen_func(nrows, pct_null, repeats)
        for i in range(ncols)
    }
    return pd.DataFrame(data)
def write_to_parquet(df, out_path, use_dictionary=True,
                     compression='SNAPPY'):
    """Convert ``df`` to an Arrow table and write it as Parquet to ``out_path``.

    Parameters
    ----------
    df : pandas.DataFrame
        Data to write.
    out_path : str
        Destination file path.
    use_dictionary : bool, default True
        Forwarded to ``pq.write_table`` to toggle dictionary encoding.
    compression : str or None, default 'SNAPPY'
        Codec name forwarded to ``pq.write_table``. ``'UNCOMPRESSED'`` (in
        any letter case) and ``None`` both mean "no compression" and are
        passed on as ``None``.
    """
    arrow_table = pa.Table.from_pandas(df)
    # Accept None as well as the 'UNCOMPRESSED' spelling; calling .lower()
    # on None would raise AttributeError.
    if compression is not None and compression.lower() == 'uncompressed':
        compression = None
    pq.write_table(arrow_table, out_path, use_dictionary=use_dictionary,
                   compression=compression)
def read_pyarrow(path, nthreads=1):
    """Read the Parquet file at ``path`` and return it as a pandas DataFrame.

    ``nthreads`` is forwarded unchanged to ``pq.read_table``, presumably as
    the number of reader threads.

    NOTE(review): ``nthreads`` is the early-2017 pyarrow keyword. Newer
    pyarrow releases take a boolean ``use_threads`` instead — confirm
    against the installed version.
    """
    table = pq.read_table(path, nthreads=nthreads)
    frame = table.to_pandas()
    return frame
def get_timing(f, path, niter):
    """Return the total elapsed seconds for ``niter`` sequential calls of ``f(path)``.

    The value is a total, not a per-call figure; divide by ``niter`` to get
    the mean.

    Parameters
    ----------
    f : callable
        Function invoked as ``f(path)``. Its return value is discarded.
    path : str
        Argument passed to every call.
    niter : int
        Number of calls to time.

    Returns
    -------
    float
        Elapsed wall-clock seconds.
    """
    # perf_counter is monotonic, has the highest available resolution, and
    # works on every platform. clock_gettime(CLOCK_MONOTONIC) is Unix-only.
    start = time.perf_counter()
    for _ in range(niter):
        f(path)
    return time.perf_counter() - start
MEGABYTE = 1 << 20
# Target 1 GiB of raw float64 values, split evenly across NCOLS columns.
DATA_SIZE = 1024 * MEGABYTE
NCOLS = 16
# Floor division keeps NROWS an int. True division would make it a float
# (8388608.0) under Python 3, which is not a valid row count.
NROWS = DATA_SIZE // NCOLS // np.dtype('float64').itemsize
# Benchmark scenarios, each using 10% nulls:
#   - low entropy: every value is repeated 1000 times.
#   - high entropy: all values are distinct.
# Each entropy level is written both with and without dictionary encoding.
cases = {
    name: {'pct_null': 0.1, 'repeats': repeats, 'use_dictionary': use_dict}
    for name, repeats, use_dict in (
        ('low_entropy_dict', 1000, True),
        ('low_entropy', 1000, False),
        ('high_entropy_dict', 1, True),
        ('high_entropy', 1, False),
    )
}
NITER = 5  # timed reads per (file, reader) pair
results = []  # collected benchmark result tuples
# Reader variants under test: (label, callable taking a file path).
# The thread count is bound through a default argument so that each lambda
# keeps its own value instead of late-binding to the last loop value.
readers = [
    (label, lambda path, _nthreads=nthreads: read_pyarrow(path, nthreads=_nthreads))
    for label, nthreads in (
        ('pyarrow', 1),
        ('pyarrow 2 threads', 2),
        ('pyarrow 4 threads', 4),
    )
]
# Codecs to benchmark. 'GZIP' can be appended here; it is left out of the
# default run.
COMPRESSIONS = ['UNCOMPRESSED', 'SNAPPY']
# Maps (case name, codec name) -> path of the Parquet file written for it.
case_files = {}
for case, params in cases.items():
    # Generate each case's data once and write it with every codec. The
    # files for one case then hold identical values, so codec timings are
    # directly comparable, and generation work is not repeated per codec.
    # pct_null is forwarded so the per-case setting is actually honoured.
    df = generate_data(NROWS, NCOLS, pct_null=params['pct_null'],
                       repeats=params['repeats'])
    for compression in COMPRESSIONS:
        path = '{0}_{1}.parquet'.format(case, compression or 'UNCOMPRESSED')
        write_to_parquet(df, path, compression=compression,
                         use_dictionary=params['use_dictionary'])
        case_files[case, compression] = path
    # Drop the large frame before the next case's frame is generated, to
    # keep peak memory down.
    df = None
# Time every reader against every (case, codec) file. Each result is
# recorded as (case, codec or None, reader label, mean seconds per read).
for case, params in cases.items():
    for compression in COMPRESSIONS:
        path = case_files[case, compression]
        # Results report None, not the string 'UNCOMPRESSED', for the
        # uncompressed codec.
        codec = None if compression == 'UNCOMPRESSED' else compression
        # Two untimed reads prime the file cache before measuring.
        for _warmup in range(2):
            read_pyarrow(path)
        for reader_name, f in readers:
            avg_seconds = get_timing(f, path, NITER) / NITER
            result = (case, codec, reader_name, avg_seconds)
            print(result)
            results.append(result)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment