Skip to content

Instantly share code, notes, and snippets.

@agup006
Last active August 28, 2023 18:14
Show Gist options
  • Save agup006/c054d7296f2515d99da1941d2ad8236c to your computer and use it in GitHub Desktop.
Save agup006/c054d7296f2515d99da1941d2ad8236c to your computer and use it in GitHub Desktop.
--- Fluent Bit Lua filter callback for CloudFront standard access logs.
-- Splits the tab-separated line in record["log"] into its positional fields
-- and rewrites the record with "@timestamp", "body", "@message",
-- "attributes", "event" and "aws.cloudfront" sub-tables. The original "log"
-- key is removed.
-- @param tag       tag of the incoming record (unused)
-- @param timestamp timestamp of the incoming record, passed through unchanged
-- @param record    table holding the raw line under the "log" key
-- @return 1, timestamp, record when the line was parsed;
--         0, timestamp, record (record untouched) when there is nothing to
--         parse: missing/non-string "log", a "#"-prefixed header line, or a
--         line with fewer than two fields.
function process(tag, timestamp, record)
    local line = record["log"]

    -- Nothing to parse: leave the record untouched instead of raising.
    -- "#"-prefixed lines are file headers, not requests.
    if type(line) ~= "string" or line == "" or string.sub(line, 1, 1) == "#" then
        return 0, timestamp, record
    end

    -- Split on tabs. Appending a trailing tab and matching "field + tab"
    -- keeps empty fields in place, so later columns never shift left.
    local fields = {}
    for field in string.gmatch(line .. "\t", "([^\t]*)\t") do
        fields[#fields + 1] = field
    end

    -- Positional fields of the log line.
    local date = fields[1]
    local time = fields[2]
    -- Without both date and time the timestamp cannot be built.
    if not date or not time then
        return 0, timestamp, record
    end
    local edgeLocation = fields[3]
    local scBytes = fields[4]
    local cIp = fields[5]
    local csMethod = fields[6]
    local csHost = fields[7]
    local csUriStem = fields[8]
    local scStatus = fields[9]
    local csReferer = fields[10]
    local csUserAgent = fields[11]
    local csUriQuery = fields[12]
    local csCookie = fields[13]
    local xEdgeResultType = fields[14]
    local xEdgeRequestId = fields[15]
    local xHostHeader = fields[16]
    local csProtocol = fields[17]
    local csBytes = fields[18]
    local timeTaken = fields[19]
    local xForwardedFor = fields[20]
    local sslProtocol = fields[21]
    local sslCipher = fields[22]
    local xEdgeResponseResultType = fields[23]
    local csProtocolVersion = fields[24]

    -- Common envelope.
    record["@timestamp"] = date .. "T" .. time .. "Z"
    record["body"] = line
    record["@message"] = line
    record["attributes"] = {
        data_stream = {
            dataset = "cloudfront",
            namespace = "aws",
            type = "log"
        }
    }
    record["event"] = {
        domain = "aws",
        source = "cloudfront",
        category = "web",
        type = csMethod,
        kind = xEdgeResultType,
        result = scStatus
    }

    -- Source-specific fields. tonumber() yields nil for "-" placeholders,
    -- which simply leaves the key absent.
    record["aws"] = {
        cloudfront = {
            ["c-ip"] = cIp,
            ["cs-host"] = csHost,
            ["cs-referer"] = csReferer,
            ["cs-user-agent"] = csUserAgent,
            ["cs-bytes"] = tonumber(csBytes),
            ["cs-method"] = csMethod,
            ["cs-protocol"] = csProtocol,
            ["cs-protocol-version"] = csProtocolVersion,
            ["cs-uri-query"] = csUriQuery,
            ["cs-uri-stem"] = csUriStem,
            ["cs-cookie"] = csCookie,
            ["sc-bytes"] = tonumber(scBytes),
            ["sc-status"] = scStatus,
            ["ssl-cipher"] = sslCipher,
            ["ssl-protocol"] = sslProtocol,
            ["time-taken"] = tonumber(timeTaken),
            ["x-edge-location"] = edgeLocation,
            ["x-edge-request-id"] = xEdgeRequestId,
            ["x-edge-result-type"] = xEdgeResultType,
            ["x-edge-response-result-type"] = xEdgeResponseResultType,
            ["x-forwarded-for"] = xForwardedFor,
            ["x-host-header"] = xHostHeader
        }
    }

    -- The raw line now lives in "body"/"@message"; drop the duplicate.
    record["log"] = nil
    return 1, timestamp, record
end
2019-12-04 21:02:31 LAX1 392 192.0.2.100 GET d111111abcdef8.cloudfront.net /index.html 200 - Mozilla/5.0%20(Windows%20NT%2010.0;%20Win64;%20x64)%20AppleWebKit/537.36%20(KHTML,%20like%20Gecko)%20Chrome/78.0.3904.108%20Safari/537.36 - - Hit SOX4xwn4XV6Q4rgb7XiVGOHms_BGlTAC4KyHmureZmBNrjGdRLiNIQ== d111111abcdef8.cloudfront.net https 23 0.001 - TLSv1.2 ECDHE-RSA-AES128-GCM-SHA256 Hit HTTP/2.0 - - 11040 0.001 Hit text/html 78 - -
2019-12-04 21:02:31 LAX1 392 192.0.2.100 GET d111111abcdef8.cloudfront.net /index.html 200 - Mozilla/5.0%20(Windows%20NT%2010.0;%20Win64;%20x64)%20AppleWebKit/537.36%20(KHTML,%20like%20Gecko)%20Chrome/78.0.3904.108%20Safari/537.36 - - Hit k6WGMNkEzR5BEM_SaF47gjtX9zBDO2m349OY2an0QPEaUum1ZOLrow== d111111abcdef8.cloudfront.net https 23 0.000 - TLSv1.2 ECDHE-RSA-AES128-GCM-SHA256 Hit HTTP/2.0 - - 11040 0.000 Hit text/html 78 - -
2019-12-04 21:02:31 LAX1 392 192.0.2.100 GET d111111abcdef8.cloudfront.net /index.html 200 - Mozilla/5.0%20(Windows%20NT%2010.0;%20Win64;%20x64)%20AppleWebKit/537.36%20(KHTML,%20like%20Gecko)%20Chrome/78.0.3904.108%20Safari/537.36 - - Hit f37nTMVvnKvV2ZSvEsivup_c2kZ7VXzYdjC-GUQZ5qNs-89BlWazbw== d111111abcdef8.cloudfront.net https 23 0.001 - TLSv1.2 ECDHE-RSA-AES128-GCM-SHA256 Hit HTTP/2.0 - - 11040 0.001 Hit text/html 78 - -
2019-12-13 22:36:27 SEA19-C1 900 192.0.2.200 GET d111111abcdef8.cloudfront.net /favicon.ico 502 http://www.example.com/ Mozilla/5.0%20(Windows%20NT%2010.0;%20Win64;%20x64)%20AppleWebKit/537.36%20(KHTML,%20like%20Gecko)%20Chrome/78.0.3904.108%20Safari/537.36 - - Error 1pkpNfBQ39sYMnjjUQjmH2w1wdJnbHYTbag21o_3OfcQgPzdL2RSSQ== www.example.com http 675 0.102 - - - Error HTTP/1.1 - - 25260 0.102 OriginDnsError text/html 507 - -
2019-12-13 22:36:26 SEA19-C1 900 192.0.2.200 GET d111111abcdef8.cloudfront.net / 502 - Mozilla/5.0%20(Windows%20NT%2010.0;%20Win64;%20x64)%20AppleWebKit/537.36%20(KHTML,%20like%20Gecko)%20Chrome/78.0.3904.108%20Safari/537.36 - - Error 3AqrZGCnF_g0-5KOvfA7c9XLcf4YGvMFSeFdIetR1N_2y8jSis8Zxg== www.example.com http 735 0.107 - - - Error HTTP/1.1 - - 3802 0.107 OriginDnsError text/html 507 - -
2019-12-13 22:37:02 SEA19-C2 900 192.0.2.200 GET d111111abcdef8.cloudfront.net / 502 - curl/7.55.1 - - Error kBkDzGnceVtWHqSCqBUqtA_cEs2T3tFUBbnBNkB9El_uVRhHgcZfcw== www.example.com http 387 0.103 - - - Error HTTP/1.1 - - 12644 0.103 OriginDnsError text/html 507 - -
--- Fluent Bit Lua filter callback for CloudTrail records.
-- Takes the first entry of record["Records"] and builds a new record with
-- "@timestamp", "event", "attributes", "cloud", "aws.cloudtrail" and "body".
-- @param tag       tag of the incoming record (unused)
-- @param timestamp timestamp of the incoming record, passed through unchanged
-- @param record    table expected to carry a "Records" array of event tables
-- @return 1, timestamp, transformed_log on success;
--         0, timestamp, record (record untouched) when "Records" is missing,
--         empty, or its first entry is not a table.
function process(tag, timestamp, record)
    -- Guard the lookup: indexing a missing "Records" key would raise.
    local records = record["Records"]
    local log = nil
    if type(records) == "table" then
        log = records[1] -- only the first entry of the array is mapped
    end
    if type(log) ~= "table" then
        return 0, timestamp, record -- nothing to transform; keep the record as is
    end

    local transformed_log = {}

    -- Mapping for '@timestamp'
    transformed_log["@timestamp"] = log["eventTime"] or ""

    -- Mapping for 'event'
    transformed_log["event"] = {
        domain = "cloudtrail",
        name = log["eventName"] or "",
        source = log["eventSource"] or "",
        category = log["eventCategory"] or "",
        type = log["eventType"] or "",
        -- Both middle operands are non-empty strings, so the and/or idiom is safe.
        kind = log["managementEvent"] and "Management" or "Data",
        result = log["readOnly"] and "Read" or "Write"
    }

    -- Mapping for 'attributes'
    transformed_log["attributes"] = {
        data_stream = {
            dataset = "aws_cloudtrail",
            namespace = "observability",
            type = "logs"
        }
    }

    -- Mapping for 'cloud'. The last three keys are emitted as empty strings
    -- because no source field is mapped to them here.
    transformed_log["cloud"] = {
        provider = "aws",
        account = {
            id = log["recipientAccountId"] or ""
        },
        region = log["awsRegion"] or "",
        resource_id = "",
        availability_zone = "",
        platform = ""
    }

    -- Mapping for 'aws': embed the whole event table under aws.cloudtrail.
    transformed_log["aws"] = {
        cloudtrail = log
    }

    -- Mapping for 'body': the raw line when present, otherwise empty.
    transformed_log["body"] = record["log"] or ""

    return 1, timestamp, transformed_log
end
--- Split an access-log line into fields.
-- Fields are whitespace-separated, but a field wrapped in double quotes may
-- itself contain spaces; such a field is returned as one entry with the
-- surrounding quotes removed.
-- @param line string to split
-- @return array of field strings
local function split_elb_fields(line)
    local fields = {}
    local pos = 1
    while true do
        local first = string.find(line, "%S", pos)
        if not first then
            break
        end
        if string.sub(line, first, first) == '"' then
            -- Quoted field: runs to the next double quote (plain search).
            local close = string.find(line, '"', first + 1, true)
            if not close then
                fields[#fields + 1] = string.sub(line, first + 1)
                break
            end
            fields[#fields + 1] = string.sub(line, first + 1, close - 1)
            pos = close + 1
        else
            -- Bare field: runs to the next whitespace character.
            local stop = string.find(line, "%s", first)
            if not stop then
                fields[#fields + 1] = string.sub(line, first)
                break
            end
            fields[#fields + 1] = string.sub(line, first, stop - 1)
            pos = stop
        end
    end
    return fields
end

--- Fluent Bit Lua filter callback for load balancer access logs.
-- Parses the line in record["log"] and returns a new record with
-- "@timestamp", "event", "http", "communication", "cloud" and "url".
-- Field positions used: 1 connection type, 2 time, 3 load balancer id,
-- 4 client ip:port, 5 target ip:port, 9 load balancer status code,
-- 13 quoted request ("METHOD URL HTTP/x.y"), 17 target group ARN.
-- @param tag       tag of the incoming record (unused)
-- @param timestamp timestamp of the incoming record, passed through unchanged
-- @param record    table holding the raw line under the "log" key
-- @return 1, timestamp, mapped_record on success;
--         0, timestamp, record (record untouched) when "log" is not a string
--         or the line has fewer than 13 fields.
function process(tag, timestamp, record)
    local line = record["log"]
    if type(line) ~= "string" then
        return 0, timestamp, record
    end

    local fields = split_elb_fields(line)
    -- The request line is field 13; shorter lines cannot be mapped.
    if #fields < 13 then
        return 0, timestamp, record
    end

    local protocol_type = fields[1]
    local client = fields[4]
    local target = fields[5]
    local status = fields[9]
    local request = fields[13]

    -- Request line: method, URL and protocol version.
    local method, url = string.match(request, "^(%S+)%s+(%S+)")
    local http_version = string.match(request, "HTTP/(%d%.%d)")

    -- "ip:port" pairs. The greedy prefix keeps everything before the last
    -- colon as the address; a "-" placeholder yields nil for both parts.
    local client_ip, client_port = string.match(client, "^(.+):(%d+)$")
    local target_ip, target_port = string.match(target, "^(.+):(%d+)$")

    -- The region is the fourth colon-separated component of the target
    -- group ARN; nil when the ARN is absent or "-".
    local region = string.match(fields[17] or "", "^arn:[^:]*:[^:]*:([^:]+):")

    -- URL parts; the domain capture keeps any ":port" suffix.
    local scheme, domain, path
    if url then
        scheme, domain, path = string.match(url, "^(%w+)://([^/]+)(/[^ ]*)")
    end

    local mapped_record = {}
    mapped_record["@timestamp"] = fields[2]
    mapped_record["event"] = {
        domain = "elb",
        source = client,
        category = "http",
        type = method,
        kind = "event",
        result = status
    }
    mapped_record["http"] = {
        request = {
            method = method,
            url = url
        },
        response = {
            status_code = tonumber(status)
        },
        version = http_version
    }
    mapped_record["communication"] = {
        type = protocol_type,
        source = {
            ip = client_ip,
            port = tonumber(client_port)
        },
        destination = {
            ip = target_ip,
            port = tonumber(target_port)
        }
    }
    mapped_record["cloud"] = {
        provider = "aws",
        region = region,
        resource_id = fields[3]
    }
    mapped_record["url"] = {
        original = url,
        full = url,
        scheme = scheme,
        domain = domain,
        path = path
    }

    return 1, timestamp, mapped_record
end
http 2018-07-02T22:23:00.186641Z app/my-loadbalancer/50dc6c495c0c9188 192.168.131.39:2817 10.0.0.1:80 0.000 0.001 0.000 200 200 34 366 "GET http://www.example.com:80/ HTTP/1.1" "curl/7.46.0" - - arn:aws:elasticloadbalancing:us-east-2:123456789012:targetgroup/my-targets/73e2d6bc24d8a067 "Root=1-58337262-36d228ad5d99923122bbe354" "-" "-" 0 2018-07-02T22:22:48.364000Z "forward" "-" "-" "10.0.0.1:80" "200" "-" "-"
https 2018-07-02T22:23:00.186641Z app/my-loadbalancer/50dc6c495c0c9188 192.168.131.39:2817 10.0.0.1:80 0.086 0.048 0.037 200 200 0 57 "GET https://www.example.com:443/ HTTP/1.1" "curl/7.46.0" ECDHE-RSA-AES128-GCM-SHA256 TLSv1.2 arn:aws:elasticloadbalancing:us-east-2:123456789012:targetgroup/my-targets/73e2d6bc24d8a067 "Root=1-58337281-1d84f3d73c47ec4e58577259" "www.example.com" "arn:aws:acm:us-east-2:123456789012:certificate/12345678-1234-1234-1234-123456789012" 1 2018-07-02T22:22:48.364000Z "authenticate,forward" "-" "-" "10.0.0.1:80" "200" "-" "-"
h2 2018-07-02T22:23:00.186641Z app/my-loadbalancer/50dc6c495c0c9188 10.0.1.252:48160 10.0.0.66:9000 0.000 0.002 0.000 200 200 5 257 "GET https://10.0.2.105:773/ HTTP/2.0" "curl/7.46.0" ECDHE-RSA-AES128-GCM-SHA256 TLSv1.2 arn:aws:elasticloadbalancing:us-east-2:123456789012:targetgroup/my-targets/73e2d6bc24d8a067 "Root=1-58337327-72bd00b0343d75b906739c42" "-" "-" 1 2018-07-02T22:22:48.364000Z "redirect" "https://example.com:80/" "-" "10.0.0.66:9000" "200" "-" "-"
ws 2018-07-02T22:23:00.186641Z app/my-loadbalancer/50dc6c495c0c9188 10.0.0.140:40914 10.0.1.192:8010 0.001 0.003 0.000 101 101 218 587 "GET http://10.0.0.30:80/ HTTP/1.1" "-" - - arn:aws:elasticloadbalancing:us-east-2:123456789012:targetgroup/my-targets/73e2d6bc24d8a067 "Root=1-58337364-23a8c76965a2ef7629b185e3" "-" "-" 1 2018-07-02T22:22:48.364000Z "forward" "-" "-" "10.0.1.192:8010" "101" "-" "-"
wss 2018-07-02T22:23:00.186641Z app/my-loadbalancer/50dc6c495c0c9188 10.0.0.140:44244 10.0.0.171:8010 0.000 0.001 0.000 101 101 218 786 "GET https://10.0.0.30:443/ HTTP/1.1" "-" ECDHE-RSA-AES128-GCM-SHA256 TLSv1.2 arn:aws:elasticloadbalancing:us-west-2:123456789012:targetgroup/my-targets/73e2d6bc24d8a067 "Root=1-58337364-23a8c76965a2ef7629b185e3" "-" "-" 1 2018-07-02T22:22:48.364000Z "forward" "-" "-" "10.0.0.171:8010" "101" "-" "-"
http 2018-11-30T22:23:00.186641Z app/my-loadbalancer/50dc6c495c0c9188 192.168.131.39:2817 - 0.000 0.001 0.000 200 200 34 366 "GET http://www.example.com:80/ HTTP/1.1" "curl/7.46.0" - - arn:aws:elasticloadbalancing:us-east-2:123456789012:targetgroup/my-targets/73e2d6bc24d8a067 "Root=1-58337364-23a8c76965a2ef7629b185e3" "-" "-" 0 2018-11-30T22:22:48.364000Z "forward" "-" "-" "-" "-" "-" "-"
# Fluent Bit pipeline: runs the ELB Lua filter over a sample log file and
# prints the resulting records as JSON on standard output.
[SERVICE]
parsers_file /etc/fluent-bit/parsers.conf
# Input: tail elb-sample.log, starting from the beginning of the file.
[INPUT]
name tail
path elb-sample.log
read_from_head true
# Filter: pass every record (match *) to process() in elb-otel-1.0.0.lua.
[FILTER]
name lua
match *
script elb-otel-1.0.0.lua
call process
# Output: write every record to stdout as JSON.
[OUTPUT]
name stdout
match *
format json
# Fluent Bit pipeline: runs the CloudFront Lua filter over a sample log file
# and prints the resulting records as JSON on standard output.
[SERVICE]
parsers_file /etc/fluent-bit/parsers.conf
# Input: tail cloudfront-sample.log, starting from the beginning of the file.
[INPUT]
name tail
path cloudfront-sample.log
read_from_head true
# Filter: pass every record (match *) to process() in cloudfront-otel-1.0.0.lua.
[FILTER]
name lua
match *
script cloudfront-otel-1.0.0.lua
call process
# Output: write every record to stdout as JSON.
[OUTPUT]
name stdout
match *
format json
# Fluent Bit pipeline: runs the RDS Lua filter over a sample log file and
# prints the resulting records as JSON on standard output.
[SERVICE]
parsers_file /etc/fluent-bit/parsers.conf
# Input: tail rds-sample.log, starting from the beginning of the file.
[INPUT]
name tail
path rds-sample.log
read_from_head true
# Filter: pass every record (match *) to process() in rds-otel-1.0.0.lua.
# NOTE(review): confirm rds-otel-1.0.0.lua is available next to this config.
[FILTER]
name lua
match *
script rds-otel-1.0.0.lua
call process
# Output: write every record to stdout as JSON.
[OUTPUT]
name stdout
match *
format json
# Fluent Bit pipeline: runs the VPC Flow Logs Lua filter over a sample log
# file and prints the resulting records as JSON on standard output.
[SERVICE]
parsers_file /etc/fluent-bit/parsers.conf
# Input: tail vpc-sample.log, starting from the beginning of the file.
[INPUT]
name tail
path vpc-sample.log
read_from_head true
# Filter: pass every record (match *) to process() in vpc-otel-1.0.0.lua.
[FILTER]
name lua
match *
script vpc-otel-1.0.0.lua
call process
# Output: write every record to stdout as JSON.
[OUTPUT]
name stdout
match *
format json
20230501 10:02:19,ip-10-1-0-50,rdsadmin,localhost,7,585281,QUERY,,'select * from information_schema.rds_events_threads_waits_current where (type <> \\'BACKGROUND\\' or name = \\'thread/sql/slave_sql\\') and command <> \\'Sleep\\'',0,,"
--- Fluent Bit Lua filter callback for VPC Flow Log lines.
-- Splits the whitespace-separated line in record["log"] and returns a new
-- record with "@timestamp", "observedTimestamp", "body", "event" and
-- "communication".
-- Field positions used: 4 source address, 5 destination address,
-- 6 source port, 7 destination port, 9 packets, 10 bytes, 11 start (epoch
-- seconds), 12 end (epoch seconds), 13 action, 14 log status.
-- @param tag       tag of the incoming record (unused)
-- @param timestamp timestamp of the incoming record, passed through unchanged
-- @param record    table holding the raw line under the "log" key
-- @return 1, timestamp, mapped_log on success;
--         0, timestamp, record (record untouched) when "log" is not a string.
function process(tag, timestamp, record)
    local log = record["log"]
    -- string.gmatch raises on a non-string; pass such records through.
    if type(log) ~= "string" then
        return 0, timestamp, record
    end

    -- Split the log on whitespace.
    local fields = {}
    for word in string.gmatch(log, "%S+") do
        fields[#fields + 1] = word
    end

    -- Format epoch seconds as a UTC timestamp (the "!" prefix selects UTC).
    -- Returns nil for a missing value: os.date would otherwise fall back to
    -- the current time and stamp the record with a misleading value.
    local function utc_string(epoch)
        if epoch == nil then
            return nil
        end
        return os.date("!%Y-%m-%dT%H:%M:%S", epoch)
    end

    -- tonumber() yields nil for "-" placeholders, leaving the key absent.
    local srcaddr = fields[4]
    local dstaddr = fields[5]
    local srcport = tonumber(fields[6])
    local dstport = tonumber(fields[7])
    local packets = tonumber(fields[9])
    local bytes = tonumber(fields[10])
    local start = tonumber(fields[11])
    local end_time = tonumber(fields[12])
    local action = fields[13]
    local log_status = fields[14]

    local mapped_log = {
        ["@timestamp"] = utc_string(start),
        ["observedTimestamp"] = utc_string(end_time),
        ["body"] = log,
        ["event"] = {
            ["category"] = "network_traffic",
            ["type"] = "connection",
            ["action"] = action,
            ["outcome"] = log_status
        },
        ["communication"] = {
            ["source"] = {
                ["ip"] = srcaddr,
                ["port"] = srcport,
                ["bytes"] = bytes,
                ["packets"] = packets
            },
            ["destination"] = {
                ["ip"] = dstaddr,
                ["port"] = dstport
            }
        }
    }

    return 1, timestamp, mapped_log
end
2 123456789010 eni-11111111aaaaaaaaa - - - - - - - 1431280876 1431280934 - SKIPDATA
2 123456789010 eni-1235b8ca123456789 - - - - - - - 1431280876 1431280934 - NODATA
2 123456789010 eni-1235b8ca123456789 172.31.9.69 172.31.9.12 49761 3389 6 20 4249 1418530010 1418530070 REJECT OK
2 123456789010 eni-1235b8ca123456789 172.31.16.139 172.31.16.21 20641 22 6 20 4249 1418530010 1418530070 ACCEPT OK
2 123456789010 eni-1235b8ca123456789 203.0.113.12 172.31.16.139 0 0 1 4 336 1432917027 1432917142 ACCEPT OK
2 123456789010 eni-1235b8ca123456789 172.31.16.139 203.0.113.12 0 0 1 4 336 1432917094 1432917142 REJECT OK
2 123456789010 eni-1235b8ca123456789 2001:db8:1234:a100:8d6e:3477:df66:f105 2001:db8:1234:a102:3304:8879:34cf:4071 34892 22 6 54 8855 1477913708 1477913820 ACCEPT OK
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment