Skip to content

Instantly share code, notes, and snippets.

@aruprakshit
Created July 16, 2014 03:26
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save aruprakshit/e5e3a55e3757b3a50dd9 to your computer and use it in GitHub Desktop.
Save aruprakshit/e5e3a55e3757b3a50dd9 to your computer and use it in GitHub Desktop.
# Runs the TYPE prepared statement for the given provider/board/status and,
# for every row it returns, sends a PostgreSQL NOTIFY on the TABLE channel
# carrying that row's "id" as the payload.
#
# Relies on state defined outside this method:
#   @connection - a PG::Connection
#   @logger     - a Logger
#   statement   - helper whose result is appended to the bind parameters
#
# provider, board - the first two bind parameters of the prepared statement
# status          - handed to `statement` to build the remaining bind parameters
#
# Returns whatever `#each` on the prepared statement's result returns.
def store(provider, board, status)
  parameters = [provider, board].concat(statement(status))

  @connection.exec_prepared(TYPE, parameters).each do |row|
    id = row["id"]

    @logger.debug(Harvester::Runner) do
      Scrawl.new(label: "insert", table: TABLE, id: id)
    end

    # Asynchronously ask postgres to notify the TABLE channel, e.g.
    #   NOTIFY statuses, '1';
    # The payload can be any string, so it is escaped before being placed
    # inside the single-quoted SQL literal; a payload containing a quote
    # would otherwise break (or hijack) the statement.
    payload = @connection.escape_string(id.to_s)
    @connection.async_exec("NOTIFY #{TABLE}, '#{payload}';")

    @logger.debug(Harvester::Runner) do
      Scrawl.new(label: "notify", channel: TABLE, payload: id)
    end
  end
end
# Waits on a PostgreSQL LISTEN/NOTIFY channel and hands a notification to a
# caller-supplied block.
#
# The channel is subscribed (LISTEN) as soon as the object is built. Every
# call to #listen finishes with an UNLISTEN, so a later #listen call on the
# same object subscribes again before waiting.
class Listener
  # connection - object responding to #raw_connection (per the original
  #              author: an ActiveRecord connection); the raw connection is
  #              what receives async_exec / wait_for_notify
  # channel    - channel name, interpolated as-is into LISTEN / UNLISTEN
  # logger     - receives debug/error entries; defaults to Database::LOGGER
  def initialize(connection, channel, logger = Database::LOGGER)
    @connection = connection.raw_connection
    @channel = channel
    @logger = logger
    @subscribed = false
    subscribe
  end

  # Blocks until a notification arrives, passing the given block on to
  # PG's wait_for_notify. Any StandardError raised while waiting is logged
  # via #report instead of propagating. The channel is always UNLISTENed
  # afterwards, whether waiting succeeded or failed.
  def listen(&postprocess)
    # The previous #listen call (if any) ended with UNLISTEN; without
    # subscribing again the wait below could never be woken up.
    subscribe unless @subscribed
    wait(&postprocess)
  rescue => exception
    report(exception)
  ensure
    unlisten
  end

  private

  # Registers this connection as a listener on the channel,
  # e.g. `LISTEN statuses`. This only subscribes; it does not wait.
  def subscribe
    @connection.async_exec("LISTEN #{@channel}")
    @subscribed = true
    @logger.debug(Socialboard::Database) do
      Scrawl.new(label: "listen", channel: @channel)
    end
  end

  # Logs the exception at error level and its backtrace at debug level.
  def report(exception)
    @logger.error(Socialboard::Database) do
      Scrawl.new(label: "exception", exception: exception)
    end
    @logger.debug(Socialboard::Database) do
      Scrawl.new(label: "backtrace", backtrace: exception.backtrace)
    end
  end

  # Blocks the current thread in wait_for_notify (called without a timeout).
  # Per the original author's notes, the block is evaluated once when a
  # NOTIFY arrives on the channel, and nothing after the call runs until then.
  def wait(&block)
    @logger.debug(Socialboard::Database) do
      Scrawl.new(label: "waiting", state: "starting", channel: @channel)
    end
    @connection.wait_for_notify(&block)
    @logger.debug(Socialboard::Database) do
      Scrawl.new(label: "waiting", state: "finishing", channel: @channel)
    end
  end

  # Must run when waiting fails or finishes; otherwise (per the original
  # author) hanging listens pile up on the channel and things get slow.
  def unlisten
    @connection.async_exec("UNLISTEN #{@channel}")
    @subscribed = false
    @logger.debug(Socialboard::Database) do
      Scrawl.new(label: "unlisten", channel: @channel)
    end
  end
end
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment