Skip to content

Instantly share code, notes, and snippets.

@mivade
Created May 12, 2017 17:49
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save mivade/c69a258dc4935a8e28dcb872c5e1bb99 to your computer and use it in GitHub Desktop.
Save mivade/c69a258dc4935a8e28dcb872c5e1bb99 to your computer and use it in GitHub Desktop.
Binding to random ports for IPC
"""Examples of how to bind a socket to a random port and have that port number
accessible to other processes. Useful for interprocess communications.
"""
import time
import socket
from socket import AF_INET, SOCK_STREAM
from multiprocessing import Process, Queue, Event
from contextlib import contextmanager
import zmq
class TimeService(Process):
    """Example local process-based service that sends the time every 1 s
    to the connected peer.

    The service binds a TCP socket to an OS-assigned port, publishes that
    port number on a one-slot queue shared with the parent process, waits
    for a single peer to connect and then sends it ``time.time()``
    formatted with three decimals roughly once per second until
    :meth:`stop` is called or the peer disconnects.

    :param host: address to bind to; the default ``''`` binds to all
        interfaces.
    """

    # Seconds between checks of the stop flag while waiting for a peer.
    _ACCEPT_POLL_INTERVAL = 0.2

    def __init__(self, host=''):
        super().__init__()
        self.host = host
        # One-slot queue used as a shared cell holding the bound port.
        self._port_queue = Queue(1)
        # Set by ``stop()`` to ask the main loop to finish.
        self.done = Event()

    @contextmanager
    def __get_port(self):
        """Private context manager to get the port number off the shared
        queue then immediately put it back.

        Blocks until the service has published its port. The port is put
        back even if the ``with`` body raises, so later readers never
        block on an empty queue.
        """
        port = self._port_queue.get()
        try:
            yield port
        finally:
            self._port_queue.put(port)

    @property
    def port(self):
        """Returns the assigned port number.

        Blocks until the running service has bound its socket.
        """
        with self.__get_port() as port:
            return port

    def stop(self):
        """Signal the main loop to quit."""
        self.done.set()

    def _accept_peer(self, sock):
        """Wait for one peer on listening socket *sock*.

        Polls with a short timeout so that a call to :meth:`stop` is
        noticed even when nobody ever connects.

        :returns: the connected socket, or ``None`` if the service was
            stopped before a peer arrived.
        """
        sock.settimeout(self._ACCEPT_POLL_INTERVAL)
        while not self.done.is_set():
            try:
                conn, _ = sock.accept()
            except socket.timeout:
                continue
            # The polling timeout is only meant for accept(); sends on the
            # peer connection should block normally.
            conn.settimeout(None)
            return conn
        return None

    def run(self):
        """Process body: bind, publish the port, then serve one peer."""
        with socket.socket(AF_INET, SOCK_STREAM) as sock:
            # Port 0 asks the OS to pick any free port.
            sock.bind((self.host, 0))
            port = sock.getsockname()[1]
            sock.listen()
            print("Service bound to port", port)
            self._port_queue.put(port)

            conn = self._accept_peer(sock)
            if conn is None:
                return

            with conn:
                while not self.done.is_set():
                    msg = "{:.3f}".format(time.time()).encode()
                    print("Service sending", msg)
                    try:
                        # sendall() retries until the whole message is out.
                        conn.sendall(msg)
                    except (BrokenPipeError, ConnectionResetError):
                        # The peer went away; nothing left to serve.
                        break
                    # Sleep for up to 1 s but wake immediately on stop().
                    self.done.wait(1)
class ZMQTimeService(TimeService):
    """ZeroMQ-based time service.

    Binds a ``zmq.PAIR`` socket to a random TCP port, publishes the port
    number on the queue inherited from :class:`TimeService` and sends the
    current time roughly once per second until ``stop()`` is called.
    """

    def run(self):
        """Process body: bind, publish the port, then send until stopped."""
        ctx = zmq.Context()
        sock = ctx.socket(zmq.PAIR)
        try:
            port = sock.bind_to_random_port("tcp://*")
            self._port_queue.put(port)
            print("Service bound to port", port)
            # NOTE(review): send() on a PAIR socket presumably blocks while
            # no peer is attached, so stop() may only be noticed once a
            # peer has connected - confirm against the ZeroMQ docs.
            while not self.done.is_set():
                msg = "{:.3f}".format(time.time()).encode()
                print("Service sending", msg)
                sock.send(msg)
                # Sleep for up to 1 s but wake immediately on stop().
                self.done.wait(1)
        finally:
            # linger=0 drops any unsent message so term() cannot hang.
            sock.close(linger=0)
            ctx.term()
def stdlib_main():
    """Entry point for the Python standard library version.

    Starts a ``TimeService`` process, connects to the port it reports,
    prints ten received messages, then stops the service and waits for
    the process to exit.
    """
    service = TimeService()
    service.start()
    try:
        # The ``with`` block closes the client socket on every exit path.
        with socket.socket(AF_INET, SOCK_STREAM) as sock:
            sock.connect(('127.0.0.1', service.port))
            for _ in range(10):
                print("Received", sock.recv(128))
            # Ask the service to stop while we are still connected so it
            # does not try to write to an already closed peer.
            service.stop()
    finally:
        service.stop()
        # Reap the child; if it does not exit in time (e.g. it is still
        # waiting for a connection that never came), kill it rather than
        # letting the interpreter hang at exit.
        service.join(timeout=5)
        if service.is_alive():
            service.terminate()
            service.join()
def zmq_main():
    """Entry point for the ZeroMQ version.

    Starts a ``ZMQTimeService`` process, connects a ``zmq.PAIR`` socket to
    the port it reports, prints ten received messages, then stops the
    service and releases the ZeroMQ socket and context.
    """
    service = ZMQTimeService()
    service.start()
    ctx = zmq.Context()
    sock = ctx.socket(zmq.PAIR)
    try:
        # Connecting happens inside the ``try`` so the service is stopped
        # even when reading the port or connecting fails.
        sock.connect("tcp://127.0.0.1:{:d}".format(service.port))
        for _ in range(10):
            print("Received", sock.recv())
    finally:
        service.stop()
        # Keep our socket open until the service has exited so its last
        # send still has a peer; kill the child if it does not exit.
        service.join(timeout=5)
        if service.is_alive():
            service.terminate()
            service.join()
        # linger=0 discards anything queued so term() cannot hang.
        sock.close(linger=0)
        ctx.term()
if __name__ == "__main__":
    import sys

    # Map each accepted command-line word to the demo it runs.
    entry_points = {"stdlib": stdlib_main, "zmq": zmq_main}

    # A missing argument is handled the same way as an unknown one.
    choice = sys.argv[1] if len(sys.argv) >= 2 else None
    selected = entry_points.get(choice)

    if selected is None:
        print("You must specify 'stdlib' or 'zmq'")
        sys.exit(1)

    selected()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment