Skip to content

Instantly share code, notes, and snippets.

@SemanticBeeng
Last active May 31, 2018 17:10
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save SemanticBeeng/0e9f9ed40866a5f844be13ab6ef3469e to your computer and use it in GitHub Desktop.
Save SemanticBeeng/0e9f9ed40866a5f844be13ab6ef3469e to your computer and use it in GitHub Desktop.
Arrow / pandas marshalling
# Round-trip a pandas DataFrame through the Arrow IPC *streaming* format,
# entirely in memory: DataFrame -> RecordBatch -> bytes -> RecordBatch -> DataFrame.
#
# References:
# https://arrow.apache.org/docs/python/memory.html
# https://arrow.apache.org/docs/python/generated/pyarrow.RecordBatch.html
# https://arrow.apache.org/docs/python/ipc.html
# https://github.com/apache/arrow/blob/master/python/pyarrow/tests/test_io.py
# https://github.com/apache/arrow/blob/master/python/pyarrow/serialization.py
# https://jakevdp.github.io/PythonDataScienceHandbook/02.09-structured-data-numpy.html
# https://stackoverflow.com/questions/46837472/converting-pandas-dataframe-to-structured-arrays
import pyarrow as pa
import pandas as pd
import numpy as np

# Generate a sample `pandas` data frame: 6 daily timestamps x 4 float columns.
dates = pd.date_range('20130101', periods=6)
df = pd.DataFrame(np.random.randn(6, 4), index=dates, columns=list('ABCD'))

# Write the data frame in Arrow streaming format to an in-memory buffer.
bfo = pa.BufferOutputStream()
record_batch = pa.RecordBatch.from_pandas(df)
print(record_batch.schema)
stream_writer = pa.RecordBatchStreamWriter(bfo, record_batch.schema)
stream_writer.write_batch(record_batch)
# Close the writer so the end-of-stream marker is written; without it some
# readers treat the stream as truncated.
stream_writer.close()
# `get_result()` is the long-deprecated spelling; `getvalue()` is the
# supported accessor on BufferOutputStream.
data = bfo.getvalue()

# Actual serialized bytes.
print(data.to_pybytes())

# Read the data frame back from the bytes in Arrow format.
# `pa.open_stream` was deprecated in favor of `pa.ipc.open_stream`.
reader = pa.ipc.open_stream(data)
print(reader.schema)
batches = [b for b in reader]
# Idiomatic bound-method call instead of `pa.RecordBatch.to_pandas(batches[0])`.
df_out = batches[0].to_pandas()

# Compare df with df_out. Bare expressions like `df.dtypes` only echo in a
# REPL; in a script they must be printed explicitly.
print(df.dtypes)
print(df_out.dtypes)
print(df)
print(df_out)
====
# Reading data written by
# val arrowPayloads = df.toArrowPayload.collect()
#
>>> data
<pyarrow.lib.BufferReader object at 0x7fcd06d9c4a8>
>>> reader = pa.RecordBatchFileReader(data)
>>> reader.read_all()
# works
# Read Arrow IPC *file* format bytes (e.g. a payload written out by Spark's
# `df.toArrowPayload.collect()`) from disk and convert to a pandas DataFrame.
import pyarrow as pa
import pandas as pd
import numpy as np

# `with` guarantees the file handle is closed even if read() raises,
# unlike the manual open()/close() pair.
with open("/tmp/payloadIterator.bin", 'rb') as fh:
    data_bytes = fh.read()

reader = pa.RecordBatchFileReader(pa.BufferReader(data_bytes))
print(type(reader))

# read_all() concatenates every record batch in the file into one Table.
table = reader.read_all()
print(type(table))
print(table.num_rows)

# First column of the table (a ChunkedArray).
col1 = table[0]
print(type(col1))

pt = table.to_pandas()
print(type(pt))
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment