Skip to content

Instantly share code, notes, and snippets.

@raggi
Created January 13, 2010 00:40
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save raggi/275797 to your computer and use it in GitHub Desktop.
Save raggi/275797 to your computer and use it in GitHub Desktop.
Pooled async sequel stab
require 'eventmachine'
require 'sequel'
# I stole this code from em-mysql (tmm1 <3)
module Sequel
class Database
attr_accessor :_async
end
class Dataset
def async_insert *args, &cb
db._async.insert insert_sql(*args), &cb
nil
end
def async_update *args, &cb
db._async.update update_sql(*args), &cb
nil
end
def async_delete &cb
db._async.execute delete_sql, &cb
nil
end
def async_multi_insert *args, &cb
db._async.execute multi_insert_sql(*args).first, &cb
nil
end
def async_multi_insert_ignore *args, &cb
db._async.execute multi_insert_sql(*args).first.sub(/insert/i, "INSERT IGNORE"), &cb
nil
end
def async_each *args
db._async.select(select_sql(*args)) do |rows|
rows.each{|r|
r = transform_load(r) if @transform
r = row_proc[r] if row_proc
yield r
}
end
nil
end
def async_all
db._async.select(sql) do |rows|
if row_proc or transform
yield(rows.map{|r|
r = transform_load(r) if @transform
r = row_proc[r] if row_proc
r
})
else
yield(rows)
end
end
nil
end
def async_count &cb
if options_overlap(COUNT_FROM_SELF_OPTS)
from_self.async_count(&cb)
else
naked.async_each(STOCK_COUNT_OPTS){|r|
yield r.values.first.to_i
}
end
nil
end
end
class Model
def async_update *args, &cb
this.async_update(*args, &cb)
set(*args)
self
end
def async_delete &cb
this.async_delete(&cb)
nil
end
class << self
[ :async_insert,
:async_multi_insert,
:async_multi_insert_ignore,
:async_each,
:async_all,
:async_update,
:async_count ].each do |method|
class_eval %[
def #{method} *args, &cb
dataset.#{method}(*args, &cb)
end
]
end
# async version of Model#[]
def async_lookup args
unless Hash === args
args = primary_key_hash(args)
end
dataset.where(args).limit(1).async_all{ |rows|
if rows.any?
yield rows.first
else
yield nil
end
}
nil
end
end
end
end
# class Future
# def initialize(data, &work)
# @data = data
# @work = work
# @callbacks = []
# @errbacks = []
# end
#
# def callback(&b)
# @callbacks << b
# end
#
# def errback(&b)
# @errbacks << b
# end
#
# def execute
# deferrable = @work.call(@data)
# @callbacks.each { |c| deferrable.callback(&c) }
# @errbacks.each { |e| deferrable.errback(&e) }
# deferrable
# end
# end
class EM::P::Postgres3::Sequel < EM::P::Postgres3
class Pool
attr_reader :size, :uri, :queue, :connections
def initialize(size = 10, uri = "/tmp/.s.PGSQL.5432")
@size = size
@uri = uri
@queue = EM::Queue.new
@connections = []
end
def spawn
@size.times do
EM.connect(uri, EM::P::Postgres3::Sequel) do |db|
db.connect('raggi', 'raggi').callback do |status, error|
abort(error) unless status
connections << db
dq = lambda {
queue.pop do |args|
method, sql, cb = *args
db.__send__(method, sql) do |*a|
cb.call(*a)
dq.call
end
end
}
dq.call
end
end
end
end
def execute(sql, &cb)
@queue.push [__method__, sql, cb]
end
def insert(sql, &cb)
@queue.push [__method__, sql, cb]
end
def update(sql, &cb)
@queue.push [__method__, sql, cb]
end
def select(sql, &cb)
@queue.push [__method__, sql, cb]
end
end
def execute(sql, &cb)
query(sql).callback do |status, result, errors|
cb[result.cmd_tag.split(' ').last.to_i]
end
end
def insert(sql, &cb)
# TODO see #insert_result, it uses conn.last_insert_id
query(sql).callback do |status, result, errors|
cb[status, result, errors]
end
end
def update(sql, &cb)
query(sql).callback do |status, result, errors|
cb[nil]
end
end
def select(sql, &cb)
query(sql).callback do |status, result, errors|
fields = result.fields.map { |f| f.name }
result.rows.map! { |r| Hash[*fields.zip(r).flatten] }
cb[result.rows]
end
end
end
BEGIN { require "rubygems" if __FILE__ == $0 }
if __FILE__ == $0
DB = Sequel.connect 'postgres:///raggi'
DB.create_table(:marshal_store) do
primary_key :id
String :marshal_data
DateTime :created_at
end unless DB.table_exists? :marshal_store
data = 'a' * 1024
t = Time.now
pool_size = 10
DB._async = EM::P::Postgres3::Sequel::Pool.new(pool_size)
backgrounded_em = Thread.new do
Thread.current.abort_on_exception = true
EM.run { DB._async.spawn }
end
backgrounded_em.join(0.1) until DB._async.connections.size == pool_size
DB[:marshal_store].delete
require 'benchmark'
TESTS = 5000
Benchmark.bmbm do |results|
DB[:marshal_store].delete
sleep 0.1
results.report("insert:") { TESTS.times { DB[:marshal_store] << { :marshal_data => data, :created_at => t } } }
DB[:marshal_store].delete
sleep 0.1
results.report("asynci:") {
completed = 0
TESTS.times do
DB[:marshal_store].async_insert( :marshal_data => data, :created_at => t ) do |s, r, e|
raise 'error in async insert: ' + r.to_s unless s
completed += 1
end
end
sleep 0.01 until completed >= TESTS
}
end
end
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment