Created
July 16, 2014 03:26
-
-
Save aruprakshit/e5e3a55e3757b3a50dd9 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
def store(provider, board, status) | |
# @connection is a PG::Connection object | |
# @logger is a Logger object | |
# The part before `.each` is inconsequential, it's just a prepared statement and we're | |
# iterating over the results. | |
@connection.exec_prepared(TYPE, [provider, board].concat(statement status)).each do |row| | |
# Ignore this | |
@logger.debug(Harvester::Runner) do | |
Scrawl.new(label: "insert", table: TABLE, id: row["id"]) | |
end | |
# We asyncrounuously tell postgres to notify the TABLE channel, with the payload of 'ID' | |
# example: `NOTIFY statuses, '1' | |
# The payload is any string you want, I use IDs. | |
@connection.async_exec("NOTIFY #{TABLE}, '#{row["id"]}';") | |
# Ignore this | |
@logger.debug(Harvester::Runner) do | |
Scrawl.new(label: "notify", channel: TABLE, payload: row["id"]) | |
end | |
end | |
end |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
class Listener | |
def initialize(connection, channel, logger = Database::LOGGER) | |
# Get the raw connection from ActiveRecord | |
@connection = connection.raw_connection | |
# The channel we're going to listen to | |
@channel = channel | |
# The Logger | |
@logger = logger | |
# Okay, this means we're listening, but doesn't do anything specific. | |
# ex: LISTEN statuses | |
@connection.async_exec("LISTEN #{channel}") | |
# Jump to the #wait method | |
@logger.debug(Socialboard::Database) do | |
Scrawl.new(label: "listen", channel: channel) | |
end | |
end | |
def listen(&postprocess) | |
begin | |
wait(&postprocess) | |
rescue => exception | |
report(exception) | |
ensure | |
unlisten | |
end | |
end | |
private def report(exception) | |
@logger.error(Socialboard::Database) do | |
Scrawl.new(label: "exception", exception: exception) | |
end | |
@logger.debug(Socialboard::Database) do | |
Scrawl.new(label: "backtrace", backtrace: exception.backtrace) | |
end | |
end | |
private def wait(&block) | |
# Ignore this | |
@logger.debug(Socialboard::Database) do | |
Scrawl.new(label: "waiting", state: "starting", channel: @channel) | |
end | |
# This method on connection takes a block, or anonymous function | |
# If there is ever a `NOTIFY` on the right channel the block evaluates ONCE | |
# Until the block gets evaluated the entire thread is blocked | |
# AKA nothing after this line gets evaluated until the notify is triggered | |
@connection.wait_for_notify(&block) | |
# Jump to unlisten | |
# Ignore this | |
@logger.debug(Socialboard::Database) do | |
Scrawl.new(label: "waiting", state: "finishing", channel: @channel) | |
end | |
end | |
private def unlisten | |
# This has to be run if things fail or you're done, or else | |
# you get a whole bunch of hanging listens on the channel | |
# and things get slow | |
@connection.async_exec("UNLISTEN #{@channel}") | |
@logger.debug(Socialboard::Database) do | |
Scrawl.new(label: "unlisten", channel: @channel) | |
end | |
end | |
end |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment