Skip to content

Instantly share code, notes, and snippets.

@bloodearnest
Created March 22, 2022 09:52
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save bloodearnest/f77fb30ad44880deae85c478161d2163 to your computer and use it in GitHub Desktop.
streaming feather files
import time
import sys
import itertools
import os
import numpy as np
import pandas as pd
import pyarrow as pa
import psutil
# Dataset dimensions: 512**2 rows of 32 float64 columns — large enough to
# make per-batch memory growth visible, small enough to run quickly.
nrows = 512**2
ncols = 32

# Number of record batches comes from the command line; exit with a usage
# message instead of an opaque IndexError when the argument is missing.
if len(sys.argv) < 2:
    sys.exit(f"usage: {sys.argv[0]} NBATCHES")
nbatches = int(sys.argv[1])
batch_size = nrows // nbatches
print(f"{nbatches} batches of {batch_size} rows")

# psutil handle on this process, sampled by pmem() to report RSS deltas.
process = psutil.Process(os.getpid())
previous_mem = 0  # baseline (kb) for pmem's first delta
def pmem(msg):
    """Print *msg* with the change in this process's RSS (kb) since the last call."""
    global previous_mem
    rss_kb = process.memory_info().rss // 1024
    delta = rss_kb - previous_mem
    print(f"{msg}: {delta:+d}kb")
    previous_mem = rss_kb
def generate_rows(num_rows=None, num_cols=None):
    """Lazily yield row dicts of random floats, one dict per row.

    num_rows / num_cols default to the module-level nrows / ncols so
    existing zero-argument callers are unchanged; pass explicit counts
    to generate an arbitrarily-shaped dataset.

    Yields:
        dict mapping "col0".."col{num_cols-1}" to a random float
        (np.random.randn sample).
    """
    if num_rows is None:
        num_rows = nrows
    if num_cols is None:
        num_cols = ncols
    for _ in range(num_rows):
        yield {f"col{j}": np.random.randn() for j in range(num_cols)}
def chunked_iterable(iterable, size):
    """Yield successive tuples of at most *size* items from *iterable*.

    The final tuple is shorter when the iterable's length is not a
    multiple of *size*; an empty iterable yields nothing.
    """
    iterator = iter(iterable)
    for first in iterator:
        # Pull one item eagerly, then up to size-1 more; stops cleanly
        # when the iterator is exhausted.
        yield (first,) + tuple(itertools.islice(iterator, size - 1))
def write(rows, schema):
    """Stream *rows* into 'test.feather' one record batch at a time.

    rows: iterable of {column_name: value} dicts, consumed lazily so the
        full dataset is never materialized in memory at once.
    schema: pyarrow schema describing the columns of every batch.

    Uses the module-level batch_size to chunk the stream, and pmem() to
    print per-batch memory growth.
    """
    batches = chunked_iterable(rows, batch_size)
    # zstd-compressed Arrow IPC file format (feather v2 is this format).
    options = pa.ipc.IpcWriteOptions(compression='zstd', use_threads=True)
    with pa.OSFile('test.feather', 'wb') as sink:
        with pa.ipc.new_file(sink, schema, options=options) as writer:
            for i, batch in enumerate(batches):
                batch_data = list(batch)
                b = pa.RecordBatch.from_pylist(batch_data, schema)
                writer.write(b)
                # Report RSS delta for this batch: "<index> <rows-in-batch>".
                pmem(f"{i} {len(batch_data)}")
# Reset the pmem baseline so the post-read delta below is measured from
# here rather than from the last batch written.
previous_mem = process.memory_info().rss // 1024
schema = pa.schema([pa.field(f"col{i}", pa.float64()) for i in range(ncols)])
rows = generate_rows()
write(rows, schema)
# Read the whole file back at once to show the memory cost of a full load.
df = pd.read_feather('test.feather')
pmem("post-read")
@JosephHodes
Copy link

Does this allow you to progressively write to feather files?

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment