Skip to content

Instantly share code, notes, and snippets.

@oscarknagg
Last active January 20, 2022 17:56
Show Gist options
  • Save oscarknagg/151432d56614ec431048b9cde540cb13 to your computer and use it in GitHub Desktop.
Save oscarknagg/151432d56614ec431048b9cde540cb13 to your computer and use it in GitHub Desktop.
import os.path
import psutil
import pyarrow as pa
import numpy as np
from pyarrow import parquet as pq
import time
# Number of consecutive samples in every window yielded by the stream readers.
WINDOW_LENGTH = 10 ** 3
# Number of windows each stream reader yields (window i starts at sample i).
N = 10 ** 6
# Bytes per megabyte (2**20), used when reporting memory and file sizes.
_1_MB = 1 << 20
def setup(n_values=10 ** 9, arrow_path="timeseries.arrow",
          parquet_path="timeseries.parquet"):
    """Write a synthetic float32 timeseries to an Arrow IPC file and a Parquet file.

    The series is the ramp ``0, 1, 2, ...`` (stored as float32, so values
    above 2**24 are rounded) with a single NaN at the midpoint.

    Args:
        n_values: Number of samples to generate. The default, 10**9 float32
            values (~3.7 GiB in memory), reproduces the original benchmark;
            pass a smaller value for a quick run.
        arrow_path: Destination of the Arrow IPC (file format) output.
        parquet_path: Destination of the Parquet output, written so the two
            on-disk sizes can be compared.
    """
    timeseries = np.arange(int(n_values), dtype="float32")
    # Add a single NaN at the midpoint (presumably to check that a non-finite
    # value survives both formats -- TODO confirm intent). Skip it for an
    # empty series, where there is no midpoint to index.
    if len(timeseries):
        timeseries[len(timeseries) // 2] = np.nan

    table = pa.Table.from_arrays(arrays=[timeseries], names=["timeseries"])

    # Write as Arrow.
    with pa.OSFile(arrow_path, "wb") as sink:
        with pa.RecordBatchFileWriter(sink, table.schema) as writer:
            writer.write_table(table)

    # Write as Parquet to compare file sizes.
    pq.write_table(table, parquet_path)
def stream_from_arrow(file):
    """Yield ``N`` overlapping windows of ``WINDOW_LENGTH`` samples from an Arrow IPC file.

    The file is memory-mapped and loaded with ``read_all()``. Window ``i`` is
    ``timeseries[i:i + WINDOW_LENGTH]`` converted to a NumPy array.

    After the last window is yielded, the function prints two timings: the time
    spent yielding, and that time plus the initial read. It also prints the
    change in process RSS across the initial read.

    Notes:
        * Because this is a generator, the timings also include whatever the
          consumer does between ``next()`` calls.
        * RSS is sampled right after ``read_all()`` and before iterating.
          Memory-map pages touched later while iterating are not counted.
        * The memory map stays open only while windows are being produced. It
          is closed when the generator finishes or is closed.
    """
    process = psutil.Process()
    memory0 = process.memory_info()
    t0 = time.perf_counter()
    with pa.memory_map(file, "r") as source:
        table = pa.ipc.RecordBatchFileReader(source).read_all()
        # Resolve the column once instead of on every iteration.
        column = table["timeseries"]
        memory1 = process.memory_info()
        t1 = time.perf_counter()
        for i in range(N):
            yield column[i:i + WINDOW_LENGTH].to_numpy()
        t2 = time.perf_counter()
    print("Yielded {} windows from Arrow file in {:.3f}s ({:.3f}s including I/O)".format(
        N,
        t2 - t1,
        t2 - t0,
    ))
    print("Used {:.1f}MB of additional memory".format(
        (memory1.rss - memory0.rss) / _1_MB
    ))
    print()
def stream_from_parquet(file):
    """Yield ``N`` overlapping windows of ``WINDOW_LENGTH`` samples from a Parquet file.

    The ``timeseries`` column is read in full and converted to one NumPy
    array up front. Window ``i`` is the slice ``timeseries[i:i + WINDOW_LENGTH]``.
    Basic NumPy slicing returns views, so the windows share that array's
    memory rather than copying it.

    After the last window is yielded, the function prints two timings: the time
    spent yielding, and that time plus the initial read. It also prints the
    change in process RSS across the initial read.

    Because this is a generator, the timings also include whatever the
    consumer does between ``next()`` calls.
    """
    process = psutil.Process()
    memory0 = process.memory_info()
    t0 = time.perf_counter()
    # Only the "timeseries" column is needed, so don't read any others.
    table = pq.read_table(file, columns=["timeseries"])
    timeseries = table["timeseries"].to_numpy()
    del table  # keep only the NumPy copy alive, as before
    memory1 = process.memory_info()
    t1 = time.perf_counter()
    for i in range(N):
        yield timeseries[i:i + WINDOW_LENGTH]
    t2 = time.perf_counter()
    print("Yielded {} windows from Parquet file in {:.3f}s ({:.3f}s including I/O)".format(
        N,
        t2 - t1,
        t2 - t0,
    ))
    print("Used {:.1f}MB of additional memory".format(
        (memory1.rss - memory0.rss) / _1_MB
    ))
    print()
def main():
    """Create both data files, drain each window stream, then report file sizes."""
    setup()

    # Each stream prints its own timing/memory report once it is exhausted;
    # the windows themselves are simply discarded. Generator bodies do not
    # run until iterated, so building the tuple up front changes nothing.
    streams = (
        stream_from_arrow("timeseries.arrow"),
        stream_from_parquet("timeseries.parquet"),
    )
    for stream in streams:
        for _ in stream:
            pass

    arrow_mb = os.path.getsize("timeseries.arrow") / _1_MB
    parquet_mb = os.path.getsize("timeseries.parquet") / _1_MB
    print("timeseries.arrow: {:.1f}MB, timeseries.parquet: {:.1f}MB".format(
        arrow_mb,
        parquet_mb,
    ))
if __name__ == '__main__':
    # This script compares the speed of loading subsections (windows) of a
    # timeseries from disk when it is stored in either Arrow or Parquet format.
    #
    # Example output (timings and memory figures are machine-dependent; the
    # blank lines printed between sections are omitted here):
    #   Yielded 1000000 windows from Arrow file in 3.832s (3.832s including I/O)
    #   Used 0.0MB of additional memory
    #   Yielded 1000000 windows from Parquet file in 0.169s (5.374s including I/O)
    #   Used 7768.2MB of additional memory
    #   timeseries.arrow: 3814.7MB, timeseries.parquet: 457.4MB
    try:
        main()
    finally:
        # Best-effort cleanup. A file may not exist if setup() failed before
        # writing it. An unguarded os.remove would then raise
        # FileNotFoundError, mask the original error, and skip the remaining
        # file.
        for path in ("timeseries.arrow", "timeseries.parquet"):
            try:
                os.remove(path)
            except FileNotFoundError:
                pass
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment