@doryokujin
Created October 5, 2011 09:31
Shuffle Stream
## Send messages to the servers defined by <forward> sections.
## Each input record is shuffled by key and partition function.
## Multiple keys are allowed (separated by ','), and the partition function can be customized.
## The shuffle idea is the same as in basic MapReduce.
<match shuffle**>
  type shuffle
  key key1,key2
  flush_interval 1s
  <forward>
    type shuffle_tcp
    host delta5
    port 24225
  </forward>
  <forward>
    type shuffle_tcp
    host delta6
    port 24225
  </forward>
</match>
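As a rough illustration of the shuffle described in the comments above, the following standalone Ruby sketch hashes the configured key values and takes the result modulo the number of <forward> servers. It does not use Fluentd at all. `SERVERS`, `KEYS`, and `server_for` are hypothetical names introduced only for this example; the host names and keys are copied from the configuration above, and the server order is assumed to follow the order of the <forward> sections.

```ruby
require 'digest/md5'

# Hosts from the <forward> sections; the array index plays the role of the server id.
SERVERS = %w[delta5 delta6]
# Same fields as "key key1,key2" in the configuration.
KEYS = %w[key1 key2]

# Hypothetical helper: choose the destination host for one record by
# hashing the values of the configured keys.
def server_for(record)
  values = KEYS.map { |k| record[k] }
  id = Digest::MD5.hexdigest(values.to_s).to_i(16) % SERVERS.length
  SERVERS[id]
end

a = server_for('key1' => 'user1', 'key2' => 'jp', 'value' => 1)
b = server_for('key1' => 'user1', 'key2' => 'jp', 'value' => 2)
puts a == b  # prints true: records sharing key1 and key2 reach the same host
```

Because only the key fields are hashed, other fields such as `value` do not affect where a record is sent.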
require 'digest/md5'

module Fluent
  class ShuffleOutput < CopyOutput
    Plugin.register_output('shuffle', self)

    def configure(conf)
      key = conf['key']
      raise ConfigError, "'key' parameter is required on Shuffle Output" unless key
      flush_interval = conf['flush_interval'] || 60
      # Every <forward> section becomes one output; its index is the partition id.
      forwards = conf.elements.select {|e| e.name == 'forward'}
      num = forwards.length
      forwards.each_with_index {|e,id|
        type = e['type']
        unless type
          raise ConfigError, "Missing 'type' parameter on <forward> directive"
        end
        $log.debug "adding store type=#{type.dump}"
        output = Plugin.new_output(type)
        # Pass the partition settings down to TcpShuffleOutput
        e['_id'] = id
        e['_num'] = num
        e['_key'] = key
        e['flush_interval'] = flush_interval
        output.configure(e)
        @outputs << output
      }
    end
  end
  class TcpShuffleOutput < TcpOutput
    Plugin.register_output('shuffle_tcp', self)

    def configure(conf)
      super
      @id = conf['_id'].to_i                 # index of this output among the <forward> sections
      @num = conf['_num'].to_i               # total number of <forward> sections
      @key_list = conf['_key'].split(',')
      @writer.flush_interval = Config.time_value(conf['flush_interval']).to_i
    end

    def partition(mapped_keys)
      #
      # mapped_keys is a hash of key/value pairs.
      # Replace this method to implement a custom partition function.
      #
      return Digest::MD5.hexdigest(mapped_keys.values.to_s).to_i(16) % @num
    end

    def format_stream(tag, es)
      #
      # :es is an instance of Fluent::OneEventStream
      #
      # message PackedForward {
      #   1: string tag
      #   2: raw entries  # msgpack stream of Entry
      # }
      #
      out = ''
      es.each {|event|
        values = @key_list.collect{|key| event.record[key]}
        # Build the hash from pairs directly; flattening would break on array values.
        mapped_keys = Hash[@key_list.zip(values)]
        # Keep only the records that belong to this output's partition.
        if partition(mapped_keys) == @id
          out << [ event.time, event.record ].to_msgpack
        end
      }
      return [tag,out].to_msgpack
    end
  end
end
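
The comment in `partition` notes that the partition function can be replaced. The standalone sketch below shows one possible alternative, assuming you only want to route on a single field: it hashes `key1` with CRC32 from Ruby's standard `zlib` library instead of MD5. It also mimics the `partition(mapped_keys) == @id` check from `format_stream` to show that each record is kept by exactly one output. `NUM`, the sample records, and this version of `partition` are illustrative assumptions, not part of the plugin.

```ruby
require 'zlib'

NUM = 2  # assumed number of <forward> sections for this sketch

# Hypothetical partition function: route on the 'key1' field only.
def partition(mapped_keys, num)
  Zlib.crc32(mapped_keys['key1'].to_s) % num
end

records = [
  { 'key1' => 'a', 'key2' => 'x' },
  { 'key1' => 'b', 'key2' => 'y' },
  { 'key1' => 'a', 'key2' => 'z' },
]

# Each output id keeps only the records whose partition equals that id,
# mirroring the filter in format_stream.
kept = (0...NUM).map do |id|
  records.select { |r| partition(r, NUM) == id }
end

# Every record lands in exactly one output, so the counts add up.
puts kept.map(&:length).inject(:+) == records.length  # prints true
```

Any function works here as long as it is deterministic and returns an integer in the range 0 to `num - 1`; otherwise some records would match no output and be dropped.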