Created
July 14, 2021 10:03
-
-
Save miohtama/ba470d1543f72950de0b4951bd50af89 to your computer and use it in GitHub Desktop.
Writing Parquet files incrementally from Python
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
with pq.ParquetWriter( | |
fname, | |
Candle.to_pyarrow_schema(small_candles), | |
compression='snappy', | |
allow_truncated_timestamps=True, | |
version='2.0', # Highest available schema | |
data_page_version='2.0', # Highest available schema | |
) as writer: | |
def reset_data(): | |
nonlocal data | |
data = dict( | |
pair_id=[], | |
timestamp=[], | |
exchange_rate=[], | |
open=[], | |
close=[], | |
high=[], | |
low=[], | |
buys=[], | |
sells=[], | |
buy_volume=[], | |
sell_volume=[], | |
start_block=[], | |
end_block=[], | |
avg=[], | |
) | |
def writeout(): | |
nonlocal data | |
duration = time.time() - stats["started"] | |
throughout = stats["candles_processed"] / duration | |
logger.info("Writing Parquet table for candle %s, throughput is %s", "{:,}".format(stats["candles_processed"]), throughout) | |
writer.write_table( | |
pa.Table.from_pydict( | |
data, | |
writer.schema | |
) | |
) | |
reset_data() | |
process = psutil.Process(os.getpid()) | |
logger.info("Flushed %s writer, the memory usage is %s", bucket, process.memory_info()) | |
# Use massive yield_per() or otherwise we are leaking memory | |
reset_data() | |
for item in query.yield_per(100_000): | |
frame = construct_frame(row_type, item) | |
for key, value in frame.items(): | |
data[key].append(value) | |
stats["candles_processed"] += 1 | |
# Do regular checkopoints to avoid out of memory | |
# and to log the progress to the console | |
# For fine tuning Parquet writer see | |
# https://issues.apache.org/jira/browse/ARROW-10052 | |
if stats["candles_processed"] % 100_000 == 0: | |
writeout() | |
writeout() | |
return stats |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment