Skip to content

Instantly share code, notes, and snippets.

@pitrou

pitrou/ipc.py Secret

Created September 10, 2017 18:52
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save pitrou/d809618359915967ffc44b1ecfc2d2ad to your computer and use it in GitHub Desktop.
Save pitrou/d809618359915967ffc44b1ecfc2d2ad to your computer and use it in GitHub Desktop.
import fcntl
from functools import partial
import hashlib
import gc
import multiprocessing
import os
from queue import Queue
import socket
import select
import selectors
import threading
import numpy as np
def get_ip(host='8.8.8.8', port=80):
try:
return [(s.connect((host, port)), s.getsockname()[0], s.close())
for s in [socket.socket(socket.AF_INET, socket.SOCK_DGRAM)]][0][1]
except OSError:
return '127.0.0.1'
def run_transfer(make_endpoints, nbytes=200 * 1024**2):
import time
#clock = time.monotonic
clock = time.process_time
io_chunk = 1 * 300 * 1024
datum = b'x' * io_chunk
r, w = make_endpoints(nonblock=True)
rfd = r.fileno()
wfd = w.fileno()
def transfer_loop():
nread = 0
nwritten = 0
sel = selectors.DefaultSelector()
sel.register(rfd, selectors.EVENT_READ)
sel.register(wfd, selectors.EVENT_WRITE)
while nwritten < nbytes and nread < nbytes:
for key, _ in sel.select():
fd = key.fd
if fd == rfd:
n = len(os.read(rfd, io_chunk))
nread += n
if fd == wfd:
if nbytes - nwritten < len(datum):
n = os.write(wfd, datum[:nbytes - nwritten])
else:
n = os.write(wfd, datum)
nwritten += n
runtimes = []
gc.collect()
gc.disable()
time.sleep(0.2)
for i in range(8):
#time.sleep(0.2)
t1 = clock()
transfer_loop()
t2 = clock()
runtimes.append(t2 - t1)
gc.enable()
def mbps(duration):
return nbytes / duration / (1024 ** 2)
print("max speed: %d MB/s, avg speed: %d MB/s"
% (mbps(min(runtimes)), mbps(np.mean(runtimes)))
)
def _read_loop(rfd, nbytes, io_chunk):
remaining = nbytes
while remaining > 0:
n = len(os.read(rfd, min(remaining, io_chunk)))
remaining -= n
def _write_loop(wfd, nbytes, datum):
remaining = nbytes
m = memoryview(datum)
while remaining > 0:
if remaining < len(datum):
n = os.write(wfd, m[:remaining])
else:
n = os.write(wfd, m)
remaining -= n
def run_transfer(make_endpoints, nbytes=100 * 1024**2):
import time
clock = time.monotonic
#clock = time.process_time
io_chunk = 1 * 500 * 1024
datum = b'x' * io_chunk
r, w = make_endpoints(nonblock=False)
rfd = r.fileno()
wfd = w.fileno()
pool = multiprocessing.Pool(2)
read_loop = partial(_read_loop, rfd, nbytes, io_chunk)
write_loop = partial(_write_loop, wfd, nbytes, datum)
def transfer():
if 1:
r1 = pool.apply_async(read_loop)
r2 = pool.apply_async(write_loop)
r1.wait()
r2.wait()
r1.get()
r2.get()
else:
p1 = multiprocessing.Process(target=read_loop)
p2 = multiprocessing.Process(target=write_loop)
p1.start()
p2.start()
p1.join()
p2.join()
runtimes = []
gc.collect()
gc.disable()
time.sleep(0.2)
for i in range(10):
#time.sleep(0.2)
t1 = clock()
transfer()
t2 = clock()
runtimes.append(t2 - t1)
gc.enable()
def mbps(duration):
return nbytes / duration / (1024 ** 2)
print("max speed: %d MB/s, avg speed: %d MB/s"
% (mbps(min(runtimes)), mbps(np.mean(runtimes)))
)
def _make_fd_non_blocking(fd):
fl = fcntl.fcntl(fd, fcntl.F_GETFL)
fcntl.fcntl(fd, fcntl.F_SETFL, fl | os.O_NONBLOCK)
def _make_inet_sock(addr, nonblock):
serv = socket.socket(socket.AF_INET)
serv.bind((addr, 0))
serv.listen(5)
accepted = Queue()
def accept():
conn, _ = serv.accept()
accepted.put(conn)
serv.close()
t = threading.Thread(target=accept)
t.start()
a = socket.create_connection(serv.getsockname())
b = accepted.get()
t.join()
#print(a.getsockname(), a.getpeername())
#print(b.getsockname(), b.getpeername())
if nonblock:
a.setblocking(False)
b.setblocking(False)
return a, b
def make_net_sock(nonblock):
return _make_inet_sock(get_ip(), nonblock)
def make_localhost_sock(nonblock):
return _make_inet_sock("127.0.0.1", nonblock)
def make_unix_sock(nonblock):
r, w = socket.socketpair(socket.AF_UNIX)
if nonblock:
r.setblocking(False)
w.setblocking(False)
return r, w
def make_anon_pipe(nonblock):
r, w = os.pipe()
r = os.fdopen(r, "rb")
w = os.fdopen(w, "wb")
if nonblock:
_make_fd_non_blocking(r.fileno())
_make_fd_non_blocking(w.fileno())
return r, w
for i in range(2):
print()
print("pipe")
run_transfer(make_anon_pipe)
print("AF_UNIX")
run_transfer(make_unix_sock)
print("AF_INET localhost")
run_transfer(make_localhost_sock)
print("AF_INET external")
run_transfer(make_net_sock)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment