@mndrake · Created May 27, 2018
read/write pandas DataFrames to/from split parquet files

The writer serializes a DataFrame to an in-memory Parquet stream with pyarrow, then slices the bytes into sequentially numbered part files of at most chunksize bytes; the reader concatenates the parts back into a single buffer before handing it to pyarrow.parquet.read_table.
import os
from io import BytesIO

import pyarrow as pa
import pyarrow.parquet as pq

kilobytes = 1024
megabytes = kilobytes * 1000
chunksize = int(10 * megabytes)  # ~10 MB per part file
def write_split_parquet(df, todir, chunksize=chunksize, compression='GZIP'):
    # initialize output directory, clearing out any existing part files
    if not os.path.exists(todir):
        os.mkdir(todir)
    else:
        for file in os.listdir(todir):
            os.remove(os.path.join(todir, file))
    # create parquet in-memory stream from dataframe
    table = pa.Table.from_pandas(df)  # pyarrow table
    stream = BytesIO()
    pq.write_table(table, stream, compression=compression)
    stream.seek(0)  # reset stream
    # write fixed-size chunks to sequentially numbered part files
    i = 0
    while True:
        chunk = stream.read(chunksize)
        if not chunk:
            break
        i += 1
        filename = os.path.join(todir, 'part%04d' % i)
        with open(filename, 'wb') as f:
            f.write(chunk)
    stream.close()
    assert i <= 9999  # lexicographic sort on read fails at 5 digits
    return i
def read_split_parquet(fromdir):
    with BytesIO() as s:
        # sort the part files: os.listdir order is arbitrary, and the chunks
        # must be concatenated in the order they were written
        for file in sorted(os.listdir(fromdir)):
            with open(os.path.join(fromdir, file), 'rb') as f:
                s.write(f.read())
        s.seek(0)  # reset stream before handing it to pyarrow
        table = pq.read_table(s)
        df = table.to_pandas()
        return df
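
A minimal round-trip sketch, assuming pandas is installed; the df_parts directory name and the sample frame are hypothetical, for illustration only:

import pandas as pd

# hypothetical sample frame
df = pd.DataFrame({'id': range(1000), 'value': [x * 0.5 for x in range(1000)]})

# write the frame as numbered part files under ./df_parts
n_parts = write_split_parquet(df, 'df_parts', chunksize=64 * kilobytes)
print('wrote %d part file(s)' % n_parts)

# read the parts back and check the round trip
df2 = read_split_parquet('df_parts')
assert df.equals(df2)  # should hold for simple dtypes and a default RangeIndex

Note that each part file is just a byte slice of one Parquet stream, so the parts are not readable individually; they only form valid Parquet once concatenated in order.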