Skip to content

Instantly share code, notes, and snippets.

@cmpute
Last active December 2, 2020 15:40
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save cmpute/eaee40db68233612b8493f3b7985b67f to your computer and use it in GitHub Desktop.
Save cmpute/eaee40db68233612b8493f3b7985b67f to your computer and use it in GitHub Desktop.
Simplest IPC service for large data in Python
import numpy as np
import msgpack
import msgpack_numpy
msgpack_numpy.patch()
import socket, time, os
from socket_helpers import recv_msg, send_msg
# Client side: push a random array to the data socket once per second and
# print the mean that comes back on the result socket.
sout = socket.socket(socket.AF_UNIX, socket.SOCK_DGRAM)
sout.connect("/tmp/ros-ipc-data.sock")
sin = socket.socket(socket.AF_UNIX, socket.SOCK_DGRAM)
sin.bind("/tmp/ros-ipc-result.sock")
try:
    while True:
        samples = np.random.rand(10000, 4)
        payload = msgpack.packb({b"data": samples})
        print("Send Time:", time.time())
        send_msg(sout, payload)
        # Block until the peer answers with the packed result.
        reply = msgpack.unpackb(recv_msg(sin))
        print("Mean Result:", reply[b'mean'])
        time.sleep(1)
finally:
    # Release both sockets and remove the path this process bound.
    sin.close()
    sout.close()
    os.remove("/tmp/ros-ipc-result.sock")
import numpy as np
import msgpack
import msgpack_numpy
msgpack_numpy.patch()
import socket, time, os
from socket_helpers import recv_msg, send_msg
# Server side: receive packed arrays on the data socket, compute their mean
# and send it back on the result socket.
sin = socket.socket(family=socket.AF_UNIX, type=socket.SOCK_DGRAM)
sin.bind("/tmp/ros-ipc-data.sock")
sout = socket.socket(family=socket.AF_UNIX, type=socket.SOCK_DGRAM)
# The result socket is connected lazily, on the first received message,
# rather than at start-up.
sout_connected = False
try:
    while True:
        arr = recv_msg(sin)
        print("Recv Time:", time.time())
        arr = msgpack.unpackb(arr)[b'data']
        if not sout_connected:
            sout.connect("/tmp/ros-ipc-result.sock")
            # Remember the connection so it is established only once
            # instead of on every message.
            sout_connected = True
        send_msg(sout, msgpack.packb({b'mean': np.mean(arr)}))
finally:
    # Release both sockets and remove the path this process bound.
    sin.close()
    sout.close()
    os.remove("/tmp/ros-ipc-data.sock")
import asyncio
import numpy as np
import msgpack
import msgpack_numpy
msgpack_numpy.patch()
import socket, time, os
from socket_helpers_async import recv_msg, send_msg
# Sockets used by the asyncio server. They are handed to the event loop's
# low-level socket methods, which require non-blocking sockets.
sin = socket.socket(family=socket.AF_UNIX, type=socket.SOCK_DGRAM)
sin.setblocking(False)
sin.bind("/tmp/ros-ipc-data.sock")
sout = socket.socket(family=socket.AF_UNIX, type=socket.SOCK_DGRAM)
sout.setblocking(False)
# The result socket is connected lazily, on the first received message.
sout_connected = False
async def handle_data():
    """Serve mean-computation requests until an error or cancellation.

    Repeatedly receives a packed message from the module-level ``sin``
    socket, extracts its ``b'data'`` entry, and sends the packed mean back
    through the module-level ``sout`` socket. ``sout`` is connected on the
    first message only. On exit both sockets are closed and the data socket
    path is removed.
    """
    # The flag is rebound below, so it must be declared global.
    global sout_connected
    try:
        while True:
            arr = await recv_msg(loop, sin)
            print("Recv Time:", time.time())
            arr = msgpack.unpackb(arr)[b'data']
            if not sout_connected:
                sout.connect("/tmp/ros-ipc-result.sock")
                # Record the connection so it is made once, not per message.
                sout_connected = True
            await send_msg(loop, sout, msgpack.packb({b'mean': np.mean(arr)}))
    finally:
        sin.close()
        sout.close()
        os.remove("/tmp/ros-ipc-data.sock")
# Create the event loop explicitly: asyncio.get_event_loop() without a
# running loop is deprecated and fails on recent Python versions. The name
# `loop` stays module-level because handle_data() reads it.
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
try:
    loop.run_until_complete(handle_data())
finally:
    loop.close()
# Helper Functions for sending large data through sockets
# https://stackoverflow.com/questions/17667903/python-socket-receive-large-amount-of-data
import struct
# Maximum number of payload bytes passed to the socket per send call.
PACK_SIZE = 4096


def send_msg(sock, msg):
    """Send ``msg`` over ``sock`` with a length prefix.

    The message is preceded by a 4-byte unsigned length in network byte
    order, then written in slices of at most ``PACK_SIZE`` bytes.

    Args:
        sock: A socket object providing ``sendall``.
        msg: Bytes-like payload supporting ``len`` and slicing.

    Raises:
        struct.error: If ``len(msg)`` does not fit in an unsigned 32-bit int.
    """
    dlen = len(msg)
    # Use sendall for the header as well: send() may write only part of it.
    sock.sendall(struct.pack('>I', dlen))
    for dptr in range(0, dlen, PACK_SIZE):
        sock.sendall(msg[dptr:dptr + PACK_SIZE])
def recvall(sock, n):
    """Read exactly ``n`` bytes from ``sock``.

    Returns a ``bytearray`` holding the bytes read, or ``None`` if a
    ``recv`` call returns no data before ``n`` bytes were collected.
    """
    buf = bytearray()
    remaining = n
    while remaining > 0:
        chunk = sock.recv(remaining)
        if not chunk:
            return None
        buf += chunk
        remaining -= len(chunk)
    return buf
def recv_msg(sock):
    """Receive one length-prefixed message from ``sock``.

    Reads a 4-byte big-endian unsigned length, then that many payload
    bytes. Returns the payload as produced by ``recvall``, or ``None`` when
    the length header could not be read.
    """
    header = recvall(sock, 4)
    if header:
        (size,) = struct.unpack('>I', header)
        # Read the message data
        return recvall(sock, size)
    return None
# Helper Functions for sending large data through sockets
# https://stackoverflow.com/questions/17667903/python-socket-receive-large-amount-of-data
import struct
# Maximum number of payload bytes passed to the socket per send call.
PACK_SIZE = 4096


async def send_msg(loop, sock, msg):
    """Send ``msg`` over ``sock`` with a length prefix, without blocking.

    The message is preceded by a 4-byte unsigned length in network byte
    order, then written in slices of at most ``PACK_SIZE`` bytes, all
    through ``loop.sock_sendall``.

    Args:
        loop: The asyncio event loop used for the socket operations.
        sock: A non-blocking socket, as required by ``loop.sock_sendall``.
        msg: Bytes-like payload supporting ``len`` and slicing.

    Raises:
        struct.error: If ``len(msg)`` does not fit in an unsigned 32-bit int.
    """
    dlen = len(msg)
    # Send the header through the loop too: a direct sock.send() on a
    # non-blocking socket can raise BlockingIOError or write partially.
    await loop.sock_sendall(sock, struct.pack('>I', dlen))
    for dptr in range(0, dlen, PACK_SIZE):
        await loop.sock_sendall(sock, msg[dptr:dptr + PACK_SIZE])
async def recvall(loop, sock, n):
    """Read exactly ``n`` bytes from ``sock`` via the event loop.

    Returns a ``bytearray`` holding the bytes read, or ``None`` if a
    ``loop.sock_recv`` call returns no data before ``n`` bytes were
    collected.
    """
    buf = bytearray()
    remaining = n
    while remaining > 0:
        chunk = await loop.sock_recv(sock, remaining)
        if not chunk:
            return None
        buf += chunk
        remaining -= len(chunk)
    return buf
async def recv_msg(loop, sock):
    """Receive one length-prefixed message from ``sock`` via the event loop.

    Reads a 4-byte big-endian unsigned length, then that many payload
    bytes. Returns the payload as produced by ``recvall``, or ``None`` when
    the length header could not be read.
    """
    header = await recvall(loop, sock, 4)
    if header:
        (size,) = struct.unpack('>I', header)
        # Read the message data
        return await recvall(loop, sock, size)
    return None
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment