Skip to content

Instantly share code, notes, and snippets.

@ntakouris
Created October 7, 2021 07:33
Show Gist options
  • Save ntakouris/3860fd6bf31f0b1453bd5ba5e58620d1 to your computer and use it in GitHub Desktop.
Save ntakouris/3860fd6bf31f0b1453bd5ba5e58620d1 to your computer and use it in GitHub Desktop.
pu sub with nanomsg and capnp (python)
@0xeaa9e4314af47a92;
struct BoundingBox {
x @0: UInt16;
y @1: UInt16;
w @2: UInt16;
h @3: UInt16;
cls @4: Text;
}
import argparse
import asyncio
import time
import pynng
import capnp
BoundingBox = capnp.load('bounding_box.capnp').BoundingBox
parser = argparse.ArgumentParser()
parser.add_argument('--task', type=str, help='pub / sub')
addr = 'tcp://127.0.0.1:25565'
async def pub():
print('pub')
with pynng.Pub0(listen=addr) as pub:
while(True):
# print('send')
msg = BoundingBox.new_message()
await pub.asend(msg.to_bytes())
# await asyncio.sleep(1)
async def sub():
print('sub')
i = 0
start_t = time.time()
with pynng.Sub0(dial=addr, topics='') as sub:
while(True):
mgs_bytes = await sub.arecv()
# print('recv')
msg = BoundingBox.from_bytes(mgs_bytes)
i += 1
if i % 10_000 == 0:
elapsed_t = time.time() - start_t
# on my ryzen 3600 pc this is about
# 7-8K /s on *windows* / tcp:// socket
# should be more in linux
print(f'{10_000 / elapsed_t} msg/s')
start_t = time.time()
i = 0
if __name__=='__main__':
args = parser.parse_args()
loop = asyncio.get_event_loop()
trg = pub() if args.task == 'pub' else sub()
loop.run_until_complete(trg)
loop.close()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment