Skip to content

Instantly share code, notes, and snippets.

@guyskk
Created July 8, 2018 03:12
Show Gist options
  • Save guyskk/d477951fba26c6cfbffa1816dbe29bb1 to your computer and use it in GitHub Desktop.
Save guyskk/d477951fba26c6cfbffa1816dbe29bb1 to your computer and use it in GitHub Desktop.
Non-blocking will block on busy syscall!
"""
Non-blocking will block on busy syscall!
两条数据流形成一个环,预先填充128字节数据。
Producer和Consumer驱动数据在环中流动。
+----------------+
| |
+ v
producer consumer
^ +
| |
+----------------+
"""
import os
import socket
import time
from threading import Thread
class SocketPairTest:
def __init__(self, duration):
self._s1, self._s2 = socket.socketpair()
self._s1.setblocking(False)
self._s2.setblocking(False)
self._producer_wait, self._notify_producer = self._s1, self._s2
self._consumer_wait, self._notify_consumer = self._s2, self._s1
self._notify_producer.sendall(b'1' * 128)
self._deadline = time.monotonic() + duration
self.comsumer_times = []
self.producer_times = []
def consumer(self):
while time.monotonic() < self._deadline:
try:
t = time.monotonic()
self._notify_producer.send(b'1')
self._consumer_wait.recv(1)
cost = (time.monotonic() - t) * 1000
self.comsumer_times.append(cost)
except BlockingIOError:
pass
def producer(self):
while time.monotonic() < self._deadline:
try:
t = time.monotonic()
self._notify_consumer.send(b'1')
self._producer_wait.recv(1)
cost = (time.monotonic() - t) * 1000
self.producer_times.append(cost)
except BlockingIOError:
pass
class PipeTest:
def __init__(self, duration):
self._producer_wait, self._notify_producer = os.pipe2(os.O_NONBLOCK)
self._consumer_wait, self._notify_consumer = os.pipe2(os.O_NONBLOCK)
os.write(self._notify_producer, b'1' * 128)
self._deadline = time.monotonic() + duration
self.comsumer_times = []
self.producer_times = []
def consumer(self):
while time.monotonic() < self._deadline:
try:
t = time.monotonic()
os.write(self._notify_producer, b'1')
os.read(self._consumer_wait, 1)
cost = (time.monotonic() - t) * 1000
self.comsumer_times.append(cost)
except BlockingIOError:
pass
def producer(self):
while time.monotonic() < self._deadline:
try:
t = time.monotonic()
os.read(self._producer_wait, 1)
os.write(self._notify_consumer, b'1')
cost = (time.monotonic() - t) * 1000
self.producer_times.append(cost)
except BlockingIOError:
pass
def start(test):
producer = Thread(target=test.producer)
consumer = Thread(target=test.consumer)
producer.start()
consumer.start()
producer.join()
consumer.join()
p_max = max(test.producer_times)
p_n = len(test.producer_times)
p_avg = sum(test.producer_times) / p_n
c_max = max(test.comsumer_times)
c_n = len(test.comsumer_times)
c_avg = sum(test.comsumer_times) / c_n
print(f'producer max={p_max:.3f} avg={p_avg:.3f} n={p_n}')
print(f'consumer max={c_max:.3f} avg={c_avg:.3f} n={c_n}')
if __name__ == '__main__':
print('SocketPairTest'.center(80, '-'))
start(SocketPairTest(30))
print('PipeTest'.center(80, '-'))
start(PipeTest(30))
@guyskk
Copy link
Author

guyskk commented Jul 8, 2018

Non-blocking may block 0~120ms !

---------------------------------SocketPairTest---------------------------------
producer max=66.680 avg=0.013 n=1899256
consumer max=69.002 avg=0.013 n=1899375
------------------------------------PipeTest------------------------------------
producer max=8.651 avg=0.011 n=2172036
consumer max=126.647 avg=0.013 n=2162756

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment