@NickCrews
Last active January 10, 2024 03:48
Coalesce parquet files
"""coalesce_parquets.py
gist of how to coalesce small row groups into larger row groups.
Solves the problem described in https://issues.apache.org/jira/browse/PARQUET-1115
"""
from __future__ import annotations
from pathlib import Path
from typing import Callable, Iterable, TypeVar
import pyarrow as pa
import pyarrow.parquet as pq
def stream_to_parquet(path: Path, tables: Iterable[pa.Table]) -> None:
try:
first = next(tables)
except StopIteration:
return
schema = first.schema
with pq.ParquetWriter(path, schema) as writer:
writer.write_table(first)
for table in tables:
table = table.cast(schema) # enforce schema
writer.write_table(table)
def stream_from_parquet(path: Path) -> Iterable[pa.Table]:
reader = pq.ParquetFile(path)
for batch in reader.iter_batches():
yield pa.Table.from_batches([batch])
def stream_from_parquets(paths: Iterable[Path]) -> Iterable[pa.Table]:
for path in paths:
yield from stream_from_parquet(path)
T = TypeVar("T")
def coalesce(
items: Iterable[T], max_size: int, sizer: Callable[[T], int] = len
) -> Iterable[list[T]]:
"""Coalesce items into chunks. Tries to maximize chunk size and not exceed max_size.
If an item is larger than max_size, we will always exceed max_size, so make a
best effort and place it in its own chunk.
You can supply a custom sizer function to determine the size of an item.
Default is len.
>>> list(coalesce([1, 2, 11, 4, 4, 1, 2], 10, lambda x: x))
[[1, 2], [11], [4, 4, 1], [2]]
"""
batch = []
current_size = 0
for item in items:
this_size = sizer(item)
if current_size + this_size > max_size:
yield batch
batch = []
current_size = 0
batch.append(item)
current_size += this_size
if batch:
yield batch
def coalesce_parquets(paths: Iterable[Path], outpath: Path, max_size: int = 2**20) -> None:
tables = stream_from_parquets(paths)
# Instead of coalescing using number of rows as your metric, you could
# use pa.Table.nbytes or something.
# table_groups = coalesce(tables, max_size, sizer=lambda t: t.nbytes)
table_groups = coalesce(tables, max_size)
coalesced_tables = (pa.concat_tables(group) for group in table_groups)
stream_to_parquet(outpath, coalesced_tables)
paths = Path("in_dir").glob("*.parquet")
coalesce_parquets(paths, "out.parquet")
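As the commented-out line in coalesce_parquets suggests, you can size chunks by bytes instead of rows. A minimal sketch of that variant (the 64 MiB cap is an arbitrary example value, not from the gist):

# Sketch: the same pipeline, but coalescing by in-memory byte size
# instead of row count. The 64 MiB cap is an arbitrary example value.
paths = Path("in_dir").glob("*.parquet")
tables = stream_from_parquets(paths)
table_groups = coalesce(tables, max_size=64 * 2**20, sizer=lambda t: t.nbytes)
coalesced_tables = (pa.concat_tables(group) for group in table_groups)
stream_to_parquet(Path("out_bytes.parquet"), coalesced_tables)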
@Dibyajyoti227 commented Aug 25, 2022

Do you happen to have the main.py to call coalesce_parquets? I am getting this error:

for path in directory.glob("*.parquet"):
AttributeError: 'str' object has no attribute 'glob'

@NickCrews (Author) commented Aug 26, 2022

@Dibyajyoti227 Ahh, my bad: my type annotations and variable names were misleading; it was requiring a pathlib.Path object. But try v3, it should now work with both strings and Paths.
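A sketch of the kind of normalization involved (a reconstruction, not the literal v3 diff): wrapping the incoming directory in pathlib.Path, which is a no-op for Path objects and converts strings, so .glob() works either way.

from pathlib import Path

def parquets_in_dir(directory) -> list:
    # hypothetical helper: accepts either a str or a Path
    directory = Path(directory)  # no-op for a Path, converts a str
    return sorted(directory.glob("*.parquet"))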

@Dibyajyoti227 commented Aug 26, 2022

@NickCrews, thank you for the fix. It works. However, I am now getting this error on the line:

def coalesce(items: Iterable[T], max_size: int, sizer: Callable[[T], int] = len) -> Iterable[list[T]]:

TypeError: 'type' object is not subscriptable

Another one was:

schema = first.schema
AttributeError: 'list' object has no attribute 'schema'

@NickCrews (Author)

@Dibyajyoti227 The first is because you are using an older Python version that evaluates type hints at runtime; see the fixed version with from __future__ import annotations. The second was a mistake where I wasn't actually merging table chunks into a new table; see the new version.
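To illustrate both fixes (a reconstruction, not the literal diff):

# 1. On Python < 3.9, subscripting builtins like list[T] at runtime raises
#    TypeError unless annotations are left unevaluated:
from __future__ import annotations

# 2. A list of tables has no .schema; the group must be merged into a
#    single Table first:
import pyarrow as pa

group = [pa.table({"x": [1, 2]}), pa.table({"x": [3]})]
merged = pa.concat_tables(group)  # one Table, not a list
schema = merged.schema            # now .schema works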

@Dibyajyoti227

@NickCrews, I am currently using Python 3.7, as most of my work needs this version. With your changes, the code runs without any error, but it doesn't generate a parquet file or write any data to one. My input script looks something like this:

indir = r"H:\Python_work\DATA\Parquet_files_2022-09-14_16-10-07"
outfile = r"H:\Python_work\DATA\total_Parquet_files_2022-09-14.parquet"
coalesce_parquets(indir, outfile)

@NickCrews (Author) commented Aug 29, 2022

It works fine for me with a few test parquet files. I'm on 3.10, but I wouldn't think 3.7 would be a problem. I'd imagine it's something external to the script, such as

  • Are those paths correct? Test by opening one of the files in the input dir using e.g. pandas or pyarrow.
  • Do the input parquets actually have >0 rows?
  • Some sort of permissions errors? Can you write that output parquet using pandas.DataFrame.to_parquet()?

Insert some debug print statements in there and see where something unexpected is happening.
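For example, something like this (the directory is a placeholder) checks the first two points:

from pathlib import Path
import pyarrow.parquet as pq

in_dir = Path("in_dir")  # placeholder: your actual input directory
files = sorted(in_dir.glob("*.parquet"))
print(f"found {len(files)} parquet files")
for f in files:
    # confirm each file opens and report its row count
    print(f, "rows:", pq.ParquetFile(f).metadata.num_rows)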

@treatmesubj

this is awesome, thank you

@VladimirPchelko

What about performance and memory usage?

At first glance, the coalesce method loads all the data from the parquet files into memory.
Also, it would make sense to filter out files that do not need to be coalesced.

@NickCrews (Author)

What about performance and memory usage?

At first glance, the coalesce method loads all the data from the parquet files into memory.

coalesce() doesn't load ALL the data into memory; it accumulates tables until the batch size is over the threshold, then yields that batch and drops its reference to it. So it should hold up to ~max_size worth of data in memory at a time. At least that was the intention; perhaps there is a bug in there where I'm not actually freeing the memory. If you have profiled it or have some evidence of this, please let me know.
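A quick way to see the streaming behavior, reusing coalesce() from the gist with the docstring's toy sizes: feed it a generator that announces each item, and watch production interleave with the yielded chunks rather than all items being produced up front.

def noisy_items():
    for i in [1, 2, 11, 4, 4, 1, 2]:
        print("producing", i)
        yield i

for chunk in coalesce(noisy_items(), max_size=10, sizer=lambda x: x):
    print("yielded", chunk)
# "producing" and "yielded" lines interleave, so at most ~max_size worth
# of items is buffered at any moment.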

Also, it would make sense to filter out files that do not need to be coalesced.

Not sure what you mean here?

@billy-doyle

This is great!

Adding a small reproducible example of how this works below (note that the following code creates some directories):

import os
from pathlib import Path

import pandas as pd
import pyarrow.parquet as pq

home = os.environ["HOME"]
in_path = os.path.join(home, "scratch", "to_coalesce")
out_path = os.path.join(home, "scratch", "coalesced")

os.makedirs(in_path, exist_ok=True)
os.makedirs(out_path, exist_ok=True)

df = pd.read_csv('https://raw.githubusercontent.com/mwaskom/seaborn-data/master/iris.csv')

# write ten copies of iris as separate small parquet files
for i in range(10):
    dfcopy = df.copy()
    dfcopy["file"] = i
    dfcopy.to_parquet(os.path.join(in_path, f"iris_{i}.parquet"))

coalesce_parquets(Path(in_path).glob("*.parquet"), outpath=os.path.join(out_path, "coalesce.parquet"))
print(pq.ParquetFile(os.path.join(out_path, "coalesce.parquet")).metadata)
# <pyarrow._parquet.FileMetaData object at 0x1575f9d60>
#   created_by: parquet-cpp-arrow version 7.0.0
#   num_columns: 6
#   num_rows: 1500
#   num_row_groups: 1
#   format_version: 1.0
#   serialized_size: 3857
