@miohtama
Created July 14, 2021 10:03
Writing Parquet files incrementally from Python
import logging
import os
import time

import psutil
import pyarrow as pa
import pyarrow.parquet as pq

logger = logging.getLogger(__name__)

# Note: this snippet is an excerpt from inside a larger export function;
# fname, Candle, small_candles, query, row_type, stats, bucket, data and
# construct_frame() come from that enclosing scope.
with pq.ParquetWriter(
    fname,
    Candle.to_pyarrow_schema(small_candles),
    compression='snappy',
    allow_truncated_timestamps=True,
    version='2.0',  # Highest available Parquet format version
    data_page_version='2.0',  # Highest available data page version
) as writer:

    def reset_data():
        # Start a fresh in-memory column buffer for the next row group
        nonlocal data
        data = dict(
            pair_id=[],
            timestamp=[],
            exchange_rate=[],
            open=[],
            close=[],
            high=[],
            low=[],
            buys=[],
            sells=[],
            buy_volume=[],
            sell_volume=[],
            start_block=[],
            end_block=[],
            avg=[],
        )

    def writeout():
        # Flush the accumulated rows as one row group and reset the buffer
        nonlocal data
        duration = time.time() - stats["started"]
        throughput = stats["candles_processed"] / duration
        logger.info(
            "Writing Parquet table for candle %s, throughput is %s",
            "{:,}".format(stats["candles_processed"]),
            throughput,
        )
        writer.write_table(
            pa.Table.from_pydict(
                data,
                writer.schema,
            )
        )
        reset_data()
        process = psutil.Process(os.getpid())
        logger.info("Flushed %s writer, the memory usage is %s", bucket, process.memory_info())

    # Use a massive yield_per() or otherwise we are leaking memory
    reset_data()

    for item in query.yield_per(100_000):
        frame = construct_frame(row_type, item)
        for key, value in frame.items():
            data[key].append(value)
        stats["candles_processed"] += 1

        # Do regular checkpoints to avoid running out of memory
        # and to log the progress to the console.
        # For fine-tuning the Parquet writer see
        # https://issues.apache.org/jira/browse/ARROW-10052
        if stats["candles_processed"] % 100_000 == 0:
            writeout()

    # Write out any rows remaining in the last, partial batch
    writeout()

return stats
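
For reference, here is the same incremental-writing pattern in a minimal, self-contained sketch. The schema, file name and row source below are illustrative placeholders and not part of the gist above; the point is that ParquetWriter keeps the file open while write_table() appends one row group per buffered batch, so memory usage stays bounded by the batch size.

import pyarrow as pa
import pyarrow.parquet as pq

# Hypothetical schema and row source, for illustration only
schema = pa.schema([
    ("timestamp", pa.timestamp("ms")),
    ("open", pa.float64()),
    ("close", pa.float64()),
])

def rows():
    # Stand-in for the SQLAlchemy query in the snippet above
    for i in range(1_000_000):
        yield {"timestamp": i, "open": float(i), "close": float(i) + 1.0}

batch_size = 100_000
buffer = {name: [] for name in schema.names}

with pq.ParquetWriter("/tmp/candles.parquet", schema, compression="snappy") as writer:
    for n, row in enumerate(rows(), start=1):
        for key, value in row.items():
            buffer[key].append(value)
        # Flush one row group per batch_size rows to keep memory bounded
        if n % batch_size == 0:
            writer.write_table(pa.Table.from_pydict(buffer, schema))
            buffer = {name: [] for name in schema.names}
    # Write whatever is left over in the final, partial batch
    if buffer["timestamp"]:
        writer.write_table(pa.Table.from_pydict(buffer, schema))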