Skip to content

Instantly share code, notes, and snippets.

@pedroteixeira
Created May 5, 2011 19:06
Show Gist options
  • Save pedroteixeira/957673 to your computer and use it in GitHub Desktop.
Save pedroteixeira/957673 to your computer and use it in GitHub Desktop.
require 'ruote/storage/base'
module Ruote
module ActiveRecord
class Document < ::ActiveRecord::Base
set_table_name :documents
def self.before_fork
::ActiveRecord::Base.clear_all_connections!
end
def self.after_fork
::ActiveRecord::Base.establish_connection
end
end
class Storage
include Ruote::StorageBase
def initialize
end
def put_msg(action, options)
# put_msg is a unique action, no need for all the complexity of put
do_insert(prepare_msg_doc(action, options), 1)
nil
end
def put_schedule(flavour, owner_fei, s, msg)
# put_schedule is a unique action, no need for all the complexity of put
doc = prepare_schedule_doc(flavour, owner_fei, s, msg)
return nil unless doc
do_insert(doc, 1)
doc['_id']
end
def put(doc, opts={})
if doc['_rev']
d = get(doc['type'], doc['_id'])
return true unless d
return d if d['_rev'] != doc['_rev']
# failures
end
nrev = doc['_rev'].to_i + 1
begin
do_insert(doc, nrev)
rescue Exception => de
return (get(doc['type'], doc['_id']) || true)
# failure
end
Document.delete(:typ => doc['type'], :ide => doc['_id'], :rev.lt => nrev)
# @sequel[@table].where(
# :typ => doc['type'], :ide => doc['_id']
# ).filter { rev < nrev }.delete
doc['_rev'] = nrev if opts[:update_rev]
nil
# success
end
def get(type, key)
d = do_get(type, key)
d ? Rufus::Json.decode(d[:doc]) : nil
end
def delete(doc)
raise ArgumentError.new('no _rev for doc') unless doc['_rev']
count = do_delete(doc)
return (get(doc['type'], doc['_id']) || true) if count < 1
# failure
nil
# success
end
def get_many(type, key=nil, opts={})
ds = Document.where(:typ => type)
keys = key ? Array(key) : nil
ds = ds.where(:wfid => keys) if keys && keys.first.is_a?(String)
return ds.all.size if opts[:count]
ds = ds.order(
*(opts[:descending] ? [ :ide.desc, :rev.desc ] : [ :ide.asc, :rev.asc ])
)
ds = ds.limit(opts[:limit]).offset(opts[:skip])
docs = ds.all
docs = select_last_revs(docs, opts[:descending])
docs = docs.collect { |d| Rufus::Json.decode(d[:doc]) }
keys && keys.first.is_a?(Regexp) ?
docs.select { |doc| keys.find { |key| key.match(doc['_id']) } } :
docs
# (pass on the dataset.filter(:wfid => /regexp/) for now
# since we have potentially multiple keys)
end
# Returns all the ids of the documents of a given type.
#
def ids(type)
Document.where(:typ => type).select(:ide).order(:ide.asc).collect { |d| d[:ide] }.uniq
end
# Nukes all the documents in this storage.
#
def purge!
Document.delete_all
end
# Returns a string representation the current content of the storage for
# a given type.
#
def dump(type)
"=== #{type} ===\n" +
get_many(type).map { |h| " #{h['_id']} => #{h.inspect}" }.join("\n")
end
# Calls #disconnect on the db. According to Sequel's doc, it closes
# all the idle connections in the pool (not the active ones).
#
def shutdown
#Ruote::ActiveRecord::Document.before_fork
end
# Grrr... I should sort the mess between close and shutdown...
# Tests vs production :-(
#
def close
#Ruote::ActiveRecord::Document.before_fork
end
# Mainly used by ruote's test/unit/ut_17_storage.rb
#
def add_type(type)
# does nothing, types are differentiated by the 'typ' column
end
# Nukes a db type and reputs it (losing all the documents that were in it).
#
def purge_type!(type)
Document.delete(:typ => type)
end
# A provision made for workitems, allow to query them directly by
# participant name.
#
def by_participant(type, participant_name, opts)
raise NotImplementedError if type != 'workitems'
docs = Document.where(
:typ => type, :participant_name => participant_name
).limit(opts[:limit]).offset(opts[:offset] || opts[:skip])
docs = select_last_revs(docs)
opts[:count] ?
docs.size :
docs.collect { |d| Ruote::Workitem.from_json(d[:doc]) }
end
# Querying workitems by field (warning, goes deep into the JSON structure)
#
def by_field(type, field, value, opts)
raise NotImplementedError if type != 'workitems'
lk = [ '%"', field, '":' ]
lk.push(Rufus::Json.encode(value)) if value
lk.push('%')
docs = Document.where(:typ => type, :doc.matches => lk.join)
docs = docs.limit(opts[:limit]).offset(opts[:skip] || opts[:offset])
docs = select_last_revs(docs)
opts[:count] ?
docs.size :
docs.map { |d| Ruote::Workitem.from_json(d[:doc]) }
end
def query_workitems(criteria)
ds = Document.where(:typ => 'workitems')
count = criteria.delete('count')
limit = criteria.delete('limit')
offset = criteria.delete('offset') || criteria.delete('skip')
ds = ds.limit(limit).offset(offset)
wfid =
criteria.delete('wfid')
pname =
criteria.delete('participant_name') || criteria.delete('participant')
ds = ds.where(:ide.matches => "%!#{wfid}") if wfid
ds = ds.where(:participant_name => pname) if pname
criteria.collect do |k, v|
ds = ds.where(:doc.matches => "%\"#{k}\":#{Rufus::Json.encode(v)}%")
end
ds = select_last_revs(ds.all)
count ?
ds.size :
ds.collect { |d| Ruote::Workitem.new(Rufus::Json.decode(d[:doc])) }
end
protected
def do_delete(doc)
Document.delete(
:ide => doc['_id'], :typ => doc['type'], :rev => doc['_rev'].to_i
)
end
def do_insert(doc, rev)
Document.create!(
:ide => doc['_id'],
:rev => rev,
:typ => doc['type'],
:doc => Rufus::Json.encode(doc.merge(
'_rev' => rev,
'put_at' => Ruote.now_to_utc_s)),
:wfid => extract_wfid(doc),
:participant_name => doc['participant_name']
)
end
def extract_wfid(doc)
doc['wfid'] || (doc['fei'] ? doc['fei']['wfid'] : nil)
end
def do_get(type, key)
Document.where(
:typ => type, :ide => key
).order(:rev.desc).first
end
# Don't put configuration if it's already in
#
# (avoid storages from trashing configuration...)
#
def put_configuration
return if get('configurations', 'engine')
conf = { '_id' => 'engine', 'type' => 'configurations' }.merge(@options)
put(conf)
end
def select_last_revs(docs, reverse=false)
docs = docs.inject({}) { |h, doc|
h[doc[:ide]] = doc
h
}.values.sort_by { |h|
h[:ide]
}
reverse ? docs.reverse : docs
end
end
end
end
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment