Skip to content

Instantly share code, notes, and snippets.

@weapp
Last active September 30, 2015 22:02
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save weapp/1cb9cb385a452c8a4d31 to your computer and use it in GitHub Desktop.
Save weapp/1cb9cb385a452c8a4d31 to your computer and use it in GitHub Desktop.
RedBunny. redlock + bunny. redis + rabbitmq
#!/usr/bin/env ruby
require 'bundler/setup'
require 'json'
require 'securerandom'
require 'singleton'
require 'prettyprint'
Bundler.require(:default)
TTL = 3 * 60 * 60 * 1_000 # 3.hours
module RedBunny
class Locker
include Singleton
attr_accessor :lock_manager
def initialize
@lock_manager = Redlock::Client.new(["redis://127.0.0.1:6379"])
end
def self.lock(*args)
instance.lock_manager.lock(*args)
end
end
class BunnyMessage
attr_accessor :body, :delivery_info, :metadata, :id
def initialize(message = {})
self.body = message[:body]
self.delivery_info = message[:delivery_info]
self.metadata = message[:metadata]
self.id = message[:id]
end
def self.from_pop(arr)
delivery_info, metadata, payload = arr
return nil unless payload
message = JSON[payload]
new(delivery_info: delivery_info,
metadata: metadata,
id: message['id'],
body: message['body'])
end
def ack
delivery_info[:channel].ack(delivery_info.delivery_tag)
end
def to_h
{
body: body,
delivery_info: delivery_info,
metadata: metadata,
id: id
}
end
end
class Queue
attr_reader :conn, :ch, :q
def initialize(name)
@conn = Bunny.new
@conn.start
@ch = conn.create_channel
@q = @ch.queue(name)
return unless block_given?
yield self
stop
end
def publish(body, id = SecureRandom.uuid)
@q.publish({ id: id, body: body }.to_json)
end
def pop
BunnyMessage.from_pop(@q.pop)
end
def subscribe(options = {})
@q.subscribe(options) do |*pop|
message = BunnyMessage.from_pop(pop)
yield message if Locker.lock("redbunny:#{message.id}", TTL)
end
end
def stop
@conn.stop
end
end
end
Bundler.require(:development)
def publisher?
ARGV[0]
end
def message
ARGV.join(" ")
end
RedBunny::Queue.new("test1") do |q|
if publisher?
q.publish(message)
else
q.subscribe(block: true, manual_ack: true) do |bunny_message|
Pry::ColorPrinter.pp bunny_message.to_h
puts
bunny_message.ack
end
# puts "This is the message:"
# ap q.pop.to_h
end
end
# -*- coding: utf-8 -*-
source 'https://rubygems.org'
gem "pry", group: 'development'
gem "bunny", ">= 2.1.0"
gem "redlock"
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment