Created
March 19, 2015 10:41
-
-
Save oplatek/90a089993a635c831f0a to your computer and use it in GitHub Desktop.
forwarder-2-way.py is a simplistic implementation of the publish-subscribe pattern using zmq process devices in Python.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python | |
# encoding: utf-8 | |
# Copyright (c) 2015, Ondrej Platek, Ufal MFF UK <oplatek@ufal.mff.cuni.cz> | |
# | |
# Licensed under the Apache License, Version 2.0 (the "License"); | |
# you may not use this file except in compliance with the License. | |
# You may obtain a copy of the License at | |
# | |
# http://www.apache.org/licenses/LICENSE-2.0 | |
# | |
# THIS CODE IS PROVIDED *AS IS* BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY | |
# KIND, EITHER EXPRESS OR IMPLIED, INCLUDING WITHOUT LIMITATION ANY IMPLIED | |
# WARRANTIES OR CONDITIONS OF TITLE, FITNESS FOR A PARTICULAR PURPOSE, | |
# MERCHANTABLITY OR NON-INFRINGEMENT. | |
# See the Apache 2 License for the specific language governing permissions and | |
# limitations under the License. | |
# | |
# Improvements and suggestions welcome via pull requests. Thanks! | |
import zmq | |
import random | |
import sys | |
import time | |
import multiprocessing | |
from gevent.greenlet import Greenlet | |
import gevent | |
class Bot(multiprocessing.Process):
    """Child process that echoes an answer for every message it receives.

    The bot listens on a SUB socket connected to the input port and publishes
    ``"answer to <msg>"`` on a PUB socket connected to the output port.  Both
    sockets connect to 127.0.0.1.
    """

    def __init__(self, name, input_address, output_address):
        """Remember the process name and the two port numbers.

        name           -- label used in the log output (converted with str()).
        input_address  -- port the SUB socket connects to.
        output_address -- port the PUB socket connects to.

        No zmq objects are created here; that happens in init_zmq(), which
        run() calls once the process has been started.
        """
        super(Bot, self).__init__()
        self.name = str(name)
        self.isocket_add, self.osocket_add = input_address, output_address

    def init_zmq(self):
        """Create the context, the SUB/PUB socket pair and the poller."""
        self.context = ctx = zmq.Context()

        # Incoming side: subscribe to everything (empty prefix).
        self.isocket = subscriber = ctx.socket(zmq.SUB)
        subscriber.connect("tcp://127.0.0.1:%s" % self.isocket_add)
        subscriber.setsockopt(zmq.SUBSCRIBE, b'')

        # Outgoing side.
        self.osocket = publisher = ctx.socket(zmq.PUB)
        publisher.connect("tcp://127.0.0.1:%s" % self.osocket_add)

        # Only the incoming socket is polled.
        self.poller = poller = zmq.Poller()
        poller.register(subscriber, zmq.POLLIN)

    def run(self):
        """Process entry point: answer incoming messages forever."""
        self.init_zmq()
        while True:
            ready = dict(self.poller.poll())
            if ready.get(self.isocket) != zmq.POLLIN:
                continue
            msg = self.isocket.recv()
            print('%s recieved %s' % (self.name, msg))
            self.osocket.send("answer to %s" % msg)
def forwarderdevice_start(frontend_port, backend_port):
    """Launch a zmq FORWARDER device in its own process and return it.

    The device's input side is a SUB socket bound to ``frontend_port`` and
    subscribed to every message; its output side is a PUB socket bound to
    ``backend_port``.  Both bind on all interfaces (``tcp://*``).
    """
    frontend_endpoint = "tcp://*:%s" % frontend_port
    backend_endpoint = "tcp://*:%s" % backend_port

    device = zmq.devices.ProcessDevice(zmq.FORWARDER, zmq.SUB, zmq.PUB)
    device.setsockopt_in(zmq.SUBSCRIBE, b'')  # empty prefix: forward everything
    device.bind_in(frontend_endpoint)
    device.bind_out(backend_endpoint)
    device.start()
    return device
def main(poll_timeout_ms=1000):
    """Run the two-way publish/subscribe demo until interrupted.

    Message flow:
      * this process publishes to port 5559; a forwarder relays 5559 -> 5560,
        where the Bot is subscribed;
      * the Bot publishes its answer to port 5561; a second forwarder relays
        5561 -> 5562, where this process is subscribed.

    poll_timeout_ms -- how long (milliseconds) to wait for the Bot's answer
        before sending the next message.  PUB/SUB gives no delivery guarantee,
        so waiting without a timeout would hang the loop forever as soon as a
        single message or answer is dropped.
    """
    bot_front_port, bot_back_port = "5559", "5560"
    user_front_port, user_back_port = "5561", "5562"
    forwarderdevice_start(bot_front_port, bot_back_port)
    forwarderdevice_start(user_front_port, user_back_port)

    b = Bot('name', bot_back_port, user_front_port)
    b.start()

    context = zmq.Context()
    sender = context.socket(zmq.PUB)
    subs = context.socket(zmq.SUB)
    try:
        sender.connect("tcp://localhost:%s" % bot_front_port)
        subs.connect("tcp://localhost:%s" % user_back_port)
        subs.setsockopt(zmq.SUBSCRIBE, b'')

        poller = zmq.Poller()
        poller.register(subs, zmq.POLLIN)

        # FIXME: this sleep is a stand-in for proper startup synchronisation.
        # Keep it here rather than directly after b.start(): the author
        # observed that sleeping immediately after starting the Bot stalls
        # the new process during its initialisation.
        time.sleep(1.0)

        publisher_id, msg_id = random.randrange(0, 9999), 0
        while True:
            topic = random.randrange(1, 10)
            messagedata = "server#%s msg %d" % (publisher_id, msg_id)
            msg_id += 1
            print("%s %s" % (topic, messagedata))
            sender.send("%d %s" % (topic, messagedata))

            # Bounded wait: if the answer never arrives, move on instead of
            # blocking forever.
            socks = dict(poller.poll(poll_timeout_ms))
            if socks.get(subs) == zmq.POLLIN:
                msg = subs.recv()
                print('recieved back %s' % (msg))
            time.sleep(0.2)
    finally:
        # Reached on KeyboardInterrupt or any zmq error: release the sockets
        # without lingering on unsent messages, then stop the Bot child so
        # the interpreter is not kept alive by a non-daemon process.
        sender.close(linger=0)
        subs.close(linger=0)
        context.term()
        b.terminate()
        b.join()
if __name__ == '__main__':
    # Using gevent to simulate my webapp environment.
    # Greenlet.spawn() only *schedules* main(); nothing runs until the current
    # greenlet yields to the gevent hub.  join() yields and then waits for
    # main() to finish, so the script cannot fall off the end and exit while
    # the greenlet is still pending (the original relied on a tiny
    # gevent.sleep() for the yield).
    g = Greenlet.spawn(main)
    g.join()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment