Skip to content

Instantly share code, notes, and snippets.

Embed
What would you like to do?
logstash-filters-cache-common
class Logstash::Filters::CacheCommon < LogStash::Filters::Base
config :get, validate: :hash, required: false
config :set, validate: :hash, required: false
def register
@get ||= {}
@set ||= {}
if (@get.empty? && @set.empty?)
logger.error("plugin has no directives")
fail("plugin has no directives; add `set` or `get` directive")
end
logger.info("cache configured", log_context)
end
def filter(event)
do_set(event)
do_get(event)
filter_matched(event)
end
##
# @api private
def do_get(event)
return unless @get && !@get.empty?
event_fields_by_memcached_key = @get.each_with_object({}) do |(memcached_key_template, event_field), memo|
memcached_key = event.sprintf(memcached_key_template)
memo[memcached_key] = event_field
end
memcached_keys = event_fields_by_memcached_key.keys
cache_hits_by_memcached_key = cache_mget(memcached_keys)
event_fields_by_memcached_key.each do |memcached_key, event_field|
value = cache_hits_by_memcached_key[memcached_key]
if value.nil?
logger.trace("cache:get miss", log_context(key: memcached_key))
else
logger.trace("cache:get hit", log_context(key: memcached_key, value: value))
event.set(event_field, value)
end
end
end
##
# @api private
def do_set(event)
return unless @set && !@set.empty?
values_by_memcached_key = @set.each_with_object({}) do |(event_field, memcached_key_template), memo|
memcached_key = event.sprintf(memcached_key_template)
value = event.get(event_field)
memo[memcached_key] = value unless value.nil?
end
cache_mset(values_by_memcached_key)
end
##
# Override with multi-get operation if your cache supports getting many
# values with a single operation
# @api public
# @param keys [Array{String}]
# @return [Hash{String => Object}]
def cache_mget(keys)
keys.each_with_object({}) do |key, memo|
memo[key] = cache_get(key)
end
end
##
# Override with multi-set operation if your cache supports setting many
# values with a single operation
# @api public
# @param key_value_map [Hash{String => Object}]
# @return [void]
def cache_mset(key_value_map)
key_value_map.each do |key, value|
cache_set(key, value)
end
end
##
# Override with single-get operation; if you implement `cache_mget`,
# you do not need to also implement this.
# @param key [String]
# @return [Object, nil]
def cache_get(key)
fail NotImplementedError
end
##
# Override with single-set operation; if you implement `cache_mset`,
# you do not need to also implement this.
# @param key [String]
# @param value [Object]
# @return [void]
def cache_set(key, value)
fail NotImplementedError
end
##
# @api private (see `Logstash::Filters::CacheCommon#base_log_context`)
# @param additional_context [Hash{#to_s=>#to_json}]
# @return [Hash{#to_s=>#to_json}]
def log_context(additional_context={})
return base_log_context if additional_context.empty?
base_log_context.merge(additional_context)
end
##
# Override with "base" context for log events, which includes details about
# connection configuration, relevant plugin settings, etc.
# @return [Hash{#to_s=>#to_json}]
def base_log_context
@base_log_context ||= {}.freeze
end
end
class Logstash::Filters::Memcached < Logstash::Filters::CacheCommon
include Logstash::PluginMixins::Memcached # provides configurable dalli_client
config_name 'memcached'
def cache_mget(keys)
dalli_client.get_multi(keys)
end
def cache_set(key, value)
dalli_client.set(key, value)
end
end
# Common Memcached connection config, can be included into Input, Output, or Filter classes.
module Logstash::PluginMixins::Memcached
def self.included(base)
if !base.kind_of?(Class) || !base.ancestors.include?(LogStash::Config::Mixin)
fail("`#{base}` is not a configurable logstash plugin")
end
base.instance_exec do
config :hosts, validate: :array, default: %w(localhost:11211)
config :namespace, validate: :string, required: false
config :ttl, validate: :number, default: 0
end
end
##
# If your concrete implementation defines `register`, don't forget
# to send `super`.
def register
super if defined?(super)
if @ttl < 0
logger.error("ttl cannot be negative")
fail("invalid ttl: cannot be negative")
end
@dalli_client= establish_dalli_connection
end
protected
##
# @api protected
# @return [Dalli::Client]
def dalli_client
@dalli_client
end
private
def establish_dalli_connection
require 'dalli'
hosts = valid_connection_hosts
options = valid_connection_options
logger.info('connecting to memcached', log_context(hosts: hosts, options: options))
Dalli::Client.new(@hosts, options).tap do |client|
begin
client.alive!
rescue Dalli::RingError
logger.error("failed to connect", log_context(hosts: hosts, options: options))
fail("cannot connect to memcached")
end
end
end
def valid_connection_options
{}.tap do |options|
options[:ttl] = @ttl
options[:namespace] = @namespace unless @namespace.nil? || @namespace.empty?
end
end
def valid_connection_hosts
logger.error("configuration: hosts empty!") && fail if @hosts.empty?
@hosts.map(&:to_s)
end
def base_log_context
super().merge(
{}.tap do |context|
context[:hosts] = @hosts
context[:ttl] = @ttl
context[:namespace] = @namespace unless @namespace.nil? || @namespace.empty?
end).freeze
end
end
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
You can’t perform that action at this time.