This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# Tee with Sinatra | |
# http://rigelgroupllc.com/wp/blog/tee-with-sinatra | |
# This proxy script will accept HTTP requests meant for Riak | |
# and in addition to passing them on to Riak will send a duplicate | |
# request to an ElasticSearch cluster | |
# -- John Lynch, www.rigelgroupllc.com | |
# | |
require 'rubygems' | |
require 'sinatra' | |
require 'typhoeus' | |
OPTIONS = {} | |
OPTIONS[:riak_host] = "localhost" | |
OPTIONS[:riak_port] = "8098" | |
OPTIONS[:es_host] = "localhost" | |
OPTIONS[:es_port] = "9200" | |
OPTIONS[:riak_timeout] = 5000 # milliseconds | |
OPTIONS[:es_timeout] = 5000 # milliseconds | |
class Rack::Proxy | |
def initialize(app) | |
@app = app | |
@hydra = Typhoeus::Hydra.new | |
end | |
def call(env) | |
req = Rack::Request.new(env) | |
# We need to use it twice, so read in the stream. This is an obvious problem with large bodies, so beware. | |
req_body = req.body.read if req.body | |
riak_url = "http://#{OPTIONS[:riak_host]}:#{OPTIONS[:riak_port]}#{req.fullpath}" | |
opts = {:timeout => OPTIONS[:riak_timeout]} | |
opts.merge!(:method => req.request_method.downcase.to_sym) | |
opts.merge!(:headers => {"Content-type" => req.content_type}) if req.content_type | |
opts.merge!(:body => req_body) if req_body && req_body.length > 0 | |
riak_req = Typhoeus::Request.new(riak_url, opts) | |
riak_response = {} | |
riak_req.on_complete do |response| | |
riak_response[:code] = response.code | |
riak_response[:headers] = response.headers_hash | |
riak_response[:body] = response.body | |
end | |
@hydra.queue riak_req | |
# If we are putting or posting JSON, send a copy to the ElasticSearch index named "riak" | |
if (req.put? || req.post?) && req.content_type == "application/json" | |
req.path =~ %r{^/riak/([^/]+)/([^/]+)} | |
bucket, key = $1, $2 | |
es_url = "http://#{OPTIONS[:es_host]}:#{OPTIONS[:es_port]}/riak/#{bucket}/#{key}" | |
opts = {:timeout => OPTIONS[:es_timeout]} | |
opts.merge!(:method => req.request_method.downcase.to_sym) | |
opts.merge!(:body => req_body) if req_body && req_body.length > 0 | |
es_req = Typhoeus::Request.new(es_url, opts) | |
es_response = {} | |
es_req.on_complete do |response| | |
es_response[:code] = response.code | |
es_response[:headers] = response.headers_hash | |
es_response[:body] = response.body | |
end | |
@hydra.queue es_req | |
end | |
# Concurrently executes both HTTP requests, blocks until they both finish | |
@hydra.run | |
#If we wrote to ES add a custom header | |
riak_response[:headers].merge!("X-ElasticSearch-ResCode" => es_response[:code].to_s) if es_response && es_response[:code] | |
#Typhoeus can add nil headers, lets get rid of them | |
riak_response[:headers].delete_if {|k,v| v == nil} | |
# Return original Riak response to client | |
[riak_response[:code], riak_response[:headers], riak_response[:body]] | |
end | |
end | |
use Rack::Proxy |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment