Skip to content

Instantly share code, notes, and snippets.

@alendit
Created April 17, 2018 08:00
Show Gist options
  • Save alendit/c6cdd1adaf7007786392731152d3b6b9 to your computer and use it in GitHub Desktop.
Save alendit/c6cdd1adaf7007786392731152d3b6b9 to your computer and use it in GitHub Desktop.
RecordBatch stream writing and reading
#include <arrow/buffer.h>
#include <arrow/builder.h>
#include <arrow/io/memory.h>
#include <arrow/ipc/reader.h>
#include <arrow/ipc/writer.h>
#include <arrow/memory_pool.h>
#include <arrow/record_batch.h>
#include <arrow/status.h>
#include <arrow/type.h>
#include <iostream>
#include <memory>
#include <vector>
using namespace arrow;
using namespace std;
auto main(int argc, char* argv[]) -> int {
arrow::MemoryPool* pool = arrow::default_memory_pool();
std::shared_ptr<arrow::PoolBuffer> buffer(new arrow::PoolBuffer(pool));
arrow::Int64Builder builder(pool);
builder.Append(1);
std::shared_ptr<arrow::Array> array;
builder.Finish(&array);
std::vector<std::shared_ptr<arrow::Field>> schema_vector =
{arrow::field("id", arrow::int64())};
auto schema = std::make_shared<arrow::Schema>(schema_vector);
// Write
std::shared_ptr<arrow::RecordBatch> batchOut;
batchOut = arrow::RecordBatch::Make(schema, 1, {array});
std::unique_ptr<arrow::io::BufferOutputStream> stream;
stream.reset(new arrow::io::BufferOutputStream(buffer));
std::shared_ptr<arrow::ipc::RecordBatchWriter> writer;
// #1 - Segmentation fault (core dumped)
arrow::ipc::RecordBatchStreamWriter::Open(stream.get(), schema,
&writer);
// #2 - OK
//arrow::ipc::RecordBatchStreamWriter::Open(stream.get(), schema,&writer);
writer->WriteRecordBatch(*batchOut);
writer->Close();
stream->Close();
vector<std::shared_ptr<RecordBatch>> out_batches;
// Read
arrow::io::BufferReader buf_reader(buffer);
std::shared_ptr<RecordBatchReader> reader;
arrow::ipc::RecordBatchStreamReader::Open(&buf_reader, &reader);
std::shared_ptr<RecordBatch> chunk;
while (true) {
reader->ReadNext(&chunk);
if (chunk == nullptr) {
break;
}
out_batches.emplace_back(chunk);
}
cout << "Read " << out_batches.size() << endl;
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment