Skip to content

Instantly share code, notes, and snippets.

@justanr
Last active May 24, 2016 04:10
Show Gist options
  • Star 1 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save justanr/13811f8a4717971844f1 to your computer and use it in GitHub Desktop.
Save justanr/13811f8a4717971844f1 to your computer and use it in GitHub Desktop.
Basic Command Bus example
import inspect
class HandlerNotFound(Exception):
    """Raised when no handler can be located for a command."""
class BasicCommandBus(object):
    """Dispatch a command object to a handler found by naming convention.

    For a command whose class is named ``<Name>Command``, the bus looks in
    the module that defines that class for an attribute named
    ``<Name>Handler``, calls it with no arguments, and passes the command
    to the resulting object's ``execute`` method.
    """

    # Suffix removed from the command class name to derive the handler name.
    _COMMAND_SUFFIX = "Command"

    def execute(self, command):
        """Resolve the handler for ``command`` and run it.

        The handler's return value is discarded.

        Raises:
            HandlerNotFound: if no matching handler exists in the
                command's module.
        """
        self._determine_handler(command).execute(command)

    def _determine_handler(self, command):
        """Return a new handler instance for ``command``.

        Raises:
            HandlerNotFound: if the command's module has no attribute named
                ``<Name>Handler`` (or that attribute is ``None``).
        """
        command_module = inspect.getmodule(command)

        command_type_name = type(command).__name__
        # Strip only a *trailing* "Command". Splitting on the first
        # occurrence would truncate names such as "RunCommandLineCommand"
        # to "Run" and look up the wrong handler.
        if command_type_name.endswith(self._COMMAND_SUFFIX):
            command_type_name = command_type_name[:-len(self._COMMAND_SUFFIX)]

        possible_handler_name = "{0}Handler".format(command_type_name)
        # If getmodule() returned None, getattr falls through to the default
        # and the lookup is reported as a missing handler below.
        handler = getattr(command_module, possible_handler_name, None)
        if handler is None:
            raise HandlerNotFound(
                "Handler not found for {0}".format(command_type_name)
            )
        return handler()
from commandbus import BasicCommandBus
from hello import HelloPersonCommand
# Usage example: the bus finds HelloPersonHandler in the module that defines
# HelloPersonCommand and runs it.
bus = BasicCommandBus()
bus.execute(HelloPersonCommand(receiver="fred", sender="fred"))  # prints "fred says: Hello fred!"
from collections import namedtuple
# Immutable command record: who is being greeted and who is greeting.
HelloPersonCommand = namedtuple("HelloPersonCommand", "receiver sender")
class HelloPersonHandler(object):
    """Handler that prints a greeting built from a command."""

    def execute(self, command):
        """Print ``"<sender> says: Hello <receiver>!"`` to standard output.

        ``command`` must expose ``sender`` and ``receiver`` attributes; their
        values are printed as given, with no case changes.
        """
        print("{0.sender} says: Hello {0.receiver}!".format(command))
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment