Skip to content

Instantly share code, notes, and snippets.

@venetanji
Created October 23, 2012 08:03
Show Gist options
  • Save venetanji/3937545 to your computer and use it in GitHub Desktop.
Save venetanji/3937545 to your computer and use it in GitHub Desktop.
Syncable class
class Syncable
include Mongoid::Document
field :url
field :method, default: :get
field :query
field :body
field :headers, default: {}
field :callback_class, default: :syncable
field :callback_method, default: :default
field :due, default: ->() {Time.now}
field :interval, default: 1.minute
field :context, default: {}
field :persistent, default: true
field :limited, default: false
attr_accessor :response
CONCURRENCY = 40
scope :due, ->() {where(:due.lt => Time.now)}
scope :unlimited, where(limited: false)
scope :limited, where(limited: true)
index :url => 1
index :url => 1, :query => 1
index :due => 1, :limited => 1
after_initialize :symbolize_context
def symbolize_context
context.symbolize_keys!
end
class << self
def run
EM.synchrony do
@requests_queue = EM::Queue.new
@process_request = proc do
@requests_queue.pop do |request|
Fiber.new do
request.perform
EM.defer do
begin
request.run_callback
request.destroy unless request.persistent
rescue Exception => e
puts e.message
end
end
@process_request.call
end.resume
end
end
EM.add_periodic_timer(10) do
@requests_queue.push(*(due.unlimited))
end
EM.add_periodic_timer(2) do
@requests_queue.push(*(due.limited.limit(5)))
end
CONCURRENCY.times do @process_request.call end
["INT", "TERM"].each do |signal|
Signal.trap(signal) {
if EM.reactor_running?
puts "Shutting down the reactor..."
EM.stop
end
}
Signal.trap("TSTP") {
reload!
}
end
end
end
def default_callback(request)
puts request.response.body
end
end
def perform
connection = Faraday.new do |conn|
conn.request :retry, max: 2, interval: 1
conn.adapter (defined?(EM) && EM.try("reactor_running?")) ? :em_synchrony : :net_http
end
self.response = connection.send method do | req |
req.url url, query
req.body = body
req.headers = headers
end
update_attributes(due: due + interval)
end
def run_callback
callback_class.to_s.classify.constantize.send("#{callback_method}_callback", self)
end
end
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment