# Skip to content
#
# Instantly share code, notes, and snippets.
#
# @minrk
# Created August 31, 2020 10:21
# Show Gist options
#   • Star 0 You must be signed in to star a gist
#   • Fork 0 You must be signed in to fork a gist
#   • Save minrk/9288e4d97022f41f05a33ad58ada1420 to your computer and use it in GitHub Desktop.
import asyncio
from multiprocessing import Process
import random
import statistics
import sys
import time
import zmq
import zmq.asyncio
# Benchmark parameters shared by the sender process and the consumer.
N = 500_000  # total number of messages sent and consumed
interval = 20_000  # the consumer reports throughput every `interval` messages
pause_chance = 0.0001  # per-message probability that the sender reports and pauses
pause_duration = 0.001  # seconds the sender sleeps each time it pauses
def sender(url):
    """Bind a PUSH socket to *url* and send ``N`` numbered messages.

    Each message is the ASCII-encoded decimal index of that message.  After
    any send, with probability ``pause_chance``, the running send rate is
    printed and the sender sleeps for ``pause_duration`` seconds.  A final
    line is printed once every message has been sent and the socket and
    context have been shut down.

    Args:
        url: ZeroMQ endpoint to bind, e.g. ``"tcp://127.0.0.1:10101"``.
    """
    # The context managers close the socket and then terminate the context
    # even if bind/send raises, so a failure cannot leak either of them.
    with zmq.Context() as ctx, ctx.socket(zmq.PUSH) as s:
        # Let still-queued messages flush for up to 1000 ms when closing.
        s.linger = 1000
        s.bind(url)
        tic = time.perf_counter()
        for i in range(N):
            s.send(str(i).encode("ascii"))
            if random.random() < pause_chance:
                toc = time.perf_counter()
                msgs_per_sec = i / (toc - tic)
                print(f"{i} sent {msgs_per_sec/1000:.0f}k msgs/sec")
                time.sleep(pause_duration)
    print(f"finished sending {N} msgs")
def process(msg):
    """Handle one received message.

    Deliberately a no-op: it stands in for per-message application work so
    the benchmark measures only the receive path.

    Args:
        msg: the received message; ignored.

    Returns:
        None.
    """
async def consumer(url):
    """Connect a PULL socket to *url* and consume ``N`` messages.

    The asyncio poller is awaited only to learn that the socket is readable.
    The messages are then drained with non-blocking receives on a plain
    ``zmq.Socket`` shadow of the same underlying socket, until a receive
    raises ``zmq.Again`` and the socket no longer reports ``POLLIN``.

    Every ``interval`` messages the overall throughput is printed, along with
    the mean and standard deviation of messages handled per poller wake-up
    (once at least three samples exist).

    Args:
        url: ZeroMQ endpoint to connect to.
    """
    ctx = zmq.asyncio.Context.instance()
    receiver = ctx.socket(zmq.PULL)
    try:
        receiver.connect(url)
        # Synchronous view of the same socket, used for the DONTWAIT receives.
        shadow = zmq.Socket.shadow(receiver.underlying)
        poller = zmq.asyncio.Poller()
        poller.register(receiver, zmq.POLLIN)
        n_processed = 0
        tic = time.perf_counter()
        msgs_per_wake = []
        while n_processed < N:
            events = await poller.poll()
            count = 0
            while events:
                try:
                    msg = shadow.recv_multipart(zmq.DONTWAIT)
                except zmq.Again:
                    # checking events forces the edge-triggered FD to wake if needed
                    events = receiver.events & zmq.POLLIN
                    continue
                count += 1
                process(msg)
                n_processed += 1
                if n_processed % interval == 0:
                    toc = time.perf_counter()
                    msgs_per_sec = n_processed / (toc - tic)
                    # Include the wake-up still in progress in the statistics.
                    per_wake = msgs_per_wake + [count]
                    print(f"{n_processed} processed {msgs_per_sec/1000:.0f}k msgs/sec")
                    if len(per_wake) >= 3:
                        print(f"{len(per_wake)} wakes {statistics.mean(per_wake):.0f}±{statistics.stdev(per_wake):.0f} per wake")
            # Record each wake-up exactly once, after the socket is drained, so
            # a repeated zmq.Again within one wake-up cannot add extra samples.
            msgs_per_wake.append(count)
            # NOTE(review): the nesting of this call is not recoverable from the
            # flattened source; once per wake-up is assumed — confirm.
            await block(0)
    finally:
        # Release the socket even if the loop is cancelled or raises.
        receiver.close(linger=0)
async def block(t=0.01):
    """Simulate blocking processing in an async application.

    Sleeps synchronously for *t* seconds, which allows messages to queue up,
    then writes a single "." to stdout and flushes it.

    Args:
        t: seconds to sleep; defaults to 0.01.
    """
    # time.sleep (not an awaitable sleep) is deliberate: the blocking is the
    # behaviour being simulated.
    time.sleep(t)
    sys.stdout.write(".")
    sys.stdout.flush()
async def main():
    """Start the sender in a child process and run the consumer against it.

    The sender binds and the consumer connects to the same local TCP
    endpoint.  The child is a daemon process, so it cannot outlive this one.
    """
    url = "tcp://127.0.0.1:10101"
    proc = Process(target=sender, args=(url,), daemon=True)
    proc.start()
    try:
        await consumer(url)
    except BaseException:
        # The consumer failed or was cancelled: stop the sender right away
        # instead of leaving it trying to send to nobody.
        proc.terminate()
        raise
    finally:
        # Reap the child; on success this gives the sender time to print its
        # final line before the interpreter exits and kills daemon children.
        proc.join(timeout=5)
if __name__ == "__main__":
    # Run the benchmark only when executed as a script, so that importing
    # this module does not start another run.
    asyncio.run(main())
# Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment