Skip to content

Instantly share code, notes, and snippets.

@lmyyao
Last active January 28, 2021 03:44
Show Gist options
  • Save lmyyao/60cefd034c89c77cfa4b10227a64bc85 to your computer and use it in GitHub Desktop.
pyarrow server/client tcp demo
import pyarrow as pa
import pyarrow.csv
import socket
def make_endpoint(ip_addr, port, timeout=None):
    """Open a TCP connection to ``(ip_addr, port)`` and return the socket.

    Parameters
    ----------
    ip_addr : str
        Server address to connect to.
    port : int
        Server TCP port.
    timeout : float or None, optional
        Socket timeout in seconds applied before connecting. ``None``
        (the default) preserves the original fully-blocking behavior.

    Returns
    -------
    socket.socket
        A connected TCP socket; the caller is responsible for closing it.
    """
    sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    if timeout is not None:
        sock.settimeout(timeout)
    sock.connect((ip_addr, port))
    return sock
def make_remote_dataset(endpoint):
    """Read and print every Arrow record batch arriving on *endpoint*.

    Wraps the connected socket in a buffered binary file object, opens an
    Arrow IPC stream reader on it, and prints each batch until the stream
    is exhausted.

    Parameters
    ----------
    endpoint : socket.socket
        A connected TCP socket (e.g. from ``make_endpoint``). It is
        closed before this function returns.
    """
    infile = endpoint.makefile(mode='rb')
    try:
        # The reader consumes the IPC stream header (schema) on creation,
        # so it only needs to be constructed once, before the read loop.
        reader = pa.RecordBatchStreamReader(infile)
        while True:
            try:
                batch = reader.read_next_batch()
            except StopIteration:
                # End of the IPC stream: the sender closed its writer.
                break
            print(batch)
    finally:
        # Always release the file wrapper and the socket, even if the
        # stream is malformed or the connection drops mid-read.
        infile.close()
        endpoint.close()
import pyarrow as pa
import pyarrow.csv
import socket
def read_and_process(filename, block_size=4096):
    """Yield Arrow record batches read from a CSV file.

    Parameters
    ----------
    filename : str
        Path of the CSV file to read.
    block_size : int, optional
        Read block size in bytes handed to the CSV reader; the default
        matches the value originally hard-coded here (4096).

    Yields
    ------
    pyarrow.RecordBatch
        The batches of the parsed table, in order.
    """
    opts = pyarrow.csv.ReadOptions(use_threads=True, block_size=block_size)
    table = pyarrow.csv.read_csv(filename, opts)
    # Hand the table back to the caller one batch at a time.
    yield from table.to_batches()
def serve_csv_data(ip_addr, port_num, filename):
"""
Create a socket and serve Arrow record batches as a stream read from the CVS files.
"""
# Create the socket
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.bind((ip_addr, port_num))
sock.listen(1)
# Serve forever, each client will get one iteration over data
while True:
conn, _ = sock.accept()
outfile = conn.makefile(mode='wb')
writer = None
try:
# Read directory and iterate over each batch in each file
batch_iter = read_and_process(filename)
for batch in batch_iter:
# Initialize the pyarrow writer on first batch
if writer is None:
writer = pa.RecordBatchStreamWriter(outfile, batch.schema)
# Write the batch to the client stream
writer.write_batch(batch)
# Cleanup client connection
finally:
if writer is not None:
writer.close()
outfile.close()
conn.close()
sock.close()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment