Last active
January 28, 2021 03:44
-
-
Save lmyyao/60cefd034c89c77cfa4b10227a64bc85 to your computer and use it in GitHub Desktop.
PyArrow server/client TCP demo: stream Arrow record batches read from a CSV file over a plain socket.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import pyarrow as pa | |
import pyarrow.csv | |
import socket | |
def make_endpoint(ip_addr, port):
    """Open a TCP connection to ``(ip_addr, port)`` and return the connected socket."""
    client = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    client.connect((ip_addr, port))
    return client
def make_remote_dataset(endpoint):
    """Read an Arrow record-batch stream from a connected socket and print each batch.

    Parameters
    ----------
    endpoint : socket.socket
        A connected TCP socket whose peer writes an Arrow IPC stream
        (e.g. produced by ``pa.RecordBatchStreamWriter``).
    """
    infile = endpoint.makefile(mode='rb')
    try:
        # The stream reader parses the schema header once up front; there
        # is no need to re-check/construct it on every loop iteration as
        # the original code did.
        reader = pa.RecordBatchStreamReader(infile)
        # RecordBatchStreamReader is iterable and stops cleanly at
        # end-of-stream, replacing the manual read_next_batch /
        # StopIteration dance.
        for batch in reader:
            print(batch)
    finally:
        # Close the file wrapper and the socket even if reading raises,
        # so a mid-stream error does not leak the connection.
        infile.close()
        endpoint.close()
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import pyarrow as pa | |
import pyarrow.csv | |
import socket | |
def read_and_process(filename, block_size=4096):
    """Read a CSV file with pyarrow and yield its record batches.

    Parameters
    ----------
    filename : str
        Path to the CSV file to read.
    block_size : int, optional
        Read block size in bytes handed to pyarrow's CSV reader; it
        controls the granularity of the resulting record batches.
        Defaults to 4096, matching the original hard-coded value.

    Yields
    ------
    pyarrow.RecordBatch
        Batches of the parsed table, in order.
    """
    opts = pyarrow.csv.ReadOptions(use_threads=True, block_size=block_size)
    # The second positional argument of read_csv is read_options.
    table = pyarrow.csv.read_csv(filename, opts)
    yield from table.to_batches()
def serve_csv_data(ip_addr, port_num, filename):
    """
    Create a socket and serve Arrow record batches as a stream read from the CSV file.

    Runs forever: each accepted client receives one full iteration over
    the file's record batches as an Arrow IPC stream, then the connection
    is closed and the server waits for the next client.

    Parameters
    ----------
    ip_addr : str
        Address to bind the listening socket to.
    port_num : int
        TCP port to listen on.
    filename : str
        Path to the CSV file whose batches are served.
    """
    sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    # Allow immediate restarts of the server without waiting for the
    # previous socket to leave TIME_WAIT.
    sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
    sock.bind((ip_addr, port_num))
    sock.listen(1)
    try:
        # Serve forever; each client gets one iteration over the data.
        while True:
            conn, _ = sock.accept()
            outfile = conn.makefile(mode='wb')
            writer = None
            try:
                for batch in read_and_process(filename):
                    # Initialize the writer lazily: the stream schema is
                    # only known once the first batch is available.
                    if writer is None:
                        writer = pa.RecordBatchStreamWriter(outfile, batch.schema)
                    writer.write_batch(batch)
            except (BrokenPipeError, ConnectionResetError):
                # A client that disconnects mid-stream must not take the
                # whole server down (the original code crashed here);
                # just move on to the next connection.
                pass
            finally:
                # Per-client cleanup, even on error.
                if writer is not None:
                    writer.close()
                outfile.close()
                conn.close()
    finally:
        # Unreachable in normal operation (infinite accept loop), but
        # guarantees the listener is released if the loop is broken by
        # an exception such as KeyboardInterrupt.
        sock.close()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
https://blog.tensorflow.org/2019/08/tensorflow-with-apache-arrow-datasets.html