Skip to content

Instantly share code, notes, and snippets.

@johnnyt
Last active October 31, 2019 19:13
Show Gist options
  • Star 1 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save johnnyt/f484549ebc778857367a to your computer and use it in GitHub Desktop.
Save johnnyt/f484549ebc778857367a to your computer and use it in GitHub Desktop.
Ruby NSQ Avro example
require "avro"
require "nsq"
# Avro schema (JSON text) for the "User" record used by the demo below.
# The "verified" field is a boolean, so its default must be the JSON boolean
# false rather than the string "false".
SCHEMA = <<-JSON
{ "type": "record",
"name": "User",
"fields" : [
{"name": "username", "type": "string"},
{"name": "age", "type": "int"},
{"name": "verified", "type": "boolean", "default": false}
]}
JSON

# Schema registry shared by producer and consumer:
# MD5 hex digest of the schema string => schema string.
# Written by AvroProducer#register_schema, read by AvroConsumer#schema_for.
SCHEMAS = {}
# Publishes Avro-encoded datums to an NSQ topic.
#
# Every message handed to the NSQ producer is the result of packing, with the
# 'CH32a*' template: MESSAGE_VERSION, the MD5 hex digest of the parsed
# schema's string form, and the Avro binary encoding of the datum.
class AvroProducer
  MESSAGE_VERSION = 1

  attr_reader :nsq_producer, :topic

  # topic - topic name passed to Nsq::Producer.
  def initialize(topic)
    @topic = topic
    @nsq_producer = Nsq::Producer.new(topic: topic)
  end

  # Encodes +datum+ according to +schema+ (text accepted by
  # Avro::Schema.parse), stores the schema's string form in SCHEMAS under its
  # MD5 hex digest, and writes the framed message to the NSQ producer.
  def write(datum, schema)
    parsed = Avro::Schema.parse(schema)
    schema_text = parsed.to_s
    fingerprint = Digest::MD5.hexdigest(schema_text)
    register_schema(fingerprint, schema_text)
    nsq_producer.write(compose_message(datum, fingerprint, parsed))
  end

  # Terminates the underlying NSQ producer.
  def stop
    nsq_producer.terminate
  end

  private

  # Frames the encoded datum: version byte, 32 hex digits of the schema
  # digest, then the raw Avro bytes.
  def compose_message(datum, schema_md5, canonical_schema)
    payload = encode_datum(datum, canonical_schema)
    [MESSAGE_VERSION, schema_md5, payload].pack('CH32a*')
  end

  # Returns the Avro binary encoding of +datum+ as a String.
  def encode_datum(datum, canonical_schema)
    out = StringIO.new
    encoder = Avro::IO::BinaryEncoder.new(out)
    Avro::IO::DatumWriter.new(canonical_schema).write(datum, encoder)
    out.string
  end

  # Records the schema text in the shared SCHEMAS hash, keyed by its digest.
  def register_schema(schema_md5, schema_string)
    SCHEMAS[schema_md5] = schema_string
  end
end
# Pops messages from an NSQ topic/channel and decodes their Avro payload.
#
# A message body is unpacked with the 'CH32a*' template into a version byte,
# the 32-hex-digit MD5 digest identifying the writer schema in SCHEMAS, and
# the Avro binary datum.
class AvroConsumer
  attr_reader :channel, :nsq_consumer, :topic

  # topic, channel - names passed to Nsq::Consumer.
  def initialize(topic, channel)
    @topic = topic
    @channel = channel
    @nsq_consumer = Nsq::Consumer.new(topic: topic, channel: channel)
    @consuming = false # not read anywhere in this class
    @schemas = {}      # schema digest => parsed Avro schema (see #schema_for)
  end

  # Takes the next message from the NSQ consumer, decodes it, marks it
  # finished and returns the decoded datum. The message is only finished
  # after decoding succeeded.
  def pop
    msg = nsq_consumer.pop
    # The version byte is unpacked but not acted upon.
    _version, schema_md5, encoded_datum = decompose_message(msg.body)
    decoded_message = decode_datum(encoded_datum, schema_for(schema_md5))
    msg.finish
    decoded_message
  end

  # Terminates the underlying NSQ consumer.
  def stop
    nsq_consumer.terminate
  end

  private

  # Decodes the Avro binary +encoded_datum+ using +canonical_schema+.
  def decode_datum(encoded_datum, canonical_schema)
    decoder = Avro::IO::BinaryDecoder.new(StringIO.new(encoded_datum))
    Avro::IO::DatumReader.new(canonical_schema).read(decoder)
  end

  # Splits a raw body into [version, schema digest (hex), encoded datum].
  def decompose_message(raw_message)
    raw_message.unpack('CH32a*')
  end

  # Returns the parsed schema registered under +schema_md5+, parsing the
  # schema text once per digest and reusing the result for later messages.
  # NOTE(review): an unregistered digest passes nil to Avro::Schema.parse,
  # exactly as before — confirm how that case should be surfaced.
  def schema_for(schema_md5)
    @schemas[schema_md5] ||= Avro::Schema.parse(SCHEMAS[schema_md5])
  end
end
# Demo, executed only when this file is run directly: publish one User datum
# to the 'foo-users' topic, read it back on 'foo-channel', and print both.
if $PROGRAM_NAME == __FILE__
  prod = AvroProducer.new('foo-users')
  cons = nil
  begin
    datum = { "username" => "foo", "age" => 42, "verified" => true }
    p [:sending, datum]
    prod.write(datum, SCHEMA)

    cons = AvroConsumer.new('foo-users', 'foo-channel')
    read_message = cons.pop
    p [:received, read_message]
  ensure
    # Shut both ends down even when writing or reading raised.
    prod.stop
    cons.stop if cons
  end
end
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment