Created
May 12, 2017 17:49
-
-
Save mivade/c69a258dc4935a8e28dcb872c5e1bb99 to your computer and use it in GitHub Desktop.
Binding to random ports for IPC
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
"""Examples of how to bind a socket to a random port and have that port number | |
accessible to other processes. Useful for interprocess communications. | |
""" | |
import time | |
import socket | |
from socket import AF_INET, SOCK_STREAM | |
from multiprocessing import Process, Queue, Event | |
from contextlib import contextmanager | |
import zmq | |
class TimeService(Process): | |
"""Example local process-based service that broadcasts the time every 1 s | |
to the connected peer. | |
""" | |
def __init__(self, host=''): | |
self.host = host | |
self._port_queue = Queue(1) | |
self.done = Event() | |
super().__init__() | |
@contextmanager | |
def __get_port(self): | |
"""Private context manager to get the port number off the shared queue | |
then immediately put it back. | |
""" | |
port = self._port_queue.get() | |
yield port | |
self._port_queue.put(port) | |
@property | |
def port(self): | |
"""Returns the assigned port number.""" | |
with self.__get_port() as port: | |
return port | |
def stop(self): | |
"""Signal the main loop to quit.""" | |
self.done.set() | |
def run(self): | |
sock = socket.socket(AF_INET, SOCK_STREAM) | |
sock.bind((self.host, 0)) | |
_, port = sock.getsockname() | |
sock.listen() | |
print("Service bound to port", port) | |
self._port_queue.put(port) | |
conn, addr = sock.accept() | |
while not self.done.is_set(): | |
now = time.time() | |
msg = "{:.3f}".format(now).encode() | |
print("Service sending", msg) | |
conn.send(msg) | |
time.sleep(1) | |
class ZMQTimeService(TimeService): | |
"""ZeroMQ-based time service.""" | |
def run(self): | |
ctx = zmq.Context() | |
sock = ctx.socket(zmq.PAIR) | |
port = sock.bind_to_random_port("tcp://*") | |
self._port_queue.put(port) | |
print("Service bound to port", port) | |
while not self.done.is_set(): | |
now = time.time() | |
msg = "{:.3f}".format(now).encode() | |
print("Service sending", msg) | |
sock.send(msg) | |
time.sleep(1) | |
def stdlib_main(): | |
"""Entry point for the Python standard library version.""" | |
service = TimeService() | |
service.start() | |
try: | |
sock = socket.socket(AF_INET, SOCK_STREAM) | |
sock.connect(('127.0.0.1', service.port)) | |
for n in range(10): | |
print("Received", sock.recv(128)) | |
finally: | |
service.stop() | |
def zmq_main(): | |
"""Entry point for the ZeroMQ version.""" | |
service = ZMQTimeService() | |
service.start() | |
ctx = zmq.Context() | |
sock = ctx.socket(zmq.PAIR) | |
sock.connect("tcp://127.0.0.1:{:d}".format(service.port)) | |
try: | |
for n in range(10): | |
print("Received", sock.recv()) | |
finally: | |
service.stop() | |
if __name__ == "__main__": | |
import sys | |
if len(sys.argv) < 2: | |
print("You must specify 'stdlib' or 'zmq'") | |
sys.exit(1) | |
elif sys.argv[1] == "stdlib": | |
stdlib_main() | |
elif sys.argv[1] == "zmq": | |
zmq_main() | |
else: | |
print("You must specify 'stdlib' or 'zmq'") | |
sys.exit(1) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment