@repeatedly
Last active November 2, 2017
# encoding: UTF-8
require 'date'
require 'excon'
require 'elasticsearch'
require 'uri'
begin
  require 'strptime'
rescue LoadError
end

class Fluent::Elasticsearch2Output < Fluent::ObjectBufferedOutput
  class ConnectionFailure < StandardError; end

  Fluent::Plugin.register_output('elasticsearch2', self)

  config_param :host, :string, :default => 'localhost'
  config_param :port, :integer, :default => 9200
  config_param :user, :string, :default => nil
  config_param :password, :string, :default => nil, :secret => true
  config_param :path, :string, :default => nil
  config_param :scheme, :string, :default => 'http'
  config_param :hosts, :string, :default => nil
  config_param :target_index_key, :string, :default => nil
  config_param :target_type_key, :string, :default => nil
  config_param :time_key_format, :string, :default => nil
  config_param :logstash_format, :bool, :default => false
  config_param :logstash_prefix, :string, :default => "logstash"
  config_param :logstash_dateformat, :string, :default => "%Y.%m.%d"
  config_param :utc_index, :bool, :default => true
  config_param :type_name, :string, :default => "fluentd"
  config_param :index_name, :string, :default => "fluentd"
  config_param :id_key, :string, :default => nil
  config_param :write_operation, :string, :default => "index"
  config_param :parent_key, :string, :default => nil
  config_param :routing_key, :string, :default => nil
  config_param :request_timeout, :time, :default => 5
  config_param :reload_connections, :bool, :default => true
  config_param :reload_on_failure, :bool, :default => false
  config_param :resurrect_after, :time, :default => 60
  config_param :time_key, :string, :default => nil
  config_param :time_key_exclude_timestamp, :bool, :default => false
  config_param :ssl_verify, :bool, :default => true
  config_param :client_key, :string, :default => nil
  config_param :client_cert, :string, :default => nil
  config_param :client_key_pass, :string, :default => nil
  config_param :ca_file, :string, :default => nil
  config_param :remove_keys, :string, :default => nil
  config_param :flatten_hashes, :bool, :default => false
  config_param :flatten_hashes_separator, :string, :default => "_"
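
  # A minimal, illustrative match section for this plugin (the tag pattern
  # and values below are examples, not defaults enforced here):
  #
  #   <match my.logs.**>
  #     @type elasticsearch2
  #     host localhost
  #     port 9200
  #     logstash_format true
  #   </match>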
  include Fluent::SetTagKeyMixin
  config_set_default :include_tag_key, false

  def initialize
    super
  end

  def configure(conf)
    super
    @time_parser = TimeParser.new(@time_key_format, @router)
    if @remove_keys
      @remove_keys = @remove_keys.split(/\s*,\s*/)
    end
    begin
      # Prefer Oj for JSON serialization when available; fall back to Yajl.
      require 'oj'
      Oj.default_options = {mode: :compat}
      @dump_proc = Oj.method(:dump)
    rescue LoadError
      @dump_proc = Yajl.method(:dump)
    end
  end

  def start
    super
  end
  # Once fluent v0.14 is released we might be able to use
  # Fluent::Parser::TimeParser, but it doesn't quite do what we want - it
  # gives [sec,nsec] whereas we want something we can call `strftime` on...
  class TimeParser
    def initialize(time_key_format, router)
      @time_key_format = time_key_format
      @router = router
      @parser = if time_key_format
        begin
          # Strptime doesn't support all formats, but for those it does it's
          # blazingly fast.
          strptime = Strptime.new(time_key_format)
          Proc.new { |value| strptime.exec(value).to_datetime }
        rescue
          # Can happen if Strptime doesn't recognize the format, or if
          # strptime couldn't be required (because it's not installed -- it's
          # ruby 2 only).
          Proc.new { |value| DateTime.strptime(value, time_key_format) }
        end
      else
        Proc.new { |value| DateTime.parse(value) }
      end
    end

    def parse(value, event_time)
      @parser.call(value)
    rescue => e
      @router.emit_error_event("Fluent::ElasticsearchOutput::TimeParser.error", Fluent::Engine.now, {'time' => event_time, 'format' => @time_key_format, 'value' => value}, e)
      return Time.at(event_time).to_datetime
    end
  end
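
  # Illustrative sketch of TimeParser usage (the format string and timestamp
  # are examples):
  #   parser = TimeParser.new("%Y-%m-%dT%H:%M:%S%z", router)
  #   parser.parse("2017-11-02T22:24:00+0000", Fluent::Engine.now)
  #   # => a DateTime we can call `strftime` on when building the index name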
  def client
    @_es ||= begin
      excon_options = { client_key: @client_key, client_cert: @client_cert, client_key_pass: @client_key_pass }
      adapter_conf = lambda { |f| f.adapter :excon, excon_options }
      transport = Elasticsearch::Transport::Transport::HTTP::Faraday.new(get_connection_options.merge(
        options: {
          reload_connections: @reload_connections,
          reload_on_failure: @reload_on_failure,
          resurrect_after: @resurrect_after,
          retry_on_failure: 5,
          transport_options: {
            request: { timeout: @request_timeout },
            ssl: { verify: @ssl_verify, ca_file: @ca_file }
          }
        }), &adapter_conf)
      es = Elasticsearch::Client.new transport: transport

      begin
        raise ConnectionFailure, "Can not reach Elasticsearch cluster (#{connection_options_description})!" unless es.ping
      rescue *es.transport.host_unreachable_exceptions => e
        raise ConnectionFailure, "Can not reach Elasticsearch cluster (#{connection_options_description})! #{e.message}"
      end

      log.info "Connection opened to Elasticsearch cluster => #{connection_options_description}"
      es
    end
  end
  def get_connection_options
    raise "`password` must be present if `user` is present" if @user && !@password

    hosts = if @hosts
      @hosts.split(',').map do |host_str|
        # Support legacy hosts format host:port,host:port,host:port...
        if host_str.match(%r{^[^:]+(\:\d+)?$})
          {
            host: host_str.split(':')[0],
            port: (host_str.split(':')[1] || @port).to_i,
            scheme: @scheme
          }
        else
          # New hosts format expects URLs such as http://logs.foo.com,https://john:pass@logs2.foo.com/elastic
          uri = URI(host_str)
          %w(user password path).inject(host: uri.host, port: uri.port, scheme: uri.scheme) do |hash, key|
            hash[key.to_sym] = uri.public_send(key) unless uri.public_send(key).nil? || uri.public_send(key) == ''
            hash
          end
        end
      end.compact
    else
      [{host: @host, port: @port, scheme: @scheme}]
    end.each do |host|
      host.merge!(user: @user, password: @password) if !host[:user] && @user
      host.merge!(path: @path) if !host[:path] && @path
    end

    {
      hosts: hosts
    }
  end
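
  # Illustrative results for the two accepted `hosts` formats (hostnames and
  # credentials are placeholders):
  #   hosts es1.example.com:9200,es2.example.com:9200
  #     # => [{host: "es1.example.com", port: 9200, scheme: "http"}, ...]
  #   hosts https://john:pass@logs2.foo.com/elastic
  #     # => [{host: "logs2.foo.com", port: 443, scheme: "https",
  #     #      user: "john", password: "pass", path: "/elastic"}]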
  def connection_options_description
    get_connection_options[:hosts].map do |host_info|
      attributes = host_info.dup
      attributes[:password] = 'obfuscated' if attributes.has_key?(:password)
      attributes.inspect
    end.join(', ')
  end

  def shutdown
    super
  end

  BODY_DELIMITER = "\n"
  UPDATE_OP = "update"
  UPSERT_OP = "upsert"
  CREATE_OP = "create"
  INDEX_OP = "index"
  def append_record_to_messages(op, meta, header, record, msgs)
    case op
    when UPDATE_OP, UPSERT_OP
      if meta.has_key?("_id")
        header[UPDATE_OP] = meta
        msgs << @dump_proc.call(header) << BODY_DELIMITER
        msgs << @dump_proc.call({"doc" => record, "doc_as_upsert" => op == UPSERT_OP}) << BODY_DELIMITER
      end
    when CREATE_OP
      if meta.has_key?("_id")
        header[CREATE_OP] = meta
        msgs << @dump_proc.call(header) << BODY_DELIMITER
        msgs << @dump_proc.call(record) << BODY_DELIMITER
      end
    when INDEX_OP
      header[INDEX_OP] = meta
      msgs << @dump_proc.call(header) << BODY_DELIMITER
      msgs << @dump_proc.call(record) << BODY_DELIMITER
    end
  end
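
  # For reference, an "index" operation appends two newline-delimited JSON
  # lines per event to the bulk body, per the Elasticsearch bulk API (values
  # below are illustrative):
  #   {"index":{"_index":"logstash-2017.11.02","_type":"fluentd"}}
  #   {"message":"hello","@timestamp":"2017-11-02T22:24:00+00:00"}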
  def flatten_record(record, prefix=[])
    ret = {}
    if record.is_a? Hash
      record.each { |key, value|
        ret.merge! flatten_record(value, prefix + [key.to_s])
      }
    elsif record.is_a? Array
      # Don't mess with arrays, leave them unprocessed
      ret.merge!({prefix.join(@flatten_hashes_separator) => record})
    else
      return {prefix.join(@flatten_hashes_separator) => record}
    end
    ret
  end
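
  # Example of flattening (with the default flatten_hashes_separator "_"):
  #   flatten_record({"a" => {"b" => 1, "c" => [2, 3]}})
  #   # => {"a_b" => 1, "a_c" => [2, 3]}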
  def write_objects(tag, chunk)
    bulk_message = ''
    header = {}
    meta = {}

    chunk.msgpack_each do |time, record|
      if @flatten_hashes
        record = flatten_record(record)
      end
      next unless record.is_a? Hash

      if @target_index_key && record[@target_index_key]
        target_index = record.delete @target_index_key
      elsif @logstash_format
        if record.has_key?("@timestamp")
          dt = @time_parser.parse(record["@timestamp"], time)
        elsif record.has_key?(@time_key)
          dt = @time_parser.parse(record[@time_key], time)
          record['@timestamp'] = record[@time_key] unless @time_key_exclude_timestamp
        else
          dt = Time.at(time).to_datetime
          record.merge!({"@timestamp" => dt.to_s})
        end
        dt = dt.new_offset(0) if @utc_index
        target_index = "#{@logstash_prefix}-#{dt.strftime(@logstash_dateformat)}"
      else
        target_index = @index_name
      end

      # Change target_index to lower-case since Elasticsearch doesn't
      # allow upper-case characters in index names.
      target_index = target_index.downcase

      if @include_tag_key
        record[@tag_key] = tag
      end

      if @target_type_key && record[@target_type_key]
        target_type = record.delete @target_type_key
      else
        target_type = @type_name
      end

      meta.clear
      meta["_index"] = target_index
      meta["_type"] = target_type

      @meta_config_map ||= { 'id_key' => '_id', 'parent_key' => '_parent', 'routing_key' => '_routing' }
      @meta_config_map.each_pair do |config_name, meta_key|
        record_key = self.instance_variable_get("@#{config_name}")
        meta[meta_key] = record[record_key] if record_key && record[record_key]
      end

      if @remove_keys
        @remove_keys.each { |key| record.delete(key) }
      end

      append_record_to_messages(@write_operation, meta, header, record, bulk_message)
    end

    send(bulk_message) unless bulk_message.empty?
    bulk_message = nil
  end
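
  # NOTE: defining `send` shadows Ruby's built-in Object#send within this
  # class; a more distinctive name (e.g. `send_bulk`) would avoid the
  # shadowing.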
  def send(data)
    retries = 0
    begin
      client.bulk body: data
    rescue *client.transport.host_unreachable_exceptions => e
      if retries < 2
        retries += 1
        @_es = nil
        log.warn "Could not push logs to Elasticsearch, resetting connection and trying again. #{e.message}"
        sleep 2**retries
        retry
      end
      raise ConnectionFailure, "Could not push logs to Elasticsearch after #{retries} retries. #{e.message}"
    end
  end
end
@richm commented Nov 2, 2017

@repeatedly Which version of https://github.com/uken/fluent-plugin-elasticsearch was this based off of? That is, what is the difference between this and the original source code? What fixes/enhancements does out_elasticsearch2.rb have over the original?
