Skip to content

Instantly share code, notes, and snippets.

@piavlo
Last active December 11, 2015 05:08
Show Gist options
  • Save piavlo/4549829 to your computer and use it in GitHub Desktop.
Save piavlo/4549829 to your computer and use it in GitHub Desktop.
# Pipeline used to measure redis-input throughput: read from a Redis list,
# meter the event stream, print the metered rate, and index into elasticsearch.
input {
  redis {
    # Connection
    host => "10.0.0.1"
    port => 6379

    # Consume the list stored under key "q1"
    data_type => "list"
    key => "q1"
    type => "redis2_q1"

    threads => 1
    # Batch size under test (option introduced by the accompanying redis.rb patch)
    batch_events => 100
  }
}

filter {
  metrics {
    # Meter named "events"; its rate_1m field is printed by the stdout output
    meter => "events"
    add_tag => "metric"
  }
}

output {
  stdout {
    # only emit events with the 'metric' tag
    tags => "metric"
    message => "rate: %{events.rate_1m}"
  }

  elasticsearch {}
}
---------------------------------
Reported "rate:" output (events.rate_1m) for each batch_events setting:
batch_events => 0   -> rate: 150
batch_events => 10  -> rate: 500
batch_events => 100 -> rate: 3000
---------------------------------
diff --git a/lib/logstash/inputs/redis.rb b/lib/logstash/inputs/redis.rb
index c92c789..6732064 100644
--- a/lib/logstash/inputs/redis.rb
+++ b/lib/logstash/inputs/redis.rb
@@ -43,6 +43,8 @@ class LogStash::Inputs::Redis < LogStash::Inputs::Threadable
# If redis\_type is pattern_channel, then we will PSUBSCRIBE to the key.
# TODO: change required to true
config :data_type, :validate => [ "list", "channel", "pattern_channel" ], :required => false
+ # Used by the list listener: after each blocking pop, fetch up to this many more events in one Lua EVAL call; 0 (the default) disables batching.
+ config :batch_events, :validate => :number, :default => 0
public
def initialize(params)
@@ -110,8 +112,34 @@ class LogStash::Inputs::Redis < LogStash::Inputs::Threadable
private
def list_listener(redis, output_queue)
+ # Pops events from the Redis list @key and passes each payload to
+ # queue_event along with output_queue.
+ #
+ # Always starts with a blocking BLPOP (timeout 0), so an empty list is
+ # waited on instead of being polled with batch calls that return nothing.
response = redis.blpop @key, 0
queue_event response[1], output_queue
+ return unless @batch_events > 0
+
+ # Fetch up to @batch_events more entries in a single EVAL round trip.
+ # table.insert appends, so the reply keeps the list's pop order, and the
+ # loop stops at the first empty LPOP, so the reply holds no nil entries.
+ # The script lives in a local variable because Ruby rejects constant
+ # assignment inside a method body (dynamic constant assignment); "<<-"
+ # allows the closing EOF to be indented.
+ batch_lpop_script = <<-EOF
+   local res = {}
+   for _ = 1, tonumber(ARGV[1]) do
+     local item = redis.call('lpop', KEYS[1])
+     if not item then break end
+     table.insert(res, item)
+   end
+   return res
+ EOF
+
+ redis.eval(batch_lpop_script, [@key], [@batch_events]).each do |event|
+   queue_event event, output_queue
+ end
end
private
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment