En el fichero /etc/filebeat/filebeat.yml
agregamos lo siguiente en Filebeat prospectors
- type: log
paths:
- /var/log/squidguard/*-dest.log
fields:
application: squidguard
Luego, en nuestro nunca bien ponderado Logstash, podemos hacer un filtro con base a fields.application
} else if [fields][application] == "squidguard" {
grok {
pattern_definitions => {
"PETICION" => "Request\(%{NOTSPACE:sg_perfil}/%{NOTSPACE:sg_destino}/%{NOTSPACE:sg_tipo}\)"
"CLIENTE" => "%{IPORHOST:sg_ip_cliente}/%{NOTSPACE:sg_ip_cliente_auxiliar} %{NOTSPACE:sg_usuario}"
"RESULTADO" => "%{WORD:sg_method} %{WORD:sg_veredicto}"
}
match => {
"message" => "%{TIMESTAMP_ISO8601:sg_marca_tiempo} \[%{NUMBER:sg_id:int}\] %{PETICION} %{NOTSPACE:sg_url} %{CLIENTE} %{RESULTADO}"
}
}
date {
match => [ "sg_marca_tiempo", "yyyy-MM-dd HH:mm:ss", "ISO8601" ]
}
}
Tengo que aceptar que el filtro dissect debería funcionar mejor, en cuento es más rápido
Y como me dieron un poco más de recursos para el servidor, valía la pena intentarlo de tal forma
} else if [fields][application] == "squidguard" {
dissect {
mapping => {
"message" => "%{sg_dia} %{sg_hora} %{marca} %{sg_peticion} %{sg_url} %{sg_cliente} %{sg_guion} %{sg_metodo} %{sq_veredicto}"
}
remove_field => ["message"]
}
mutate {
remove_field => ["[beat][version]", "source", "[prospector][type]"]
add_field => { "sg_ts" => "%{sg_dia} %{sg_hora}" }
}
date {
match => [ "sg_ts", "yyyy-MM-dd HH:mm:ss" ]
remove_field => ["sg_ts", "sg_dia", "sg_hora"]
}
dissect {
mapping => {
"sg_peticion" => "Request(%{sg_perfil}/%{sg_destino}/%{sg_tipo})"
}
remove_field => ["sg_peticion"]
}
dissect {
mapping => {
"sg_cliente" => "%{sg_ip_cliente}/%{sg_ip_cliente_auxiliar}"
}
remove_field => ["sg_cliente"]
}
grok {
match => {
"sg_url" => "^(?:(?:https?\://)?%{IPORHOST:sg_sitio}(?:\:%{NUMBER})?)?%{NOTSPACE:sg_uri}?$"
}
remove_field => ["sg_url"]
}
}