-
-
Save pitrou/d809618359915967ffc44b1ecfc2d2ad to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import fcntl | |
from functools import partial | |
import hashlib | |
import gc | |
import multiprocessing | |
import os | |
from queue import Queue | |
import socket | |
import select | |
import selectors | |
import threading | |
import numpy as np | |
def get_ip(host='8.8.8.8', port=80): | |
try: | |
return [(s.connect((host, port)), s.getsockname()[0], s.close()) | |
for s in [socket.socket(socket.AF_INET, socket.SOCK_DGRAM)]][0][1] | |
except OSError: | |
return '127.0.0.1' | |
def run_transfer(make_endpoints, nbytes=200 * 1024**2): | |
import time | |
#clock = time.monotonic | |
clock = time.process_time | |
io_chunk = 1 * 300 * 1024 | |
datum = b'x' * io_chunk | |
r, w = make_endpoints(nonblock=True) | |
rfd = r.fileno() | |
wfd = w.fileno() | |
def transfer_loop(): | |
nread = 0 | |
nwritten = 0 | |
sel = selectors.DefaultSelector() | |
sel.register(rfd, selectors.EVENT_READ) | |
sel.register(wfd, selectors.EVENT_WRITE) | |
while nwritten < nbytes and nread < nbytes: | |
for key, _ in sel.select(): | |
fd = key.fd | |
if fd == rfd: | |
n = len(os.read(rfd, io_chunk)) | |
nread += n | |
if fd == wfd: | |
if nbytes - nwritten < len(datum): | |
n = os.write(wfd, datum[:nbytes - nwritten]) | |
else: | |
n = os.write(wfd, datum) | |
nwritten += n | |
runtimes = [] | |
gc.collect() | |
gc.disable() | |
time.sleep(0.2) | |
for i in range(8): | |
#time.sleep(0.2) | |
t1 = clock() | |
transfer_loop() | |
t2 = clock() | |
runtimes.append(t2 - t1) | |
gc.enable() | |
def mbps(duration): | |
return nbytes / duration / (1024 ** 2) | |
print("max speed: %d MB/s, avg speed: %d MB/s" | |
% (mbps(min(runtimes)), mbps(np.mean(runtimes))) | |
) | |
def _read_loop(rfd, nbytes, io_chunk): | |
remaining = nbytes | |
while remaining > 0: | |
n = len(os.read(rfd, min(remaining, io_chunk))) | |
remaining -= n | |
def _write_loop(wfd, nbytes, datum): | |
remaining = nbytes | |
m = memoryview(datum) | |
while remaining > 0: | |
if remaining < len(datum): | |
n = os.write(wfd, m[:remaining]) | |
else: | |
n = os.write(wfd, m) | |
remaining -= n | |
def run_transfer(make_endpoints, nbytes=100 * 1024**2): | |
import time | |
clock = time.monotonic | |
#clock = time.process_time | |
io_chunk = 1 * 500 * 1024 | |
datum = b'x' * io_chunk | |
r, w = make_endpoints(nonblock=False) | |
rfd = r.fileno() | |
wfd = w.fileno() | |
pool = multiprocessing.Pool(2) | |
read_loop = partial(_read_loop, rfd, nbytes, io_chunk) | |
write_loop = partial(_write_loop, wfd, nbytes, datum) | |
def transfer(): | |
if 1: | |
r1 = pool.apply_async(read_loop) | |
r2 = pool.apply_async(write_loop) | |
r1.wait() | |
r2.wait() | |
r1.get() | |
r2.get() | |
else: | |
p1 = multiprocessing.Process(target=read_loop) | |
p2 = multiprocessing.Process(target=write_loop) | |
p1.start() | |
p2.start() | |
p1.join() | |
p2.join() | |
runtimes = [] | |
gc.collect() | |
gc.disable() | |
time.sleep(0.2) | |
for i in range(10): | |
#time.sleep(0.2) | |
t1 = clock() | |
transfer() | |
t2 = clock() | |
runtimes.append(t2 - t1) | |
gc.enable() | |
def mbps(duration): | |
return nbytes / duration / (1024 ** 2) | |
print("max speed: %d MB/s, avg speed: %d MB/s" | |
% (mbps(min(runtimes)), mbps(np.mean(runtimes))) | |
) | |
def _make_fd_non_blocking(fd): | |
fl = fcntl.fcntl(fd, fcntl.F_GETFL) | |
fcntl.fcntl(fd, fcntl.F_SETFL, fl | os.O_NONBLOCK) | |
def _make_inet_sock(addr, nonblock): | |
serv = socket.socket(socket.AF_INET) | |
serv.bind((addr, 0)) | |
serv.listen(5) | |
accepted = Queue() | |
def accept(): | |
conn, _ = serv.accept() | |
accepted.put(conn) | |
serv.close() | |
t = threading.Thread(target=accept) | |
t.start() | |
a = socket.create_connection(serv.getsockname()) | |
b = accepted.get() | |
t.join() | |
#print(a.getsockname(), a.getpeername()) | |
#print(b.getsockname(), b.getpeername()) | |
if nonblock: | |
a.setblocking(False) | |
b.setblocking(False) | |
return a, b | |
def make_net_sock(nonblock): | |
return _make_inet_sock(get_ip(), nonblock) | |
def make_localhost_sock(nonblock): | |
return _make_inet_sock("127.0.0.1", nonblock) | |
def make_unix_sock(nonblock): | |
r, w = socket.socketpair(socket.AF_UNIX) | |
if nonblock: | |
r.setblocking(False) | |
w.setblocking(False) | |
return r, w | |
def make_anon_pipe(nonblock): | |
r, w = os.pipe() | |
r = os.fdopen(r, "rb") | |
w = os.fdopen(w, "wb") | |
if nonblock: | |
_make_fd_non_blocking(r.fileno()) | |
_make_fd_non_blocking(w.fileno()) | |
return r, w | |
for i in range(2): | |
print() | |
print("pipe") | |
run_transfer(make_anon_pipe) | |
print("AF_UNIX") | |
run_transfer(make_unix_sock) | |
print("AF_INET localhost") | |
run_transfer(make_localhost_sock) | |
print("AF_INET external") | |
run_transfer(make_net_sock) | |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment