Skip to content

Instantly share code, notes, and snippets.

@saicologic
Forked from mfojtik/app.rb
Created May 14, 2012 07:16
Show Gist options
  • Save saicologic/2692404 to your computer and use it in GitHub Desktop.
Save saicologic/2692404 to your computer and use it in GitHub Desktop.
sinatra with simple worker and queue
require 'rack'
require 'sinatra/base'
require 'uuidtools'
require 'rest-client'
require 'eventmachine'
require 'json'
require_relative './cache'
class Worker < EM::Connection
attr_reader :query
def receive_data(data)
Thread.new {
query_id, url = JSON::parse(data)
puts 'W: Received [%s][%s]' % [query_id, url]
notify(query_id, {:status => 'received'})
result = RestClient.get url
notify(query_id, {:status => 'processing'})
sleep 30 # Simulate that we're doing something for 30 seconds
puts 'W: Processed [%s][%s]' % [query_id, url]
notify(query_id, {:status => 'finished', :results => result.to_s})
}
end
private
def notify(query_id, result)
RestClient.post 'http://localhost:4567/broker/%s/results' % query_id, result.to_json, :content_type => 'text/html'
end
end
class Broker
def initialize(app, options = {})
@app = app
puts "B: Starting broker"
EM::next_tick do
@server = EM::connect('127.0.0.1', 4000, Worker, self)
end
end
def call(env)
env['broker'] = @server
@app.call(env)
end
end
class App < Sinatra::Base
use Rack::CommonLogger
use Broker
set server: 'thin'
enable :xhtml
enable :dump_errors
enable :show_errors
disable :show_exceptions
helpers do
def broker; env['broker']; end
end
post '/query' do
worker_id = UUIDTools::UUID.timestamp_create.to_s
broker.send_data([worker_id, params[:url]].to_json)
[202, { 'Location' => url('/worker/%s/results' % worker_id) }, '']
end
post '/broker/:id/results' do
[201, {}, Cache[params[:id]] = request.body.read]
end
get '/broker/:id/results', provides: 'text/event-stream' do
worker_id = params[:id]
stream :keep_open do |out|
timer = EventMachine::PeriodicTimer.new(1) do
result = Cache[worker_id] || { :status => 'queued' }
out << result
timer.cancel if JSON::parse(result)['status'] == 'finished'
end
out.callback { timer.cancel }
out.errback { timer.cancel }
end
end
end
EM::run {
EventMachine::start_server '127.0.0.1', '4000', Worker
App.run!
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment