Skip to content

Instantly share code, notes, and snippets.

@Paxa
Forked from charger/basic.ru
Created October 20, 2012 02:51
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save Paxa/3921812 to your computer and use it in GitHub Desktop.
Save Paxa/3921812 to your computer and use it in GitHub Desktop.
Async request handling with sinatra and EM, freeze if request not exist URL
#!/usr/bin/env rackup -Ilib:../lib -s thin
# async message handling
# using gem https://github.com/raggi/async_sinatra
require 'sinatra/async'
require "em-http-request"
require "em-synchrony"
require "em-synchrony/em-http"
require "em-synchrony/fiber_iterator"
require 'resolv'
require 'resolv-replace' #https://github.com/mperham/em-resolv-replace
module Handler
MAX_RETRIES = 5
CONCURRENCY = 5
class Job
attr_accessor :url, :retries, :run_after
def initialize(url)
@retries = 0
@url = url
end
def runable?
!run_after || Time.now > run_after
end
def run_again!
@retries += 1
@run_after = Time.now + self.retries # * 60
end
end
extend self
@queue = []
def <<(value)
puts "add to queue #{value}"
@queue << Job.new(value)
end
def get_jobs
jobs = @queue.select(&:runable?).first(CONCURRENCY)
jobs.each {|job| @queue.delete(job) }
jobs
end
# like in goliath
def latency_timer
@last_latency = Time.now.to_f
EM.add_periodic_timer(1) do
@recent_latency = ((Time.now.to_f - @last_latency) - 1)
puts "LATENCY: #{(@recent_latency * 1000)} ms"
@last_latency = Time.now.to_f
end
end
def run
return if @running
@running = true
latency_timer
EM.add_periodic_timer(1) do
concurrency = 5
jobs = get_jobs
if jobs.any?
puts "remain #{@queue.size}"
EM.synchrony do
EM::Synchrony::FiberIterator.new(jobs, CONCURRENCY).each do |job|
resp = EventMachine::HttpRequest.new(job.url).get(:redirects => 10)
if resp.response_header.status == 200
puts "Message delivered"
else
puts "Error when get #{job.url} status #{resp.response_header.status}"
if job.retries < MAX_RETRIES
job.run_again!
puts "Requesting #{job.url} will run again at #{job.run_after}"
@queue.push(job) #потом запросим повторно
end
end
puts "request complete #{resp.response_header.status}"
end
end
end
end
puts "Handler started"
end
end
class AsyncTest < Sinatra::Base
register Sinatra::Async
enable :show_exceptions
before do
Handler.run
end
aget '/send' do
msg = params[:http]
body "Added to queue: #{msg}"
Handler << msg
end
end
run AsyncTest.new
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment