Skip to content

Instantly share code, notes, and snippets.

@raggi
Created March 17, 2009 02:47
Show Gist options
  • Save raggi/80247 to your computer and use it in GitHub Desktop.
Save raggi/80247 to your computer and use it in GitHub Desktop.
eventmachine postgres to sequel async shim
# I stole this code from em-mysql (tmm1 <3)
module Sequel
# Adds a slot on the database object for the asynchronous connection.
# The Dataset async_* helpers in this file reach it through `db._async`.
class Database
  # Reader for the async connection handle (nil until assigned).
  attr_reader :_async
  # Writer for the async connection handle.
  attr_writer :_async
end
# Asynchronous counterparts of the usual dataset operations.
#
# Every method builds SQL with the dataset's own *_sql helpers, hands it to
# the connection stored in `db._async`, and returns nil immediately. Results
# are delivered later to the block the caller supplied.
class Dataset
  # Sends the statement built by insert_sql(*args) to the async
  # connection's #insert; the block is forwarded as the completion callback.
  def async_insert(*args, &cb)
    db._async.insert(insert_sql(*args), &cb)
    nil
  end

  # Sends the statement built by update_sql(*args) to the async
  # connection's #update; the block is forwarded as the completion callback.
  def async_update(*args, &cb)
    db._async.update(update_sql(*args), &cb)
    nil
  end

  # Sends delete_sql to the async connection's #execute.
  def async_delete(&cb)
    db._async.execute(delete_sql, &cb)
    nil
  end

  # Executes the first statement returned by multi_insert_sql(*args).
  # NOTE(review): only `.first` is sent; if multi_insert_sql ever returns
  # more than one statement the remainder is silently dropped -- confirm.
  def async_multi_insert(*args, &cb)
    db._async.execute(multi_insert_sql(*args).first, &cb)
    nil
  end

  # Same as async_multi_insert, but rewrites the leading INSERT into
  # "INSERT IGNORE".
  # NOTE(review): INSERT IGNORE is MySQL syntax (this code was ported from
  # em-mysql); it is presumably rejected by PostgreSQL -- confirm before use.
  def async_multi_insert_ignore(*args, &cb)
    statement = multi_insert_sql(*args).first.sub(/insert/i, "INSERT IGNORE")
    db._async.execute(statement, &cb)
    nil
  end

  # Runs select_sql(*args) asynchronously and yields each resulting row,
  # after applying the dataset's transform and row_proc when present.
  def async_each(*args)
    db._async.select(select_sql(*args)) do |rows|
      rows.each { |r| yield _async_load_row(r) }
    end
    nil
  end

  # Runs the dataset's SQL asynchronously and yields the whole row array at
  # once, after applying the dataset's transform and row_proc when present.
  def async_all
    db._async.select(sql) do |rows|
      # Test @transform directly, exactly as the per-row code does. The
      # previous bare `transform` call was inconsistent with that check and
      # invoked a method rather than reading the flag.
      if row_proc || @transform
        yield rows.map { |r| _async_load_row(r) }
      else
        yield rows
      end
    end
    nil
  end

  # Yields the row count as an Integer. Datasets whose options overlap
  # COUNT_FROM_SELF_OPTS are wrapped with from_self first; otherwise the
  # first value of each row returned for STOCK_COUNT_OPTS is yielded.
  def async_count(&cb)
    if options_overlap(COUNT_FROM_SELF_OPTS)
      from_self.async_count(&cb)
    else
      naked.async_each(STOCK_COUNT_OPTS) { |r| yield r.values.first.to_i }
    end
    nil
  end

  private

  # Applies transform_load (when @transform is set) and then row_proc (when
  # present) to a single raw row, returning the processed row.
  def _async_load_row(row)
    row = transform_load(row) if @transform
    row = row_proc[row] if row_proc
    row
  end
end
# Async helpers for models, built on top of the dataset-level async_* methods.
class Model
  # Fires an asynchronous UPDATE for this record through `this`, then applies
  # the same arguments to the in-memory object with `set` without waiting for
  # the callback. Returns self.
  def async_update(*args, &cb)
    this.async_update(*args, &cb)
    set(*args)
    self
  end

  # Fires an asynchronous DELETE for this record through `this`. Returns nil.
  def async_delete(&cb)
    this.async_delete(&cb)
    nil
  end

  class << self
    # Class-level delegators: each one forwards its arguments and block to the
    # method of the same name on the model's dataset.
    %w(
      async_insert
      async_multi_insert
      async_multi_insert_ignore
      async_each
      async_all
      async_update
      async_count
    ).each do |delegated|
      class_eval <<-RUBY
        def #{delegated} *args, &cb
          dataset.#{delegated}(*args, &cb)
        end
      RUBY
    end

    # async version of Model#[]
    #
    # Accepts either a Hash of conditions or a primary key value (converted
    # with primary_key_hash). Yields the first matching row, or nil when
    # nothing matched. Returns nil.
    def async_lookup(args)
      args = primary_key_hash(args) unless Hash === args
      dataset.where(args).limit(1).async_all do |rows|
        yield(rows.any? ? rows.first : nil)
      end
      nil
    end
  end
end
end
# Ze shim!
# Adapter mixed into a Postgres3 connection so it offers the
# execute/insert/update/select interface that the Sequel async_* helpers call
# on `db._async`. Each method issues `query(sql)` and attaches a callback.
#
# The caller's block is optional everywhere: a statement issued without a
# block is simply fire-and-forget.
#
# NOTE(review): the status and errors values passed to the query callback are
# ignored, so a failed statement is not reported to the caller -- confirm how
# failures should surface.
module EventMachine::Protocols::Postgres3::AsyncSequelShim
  # Runs +sql+ and passes the integer parsed from the last word of the
  # result's command tag to the block (presumably the affected row count --
  # confirm against the protocol implementation).
  def execute(sql, &cb)
    query(sql).callback do |_status, result, _errors|
      cb.call(result.cmd_tag.split(' ').last.to_i) if cb
    end
  end

  # Runs an INSERT and calls the block with nil once it completes.
  def insert(sql, &cb)
    # TODO see #insert_result, it uses conn.last_insert_id
    query(sql).callback do |_status, _result, _errors|
      cb.call(nil) if cb
    end
  end

  # Runs an UPDATE and calls the block with nil once it completes.
  def update(sql, &cb)
    query(sql).callback do |_status, _result, _errors|
      cb.call(nil) if cb
    end
  end

  # Runs a SELECT and calls the block with the rows converted, in place,
  # into hashes keyed by field name.
  def select(sql, &cb)
    query(sql).callback do |_status, result, _errors|
      fields = result.fields.map { |f| f.name }
      # Pair names with values directly. Flattening the zipped pairs would
      # also flatten any Array-valued column and corrupt the row.
      result.rows.map! { |r| Hash[fields.zip(r)] }
      cb.call(result.rows) if cb
    end
  end
end
# BEGIN runs before the rest of the file is evaluated, so the libraries are
# loaded ahead of the definitions above -- but only when this file is
# executed directly as a script.
BEGIN {
  if __FILE__ == $0
    %w(rubygems sequel eventmachine).each { |lib| require lib }
  end
}

# Demo: connect to a local PostgreSQL server over its unix socket, install the
# shim on the connection, and perform a single asynchronous insert.
if $0 == __FILE__
  require 'eventmachine'
  DB = Sequel.connect "postgres:///raggi"
  # Table setup used for the demo, left disabled:
  # DB.create_table(:a)
  # DB.alter_table(:a) do
  # primary_key :id
  # add_column :one, :text
  # add_column :two, :integer
  # end
  EM.run do
    protocol = EventMachine::Protocols::Postgres3
    EM.connect("/tmp/.s.PGSQL.5432", protocol) do |conn|
      login = conn.connect('raggi', 'raggi')
      login.callback do |ok, error|
        # Bail out of the whole process if authentication failed.
        abort(error) unless ok
        conn.extend protocol::AsyncSequelShim
        DB._async = conn
        new_row = {:one => 'blah', :two => 20}
        DB[:a].async_insert(new_row) do |res|
          p res
          EM.stop
        end
      end
    end
  end
end
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment