Created
August 31, 2020 10:19
-
-
Save minrk/0068aac48e0ab64f1049647cb96d2f90 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import asyncio | |
from multiprocessing import Process | |
import random | |
import statistics | |
import sys | |
import time | |
import zmq | |
import zmq.asyncio | |
# Benchmark parameters shared by the sender and the consumer.
N = 500_000  # total number of messages the sender sends and the consumer expects
interval = 20_000  # the consumer prints statistics every `interval` processed messages
pause_chance = 1e-4  # per-message probability that the sender reports and pauses
pause_duration = 1e-3  # length of each sender pause, passed to time.sleep (seconds)
def sender(url):
    """Bind a PUSH socket on *url* and send ``N`` numbered messages.

    Each message is the ASCII-encoded decimal loop index. After every send
    there is a ``pause_chance`` probability of printing the running send
    rate and then sleeping for ``pause_duration`` seconds.

    The context and socket are managed by ``with`` so that the socket is
    closed and the context terminated even if binding or sending raises.
    """
    with zmq.Context() as ctx, ctx.socket(zmq.PUSH) as s:
        # Bound how long shutdown may wait on undelivered messages.
        s.linger = 1000
        s.bind(url)
        tic = time.perf_counter()
        for i in range(N):
            s.send(str(i).encode("ascii"))
            if random.random() < pause_chance:
                # Message index i has just gone out, so i + 1 have been sent.
                sent = i + 1
                toc = time.perf_counter()
                msgs_per_sec = sent / (toc - tic)
                print(f"{sent} sent {msgs_per_sec/1000:.0f}k msgs/sec")
                time.sleep(pause_duration)
    print(f"finished sending {N} msgs")
def process(msg):
    """Placeholder for per-message work: ignores *msg* and returns ``None``."""
    return None
async def consumer(url):
    """Connect a PULL socket to *url* and process messages until ``N`` are seen.

    Each pass awaits the asyncio poller once, then drains the socket with
    non-blocking receives on a synchronous shadow of the same underlying
    socket, so a single wake-up can handle many messages. Every ``interval``
    processed messages the throughput and the messages-per-wake statistics
    are printed.

    The receiver is opened in a ``with`` block so it is closed when the
    coroutine finishes or raises, rather than being left open.
    """
    ctx = zmq.asyncio.Context.instance()
    with ctx.socket(zmq.PULL) as receiver:
        receiver.connect(url)
        # Synchronous handle onto the same underlying socket, used for the
        # non-blocking receives in the drain loop below.
        shadow = zmq.Socket.shadow(receiver.underlying)
        poller = zmq.asyncio.Poller()
        poller.register(receiver, zmq.POLLIN)
        n_processed = 0
        tic = time.perf_counter()
        msgs_per_wake = []
        while n_processed < N:
            events = await poller.poll()
            count = 0
            while events:
                try:
                    msg = shadow.recv_multipart(zmq.DONTWAIT)
                except zmq.Again:
                    # checking events forces the edge-triggered FD to wake if needed
                    events = receiver.events & zmq.POLLIN
                    msgs_per_wake.append(count)
                else:
                    count += 1
                    process(msg)
                    n_processed += 1
                    if n_processed % interval == 0:
                        toc = time.perf_counter()
                        msgs_per_sec = n_processed / (toc - tic)
                        # Include the wake currently in progress in the stats.
                        per_wake = msgs_per_wake + [count]
                        print(f"{n_processed} processed {msgs_per_sec/1000:.0f}k msgs/sec")
                        if len(per_wake) >= 3:
                            print(f"{len(per_wake)} wakes {statistics.mean(per_wake):.0f}±{statistics.stdev(per_wake):.0f} per wake")
            # NOTE(review): nesting reconstructed from a flattened listing;
            # assumed to run once per wake-up so messages can queue - confirm.
            await block(0.01)
async def block(t=0.01):
    """Stall for *t* seconds, then emit a progress dot on stdout.

    Stands in for blocking work inside an async application: the sleep is
    the synchronous ``time.sleep``, so nothing else runs on the event loop
    while it lasts and incoming messages get a chance to queue up.
    """
    time.sleep(t)
    stream = sys.stdout
    stream.write(".")
    stream.flush()
async def main():
    """Start the sender in a child process and run the consumer against it.

    The sender runs as a daemonic ``multiprocessing.Process`` bound to a
    fixed local TCP endpoint; the consumer runs in this process's event loop.
    """
    url = "tcp://127.0.0.1:10101"
    proc = Process(target=sender, args=(url,), daemon=True)
    proc.start()
    await consumer(url)
    # Give the sender a bounded chance to finish its own shutdown (and print
    # its final line) instead of being killed as a daemon at interpreter
    # exit. The consumer is done by now, so blocking here stalls nothing.
    proc.join(timeout=5)
# Run the benchmark only when this file is executed as a script.
if __name__ == "__main__":
    asyncio.run(main())
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment