Skip to content

Instantly share code, notes, and snippets.

@doryokujin
Created October 4, 2011 20:45
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save doryokujin/1262746 to your computer and use it in GitHub Desktop.
Save doryokujin/1262746 to your computer and use it in GitHub Desktop.
How to write format_stream?
#
# shuffle record partitioned by reduce_key
#
<match shuffle**>
type shuffle
reduce_key type
<reducer>
type shuffle_tcp
host delta5
port 24225
</reducer>
<reducer>
type shuffle_tcp
host delta6
port 24225
</reducer>
</match>
module Fluent
class ShuffleOutput < CopyOutput
Plugin.register_output('shuffle', self)
def configure(conf)
raise ConfigError, "'reduce_key' parameter is required on Shuffle Output" unless @reduce_key = conf['reduce_key']
reduce_num = conf.elements.select {|e| e.name == 'reducer'}.length
conf.elements.select {|e|
e.name == 'reducer'
}.each_with_index {|e,id|
type = e['type']
unless type
raise ConfigError, "Missing 'type' parameter on <store> directive"
end
$log.debug "adding store type=#{type.dump}"
output = Plugin.new_output(type)
e['reduce_id'] = id
e['reduce_num'] = reduce_num
e['reduce_key'] = @reduce_key
output.configure(e)
@outputs << output
}
end
end
class TcpShuffleOutput < TcpOutput
Plugin.register_output('shuffle_tcp', self)
def configure(conf)
super
@reduce_id = conf['reduce_id'].to_i
@reduce_num = conf['reduce_num'].to_i
@reduce_key = conf['reduce_key']
end
def partition(key)
return Digest::MD5.hexdigest(key).to_i(16) % @reduce_num
end
def format_stream(tag, es)
# :es is an instance of Fluent::OneEventStream
#
# I want to skip the event if partition(record[@reduce_key])!=@reduce_id
# How to do this? I don't understand es.each method each well...
#
# super class do [tag, es.to_msgpack_stream].to_msgpack
end
end
end
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment