Skip to content

Instantly share code, notes, and snippets.

@ebongzzang
Created December 12, 2018 08:59
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save ebongzzang/208df437b543003c1fba0e278d096127 to your computer and use it in GitHub Desktop.
Save ebongzzang/208df437b543003c1fba0e278d096127 to your computer and use it in GitHub Desktop.
pyarrow insert_df_to_plasma
import os

import numpy as np
import pandas as pd
import pyarrow as pa
from pyarrow import plasma
def _write_record_batch(sink, record_batch: pa.RecordBatch) -> None:
    """Serialize *record_batch* to *sink* as a complete Arrow IPC stream."""
    writer = pa.RecordBatchStreamWriter(sink, record_batch.schema)
    writer.write_batch(record_batch)
    # close() also emits the end-of-stream marker, so both passes in
    # insert_df_to_plasma produce byte-identical output of the same size.
    writer.close()


def insert_df_to_plasma(client, df: pd.DataFrame, key=None) -> plasma.ObjectID:
    """Store *df* in a Plasma object store as an Arrow record-batch stream.

    The DataFrame is converted to a single ``pa.RecordBatch`` and serialized
    twice: once into a byte-counting sink to learn the exact size, then
    directly into the buffer allocated by ``client.create``. After that the
    object is sealed.

    Args:
        client: Plasma client used to create and seal the object
            (presumably obtained from ``plasma.connect`` -- verify at caller).
        df: DataFrame to store.
        key: ObjectID to store the data under. When omitted or ``None``, a
            fresh random 20-byte ID is generated for this call.

    Returns:
        The ObjectID of the sealed object.
    """
    if key is None:
        # Generate the ID per call. The previous default-argument expression
        # ran only once at definition time, so every call shared one ID.
        # os.urandom is used instead of numpy's global RNG, whose state is
        # copied into forked worker processes and could repeat IDs there.
        key = plasma.ObjectID(os.urandom(20))

    record_batch = pa.RecordBatch.from_pandas(df)

    # Pass 1: serialize into a sink that only counts bytes, to size the buffer.
    mock_sink = pa.MockOutputStream()
    _write_record_batch(mock_sink, record_batch)
    data_size = mock_sink.size()

    # Pass 2: serialize straight into the Plasma-allocated buffer, then seal
    # so the object becomes visible to other clients.
    buf = client.create(key, data_size)
    _write_record_batch(pa.FixedSizeBufferWriter(buf), record_batch)
    client.seal(key)
    return key
def df_to_dict(df: pd.DataFrame):
    """Lazily yield each row of *df* as a list of strings.

    Despite the name, each row is produced as a ``list`` (one ``str(...)``
    per column, in column order), not a dict. Rows come from ``df.values``,
    so they reflect that array's dtype. NOTE: with mixed column dtypes pandas
    may upcast to a common dtype (e.g. int -> float), which changes the
    string form.
    """
    column_labels = df.columns
    matrix = df.values
    for raw_row in matrix:
        cells = np.atleast_1d(raw_row)
        # Pairing with the column labels limits each row to len(df.columns)
        # cells; the labels themselves are not used in the output.
        yield [str(cell) for _label, cell in zip(column_labels, cells)]
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment