Skip to content

Instantly share code, notes, and snippets.

@nathwill
Last active August 29, 2015 22:17
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save nathwill/0d28a63e7ff371303bf7 to your computer and use it in GitHub Desktop.
Save nathwill/0d28a63e7ff371303bf7 to your computer and use it in GitHub Desktop.
POC for generic kv pairs decoder for heka
local l = require "lpeg"
require "cjson"
l.locale(l)
local sp = l.space^0
local unreserved = l.digit + l.alnum + l.S"/=-.,_~ "
local name = l.C(unreserved^1) * sp
local comment = l.P"#" * (1 - l.P"\n")^1 * l.P"\n"
local sep = l.P(read_config("pair_separator"))
local pair = comment^0 * l.Cg(name * read_config("kv_separator") * sp * name) * comment^0 * sep^-1
local rig = l.Cf(l.Ct("") * pair^0, rawset)
local ok, numeric_fields = pcall(cjson.decode, read_config("numeric_fields"))
local msg = {
Type = read_config('type') or 'kvpairs',
Payload = nil,
Fields = nil
}
function process_message()
local data = read_message("Payload")
msg.Fields = rig:match(data)
if not msg.Fields then return -1 end
-- keep payload, or not
if payload_keep then
msg.Payload = read_message("Payload")
end
for i, name in ipairs(numeric_fields) do
msg.Fields[name] = tonumber(msg.Fields[name])
end
inject_message(msg)
return 0
end
@nathwill
Copy link
Author

example config:

[DemoProcessInput]
type = "ProcessInput"
ticker_interval = 5
stdout = true
stderr = false
decoder = "redis_info"

    [DemoProcessInput.command.0]
    bin = "/usr/bin/redis-cli"
    args = ["info"]

[redis_info]
type = "SandboxDecoder"
filename = "lua_decoders/pairs.lua"

  [redis_info.config]
  pair_separator = "\n"
  kv_separator = ":"
  numeric_fields = '["process_id", "tcp_port", "uptime_in_seconds", "uptime_in_days", "hz", "lru_clock", "connected_clients", "client_longest_output_list", "client_biggest_input_buf", "blocked_clients", "used_memory", "used_memory_rss", "used_memory_peak", "used_memory_lua", "mem_fragmentation_ratio", "loading", "rdb_changes_since_last_save", "rdb_bgsave_in_progress", "rdb_last_save_time", "rdb_last_bgsave_time_sec", "rdb_current_bgsave_time_sec", "aof_enabled", "aof_rewrite_in_progress", "aof_rewrite_scheduled", "aof_last_rewrite_time_sec", "aof_current_rewrite_time_sec", "total_connections_received", "total_commands_processed", "instantaneous_ops_per_sec", "total_net_input_bytes", "total_net_output_bytes", "instantaneous_input_kbps", "instantaneous_output_kbps", "rejected_connections", "sync_full", "sync_partial_ok", "sync_partial_err", "expired_keys", "evicted_keys", "keyspace_hits", "keyspace_misses", "pubsub_channels", "pubsub_patterns", "latest_fork_usec", "connected_slaves", "master_repl_offset", "repl_backlog_active", "repl_backlog_size", "repl_backlog_first_byte_offset", "repl_backlog_histlen", "used_cpu_sys", "used_cpu_user", "used_cpu_sys_children", "used_cpu_user_children"]'

[RstEncoder]

[LogOutput]
message_matcher = "TRUE"
encoder = "RstEncoder"

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment