Created
August 31, 2020 10:21
-
-
Save minrk/9288e4d97022f41f05a33ad58ada1420 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import asyncio | |
from multiprocessing import Process | |
import random | |
import statistics | |
import sys | |
import time | |
import zmq | |
import zmq.asyncio | |
N = 500000 | |
interval = 20000 | |
pause_chance = 1e-4 | |
pause_duration = 1e-3 | |
def sender(url): | |
ctx = zmq.Context() | |
s = ctx.socket(zmq.PUSH) | |
s.linger = 1000 | |
s.bind(url) | |
tic = time.perf_counter() | |
for i in range(N): | |
s.send(str(i).encode("ascii")) | |
if random.random() < pause_chance: | |
toc = time.perf_counter() | |
msgs_per_sec = i / (toc - tic) | |
print(f"{i} sent {msgs_per_sec/1000:.0f}k msgs/sec") | |
time.sleep(pause_duration) | |
s.close() | |
ctx.term() | |
print(f"finished sending {N} msgs") | |
def process(msg): | |
pass | |
async def consumer(url): | |
ctx = zmq.asyncio.Context.instance() | |
receiver = ctx.socket(zmq.PULL) | |
receiver.connect(url) | |
shadow = zmq.Socket.shadow(receiver.underlying) | |
poller = zmq.asyncio.Poller() | |
poller.register(receiver, zmq.POLLIN) | |
n_processed = 0 | |
tic = time.perf_counter() | |
msgs_per_wake = [] | |
while n_processed < N: | |
events = await poller.poll() | |
count = 0 | |
while events: | |
try: | |
msg = shadow.recv_multipart(zmq.DONTWAIT) | |
except zmq.Again: | |
# checking events forces the edge-triggered FD to wake if needed | |
events = receiver.events & zmq.POLLIN | |
msgs_per_wake.append(count) | |
else: | |
count += 1 | |
process(msg) | |
n_processed += 1 | |
if n_processed % interval == 0: | |
toc = time.perf_counter() | |
msgs_per_sec = n_processed / (toc - tic) | |
per_wake = msgs_per_wake + [count] | |
print(f"{n_processed} processed {msgs_per_sec/1000:.0f}k msgs/sec") | |
if len(per_wake) >= 3: | |
print(f"{len(per_wake)} wakes {statistics.mean(per_wake):.0f}±{statistics.stdev(per_wake):.0f} per wake") | |
await block(0) | |
async def block(t=0.01): | |
"""simulate blocking processing in an async application | |
allows messages to queue-up | |
""" | |
time.sleep(t) | |
sys.stdout.write(".") | |
sys.stdout.flush() | |
async def main(): | |
url = "tcp://127.0.0.1:10101" | |
proc = Process(target=sender, args=(url,), daemon=True) | |
proc.start() | |
await consumer(url) | |
if __name__ == '__main__': | |
asyncio.run(main()) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment