Created
August 31, 2010 16:00
-
-
Save gcarothers/559256 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
"""Provides common classes and functions for modelling an RDF graph using | |
Python objects.""" | |
__all__ = ['parse_curie', 'to_curie','MetaSubject', 'Subject', 'PredicateProperty', | |
'ScalarPredicateProperty', 'ResourcePredicateProperty', | |
'ScalarResourcePredicateProperty', 'ResourceReversePredicateProperty', | |
'SelfReferentialPredicateProperty', 'ScalarSelfReferentialPredicateProperty', | |
'RDFClassMismatchError',] | |
import os.path | |
import urlparse | |
import re | |
import logging | |
from cStringIO import StringIO | |
from string import Template | |
import rdflib | |
from rdflib.URIRef import URIRef as Original_URIRef | |
import httplib2 | |
import rdfsandvich.uri_schemes as uri_schemes | |
import rdfsandvich.util as util | |
log = logging.getLogger(__name__) | |
class SaneURIRef(Original_URIRef): | |
__doc__ = Original_URIRef.__doc__ + '\n Monkey-patched by rdfsandvich.RDF import to have a sane __eq__ method.' | |
def __eq__(self, other): | |
if isinstance(other, Original_URIRef) or isinstance(other, str) or isinstance(other, unicode): | |
return unicode(self) == unicode(other) | |
return NotImplemented | |
rdflib.URIRef = SaneURIRef # Monkey patch! | |
def parse_curie(curie, namespaces): | |
""" | |
Parses a CURIE within the context of the given namespaces. Will also accept | |
explicit URIs and wrap them in an rdflib URIRef. | |
Specifically: | |
1) If the CURIE is not of the form [stuff] and the prefix is in the list of | |
standard URIs, it is wrapped in a URIRef and returned unchanged. | |
2) Otherwise, the CURIE is parsed by the rules of CURIE Syntax 1.0: | |
http://www.w3.org/TR/2007/WD-curie-20070307/ The default namespace is the | |
namespace keyed by the empty string in the namespaces dictionary. | |
3) If the CURIE's namespace cannot be resolved, a ValueError is raised. | |
""" | |
definitely_curie = False | |
if curie[0] == '[' and curie[-1] == ']': | |
curie = curie[1:-1] | |
definitely_curie = True | |
prefix, sep, reference = curie.partition(':') | |
if not definitely_curie: | |
if prefix in uri_schemes.schemes: | |
return rdflib.URIRef(curie) | |
if not reference and '' in namespaces: | |
reference = prefix | |
return namespaces[''][reference] | |
if prefix in namespaces: | |
return namespaces[prefix][reference] | |
else: | |
raise ValueError('Could not parse CURIE prefix %s from namespaces %s' % (prefix, namespaces)) | |
def to_curie(uri, namespaces, seperator=":", explicit=False): | |
"""Converts a URI to a CURIE using the prefixes defined in namespaces. If | |
there is no matching prefix, return the URI unchanged. | |
namespaces - a dictionary of prefix -> namespace mappings. | |
separator - the character to use as the separator between the prefix and | |
the local name. | |
explicit - if True and the URI can be abbreviated, wrap the abbreviated form | |
in []s to indicate that it is definitely a CURIE.""" | |
for prefix, namespace in namespaces.items(): | |
if uri.startswith(namespace): | |
if explicit: | |
return '[' + uri.replace(namespace, prefix + seperator) + ']' | |
else: | |
return uri.replace(namespace, prefix + seperator) | |
return uri | |
class MetaResource(type): | |
"""Aggregates namespace information. | |
MetaResource makes namespaces a special attribute. namespaces is a | |
dictionary that is the 'union' of all namespaces attributes on a | |
class and its parents.""" | |
def __new__(cls, name, bases, dct): | |
namespaces = {} | |
for base in bases: | |
if hasattr(base, 'namespaces'): | |
namespaces.update(base.namespaces) | |
if 'namespaces' in dct: | |
for namespace in dct['namespaces']: | |
namespaces[namespace] = rdflib.Namespace( | |
dct['namespaces'][namespace]) | |
dct['namespaces'] = namespaces | |
return type.__new__(cls, name, bases, dct) | |
class ClassificationMismatchError(Exception): | |
"""Raised when an attempt is made to create a Subject for a resource that | |
does not match its classification constraints.""" | |
pass | |
class URLRetrievalError(Exception): | |
"""Raised when an attempt to retrieve a resource returns a status other | |
than 200 OK.""" | |
pass | |
class PredicateProperty(property): | |
""" | |
Provides property-style access to the objects related to a Subject | |
by a given predicate. If a language is specififed only values in that | |
language will be returned. | |
""" | |
def __init__(self, predicate, docstring=None, language=None): | |
self.predicate = predicate | |
self.language = language | |
self.__doc__ = ("""Returns list of values for predicate :term:`%s` | |
""" % (predicate,)) + (docstring or "") | |
def __get__(self, instance, owner): | |
if instance is None: | |
return self | |
if self.language: | |
value = [value for value in instance[self.predicate] if value.language == self.language] | |
else: | |
value = list(instance[self.predicate]) | |
return value | |
class ScalarPredicateProperty(PredicateProperty): | |
""" | |
Provides property-style access to a predicate that can only have one value. | |
If a language is specififed only values in that language will be returned. | |
None is returned if there are no values, and ValueError raised if | |
there are multiples. | |
""" | |
def __init__(self, predicate, docstring=None, language=None): | |
self.predicate = predicate | |
self.language = language | |
self.__doc__ = ("""Returns a value for predicate :term:`%s` | |
""" % (predicate,)) + (docstring or "") | |
def __get__(self, instance, owner): | |
if instance is None: | |
return self | |
value = util.one_or_none( | |
super(ScalarPredicateProperty, self).__get__(instance, owner)) | |
return value | |
class ResourcePredicateProperty(property): | |
""" | |
Provides property-style access to a predicate whose object is another | |
resource. These yeild further Resources. | |
cls is the class to be used to represent the resulting Resources. The | |
Resources are provided as a generator. Any Resources that do not meet the | |
classification constraints of cls (and, thus, raise a | |
ClassificationMismatchError on instantiation) are silently omitted from the | |
resulting generator. | |
""" | |
def __init__(self, predicate, cls, omit=True, docstring=None): | |
self.omit = omit | |
self.predicate = predicate | |
self.cls = cls | |
self.__doc__ = ("""Returns a list of values for predicate :term:`%s` as :class:`~%s.%s`""" % (predicate,cls.__module__,cls.__name__)) + (docstring or "") | |
def __get__(self, instance, owner): | |
if instance is None: | |
return self | |
else: | |
return list(self._resources(instance, owner)) | |
def _resources(self, instance, owner): | |
resource_generator = instance[self.predicate] | |
for resource in resource_generator: | |
seq = instance.graph.seq(resource) | |
if seq: | |
for seq_res in seq: | |
try: | |
yield self.cls(instance.graph, seq_res) | |
except ClassificationMismatchError: | |
if self.omit: | |
continue | |
else: | |
yield seq_res | |
else: | |
try: | |
yield self.cls(instance.graph, resource) | |
except ClassificationMismatchError: | |
if self.omit: | |
continue | |
else: | |
yield seq_res | |
class ScalarResourcePredicateProperty(ResourcePredicateProperty): | |
""" | |
Provides property-style access to a Resource predicate that can | |
yield only one resource. | |
""" | |
def __init__(self, predicate, cls, docstring=None): | |
super(ScalarResourcePredicateProperty, self).__init__(predicate, cls) | |
self.__doc__ = ("""Returns a value for predicate :term:`%s` as :class:`~%s.%s`""" % (predicate,cls.__module__,cls.__name__)) + (docstring or "") | |
def __get__(self, instance, owner): | |
if instance is None: | |
return self | |
value = util.one_or_none(list( | |
super(ScalarResourcePredicateProperty, self).__get__(instance, owner))) | |
return value | |
class ResourceReversePredicateProperty(property): | |
""" | |
Provides property-style access to a Resource predicate that yields the | |
Resource by following the predicate backwards, using the current subject | |
as the object of the graph query. | |
""" | |
def __init__(self, predicate, cls, docstring=None): | |
self.predicate = predicate | |
self.cls = cls | |
self.__doc__ = ("""Returns list of subjects with predicate :term:`%s` as a :class:`~%s.%s` where the predicate is the current object""" % (predicate,cls.__module__,cls.__name__)) + (docstring or "") | |
def __get__(self, instance, owner): | |
if instance is None: | |
return self | |
else: | |
return list(self._resources(instance, owner)) | |
def _resources(self, instance, owner): | |
resource_generator = instance.graph.subjects( | |
instance.resolve(self.predicate), instance.subject) | |
for resource in resource_generator: | |
try: | |
yield self.cls(instance.graph, resource) | |
except ClassificationMismatchError: | |
continue | |
class SelfReferentialPredicateProperty(property): | |
""" | |
Provides property-style access to a Resource predicate that yields the same | |
type of things as the predicate is on. | |
""" | |
def __init__(self, predicate, docstring=None): | |
self.predicate = predicate | |
self.__doc__ = docstring | |
def __get__(self, instance, owner): | |
if instance is None: | |
return self | |
else: | |
return self._resources(instance, owner) | |
def _resources(self, instance, owner): | |
resource_generator = instance[self.predicate] | |
for resource in resource_generator: | |
seq = instance.graph.seq(resource) | |
if seq: | |
for seq_res in seq: | |
try: | |
yield type(instance)(instance.graph, seq_res) | |
except ClassificationMismatchError: | |
continue | |
else: | |
try: | |
yield type(instance)(instance.graph, resource) | |
except ClassificationMismatchError: | |
continue | |
class ScalarSelfReferentialPredicateProperty(SelfReferentialPredicateProperty): | |
""" | |
Provides property-style access to a Resource predicate that yields the same | |
type of things as the predicate is on. | |
""" | |
def __get__(self, instance, owner): | |
if instance is None: | |
return self | |
value = util.one_or_none(list( | |
super(ScalarSelfReferentialPredicateProperty, self).__get__(instance, owner))) | |
return value | |
class InverseFunctionalLookupProperty(property): | |
"""Look up and return instances of another resource based on the values of a | |
predicate on this resource using an Inverse Functional Property. | |
lookup_method is expected to take (graph, value) and return an iteratable | |
of results.""" | |
def __init__(self, predicate, lookup_method): | |
self.predicate = predicate | |
self.lookup_method = lookup_method | |
def __get__(self, instance, owner): | |
for value in instance[self.predicate]: | |
for result in self.lookup_method(instance.graph, value): | |
yield result | |
def en(value): | |
"""Returns an RDF literal from the en-US language for the given value.""" | |
return rdflib.Literal(value, lang='en') | |
class Resource(object): | |
""" | |
Represents a resource in the RDF graph. | |
To inherit from this class, directly or indirectly, do the following: | |
1) Define any namespaces needed to resolve any predicates or other CURIEs. | |
These should go in an attribute called 'namespaces', which should contain a | |
dictionary mapping prefixes to URLs. The namespaces dictionaries on all | |
parents are merged together, with those at the bottom prioritized. The | |
values in the dictionaries will automatically be turned into | |
rdflib.Namespace objects. | |
2) Resource supports automatic "classification", which can be used to ensure | |
that Resources match certain criteria. Resource itself supports | |
classification by predicate - set the classification_predicate attribute to | |
the predicate to check (it defaults to rdf:type) and the | |
classification_value attribute to the expected value. If | |
classification_value is set, attempting to instantiate that Resource class | |
for any subject without that value for the classification_predicate will | |
result in an exception. Both classification_value and classification_predicate | |
are parsed as CURIEs. Setting these also allows for all instances of the | |
Resource in a graph to be enumerated through the in_graph method. | |
3) Resource supports automatic retrieval of RDF descriptions for resources | |
described by HTTP URLs. To enable this, set the retrieve_http attribute on | |
the graph object to True. When instantiation of a Resource object for one | |
of these resources fails due to classification constraints, rdfsandvich will | |
attempt to resolve the failure by retrieving the contents of the HTTP URL | |
and parsing them as RDF. To limit the URLs that will be queried, set the | |
"retrieve_http_whitelist" or "retrieve_http_blacklist" attributes to a list | |
of regular expressions. The whitelist receives priority. | |
If you need some other method of classification, override the | |
check_classification and in_graph methods. | |
""" | |
__metaclass__ = MetaResource | |
namespaces = {'rdf': 'http://www.w3.org/1999/02/22-rdf-syntax-ns#', | |
'rdfs': 'http://www.w3.org/2000/01/rdf-schema#'} | |
classification_predicate = 'rdf:type' | |
label = ScalarPredicateProperty('rdfs:label') | |
rdf_type = PredicateProperty('rdf:type') | |
def __init__(self, graph, subject): | |
self.graph = graph | |
self.subject = subject | |
if not self.check_classification(): | |
parsed_subject = urlparse.urlparse(self.subject) | |
if hasattr(graph, "retrieve_http") and graph.retrieve_http and \ | |
(not hasattr(graph, "retrieved_uris") or \ | |
self.subject not in graph.retrieved_uris) and \ | |
parsed_subject.scheme == 'http': | |
if not hasattr(graph, "retrieved_uris"): | |
graph.retrieved_uris = set() | |
if self.valid_retrieve_url(graph, self.subject): | |
if hasattr(graph, 'http_cache'): | |
cache = graph.http_cache | |
else: | |
cache = None | |
http = httplib2.Http(cache=cache) | |
resp, metadata = http.request(uri=str(self.subject), | |
method='GET') | |
if resp['status'] == '200': | |
publicID=urlparse.urlunparse((parsed_subject.scheme, | |
parsed_subject.netloc, | |
parsed_subject.path, | |
'', '', '')) | |
graph.parse(StringIO(metadata), publicID=publicID) | |
graph.retrieved_uris.add(self.subject) | |
if self.check_classification(): | |
return | |
else: | |
log.warn('Could not retrieve %s: %s', self.subject, | |
resp['status']) | |
raise ClassificationMismatchError() | |
def valid_retrieve_url(self, graph, url): | |
if hasattr(graph, 'retrieve_http_whitelist'): | |
for entry in graph.retrieve_http_whitelist: | |
regex = re.compile(entry) | |
if regex.match(url): | |
log.debug('%s passed whitelist under %s', url, entry) | |
return True | |
return False | |
if hasattr(graph, 'retrieve_http_blacklist'): | |
for entry in graph.retrieve_http_blacklist: | |
regex = re.compile(entry) | |
if regex.match(url): | |
log.debug('%s failed blacklist under %s', url, entry) | |
return False | |
return True | |
def check_classification(self): | |
if hasattr(self, 'classification_value'): | |
classes = set(self[self.classification_predicate]) | |
if isinstance(self.classification_value, str) or\ | |
isinstance(self.classification_value, unicode): | |
classification_values = [self.classification_value] | |
else: | |
classification_values = self.classification_value | |
classification_values = set(self.resolve(classification_value) for\ | |
classification_value in\ | |
classification_values) | |
if classification_values.intersection(classes): | |
return True | |
else: | |
return True | |
@classmethod | |
def for_uri(cls, graph, identifier): | |
"""Basically the same as __init__, but handles wrapping identifier in | |
a rdflib.URIRef if necessary and handles type errors more gracefully.""" | |
try: | |
possible = cls(graph, identifier | |
if isinstance(identifier, rdflib.URIRef) | |
else rdflib.URIRef(identifier)) | |
return possible | |
except ClassificationMismatchError: | |
return None | |
@classmethod | |
def in_graph(cls, graph): | |
"""Iterates over all the instances of this Resource found in the | |
specified graph. This requires that the Resource have a RDF_type | |
specified.""" | |
if not cls.classification_value: | |
raise Exception() | |
for subj in graph.subjects(cls.resolve(cls.classification_predicate), | |
cls.resolve(cls.classification_value)): | |
try: | |
yield cls(graph, subj) | |
except ClassificationMismatchError: | |
continue | |
@classmethod | |
def resolve(cls, key): | |
"""Use the classes namespaces to resolve a curie""" | |
return parse_curie(key, cls.namespaces) | |
def __eq__(self, other): | |
if isinstance(other, Resource): | |
return self.subject == other.subject | |
elif isinstance(other, rdflib.URIRef) or isinstance(other, str) or\ | |
isinstance(other, unicode): | |
return self.subject == other | |
return NotImplemented | |
def __ne__(self, other): | |
return not self.__eq__(other) | |
def __hash__(self): | |
return hash(self.subject) | |
def __getitem__(self, key): | |
"""Fetch predicates off this subject by key dictionary-style.""" | |
return self.objects(self.resolve(key)) | |
def __repr__(self): | |
return "<%r: %r %r>" % (type(self), self.graph, self.subject) | |
def __str__(self): | |
if self.label: | |
return self.label | |
else: | |
return self.subject | |
def objects(self, predicate): | |
"""Fetch the objects for a predicate on this subject.""" | |
return self.graph.objects(self.subject, predicate) | |
def predicate_objects(self): | |
"""Fetch all predicate, object pairs for this subject.""" | |
return self.graph.predicate_objects(self.subject) | |
class Property(Resource): | |
"""A rdf:Property.""" | |
classification_value = '[rdf:Property]' | |
class OWLObjectProperty(Resource): | |
"""An owl:objectProperty.""" | |
namespaces = {'owl': 'http://www.w3.org/2002/07/owl#',} | |
classification_value = '[owl:ObjectProperty]' |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import unittest | |
import rdflib | |
import rdfsandvich.RDF | |
class TestRDF(unittest.TestCase): | |
def testCurieURI(self): | |
"""Test CURIE parsing of explicit URIs.""" | |
test_ns = {'http': rdflib.Namespace('WRONG!'), | |
'urn': rdflib.Namespace('WRONG!'),} | |
self.assertEqual(rdfsandvich.RDF.parse_curie('http://oreilly.com', test_ns), | |
rdflib.URIRef('http://oreilly.com')) | |
self.assertEqual(rdfsandvich.RDF.parse_curie('urn:isbn:1234567890123', test_ns), | |
rdflib.URIRef('urn:isbn:1234567890123')) | |
def testCurieDefaultNamespace(self): | |
"""Test CURIE parsing of CURIEs in the default namespace.""" | |
test_ns = {'': rdflib.Namespace('foo/'), | |
'wrong': rdflib.Namespace('WRONG!')} | |
self.assertEqual(rdfsandvich.RDF.parse_curie('bar', test_ns), | |
rdflib.URIRef('foo/bar')) | |
self.assertEqual(rdfsandvich.RDF.parse_curie('[bar]', test_ns), | |
rdflib.URIRef('foo/bar')) | |
self.assertEqual(rdfsandvich.RDF.parse_curie('baz', test_ns), | |
rdflib.URIRef('foo/baz')) | |
self.assertEqual(rdfsandvich.RDF.parse_curie('[aap]', test_ns), | |
rdflib.URIRef('foo/aap')) | |
def testCurieNamespaces(self): | |
"""Test CURIE parsing of CURIEs in non-default namespaces.""" | |
test_ns = {'': rdflib.Namespace('WRONG!'), | |
'foo': rdflib.Namespace('foobly/'), | |
'bar': rdflib.Namespace('bardle/'), | |
'http': rdflib.Namespace('reallybadidea/'),} | |
self.assertEqual(rdfsandvich.RDF.parse_curie('foo:aap', test_ns), | |
rdflib.URIRef('foobly/aap')) | |
self.assertEqual(rdfsandvich.RDF.parse_curie('[bar:aap]', test_ns), | |
rdflib.URIRef('bardle/aap')) | |
self.assertEqual(rdfsandvich.RDF.parse_curie('[foo:baz]', test_ns), | |
rdflib.URIRef('foobly/baz')) | |
self.assertEqual(rdfsandvich.RDF.parse_curie('bar:baz', test_ns), | |
rdflib.URIRef('bardle/baz')) | |
self.assertEqual(rdfsandvich.RDF.parse_curie('[http://oreilly.com]', test_ns), | |
rdflib.URIRef('reallybadidea///oreilly.com')) | |
def testCurieNoSuffix(self): | |
"""Test CURIE parsing of CURIEs with no suffix.""" | |
pass | |
def testUnparseableCuries(self): | |
"""Test some CURIEs that shouldn't parse.""" | |
test_ns = {'foo': rdflib.Namespace('WRONG!'),} | |
self.assertRaises(ValueError, rdfsandvich.RDF.parse_curie, '[bar]', test_ns) | |
self.assertRaises(ValueError, rdfsandvich.RDF.parse_curie, 'bar', test_ns) | |
self.assertRaises(ValueError, rdfsandvich.RDF.parse_curie, 'bar:baz', test_ns) | |
self.assertRaises(ValueError, rdfsandvich.RDF.parse_curie, '[bar:baz]', test_ns) | |
def testMetaResourceNothingUseful(self): | |
"""Test applying a MetaResource to a class without anything it uses.""" | |
class Foo(object): | |
__metaclass__ = rdfsandvich.RDF.MetaResource | |
def testMetaResourceNamespaces(self): | |
"""Test the handling of namespaces by MetaResource.""" | |
class Foo(object): | |
__metaclass__ = rdfsandvich.RDF.MetaResource | |
namespaces = {'foo': 'bar', 'baz': 'garply', 'meme': 'lolcatz!',} | |
self.assertEqual(Foo.namespaces, {'foo': rdflib.Namespace('bar'), | |
'baz': rdflib.Namespace('garply'), | |
'meme': rdflib.Namespace('lolcatz!'),}) | |
def testMetaResourceNamespaceInheritance(self): | |
"""Test the composition of namespace dictionaries by MetaResource.""" | |
class Foo(object): | |
__metaclass__ = rdfsandvich.RDF.MetaResource | |
namespaces = {'foo': 'bar', 'baz': 'garply', 'meme': 'lolcatz!',} | |
class Bar(Foo): | |
namespaces = {'allyourbase': 'arebelongtous!', 'bunny': 'pancake',} | |
self.assertEqual(Foo.namespaces, {'foo': rdflib.Namespace('bar'), | |
'baz': rdflib.Namespace('garply'), | |
'meme': rdflib.Namespace('lolcatz!'),}) | |
self.assertEqual(Bar.namespaces, {'foo': rdflib.Namespace('bar'), | |
'baz': rdflib.Namespace('garply'), | |
'meme': rdflib.Namespace('lolcatz!'), | |
'allyourbase': rdflib.Namespace('arebelongtous!'), | |
'bunny': rdflib.Namespace('pancake'),}) | |
def testMetaResourceNamespaceInheritanceReplacement(self): | |
"""Test the composition of namespace dictionaries by MetaResource where | |
some namespaces on the parent get replaced.""" | |
class Foo(object): | |
__metaclass__ = rdfsandvich.RDF.MetaResource | |
namespaces = {'foo': 'bar', 'baz': 'garply', 'meme': 'lolcatz!',} | |
class Bar(Foo): | |
namespaces = {'allyourbase': 'arebelongtous!', 'bunny': 'pancake', | |
'foo': 'notbar', 'baz': 'notgarply',} | |
self.assertEqual(Foo.namespaces, {'foo': rdflib.Namespace('bar'), | |
'baz': rdflib.Namespace('garply'), | |
'meme': rdflib.Namespace('lolcatz!'),}) | |
self.assertEqual(Bar.namespaces, {'foo': rdflib.Namespace('notbar'), | |
'baz': rdflib.Namespace('notgarply'), | |
'meme': rdflib.Namespace('lolcatz!'), | |
'allyourbase': rdflib.Namespace('arebelongtous!'), | |
'bunny': rdflib.Namespace('pancake'),}) | |
def testLanguageScalarProperties(self): | |
"""Language filtering in ScalarProperties works""" | |
class Foo(rdfsandvich.RDF.Resource): | |
namespaces = {'test': 'http://example.com/test',} | |
english_name = rdfsandvich.RDF.ScalarPredicateProperty('test:name',language="en") | |
testns = rdflib.Namespace('http://example.com/test') | |
graph = rdflib.ConjunctiveGraph() | |
graph.add((rdflib.URIRef('foo'), testns['name'], rdflib.Literal("english",lang="en"))) | |
graph.add((rdflib.URIRef('foo'), testns['name'], rdflib.Literal("german",lang="de"))) | |
foo = Foo.for_uri(graph,'foo') | |
self.assertEquals(foo.english_name, rdfsandvich.RDF.en("english")) | |
def testClassification(self): | |
"""Test the classification feature.""" | |
graph = rdflib.ConjunctiveGraph() | |
rdfns = rdflib.Namespace('http://www.w3.org/1999/02/22-rdf-syntax-ns#') | |
testns = rdflib.Namespace('test!') | |
graph.set((rdflib.URIRef('foo'), rdfns['type'], testns['lolcats'])) | |
graph.set((rdflib.URIRef('foo'), testns['test'], "Brains.")) | |
graph.set((rdflib.URIRef('bar'), rdfns['type'], testns['lolcats'])) | |
graph.set((rdflib.URIRef('bar'), testns['test'], "Brains!")) | |
graph.set((rdflib.URIRef('baz'), rdfns['type'], testns['orlyowl'])) | |
graph.set((rdflib.URIRef('baz'), testns['test'], "Brains!!!")) | |
class Test(rdfsandvich.RDF.Resource): | |
namespaces = {'test': 'test!',} | |
classification_value = 'test:lolcats' | |
test = rdfsandvich.RDF.ScalarPredicateProperty('test:test') | |
t_foo = Test.for_uri(graph, 'foo') | |
self.assertEqual(t_foo.test, 'Brains.') | |
t_bar = Test.for_uri(graph, 'bar') | |
self.assertEqual(t_bar.test, 'Brains!') | |
self.assert_(Test.for_uri(graph, 'baz') is None) | |
self.assertRaises(rdfsandvich.RDF.ClassificationMismatchError, | |
Test, graph, rdflib.URIRef('baz')) | |
l = list(Test.in_graph(graph)) | |
self.assertEqual(len(l), 2) | |
self.assert_(t_foo in l) | |
self.assert_(t_bar in l) | |
def testClassificationPredicate(self): | |
"""Test the classification feature with different predicates.""" | |
graph = rdflib.ConjunctiveGraph() | |
testns = rdflib.Namespace('test!') | |
graph.set((rdflib.URIRef('foo'), testns['type'], testns['lolcats'])) | |
graph.set((rdflib.URIRef('foo'), testns['test'], "Brains.")) | |
graph.set((rdflib.URIRef('bar'), testns['type'], testns['lolcats'])) | |
graph.set((rdflib.URIRef('bar'), testns['test'], "Brains!")) | |
graph.set((rdflib.URIRef('baz'), testns['type'], testns['orlyowl'])) | |
graph.set((rdflib.URIRef('baz'), testns['test'], "Brains!!!")) | |
class Test(rdfsandvich.RDF.Resource): | |
namespaces = {'test': 'test!',} | |
classification_predicate = 'test:type' | |
classification_value = 'test:lolcats' | |
test = rdfsandvich.RDF.ScalarPredicateProperty('test:test') | |
t_foo = Test.for_uri(graph, 'foo') | |
self.assertEqual(t_foo.test, 'Brains.') | |
t_bar = Test.for_uri(graph, 'bar') | |
self.assertEqual(t_bar.test, 'Brains!') | |
self.assert_(Test.for_uri(graph, 'baz') is None) | |
self.assertRaises(rdfsandvich.RDF.ClassificationMismatchError, | |
Test, graph, rdflib.URIRef('baz')) | |
l = list(Test.in_graph(graph)) | |
self.assertEqual(len(l), 2) | |
self.assert_(t_foo in l) | |
self.assert_(t_bar in l) | |
def testResourceEquality(self): | |
graph = rdflib.ConjunctiveGraph() | |
otherGraph = rdflib.ConjunctiveGraph() | |
testResource = rdfsandvich.RDF.Resource.for_uri(graph, 'foo') | |
self.assertEqual(testResource, rdfsandvich.RDF.Resource.for_uri( | |
graph, 'foo')) | |
self.assertEqual(testResource, rdflib.URIRef('foo')) | |
self.assertEqual(testResource, 'foo') | |
self.assertNotEqual(testResource, rdfsandvich.RDF.Resource.for_uri( | |
graph, 'bar')) | |
self.assertEqual(testResource, rdfsandvich.RDF.Resource.for_uri( | |
otherGraph, 'foo')) | |
self.assertNotEqual(testResource, rdflib.URIRef('bar')) | |
self.assertNotEqual(testResource, 'bar') | |
self.assertNotEqual(testResource, 42) | |
def testAutomaticResourceRetreival(self): | |
class FakeProductType(rdfsandvich.RDF.Resource): | |
namespaces = {'rdfs': 'http://www.w3.org/2000/01/rdf-schema#', | |
'dcam': 'http://purl.org/dc/dcam/',} | |
classification_predicate = 'dcam:memberOf' | |
classification_value = 'http://purl.oreilly.com/product-types/' | |
label = rdfsandvich.RDF.ScalarPredicateProperty('rdfs:label') | |
graph = rdflib.ConjunctiveGraph() | |
graph.retrieve_http = True | |
testResource = FakeProductType.for_uri(graph, 'http://purl.oreilly.com/product-types/BOOK') | |
self.assertEqual(testResource.label, | |
rdflib.Literal('Print', lang='en-US')) | |
self.assert_(rdflib.URIRef('http://purl.oreilly.com/product-types/BOOK') in graph.retrieved_uris) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment