# public
# Created
#
#   • Download Gist
# riak-tee.rb
# Ruby
# 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81
# Tee with Sinatra
# http://rigelgroupllc.com/wp/blog/tee-with-sinatra
# This proxy script will accept HTTP requests meant for Riak
# and in addition to passing them on to Riak will send a duplicate
# request to an ElasticSearch cluster
# -- John Lynch, www.rigelgroupllc.com
#
 
require 'rubygems'
require 'sinatra'
require 'typhoeus'
 
# Connection settings for the two backends the proxy talks to.
# Timeouts are expressed in milliseconds.
OPTIONS = {
  :riak_host    => "localhost",
  :riak_port    => "8098",
  :es_host      => "localhost",
  :es_port      => "9200",
  :riak_timeout => 5000, # milliseconds
  :es_timeout   => 5000  # milliseconds
}
# Rack middleware that forwards every incoming request to Riak and, for JSON
# PUT/POST requests addressed to /riak/<bucket>/<key>, sends a copy of the
# body to the ElasticSearch index named "riak". The wrapped app is never
# invoked: the client always receives the (proxied) Riak response.
#
# Top-level constants are referenced with a leading `::` so that lookup never
# resolves to a same-named constant inside the Rack namespace.
module Rack
  class Proxy
    # Captures the bucket and key from a Riak object path.
    RIAK_OBJECT_PATH = %r{\A/riak/([^/]+)/([^/]+)}

    # Status sent to the client when Riak produced no usable HTTP status.
    BAD_GATEWAY = 502

    # app - the downstream Rack application (stored but never called).
    def initialize(app)
      @app = app
      @hydra = ::Typhoeus::Hydra.new
    end

    # Proxies the request described by +env+.
    #
    # Returns a Rack response triple [status, headers, body] built from Riak's
    # answer. When a copy was written to ElasticSearch, its response code is
    # exposed in the "X-ElasticSearch-ResCode" header.
    def call(env)
      req = Rack::Request.new(env)
      # The body is sent twice, so the whole stream is read into memory.
      # This is an obvious problem with large bodies, so beware.
      req_body = req.body.read if req.body

      riak_url = "http://#{::OPTIONS[:riak_host]}:#{::OPTIONS[:riak_port]}#{req.fullpath}"
      riak_response = queue_request(riak_url, request_options(req, req_body, ::OPTIONS[:riak_timeout]))
      es_response = queue_es_copy(req, req_body)

      # Concurrently executes the queued requests, blocks until all finish.
      @hydra.run

      # Headers may be missing entirely if Riak could not be reached.
      headers = riak_response[:headers] || {}
      headers.merge!("X-ElasticSearch-ResCode" => es_response[:code].to_s) if es_response && es_response[:code]
      # Typhoeus can add nil headers, let's get rid of them.
      headers.delete_if { |_name, value| value.nil? }

      # Rack requires a status >= 100; a lower code (Typhoeus reports 0 when
      # it received no HTTP response) means the upstream call failed.
      status = riak_response[:code].to_i
      status = BAD_GATEWAY if status < 100

      # Rack bodies must respond to #each, which a bare String does not.
      [status, headers, [riak_response[:body].to_s]]
    end

    private

    # Builds the Typhoeus options shared by both backend requests: timeout,
    # HTTP method, and (when present) the content type and body.
    def request_options(req, body, timeout)
      opts = { :timeout => timeout, :method => req.request_method.downcase.to_sym }
      opts[:headers] = { "Content-type" => req.content_type } if req.content_type
      opts[:body] = body if body && !body.empty?
      opts
    end

    # Queues a request on the hydra and returns a Hash that is filled with
    # :code, :headers and :body once the request completes.
    def queue_request(url, opts)
      captured = {}
      request = ::Typhoeus::Request.new(url, opts)
      request.on_complete do |response|
        captured[:code] = response.code
        captured[:headers] = response.headers_hash
        captured[:body] = response.body
      end
      @hydra.queue request
      captured
    end

    # Queues the ElasticSearch copy of a JSON write. Returns the Hash that
    # will hold the ES response, or nil when the request is not a JSON
    # PUT/POST or its path does not name both a bucket and a key.
    def queue_es_copy(req, body)
      return nil unless (req.put? || req.post?) && json_request?(req)

      match = RIAK_OBJECT_PATH.match(req.path)
      return nil unless match

      es_url = "http://#{::OPTIONS[:es_host]}:#{::OPTIONS[:es_port]}/riak/#{match[1]}/#{match[2]}"
      queue_request(es_url, request_options(req, body, ::OPTIONS[:es_timeout]))
    end

    # True when the request's media type is application/json, ignoring
    # parameters such as "; charset=utf-8" and letter case.
    def json_request?(req)
      req.content_type.to_s.split(";").first.to_s.strip.downcase == "application/json"
    end
  end
end
 
# Register the proxy as middleware. Its #call never invokes the wrapped app,
# so every request is answered with the proxied Riak response.
use Rack::Proxy

# Please sign in to comment on this gist.
#
# Something went wrong with that request. Please try again.