Skip to content

Instantly share code, notes, and snippets.

@140am
Last active August 12, 2016 22:35
Show Gist options
  • Save 140am/4964751 to your computer and use it in GitHub Desktop.
Save 140am/4964751 to your computer and use it in GitHub Desktop.
Example of using "butler" Python library to communicate between multiple processes (local or distributed)
""" Example of how to get started with the `butler` library for RPC calls

Defines 3 example classes we want to expose for RPC use (ServiceA, ServiceB,
ServiceC), all of which have a `test` method.

Start a Router and register all the Service classes (`services_objects`):

    python butler_service.py --service

Start only a Router process:

    python butler_service.py

Start and register only the Services - connect to an existing Router:

    python butler_service.py --service --router tcp://0.0.0.0:5556

ØMQ Sockets:

    tcp://0.0.0.0:5555 : Client Frontend
    tcp://0.0.0.0:5556 : Service Backend

Installation:

    easy_install butler

Recommended environment (gevent 1.0+):

    easy_install cython
    easy_install https://github.com/SiteSupport/gevent/archive/1.0rc2.zip

Author: Manuel Kreutz <manuel@140.am>
License: MIT
"""
import logging
import optparse
import butler
import multiprocessing
import gevent
# Module-level logger used by the service, router and worker classes below.
log = logging.getLogger(__name__)

# Configure the root logger so that INFO records (and above) are emitted.
logging.basicConfig(level=logging.INFO)
parser = optparse.OptionParser()
parser.add_option(
'-r', '--router',
dest = 'router_address',
default = '',
help = 'connect to an existing router at a specific address (default: run one locally)'
)
parser.add_option(
'-s', '--service',
action = 'store_true',
dest = 'enable_service',
default = False,
help = 'register and start a service worker in a seperate process. each service runs in a greenlet'
)
##
## define some sample classes you want to expose
class ServiceSample(object):
    """Common base for the example services.

    Subclasses are expected to provide a `name` attribute; `test` reads it.
    """

    def test(self):
        """Return the greeting ``'hello <name>'`` for this service."""
        service_name = self.name
        return 'hello %s' % service_name
class ServiceA(ServiceSample):
    """Example service identified by the name ``service_a``."""

    # identifier of this service; also used in the log line and greeting
    name = 'service_a'

    def __init__(self):
        """Log that the service object has been created."""
        # hand the name to the logger as an argument so the message is only
        # interpolated when the record is actually emitted
        log.info('init %s', self.name)
class ServiceB(ServiceSample):
    """Example service identified by the name ``service_b``."""

    # identifier of this service; also used in the log line and greeting
    name = 'service_b'

    def __init__(self):
        """Log that the service object has been created."""
        # hand the name to the logger as an argument so the message is only
        # interpolated when the record is actually emitted
        log.info('init %s', self.name)
class ServiceC(ServiceSample):
    """Example service identified by the name ``service_c``."""

    # identifier of this service; also used in the log line and greeting
    name = 'service_c'

    def __init__(self):
        """Log that the service object has been created."""
        # hand the name to the logger as an argument so the message is only
        # interpolated when the record is actually emitted
        log.info('init %s', self.name)
##
## init a router instance in a separate process
class ServiceRouter(multiprocessing.Process):
    """Process that creates a `butler.Router`, binds its sockets and runs it.

    The bind addresses and the heartbeat timeout are class attributes so they
    can be changed by subclassing, or by assigning to an instance before
    calling `start()`, without touching the constructor signature inherited
    from `multiprocessing.Process`. The defaults are the values this example
    has always used.
    """

    # address the router frontend is bound to (clients connect here)
    frontend_address = 'tcp://0.0.0.0:5555'
    # address the router backend is bound to (service workers connect here)
    backend_address = 'tcp://0.0.0.0:5556'
    # value assigned to `router.heartbeat_timeout`
    # NOTE(review): presumably seconds - confirm against the butler docs
    heartbeat_timeout = 15

    def run(self):
        """Process entry point: set up the router and hand control to it."""
        # pass the PID as a logging argument (interpolated lazily)
        log.info('starting ServiceRouter [PID: %s]', self.pid)
        router = butler.Router()
        router.frontend.bind(self.frontend_address)
        router.backend.bind(self.backend_address)
        router.heartbeat_timeout = self.heartbeat_timeout
        router.run()
##
## factory class to create and register service worker
class ServiceWorker(multiprocessing.Process):
    """Process that registers a set of service classes with a router.

    Each class in `service_obj` is started in its own greenlet inside this
    single worker process.
    """

    def __init__(self, service_obj=None, router_address='tcp://0.0.0.0:5556'):
        """Remember which classes to expose and where the router lives.

        :param service_obj: iterable of classes to expose; each must carry a
            `name` class attribute and be constructible without arguments.
            Defaults to an empty list.
        :param router_address: address of the router the services connect to.
        """
        super(ServiceWorker, self).__init__()
        self.router_address = router_address
        # a `None` sentinel instead of a `[]` default: a mutable default would
        # be one list object shared by every instance created without arguments
        self.service_obj = [] if service_obj is None else service_obj

    def start_worker(self, worker_class):
        """ register the `worker_class` with the router at `self.router_address`
        under the RPC name as set on the class attribute `name`
        """
        service = butler.Service(
            self.router_address, worker_class.name
        )
        service.register(worker_class())
        service.run()

    def run(self):
        """Process entry point: spawn one greenlet per class and wait for all."""
        # logging arguments are interpolated lazily and, unlike `'%s' % obj`,
        # do not break when `service_obj` happens to be a tuple
        log.info('starting ServiceWorker: %s', self.service_obj)
        greenlets = [
            gevent.spawn(self.start_worker, service_class)
            for service_class in self.service_obj
        ]
        gevent.joinall(greenlets)
def run():
    """Parse the command line and start the requested processes.

    Without a router address a `ServiceRouter` process is started locally.
    When the service flag is set a `ServiceWorker` process is started as
    well; it is pointed at the given router address if there is one and
    falls back to the worker's default address otherwise.
    """
    options, _ = parser.parse_args()

    # classes to expose for RPC
    exposed_classes = [ServiceA, ServiceB, ServiceC]
    existing_router = options.router_address

    # no existing router provided: run one locally
    if not existing_router:
        ServiceRouter().start()

    # nothing else to do unless a service worker was requested
    if not options.enable_service:
        return

    if existing_router:
        # connect the services to the router address given on the command line
        worker = ServiceWorker(exposed_classes, router_address=existing_router)
    else:
        # connect the services to the default router address (tcp://0.0.0.0:5556)
        worker = ServiceWorker(exposed_classes)
    worker.start()


if __name__ == '__main__':
    run()
# call the `test` method on any of the 3 registered example objects
# NOTE(review): this looks like a separate client-side snippet (it imports
# butler again) - presumably meant to be run from another process once the
# router and the services are up; confirm before keeping it in this module.
import butler

client = butler.Client('tcp://0.0.0.0:5555').rpc('service_b')
# parenthesised call form: valid on Python 3 and, with a single argument,
# prints the same text on Python 2
print(client.test())
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment