Last active
August 12, 2016 22:35
-
-
Save 140am/4964751 to your computer and use it in GitHub Desktop.
Example of using "butler" Python library to communicate between multiple processes (local or distributed)
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
""" Example of how to get started with the `butler` library for RPC calls | |
Defines 3 example classes we want to expose for RPC use (ServiceA, ServiceB, ServiceC)
which all have a 'test' method.

Start a Router and register all the Service classes (`services_objects`):

    python butler_service.py --service

Start only a Router process:

    python butler_service.py

Start and register only the Services - connect to an existing Router:

    python butler_service.py --service --router tcp://0.0.0.0:5556

ØMQ Sockets:

    tcp://0.0.0.0:5555 : Client Frontend
    tcp://0.0.0.0:5556 : Service Backend

Installation:

    easy_install butler

Recommended environment (gevent 1.0+):

    easy_install cython
    easy_install https://github.com/SiteSupport/gevent/archive/1.0rc2.zip

Author: Manuel Kreutz <manuel@140.am>
License: MIT
""" | |
import logging | |
import optparse | |
import butler | |
import multiprocessing | |
import gevent | |
# Configure the root logger for INFO and above, then create the module-level
# logger that the service, router and worker classes in this file write to.
logging.basicConfig(level=logging.INFO)
log = logging.getLogger(__name__)
# Command line interface: `-r/--router` selects an existing router to connect
# to, `-s/--service` additionally starts the service worker process.
parser = optparse.OptionParser()

# (flags, settings) pairs, registered in this order so `--help` lists them
# exactly as declared.
_OPTION_SPECS = (
    (
        ('-r', '--router'),
        {
            'dest': 'router_address',
            'default': '',
            'help': 'connect to an existing router at a specific address (default: run one locally)',
        },
    ),
    (
        ('-s', '--service'),
        {
            'action': 'store_true',
            'dest': 'enable_service',
            'default': False,
            'help': 'register and start a service worker in a seperate process. each service runs in a greenlet',
        },
    ),
)

for _flags, _settings in _OPTION_SPECS:
    parser.add_option(*_flags, **_settings)
## | |
## define some sample classes you want to expose | |
class ServiceSample(object):
    """Shared base for the example services.

    Subclasses are expected to provide a `name` attribute; `test` reads it.
    """

    def test(self):
        """Return the greeting ``'hello <name>'`` for this service."""
        greeting = 'hello %s'
        return greeting % self.name
class ServiceA(ServiceSample):
    """Example service; inherits `test` from ServiceSample."""

    # ServiceWorker.start_worker passes this value to butler.Service as the
    # name the class is registered under
    name = 'service_a'

    def __init__(self):
        # log construction so it is visible when the instance is created
        message = 'init %s' % self.name
        log.info(message)
class ServiceB(ServiceSample):
    """Example service; inherits `test` from ServiceSample."""

    # ServiceWorker.start_worker passes this value to butler.Service as the
    # name the class is registered under
    name = 'service_b'

    def __init__(self):
        # log construction so it is visible when the instance is created
        message = 'init %s' % self.name
        log.info(message)
class ServiceC(ServiceSample):
    """Example service; inherits `test` from ServiceSample."""

    # ServiceWorker.start_worker passes this value to butler.Service as the
    # name the class is registered under
    name = 'service_c'

    def __init__(self):
        # log construction so it is visible when the instance is created
        message = 'init %s' % self.name
        log.info(message)
## | |
## init a router instance in a separate process
class ServiceRouter(multiprocessing.Process):
    """Child process that creates a `butler.Router` and runs it."""

    def run(self):
        """Bind the router's two sockets and hand control to the router.

        Executed in the child process once `start()` has been called.
        """
        startup_message = 'starting ServiceRouter [PID: %s]' % self.pid
        log.info(startup_message)

        broker = butler.Router()

        # frontend is the client-facing socket, backend the service-facing one
        endpoints = (
            ('frontend', "tcp://0.0.0.0:5555"),
            ('backend', "tcp://0.0.0.0:5556"),
        )
        for side, address in endpoints:
            getattr(broker, side).bind(address)

        # NOTE(review): unit is presumably seconds - confirm against butler
        broker.heartbeat_timeout = 15
        broker.run()
## | |
## factory class to create and register service worker | |
class ServiceWorker(multiprocessing.Process):
    """Child process that registers a set of service classes with a router.

    Every class in `service_obj` is started in its own greenlet inside this
    one process.

    Args:
        service_obj: iterable of classes to expose. Each class needs a `name`
            class attribute and must be constructible without arguments.
            Defaults to no services.
        router_address: address of the router backend the services connect to.
    """

    def __init__(self, service_obj=None, router_address='tcp://0.0.0.0:5556'):
        super(ServiceWorker, self).__init__()
        self.router_address = router_address
        # build the empty default per instance: a literal `[]` default would
        # be one list object shared (and mutable) across every ServiceWorker
        self.service_obj = [] if service_obj is None else service_obj

    def start_worker(self, worker_class):
        """ register the `worker_class` with the router at `self.router_address`
        under the RPC name as set on the class attribute `name`
        """
        service = butler.Service(
            self.router_address, worker_class.name
        )
        service.register(worker_class())
        service.run()

    def run(self):
        """Spawn one greenlet per service class and wait for all of them."""
        # pass the collection as a logging argument instead of using `%`
        # directly: `'%s' % some_tuple` raises TypeError for a tuple of classes
        log.info('starting ServiceWorker: %s', self.service_obj)
        greenlets = [
            gevent.spawn(self.start_worker, service_class)
            for service_class in self.service_obj
        ]
        gevent.joinall(greenlets)
def run():
    """Parse the command line and start the requested processes.

    * without ``--router``: start a local ServiceRouter process
    * with ``--service``: start a ServiceWorker exposing the sample services,
      connected to the ``--router`` address when one was given, otherwise to
      ServiceWorker's default address
    * with ``--router`` but without ``--service``: there is nothing to start,
      so a warning is logged instead of exiting silently
    """
    # list of classes to expose for RPC
    services_objects = [ServiceA, ServiceB, ServiceC]

    options, _ = parser.parse_args()

    # if no existing router provided start one locally
    if not options.router_address:
        ServiceRouter().start()

    if options.enable_service:
        # only override the worker's default router address when the user
        # explicitly pointed us at an existing router
        worker_kwargs = {}
        if options.router_address:
            worker_kwargs['router_address'] = options.router_address
        ServiceWorker(services_objects, **worker_kwargs).start()
    elif options.router_address:
        # neither a router nor a worker was started
        log.warning(
            'nothing to start: --router %s was given without --service',
            options.router_address
        )


if __name__ == '__main__':
    run()
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# call the `test` method on any of the 3 registered example objects
import butler

# connect to the router's client frontend (tcp://0.0.0.0:5555) and select the
# service registered under the name 'service_b'
client = butler.Client('tcp://0.0.0.0:5555').rpc('service_b')

# print() with a single argument is valid on Python 3 and prints the same
# value on Python 2, where the bare `print x` statement was the only form
print(client.test())
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment