Created
December 18, 2017 16:34
-
-
Save jszmajda/285d5ec616caaf33d70639f6bf798a02 to your computer and use it in GitHub Desktop.
Showing how we're currently using AVRO in Optoro's ruby projects. This is very very much still subject to change!!
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# Optoro's Avro integration. It currently uses AvroTurf, but I'm going to refactor away from that.
# This all sits in a Rails repo under `app/message_bus`.
# `app/message_bus/message_bus.rb`
module MessageBus
  # Raised by the publisher when a record cannot be Avro-encoded.
  UnableToSerializeError = Class.new(StandardError)

  # Raised by Message when a subclass has not supplied a required piece.
  NotFullyImplementedError = Class.new(StandardError)

  # Shared, lazily built Avro codec.
  #
  # The schema folder holds `<MessageName>.avsc` files: JSON documents in the
  # Avro schema format. The Avro IDL is not used because its tooling was very
  # Java-centric and there was less need to autogenerate things in Ruby (for
  # now).
  #
  # This is the only spot that mentions AvroTurf by name, so replacing it
  # later should not be fundamentally hard.
  def avro
    return @avro if @avro

    @avro = AvroTurf.new(schemas_path: "app/message_bus/avro_schema/")
  end
  module_function :avro
end
# `app/message_bus/publisher.rb`
module MessageBus
  # Validates a record against an Avro schema, encodes it and publishes the
  # encoded payload to a topic. In debug mode serialization problems are
  # raised; otherwise they are dropped silently (best effort).
  class Publisher
    DEFAULT_NAMESPACE = 'com.optoro.inventory'.freeze

    # schema_name - name of the schema handed to the Avro codec.
    # namespace   - Avro namespace of the schema (defaults to DEFAULT_NAMESPACE).
    #
    # Debug mode is on outside production, or whenever the DEBUG_AVRO
    # environment variable is set (to any value).
    def initialize(schema_name:, namespace: DEFAULT_NAMESPACE)
      @schema_name = schema_name
      @namespace = namespace
      @debug = !Rails.env.production? || ENV['DEBUG_AVRO']
    end

    # Force debug mode on for this publisher.
    def debug!
      @debug = true
    end

    # Publishes `record` to `topic` if it is valid for the schema.
    #
    # Returns whatever the wire publish returns for a valid record, and nil
    # when the record is dropped outside debug mode.
    #
    # Raises MessageBus::UnableToSerializeError in debug mode when anything
    # in the validate/encode/publish sequence raises a StandardError.
    def validate_and_publish!(record, topic)
      # Validation is provided through the schema but you have to ask for it directly
      if MessageBus.avro.valid?(record, schema_name: @schema_name, namespace: @namespace)
        publish_to_wire(topic, wire_safe(encode(record)))
      elsif @debug
        # this call will raise a more useful exception clarifying why the
        # record is invalid
        encode(record)
      end
    rescue StandardError => e
      # Sometimes validation isn't perfect, so this is belt-and-suspenders.
      # Only StandardError is rescued: signals, exit requests and
      # out-of-memory conditions must not be swallowed or relabelled as
      # serialization failures.
      # NOTE(review): this also covers errors raised by the publish step
      # itself, which are then reported as Avro errors (or dropped outside
      # debug mode) - confirm that is intended.
      raise MessageBus::UnableToSerializeError, "Invalid record: #{record.inspect}, Avro error: #{e.message}", caller if @debug
    end

    # Private so the public API isn't confusing
    private

    # Avro-encodes the record with this publisher's schema and namespace.
    def encode(record)
      MessageBus.avro.encode(record, schema_name: @schema_name, namespace: @namespace)
    end

    def wire_safe(message)
      # Base64-encode the binary payload before it goes to Kafka. This may
      # not be strictly necessary; sending raw binary has not been tried yet.
      Base64.encode64(message)
    end

    def publish_to_wire(topic, message)
      # Industrious is an in-house background job processor. Basically this
      # just ships the message to the appropriate Kafka topic.
      Industrious.publish(topic, message)
    end
  end
end
# `app/message_bus/message.rb`
module MessageBus
  # Base class for messages. Subclasses supply a TOPIC constant, a SCHEMA
  # constant and a `record` method; `publish` ties the three together.
  class Message
    # Returns the subclass's TOPIC constant.
    #
    # Raises MessageBus::NotFullyImplementedError when TOPIC is not defined.
    def topic
      # self.class::TOPIC looks up a class constant in a child class
      if defined?(self.class::TOPIC)
        self.class::TOPIC
      else
        raise MessageBus::NotFullyImplementedError, 'No Topic defined'
      end
    end

    # Returns the subclass's SCHEMA constant. The schema is inferred to be
    # the .avsc file in the schemas folder with the name as given.
    #
    # Raises MessageBus::NotFullyImplementedError when SCHEMA is not defined.
    def schema
      if defined?(self.class::SCHEMA)
        self.class::SCHEMA
      else
        # Name the piece that is actually missing (this used to say "Topic").
        raise MessageBus::NotFullyImplementedError, 'No Schema defined'
      end
    end

    # Payload to publish; subclasses must override this.
    #
    # Raises MessageBus::NotFullyImplementedError in the base class.
    def record
      raise MessageBus::NotFullyImplementedError, 'No Record defined'
    end

    # Builds a Publisher for this message's schema and sends the record to
    # this message's topic. Returns whatever the publisher returns.
    def publish
      pub = MessageBus::Publisher.new(schema_name: schema)
      pub.validate_and_publish!(record, topic)
    end
  end
end
# `app/message_bus/messages/unit_appraised.rb`
# This is a sample subclass of Message. It is used like this:
# `MessageBus::UnitAppraised.new(appraisal: self).publish`
module MessageBus
  # Message to publish when a new appraisal is created for a unit.
  class UnitAppraised < Message
    TOPIC = 'unit_appraisals'.freeze
    SCHEMA = 'UnitAppraisal'.freeze

    # appraisal - object responding to unit_id, condition_id, appraiser_id
    #             and purpose.
    def initialize(appraisal:)
      @appraisal = appraisal
    end

    # Hash payload for the schema: each listed attribute is read off the
    # appraisal and stored under the same symbol key, in this order.
    def record
      %i[unit_id condition_id appraiser_id purpose].each_with_object({}) do |field, payload|
        payload[field] = @appraisal.public_send(field)
      end
    end
  end
end
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment