Skip to content

Instantly share code, notes, and snippets.

@mivade
Last active January 15, 2022 21:14
Show Gist options
  • Star 1 You must be signed in to star a gist
  • Fork 1 You must be signed in to fork a gist
  • Save mivade/782d2bf5ab01b649b138e4a1e8932f1d to your computer and use it in GitHub Desktop.
Save mivade/782d2bf5ab01b649b138e4a1e8932f1d to your computer and use it in GitHub Desktop.
Interprocess communication speed comparisons
from multiprocessing import Event, Process, Queue
import time
import zmq
class BaseActor(Process):
    """Base process for actors that receive timestamped messages.

    Subclasses provide ``run`` (the receive loop executed in the child
    process) and a static ``main`` driver (executed in the parent process).

    Attributes:
        ready: per-instance ``multiprocessing.Event`` that ``run`` sets once
            the actor is able to receive, and that the parent waits on.
    """

    def __init__(self, *args, **kwargs):
        """Forward all arguments to ``Process`` and create the ready event."""
        super().__init__(*args, **kwargs)
        # One event per actor. A class-level Event is shared by every actor,
        # so a later actor would look "ready" as soon as an earlier one had
        # signalled. With the "spawn" start method a class-level Event is
        # also re-created when the child re-imports the module, leaving the
        # parent waiting on an event the child never sets; an instance
        # attribute is instead handed to the child together with the
        # process object.
        self.ready = Event()

    def handle(self, msg):
        """Print the seconds elapsed since ``msg['timestamp']``.

        Args:
            msg: mapping with a ``'timestamp'`` entry holding the
                ``time.time()`` value recorded by the sender.
        """
        print("dt =", time.time() - msg['timestamp'])

    @staticmethod
    def main(n_bytes: int):
        """Run the benchmark for this actor type; subclasses must override.

        Args:
            n_bytes: size in bytes of the payload to send.

        Raises:
            NotImplementedError: always, in the base class.
        """
        raise NotImplementedError
class QActor(BaseActor):
    """Actor that receives its messages through a ``multiprocessing.Queue``."""

    def __init__(self, queue: Queue, *args, **kwargs):
        """Store the queue to read from; other arguments go to the base class."""
        super().__init__(*args, **kwargs)
        self.queue = queue

    def run(self):
        """Signal readiness, then handle messages until the stop sentinel.

        A message whose ``'data'`` entry is ``None`` ends the loop and is not
        passed to ``handle``.
        """
        self.ready.set()
        message = self.queue.get()
        while message['data'] is not None:
            self.handle(message)
            message = self.queue.get()

    @staticmethod
    def main(n_bytes: int):
        """Send one ``n_bytes``-long payload to a fresh ``QActor`` and stop it.

        Args:
            n_bytes: number of ``b'*'`` bytes in the payload.
        """
        channel = Queue()
        worker = QActor(channel)
        worker.start()
        worker.ready.wait()

        # The payload goes first, then None as the stop sentinel. Each
        # message is timestamped immediately before it is enqueued.
        for item in (b'*' * n_bytes, None):
            channel.put({'timestamp': time.time(), 'data': item})

        worker.join()
class ZActor(BaseActor):
    """Actor that receives its messages over a ZeroMQ PULL socket."""

    def __init__(self, address, *args, **kwargs):
        """Store the endpoint to connect to; other arguments go to the base class.

        Args:
            address: ZeroMQ endpoint the PULL socket connects to.
        """
        super().__init__(*args, **kwargs)
        self.address = address

    def run(self):
        """Connect, signal readiness, and handle messages until the sentinel.

        A message whose ``'data'`` entry is ``None`` ends the loop and is not
        passed to ``handle``. The socket and context are always released,
        even if receiving or handling raises.
        """
        ctx = zmq.Context()
        sock = ctx.socket(zmq.PULL)
        try:
            sock.connect(self.address)
            self.ready.set()
            while True:
                # NOTE(security): recv_pyobj unpickles whatever arrives on
                # the socket, so the endpoint must only be reachable by
                # trusted senders.
                msg = sock.recv_pyobj()
                if msg['data'] is None:
                    break
                self.handle(msg)
        finally:
            sock.close()
            ctx.term()

    @staticmethod
    def main(n_bytes: int):
        """Send one ``n_bytes``-long payload to a fresh ``ZActor`` and stop it.

        Args:
            n_bytes: number of ``b'*'`` bytes in the payload.
        """
        # An IPC endpoint such as 'ipc://endpoint' can be used here instead.
        address = 'tcp://127.0.0.1:8778'
        ctx = zmq.Context()
        sock = ctx.socket(zmq.PUSH)
        try:
            sock.bind(address)

            def send(data):
                # Timestamp each message immediately before it is sent.
                sock.send_pyobj({
                    'timestamp': time.time(),
                    'data': data
                })

            actor = ZActor(address)
            actor.start()
            actor.ready.wait()
            payload = b'*' * n_bytes
            send(payload)
            send(None)
            actor.join()
        finally:
            # On the normal path the actor has already received everything
            # (it only exits after the sentinel). On an error path, linger=0
            # discards unsent messages so that term() cannot block forever.
            sock.close(linger=0)
            ctx.term()
if __name__ == "__main__":
    from argparse import ArgumentParser

    # Command line: a single positional integer, the payload size.
    cli = ArgumentParser()
    cli.add_argument('n_bytes', type=int, help='number of bytes to send')
    n_bytes = cli.parse_args().n_bytes

    # (banner, actor class) pairs, run in this order.
    benchmarks = (
        ("IPC via queue", QActor),
        ("IPC via ZMQ sockets", ZActor),
    )
    for banner, actor_cls in benchmarks:
        print(banner)
        actor_cls.main(n_bytes)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment