Created
January 13, 2010 00:40
-
-
Save raggi/275797 to your computer and use it in GitHub Desktop.
Pooled async sequel stab
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
require 'eventmachine' | |
require 'sequel' | |
# I stole this code from em-mysql (tmm1 <3) | |
module Sequel | |
# Gives every database object a `_async` slot. The Dataset async_* methods in
# this file send their SQL to whatever object is stored here.
class Database
  # Reader and writer are declared separately; together they form the
  # read/write `_async` attribute (nil until something is assigned).
  attr_reader :_async
  attr_writer :_async
end
# Asynchronous counterparts of the blocking dataset operations.
#
# Each method builds its SQL with the dataset's own SQL generator
# (insert_sql, update_sql, ...) and hands it, together with the caller's
# block, to `db._async`. Every method returns nil right away; results reach
# the block whenever the async driver invokes it.
class Dataset
  # Queues an INSERT built from +args+; the block is passed through to the driver.
  def async_insert(*args, &cb)
    db._async.insert(insert_sql(*args), &cb)
    nil
  end

  # Queues an UPDATE built from +args+; the block is passed through to the driver.
  def async_update(*args, &cb)
    db._async.update(update_sql(*args), &cb)
    nil
  end

  # Queues the dataset's DELETE statement via the driver's generic `execute`.
  def async_delete(&cb)
    db._async.execute(delete_sql, &cb)
    nil
  end

  # Queues only the FIRST statement produced by multi_insert_sql.
  def async_multi_insert(*args, &cb)
    db._async.execute(multi_insert_sql(*args).first, &cb)
    nil
  end

  # Same as async_multi_insert, but the first case-insensitive "insert" in the
  # statement is rewritten to "INSERT IGNORE".
  # NOTE(review): INSERT IGNORE looks like MySQL syntax -- confirm the target
  # server accepts it.
  def async_multi_insert_ignore(*args, &cb)
    statement = multi_insert_sql(*args).first.sub(/insert/i, "INSERT IGNORE")
    db._async.execute(statement, &cb)
    nil
  end

  # Runs the SELECT built from +args+ and yields each loaded row, one at a time.
  def async_each(*args)
    db._async.select(select_sql(*args)) do |rows|
      rows.each { |row| yield _async_load_row(row) }
    end
    nil
  end

  # Runs the dataset's SQL and yields all rows at once.
  #
  # When neither a transform nor a row_proc is configured, the driver's own
  # rows array is yielded untouched. The check uses @transform, exactly like
  # the per-row loader, instead of calling a `transform` method with no
  # arguments.
  def async_all
    db._async.select(sql) do |rows|
      if row_proc || @transform
        yield(rows.map { |row| _async_load_row(row) })
      else
        yield(rows)
      end
    end
    nil
  end

  # Yields the row count as an Integer.
  #
  # Datasets whose options overlap COUNT_FROM_SELF_OPTS are wrapped with
  # from_self first; otherwise the first value of each row returned for
  # STOCK_COUNT_OPTS is converted with to_i and yielded.
  def async_count(&cb)
    if options_overlap(COUNT_FROM_SELF_OPTS)
      from_self.async_count(&cb)
    else
      naked.async_each(STOCK_COUNT_OPTS) { |row| yield row.values.first.to_i }
    end
    nil
  end

  private

  # Applies the optional transform and then the optional row_proc to one raw row.
  def _async_load_row(row)
    row = transform_load(row) if @transform
    row = row_proc[row] if row_proc
    row
  end
end
# Async helpers for models: two instance-level operations that go through
# `this`, class-level delegators to `dataset`, and an async primary-key lookup.
class Model
  # Sends the update for this record asynchronously, then applies the same
  # values to the in-memory object with `set`. Returns the model itself.
  def async_update(*args, &cb)
    this.async_update(*args, &cb)
    set(*args)
    self
  end

  # Sends the delete for this record asynchronously. Returns nil.
  def async_delete(&cb)
    this.async_delete(&cb)
    nil
  end

  class << self
    # Generate one class method per name; each simply forwards its arguments
    # and block to the method of the same name on `dataset`.
    %w[
      async_insert
      async_multi_insert
      async_multi_insert_ignore
      async_each
      async_all
      async_update
      async_count
    ].each do |name|
      class_eval <<-RUBY
        def #{name} *args, &cb
          dataset.#{name}(*args, &cb)
        end
      RUBY
    end

    # async version of Model#[]
    #
    # A Hash is used as the filter directly; anything else is converted with
    # primary_key_hash. Yields the first matching row, or nil when the query
    # returned nothing. Returns nil.
    def async_lookup(args)
      conditions = Hash === args ? args : primary_key_hash(args)
      dataset.where(conditions).limit(1).async_all do |rows|
        yield(rows.any? ? rows.first : nil)
      end
      nil
    end
  end
end
end | |
# class Future | |
# def initialize(data, &work) | |
# @data = data | |
# @work = work | |
# @callbacks = [] | |
# @errbacks = [] | |
# end | |
# | |
# def callback(&b) | |
# @callbacks << b | |
# end | |
# | |
# def errback(&b) | |
# @errbacks << b | |
# end | |
# | |
# def execute | |
# deferrable = @work.call(@data) | |
# @callbacks.each { |c| deferrable.callback(&c) } | |
# @errbacks.each { |e| deferrable.errback(&e) } | |
# deferrable | |
# end | |
# end | |
class EM::P::Postgres3::Sequel < EM::P::Postgres3 | |
# Fixed-size pool of connections that all pull work from one shared queue.
#
# Callers enqueue work with execute/insert/update/select. Each job is an
# [operation, sql, callback] triple, and whichever connection pops it runs the
# operation of the same name on itself.
class Pool
  attr_reader :size, :uri, :queue, :connections

  # size     - number of connections opened by #spawn.
  # uri      - target handed to EM.connect.
  # user     - login name passed to the connection's #connect.
  # password - password passed to the connection's #connect.
  def initialize(size = 10, uri = "/tmp/.s.PGSQL.5432", user = 'raggi', password = 'raggi')
    @size = size
    @uri = uri
    @user = user
    @password = password
    @queue = EM::Queue.new
    @connections = []
  end

  # Opens +size+ connections. Once a connection has logged in it is added to
  # `connections` and starts serving the queue. A falsy login status aborts
  # the whole process with the reported error.
  def spawn
    @size.times do
      EM.connect(uri, EM::P::Postgres3::Sequel) do |db|
        db.connect(@user, @password).callback do |status, error|
          abort(error) unless status
          connections << db
          serve(db)
        end
      end
    end
  end

  def execute(sql, &cb)
    enqueue(:execute, sql, cb)
  end

  def insert(sql, &cb)
    enqueue(:insert, sql, cb)
  end

  def update(sql, &cb)
    enqueue(:update, sql, cb)
  end

  def select(sql, &cb)
    enqueue(:select, sql, cb)
  end

  private

  # Pushes one [operation, sql, callback] job onto the shared queue.
  def enqueue(operation, sql, cb)
    @queue.push [operation, sql, cb]
  end

  # Pops one job, runs it on +db+, and asks for the next job only after the
  # current one has reported back, so each connection handles one statement
  # at a time.
  def serve(db)
    queue.pop do |job|
      operation, sql, cb = *job
      db.__send__(operation, sql) do |*outcome|
        # Jobs queued without a block are fire-and-forget; skipping the
        # callback keeps this connection dequeuing instead of raising on nil.
        cb.call(*outcome) if cb
        serve(db)
      end
    end
  end
end
# Runs +sql+ and passes the block a single Integer: the last space-separated
# word of the result's command tag, converted with to_i.
# Returns whatever `callback` on the query object returns.
#
# Calling without a block is allowed; the statement still runs and the
# outcome is discarded.
#
# NOTE(review): presumably the tag looks like "DELETE 3" for a successful
# statement -- confirm what cmd_tag holds when the statement fails.
def execute(sql, &cb)
  query(sql).callback do |_status, result, _errors|
    affected = result.cmd_tag.split(' ').last.to_i
    cb.call(affected) if cb
  end
end
# Runs +sql+ and passes the block the raw (status, result, errors) triple
# reported by the query.
#
# Calling without a block is allowed; the statement still runs and the
# outcome is discarded.
def insert(sql, &cb)
  # TODO see #insert_result, it uses conn.last_insert_id
  query(sql).callback do |status, result, errors|
    cb.call(status, result, errors) if cb
  end
end
# Runs +sql+ and, once the query reports back, calls the block with nil
# (no row count or status is forwarded).
#
# Calling without a block is allowed; the statement still runs.
def update(sql, &cb)
  query(sql).callback do |_status, _result, _errors|
    cb.call(nil) if cb
  end
end
# Runs +sql+ and passes the block the result rows, each converted in place
# from an array of values into a Hash keyed by field name.
#
# The Hash is built from [name, value] pairs rather than from a flattened
# argument list, so a value that is itself an array stays intact as one value.
#
# Calling without a block is allowed; rows are still converted on the result.
def select(sql, &cb)
  query(sql).callback do |_status, result, _errors|
    names = result.fields.map { |f| f.name }
    result.rows.map! { |values| Hash[names.zip(values)] }
    cb.call(result.rows) if cb
  end
end
end | |
BEGIN { require "rubygems" if __FILE__ == $0 }

# Benchmark harness, run only when this file is executed directly.
# It times TESTS blocking inserts against TESTS pooled async inserts of the
# same 1 KiB payload into the marshal_store table.
if __FILE__ == $0
  DB = Sequel.connect 'postgres:///raggi'

  unless DB.table_exists?(:marshal_store)
    DB.create_table(:marshal_store) do
      primary_key :id
      String :marshal_data
      DateTime :created_at
    end
  end

  payload = 'a' * 1024
  stamp = Time.now
  pool_size = 10
  DB._async = EM::P::Postgres3::Sequel::Pool.new(pool_size)

  # The reactor runs in a background thread so the main thread stays free to
  # drive the benchmark; any exception in that thread aborts the process.
  reactor = Thread.new do
    Thread.current.abort_on_exception = true
    EM.run { DB._async.spawn }
  end

  # Wait in 0.1s slices until every pooled connection has registered itself.
  reactor.join(0.1) until DB._async.connections.size == pool_size

  DB[:marshal_store].delete

  require 'benchmark'
  TESTS = 5000

  Benchmark.bmbm do |results|
    DB[:marshal_store].delete
    sleep 0.1
    results.report("insert:") do
      TESTS.times do
        DB[:marshal_store] << { :marshal_data => payload, :created_at => stamp }
      end
    end

    DB[:marshal_store].delete
    sleep 0.1
    results.report("asynci:") do
      finished = 0
      TESTS.times do
        DB[:marshal_store].async_insert(:marshal_data => payload, :created_at => stamp) do |ok, result, _errors|
          raise 'error in async insert: ' + result.to_s unless ok
          finished += 1
        end
      end
      # Callbacks fire on the reactor thread; poll until all have reported.
      sleep 0.01 until finished >= TESTS
    end
  end
end
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment