@jszmajda
Created December 18, 2017 16:34
Showing how we're currently using Avro in Optoro's Ruby projects. This is very much still subject to change!
# Optoro bits of using Avro. Currently built on AvroTurf, but I'm going to refactor away from that.
# This all sits in a Rails repo in `app/message_bus`.

# `app/message_bus/message_bus.rb`
module MessageBus
  module_function

  def avro
    # Things in the schema folder are `<MessageName>.avsc` files. They're JSON
    # documents in the Avro schema format. We don't use the Avro IDL because I
    # found that the tooling was very Java-centric and the need to autogenerate
    # things was less in Ruby (for now).
    #
    # This is also the only place I actually refer to AvroTurf by name, so
    # swapping it out shouldn't be too hard fundamentally.
    @avro ||= AvroTurf.new(schemas_path: "app/message_bus/avro_schema/")
  end

  class UnableToSerializeError < StandardError
  end

  class NotFullyImplementedError < StandardError
  end
end
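
# For reference, a minimal sketch of what one of those `.avsc` files might
# contain. The schema name, namespace and field names are taken from the
# UnitAppraised message in this gist; the field types are assumptions made
# only for illustration, and the real schema may differ.

```ruby
require 'json'

# Hypothetical contents for `UnitAppraisal.avsc`. Only the field names come
# from the gist; every type below is a guess.
UNIT_APPRAISAL_SCHEMA = {
  type: 'record',
  name: 'UnitAppraisal',
  namespace: 'com.optoro.inventory',
  fields: [
    { name: 'unit_id',      type: 'long' },
    { name: 'condition_id', type: 'long' },
    { name: 'appraiser_id', type: 'long' },
    { name: 'purpose',      type: 'string' }
  ]
}.freeze

# An .avsc file is plain JSON, so generating or inspecting one needs nothing
# beyond the standard library.
avsc_json = JSON.pretty_generate(UNIT_APPRAISAL_SCHEMA)
puts avsc_json
```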

# `app/message_bus/publisher.rb`
require 'base64'

module MessageBus
  # Class to handle how we debug and publish messages
  class Publisher
    DEFAULT_NAMESPACE = 'com.optoro.inventory'.freeze

    def initialize(schema_name:, namespace: DEFAULT_NAMESPACE)
      @schema_name = schema_name
      @namespace = namespace
      # Debug mode is on everywhere except production, unless DEBUG_AVRO is set
      @debug = !Rails.env.production? || ENV['DEBUG_AVRO']
    end

    def debug!
      @debug = true
    end

    def validate_and_publish!(record, topic)
      # Validation is provided through the schema, but you have to ask for it directly
      if MessageBus.avro.valid?(record, schema_name: @schema_name, namespace: @namespace)
        encoded = MessageBus.avro.encode(record, schema_name: @schema_name, namespace: @namespace)
        publish_to_wire(topic, wire_safe(encoded))
      elsif @debug
        # This call will raise a more useful exception clarifying why the
        # record is invalid
        MessageBus.avro.encode(record, schema_name: @schema_name, namespace: @namespace)
      end
    rescue StandardError => e
      # Sometimes validation isn't perfect, so this is belt-and-suspenders.
      # Rescuing StandardError (not Exception) leaves signals and exits alone.
      # Outside debug mode, failures are swallowed silently.
      if @debug
        raise MessageBus::UnableToSerializeError, "Invalid record: #{record.inspect}, Avro error: #{e.message}", caller
      end
    end

    # Private so the public API isn't confusing
    private

    def wire_safe(message)
      # We encode to Base64 as a precaution. It may not be necessary, but I
      # haven't tried publishing raw bytes yet. We're putting these into Kafka
      # and I haven't read up on how Kafka handles binary data.
      Base64.encode64(message)
    end

    def publish_to_wire(topic, message)
      # Industrious is a background job processor we wrote here. Basically this
      # just ships the message to the appropriate Kafka topic
      Industrious.publish(topic, message)
    end
  end
end
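
# A small sketch of what the Base64 step in `wire_safe` does to binary data,
# using only the standard library. The byte string is a stand-in for
# Avro-encoded output, not real Avro. It also shows `strict_encode64` for
# comparison; that is a possible alternative, not something this code uses.

```ruby
require 'base64'

# Stand-in for Avro-encoded output: arbitrary binary bytes, including values
# that are not valid UTF-8.
binary = (0..255).to_a.pack('C*')

wrapped = Base64.encode64(binary)        # the call wire_safe makes
strict  = Base64.strict_encode64(binary) # same alphabet, no line breaks

# encode64 breaks its output into lines; strict_encode64 does not.
puts "encode64 contains newlines: #{wrapped.include?("\n")}"
puts "strict_encode64 contains newlines: #{strict.include?("\n")}"

# decode64 skips the newlines, so a consumer gets the original bytes back.
round_trip = Base64.decode64(wrapped)
puts "round trip is lossless: #{round_trip == binary}"
```

# A consumer would reverse the step with `Base64.decode64` before handing the
# bytes to whatever Avro decoder it uses.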

# `app/message_bus/message.rb`
module MessageBus
  # Base class for simplification
  class Message
    def topic
      # self.class::TOPIC looks up a class constant in a child class
      if defined?(self.class::TOPIC)
        self.class::TOPIC
      else
        raise MessageBus::NotFullyImplementedError, 'No Topic defined'
      end
    end

    # Schema is inferred to be the .avsc file in the schemas folder with the
    # name as given
    def schema
      if defined?(self.class::SCHEMA)
        self.class::SCHEMA
      else
        raise MessageBus::NotFullyImplementedError, 'No Schema defined'
      end
    end

    # Subclasses must override this to return the hash to be serialized
    def record
      raise MessageBus::NotFullyImplementedError, 'No record defined'
    end

    def publish
      pub = MessageBus::Publisher.new(schema_name: schema)
      pub.validate_and_publish!(record, topic)
    end
  end
end
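
# The `defined?(self.class::TOPIC)` check above can be tried in isolation.
# This is a minimal sketch; the `Demo` module and its classes are
# hypothetical names used only to show the lookup behaviour.

```ruby
module Demo
  class NotFullyImplementedError < StandardError; end

  class Base
    def topic
      # self.class is the subclass at runtime, so this finds a constant
      # defined on the child, and defined? returns nil when it is missing.
      if defined?(self.class::TOPIC)
        self.class::TOPIC
      else
        raise NotFullyImplementedError, 'No Topic defined'
      end
    end
  end

  class WithTopic < Base
    TOPIC = 'demo_topic'.freeze
  end

  class WithoutTopic < Base
  end
end

puts Demo::WithTopic.new.topic

begin
  Demo::WithoutTopic.new.topic
rescue Demo::NotFullyImplementedError => e
  puts "raised: #{e.message}"
end
```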

# `app/message_bus/messages/unit_appraised.rb`
# This is a sample of a subclass of Message. It's used like this:
# `MessageBus::UnitAppraised.new(appraisal: self).publish`
module MessageBus
  # Should be published when a new appraisal is created for a unit
  class UnitAppraised < Message
    TOPIC = 'unit_appraisals'.freeze
    SCHEMA = 'UnitAppraisal'.freeze

    def initialize(appraisal:)
      @appraisal = appraisal
    end

    def record
      {
        unit_id: @appraisal.unit_id,
        condition_id: @appraisal.condition_id,
        appraiser_id: @appraisal.appraiser_id,
        purpose: @appraisal.purpose
      }
    end
  end
end
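
# To see the hash that `record` hands to the publisher without Rails, Avro or
# Kafka, the record-building logic can be exercised against a stand-in
# object. `FakeAppraisal`, `UnitAppraisedRecord` and the sample values are
# hypothetical; the real appraisal is whatever object the app passes in.

```ruby
# Hypothetical stand-in for the appraisal object; it only needs to respond
# to the four readers used by #record.
FakeAppraisal = Struct.new(:unit_id, :condition_id, :appraiser_id, :purpose,
                           keyword_init: true)

# Same record-building logic as UnitAppraised#record, copied into a
# standalone class so it runs without MessageBus::Message.
class UnitAppraisedRecord
  def initialize(appraisal:)
    @appraisal = appraisal
  end

  def record
    {
      unit_id: @appraisal.unit_id,
      condition_id: @appraisal.condition_id,
      appraiser_id: @appraisal.appraiser_id,
      purpose: @appraisal.purpose
    }
  end
end

# Sample values are made up for illustration.
appraisal = FakeAppraisal.new(unit_id: 42, condition_id: 3,
                              appraiser_id: 7, purpose: 'resale')
record = UnitAppraisedRecord.new(appraisal: appraisal).record
p record
```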