Skip to content

Instantly share code, notes, and snippets.

@nviennot
Created August 5, 2015 04:44
Show Gist options
  • Save nviennot/67a0795db7ccad973885 to your computer and use it in GitHub Desktop.
Save nviennot/67a0795db7ccad973885 to your computer and use it in GitHub Desktop.
#!/usr/bin/env ruby
# `Gemfile` should have the following:
# source 'https://rubygems.org'
# gem 'goliath'
# gem 'nobrainer', :github => 'nviennot/nobrainer'
require 'bundler'
Bundler.require
# NoBrainer configuration.
# The default db name will be "goliath_development"
# We also instruct to use the EventMachine driver, and log debug info in STDERR.
NoBrainer.configure do |config|
config.app_name = "goliath"
config.environment = Goliath.env
config.driver = :em
config.logger = Logger.new(STDERR).tap { |logger| logger.level = Logger::DEBUG }
end
# StreamFiber is a helper that we'll use to process our asynchronous responses
# Calling stream() immediately returns http headers to the client, and
# schedule the passed blocked to be ran in a Fiber on the next EM tick.
# We must not let bubble up any exceptions from the fiber to avoid killing the
# EventMachine loop, killing the Goliath server.
module StreamFiber
def guard_async_response(env, &block)
block.call(env)
rescue Exception => e
begin
msg = {:error => "#{e.class} -- #{e.message.split("\n").first}"}.to_json
STDERR.puts msg
env.chunked_stream_send("#{msg}\n")
env.chunked_stream_close
rescue
end
ensure
env.chunked_stream_close
end
def stream(env, &block)
EM.next_tick { Fiber.new { guard_async_response(env, &block) }.resume }
chunked_streaming_response
end
end
# When the client disconnects, we must abort the ongoing changes() queries to
# avoid leaking resources. We'll add a bind_connection_to_cursor() method
# that will ensure that close() gets called on it when the client disconnects.
module BindCursor
def bind_connection_to_cursor(env, cursor)
if env['connection_closed']
cursor.close
else
env['cursors'] ||= []
env['cursors'] << cursor
end
cursor
end
def on_close(env)
(env['cursors'] || []).each(&:close)
env['connection_closed'] = true
end
end
# --------------------------------------------------------------------------
# Our application code starts here.
# 1) We'll use a Message model, with a required subject.
# 2) We'll define a Goliath endpoint which responds to /creates and /changes,
# which create Messages and listen to message changes.
# See below an example with curl.
#
# The interesting code is the changes() endpoint. First, it opens a streaming
# response to the client. Then it performs a changes() query filtered with the
# passed params. This query returns a cursor that can be indefinitely iterated
# upon. we call bind_connection_to_cursor() to make sure that cusor.each { }
# returns immediately when the client disconnects.
class Message
include NoBrainer::Document
field :subject, :type => String, :required => true
field :body, :type => Text
end
class Stream < Goliath::API
use Goliath::Rack::Params
include StreamFiber
include BindCursor
def changes(env)
stream(env) do
Message.where(env['params']).raw.changes(:include_states => true)
.tap { |cursor| bind_connection_to_cursor(env, cursor) }
.each { |changes| env.chunked_stream_send("#{changes.to_json}\n") }
end
end
def create(env)
user = Message.create!(env['params'])
[200, {}, user.to_json]
end
def response(env)
case [env['REQUEST_METHOD'].downcase.to_sym, env['PATH_INFO']]
when [:get, '/changes'] then changes(env)
when [:post, '/create'] then create(env)
else raise Goliath::Validation::NotFoundError
end
end
end
# When running the server as such:
# $ ruby <this_file.rb> -sv
#
# You should see:
# [28604:INFO] 2015-08-04 23:08:43 :: Starting server on 0.0.0.0:9000 in development mode. Watch out for stones.
#
# Now we can issue a few requests on our HTTP server.
#
# --------------------------------------
#
# Example 1: Creating an invalid message
#
# $ curl -X POST localhost:9000/create
# [:error, "#<Message id: \"2G7tczvZy9UdtL\"> is invalid: Subject can't be blank"]
#
# -----------------------------------------------------------
#
# Example 2: Creating a message while listening for changes
#
# First we listen for changes:
# $ curl localhost:9000/changes
# {"state":"ready"}
#
# Then we open a new shell and run:
# $ curl -X POST localhost:9000/create?subject=hello
# {"subject":"hello","id":"2G7yTuZ549CcQF"}
#
# We see on previous curl appear:
# {"new_val":{"id":"2G7yTuZ549CcQF","subject":"hello"},"old_val":null}
#
# On the server terminal, we should see the following:
# DEBUG -- : [ 4.4ms] r.table("messages").changes({"include_states" => true})
# DEBUG -- : [ 5.1ms] r.table("messages").insert({"subject" => "hello", "id" => "2G7yTuZ549CcQF"})
#
# --------------------------------------------------------
#
# Example 3: Listening for changes on a specific subject
#
# $ curl localhost:9000/changes?subject=hi
#
# $ curl -X POST localhost:9000/create?subject=blah
# $ curl -X POST localhost:9000/create?subject=hi
#
# We only see the changes of the second Message, not the first one.
#
# ---------------------------------------------------------
#
# Example 4: Running many clients
#
# $ for i in `seq 10`; do curl -N localhost:9000/changes &; done
#
# We see 10 times {"state":"ready"}
#
# $ curl -X POST localhost:9000/create?subject=hello
#
# We see 10 times {"new_val":{"id":"2G815T2RT9mklP","subject":"hello"},"old_val":null}
#
# This demonstrate that our server can handle many clients simultaneously.
#
# ---------------------------------------------------------
#
# Example 5: Handling connection failures
#
# If we kill the rethinkdb server while a /changes call is in progress we should
# see the following:
#
# $ curl localhost:9000/changes
# {"state":"ready"}
# {"error":"RethinkDB::RqlDriverError -- Connection closed by server."}
#
# If we try to redo the curl command:
# $ curl localhost:9000/changes
# {"error":"RethinkDB::RqlRuntimeError -- Connection is closed."}
#
# Once we restart the rethinkdb server, we can reissue requests with no further
# intervention:
# $ curl localhost:9000/changes
# {"state":"ready"}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment