Skip to content

Instantly share code, notes, and snippets.

@twillis
Last active December 17, 2015 06:59
Show Gist options
  • Save twillis/5569288 to your computer and use it in GitHub Desktop.
Save twillis/5569288 to your computer and use it in GitHub Desktop.
service registration just works. yay.
"""
Decorators that let Pyramid discover objects during its scan phase and add them
to the registry; the main one is the ``service`` decorator.
"""
import unittest

import pyramid
import venusian
from pyramid import testing
from pyramid import view
from pyramid.response import Response
from repoze.lru import lru_cache
from webob import Request
from zope.interface import (
    Attribute,
    Interface,
    classImplements,
    implementer,
    implements,
)
class service(object):
    """Class decorator that registers the decorated class as a named service.

    Declaration example::

        from interfaces import IUserProvider, INeedUsers

        @service("user", IUserProvider, (INeedUsers,))
        class DataStoreUserProvider(object):
            pass

    When the application starts up and Pyramid is in its scan phase (picking
    up views), the scan also picks up this decorator and registers the
    decorated class as an adapter in the registry.

    An instance of a service can then be looked up by its key, which is handy
    for traversal because it keeps the keys consistent::

        class Community(object):
            implements(INeedUsers)

        user_provider = service.locate_service_by_key(
            request, Community(), "user")

    or by the interface it was registered under::

        user_provider = service.locate_service(
            request, Community(), IUserProvider)
    """

    # Key in ``registry.settings`` under which the {name: interface} mapping
    # of registered services is stored.
    REG_KEY = "services"

    def __init__(self, name, interface, adapts):
        """Remember the registration details until the scan callback runs.

        :param name: key the service is registered (and later looked up) under
        :param interface: interface the decorated class is declared to
            implement and is registered as providing
        :param adapts: sequence of interfaces the service adapts
        """
        self.name = name
        self.interface = interface
        self.adapts = adapts
        self.cls = None

    def __call__(self, cls):
        """Attach the scan callback to ``cls`` and return ``cls`` unchanged."""
        self.cls = cls
        venusian.attach(self.cls, self.on_scan)
        return self.cls

    def on_scan(self, scanner, name, ob):
        """Venusian callback: register the decorated class in the registry.

        :raises ValueError: if a service is already registered under
            ``self.name`` in this registry
        """
        reg = scanner.config.registry
        if not reg.settings.get(self.REG_KEY, None):
            scanner.config.add_settings({self.REG_KEY: {}})
        services = reg.settings[self.REG_KEY]
        if self.name in services:
            # The mapping lives in ``reg.settings``; indexing the registry
            # itself raised KeyError here instead of the intended ValueError.
            raise ValueError("%s already registered as %s"
                             % (self.name, services[self.name]))
        classImplements(self.cls, self.interface)
        reg.registerAdapter(self.cls, self.adapts, self.interface)
        services[self.name] = self.interface

    # NOTE(review): both lookups below are memoized on their positional
    # arguments, so the cache holds references to the request and the adapted
    # object and hands back the same adapter instance for repeated lookups.
    @staticmethod
    @lru_cache(1000)
    def locate_service(request, obj, interface):
        """Return the adapter of ``obj`` to ``interface``, or None if absent."""
        return request.registry.queryAdapter(obj, interface)

    @staticmethod
    @lru_cache(1000)
    def locate_service_by_key(request, obj, key):
        """Return the service registered under ``key`` adapted from ``obj``.

        The returned service gets ``__name__`` set to ``key``. Returns None
        when no adapter is registered for ``obj``.

        :raises KeyError: if no service was registered under ``key``
        """
        interfaces_by_key = request.registry.settings[service.REG_KEY]
        result = service.locate_service(request, obj, interfaces_by_key[key])
        # Explicit None check: a service object that happens to be falsy
        # (e.g. an empty container) must still receive its name.
        if result is not None:
            result.__name__ = key
        return result
class IResource(Interface):
    """Interface for a location-aware, traversable resource."""

    __parent__ = Attribute("reference to parent object")
    __name__ = Attribute("name of this object")

    def __getitem__(key):
        """Look up the child stored under ``key``."""
# In a real application these interfaces would declare methods and attributes.
class IUserProvider(IResource):
    """Interface the ``"user"`` service is registered under."""
class IHasUsers(IResource):
    """Marker for resources that the ``"user"`` service adapts."""
class ICommunityProvider(IResource):
    """Interface the ``"community"`` service is registered under."""
class IHasCommunities(IResource):
    """Marker for resources that the ``"community"`` service adapts."""
class IRoot(IResource):
    """Interface of the traversal root resource."""
@implementer(IResource)
class BaseResource(object):
    """Base class for traversable resources whose children are services.

    Child lookup (``resource[key]``) is delegated to
    ``service.locate_service_by_key``, so any service registered under
    ``key`` that adapts this resource becomes reachable as a child.
    """

    __parent__ = None

    def __init__(self, request, parent=None, name=None):
        """
        Standard constructor to make the resource locatable and give it a URL
        per Pyramid's traversal requirements.

        :param request: the current request; its registry is used for lookups
        :param parent: parent resource, or None for a root
        :param name: name of this resource within its parent
        """
        self.request = request
        # Explicit None check so that a parent which happens to be falsy is
        # still recorded instead of silently falling back to the class default.
        if parent is not None:
            self.__parent__ = parent
        self.__name__ = name

    # No lru_cache on these instance methods: a cache keyed on ``self`` pins
    # every resource for the cache's lifetime.
    def __getitem__(self, key):
        """Return the service registered under ``key`` for this resource.

        :raises KeyError: if no such service can be located
        """
        result = service.locate_service_by_key(self.request, self, key)
        if result is None:
            raise KeyError(key)
        return result

    def find_parent(self, parent_type):
        """Find the nearest object in this resource's lineage matching
        ``parent_type``.

        A lineage member matches when it provides ``parent_type`` (if that is
        an interface), when its ``__kind__`` attribute equals ``parent_type``,
        or when it is an instance of ``parent_type``; the interface check
        takes precedence. Returns None when nothing matches.

        ``parent_type`` must be a class or an interface; ``issubclass``
        raises TypeError for anything else.
        """
        is_interface = issubclass(parent_type, Interface)
        for p in pyramid.location.lineage(self):
            if is_interface and parent_type.providedBy(p):
                return p
            if getattr(p, "__kind__", None) == parent_type:
                return p
            if isinstance(p, parent_type):
                return p
        return None
@service("user", IUserProvider, (IHasUsers, ))
class UserProvider(BaseResource):
    """Service registered under ``"user"``, adapting ``IHasUsers`` resources."""

    def __init__(self, parent):
        # The adapted resource becomes the parent and supplies the request.
        request = parent.request
        super(UserProvider, self).__init__(request, parent=parent)
@service("community", ICommunityProvider, (IHasCommunities, ))
class CommunityProvider(BaseResource):
    """Service registered under ``"community"``, adapting ``IHasCommunities``
    resources."""

    def __init__(self, parent):
        # The adapted resource becomes the parent and supplies the request.
        request = parent.request
        super(CommunityProvider, self).__init__(request, parent=parent)
@implementer(IRoot, IHasUsers, IHasCommunities)
class APPRoot(BaseResource):
    """Root resource of the application.

    Declares ``IHasUsers`` and ``IHasCommunities`` so that the ``"user"`` and
    ``"community"`` services adapt it and are reachable as its children.
    Takes only the request, so it can be used as a Pyramid root factory.
    """

    def __init__(self, request):
        super(APPRoot, self).__init__(request)
@view.view_config(context=IRoot)
def view_root_communities(context, request):
    """View for ``IRoot`` contexts: report the context's class name."""
    class_name = context.__class__.__name__
    body = "I am Root %s" % class_name
    return Response(body)
@view.view_config(context=ICommunityProvider)
def view_has_communities(context, request):
    """View for ``ICommunityProvider`` contexts: report the context's class
    name."""
    class_name = context.__class__.__name__
    body = "I provide Communities %s" % class_name
    return Response(body)
@view.view_config(context=IUserProvider)
def view_has_users(context, request):
    """View for ``IUserProvider`` contexts: report the context's class name."""
    class_name = context.__class__.__name__
    body = "I Provide Users %s" % class_name
    return Response(body)
# Fixed host and client address that make_request() forces into the WSGI
# environ of every test request it builds.
HTTP_HOST = "testbed.example.com" # uploads don't get the cookie otherwise
REMOTE_ADDR = "192.168.1.101"
def make_request(**kw):
    """Build a ``pyramid.request.Request`` for use in tests.

    An optional ``environ`` keyword supplies extra WSGI environ entries; the
    host, remote address, URL scheme and script name are always forced to the
    test values afterwards. All other keywords are passed to the request
    constructor.
    """
    environ = {"HTTP_HOST": HTTP_HOST, "REMOTE_ADDR": REMOTE_ADDR}
    if "environ" in kw:
        environ.update(kw.pop("environ"))
    # Forced values win over anything the caller supplied.
    environ.update({
        "HTTP_HOST": HTTP_HOST,
        "REMOTE_ADDR": REMOTE_ADDR,
        "wsgi.url_scheme": "http",
        # needed for path to work correctly under tests
        "SCRIPT_NAME": "/model/",
    })
    return pyramid.request.Request(environ, **kw)
class PyramidTC(unittest.TestCase):
    """
    Test case base class that sets up and tears down Pyramid.

    The configurator instance for the current test is available as
    ``self.config``.
    """

    def setUp(self):
        config = testing.setUp()
        self.config = config
        self._event_list = config.testing_add_subscriber()

    def tearDown(self):
        testing.tearDown()

    def get_events(self):
        """
        Return a copy of the list of events raised thus far.
        """
        return list(self._event_list)

    def track_events(self, IFace):
        """
        Return a list that will be updated with events of type IFace.
        """
        tracked = self.config.testing_add_subscriber(IFace)
        return tracked
class TestServiceTraversal(PyramidTC):
    """Check that scanned services are reachable by traversal from the root."""

    def setUp(self):
        super(TestServiceTraversal, self).setUp()
        self.request = make_request()

    def testServiceTraversal(self):
        """Services are reachable as children of the root and are
        location-aware."""
        self.request.registry = self.config.registry
        self.config.scan()
        root = APPRoot(self.request)
        for name in ("user", "community"):
            provider = root[name]
            # assertTrue/assertEqual replace the deprecated assert_ and
            # assertEquals aliases.
            self.assertTrue(provider)
            self.assertEqual(provider.__parent__, root)
            self.assertEqual(provider.__name__, name)
            self.assertEqual(provider.find_parent(IRoot), root)
            self.assertEqual(provider.find_parent(APPRoot), root)
        self.assertTrue(
            service.locate_service(self.request, root, IUserProvider))

    def testThroughApp(self):
        """The root and both service paths answer 200 through the WSGI app."""
        self.config.set_root_factory(APPRoot)
        self.config.scan()
        app = self.config.make_wsgi_app()
        for path in ("/", "/user", "/community"):
            res = Request.blank(path).send(app)
            self.assertEqual(res.status_int, 200)
def serve_app(host="0.0.0.0", port=6543):
    """Build the WSGI application and serve it until interrupted.

    :param host: address to bind; the default listens on all interfaces
    :param port: TCP port to listen on
    """
    from wsgiref.simple_server import make_server

    cfg = pyramid.config.Configurator(root_factory=APPRoot)
    cfg.scan()
    app = cfg.make_wsgi_app()
    make_server(host, port, app).serve_forever()
# Serve the demo application when this file is run as a script.
if __name__ == "__main__":
    serve_app()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment