Skip to content

Instantly share code, notes, and snippets.

@ginjo
Last active November 12, 2023 08:48
Show Gist options
  • Save ginjo/99f169332778725ac3fc9f65bb1a0fff to your computer and use it in GitHub Desktop.
Save ginjo/99f169332778725ac3fc9f65bb1a0fff to your computer and use it in GitHub Desktop.
Wrappers for Bunny Gem (rabbitmq) providing 1. Multiple consumer threads, 2. Simple setup of RPC call/callback publishers and consumers.
# frozen_string_literal: true
# This gemspec currently describes a gem hosted on Github as a Gist only.
# See https://bundler.io/guides/git.html
require_relative "bunny_wrapper_version.rb"
Gem::Specification.new do |spec|
spec.name = "bunny_wrapper"
spec.version = BunnyWrapper::VERSION
spec.authors = ["wbr"]
spec.email = ["bunnywrapper@jamulii.com"]
spec.summary = "RabbitMQ RPC with Bunny"
#spec.description = "TODO: Write a longer description or delete this line."
spec.homepage = "https://gist.github.com/ginjo/99f169332778725ac3fc9f65bb1a0fff"
spec.license = "MIT"
spec.required_ruby_version = ">= 2.6.0"
spec.metadata["allowed_push_host"] = "TODO: Set to your gem server 'https://example.com'"
spec.metadata["homepage_uri"] = spec.homepage
spec.metadata["source_code_uri"] = spec.homepage
#spec.metadata["changelog_uri"] = "TODO: Put your gem's CHANGELOG.md URL here."
# Specify which files should be added to the gem when it is released.
# The `git ls-files -z` loads the files in the RubyGem that have been added into git.
spec.files = Dir.chdir(__dir__) do
`git ls-files -z`.split("\x0").reject do |f|
(f == __FILE__) || f.match(%r{\A(?:(?:bin|test|spec|features)/|\.(?:git|travis|circleci)|appveyor)})
end
end
spec.bindir = "."
spec.executables = spec.files.grep(%r{\Aexe/}) { |f| File.basename(f) }
spec.require_paths = ["."]
# Uncomment to register a new dependency of your gem
# spec.add_dependency "example-gem", "~> 1.0"
spec.add_dependency "bunny"
spec.add_dependency "json_rpc_object"
spec.add_dependency "connection_pool"
# For more information and examples about making a new gem, check out our
# guide at: https://bundler.io/guides/creating_gem.html
end
### Custom classes for Bunny gem (RabbitMQ client for Ruby).
### Allows creation of multiple consumer threads each with their own channel.
### This is a Gist on Github.
###
### https://gist.github.com/ginjo/99f169332778725ac3fc9f65bb1a0fff
###
### This script depends on the 'bunny' gem.
###
### This script depends on the JsonRpcObject gist (json_rpc_object.rb). Download the file here:
### https://gist.github.com/ginjo/5b128c655e79e1b4e0e10c27a5098177
###
### You have (at least) three options for loading the JsonRpcObject gist:
### 1. manuallly require it in your project.
### 2. put the file in the same directory as this bunny_wrapper.rb file.
### 3. Set the constant JSONRPC_GIST_PATH with the path to the json_rpc_object.rb file.
###
### If you set manual_ack:true on a consumer, you must acknowledge manually:
### channel.ack(delivery_info[:delivery_tag])
###
### Example basic usage:
### r = BunnyWrapper.new host:'172.31.4.108'
### r.subscribe('task_queue', thread_pool_size:3, durable:true, manual_ack:true){|a, b, c, ch| puts [a, b, c]; ch.ack(a[:delivery_tag])}
### r.publish('task_queue','Hello13!')
###
### Example json_rpc usage:
### # on machine 1
### caller = BunnyWrapper.json_rpc_caller('task_queue', reply_to:'callback_queue'){|json_rpc_response| puts json_rpc_response.to_yaml}
###
### # on machine 2
### BunnyWrapper.json_rpc_responder('task_queue', reply_to:'callback_queue'){|json_rpc_request| {some_response_data:'anything'}}
###
### # on machine 1
### caller['my_method', {my_data:{a:1, b:2}}]
### # asynchronous resposne...
### # => {"json_rpc"=>"2.0", "id"="<automatically-managed-id>", "response"=>{"some_response_data"=>"anything"}}
###
### For concurrency reference, see:
### http://rubybunny.info/articles/concurrency.html#consumer_work_pools
###
### See below for TODO's.
###
### See here for Ruby Bunny documentation: http://rubybunny.info/articles/guides.html
### See here for helpful rabbitmq explanation: https://www.rabbitmq.com/tutorials/amqp-concepts.html
###
###
require 'bunny'
require 'securerandom'
require 'logger'
require 'connection_pool'
require 'delegate'
require_relative 'bunny_wrapper_version'
begin
require 'json_rpc_object'
rescue LoadError => err
puts "BunnyWrapper requires the json_rpc_object library from the the following gist:"
puts " https://gist.github.com/ginjo/5b128c655e79e1b4e0e10c27a5098177"
puts "Add this gem requirement to your Gemfile, then bundle install:"
puts " gem 'json_rpc_object', gist:'5b128c655e79e1b4e0e10c27a5098177'"
raise
end
# This assumes the json_rpc_object.rb gist file at the following path.
# You may also manually require the json_rpc_object.rb file. See above for gist link.
# JSONRPC_GIST_PATH ||= ENV['JSONRPC_GIST_PATH']
# begin
# Module.const_defined?(:JsonRpcObject) ||
# require_relative(Module.const_defined?(:JSONRPC_GIST_PATH) ? JSONRPC_GIST_PATH : 'json_rpc_object.rb')
# rescue
# end
class BunnyWrapper
attr_reader :session, :threads, :consumers, :channels, :channel_opts, :queue_opts, :subscription_opts, :logger
attr_reader :channel_pool, :lock, :responders
CHANNEL_OPTS_KEYS = [
# create_channel() actually takes positional args, not kwargs, but these options
# will be passed in as positional.
:thread_pool_size,
:channel_pool_size,
:channel_pool_timeout,
:consumer_pool_abort_on_exception,
:consumer_pool_shutdown_timeout,
:prefetch,
:prefetch_global,
]
QUEUE_OPTS_KEYS = [
:durable, # (Boolean) — default: false — Should this queue be durable?
:auto_delete, # (Boolean) — default: false — Should this queue be automatically deleted when the last consumer disconnects?
:exclusive, # (Boolean) — default: false — Should this queue be exclusive (only can be used by this connection, removed when the connection is closed)?
:arguments, # (Hash) — default: {} — Additional optional arguments (typically used by RabbitMQ extensions and plugins)
]
SUBSCRIPTION_OPTS_KEYS = [
:ack, # (Boolean) — default: false — [DEPRECATED] Use :manual_ack instead
:manual_ack, # (Boolean) — default: false — Will this consumer use manual acknowledgements?
:exclusive, # (Boolean) — default: false — Should this consumer be exclusive for this queue?
:on_cancellation, # (#call) — Block to execute when this consumer is cancelled remotely (e.g. via the RabbitMQ Management plugin)
:consumer_tag, # (String) — Unique consumer identifier. It is usually recommended to let Bunny generate it for you.
:arguments, # (Hash) — default: {} — Additional (optional) arguments, typically used by RabbitMQ extensions
:thread_pool_size,# (Int) - It is used in subscribe() to tell us how many consumers to spawn.
]
def default_logger
_logger = ::Logger.new(STDOUT)
log_level = (::Logger.const_get(ENV['LOG_LEVEL'] || 'INFO') rescue Logger::INFO)
_logger.level = log_level
_logger.datetime_format = '%Y-%m-%d %H:%M:%6N %z'
_logger
end
# Define consumer subclass.
# I saw this in an example somewhere but don't know what it's for.
# Update: I saw it used in rabbitmq/bunny docs somewhere. If you cancel
# subscriptions, you might want to subclass Bunny::Consumer.
#
# class Consumer < Bunny::Consumer
# def cancelled?
# @cancelled
# end
#
# def handle_cancellation(_)
# @cancelled = true
# end
# end
# Creates a new instance of this class with an associated session to rabbitmq server.
#
# You can pass-in Bunny.new opts AND subscription opts here, and BunnyWrapper will sort them out.
# Make sure that the opts you want to use are included in SUBSCRIPTION_DEFAULTS above.
#
# Note that reset_queues does not currently do anything. You must issue a cli command to do that:
# rabbitmqadmin delete queue name=name-of-queue
#
def initialize(reset_queues:[], existing_connection:nil, **opts)
@logger = opts.delete(:logger) || default_logger
# See here for durability info.
# http://rubybunny.info/articles/durability.html
@channel_opts = opts.slice(*CHANNEL_OPTS_KEYS)
@queue_opts = opts.slice(*QUEUE_OPTS_KEYS)
@subscription_opts = opts.slice(*SUBSCRIPTION_OPTS_KEYS)
opts = opts.except(*CHANNEL_OPTS_KEYS, *QUEUE_OPTS_KEYS, *SUBSCRIPTION_OPTS_KEYS)
logger.info{ "BW initializing with @channel_opts: #{channel_opts}" }
logger.info{ "BW initializing with @queue_opts: #{queue_opts}" }
logger.info{ "BW initializing with @subscription_opts: #{subscription_opts}" }
logger.info{ "BW initializing with (remaining) opts: #{opts}" }
@session = (existing_connection ? existing_connection : Bunny.new(**opts))
# Loops until connected to rabbitmq server.
until (
begin
@session.start
rescue Bunny::Exception
false
end
) do
logger.info{ "#{self} waiting for rabbitmq server..." }
sleep 3
end
logger.info{ "#{self} connected to rabbitmq server: #{opts}" }
@session
end
# Creates channel, using global and local opts.
# See https://www.rubydoc.info/github/ruby-amqp/bunny/Bunny%2FSession:create_channel
#
def create_channel(**_opts)
opts = channel_opts.merge(_opts)
channel_id = opts[:channel_id]
thread_pool_size = (opts[:thread_pool_size]).to_i
consumer_pool_abort_on_exception = (opts[:consumer_pool_abort_on_exception] || false)
consumer_pool_shutdown_timeout = (opts[:consumer_pool_shutdown_timeout] || 60).to_i
prefetch = (opts[:prefetch] || 1).to_i
prefetch_global = (opts[:prefetch_global] || false)
ch = @session.create_channel(channel_id, thread_pool_size, consumer_pool_abort_on_exception, consumer_pool_shutdown_timeout)
# See here for confusing info about rabbitmq prefetch.
# https://www.rabbitmq.com/consumer-prefetch.html
ch.prefetch(prefetch, prefetch_global)
ch
end
def channel_pool(**_opts)
return @channel_pool if @channel_pool
opts = channel_opts.merge(_opts)
# See here for 'channel' pooling. This ~should~ solve our problems with sharing publishing channels between threads:
# https://wework.github.io/ruby/rails/bunny/rabbitmq/threads/concurrency/puma/errors/2015/11/12/bunny-threads/
# https://github.com/mperham/connection_pool
#
channel_pool_size = (opts[:channel_pool_size] || 1).to_i
channel_pool_timeout = (opts[:channel_pool_timeout] || 15).to_i
#
# Channels for all publishing and rpc consuming.
# Persistant consumers get their own dedicated channel, not from this pool.
@channel_pool = ConnectionPool::Wrapper.new(
size: channel_pool_size,
timeout: channel_pool_timeout
# The thread-pool is set here, but in most normal cases (publish, rpc-subscribe),
# it won't actually grow to a size > 1. And it won't even be constructed
# if no consumer is active on the channel.
#){ @session.create_channel(nil, thread_pool_size) }
){ create_channel(**opts) }
end
# Stop using this, as it could conflict with method-local vars.
#alias :channel :channel_pool
# Publicly accessible publish method.
# Handles message_id, object-to-json conversion, channel pool
#
# Returns message_id
#
# See here for info on rabbit queues & exchanges.
# https://www.rabbitmq.com/tutorials/amqp-concepts.html
#
#
# This original version works great.
#
# def publish(payload, routing_key:, **opts)
#
# opts[:message_id] ||= payload.is_a?(JsonRpcObject) ? payload['id'] : SecureRandom.uuid
# opts[:content_type] ||= payload.is_a?(String) ? 'application/octet-stream' : 'application/json'
# payload_string = payload.is_a?(String) ? payload : payload.to_h.to_json
#
# channel_pool.default_exchange.publish(payload_string, routing_key:, **opts)
#
# logger.debug{ "BW published to '#{routing_key}', '#{opts[:message_id]}', '#{opts[:content_type]}'" }
#
# return opts[:message_id]
# end
#
# This is the composit publish/publish_jrpc version.
# √ TODO: Consider flipping the order of payload, jrpc_method, since Jamulii
# is doing it that way, and it looks much better when actually using the jrpc_method.
#
# √ TODO: The rpc response message_id is the same as the request message_id,
# if the response payload is a JsonRpcObject (even if it's a JsonRpcObject result)
#
# TODO: Maybe we should create subclasses for JsonRpcObject for each type,
# but that's a JsonRpcObject gem issue.
#
def publish(jrpc_method=nil, payload, jrpc_type:'request', routing_key:, **opts)
logger.debug{"BW#publish '#{routing_key}' payload is_a? #{payload.class}"}
# If payload is JsonRpcObject
if payload.is_a?(JsonRpcObject)
payload_string = payload.to_h.to_json
opts[:message_id] ||= (payload.has_key?('result') ? SecureRandom.uuid : payload['id'])
opts[:content_type] ||= 'application/json-rpc'
jrpc_type = nil # just to be tidy
# If this is JsonRpc call with method + data
#elsif ( payload.is_a?(Hash) || payload.is_a?(Array) ) && jrpc_method
elsif jrpc_method
# Create a JsonRpcObject from hash or array.
payload_jrpc = JsonRpcObject.send(jrpc_type, jrpc_method, payload)
payload_string = payload_jrpc.to_h.to_json
opts[:message_id] ||= payload_jrpc['id']
opts[:content_type] ||= 'application/json-rpc'
# If payload is Hash
elsif payload.is_a?(Hash)
# Just convert to json.
payload_string = payload.to_h.to_json
opts[:message_id] ||= SecureRandom.uuid
opts[:content_type] ||= 'application/json'
# If payload is Array
elsif payload.is_a?(Array)
# Just convert to json.
payload_string = payload.to_a.to_json
opts[:message_id] ||= SecureRandom.uuid
opts[:content_type] ||= 'application/json'
# If payload is String
elsif payload.is_a?(String)
# Basically a pass-thru.
payload_string = payload.to_s
opts[:message_id] ||= SecureRandom.uuid
opts[:content_type] ||= 'application/octet-stream'
end
# Chooses a channel source to publish with.
# Will accept passed channel:<channel-or-pool>, or default to channel().
#
if opts[:channel]
logger.debug{"BW using chanel passed as option in BW#publish()."}
end
_channel = opts[:channel] || channel_pool
_channel.default_exchange.publish(payload_string, routing_key:, **opts)
logger.debug{ "BW published to '#{routing_key}', '#{opts[:message_id]}', '#{opts[:content_type]}'" }
return opts[:message_id]
end
# Subscribes to a queue, optionally creating thread_pool_size instances/consumers.
# Takes a block which is called upon receiving a message.
# The user block is passed these params - see receive() method.
# The params: are all extra positional args.
# The options: are all extra kwargs.
#
# |delivery_info:, properties:, payload:, params:, options:|
#
# See here about queues: http://rubybunny.info/articles/queues.html
#
# Default is auto-ack. Here's a manual-ack example.
# delivery_info[:channel].ack(delivery_info[:delivery_tag])
#
# TODO: Which method do the opts go to? @channels[n].queue() or q.subscribe_with()?
# Currently the opts are going to both methods (without any apparent errors?).
#
def subscribe(listen_queue, **_opts, &block)
opts = subscription_opts.merge(_opts)
# We create a dedicated channel for persistant consumers.
# We only pass _opts, since subscription_opts doesn't belong in the
# create_channel() method. Same with ch.queue() below.
ch = create_channel(**_opts)
#ch = channel_pool
logger.info "BW subscribing to queue '#{listen_queue}' with opts:#{opts}"
queue = ch.queue(listen_queue.to_s, **queue_opts, **_opts)
# Creates multiple consumers to match thread_pool_size.
# We have to do this cuz I guess Bunny doesn't create the extra
# consumer instances, given thread_pool_size.
#
thread_pool_size = (opts[:thread_pool_size]).to_i
consumers = thread_pool_size.times.map do |n|
#consumers = 1.times.map do |n|
consumer = queue.subscribe(consumer_tag: "#{queue.name}_#{n}", **opts) do
|delivery_info, properties, payload|
logger.debug{"BW consumer block (#{queue.name} - #{n}) received message"}
if block_given?
#block.call(delivery_info, metadata, payload, channel, consumer)
enhanced_message = receive(delivery_info, properties, payload, opts)
logger.debug{"BW consumer block (#{queue.name} - #{n}) calling inner block with: #{enhanced_message}"}
block.call(**enhanced_message)
else
logger.warn "BunnyWrapper consumer block (#{queue.name} - #{n}), but no handler block was given."
logger.debug payload
nil
end
end
end
#logger.info "#{self} listening to queue '#{listen_queue}' (#{queue.name}) with pool size #{(opts[:thread_pool_size] || subscription_opts[:thread_pool_size] || 1)}."
# Returns the subscription object so we can use it in downstream methods.
consumers
end
# Processes all params and payload returned from bunnywrapper
# subscribe() block, and converts payload to ruby object,
# if appropriate, based on content_type.
#
def receive(delivery_info, properties, payload, *params, **options)
begin
message = case
when properties[:content_type].to_s.match?(/json-rpc/i)
logger.debug{"#{self}#receive() is processing json-rpc"}
JsonRpcObject.load(payload)
when properties[:content_type].to_s.match?(/json/i)
logger.debug{"#{self}#receive() is processing json"}
JSON.load(payload)
else
logger.debug{"#{self}#receive() is processing text"}
payload
end
rescue => err
logger.warn{"#{self}#receive() was not able to process the payload: #{err}"}
message = payload
end
return {delivery_info:, properties:, payload:message, params:, options:}
end
# A Subscription that publishes the result to the reply_to queue.
# See subscribe() or receive() for the kwargs passed to the user block.
#
def responder(listen_queue, reply_to:nil, **opts, &block)
subscribe(listen_queue, **opts) do |*params, delivery_info:, properties:, payload:, **options|
# The extra *params and **options above will capture any extra args or kwargs.
#puts properties.inspect
_reply_to ||= (options[:reply_to] || properties[:reply_to] || _reply_to)
#puts "rpc_responder reply_to: #{reply_to}"
callback_headers = {
original_payload: payload,
original_properties: properties.to_h,
}
out_params = {
headers: callback_headers.to_h, #.merge({'exit_code'=>0}),
content_type: properties[:content_type],
correlation_id: (properties[:message_id] || properties[:correlation_id]),
}
block_call_params = {
params:, #params.dup,
delivery_info:, # delivery_info.to_h,
properties:, #properties.to_h,
payload:, #payload.dup,
options:, #options.to_h,
}
if block_given?
rslt = block.call(**block_call_params)
else
rslt = {error:"BW responder had no block to call, but here are the params to pass:",
block_call_params: block_call_params.except(:delivery_info)
}
end
logger.debug{ "Responder '#{listen_queue}' publishing result:" }
logger.debug{ rslt.to_yaml }
publish(rslt, routing_key:_reply_to, **out_params)
end
end # responder
# Convenience storage hash for responders, keyed by listen_queue name.
# TODO: Hmm, I don't know if we want this here. It may be more of an app-level function.
def responders(listen_queue, reply_to:nil, **opts, &block)
@responders ||= {}
@responders[listen_queue] ||= responder(listen_queue, reply_to:, **opts, &block)
end
# This class creates an instance to wrap a synchronous rpc call/response.
# Pass this an instance of BunnyWrapper, when initializing.
# SyncRpc.new(<bunnywrapper-instance>, send_queue, **opts)[payload]
#
# Use the convenience method rpc() for most cases.
#
# The output is the same as the subscribe() or receive() methods.
#
# This handles everything for a single synchronous send/receive
# rpc call through rabbitmq. This can also handle multiple threads
# call this class at the same time.
#
# It assumes there is a handler on the send-to queue that returns
# something to the given reply-to queue.
#
# This uses the custom BunnyWrapper publish() method, but does NOT
# use the BunnyWrapper subscribe() method.
#
# Extra _opts are currently passed on to each call to Bunny/Rabbitmq.
# For most cases, it should not be necessary to pass extra opts.
#
# For examples of setup and usage of rabbitmq/bunny/bunnywrapper/ruby/threaded/synchronous operations,
# see the Terraform rabbitmq/test_examples.rb file.
#
class SyncRpc < SimpleDelegator
attr_accessor :send_queue, :response, :condition, :opts, :lock, :sync_queue
def initialize(bw_inst, _send_queue=nil, **_opts)
super(bw_inst)
@send_queue = _send_queue
@opts = _opts
end
def call(jrpc_method=nil, payload, routing_key:@send_queue, **__opts)
_opts = opts.merge(__opts)
# Set up channel.
# We can actually use a channel from the pool here.
# TODO: Would passing **opts here break anything? It would be intended
# for users who want to customize their rpc, but I don't know if it's appropriate here.
#
channel_pool.with do |ch|
# Or we can create/destroy a dedicated channel just for this instance.
#ch = session.create_channel
# exclusive:true was here but didn't do anything, so moved it to subscribe().
# TODO: Is it ok to pass _opts here?
reply_queue = ch.queue('', exclusive:true, auto_delete:true, **_opts)
reply_queue_name = reply_queue.name
# If routing_key is '__test__' we short-circuit the remote responder
# and just send to the listen-queue.
(routing_key == '__test__') && (routing_key = reply_queue_name)
# Set up the lock
@lock = Monitor.new
@condition = MonitorMixin::ConditionVariable.new(@lock)
that = self
# Using Queue to make sure condition.wait happens before condition.signal.
# See https://stackoverflow.com/questions/52068527/simple-thread-conditional-variable-example-giving-deadlock-in-ruby
@sync_queue = Queue.new
# Set up consumer
consumer = reply_queue.subscribe(manual_ack:true, exclusive:true, **_opts) do |_delivery_info, _properties, payload|
that.lock.synchronize do
logger.debug{ "SyncRpc#call #{reply_queue_name} received '#{payload}'" }
that.response = receive(_delivery_info, _properties, payload, **_opts)
logger.debug{"SyncRpc#call #{reply_queue_name} sending ack"}
ch.ack(_delivery_info[:delivery_tag])
logger.debug{"SyncRpc#call #{reply_queue_name} calling condition.signal"}
that.sync_queue.pop
that.condition.signal
logger.debug{"SyncRpc#call #{reply_queue_name} after condition.signal"}
end
# that.lock.synchronize { that.condition.signal }
end
#ch_publish = session.create_channel
lock.synchronize do
publish(jrpc_method, payload, routing_key:, channel:ch, reply_to: reply_queue_name, **_opts) # channel: ch_publish
logger.debug{"SyncRpc#call #{routing_key} synchronizing condition.wait"}
sync_queue << 1
condition.wait
logger.debug{"SyncRpc#call #{routing_key} canceling consumer #{consumer}"}
consumer.cancel
logger.debug{"SyncRpc#call #{routing_key} returning response: #{response}"}
end
# If discarding one-off channel.
#consumer.cancel
# If using persistent channel pool
end # channel_pool.with
return response
end
alias :[] :call
end # SyncRpc
# Convenience method that creates a synchronous publish-subscribe
# instance with SyncRpc and calls it with the given params.
#
# The output is same as subscribe() or receive() methods.
#
# Payload can be string, json, hash, array, or JsonRpcObject.
#
def rpc(jrpc_method=nil, payload, routing_key:, **opts)
rslt = SyncRpc.new(self, routing_key, **opts)[jrpc_method, payload, **opts]
end
### LEGACY METHODS ###
# # For informational use.
# def channels
# @session.instance_variable_get(:@channels)
# # Bunny session@channels holds a hash of channels {1=>ch1, 2=>ch2}
# end
#
#
# # All consumers on all session channels.
# # For informational use.
# def consumers
# channels.values.inject({}) do |a,b|
# a.merge(b.consumers)
# end
# end
#
# # For informational use.
# def exchanges
# channels.values.collect{|ch| ch.exchanges rescue nil}.flatten
# end
#
#
# def queues
# channels.values.collect{|ch| ch.queues rescue nil}.flatten
# end
#
# def find_queue(name)
# queues.find{|q| q[name]}&.values&.fetch(0)
# end
#
#
# Cancels consumers and kills threads.
# Note: This does not free memory or delete queues!
#
# See Queue#purge to clear all messages from a queue.
#
# def reset!
# channels.each(&:close)
# end
#
#
# # Experimental: Deletes the queue, triggering the consumer cancellation handler
# # Update: This may no longer work since we implemented connection pooling.
# #
# def delete_queue(_q)
# @ch_x.queue(_q).delete
# end
end
##### TODO / Issues #####
# √ Put basic boilerplate code from tf_messaging.rb and terraform/tf_worker.rb into this library.
# Things like error-handling and call-reponse callback functionality. And/or...
# Handle rpc_call() with no block (a jsonrpc 'notification'), which
# doesn't expect a response. This is important for app-servers vs app-workers.
# √ For call-response functionality, create two BunnyWrapper methods that are closely coupled.
# These methods would be used to set up call and response functionality between
# two different types of nodes in a distributed computing scenario.
#
# # sets up a response listener and returns a 'caller' proc.
# r.rpc_caller(queue, reply_to:'callack_queue', thread_pool_size:1, **opts, &block)
# # => proc
#
# # Subscribes to queue which sends result of block to the callback-queue.
# r.rpc_responder(queue, reply_to:<given in caller's headers>, thread_pool_size:1, **opts, &block)
# √ TODO: rpc_caller() thread_pool_size:<integer> are not working, not causing multiple threads to spawn.
# Consider rufus-scheduler gem for timed/recuring jobs.
# https://rubygems.org/gems/rufus-scheduler
# Consider concurrent-ruby gem for better concurrency handling.
# https://github.com/ruby-concurrency/concurrent-ruby
# Update: What would be the benefits of using concurrent-ruby for BunnyWrapper?
# Is this item possibly irrelevant now that we use Bunny's built-in concurrency tools?
# Consider bypassing BunnyWrapper#publish and just use Bunny...publish for all non-user initiated publishing
# like callbacks and Proc publishing. Look at every place where you call 'publish' from within this library.
# BunnyWrapper#publish should just be for users.
# Update: What is the benefit of making this change?
# √ Change rpc_responder cb_queue to callback_queue, and make it a keyword instead of a positional param.
# Update: now using properties[:reply_to].
# Review the keyword args vs **opts of each function definition. Keyword args keeps extra stuff out of **opts,
# but it's more to manage and more verbose.
# √ We need a generic json-rpc-publish method, so we can send one-off json-rpc messages without having
# to set up listeners. This would be especially helful for testing (so we can send test messages to local
# callback-listeners and responder listeners).
# √ Finish handling correlation_id for json_rpc and regular rpc call/response.
# √ It's working for rpc_caller/rpc_reponder, but not for json_rpc_caller/json_rpc_responder.
# √ Eliminate callback_headers[:callback_queue] in favor of properties[:reply_to].
# Make sure content_type handling is correct.
# Do the above 3 things for the publish_jrpc() method.
# √ The synchronous rpc system works but is not threadsafe.
# If multiple threads make synchronous rpc calls at the same time,
# the results will be scrambled.
# Try putting rpc_caller entire block (of the resposne handler) in a mutex.synchronize again,
# maybe will solve multi-threading problems?
# Update this might be solved now by doing two things:
# 1. Making sure all rpc-callers have their own channel.
# No two threads should ever publish to rabbitmq on the same channel.
# 2. Using Monitor class instead of Mutex class. Monitor is re-entrant and handles recursive locking.
# Refactor handles all of this.
# √ Clean up the one-channel-per-synchronous-rpc-call mechanism.
# Find a way to allow multiple instances of BunnyWrapper to share a single connection.
# Or find a way to have a BunnyWrapper instance for each rpc-call that is made,
# so we can avoid resursive collisions when two or more rpc calls/responses try to
# access the same variables. I think this is only an issue when chaining synchronous
# rpc calls/responses together, which we do in JamService.
# Update: I think the refactor can handle all of this.
# Create a Logger object that automatically overrides itself with $logger, if exists.
# Or allow Instanciation of BunnyWrapper class to accept a logger instance.
# Or both.
# - TODO: Customize the procs that are created, so that they are regular objects (hashes?)
# that can be introspected and configured. They will need a 'call' method, so
# they can behave like procs. This feature will make it easier to debug and/or modify
# caller/responder proc objects.
# Update: I believe this is no longer relevant.
#
# TODO: Make sure SUBSCRIPTION_DEFAULTS contains proper, or at least sane, defaults
# for all possible Bunny#subscribe options.
#
# SEE here for good tips on managing threads/channels/queues/exchanges:
# https://www.cloudamqp.com/blog/part4-rabbitmq-13-common-errors.html
# http://rubybunny.info/articles/concurrency.html
# √ TODO: Provide a way to disable listeners while still allowing callers to work.
# This is needed in testing where we don't want to foul up the data flow, just by booting
# an app in the cli (instead of as a daemon on the production server).
#
# √ TODO: Currently we cannot create a jrpc-caller without also setting up a listener.
# According to Bunny docs, if you create a channel without attaching a consumer,
# a listener will not be set up. Figure out how to implement that in this library.
# See 'consumer work pools' section here: http://rubybunny.info/articles/concurrency.html
#
# √ TODO: I think thread_count should be changed to listener_pool_size, or does Bunny actually use 'thread_count'?
# Note that we also have a channel_pool_size, managed by connection_pool gem.
# Update: I don't think bunny has a name for it, it just takes an integer with create_channel().
# Update: Renamed in the refactor.
#
# √TODO: Currently, async rpc calls return the exchange. They should return the request ID!
# It should utlimately be possible to query BunnyWrapper about the state of an async job
# by using the request ID.
# √ TODO: I think BunnyWrapper may need to have 2 kinds of public 'base' methods which all others are based on.
# This will help us create background workers that can listen (and call), plus front-end servers that can call
# but NOT listen (except to syncr responses).
# 1. Caller
# 2. Listener
# 3. Responder (listener + caller)
# 4. Async Caller (with async listener)
# 5. Syncr Caller (with synchronous listener)
# Update: The refactor follows the above, mostly.
#
# I think right now, a responder can respond to either an async or syncr caller.
# I don't think the responder cares (or has any idea about) whether the caller
# is syncr or async. It just responds to the given reply_to queue.
class BunnyWrapper
VERSION = '0.1.0'
end
source "https://rubygems.org"
gem 'json_rpc_object', gist:'5b128c655e79e1b4e0e10c27a5098177'
#gem 'connection_pool'
#gem 'json_rpc_object', path:'../JsonRpcRuby'
# Declare your gem's dependencies in foo.gemspec.
# Bundler will treat runtime dependencies like base dependencies, and
# development dependencies will be added by default to the :development group.
gemspec
RABBITMQ_HOST = ENV['RABBITMQ_HOST'] || 'localhost'
CONSUMER_POOL_SIZE = ENV['CONSUMER_POOL_SIZE'] || 16
CHANNEL_POOL_SIZE = ENV['CHANNEL_POOL_SIZE'] || 32
PREFETCH = ENV['PREFETCH'] || 1
ENV['LOG_LEVEL'] ||= 'DEBUG'
require 'bundler'
Bundler.setup
require 'bunny_wrapper'
# require 'bunny'
# require 'connection_pool'
require 'yaml'
require 'json'
# See for examples: https://www.rabbitmq.com/tutorials/tutorial-one-ruby.html
# See for amqp modle explained: https://www.rabbitmq.com/tutorials/amqp-concepts.html
# See for queues: http://rubybunny.info/articles/queues.html
# See for concurrency: http://rubybunny.info/articles/concurrency.html#:~:text=are%20not%20used.-,Consumer%20Work%20Pools,ordered%20message%20processing%20by%20default.
# See for synchronous rpc: https://www.rabbitmq.com/tutorials/tutorial-six-ruby.html
#
# NOTE: The rabbitmq chapter with examples of synchronous rpc does not work in
# multiple concurrent threads... without some modification. Mainly, the synchronize blocks
# need to enclose more of the code. See the sync examples below.
#
# You may also want to use a ruby Queue object to make sure the order of competing threads
# (publisher vs consumer) works with our code. For that, see the following:
# https://stackoverflow.com/questions/52068527/simple-thread-conditional-variable-example-giving-deadlock-in-ruby
module Bun
extend self
def connection
Bun.const_defined?(:Connection) && (return Connection)
host = (ENV['RABBITMQ_HOST'])
Bun.const_set('Connection', Bunny.new(hostname: host, automatic_recovery: true))
puts "Bunny connected to rabbitmq host: #{host}."
Connection.start
Connection
end
def wrapper
@wrapper ||= BunnyWrapper.new(
existing_connection: connection,
channel_pool_size: CHANNEL_POOL_SIZE,
thread_pool_size: CONSUMER_POOL_SIZE,
prefetch: PREFETCH,
)
#@wrapper.logger.level = 1
#puts "Logger object: #{@wrapper.logger.object_id}"
end
### BASIC QUEUE OPERATIONS ###
def setup1
#connect
@ch1 = connection.create_channel
@ch2 = connection.create_channel
@ch3 = connection.create_channel
@ch1.queue('q1')
begin
#queue.subscribe(block: true) do |_delivery_info, _properties, body|
@ch2.queue('q1').subscribe do |_delivery_info, _properties, body|
puts "q1 Received '#{body}'"
end
puts "Subscribed to 'q1'."
rescue Interrupt => _
puts "Closing connection."
connection.close
end
end
def test1(txt='Hi there!')
@ch1 || setup1
# channel.default_exchange.publish('Hello World!', routing_key: queue.name)
@ch3.default_exchange.publish(txt, routing_key: 'q1')
3.times do |n|
Thread.new{instance_variable_get("@ch#{n+1}").default_exchange.publish("q1 (#{n}) text message.", routing_key: :q1)}
end
end
### WITH CHANNEL POOL and CONSUMER WORK POOL ###
attr_reader :pool
def setup2
#connect
@pool = ConnectionPool::Wrapper.new(size: CHANNEL_POOL_SIZE){ connection.create_channel(nil, CONSUMER_POOL_SIZE) }
@pool.queue('q1')
begin
#queue.subscribe(block: true) do |_delivery_info, _properties, body|
@pool.queue('q1').subscribe do |_delivery_info, _properties, body|
sleep 1
puts "q1 received '#{body}'"
end
puts "Subscribed to 'q1'."
rescue Interrupt => _
puts "Closing connection."
connection.close
end
end
def test2(txt='Hi there!')
@pool || setup2
# channel.default_exchange.publish('Hello World!', routing_key: queue.name)
@pool.default_exchange.publish(txt, routing_key: 'q1')
20.times do |n|
Thread.new{@pool.default_exchange.publish("q1 (#{n}) text message.", routing_key: :q1)}
end
end
### BLOCKING ### Doesn't really do what we want, and it may not be threadsafe.
def test3(txt="test3 synchronous Yay!")
#connect
ch = connection.create_channel
q = ch.queue('', exclusive:true, auto_delete:true)
q_name = q.name
ch.default_exchange.publish(txt, routing_key:q_name)
rslt = nil
sub = q.subscribe(block:true) do |_delivery_info, _properties, body|
puts "#{q_name} received '#{body}'"
ch.close
rslt = body
next body
end
rslt
end
### SYNCHRONOUS ### Note the use of Monitor and MonitorMixin.
attr_accessor :response, :lock, :condition
# This handles everything for a single synchronous send/receive
# rpc call through rabbitmq.
#
# It assumes there is a handler on the send-to queue that returns
# something to the given reply-to queue.
#
# This is as concise as the code can get (pretty much),
# and it does NOT USE BunnyWrapper.
#
# This example creates/destroys a dedicated channel for each rpc publish->consume
# operation, and it ~should~ be threadsafe. If not, consider using a ruby Queue
# to order the publish->consume deterministically, or try using a separate
# channel for publish vs consume. See BunnyWrapper for examples of each of those.
#
# NOTE: For this example, the send-to and reply-to queue is the same
# anonymous '' queue, so we're simply short-circuiting the remote responder.
# The functionality and behavior is identical to a real-world scenario where
# they are different queues, each with their own handler.
#
def test4(txt="Synchronous Yay!")
#connect
ch = connection.create_channel
q = ch.queue('', exclusive:true, auto_delete:true)
q_name = q.name
@lock = Monitor.new
@condition = MonitorMixin::ConditionVariable.new(@lock)
that = self
q.subscribe do |_delivery_info, _properties, payload|
that.lock.synchronize do
puts "#{q_name} received '#{payload}'"
that.response = payload
that.condition.signal
end
#that.lock.synchronize { that.condition.signal }
end
lock.synchronize do
ch.default_exchange.publish(txt, routing_key:q_name)
#lock.synchronize { condition.wait }
condition.wait
ch.close
end
return response
end
class SyncRpc
include Bun
def initialize(msg)
test4(msg)
end
end
# Runs multiple test4 calls in threads, each in an instance of SyncRpc, so we
# don't clash the ivars, which happens because we're not syncing those vars
# (I think we could though).
#
def test4a(how_many=5)
#connect
threads = how_many.times.map do |n|
Thread.new(n) do |i|
Thread.current[:output] = SyncRpc.new("test4a synced #{i}").response
puts "Thread #{i} done, with output #{Thread.current[:output]}."
end
end
puts "Thread count: #{threads.size}"
threads.map do |t|
t.join
t[:output]
end
end
### WITH BUNNYWRAPPER ###
#attr_reader :wrapper
def setup5
@test5_consumer ||= wrapper.subscribe('test5') do |payload:, **params|
wrapper.logger.info{payload}
payload
end
end
# Runs basic BW (BunnyWrapper) publish() and subscribe() methods.
def test5(payload='Basic BunnyWrapper text payload.')
setup5
wrapper.publish(payload, routing_key:'test5')
end
# Runs basic BW publish() and subscribe() methods with payload as hash,
# which should convert it to json.
def test5a(payload={a:1, b:2})
setup5
wrapper.publish(payload, routing_key:'test5')
end
# Runs BW publish() method with payload as a hash,
# which should convert to JsonRpcObject.
def test5b(payload={c:5, d:6})
setup5
#@wrapper.publish(payload, routing_key:'test6b')
wrapper.publish(payload, routing_key:'test5')
end
### SYNCHRONOUS BUNNYWRAPPER ###
# Runs BW basic synchronous subscribe-publish-receive functionality.
# The routing-key '__test__' causes the publisher to push to the
# reply-to queue, short circuiting any responders (of which there
# are none for this test). So this just tests the sender and receiver.
#
def test5c
setup5
wrapper.rpc('Sync rpc call from test5c()', routing_key:'__test__').slice(:payload, :properties)
end
# Runs multiple BW synchronous calls in threads.
# You can pass a block to be evaluated as the 'payload', it will receive i from the iterator.
#
# See test5c for note about using teh __test__ routing key.
#
# Example usage:
# rpc = Bun.test5d
# jrpc = Bun.test5d(25){|i|{iterator:i, self:self}}
#
def test5d(how_many=5, &block)
#wrapper || setup5
threads = how_many.times.map do |n|
t = Thread.new(n) do |i|
puts "Thread #{n} beginning"
Thread.current[:output] = wrapper.rpc((block_given? ? block.call(i) : "test5d #{i}"), routing_key:'__test__').slice(:payload, :properties)
puts "Thread #{n} done"
#wrapper.logger.info{ "Thread #{i} done, with output #{Thread.current[:output]}." }
end
#t.join
t
end
wrapper.logger.info{ "Thread count: #{threads.size}" }
threads.map do |t|
t.join
t[:output]
end
end
# Runs test5d with hash input
def test5e
test5d(5){|i|{iterator:i, self:self}}
end
### WITH RESPONDER ###
attr_reader :responders, :test6_consumer
# Consumer to receive responses.
def setup6_consumer
@test6_consumer ||= wrapper.subscribe('test6_consumer') do |*params, delivery_info:, properties:, payload:, **options|
#puts params.inspect
puts ''
puts "TEST6 Consumer received response from publishing to responder:"
#puts({params:, properties:, payload:, options:, delivery_info:}.to_yaml)
puts({params:, properties:, payload:, options:}.to_yaml)
end
end
# Sends message to consumer, to show it's working.
def test6(txt="Hey test6")
setup6_consumer
wrapper.publish({text:txt}, routing_key:'test6_consumer')
end
# Sets up a responder with NO block.
def setup6a
setup6_consumer
responder('test6a_responder')
end
# Sets up a responder with a block.
def setup6b
setup6_consumer
responder('test6b_responder') do |payload:, properties:, delivery_info:, **extra_opts_and_prms|
puts "setup6b responder inner block running" # with payload: #{payload}"
# Result of this is sent (as payload, by BW) to the reply_to queue.
if payload.is_a?(JsonRpcObject)
payload.response('setup6b jrpc result text')
elsif payload.is_a?(Hash)
payload.merge({note:'This hash is just coppied from the test6_ request'})
else
"This is the setup6b rpc text response, and here is the original payload: #{payload.chomp.strip} ."
end
end
end
# Publish text to responder-without-block.
def test6a(payload="Hi from test6a! Full async rpc")
setup6a
# Publishes to the responder.
wrapper.publish(payload, routing_key:'test6a_responder', reply_to:'test6_consumer')
end
# Publish text to responder-with-block.
def test6b(payload="Hi from test6b! Full async rpc")
setup6b
# Publishes to the responder.
wrapper.publish(payload, routing_key:'test6b_responder', reply_to:'test6_consumer')
end
# Publish hash to responder-with-block.
def test6c
test6b({a:1, b:2})
end
# Publish JsonRpcObject to responder-with-block.
def test6d
test6b(JsonRpcObject.request('do-my-thing', {c:3, d:4}))
end
# Publish synchronous-rpc with text to responder-with-block
def test6e(payload="Test6 rpc to responder--with-block")
setup6b
wrapper.rpc(payload, routing_key:'test6b_responder').slice(:payload, :properties)
end
# Publish synchronous-rpc with hash to responder-with-block
def test6f
test6e({x:'one', y:'two'})
end
# Publish synchronous-rpc with jrpc to responder-with-block
def test6g(_method='test-method', payload={m:'Mmmm', n:'Good!'})
setup6b
wrapper.rpc(_method, payload, routing_key:'test6b_responder').slice(:payload, :properties)
end
# Sets up a native BunnyWrapper responder with a block.
def setup7
setup6_consumer
wrapper.responders('test7_responder') do |payload:, properties:, delivery_info:, **extra_opts_and_prms|
puts "setup7 responder inner block running" # with payload #{payload}"
# Result of this is sent (as payload, by BW) to the reply_to queue.
if payload.is_a?(JsonRpcObject)
payload.response('setup7 responder result text')
elsif payload.is_a?(Hash)
payload.merge({note:'This hash is just coppied from the test7_ request'})
else
"This is the setup7 rpc text response, and here is the original test7 payload: #{payload.chomp.strip} ."
end
end
end # setup7
# Tests syncr rpc call to native BunnyWrapper responder.
def test7(_method='test-method', payload={ggg:"They're", great:'Awesome!'})
setup7
wrapper.rpc(_method, payload, routing_key:'test7_responder').slice(:payload, :properties)
end
# Test currency between long-running responder and one-off rpc() calls.
#
def setup8
setup7
wrapper.responders('test8_responder', manual_ack:true) do |payload:, properties:, delivery_info:, **extra_opts_and_prms|
puts "setup8 responder inner block running" # with payload #{payload}"
# Result of this is sent (as payload, by BW) to the reply_to queue.
sleep 5
output = if payload.is_a?(JsonRpcObject)
payload.response('setup8 jrpc result text')
elsif payload.is_a?(Hash)
payload.merge({note:'This hash is just coppied from the test8_ request'})
else
"This is the setup8 rpc text response, and here is the original test8 payload: #{payload.chomp.strip} ."
end
delivery_info[:channel].ack(delivery_info[:delivery_tag])
output
end
end # setup8
# Tests syncr rpc call to native BunnyWrapper responder.
def test8(_method='test-method', payload={green:"eggs", with:'ham!'})
setup8
results = []
n=0
5.times do |n|
#Thread.new do
results << wrapper.publish(_method, "test8 publishing to test8_responder", routing_key:"test8_responder")
#end
end
results << wrapper.rpc(_method, payload, routing_key:'test7_responder').slice(:payload, :properties)
puts "Wait for it..."
return results
end
def run_all
results = {
test1:,
test2:,
test3:,
test4:,
test4a:,
test5:,
test5a:,
test5b:,
test5c:,
test5d:,
test5e:,
test6:,
test6a:,
test6b:,
test6c:,
test6d:,
test6e:,
test6f:,
test6g:,
test7:,
test8:,
}
end
### EXIT POLITELY ###
def exit(*args)
connection.close
super
end
### GENERIC RESPONDER PROTOTYPE ###
# This was used as the base of the current BunnyWrapper#responder() method.
# I think this is still used by one the tests above, but otherwise we should
# be using the BunnyWrapper responder!
def responder(listen_queue, reply_to:nil, &block)
@responders ||= {}
@responders[listen_queue] ||= wrapper.subscribe(listen_queue) do |*params, delivery_info:, properties:, payload:, **options|
# The extra *params and **options above will capture any extra args or kwargs.
#puts properties.inspect
_reply_to ||= (options[:reply_to] || properties[:reply_to] || _reply_to)
#puts "rpc_responder reply_to: #{reply_to}"
callback_headers = {
original_payload: payload,
original_properties: properties.to_h,
}
out_params = {
headers: callback_headers.to_h, #.merge({'exit_code'=>0}),
content_type: properties[:content_type],
correlation_id: (properties[:message_id] || properties[:correlation_id]),
}
block_call_params = {
params:, #params.dup,
delivery_info:, # delivery_info.to_h,
properties:, #properties.to_h,
payload:, #payload.dup,
options:, #options.to_h,
}
if block_given?
rslt = block.call(**block_call_params)
else
rslt = {error:"BunnyWrapper responder had no block to call, but here are the params to pass:",
block_call_params: block_call_params.except(:delivery_info)
}
end
puts "Responder '#{listen_queue}' publishing result:"
puts rslt.to_yaml
wrapper.publish(rslt, routing_key:_reply_to, **out_params)
end
end # responder
end # Bun
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment