Skip to content

Instantly share code, notes, and snippets.

Embed
What would you like to do?
Logstash: input from 3 PostgreSQL tables, output to 4 Elasticsearch indexes
# Input stage: three scheduled JDBC queries against PostgreSQL, plus stdin.
# Every jdbc input tags its events so the filter stage can branch on the source table.
input {
  # expedia_region_union: scheduled every minute; selects only rows whose
  # source_timestamp (+ 9 hours) is newer than :sql_last_value.
  jdbc {
    jdbc_connection_string => "jdbc:postgresql://IP:PORT/DB_NAME?useTimezone=true&useLegacyDatetimeCode=false&serverTimezone=UTC&useSSL=false&useUnicode=true&characterEncoding=utf8"
    jdbc_user => "atlas"
    # Fixed: the option name is jdbc_password (was misspelled jdbc_passwor).
    jdbc_password => "atlaspw"
    jdbc_validate_connection => true
    jdbc_driver_library => "DRIVER_PATH"
    jdbc_driver_class => "org.postgresql.Driver"
    schedule => "* * * * *"
    statement => "SELECT region_id, region_type, country_code, country_code3, continent_code, source_from,
st_asgeojson(center_geo_point)::text as center_geo_point_text, center_longitude, center_latitude, jsonn::text
from expedia_region_union where source_timestamp + interval '9 hours' > :sql_last_value"
    # NOTE(review): source_timestamp is the tracking column but is not in the
    # SELECT list above; with use_column_value => true the plugin presumably
    # cannot advance :sql_last_value from the result set - confirm and add the
    # column to the query if incremental loading is intended.
    tracking_column => "source_timestamp"
    tracking_column_type => "timestamp"
    use_column_value => true
    clean_run => false
    last_run_metadata_path => "/usr/share/logstash/.logstash_jdbc_last_run"
    tags => ["expedia_region_union"]
  }

  # expedia_airport_more: scheduled every 10 minutes, paged full select.
  # The correlated sub-select returns the matching region as JSON text in haha_test.
  jdbc {
    jdbc_connection_string => "jdbc:postgresql://IP:PORT/DB_NAME?useTimezone=true&useLegacyDatetimeCode=false&serverTimezone=UTC&useSSL=false&useUnicode=true&characterEncoding=utf8"
    jdbc_user => "atlas"
    # Fixed: the option name is jdbc_password (was misspelled jdbc_passwor).
    jdbc_password => "atlaspw"
    jdbc_validate_connection => true
    jdbc_driver_library => "DRIVER_PATH"
    jdbc_driver_class => "org.postgresql.Driver"
    schedule => "*/10 * * * *"
    statement => "select id, iata_airport_code, name, name_full, country_code, center_latitude, center_longitude, name_kr as name_korean, name_full_kr as name_korean_full,
st_asgeojson(center_geo_point)::text as center_geo_point_text, la, lo, public_flag, international_flag, source_from,
source_time, iata_airport_metro_code, (select json_build_object('region_id', region_id, 'region_name', region_name,
'region_name_kr', region_name_kr, 'region_type', region_type, 'region_code', region_code)::text from expedia_region_union where region_id = c.region_id) as haha_test from expedia_airport_more c"
    jdbc_paging_enabled => true
    jdbc_page_size => "100000"
    tags => ["expedia_airport_more"]
  }

  # expedia_region_continent: scheduled every 10 minutes, full select.
  jdbc {
    jdbc_connection_string => "jdbc:postgresql://IP:PORT/DB_NAME?useTimezone=true&useLegacyDatetimeCode=false&serverTimezone=UTC&useSSL=false&useUnicode=true&characterEncoding=utf8"
    jdbc_user => "atlas"
    # Fixed: the option name is jdbc_password (was misspelled jdbc_passwor).
    jdbc_password => "atlaspw"
    jdbc_validate_connection => true
    jdbc_driver_library => "DRIVER_PATH"
    jdbc_driver_class => "org.postgresql.Driver"
    schedule => "*/10 * * * *"
    # Fixed: a stray closing brace after the statement ended this jdbc block
    # early and left the tags option outside of any plugin.
    statement => "select source_from, region_name_kr as region_name_korean, region_name_full_kr as region_name_korean_full,
region_id, region_name_Full, continent_code, region_type, descendants::text as descendants_text from expedia_region_continent"
    tags => ["expedia_region_continent"]
  }

  # Manual input; these events carry no table tag.
  # Fixed: the stdin block was missing its closing brace, which left it
  # outside of the input section.
  stdin { codec => plain { charset => "UTF-8" } }
}
# Filter stage: branches on the tag set by each jdbc input, decodes the JSON
# text columns into structured fields, and records the routing key used by the
# output stage in [@metadata][type].
filter {
  if "expedia_region_union" in [tags] {
    ruby {
      code => "
        require 'json'
        # Decode the GeoJSON text into the point field; failures are ignored (best effort).
        begin
          center_geo_point_json = JSON.parse(event.get('center_geo_point_text') || '{}')
          event.set('point', center_geo_point_json)
          event.remove('center_geo_point_text')
        rescue Exception => e
        end
        # Build a lat/lon location object; the JSON round trip turns symbol keys into strings.
        begin
          location = {lat: event.get('center_latitude').to_f, lon: event.get('center_longitude').to_f}
          event.set('location', JSON(location.to_json))
          event.set('center_longitude', event.get('center_longitude'))
          event.set('center_latitude', event.get('center_latitude'))
        rescue Exception => e
        end
        # Promote every key of the jsonn document to a top-level event field.
        begin
          jsonn_json = JSON.parse(event.get('jsonn').to_s || '{}')
          jsonn_json.each {|k,v|
            event.set(k,v)
          }
          event.remove('jsonn')
        rescue Exception => e
        end
      "
    }
    date { match => ['time', 'UNIX'] }
    # Duplicate the event so it can be shaped for two different indexes.
    # NOTE(review): this relies on the clone filter putting the clone name in
    # [type]; any event whose [type] is not test-union-007 (including the
    # un-cloned original) takes the else branch below - confirm this is intended.
    clone { clones => ['test-union-007', 'test-country-007']}
    if [type] == 'test-union-007' {
      mutate {
        add_field => {"[@metadata][type]" => "test-union-007"}
        remove_field => ["tags", "@version", "@timestamp", "country_code", "country_code3", "source_from", "continent_code", "cascaded_city", "capital_contains_yn", "capital_yn", "geo_latitude", "geo_longitude", "center_timezone"]
      }
    } else {
      mutate {
        add_field => {"[@metadata][type]" => "test-country-007"}
        remove_field => ["region", "city_ancestors", "city_home_flag", "country", "location", "source_from", "ancestor_list", "ancestors", "center_longitude", "center_latitude", "center_timezone",
          "nearest_airport", "nearest_airport_country", "parent_city_flag", "point", "region_code", "source_time", "sub_class", "tags", "timezone", "timezone_list", "use_yn_flag", "boundaries_buffer",
          "add_continent", "@version", "@timestamp"]
      }
      # Only country regions are kept for the country index.
      if [region_type] != "country" { drop {} }
    # Fixed: the closing brace of this else branch was missing, which made the
    # following else-if a syntax error and left the filter section unclosed.
    }
  } else if "expedia_airport_more" in [tags] {
    ruby {
      code => "require 'json'
        # Decode the GeoJSON text into the point field; failures are ignored (best effort).
        begin
          center_geo_point_json = JSON.parse(event.get('center_geo_point_text') || '{}')
          event.set('point', center_geo_point_json)
          event.remove('center_geo_point_text')
        rescue Exception => e
          #
        end
        # Decode the region sub-select (haha_test) into the region field.
        begin
          region_json_json = JSON.parse(event.get('haha_test') || '{}')
          event.set('region', region_json_json)
          event.remove('haha_test')
        rescue Exception => e
          event.tag('you have to check region_json sql.')
        end
        # Build the location object and rename center_longitude/center_latitude
        # to longitude/latitude.
        begin
          location = {lat: event.get('center_latitude').to_f, lon: event.get('center_longitude').to_f}
          # Fixed: was location.to_jsonn, which raised NoMethodError and silently
          # skipped this whole block.
          event.set('location', JSON(location.to_json))
          event.set('longitude', event.get('center_longitude'))
          event.remove('center_longitude')
          event.set('latitude', event.get('center_latitude'))
          # Fixed: field name was misspelled as centerl_atitude, so the original
          # center_latitude field was never removed.
          event.remove('center_latitude')
        rescue Exception => e
          # event.tag('something happens in location block')
        end
      "
    }
    mutate {
      add_field => {"[@metadata][type]" => "test-airport-007"}
      remove_field => ["@timestamp", "@version", "iata_airport_code", "iata_airport_metro_code", "la", "lo", "source_from", "source_time", "tags"]
    }
  } else {
    # Everything else (the continent query and stdin events).
    ruby {
      code => "
        require 'json'
        # Decode descendants_text into the descendants field; a missing or
        # invalid value is ignored (best effort).
        begin
          descendants_json = JSON.parse(event.get('descendants_text'))
          event.set('descendants', descendants_json)
          event.remove('descendants_text')
        rescue Exception => e
        end
      "
    }
    mutate {
      add_field => {"[@metadata][type]" => "test-continent-007"}
      remove_field => ["@timestamp", "@version", "tags"]
    }
  }
}
# Output stage: routes each event to an Elasticsearch index based on the
# [@metadata][type] value set in the filter stage. Every branch upserts
# (action update + doc_as_upsert), so re-running a query overwrites the
# existing document with the same id. All events are also printed to stdout.
output {
  if [@metadata][type] == 'test-union-007' {
    # Fixed: plugin name was misspelled as elasticsarch.
    elasticsearch {
      hosts => ["https://호스트:443"]
      index => "region_union_logstash_20200629_002"
      doc_as_upsert => true
      action => "update"
      document_id => "%{region_id}"
    }
  } else if [@metadata][type] == 'test-country-007' {
    elasticsearch {
      hosts => ["https://호스트:443"]
      index => "region_country_logstash_20200629_003"
      doc_as_upsert => true
      action => "update"
      document_id => "%{region_id}"
    }
  } else if [@metadata][type] == 'test-airport-007' {
    elasticsearch {
      hosts => ["https://호스트:443"]
      index => "region_airport"
      doc_as_upsert => true
      action => "update"
      # Fixed: airport rows expose id, not a top-level region_id; the
      # unresolved %{region_id} reference made every airport row upsert the
      # same document.
      document_id => "%{id}"
    }
  } else {
    # Fallback branch: events typed test-continent-007 by the filter stage.
    # NOTE(review): the index name below looks like an airport index although
    # this branch receives continent rows - confirm the intended index.
    elasticsearch {
      hosts => ["https://atlas-search.ㅗ.com:443"]
      index => "region_airport_logstash_20200629_001"
      doc_as_upsert => true
      action => "update"
      # Fixed: continent rows carry region_id, not id.
      document_id => "%{region_id}"
    }
  }
  stdout { codec => rubydebug }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
You can’t perform that action at this time.