Skip to content

Instantly share code, notes, and snippets.

@oplatek
Created March 19, 2015 10:41
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save oplatek/90a089993a635c831f0a to your computer and use it in GitHub Desktop.
Save oplatek/90a089993a635c831f0a to your computer and use it in GitHub Desktop.
forwarder-2-way.py is a simplistic implementation of the publish-subscribe pattern using a ZeroMQ process device in Python
#!/usr/bin/env python
# encoding: utf-8
# Copyright (c) 2015, Ondrej Platek, Ufal MFF UK <oplatek@ufal.mff.cuni.cz>
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# THIS CODE IS PROVIDED *AS IS* BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, EITHER EXPRESS OR IMPLIED, INCLUDING WITHOUT LIMITATION ANY IMPLIED
# WARRANTIES OR CONDITIONS OF TITLE, FITNESS FOR A PARTICULAR PURPOSE,
# MERCHANTABLITY OR NON-INFRINGEMENT.
# See the Apache 2 License for the specific language governing permissions and
# limitations under the License.
#
# Improvements and suggestions welcome via pull requests. Thanks!
import zmq
import random
import sys
import time
import multiprocessing
from gevent.greenlet import Greenlet
import gevent
class Bot(multiprocessing.Process):
    """Worker process that answers every message it receives.

    The bot subscribes (with an empty filter, i.e. to everything) on one
    local TCP port and, for each message received, publishes
    ``answer to <message>`` on another local TCP port.
    """

    def __init__(self, name, input_address, output_address):
        """Store the configuration; no ZeroMQ objects are created here.

        Args:
            name: Label of the process; converted with ``str``.
            input_address: Port on 127.0.0.1 the SUB socket connects to.
            output_address: Port on 127.0.0.1 the PUB socket connects to.
        """
        super(Bot, self).__init__()
        self.name = str(name)
        self.isocket_add = input_address
        self.osocket_add = output_address

    def init_zmq(self):
        """Create the context, the SUB/PUB sockets and the poller.

        This is called from ``run`` so that all ZeroMQ objects are created
        inside the child process rather than in the parent.
        """
        self.context = zmq.Context()

        self.isocket = self.context.socket(zmq.SUB)
        self.isocket.connect("tcp://127.0.0.1:%s" % self.isocket_add)
        self.isocket.setsockopt(zmq.SUBSCRIBE, b'')

        self.osocket = self.context.socket(zmq.PUB)
        self.osocket.connect("tcp://127.0.0.1:%s" % self.osocket_add)

        self.poller = zmq.Poller()
        self.poller.register(self.isocket, zmq.POLLIN)

    @staticmethod
    def _reply_for(msg):
        """Return the reply payload (bytes) for the received bytes *msg*.

        Bytes concatenation is used instead of ``%`` formatting so the
        payload is a byte string on both Python 2 and Python 3.
        """
        return b"answer to " + msg

    def run(self):
        """Process entry point: answer incoming messages forever."""
        self.init_zmq()
        while True:
            # Blocks until at least one registered socket has an event.
            events = dict(self.poller.poll())
            if events.get(self.isocket, 0) & zmq.POLLIN:
                msg = self.isocket.recv()
                # Parenthesized single-argument form is valid on Python 2
                # and 3; on Python 3 ``msg`` is bytes and is shown as b'...'.
                print('%s recieved %s' % (self.name, msg))
                self.osocket.send(self._reply_for(msg))
def forwarderdevice_start(frontend_port, backend_port):
    """Start a ZeroMQ forwarder device running in its own process.

    The device subscribes to everything arriving on its SUB (input) side
    and re-publishes it on its PUB (output) side.

    Args:
        frontend_port: TCP port the input (SUB) socket binds to, on all
            interfaces.
        backend_port: TCP port the output (PUB) socket binds to, on all
            interfaces.

    Returns:
        The started ``ProcessDevice`` instance.
    """
    # Import the submodule explicitly rather than relying on ``import zmq``
    # having loaded ``zmq.devices`` as a side effect.
    from zmq.devices import ProcessDevice

    forwarder = ProcessDevice(zmq.FORWARDER, zmq.SUB, zmq.PUB)
    # An empty prefix subscribes the input socket to every message.
    forwarder.setsockopt_in(zmq.SUBSCRIBE, b'')
    forwarder.bind_in("tcp://*:%s" % frontend_port)
    forwarder.bind_out("tcp://*:%s" % backend_port)
    forwarder.start()
    return forwarder
def main():
    """Run the two-way publish/subscribe demo until interrupted.

    Two forwarder devices are started: one carries messages from this
    process to the bot, the other carries the bot's answers back. The loop
    publishes a numbered message, waits a bounded time for an answer,
    prints it if one arrived, and repeats.
    """
    bot_front_port, bot_back_port = "5559", "5560"
    user_front_port, user_back_port = "5561", "5562"
    # Upper bound for waiting on an answer. PUB/SUB may silently drop
    # messages (e.g. before subscriptions are established), so an unbounded
    # poll could block this loop forever after a single lost message.
    reply_timeout_ms = 1000

    forwarderdevice_start(bot_front_port, bot_back_port)
    forwarderdevice_start(user_front_port, user_back_port)

    b = Bot('name', bot_back_port, user_front_port)
    b.start()

    # The context is created only after the child process has been started.
    context = zmq.Context()
    sender = context.socket(zmq.PUB)
    subs = context.socket(zmq.SUB)
    try:
        sender.connect("tcp://localhost:%s" % bot_front_port)
        subs.connect("tcp://localhost:%s" % user_back_port)
        subs.setsockopt(zmq.SUBSCRIBE, b'')

        poller = zmq.Poller()
        poller.register(subs, zmq.POLLIN)

        # FIXME startup synchronisation - implement a proper one.
        # Do not put this sleep immediately after b.start(): the new process
        # would be put to sleep during its initialisation.
        time.sleep(1.0)

        publisher_id, msg_id = random.randrange(0, 9999), 0
        while True:
            topic = random.randrange(1, 10)
            messagedata = "server#%s msg %d" % (publisher_id, msg_id)
            msg_id += 1
            print("%s %s" % (topic, messagedata))
            # Sockets carry bytes; encoding keeps this valid on Python 3
            # and is a no-op for this ASCII text on Python 2.
            sender.send(("%d %s" % (topic, messagedata)).encode("utf-8"))

            socks = dict(poller.poll(reply_timeout_ms))
            if socks.get(subs, 0) & zmq.POLLIN:
                msg = subs.recv()
                print('recieved back %s' % (msg,))
            time.sleep(0.2)
    finally:
        # Reached when the loop is interrupted (e.g. Ctrl-C) or fails:
        # drop unsent messages, release the sockets and stop the bot.
        sender.close(linger=0)
        subs.close(linger=0)
        context.term()
        b.terminate()
        b.join()
if __name__ == '__main__':
    # Using gevent to simulate my webapp environment.
    g = Greenlet.spawn(main)
    # spawn() only schedules the greenlet; it starts running once the
    # current greenlet yields to the gevent hub. join() yields and waits
    # until main() finishes, which replaces the former unexplained
    # gevent.sleep(0.000001) workaround.
    g.join()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment