Skip to content

Instantly share code, notes, and snippets.

@dan-manges
Last active April 12, 2023 13:45
Show Gist options
  • Star 1 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save dan-manges/ab169e13a4a92ebd10cfe5b518accca9 to your computer and use it in GitHub Desktop.
Save dan-manges/ab169e13a4a92ebd10cfe5b518accca9 to your computer and use it in GitHub Desktop.
In-process pub/sub implementation in Ruby

Ruby Pub/Sub

This is an implementation of in-process pub/sub in Ruby with type checking at runtime.

Events can only be subscribed to with jobs. This ensures that subscribers run fully asynchronously and cannot raise exceptions in the code that publishes the event.

This is the approach we use in production at rwx.

Copyright 2022 ReadWriteExecute, Inc.
Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the "Software"), to deal in the Software without restriction, including without limitation the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and to permit persons to whom the Software is furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in all copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
# in app/pub_sub/pub_sub.rb
# Single PubSubManager instance that events are registered on, subscribed to and published through.
PubSub = PubSubManager.new
# Declare the event and its payload schema. The block is forwarded to
# ValueStruct.attributes, so `user_id String` declares one attribute named
# user_id that is checked against String when the event is published.
PubSub.register_event("user.created") { user_id String }
# Subscribers must be ApplicationJob subclasses; anything else raises NotAJobError.
PubSub.subscribe("user.created", RequestEmailVerificationJob)
# in another class:
# Checks the payload against the registered schema, then calls
# perform_later(user_id: ...) on every job class subscribed to the event.
PubSub.publish("user.created", user_id: user.id)
# In-process publish/subscribe registry with runtime payload checking.
#
# An event must be registered (together with its payload schema) before it can
# be subscribed to or published. Subscribers are job classes; publishing checks
# the payload against the schema and then enqueues every subscribed job through
# `perform_later`.
class PubSubManager
  # Raised by #register_event when the event name is already taken.
  EventAlreadyRegisteredError = Class.new(StandardError)
  # Raised by #subscribe and #publish when the event name was never registered.
  UnregisteredEventError = Class.new(StandardError)
  # Raised by #subscribe when the subscriber is not an ApplicationJob class.
  NotAJobError = Class.new(StandardError)

  def initialize
    # event name => { payload_struct: Class, subscribers: [job classes] }
    @registered_events = {}
  end

  # Registers a new event and the schema of its payload.
  #
  # @param event_name [Object] key identifying the event (e.g. "user.created")
  # @param block [Proc] attribute declarations, forwarded to ValueStruct.attributes
  # @return [Hash] the stored registry entry (:payload_struct and :subscribers)
  # @raise [EventAlreadyRegisteredError] if event_name is already registered
  def register_event(event_name, &block)
    if @registered_events.key?(event_name)
      raise EventAlreadyRegisteredError, "PubSub event #{event_name} already registered"
    end

    payload_struct = Class.new(ValueStruct) do
      attributes(&block)
      # The struct class is anonymous; give it a readable name for use in
      # messages that interpolate the class.
      define_singleton_method(:to_s) { "PubSub event #{event_name}" }
    end

    @registered_events[event_name] = { payload_struct:, subscribers: [] }
  end

  # Subscribes a job class to a registered event.
  #
  # @param event_name [Object] a previously registered event name
  # @param job_class [Class] ApplicationJob or one of its subclasses
  # @return [Array] the event's subscriber list, including job_class
  # @raise [UnregisteredEventError] if event_name is not registered
  # @raise [NotAJobError] if job_class is not a class inheriting from ApplicationJob
  def subscribe(event_name, job_class)
    registered_event = lookup_event(event_name)

    # `is_a?(Class)` guards against non-class arguments (nil, strings, ...),
    # which would otherwise fail with NoMethodError instead of NotAJobError.
    unless job_class.is_a?(Class) && job_class <= ApplicationJob
      raise NotAJobError, "#{job_class} must inherit from ApplicationJob"
    end

    registered_event[:subscribers] << job_class
  end

  # Checks the payload and enqueues every job subscribed to the event.
  #
  # @param event_name [Object] a previously registered event name
  # @param event_payload [Hash] payload; passed to each job as keyword arguments
  # @param subscriber_index [Object, nil] not used by this implementation;
  #   kept so existing callers that pass a third argument keep working
  # @return [Array] the event's subscriber list
  # @raise [UnregisteredEventError] if event_name is not registered
  def publish(event_name, event_payload, subscriber_index = nil)
    registered_event = lookup_event(event_name)

    # Instantiating the struct verifies the structure of the payload; the
    # instance itself is discarded and jobs receive the original payload.
    registered_event[:payload_struct].new(event_payload)

    registered_event[:subscribers].each do |job_class|
      job_class.perform_later(**event_payload)
    end
  end

  private

  # Returns the registry entry for event_name or raises UnregisteredEventError.
  def lookup_event(event_name)
    @registered_events.fetch(event_name) do
      raise UnregisteredEventError, "Unknown PubSub event #{event_name}"
    end
  end
end
require "rails_helper"
RSpec.describe PubSubManager do
  # A fresh manager per example, so registrations never carry over.
  let(:pub_sub) { PubSubManager.new }

  # Registers "test.event" with a single attribute `name` declared as String.
  def register_test_event
    pub_sub.register_event("test.event") { name String }
  end

  before do
    # Throwaway job class; stub_const makes it reachable by name in the examples.
    job_class = Class.new(ApplicationJob) do
      def perform(params)
      end
    end
    stub_const("PubSubTestJob", job_class)
  end

  it "allows registering events, subscribing to events, and publishing events to enqueue jobs" do
    register_test_event
    pub_sub.subscribe("test.event", PubSubTestJob)

    expect { pub_sub.publish("test.event", name: "my name") }
      .to enqueue_job(PubSubTestJob).with(name: "my name")
  end

  it "raises if an event is registered twice" do
    register_test_event

    expect { register_test_event }
      .to raise_error(PubSubManager::EventAlreadyRegisteredError, "PubSub event test.event already registered")
  end

  it "raises UnregisteredEventError if the event is not registered when subscribing" do
    expect { pub_sub.subscribe("test.event", PubSubTestJob) }
      .to raise_error(PubSubManager::UnregisteredEventError, "Unknown PubSub event test.event")
  end

  it "raises NotAJobError if the job does not inherit from ApplicationJob" do
    register_test_event

    expect { pub_sub.subscribe("test.event", User) }
      .to raise_error(PubSubManager::NotAJobError, "User must inherit from ApplicationJob")
  end

  it "raises UnregisteredEventError if the event is not registered when publishing" do
    expect { pub_sub.publish("test.event", name: "my name") }
      .to raise_error(PubSubManager::UnregisteredEventError, "Unknown PubSub event test.event")
  end

  it "raises UnrecognizedAttributes if the event is passed a key it was not registered with" do
    register_test_event

    expect { pub_sub.publish("test.event", not_name: "some value") }
      .to raise_error(ValueSemantics::UnrecognizedAttributes, "`PubSub event test.event` does not define attributes: `:not_name`")
  end

  it "raises InvalidValue if the event is passed a value of the wrong type" do
    register_test_event

    expect { pub_sub.publish("test.event", name: 1) }
      .to raise_error(ValueSemantics::InvalidValue, "Some attributes of `PubSub event test.event` are invalid:\n - name: 1\n")
  end
end
# Base class for value objects whose attributes are declared through the
# ValueSemantics DSL. Also bundles small coercion helpers and a non-raising
# constructor (.try_new) that reports the outcome as a Success or Failure.
class ValueStruct
  # Outcome of .try_new when the struct was built; carries the instance.
  class Success
    attr_reader :value

    def initialize(value_struct)
      @value = value_struct
    end

    def success? = true

    def error = nil
  end

  # Outcome of .try_new when construction failed; carries the error message.
  class Failure
    attr_reader :error

    def initialize(error)
      @error = error
    end

    def success? = false

    def value = nil
  end

  class << self
    # Declares the struct's attributes; the block is handed to
    # ValueSemantics.for_attributes and the resulting module is included.
    def attributes(&block)
      include ValueSemantics.for_attributes(&block)
    end

    # Builds an instance without raising on missing or invalid attributes.
    #
    # @return [Success] wrapping the new instance, or
    #   [Failure] wrapping the message of a MissingAttributes/InvalidValue error
    def try_new(**params)
      Success.new(new(**params))
    rescue ValueSemantics::MissingAttributes, ValueSemantics::InvalidValue => e
      Failure.new(e.message)
    end

    # Parses "YYYY-MM-DD" strings into a Date. Non-strings and strings that do
    # not parse are returned unchanged.
    def coerce_date(value)
      # `===` keeps the same class matching a `case`/`when String` would do.
      return value unless String === value

      Date.strptime(value, "%Y-%m-%d")
    rescue ArgumentError
      value
    end

    # Converts any value to its String representation via #to_s.
    def coerce_string(value)
      value.to_s
    end

    # Parses strings into a UTC Time. Non-strings and strings that do not parse
    # are returned unchanged.
    def coerce_time(value)
      # `===` keeps the same class matching a `case`/`when String` would do.
      return value unless String === value

      Time.parse(value).utc
    rescue ArgumentError
      value
    end
  end

  # Method objects for the coercion helpers above.
  DateCoercer = method(:coerce_date)
  StringCoercer = method(:coerce_string)
  TimeCoercer = method(:coerce_time)
end
require "rails_helper"
# Minimal ValueStruct subclass used as a spec fixture: a single attribute
# `name` declared as String.
class ValueStructClass < ValueStruct
  attributes { name String }
end
RSpec.describe ValueStruct do
  describe ".coerce_date" do
    it "parses a date string" do
      expect(described_class.coerce_date("2022-03-02")).to eq(Date.new(2022, 3, 2))
    end

    it "returns the string if the value cannot be parsed" do
      unparsable = "not a date"

      expect(described_class.coerce_date(unparsable)).to eq(unparsable)
    end

    it "returns nil for nil" do
      expect(described_class.coerce_date(nil)).to be_nil
    end
  end

  describe ".coerce_time" do
    it "parses a time string" do
      expect(described_class.coerce_time("2022-03-02T16:34:22Z")).to eq(Time.utc(2022, 3, 2, 16, 34, 22))
    end

    it "returns the string if the value cannot be parsed" do
      unparsable = "no time information"

      expect(described_class.coerce_time(unparsable)).to eq(unparsable)
    end

    it "returns nil for nil" do
      expect(described_class.coerce_time(nil)).to be_nil
    end
  end

  describe ".coerce_string" do
    it "converts an integer to a string" do
      expect(described_class.coerce_string(1)).to eq("1")
    end
  end

  describe ".try_new" do
    it "returns success when the value struct is successfully instantiated" do
      outcome = ValueStructClass.try_new(name: "Vanguard")

      expect(outcome).to be_success
      expect(outcome.error).to be_nil
      expect(outcome.value).to eq(ValueStructClass.new(name: "Vanguard"))
    end

    it "returns error when a required attribute is missing" do
      outcome = ValueStructClass.try_new

      expect(outcome).not_to be_success
      expect(outcome.value).to be_nil
      expect(outcome.error).to eq("Some attributes required by `ValueStructClass` are missing: `name`")
    end

    it "returns error when an attribute has the wrong type" do
      outcome = ValueStructClass.try_new(name: 1)

      expect(outcome).not_to be_success
      expect(outcome.value).to be_nil
      expect(outcome.error).to eq("Some attributes of `ValueStructClass` are invalid:\n - name: 1\n")
    end
  end
end
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment