Skip to content

Instantly share code, notes, and snippets.

@gkellogg
Created September 12, 2014 09:51
Show Gist options
  • Save gkellogg/00ed80d845e54519f59b to your computer and use it in GitHub Desktop.
Snippets for JSON-LD MongoDB adaptor
require 'bson'
module Wikia::Screen
# Extensions for mapping to/from BSON.
module MongoExtensions
  # Prepare a JSON-LD attribute hash for storage as BSON.
  #
  # Parses a string `schema:dateCreated` into a Time, stamps
  # `schema:dateModified` with the current time, defaults
  # `schema:dateCreated` to that stamp, and adds the `_references`
  # field used for simple reference queries.
  #
  # FIX: previously `schema:dateModified` was only assigned when
  # `schema:dateCreated` was missing (`a ||= b = now` short-circuits),
  # so the modification time never updated on subsequent saves.
  #
  # @param [Hash] obj
  # @return [Hash] a copy of obj, augmented for storage
  def to_bson(obj)
    obj = obj.dup
    obj["schema:dateCreated"] = Time.parse(obj["schema:dateCreated"]) if obj.has_key?("schema:dateCreated")
    # Always refresh the modification stamp; creation defaults to it.
    obj["schema:dateModified"] = Time.now
    obj["schema:dateCreated"] ||= obj["schema:dateModified"]
    # Special _references field allows simple querying by object references
    obj['_references'] = references(obj)
    obj
  end

  # Turn BSON into a normal Hash representation: render stored Time
  # values as xmlschema strings and strip storage-only fields.
  #
  # @param [Hash] obj
  # @return [Hash]
  def from_bson(obj)
    obj = obj.dup
    %w(schema:dateCreated schema:dateModified).each do |p|
      obj[p] = obj[p].xmlschema if obj.has_key?(p) and obj[p].respond_to?(:xmlschema)
    end
    obj.delete_if {|k, v| %w(_id _references).include?(k.to_s)}
  end
  module_function :from_bson

  # Extract subject references from an object: ids of all values that
  # are pure node references (Hashes whose only key is "id").
  #
  # @param [Hash] obj
  # @return [Array<String>] unique referenced ids
  def references(obj)
    obj.values.flatten.map {|o| o['id'] if o.is_a?(Hash) && o.keys == %w(id)}.compact.uniq
  end

  # Run the block, retrying (up to max_retries, at 0.5s intervals) on
  # Mongo::ConnectionFailure; re-raises once retries are exhausted.
  #
  # @param [Integer] max_retries
  # @return [Object] the block's value
  def rescue_connection_failure(max_retries=60)
    retries = 0
    begin
      yield
    rescue Mongo::ConnectionFailure => ex
      retries += 1
      raise ex if retries > max_retries
      sleep(0.5)
      retry
    end
  end
  module_function :rescue_connection_failure
end
end
require 'rdf'
require 'rdf/rdfa'
require 'json/ld'
require 'mongo'
module Wikia::Screen
# Class to map a specific resource, identified as a _subject_
# in the parsed graph or some JSON added via PUT/POST
#
# REST operations operate using class methods that
class Resource
include MongoExtensions

# Directory holding JSON-LD context documents, resolved relative to this file.
CONTEXT_DIR = File.expand_path("../../../../public/contexts", __FILE__)

# Object representation of resource
#
# @attr_reader [Hash{String => Object}] attributes
attr_reader :attributes

# Validation errors on this resource after `validate`
#
# @attr_reader [Hash{String => Array<String>}] errors
attr_reader :errors

# ID of this resource
#
# @attr_reader [String] id
attr_reader :id

# Context associated with this resource
#
# @attr_reader [JSON::LD::EvaluationContext] context
attr_reader :context
# Is this resource clean (i.e., saved to storage)?
#
# @return [Boolean]
def clean?
  @clean
end

# Is this resource dirty (i.e., not yet saved to storage)?
#
# @return [Boolean]
def dirty?
  !clean?
end

# Has this resource been reconciled against a resource ID?
#
# @return [Boolean]
def reconciled?
  @reconciled
end

# Has this resource been resolved so that all references
# are to other Resources?
#
# @return [Boolean]
def resolved?
  @resolved
end

# Anonymous resources have BNode ids or no schema:url.
#
# @return [Boolean]
def anonymous?
  @anon
end

# Is this a stub resource, which has not yet been
# synched or created within the DB?
#
# @return [Boolean]
def stub?
  !!@stub
end

# Is this a new resource, which has not yet been
# synched or created within the DB?
#
# @return [Boolean]
def new?
  !!@new
end
# Set ontology used for validating resources
# FIXME: this should be determined from per-resource
# context, similar to the way the JSON-LD context
# and storage collection are identified
#
# NOTE(review): @@ontology is a class variable, shared across the whole
# inheritance tree; a class-instance variable would be safer — confirm
# no subclass relies on the shared value.
#
# @param [Ontology] ontology
# @return [Ontology]
def self.ontology=(ontology)
  @@ontology = ontology
end

# Ontology used for validating resources
#
# @return [Ontology]
def self.ontology
  @@ontology
end

# Ontology used for validating resources; nil when no ontology has been
# assigned yet (the NameError from the unset class variable is swallowed
# by the `rescue` modifier).
# @return [Ontology, nil]
def ontology
  @@ontology rescue nil
end
# Mongo database connection (memoized in a class variable). Configured
# entirely from the environment:
# - db_name:      name of the database (required)
# - replica_set:  comma-separated replica-set host list, or
# - db_host:      single host name
#
# FIX: corrected "enironment" -> "environment" in both error messages.
#
# @return [Mongo::DB]
# @raise [RuntimeError] when db_name, or both db_host and replica_set, are unset
def self.db
  @@database ||= begin
    raise "Database must be defined in db_name environment variable" unless ENV['db_name']
    if ENV['replica_set']
      logger.info "using database #{ENV['db_name']} on replica set #{ENV['replica_set']}"
      set = ENV['replica_set'].split(',').map(&:strip)
      Mongo::ReplSetConnection.new(set).db(ENV['db_name'], :pool_size => 5, :timeout => 5)
    elsif ENV['db_host']
      logger.info "using database #{ENV['db_name']} on host #{ENV['db_host']}"
      Mongo::Connection.new(ENV['db_host']).db(ENV['db_name'], :pool_size => 5, :timeout => 5)
    else
      raise "Database Host must be defined in db_host or replica_set environment variable"
    end
  end
end

# Mongo database used by instances; delegates to the class-level connection.
def db
  self.class.db
end
# The schema:url attribute
# @return [String]
# The schema:url attribute of this resource, if any.
# @return [String, nil]
def url
  attributes['schema:url']
end
# Manage contexts used by resources. If the context isn't
# found, use the default Wikia context
# @param [String] ctx
# @return [JSON::LD::EvaluationContext]
def self.set_context(ctx)
  # Fall back to the default wikia context when no context document
  # named after ctx exists on disk.
  # NOTE(review): "file:/" (single slash) is unusual; JSON::LD apparently
  # accepts it, but "file://" would be the conventional form — confirm.
  path = if File.exist?("#{CONTEXT_DIR}/#{ctx}.jsonld")
    "file:/#{CONTEXT_DIR}/#{ctx}.jsonld"
  else
    "file:/#{CONTEXT_DIR}/wikia.jsonld"
  end
  # Cache parsed contexts by name (class variable, shared across subclasses)
  (@@contexts ||= {})[ctx] = JSON::LD::EvaluationContext.new.parse(path)
end
# Find a resource or resources, by ID or query
#
# @overload find(id, options)
# Find a resource by identifier
# @param [String] id
# @param [Hash{Symbol => Object}] options
# Options and query parameters
# @option options [String] :context
# Resource context, used for finding
# appropriate collection and JSON-LD context.
# @return [Resource]
#
# @overload find(types, options)
# List distinct types in a collection
# @param [String] types must be :types
# @param [Hash{Symbol => Object}] options
# Options and query parameters
# @option options [String] :context
# Resource context, used for finding
# appropriate collection and JSON-LD context.
# @return [Array<String>] List of types
#
# @overload find(options)
# Find resources by query options.
# Returns a resource map indexed by resource.id
#
# @param [Hash{Symbol => Object}] options
# @option options [String] :context
# Resource context, used for finding
# appropriate collection and JSON-LD context.
# @option options [String] :withProperty
# Find an object having a particular property value, value specified
# with required `:withValue` option. This operates with data-type
# values only, not objects.
# @option options [String] :withValue
# Find an object having a particular property value, property specified
# with required `:withProperty` option. This operates with data-type
# values only, not objects.
# @option options [RDF::URI, String] :named
# Find all resources having the named IRI as a schema:url prefix.
# @option options [Array<String>, String] :withType
# Find resources having one or more types, expressed
# as compact IRIs. Finds resources having the specified type(s),
# or any of their sub-types.
# @option options [String] :includeWith
# Find resources having a wikia:includeWith relationship
# with a resource having the specified ID.
# @option options [String] :referencing
# Find resources having any reference to the resource
# having the specified ID.
# @option options [String] :modifiedSince
# Find resources modified since the specified
# time in a format parsable by `Time.parse`.
# @return [Hash{String => Resource}]
def self.find(*args)
  options = args.last.is_a?(Hash) ? args.pop : {}
  raise "No context specified" unless options.has_key?(:context)
  raise "No database connected" unless db
  # A non-Hash first argument is an id (or the :types sentinel)
  id = args.shift unless args.first.is_a?(Hash)
  collection = db.collection(options[:context])
  if id == :types
    MongoExtensions.rescue_connection_failure {collection.distinct('type').sort}
  elsif id
    bson = MongoExtensions.rescue_connection_failure {collection.find_one({id: id})}
    raise NotFound, "unknown id #{id}" unless bson
    Resource.new(MongoExtensions.from_bson(bson), options.merge(:clean => true, :new => false))
  else
    query_options = args.first || {}
    keys = query_options.keys.map(&:to_s)
    # FIX: this debug call previously sat between `case` and the first
    # `when`, making its nil return value the case *subject*; every
    # `when <boolean>` then evaluated `true/false === nil` and all keyed
    # queries silently fell through to the empty `else` query.
    logger.debug("option keys: #{query_options.keys.inspect}")
    query = case
    when keys.include?('named')
      {"schema:url" => %r(^#{query_options[:named]})}
    when keys.include?('withType')
      collection.ensure_index([['type', Mongo::ASCENDING]])
      # Get all descendant types
      types = [query_options[:withType]].flatten
      types += types.dup.map do |t|
        ontology.find(t).descendant_classes.map(&:id)
      end.flatten.uniq
      {'type' => {'$in' => types}}
    when keys.include?('includeWith')
      collection.ensure_index([['wikia:includeWith', Mongo::ASCENDING]])
      {'wikia:includeWith' => {'id' => query_options[:includeWith]}}
    when keys.include?('withFingerprint')
      collection.ensure_index([['wikia:fingerprint', Mongo::ASCENDING]])
      {'wikia:fingerprint' => query_options[:withFingerprint]}
    when keys.include?('referencing')
      collection.ensure_index([['_references', Mongo::ASCENDING]])
      {'_references' => query_options[:referencing]}
    else
      {}
    end
    # Refine query based on property values
    unless (keys & %w(withProperty withValue)).empty?
      raise "Require use of withProperty when using withValue" unless keys.include?('withProperty')
      raise "Require use of withValue when using withProperty" unless keys.include?('withValue')
      collection.ensure_index([[query_options[:withProperty], Mongo::ASCENDING]])
      query.merge!({query_options[:withProperty] => query_options[:withValue]})
    end
    # Apply modifiedSince to existing options to get
    # a subset of the objects that would otherwise
    # have been returned
    if keys.include?('modifiedSince')
      # Make sure dateModified is indexed
      collection.ensure_index([["schema:dateModified", Mongo::DESCENDING]])
      query.merge!(
        'schema:dateModified' => {
          "$gt" => Time.parse(query_options[:modifiedSince])
        }
      )
    end
    # Return resources for each result, keyed by resource id
    logger.debug "query #{options[:context]} with #{query.inspect}"
    MongoExtensions.rescue_connection_failure do
      collection.
        find(query, {:sort => ['schema:name', :asc]}).
        inject({}) do |memo, data|
          r = Resource.new(MongoExtensions.from_bson(data),
            :context => options[:context],
            :clean => true,
            :new => false)
          memo[r.id] = r
          memo
        end
    end
  end
end
# A new resource from the parsed `node_definition`.
#
# @param [Hash{String => Object}] node_definition
# Compacted or expanded (if :compact is true) JSON-LD definition
# @param [Hash{Symbol => Object}] options
# @option options [String] :context
# Resource context, used for finding
# appropriate collection and JSON-LD context.
# @option options [Boolean] :clean (false)
# @option options [Boolean] :compact (false)
# Assume `node_definition` is in expanded form
# and compact using `context`.
# @option options [Boolean] :reconciled (!new)
# node_definition is not based on resource IDs
# and must be reconciled against storage, or merged
# into another resource.
# @option options [Boolean] :new (true)
# This is a new resource, not yet saved to storage. If `node_definition`
# does not contain an `id` property, it will be given one when saved.
# @option options [Boolean] :stub (false)
# This is a stand-in for another resource that has
# not yet been retrieved (or created) from storage
# @option options [Boolean] :template (false)
# Add null or empty-array definitions of all distinctive
# properties not already defined in `node_definition`
def initialize(node_definition, options)
  #logger.debug "New resource from #{node_definition.to_json(JSON::LD::JSON_STATE)}"
  raise "No context specified" unless options.has_key?(:context)
  @context_name = options[:context]
  @context = self.class.set_context(@context_name)
  # Collection is only bound when a DB connection is configured
  @collection = db.collection(@context_name) if db
  @clean = options.fetch(:clean, false)
  @new = options.fetch(:new, true)
  # By default only pre-existing (non-new) resources are assumed to be
  # reconciled against storage ids
  @reconciled = options.fetch(:reconciled, !@new)
  @resolved = false
  # :compact means node_definition arrives in expanded JSON-LD form
  @attributes = if options[:compact]
    JSON::LD::API.compact(node_definition, @context)
  else
    node_definition
  end
  @attributes.delete('@context') # Don't store with object
  # Add property defs if creating a template
  if options.fetch(:template, false)
    type = attributes['type']
    raise "Expected a single type, got #{type.inspect}" unless type.is_a?(String)
    cls = ontology.find(type)
    raise "Expected #{type.inspect} to be a class" if cls.nil? || !cls.is_a?(Ontology::OwlClass)
    cls.property_restrictions.each do |r|
      prop = r.onProperty
      # Skip properties that may not appear at all
      next if r.cardinality[1] == 0
      # Make sure the property is defined, either as an array,
      # if cardinality is unbound or > 1, or as nil if <= 1
      attributes[prop.id] ||= (r.cardinality[1] || 2) > 1 ? [] : nil
    end
  end
  @id = attributes.fetch('id', nil)
  # Anonymous: reconciled resources lacking schema:url, otherwise
  # resources with no id or a blank-node ("_:") id
  @anon = @reconciled ? !node_definition.has_key?('schema:url') : (@id.nil? || @id.to_s[0,2] == '_:')
  if attributes.has_key?('type')
    # Reject expansion types, favor other vocabs over schema types
    types = [attributes['type']].flatten.map do |t|
      next if t =~ %r((schema|ogp):\w+/\w+) # No expansion types or ogp
      t
    end.compact.sort
    case types.length
    when 0
      # No type ??
      raise "Expected type from #{node_definition['type'].inspect}, none found"
    when 1
      attributes['type'] = types.first
    else
      attributes['type'] = types
    end
  end
  # No validation errors just yet
  @errors = {}
end
# Return last-modified time of this resource.
# @return [Time, nil] parsed `schema:dateModified`, or nil when the
#   property is missing or unparsable
def last_modified
  Time.parse(property('schema:dateModified'))
rescue StandardError
  nil
end

# Hash of the normalized (deresolved) representation, suitable for ETags.
# @return [Fixnum]
def hash
  deresolve.hash
end
# Reverse resolution of resource attributes.
# Just returns `attributes` if
# resource is unresolved. Otherwise, replaces `Resource`
# values with node references.
#
# Result is expanded and re-compacted to get to normalized
# representation.
#
# @return [Hash] deresolved attribute hash
def deresolve
  node_definition = if resolved?
    # Replace Resource values (including Resources inside arrays) with
    # plain node references of the form {'id' => ...}
    deresolved = attributes.keys.inject({}) do |memo, prop|
      value = attributes[prop]
      memo[prop] = case value
      when Resource
        {'id' => value.id}
      when Array
        value.map do |v|
          v.is_a?(Resource) ? {'id' => v.id} : v
        end
      else
        value
      end
      memo
    end
    deresolved
  else
    attributes
  end
  # Expand then re-compact to reach the normalized representation
  compacted = nil
  JSON::LD::API.expand(node_definition, @context) do |expanded|
    compacted = JSON::LD::API.compact(expanded, @context)
  end
  # The context itself is not part of the attribute hash
  compacted.delete_if {|k, v| k == '@context'}
end
# Serialize to JSON-LD, minus `@context` using
# a deresolved version of the attributes
#
# @param [Hash] options
# @return [String] serizlied JSON representation of resource
# Serialize to JSON-LD (minus `@context`) using a deresolved
# version of the attributes.
#
# @param [Hash] options passed through to Hash#to_json
# @return [String] serialized JSON representation of resource
def to_json(options = nil)
  normalized = deresolve
  normalized.to_json(options)
end
# Values of all properties other than id and type.
#
# FIX: `%(id type)` was the *string* "id type", so membership was a
# substring test and unrelated keys (e.g. "typ") could be dropped;
# `%w(id type)` compares whole keys (matching the usage elsewhere).
#
# @return [Array<Object>]
def property_values
  attributes.dup.delete_if {|k, v| %w(id type).include?(k)}.values
end
# Create a new BSON ID, saving the existing id to schema:url
def create_id
  # Preserve the original identifier (unless anonymous) so the resource
  # can still be located by its source URL after re-keying
  attributes['schema:url'] ||= self.id unless self.anonymous?
  @id = attributes['id'] = BSON::ObjectId.new.to_s
  # Newly keyed: not yet saved, but now reconciled to a storage id
  @clean = false
  @reconciled = true
end
# Update node references using the provided map.
# This replaces node references with Resources,
# either stub or instantiated.
#
# Node references with ids not in the reference_map
# will cause stub resources to be added to the map.
#
# @param [Hash{String => Resource}] reference_map
# @return [Resource] self
def resolve(reference_map)
  return if resolved?
  # NOTE(review): this nested def re-defines update_obj on the class on
  # every call to resolve; it works, but a private method would be cleaner.
  # Hash#node_ref? is presumably a project extension on Hash — confirm.
  def update_obj(obj, reference_map)
    case obj
    when Array
      obj.map {|o| update_obj(o, reference_map)}
    when Hash
      if obj.node_ref?
        # Node reference: replace with a (possibly stub) Resource,
        # adding it to the map when not already known
        reference_map[obj['id']] ||= Resource.new(obj,
          :context => @context_name,
          :clean => false,
          :stub => true
        )
      else
        # Recurse into nested values
        obj.keys.each do |k|
          obj[k] = update_obj(obj[k], reference_map)
        end
        obj
      end
    else
      obj
    end
  end
  #logger.debug "resolve(0): #{attributes.inspect}"
  attributes.each do |k, v|
    next if %w(id type).include?(k)
    attributes[k] = update_obj(attributes[k], reference_map)
  end
  #logger.debug "resolve(1): #{attributes.inspect}"
  @resolved = true
  self
end
# Merge resources
# FIXME: If unreconciled or unresolved resources are merged
# against reconciled/resolved resources, they will appear
# to not match, even if they are really the same thing.
#
# @param [Resource] resource
# @return [Resource] self
def merge(resource)
  #logger.debug("merge with #{resource.inspect}")
  # Hash#neq? is presumably a project extension (deep inequality test)
  # — TODO confirm where it is defined.
  if attributes.neq?(resource.attributes)
    #logger.debug "Update resource #{inspect} with #{resource.inspect}"
    resource.attributes.each do |p, v|
      next if p == 'id'
      # nil or empty-array values remove the property entirely
      if v.nil? or (v.is_a?(Array) and v.empty?)
        attributes.delete(p)
      else
        attributes[p] = v
      end
    end
    # Anything merged in must be re-resolved and re-saved
    @resolved = @clean = false
  end
  self
end
#
# Save the object to the storage collection
# use Upsert to create things that don't exist.
# First makes sure that the resource is valid.
#
# @param [Hash{Symbol => Object}] options
# @option options [Boolean] :validate (true)
# Validate the object before saving
# @return [Boolean] true or false if resource not saved
# (See the comment block above for the full contract.)
# FIX: the update path's *failure* branch referenced the undefined local
# `saved_json` (NameError); the dateCreated/dateModified refresh belongs
# in the success branch, mirroring the create path. Also the update
# branch logged "save: create".
def save(options = {})
  options = {:validate => true}.merge(options)
  # Reconcile, if necessary
  self.create_id unless reconciled?
  if options[:validate] && !valid?
    logger.error "Tried to save invalid resource #{id}: #{error_string}"
    logger.error " => resource is: #{attributes.inspect}"
    false
  elsif @new
    logger.info "Create object #{inspect}"
    @attributes = deresolve
    logger.debug("save: create #{attributes.to_json(JSON::LD::JSON_STATE)}")
    bson = to_bson(attributes)
    ret = MongoExtensions.rescue_connection_failure {@collection.update({id: id}, bson, :upsert => true)}
    if ret.is_a?(Hash)
      # Driver returned an error document
      logger.info "Failed to create #{id}: #{ret}"
      add_error('id', ret)
      false
    else
      # Reflect the timestamps that were actually stored
      saved_json = from_bson(bson)
      attributes['schema:dateCreated'] = saved_json['schema:dateCreated']
      attributes['schema:dateModified'] = saved_json['schema:dateModified']
      @new = false
      @clean = true
    end
  elsif not @clean
    logger.info "Update object #{inspect}"
    @attributes = deresolve
    logger.debug("save: update #{attributes.to_json(JSON::LD::JSON_STATE)}")
    bson = to_bson(attributes)
    ret = MongoExtensions.rescue_connection_failure {@collection.update({id: id}, bson)}
    if ret.is_a?(Hash)
      logger.info "Failed to update #{id}: #{ret}"
      add_error('id', ret)
      false
    else
      # Reflect the timestamps that were actually stored
      saved_json = from_bson(bson)
      attributes['schema:dateCreated'] = saved_json['schema:dateCreated']
      attributes['schema:dateModified'] = saved_json['schema:dateModified']
      @clean = true
    end
  else
    # Representations are equivalent
    logger.info "Unchanged object #{id} for #{url || attributes['schema:name']}"
    @clean = true
  end
end
#
# Save the object to the storage collection
# use Upsert to create things that don't exist.
# First makes sure that the resource is valid.
#
# @param [Hash{Symbol => Object}] options
# @option options [Boolean] :validate (true)
# Validate the object before saving
# @return [true] unless a validation error
# @raise [ValidationError] raises a validation error
# if the resource cannot be saved, with details
# contained within the ValidationError instance
# Save, raising on failure.
# @param [Hash{Symbol => Object}] options forwarded to #save
# @return [true] unless a validation error
# @raise [ValidationError] when the resource cannot be saved; carries
#   the recorded validation errors
def save!(options = {})
  result = save(options)
  result || raise(ValidationError.new("Failed to save resource", errors))
end
# Validate the resource using the vocabulary definition, recording
# messages into `errors`.
# @return [Boolean] true when valid (no errors recorded)
# Validate the resource against the vocabulary definition, recording
# messages into `errors`. Returns true when no ontology is configured.
#
# FIXES:
# - `%(id type)` was the string "id type" (substring matching); use %w
#   for exact key comparison, matching the usage in #resolve.
# - The outer `typeClasses.each` loop never used its block variable and
#   re-walked the same `restrictions` list once per type class, so every
#   cardinality error was recorded N times for N types.
#
# @return [Boolean] true when no errors were recorded
def validate
  return true unless ontology
  # Map type(s) to ontology classes and collect their restrictions
  typeClasses = [attributes['type']].flatten.compact.map {|t| ontology.find(t)}.compact
  restrictions = typeClasses.
    map {|t| t.property_restrictions}.
    flatten
  # Validate properties and values relative to typeClasses
  attributes.each do |key, value|
    next if %w(id type).include?(key)
    if prop = ontology.find(key)
      # FIXME: figure this out
      #unless prop.domains.nil? || typeClasses.any? {|t| prop.domains.include?(t)}
      #  add_error prop, "not in the domain of #{typeClasses.inspect}"
      #end
      # If there's a restriction on this property from any type,
      # use that instead of property range.
      restriction = restrictions.detect {|r| r.onProperty == prop}
      ranges = [restriction ? restriction.ranges : prop.ranges].flatten
      # Check range against dataType(s)
      [value].flatten.compact.each do |v|
        unless ranges.any? {|dt| dt.valid?(v)}
          ranges.each do |dt|
            add_error prop, dt.validate(v)
          end
        end
      end
    end
  end
  # Check cardinality constraints (once per restriction)
  restrictions.each do |r|
    prop = r.onProperty
    card = r.cardinality
    values = [attributes[prop.id]].flatten.compact
    add_error(prop, "Must have at least #{card[0]} values: #{values.inspect}") if values.length < card[0]
    add_error(prop, "Must have no more than #{card[1]} values: #{values.inspect}") if card[1] and values.length > card[1]
  end
  errors.empty?
end
# Is this resource valid? True only when no errors have already been
# recorded and a fresh validation pass records none.
def valid?
  errors.empty? && validate
end
# Delete a resource from the storage collection. The in-memory object
# is left untouched (not marked dirty or new).
def delete
  MongoExtensions.rescue_connection_failure {@collection.remove(id: self.id)}
end
# Revert the representation of the resource from the database. Marks as clean
# @return [Resource] self
def revert!
  logger.info("revert resource #{id}")
  bson = MongoExtensions.rescue_connection_failure {@collection.find_one(id: id)}
  raise NotFound, "unknown id #{id}" unless bson
  @attributes = MongoExtensions.from_bson(bson)
  @clean = true
  # NOTE(review): marking a freshly-loaded record as NOT reconciled
  # looks suspicious (stored documents should already be keyed) —
  # confirm this is intentional.
  @new = @reconciled = false
  self
end
# Access individual fields, from subject definition
# Access individual fields from the subject definition by method name,
# e.g. `resource.send(:"schema:name")`. Unknown properties yield nil.
def method_missing(method, *args)
  property(method.to_s)
end

# Standard counterpart to method_missing (previously missing): report
# dynamic properties as respond-able so `respond_to?` and `Method`
# lookups behave consistently.
def respond_to_missing?(method, include_private = false)
  (@attributes.is_a?(Hash) && @attributes.key?(method.to_s)) || super
end
# Access an individual field from the subject definition.
# @return [Object, nil] the attribute value, nil when absent
def property(prop_name)
  @attributes.fetch(prop_name, nil)
end

# Update a property, marking the resource as dirty.
# @return [Object] new value
def update_property(prop_name, value)
  assigned = @attributes[prop_name] = value
  @clean = false
  assigned
end

# Remove a property, marking the resource as dirty.
# @return [Object] old value
def remove_property(prop_name)
  removed = @attributes.delete(prop_name)
  @clean = false
  removed
end
def inspect
"<Resource" +
attributes.dup.keep_if {|k, v| %(id type schema:url).include?(k)}.map do |k, v|
"\n #{k}: #{v.is_a?(Resource) ? v.id : v.inspect}"
end.join(" ") +
"\n clean: #{@clean}" +
"\n reconciled: #{@reconciled}" +
">"
end
private

# Record a validation error message against a property.
# @param [#id, String, Symbol] prop property object or its identifier
# @param [String] message
def add_error(prop, message)
  prop = prop.respond_to?(:id) ? prop.id : prop.to_s
  (errors[prop] ||= []) << message
end

# Render all recorded errors as a human-readable string.
#
# FIX: previously returned the result of `errors.each` (the errors hash
# itself) instead of the accumulated message string.
#
# @return [String]
def error_string
  message = ""
  errors.each do |prop_id, messages|
    message << %(On property #{prop_id}: #{[messages].flatten.join("\n ")}\n)
  end
  message
end
# Class-level logger: $logger when set, otherwise a minimal stand-in
# whose debug/info/error all print via puts. Memoized.
def self.logger
  @logger ||= $logger || begin
    fallback = Object.new
    def fallback.debug(msg); puts msg; end
    def fallback.info(msg); puts msg; end
    def fallback.error(msg); puts msg; end
    fallback
  end
end

# Instance-level logger delegates to the class logger.
def logger; self.class.logger; end
# Raised when a resource cannot be found by id.
class NotFound < StandardError
end

# Raised when a resource fails validation; carries the per-property
# error messages recorded during the validation pass.
class ValidationError < StandardError
  # @return [Hash{String => Array<String>}] property id => messages
  attr_reader :errors

  # @param [String] message
  # @param [Hash] errors validation errors to retain (copied)
  def initialize(message, errors)
    super(message)
    @errors = errors.dup
  end
end
end
end
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment