Skip to content

Instantly share code, notes, and snippets.

@BryanCutler
Created November 8, 2016 23:18
Show Gist options
  • Star 1 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save BryanCutler/930ecd1de1c6d9484931505dcb6bb321 to your computer and use it in GitHub Desktop.
Save BryanCutler/930ecd1de1c6d9484931505dcb6bb321 to your computer and use it in GitHub Desktop.
import socket
import struct
import sys
from pyarrow.ipc import ArrowFileReader
def _read_int(stream):
length = stream.read(4)
if not length:
raise EOFError
return struct.unpack("!i", length)[0]
def _read_with_length(stream):
    """Read one length-prefixed frame from ``stream``.

    The frame is a 4-byte big-endian signed length (read via ``_read_int``)
    followed by exactly that many payload bytes.

    Returns:
        The payload bytes.

    Raises:
        EOFError: if the stream ends before the full frame is read.
        ValueError: if the length prefix is negative.
    """
    length = _read_int(stream)
    if length < 0:
        # stream.read(-1) would silently consume the rest of the stream,
        # and the short-read check below could never fire.
        raise ValueError("invalid negative frame length: %d" % length)
    obj = stream.read(length)
    if obj is None or len(obj) < length:
        raise EOFError
    return obj
def _read_record_batch(obj):
    """Parse ``obj`` with ``ArrowFileReader`` and return the batch at index 0.

    The number of record batches the reader reports is printed before the
    first batch is fetched.
    """
    file_reader = ArrowFileReader(obj)
    batch_count = file_reader.num_record_batches
    print("ArrowFileReader, num batches %d" % batch_count)
    first_index = 0
    return file_reader.get_record_batch(first_index)
if __name__ == '__main__':
    if len(sys.argv) != 2:
        print("Usage: arrow_client_reader.py <PORT>")
        # Without exiting here, sys.argv[1] below raises IndexError.
        sys.exit(1)
    port = int(sys.argv[1])

    # Support for both IPv4 and IPv6.
    # On most IPv6-ready systems, IPv6 will take precedence.
    sock = None
    for af, socktype, proto, _canonname, sa in socket.getaddrinfo(
            "localhost", port, socket.AF_UNSPEC, socket.SOCK_STREAM):
        # Creating the socket can itself fail (e.g. an address family the
        # host does not support); treat that like a failed connect and try
        # the next candidate address.
        try:
            sock = socket.socket(af, socktype, proto)
        except socket.error:
            sock = None
            continue
        try:
            sock.connect(sa)
        except socket.error:
            sock.close()
            sock = None
            continue
        break
    if sock is None:
        raise Exception("could not open socket on port %d" % port)

    try:
        rf = sock.makefile("rb", 65536)
        try:
            # The server sends one frame: 4-byte length + Arrow file bytes.
            obj = _read_with_length(rf)
            batch = _read_record_batch(obj)
        finally:
            # The file wrapper holds its own reference to the socket, so
            # close it as well for the connection to actually be released.
            rf.close()
    finally:
        sock.close()
import io.netty.buffer.ArrowBuf;
import org.apache.arrow.memory.BufferAllocator;
import org.apache.arrow.memory.RootAllocator;
import org.apache.arrow.vector.file.ArrowWriter;
import org.apache.arrow.vector.schema.ArrowFieldNode;
import org.apache.arrow.vector.schema.ArrowRecordBatch;
import org.apache.arrow.vector.types.pojo.Field;
import org.apache.arrow.vector.types.pojo.ArrowType;
import org.apache.arrow.vector.types.pojo.Schema;
import static java.util.Arrays.asList;
import java.io.*;
import java.net.ServerSocket;
import java.net.Socket;
import java.nio.channels.Channels;
import java.util.Collections;
import java.util.concurrent.CountDownLatch;
/**
 * Demo that writes a small Arrow record batch in the Arrow file format and sends it over a local
 * TCP socket to a Python reader ({@code arrow_client_reader.py}) launched as a child process.
 *
 * <p>Wire framing: a 4-byte big-endian length (written with {@link DataOutputStream#writeInt})
 * followed by that many bytes of Arrow file data.
 */
public class ArrowJavaToPython {
  /** TCP port the server listens on; also passed to the Python client. */
  public static final int PORT = 8080;

  /** How long the server waits for the client to connect before giving up, in milliseconds. */
  private static final int ACCEPT_TIMEOUT_MS = 60000;

  /**
   * Copies {@code bytes} into a newly allocated {@link ArrowBuf}.
   *
   * @param bytes the data to copy
   * @param allocator allocator the buffer is taken from
   * @return a buffer holding {@code bytes}; the caller owns it and must release it
   */
  static ArrowBuf buf(byte[] bytes, BufferAllocator allocator) {
    ArrowBuf buffer = allocator.buffer(bytes.length);
    buffer.writeBytes(bytes);
    return buffer;
  }

  /**
   * Writes one record batch, in the Arrow file format, to {@code out}.
   *
   * <p>The schema has a single nullable signed 8-bit int column named {@code testField}. The batch
   * declares 16 rows with a field node of {@code (16, 8)}. Its validity bytes are
   * {@code 0xFF, 0x00}, which mark the first 8 rows as set and the last 8 as unset.
   *
   * @param out destination stream; the writer closes the channel wrapping it when done
   * @param allocator allocator used for the temporary Arrow buffers
   * @throws IOException if writing to {@code out} fails
   */
  public static void writeBatchToStream(OutputStream out, BufferAllocator allocator)
      throws IOException {
    Schema schema = new Schema(asList(new Field("testField", true, new ArrowType.Int(8, true),
        Collections.<Field>emptyList())));
    byte[] validity = new byte[] {(byte) 255, 0};
    // second half is "undefined"
    byte[] values = new byte[] {1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16};
    ArrowBuf validityb = buf(validity, allocator);
    ArrowBuf valuesb = null;
    try {
      valuesb = buf(values, allocator);
      // try-with-resources closes the batch and then the writer, each exactly once.
      // Closing the writer emits the file footer.
      try (ArrowWriter writer = new ArrowWriter(Channels.newChannel(out), schema);
          ArrowRecordBatch batch = new ArrowRecordBatch(16,
              asList(new ArrowFieldNode(16, 8)), asList(validityb, valuesb))) {
        writer.writeRecordBatch(batch);
      }
    } finally {
      // Drop the references obtained from buf().
      // NOTE(review): assumes ArrowRecordBatch retains its buffers and releases them in close();
      // confirm against the Arrow version in use.
      validityb.release();
      if (valuesb != null) {
        valuesb.release();
      }
    }
  }

  /**
   * Serves exactly one client. The server binds {@link #PORT}, signals {@code ready}, accepts one
   * connection, and sends one length-prefixed Arrow payload.
   *
   * <p>{@code ready} is counted down in a {@code finally} block, so a thread waiting on it is
   * released even when binding fails.
   */
  private static void runServer(CountDownLatch ready) {
    try (RootAllocator allocator = new RootAllocator(Long.MAX_VALUE);
        ServerSocket serverSocket = new ServerSocket(PORT)) {
      // Without a timeout, accept() blocks forever if the client never connects.
      serverSocket.setSoTimeout(ACCEPT_TIMEOUT_MS);
      System.out.println("Waiting for clients to connect...");
      ready.countDown();
      try (Socket clientSocket = serverSocket.accept()) {
        System.out.println("... Client Connected");
        // Write record batch to ByteArray
        ByteArrayOutputStream arr = new ByteArrayOutputStream();
        writeBatchToStream(arr, allocator);
        // Send framed-ByteArray over socket
        DataOutputStream dataOut =
            new DataOutputStream(new BufferedOutputStream(clientSocket.getOutputStream()));
        dataOut.writeInt(arr.size());
        arr.writeTo(dataOut);
        dataOut.flush();
      }
    } catch (IOException e) {
      System.err.println("Unable to process client request: " + e);
    } finally {
      // No-op if already counted down above.
      ready.countDown();
    }
  }

  /**
   * Waits for {@code serverReady}, then runs {@code arrow_client_reader.py} against
   * {@link #PORT} with this process's stdio inherited. A non-zero exit code is reported on stderr.
   */
  private static void runPythonClient(CountDownLatch serverReady) {
    // ProcessBuilder already starts from a copy of this process's environment.
    ProcessBuilder builder =
        new ProcessBuilder("/usr/bin/python", "arrow_client_reader.py", Integer.toString(PORT));
    builder.inheritIO();
    Process process = null;
    try {
      serverReady.await();
      System.out.println("Starting python process");
      process = builder.start();
      int exitCode = process.waitFor();
      if (exitCode != 0) {
        System.err.println("Python process failed with exit code: " + exitCode);
      }
    } catch (IOException e) {
      System.err.println("Python process exception: " + e);
    } catch (InterruptedException e) {
      System.err.println("Python process exception: " + e);
      Thread.currentThread().interrupt();
    } finally {
      // Kills the child if we were interrupted before it exited; harmless otherwise.
      if (process != null) {
        process.destroy();
      }
    }
  }

  /** Starts the server and Python client threads and waits for both to finish. */
  public static void main(String[] args) {
    CountDownLatch serverReady = new CountDownLatch(1);
    Thread serverThread = new Thread(() -> runServer(serverReady), "arrow-server");
    serverThread.start();
    Thread pyThread = new Thread(() -> runPythonClient(serverReady), "python-client");
    pyThread.start();
    try {
      pyThread.join();
      serverThread.join();
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
    }
  }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment