Skip to content

Instantly share code, notes, and snippets.

@hreidar
Created April 14, 2021 15:27
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save hreidar/88b98fb5cc932046912b9b78069418f6 to your computer and use it in GitHub Desktop.
Save hreidar/88b98fb5cc932046912b9b78069418f6 to your computer and use it in GitHub Desktop.
[sources.in_from_kafka]
type = "kafka"
bootstrap_servers = "k8s-c4-m1:30656,k8s-c4-m2:30656,k8s-c4-m3:30656"
group_id = "app1-test"
topics = ["APP1.TEST"]
auto_offset_reset = "beginning"
[sources.metrics]
type = "internal_metrics"
## transforms
[transforms.json]
type = "json_parser"
inputs = ["in_from_kafka"]
field = "message"
drop_invalid = true
#[transforms.json]
# type = "remap"
# inputs = ["in_from_kafka"]
# source = '''
# structured, err = parse_json(.message)
# if err != null {
# log("Unable to parse JSON: " + err, level: "error")
# } else {
# . = merge(., structured)
# }
# del(.structured)
# '''
## ipv6 event router
[transforms.ipv6_event_type]
inputs = ["json"]
type = "route"
route.old = '!exists(.relayAgentRemoteId)'
route.new = 'exists(.relayAgentRemoteId)'
[sinks.debug]
type = "console"
inputs = ["ipv6_event_type.old"]
target = "stdout"
encoding.codec = "text"
## old ipv6 events
[transforms.ipv6-filter]
type = "filter"
inputs = ["ipv6_event_type.old"]
condition.type = "check_fields"
condition."message.contains" = "leased"
[transforms.ipv6-regex]
inputs = ["ipv6-filter"]
type = "regex_parser"
field = "message"
patterns = ['(?s)(?P<client_id>(?:[a-zA-Z0-9]{2}:){8,}[a-zA-Z0-9]{2}),(?P<remote_id>(?:[a-zA-Z0-9]{2}:){8,}[a-zA-Z0-9]{2}),(?:Mon(?:day)?|Tue(?:sday)?|Wed(?:nesday)?|Thu(?:rsday)?|Fri(?:day)?|Sat(?:urday)?|Sun(?:day)?)\s\b(?P<month>[Jj]an(?:uary|uar)?|[Ff]eb(?:ruary|ruar)?|[Mm](?:a|ä)?r(?:ch|z)?|[Aa]pr(?:il)?|[Mm]a(?:y|i)?|[Jj]un(?:e|i)?|[Jj]ul(?:y)?|[Aa]ug(?:ust)?|[Ss]ep(?:tember)?|[Oo](?:c|k)?t(?:ober)?|[Nn]ov(?:ember)?|[Dd]e(?:c|z)(?:ember)?)\b\s*(?P<day>:(?:0[1-9])|(?:[12][0-9])|(?:3[01])|[1-9])\s(?P<time>(?:2[0123]|[01]?[0-9]):(?:[0-5][0-9]):(?:(?:[0-5]?[0-9]|60)(?:[:.,][0-9]+)?))\s(?P<year>\d{4});\n\s(?P<ipv6_leases>.+)']
[transforms.ipv6-build_leases_data]
inputs = ["ipv6-regex"]
type = "lua"
version = "2"
hooks.process = """
function (event,emit)
local ipv6_leases = event.log.ipv6_leases
local k = {"state","binding","prefix_name","lease"}
local lease_binding_type = {IA_PD = "Prefix delegation", IA_NA = "Nontemporary", IA_TA = "Temporary"}
event.log.lease_info = {}
j = 1
for s in string.gmatch(ipv6_leases, "%S+") do
i = 1
event.log.lease_info[j] = {}
for l in string.gmatch(s:sub(0,-2), "([^,]+)") do
local key = k[i]
if i == 2 then
event.log.lease_info[j][key] = lease_binding_type[l]
else
event.log.lease_info[j][key] = l
end
i = i + 1
end
j = j + 1
end
event.log.ipv6_leases = nil
emit(event)
end
"""
[transforms.ipv6-fix_month]
inputs = ["ipv6-build_leases_data"]
type = "lua"
version = "2"
hooks.process = """
function (event,emit)
local mt = {JAN = "01", FEB = "02", MAR = "03", APR = "04", MAY = "05", JUN = "06", JUL = "07", AUG = "08", SEP = "09", OCT = "10", NOV = "11", DEC = "12"}
local m = string.upper(event.log.month)
event.log.month = mt[m]
emit(event)
end
"""
[transforms.ipv6-fix_date]
inputs = ["ipv6-fix_month"]
type = "lua"
version = "2"
hooks.process = """
function (event,emit)
event.log.date = string.format("%4d-%02d-%02d", event.log.year, event.log.month, event.log.day)
emit(event)
end
"""
[transforms.ipv6-fix_time]
inputs = ["ipv6-fix_date"]
type = "concat"
items = ["date", "time"]
target = "origin_timestamp"
joiner = "T"
[transforms.ipv6-old-out]
inputs = ["ipv6-fix_time"]
type = "remove_fields"
fields = ["month", "day", "year","date", "time","timestamp"]
## new ipv6 events
[transforms.ipv6-new-out]
type = "remap"
inputs = ["ipv6_event_type.new"]
source = '''
. |= object!(.message)
.client_id = slice!(.client_id, start: -17)
del(.message); del(.relayAgentRemoteId)
'''
## outputs
[sinks.output]
type = "file"
inputs = ["ipv6-new-out", "ipv6-old-out"]
path = "/tmp/output.json"
encoding.codec = "text"
[sinks.to-prometheus]
type = "prometheus_exporter"
inputs = ["metrics"]
address = "0.0.0.0:9509"
default_namespace = "ipv6-test"
buckets = [0.005, 0.01, 0.025, 0.05, 0.1, 0.25, 0.5, 1.0, 2.5, 5.0, 10.0]
quantiles = [0.5, 0.75, 0.9, 0.95, 0.99]
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment