Content Defined Parquet Writing Prototype

A prototype that streams a pandas DataFrame into a Parquet file, cutting row-group boundaries where the hash of a designated key column hits a target value, so the boundaries are determined by the data itself rather than by fixed row offsets.
import pandas as pd
import pyarrow as pa
import pyarrow.parquet as pq

min_row_count = 512
max_row_count = 2048

def write_parquet_content_defined(df: pd.DataFrame, key_column: str, output_file: str):
    # Lazily initialized so the writer can adopt the schema of the first batch
    writer = None
    batch_accumulator = []
    try:
        for _, row in df.iterrows():
            # Append the current row to the batch accumulator
            batch_accumulator.append(row)
            # Content-defined boundary: once the batch reaches min_row_count,
            # cut it when the key's hash lands in 1/1024 of the hash space;
            # force a cut at max_row_count regardless.
            # Note: Python's hash() is salted per process for str/bytes, so
            # boundaries are not reproducible across runs; a stable hash
            # (e.g. zlib.crc32) would be needed for deterministic chunking.
            if (len(batch_accumulator) >= min_row_count and
                    hash(row[key_column]) % 1024 == 0) or \
                    len(batch_accumulator) >= max_row_count:
                # Convert the accumulated rows into a DataFrame
                batch_df = pd.DataFrame(batch_accumulator)
                # Convert the DataFrame to a PyArrow Table
                table = pa.Table.from_pandas(batch_df)
                # Create the writer on the first batch, reusing its schema
                if writer is None:
                    writer = pq.ParquetWriter(output_file, table.schema)
                # Write the batch as its own row group
                writer.write_table(table)
                # Clear the accumulator for the next batch
                batch_accumulator = []
        # Flush any rows left in the accumulator after the loop
        if batch_accumulator:
            batch_df = pd.DataFrame(batch_accumulator)
            table = pa.Table.from_pandas(batch_df)
            if writer is None:
                writer = pq.ParquetWriter(output_file, table.schema)
            writer.write_table(table)
    finally:
        # Close the writer to finalize the Parquet footer
        if writer is not None:
            writer.close()
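
Below is a minimal usage sketch, not part of the original gist: the column names, row count, and output path are made up for illustration. The inspection at the end shows that each batch lands as its own row group, with sizes between min_row_count and max_row_count.

if __name__ == "__main__":
    import numpy as np

    # Synthetic DataFrame with a string key column (hypothetical names)
    n = 10_000
    df = pd.DataFrame({
        "user_id": [f"user-{i % 500}" for i in range(n)],
        "value": np.random.rand(n),
    })

    write_parquet_content_defined(df, key_column="user_id", output_file="example.parquet")

    # Row-group sizes should fall between min_row_count and max_row_count
    # (the final flushed group may be smaller).
    meta = pq.ParquetFile("example.parquet").metadata
    print(meta.num_row_groups, "row groups:",
          [meta.row_group(i).num_rows for i in range(meta.num_row_groups)])

The apparent payoff of content-defined boundaries is stability under edits: since a cut point depends only on the key values near it, inserting or deleting rows in one region of the DataFrame shifts only the row groups around that region and would leave the surrounding ones unchanged (given a stable hash, per the caveat noted in the code), which is what makes chunk-level deduplication of the output possible.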