Skip to content

Instantly share code, notes, and snippets.

@almarklein
Created January 25, 2016 21:53
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save almarklein/2b6ca182a3e94d532931 to your computer and use it in GitHub Desktop.
Save almarklein/2b6ca182a3e94d532931 to your computer and use it in GitHub Desktop.
Multi-process communication via a message pool implemented using UDP multicast
# Copyright 2016 (C) Almar Klein. Consider this 2-clause BSD licensed.
"""
I initially wrote this to be included in flexx for multiple processes
on the same machine to communicate, more specifically, to allow a Flexx
CLI to get info on and terminate running server processes. I tested it
to work on Windows and Linux. I eventuially discarded this approach
because I don't fully get how UDP multicast works. It's important for
this to only work for localhost, and I am not sure if it does, or how
hard it is to compromise this. I went with a simpler approach based on
http requests using Tornado.
----
Functionality to connect multiple processes in a multicast environment.
Based on a channel name (which is hashed to a port number), a pool is
defined, through which nodes can send messages. Messages can be send
to a specific node, or broadcasted to any node that's interested.
This provides a small and lightweight implementation for inter process
communication, which supports pub-sub and req-rep.
Each node in the pool has a specific id, that can be used to address
messages. Each message also has a topic, which is used by nodes to
filter messages.
Each message is a UTF-8 encoded string that looks like this:
'<header> <channel> <body>'
The channel contains information about sender, recipient and a "topic".
* A/B/X: A message with topic X is send by A to B.
* A/*/X: A message with topic X is broadcasted (i.e. published) by A.
A topic that starts with "req_" is intended as a request to send back
a message. By default, all nodes support the topic "req_identify",
which will send back on topic "identify". This can be used to get
an overview of all nodes in the pool.
"""
import os
import sys
import time
import socket
import logging
import weakref
import threading
ANY = '0.0.0.0'
LOCAL = '127.0.0.1'
def port_hash(name):
""" Given a string, returns a port number between 49152 and 65535
This range (of 2**14 posibilities) is the range for dynamic and/or
private ports (ephemeral ports) specified by iana.org. The algorithm
is deterministic.
"""
fac = 0xd2d84a61
val = 0
for c in name:
val += (val >> 3) + (ord(c) * fac)
val += (val >> 3) + (len(name) * fac)
return 49152 + (val % 2**14)
class PoolNode:
""" A node in a pool of connections.
"""
def __init__(self, id=None, channel=None, mcast_ip='239.255.24.24'):
for name, val in [('id', id), ('channel', channel)]:
if not val:
raise ValueError('%s needs an %s as input' % (self.__class__.__name__, name))
self._id = str(id)
self._header = str(channel)
self._headerb = self._header.encode()
self._port = port_hash(self._header)
self._mcast_ip = mcast_ip
self._lock = threading.RLock()
self._collect_topic = ''
self._collected_messagages = []
self._sock = self._create_socket()
self._thread = PoolNodeThread(self)
self._thread.start()
def __del__(self):
# No need to stop thread; it stops when we die
try:
self._sock.close()
except Exception:
pass
def _create_socket(self):
""" The multicast magic is here.
"""
sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM, socket.IPPROTO_UDP)
sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
sock.bind((ANY, self._port))
sock.settimeout(0.1)
sock.setsockopt(socket.IPPROTO_IP, socket.IP_MULTICAST_TTL, 0)
sock.setsockopt(socket.IPPROTO_IP, socket.IP_ADD_MEMBERSHIP,
socket.inet_aton(self._mcast_ip) + socket.inet_aton(ANY))
return sock
def send(self, to, topic, msg):
""" Send a message into the pool. To broadcast, use "*" for ``to``.
"""
channel = '%s/%s/%s' % (self._id, to, topic)
txt = '%s %s %s' % (self._header, channel, msg)
self._sock.sendto(txt.encode(), (self._mcast_ip, self._port))
def request(self, to, topic, msg, timeout=0.1):
""" Make a request. This sends a message on topic "req_X" and waits
for a reply, directed at us with topic "X". If the to field is "*",
multiple messages are collected over a short amount of time. Otherwise
the function returns as soon as one reply has been received.
"""
if not topic.startswith('req_'):
raise ValueError('A request should have a topic that starts with "req_".')
with self._lock:
self.send(to, topic, msg)
count = None if to == '*' else 1
self._collected_messagages = []
self._collect_topic = topic[4:]
self._receive(count, timeout)
with self._lock:
self._collect_topic = ''
return self._collected_messagages[:]
def receive(self, filter_topic, count=None, timeout=0.1):
""" Receive a message for a given topic.
"""
with self._lock:
self._collected_messagages = []
self._collect_topic = filter_topic
self._receive(count, timeout)
with self._lock:
self._collect_topic = ''
return self._collected_messagages[:]
def _receive(self, count, timeout):
etime = time.time() + timeout
if count is None:
time.sleep(timeout)
else:
while time.time() < etime:
time.sleep(0.001)
if len(self._collected_messagages) >= count:
break
def _respond(self, data):
""" The thread calls this. Here we dispatch the handling of a message.
"""
data = data.decode()
_, channel, msg = data.split(' ', 2)
fro, to, topic = channel.split('/')
if to in (self._id, '*'):
# Are we waiting for it?
with self._lock:
if self._collect_topic and topic == self._collect_topic:
self._collected_messagages.append((fro, msg))
return
# Process normally
method = getattr(self, 'on_' + topic, None)
if method:
res = method(msg)
if res is not None and topic.startswith('req_'):
self.send(fro, topic[4:], res)
def on_req_identify(self, msg):
return ''
class PoolNodeThread(threading.Thread):
""" The thread for the PoolNode that listens for incoming messages.
"""
def __init__(self, responder):
super().__init__()
self._responder = weakref.ref(responder)
self._headerb = responder._headerb
self._do_stop = False
self.setDaemon(True)
def run(self):
try:
self._run()
except Exception:
pass # interpreter shutdown
def _run(self):
responder = None
while True:
responder = self._responder()
if responder is None:
break
sock = responder._sock
del responder
try:
data, addr = sock.recvfrom(1024)
except socket.timeout:
continue
if data.startswith(self._headerb):
responder = self._responder()
if responder is not None:
try:
responder._respond(data)
except Exception as err:
logging.error('PoolNodeThread error: %s' % str(err))
if __name__ == '__main__':
class MyReplier(PoolNode):
def on_req_exit(self, msg):
import signal
os.kill(os.getpid(), signal.SIGINT)
res = MyReplier(os.getpid(), 'FLEXX')
req = MyReplier(os.getpid()+1, 'FLEXX')
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment