@Lax
Last active August 29, 2015 14:16
Consume Thrift-encoded TSData messages from Kafka and write them to OpenTSDB
source 'http://ruby.taobao.org'
gem "poseidon" # kafka
gem "thrift"
gem "opentsdb"
GEM
  remote: http://ruby.taobao.org/
  specs:
    opentsdb (0.2.0)
    poseidon (0.0.5)
    thrift (0.9.2.0)

PLATFORMS
  ruby

DEPENDENCIES
  opentsdb
  poseidon
  thrift
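The script reads its settings from config.yml, which is not included in this gist. The sketch below shows one shape that file could take, built only from the lookups the script performs (`config['kafka']['server']['host']`, `['port']`, `config['kafka']['topic']`, `config['opentsdb']['server']`). All values are placeholders, and the keys under `opentsdb.server` are an assumption: that hash is passed as-is to `OpenTSDB::Client.new`, so it has to match whatever the opentsdb gem expects.

```ruby
require 'yaml'

# Hypothetical settings. Only the key names under 'kafka' and the
# 'opentsdb' => 'server' path are taken from the script.
config = {
  'kafka' => {
    'server' => { 'host' => 'localhost', 'port' => 9092 },
    'topic'  => 'tsdata'
  },
  'opentsdb' => {
    # Assumed keys: this hash goes straight into OpenTSDB::Client.new.
    'server' => { 'hostname' => 'localhost', 'port' => 4242 }
  }
}

# Text that could be saved as config.yml.
yaml_text = config.to_yaml
puts yaml_text

# Round trip: the same lookups the script does after YAML.load_file.
loaded       = YAML.load(yaml_text)
kafka_server = loaded['kafka']['server']
kafka_topic  = loaded['kafka']['topic']
tsdb_server  = loaded['opentsdb']['server']
```

Note that YAML written this way loads with string keys. If the client library looks options up by symbol, the hash would need converting before it is passed on.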
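#!/usr/bin/env ruby
require 'rubygems'
require 'bundler'
Bundler.require # loads poseidon, thrift and opentsdb from the Gemfile

# The Thrift-generated Ruby code that defines TSData is expected under gen-rb.
$:.push('gen-rb')
$:.unshift '../../lib/rb/lib'
require 'metrics_constants.rb'
require 'yaml'

$DEBUG = false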
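# Reads Thrift-serialized TSData messages from a single Kafka partition.
class KafkaThriftConsumer
  # partitions is the id of the one partition to read (default 0).
  def initialize(name="tsd_json_consumer_#{Process.pid}", partitions=0, config_file="config.yml")
    config = YAML.load_file(config_file)
    server = config['kafka']['server']
    topic = config['kafka']['topic']
    #partitions = config['kafka']['partitions']
    # Start at the latest offset, so only newly produced messages are consumed.
    @consumer = Poseidon::PartitionConsumer.new(name, server['host'], server['port'], topic, partitions, :latest_offset)
  end

  # Yields [offset, topic, tsdata] for every fetched message. Never returns.
  def consume
    return if @consumer.nil?
    # Create the deserializer once and reuse it for every message.
    @deserializer = Thrift::Deserializer.new(Thrift::BinaryProtocolAcceleratedFactory.new)
    loop do
      messages = @consumer.fetch
      messages.each do |m|
        tsd = TSData.new
        @deserializer.deserialize(tsd, m.value)
        yield [m.offset, m.topic, tsd]
      end
    end
  end
end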
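# Writes TSData samples to OpenTSDB.
class TsdWriter
  def initialize(config_file="config.yml")
    config = YAML.load_file(config_file)
    server = config['opentsdb']['server']
    @client = OpenTSDB::Client.new(server)
  end

  # Maps the TSData fields onto the hash passed to OpenTSDB::Client#put.
  def put(tsdata)
    sample = {
      :metric => tsdata.name,
      :value => tsdata.v1,
      :timestamp => tsdata.timestamp,
      :tags => tsdata.tags
    }
    @client.put(sample)
  end

  # Process-wide writer, created on first use. The config argument is only
  # read on that first call.
  def self.sharedInstance(config="config.yml")
    @@shared ||= self.new(config)
  end
end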
if $0 == __FILE__
  config_file = "config.yml"
  kafka_thrift_consumer = KafkaThriftConsumer.new("kafka_tsd_consumer", 0, config_file)
  kafka_thrift_consumer.consume do |msg|
    puts "%d %s %s" % msg
    offset, topic, tsd = msg
    p tsd
    begin
      TsdWriter.sharedInstance(config_file).put(tsd)
    rescue => err
      # Stop at the first failed write and exit with a non-zero status.
      p err
      exit 1
    end
  end
end
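To make the mapping in `TsdWriter#put` concrete, here is a minimal sketch of the line that OpenTSDB's telnet-style `put` command accepts: `put <metric> <timestamp> <value> <tagk=tagv> ...`. It does not use the opentsdb gem, and whether the gem emits exactly this text is not shown in the source. `TSDataStub` and `put_line` are hypothetical names; the stub only mirrors the four fields the script reads (`name`, `v1`, `timestamp`, `tags`).

```ruby
# Stand-in for the Thrift-generated TSData struct (hypothetical). The field
# names follow how the script reads them.
TSDataStub = Struct.new(:name, :v1, :timestamp, :tags)

# Builds one line in the format of OpenTSDB's telnet-style put command:
#   put <metric> <timestamp> <value> <tagk=tagv> [<tagk=tagv> ...]
def put_line(tsdata)
  tags = tsdata.tags.map { |k, v| "#{k}=#{v}" }.join(' ')
  "put #{tsdata.name} #{tsdata.timestamp.to_i} #{tsdata.v1} #{tags}"
end

sample = TSDataStub.new('sys.cpu.user', 42.5, 1425000000, { 'host' => 'web01' })
line = put_line(sample)
puts line
# prints: put sys.cpu.user 1425000000 42.5 host=web01
```

OpenTSDB requires at least one tag per data point, so a TSData message with an empty `tags` map would likely need a default tag before it is written.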