Skip to content

Instantly share code, notes, and snippets.

Show Gist options
  • Save imweijh/40b2502666bcd9915beb97e705b7f665 to your computer and use it in GitHub Desktop.
Save imweijh/40b2502666bcd9915beb97e705b7f665 to your computer and use it in GitHub Desktop.
Process multiple lines of unstructured logs as one structured record with Logstash prior to pushing them to Elasticsearch.
# Accept events shipped by Beats agents on port 5301.
input {
  beats {
    port => 5301
  }
}
# Correlates the "started" and "ended" log lines of a job (keyed by job_id)
# into one structured event. The start line is stashed in an aggregate map and
# cancelled; the completion line is enriched with the stashed values and is the
# only event that continues to the output.
filter {
  if [fields][type] == "monthly-indexed.r2web.bsd.alljob" {

    # Replace every "/" in the message with a space so that dates can be
    # captured by the "\d\d \d\d \d\d" sub-patterns in the grok expressions below.
    mutate {
      gsub => ["message", "/", " "]
    }

    # First pattern matches the "started" line, second the "ended" line.
    grok {
      match => {
        "message" => [
          "%{WORD:m_id}%{SPACE}%{NUMBER}%{SPACE}%{WORD:log_type}%{SPACE}Job%{SPACE}%{NUMBER:job_id}%{SPACE}%{WORD:user}%{SPACE}%{WORD:job_mem_name}%{SPACE}started%{SPACE}on%{SPACE}(?<start_date>\d\d \d\d \d\d)%{SPACE}at%{SPACE}%{TIME:start_time}%{SPACE}in%{SPACE}subsystem%{SPACE}%{WORD:subsystem_name}%{SPACE}in%{SPACE}%{WORD:mainsystem_name}%{GREEDYDATA}",
          "%{WORD:m_id}%{SPACE}%{NUMBER}%{SPACE}%{WORD:log_type}%{SPACE}Job%{SPACE}%{NUMBER:job_id}%{SPACE}%{WORD:user}%{SPACE}%{WORD:job_mem_name}%{SPACE}ended%{SPACE}on%{SPACE}(?<end_date>\d\d \d\d \d\d)%{SPACE}at%{SPACE}%{TIME:end_time};%{SPACE}%{NUMBER:elapsed_time_sec}%{SPACE}seconds used; end code%{SPACE}%{INT:return_code}%{SPACE}%{GREEDYDATA}"
        ]
      }
    }

    # Lines matching neither pattern are discarded.
    if "_grokparsefailure" in [tags] {
      drop { }
    }

    # Start line: open a map for this job_id, copy the start details into it,
    # then cancel the event so that it is never indexed on its own.
    if [log_type] == "INFO" {
      aggregate {
        task_id => "%{job_id}"
        code => "
          map['entry_id'] ||= event.get('job_id') + event.get('user') + event.get('job_mem_name')
          map['job_id'] = event.get('job_id')
          map['data_center'] = event.get('[fields][logs_from]')
          map['table'] = event.get('subsystem_name')
          map['job_mem_name'] = event.get('job_mem_name')
          map['application'] = event.get('subsystem_name')
          map['group'] = event.get('subsystem_name')
          map['owner'] = event.get('user')
          map['node_id'] = event.get('[fields][logs_from]')
          map['order_date'] ||= event.get('start_date')
          map['order_id'] ||= event.get('start_date')
          map['start_datetime'] = event.get('start_date') + '_' + event.get('start_time')
          map['info_message'] = event.get('message')
          event.cancel()
        "
        map_action => "create"
      }
    }

    # Completion line: add the end details to the map, strip the raw grok
    # fields from the event, and copy every map entry onto the event.
    #
    # map_action is "create_or_update" rather than "update": with "update" the
    # code is skipped when no map exists for this job_id (start line missed or
    # map expired), leaving the event without entry_id, so the output would
    # index it under the literal document id "%{entry_id}". The "||=" guards
    # below already supply entry_id / order_date / order_id in that case.
    #
    # NOTE(review): timeout => 120 appears to expire the map 120 seconds after
    # the start line; jobs running longer would then lose their start fields
    # (start_datetime, owner, ...). Confirm this window is intended.
    if [log_type] == "COMPLETION" {
      aggregate {
        task_id => "%{job_id}"
        code => "
          map['entry_id'] ||= event.get('job_id') + event.get('user') + event.get('job_mem_name')
          map['order_date'] ||= event.get('end_date')
          map['order_id'] ||= event.get('end_date')
          map['end_datetime'] = event.get('end_date') + '_' + event.get('end_time')
          map['rerun_counter'] = '1'
          map['job_status'] = event.get('return_code')
          map['elapsed_time_sec'] = event.get('elapsed_time_sec')
          map['completion_message'] = event.get('message')
          items_to_remove = ['message', 'm_id', 'log_type', 'job_id', 'user', 'job_mem_name', 'end_date', 'end_time', 'elapsed_time_sec', 'return_code']
          items_to_remove.each{|item| event.remove(item)}
          map.each{|field, value| event.set(field, value)}
        "
        map_action => "create_or_update"
        end_of_task => true
        timeout => 120
      }
    }

    # NOTE(review): neither aggregate filter above sets timeout_tags or
    # push_map_as_event_on_timeout, so nothing in this file appears to add this
    # tag. Kept as a safeguard; confirm before removing.
    if "_aggregatetimeout" in [tags] {
      drop { }
    }

    # Use the job start time as the event timestamp.
    date {
      match => ["start_datetime", "yy MM dd_HH:mm:ss"]
      target => "@timestamp"
      timezone => "Asia/Singapore"
    }

    # Convert the string date fields into real timestamps, in place.
    date {
      match => ["start_datetime", "yy MM dd_HH:mm:ss"]
      target => "start_datetime"
      timezone => "Asia/Singapore"
    }
    date {
      match => ["end_datetime", "yy MM dd_HH:mm:ss"]
      target => "end_datetime"
      timezone => "Asia/Singapore"
    }
    date {
      match => ["order_date", "yy MM dd"]
      target => "order_date"
      timezone => "Asia/Singapore"
    }
  }
}
# Send the aggregated job records to Elasticsearch.
output {
  if [fields][type] == "monthly-indexed.r2web.bsd.alljob" {
    elasticsearch {
      hosts => "elasticsearch:9200"
      # Index name includes the year and month of the event's @timestamp.
      index => "common_ctrlm_%{+yyyy_MM}_r2web_bsd_alljob_stats"
      # entry_id is set by the aggregate filters (job_id + user + job_mem_name).
      document_id => "%{entry_id}"
    }
  }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment