Last active
December 17, 2015 06:59
-
-
Save twillis/5569288 to your computer and use it in GitHub Desktop.
service registration just works. yay.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
""" | |
contains decorators for pyramid to discover and add various things to the registry, primarily the service decorator | |
""" | |
import venusian | |
from repoze.lru import lru_cache | |
from zope.interface import implements, Interface, Attribute, classImplements | |
import pyramid | |
from pyramid import testing | |
from pyramid import view | |
import unittest | |
from webob import Request | |
from pyramid.response import Response | |
class service(object):
    """Class decorator that registers the decorated class as a named service.

    Example::

        from interfaces import IUserProvider, INeedUsers

        @service("user", IUserProvider, (INeedUsers,))
        class DataStoreUserProvider(object):
            pass

    When the application starts up and pyramid is in the scan phase
    picking up views, it will also pick up the service decorator and
    register those objects.

    Getting an instance of a service that implements IUserProvider can
    then be done by key, which is handy for traversal as it keeps the
    keys consistent::

        class Community(object):
            implements(INeedUsers)

        user_provider = service.locate_service_by_key(
            request, Community(), "user")
        # interact with it

    You can also retrieve by the specified interface::

        user_provider = service.locate_service(
            request, Community(), IUserProvider)
    """

    # Key in ``registry.settings`` holding the {service name: interface} map.
    REG_KEY = "services"

    def __init__(self, name, interface, adapts):
        """Remember the declaration until the venusian scan runs.

        :param name: key the service is looked up under.
        :param interface: interface the decorated class will be declared
            to implement and registered as providing.
        :param adapts: sequence of interfaces/classes the service adapts.
        """
        self.name = name
        self.interface = interface
        self.adapts = adapts
        self.cls = None

    def __call__(self, cls):
        """Attach the scan callback to ``cls`` and return ``cls`` unchanged."""
        self.cls = cls
        venusian.attach(self.cls, self.on_scan)
        return self.cls

    def on_scan(self, scanner, name, ob):
        """Venusian callback: register the decorated class as an adapter.

        :raises ValueError: if ``self.name`` is already present in the
            service map of this registry.
        """
        reg = scanner.config.registry
        if not reg.settings.get(self.REG_KEY, None):
            scanner.config.add_settings({self.REG_KEY: {}})
        services = reg.settings[self.REG_KEY]

        if self.name in services:
            # The name -> interface map lives in ``reg.settings``; indexing
            # the registry object itself with REG_KEY (as the old message
            # did) fails before the ValueError can be raised.
            raise ValueError("%s already registered as %s" % (
                self.name, services[self.name]))

        classImplements(self.cls, self.interface)
        reg.registerAdapter(self.cls,
                            self.adapts,
                            self.interface)
        services[self.name] = self.interface

    @staticmethod
    @lru_cache(1000)
    def locate_service(request, obj, interface):
        """Return the adapter of ``obj`` to ``interface``.

        Looked up with ``request.registry.queryAdapter``; the call is
        wrapped in ``lru_cache(1000)``, so the arguments act as cache keys.
        """
        return request.registry.queryAdapter(obj,
                                             interface)

    @staticmethod
    @lru_cache(1000)
    def locate_service_by_key(request, obj, key):
        """Return the service registered under ``key`` for ``obj``.

        The interface is taken from the service map in the registry
        settings (an unknown ``key`` raises ``KeyError`` from that map).
        When a service is found its ``__name__`` is set to ``key``.
        """
        reg = request.registry.settings[service.REG_KEY]
        result = service.locate_service(request, obj, reg[key])
        # Compare against None so a service whose truth value is False
        # (e.g. one defining __len__) still gets its __name__ assigned.
        if result is not None:
            result.__name__ = key
        return result
class IResource(Interface):
    """Interface for objects that know their parent and name and can be
    indexed by key."""

    __parent__ = Attribute("reference to parent object")
    __name__ = Attribute("name of this object")

    def __getitem__(key):
        """Look up an item by ``key`` (interface methods declare no self)."""
# these would actually have methods and attributes declared | |
class IUserProvider(IResource):
    """Marker interface for the resource that provides users."""
class IHasUsers(IResource):
    """Marker interface for resources a user provider can adapt."""
class ICommunityProvider(IResource):
    """Marker interface for the resource that provides communities."""
class IHasCommunities(IResource):
    """Marker interface for resources a community provider can adapt."""
class IRoot(IResource):
    """Marker interface for the root resource of the application."""
class BaseResource(object):
    """Base class for resources: stores the request, the parent/name pair,
    and resolves child items through the ``service`` lookup."""

    # NOTE(review): implements() is the class-advice form of the
    # declaration; it appears to be Python 2 only - confirm target version.
    implements(IResource)

    __parent__ = None

    def __init__(self, request, parent=None, name=None):
        """
        standard constructor to make resource locatable and have a url per
        pyramids traversal requirements

        :param request: the current request, kept as ``self.request``.
        :param parent: parent resource; the class-level ``None`` is kept
            when omitted.
        :param name: name of this resource within its parent.
        """
        self.request = request
        # Explicit None test: a parent whose truth value happens to be
        # False must still be recorded.
        if parent is not None:
            self.__parent__ = parent
        self.__name__ = name

    # No lru_cache here: a cache on an instance method keys on ``self`` and
    # keeps every resource (and its request) alive, and the lookup below is
    # already cached by ``service.locate_service_by_key``.
    def __getitem__(self, key):
        """Return the service registered under ``key`` for this resource.

        :raises KeyError: when no service is found for ``key``.
        """
        result = service.locate_service_by_key(self.request, self, key)
        if result is None:
            raise KeyError(key)
        return result

    def find_parent(self, parent_type):
        """finds nearest parent of type or interface

        if __kind__ then thats considered though interface takes
        precedence

        :param parent_type: an interface, a class, or a plain value that
            is compared against each ancestor's ``__kind__`` attribute.
        :returns: the first matching object in the lineage (starting with
            ``self``), or ``None`` when nothing matches.
        """
        try:
            is_interface = issubclass(parent_type, Interface)
            is_class_like = True
        except TypeError:
            # parent_type is not class-like (e.g. a string ``__kind__``
            # marker); only the __kind__ comparison applies to it.
            is_interface = False
            is_class_like = False

        for p in pyramid.location.lineage(self):
            if is_interface and parent_type.providedBy(p):
                return p
            if getattr(p, "__kind__", None) == parent_type:
                return p
            if is_class_like and isinstance(p, parent_type):
                return p
        return None
@service("user", IUserProvider, (IHasUsers, ))
class UserProvider(BaseResource):
    """Service registered under "user"; built from the resource it adapts."""

    def __init__(self, parent):
        """Reuse the request of ``parent`` and record it as our parent."""
        request = parent.request
        super(UserProvider, self).__init__(request, parent=parent)
@service("community", ICommunityProvider, (IHasCommunities, ))
class CommunityProvider(BaseResource):
    """Service registered under "community"; built from the resource it
    adapts."""

    def __init__(self, parent):
        """Reuse the request of ``parent`` and record it as our parent."""
        request = parent.request
        super(CommunityProvider, self).__init__(request, parent=parent)
class APPRoot(BaseResource):
    """Root resource; declared to provide IRoot, IHasUsers and
    IHasCommunities."""

    implements(IRoot, IHasUsers, IHasCommunities)

    def __init__(self, request):
        """Build the root from ``request`` alone (no parent, no name)."""
        base = super(APPRoot, self)
        base.__init__(request)
@view.view_config(context=IRoot)
def view_root_communities(context, request):
    """View for IRoot contexts: respond with the context's class name."""
    class_name = context.__class__.__name__
    body = "I am Root %s" % class_name
    return Response(body)
@view.view_config(context=ICommunityProvider)
def view_has_communities(context, request):
    """View for ICommunityProvider contexts: respond with the context's
    class name."""
    class_name = context.__class__.__name__
    body = "I provide Communities %s" % class_name
    return Response(body)
@view.view_config(context=IUserProvider)
def view_has_users(context, request):
    """View for IUserProvider contexts: respond with the context's class
    name."""
    class_name = context.__class__.__name__
    body = "I Provide Users %s" % class_name
    return Response(body)
# Host value that make_request() forces into every request environ.
HTTP_HOST = "testbed.example.com" # uploads don't get the cookie otherwise
# Client address that make_request() forces into every request environ.
REMOTE_ADDR = "192.168.1.101"
def make_request(**kw):
    """Build a pyramid request on top of a fixed test environ.

    An optional ``environ`` keyword is merged in first; host, remote
    address, URL scheme and script name are then always overwritten with
    the test values. Remaining keywords go to the Request constructor.
    """
    environ = {"HTTP_HOST": HTTP_HOST, "REMOTE_ADDR": REMOTE_ADDR}
    if "environ" in kw:
        environ.update(kw.pop("environ"))

    # Forced values win over anything the caller supplied.
    environ.update({
        "HTTP_HOST": HTTP_HOST,
        "REMOTE_ADDR": REMOTE_ADDR,
        "wsgi.url_scheme": "http",
        # needed for path to work correctly under tests
        "SCRIPT_NAME": "/model/",
    })
    return pyramid.request.Request(environ, **kw)
class PyramidTC(unittest.TestCase):
    """
    Test case base that sets up and tears down pyramid around each test.

    The configurator for the current test is available as ``config``.
    """

    def setUp(self):
        """Create the test configurator and start recording events."""
        config = testing.setUp()
        self.config = config
        self._event_list = config.testing_add_subscriber()

    def tearDown(self):
        """Undo the pyramid test setup."""
        testing.tearDown()

    def get_events(self):
        """
        Return a copy of the list of events raised thus far.
        """
        return list(self._event_list)

    def track_events(self, IFace):
        """
        Return a list that will be updated with events of type IFace.
        """
        tracked = self.config.testing_add_subscriber(IFace)
        return tracked
class TestServiceTraversal(PyramidTC):
    """Checks that scanned services are reachable by key from the root
    resource, both directly and through a WSGI app."""

    def setUp(self):
        """Run the pyramid setup and create a request for the test."""
        super(TestServiceTraversal, self).setUp()
        self.request = make_request()

    def testServiceTraversal(self):
        """Each service key resolves to a named child of the root."""
        # Point the request at the test registry so lookups see the scan.
        self.request.registry = self.config.registry
        self.config.scan()
        root = APPRoot(self.request)
        # assertTrue/assertEqual replace the deprecated assert_/assertEquals
        # aliases used previously.
        for name in ("user", "community"):
            self.assertTrue(root[name])
            self.assertEqual(root[name].__parent__, root)
            self.assertEqual(root[name].__name__, name)
            self.assertEqual(root[name].find_parent(IRoot), root)
            self.assertEqual(root[name].find_parent(APPRoot), root)
        self.assertTrue(
            service.locate_service(self.request, root, IUserProvider))

    def testThroughApp(self):
        """Root and both service paths answer 200 through the WSGI app."""
        self.config.set_root_factory(APPRoot)
        self.config.scan()
        app = self.config.make_wsgi_app()
        for name in ("/", "/user", "/community"):
            res = Request.blank(name).send(app)
            self.assertEqual(res.status_int, 200)
def serve_app():
    """Scan, build the WSGI app rooted at APPRoot, and serve it forever on
    port 6543 of all interfaces using wsgiref."""
    from wsgiref.simple_server import make_server

    config = pyramid.config.Configurator(root_factory=APPRoot)
    config.scan()
    wsgi_app = config.make_wsgi_app()

    server = make_server("0.0.0.0", 6543, wsgi_app)
    server.serve_forever()


if __name__ == "__main__":
    serve_app()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment