Last active
October 31, 2019 19:13
-
-
Save johnnyt/f484549ebc778857367a to your computer and use it in GitHub Desktop.
Ruby NSQ Avro example
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
require "digest"
require "stringio"

require "avro"
require "nsq"
# Avro schema (as JSON text) for the example User record.
#
# The "verified" default is the JSON boolean false rather than the string
# "false": an Avro field default has to be a value of the field's own type.
SCHEMA = <<-JSON
{ "type": "record",
  "name": "User",
  "fields" : [
    {"name": "username", "type": "string"},
    {"name": "age", "type": "int"},
    {"name": "verified", "type": "boolean", "default": false}
  ]}
JSON

# In-process schema registry: MD5 hex digest of a schema's string form => that
# string. AvroProducer adds entries, AvroConsumer reads them, so it must stay
# mutable (not frozen).
SCHEMAS = {}
# Publishes Avro-encoded datums to one NSQ topic.
#
# Every message body is framed with the pack template 'CH32a*':
#   * 1 byte   - MESSAGE_VERSION
#   * 16 bytes - MD5 of the schema's string form (packed from 32 hex digits)
#   * the rest - the datum as written by Avro::IO::DatumWriter
# The schema string itself is stored in the global SCHEMAS hash under its MD5
# hex digest.
class AvroProducer
  MESSAGE_VERSION = 1

  attr_reader :nsq_producer, :topic

  # topic - topic name passed to Nsq::Producer.
  def initialize topic
    @topic = topic
    @nsq_producer = Nsq::Producer.new topic: topic
  end

  # Encodes +datum+ with +schema+ and publishes the framed message.
  #
  # datum  - the value handed to Avro::IO::DatumWriter#write.
  # schema - schema source accepted by Avro::Schema.parse.
  #
  # Returns whatever Nsq::Producer#write returns.
  def write datum, schema
    canonical_schema, schema_string, schema_md5 = schema_entry_for schema
    # Re-register on every write (a cheap hash store) so SCHEMAS stays
    # populated even if it was modified between writes.
    register_schema schema_md5, schema_string
    nsq_producer.write compose_message(datum, schema_md5, canonical_schema)
  end

  # Terminates the underlying NSQ producer.
  def stop
    nsq_producer.terminate
  end

  private

  # Returns [parsed schema, schema string, MD5 hex digest] for +schema+.
  # The triple is memoised per schema source so that parsing, serialising and
  # hashing happen once per distinct schema instead of once per message.
  def schema_entry_for schema
    # Initialised lazily so the cache does not depend on #initialize.
    @schema_cache ||= {}
    @schema_cache[schema] ||= begin
      canonical_schema = Avro::Schema.parse schema
      schema_string = canonical_schema.to_s
      [canonical_schema, schema_string, Digest::MD5.hexdigest(schema_string)]
    end
  end

  # Builds the binary frame: version byte, 16-byte MD5, encoded datum.
  def compose_message datum, schema_md5, canonical_schema
    [MESSAGE_VERSION, schema_md5, encode_datum(datum, canonical_schema)].pack 'CH32a*'
  end

  # Returns the bytes Avro::IO::DatumWriter writes for +datum+.
  def encode_datum datum, canonical_schema
    buffer = StringIO.new
    writer = Avro::IO::DatumWriter.new canonical_schema
    writer.write datum, Avro::IO::BinaryEncoder.new(buffer)
    # Everything written so far, without a rewind/read round trip.
    buffer.string
  end

  # Records the schema string in the global registry under its MD5 hex digest.
  def register_schema schema_md5, schema_string
    SCHEMAS[schema_md5] = schema_string
  end
end
# Reads messages framed as 'CH32a*' (version byte, 16-byte schema MD5, encoded
# datum) from an NSQ topic/channel and decodes the datum with the schema
# registered in the global SCHEMAS hash under that MD5 hex digest.
class AvroConsumer
  attr_reader :channel, :nsq_consumer, :topic

  # topic   - topic name passed to Nsq::Consumer.
  # channel - channel name passed to Nsq::Consumer.
  def initialize topic, channel
    @topic, @channel = topic, channel
    @nsq_consumer = Nsq::Consumer.new topic: topic, channel: channel
    @consuming = false # not read by any method in this class
    @schemas = {}      # MD5 hex digest => parsed Avro schema
  end

  # Pops one message from the NSQ consumer, decodes it, finishes it and
  # returns the decoded datum.
  #
  # The message is finished only after decoding succeeded; when decoding
  # raises, #finish is never called on it.
  #
  # Raises KeyError when no schema is registered for the message's MD5.
  def pop
    msg = nsq_consumer.pop
    # The version byte is unpacked but not checked.
    _version, schema_md5, encoded_datum = decompose_message msg.body
    decoded_message = decode_datum encoded_datum, schema_for(schema_md5)
    msg.finish
    decoded_message
  end

  # Terminates the underlying NSQ consumer.
  def stop
    nsq_consumer.terminate
  end

  private

  # Decodes the binary payload with Avro::IO::DatumReader for the given schema.
  def decode_datum encoded_datum, canonical_schema
    decoder = Avro::IO::BinaryDecoder.new StringIO.new(encoded_datum)
    Avro::IO::DatumReader.new(canonical_schema).read decoder
  end

  # Splits a raw body into [version, MD5 hex digest, encoded datum].
  def decompose_message raw_message
    raw_message.unpack "CH32a*"
  end

  # Returns the parsed schema for +schema_md5+, parsing each registered schema
  # at most once per consumer. An MD5 missing from SCHEMAS raises a KeyError
  # naming it instead of handing nil to Avro::Schema.parse.
  def schema_for schema_md5
    # Also initialised lazily so the cache does not depend on #initialize.
    @schemas ||= {}
    @schemas[schema_md5] ||= begin
      schema_string = SCHEMAS.fetch(schema_md5) do
        raise KeyError, "no schema registered for MD5 #{schema_md5}"
      end
      Avro::Schema.parse schema_string
    end
  end
end
# Demo round trip, run only when this file is executed directly: publish one
# User datum to the 'foo-users' topic, pop it back through 'foo-channel', and
# print what was sent and what was received.
if $PROGRAM_NAME == __FILE__
  prod = AvroProducer.new 'foo-users'
  cons = nil
  begin
    datum = {"username" => "foo", "age" => 42, "verified" => true}
    p [:sending, datum]
    prod.write datum, SCHEMA

    cons = AvroConsumer.new 'foo-users', 'foo-channel'
    read_message = cons.pop
    p [:received, read_message]
  ensure
    # Terminate the NSQ producer/consumer even when writing or popping raised.
    prod.stop
    cons.stop if cons
  end
end
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment