Skip to content

Instantly share code, notes, and snippets.

Embed
What would you like to do?
Logstash: input from 3 PostgreSQL tables, output to 4 Elasticsearch indexes
# Input stage: three scheduled JDBC pulls from PostgreSQL plus an interactive stdin input.
# Each JDBC input tags its events with the source table name so later stages can
# branch on [tags].
#
# NOTE(review): credentials are stored in plain text here; consider referencing them
# from the Logstash keystore / environment variables instead.
# NOTE(review): the URL parameters (useTimezone, useLegacyDatetimeCode, serverTimezone,
# useSSL, useUnicode, characterEncoding) look like MySQL Connector/J options rather than
# PostgreSQL driver options - confirm they are needed.
input {
  # Incremental load of expedia_region_union, run every minute.
  jdbc {
    jdbc_connection_string => "jdbc:postgresql://IP:PORT/DB_NAME?useTimezone=true&useLegacyDatetimeCode=false&serverTimezone=UTC&useSSL=false&useUnicode=true&characterEncoding=utf8"
    jdbc_user => "atlas"
    jdbc_password => "atlaspw"
    jdbc_validate_connection => true
    jdbc_driver_library => "DRIVER_PATH"
    jdbc_driver_class => "org.postgresql.Driver"
    schedule => "* * * * *"
    # source_timestamp must be part of the result set because it is the tracking
    # column (use_column_value => true); without it :sql_last_value never advances.
    # Rows are ordered by it so the last processed row carries the highest value.
    # NOTE(review): the 9-hour shift presumably compensates for a KST/UTC offset - confirm.
    statement => "
      SELECT
        region_id,
        region_type,
        country_code,
        country_code3,
        continent_code,
        source_from,
        source_timestamp,
        st_asgeojson(center_geo_point)::text AS center_geo_point_text,
        center_longitude,
        center_latitude,
        jsonn::text AS jsonn
      FROM expedia_region_union
      WHERE source_timestamp + interval '9 hours' > :sql_last_value
      ORDER BY source_timestamp
    "
    tracking_column => "source_timestamp"
    tracking_column_type => "timestamp"
    use_column_value => true
    clean_run => false
    last_run_metadata_path => "/usr/share/logstash/.logstash_jdbc_last_run"
    tags => ["expedia_region_union"]
  }

  # Full reload of expedia_airport_more every 10 minutes, fetched in pages.
  jdbc {
    jdbc_connection_string => "jdbc:postgresql://IP:PORT/DB_NAME?useTimezone=true&useLegacyDatetimeCode=false&serverTimezone=UTC&useSSL=false&useUnicode=true&characterEncoding=utf8"
    jdbc_user => "atlas"
    jdbc_password => "atlaspw"
    jdbc_validate_connection => true
    jdbc_driver_library => "DRIVER_PATH"
    jdbc_driver_class => "org.postgresql.Driver"
    schedule => "*/10 * * * *"
    # Paging splits the statement into LIMIT/OFFSET queries, so the result order must
    # be deterministic or pages may overlap / skip rows; hence the ORDER BY.
    # NOTE(review): assumes id is unique in expedia_airport_more - confirm.
    statement => "
      SELECT
        id,
        iata_airport_code,
        name,
        name_full,
        country_code,
        center_latitude,
        center_longitude,
        name_kr AS name_korean,
        name_full_kr AS name_korean_full,
        st_asgeojson(center_geo_point)::text AS center_geo_point_text,
        la,
        lo,
        public_flag,
        international_flag,
        source_from,
        source_time,
        iata_airport_metro_code,
        (
          SELECT json_build_object(
            'region_id', region_id,
            'region_name', region_name,
            'region_name_kr', region_name_kr,
            'region_type', region_type,
            'region_code', region_code
          )::text
          FROM expedia_region_union
          WHERE region_id = c.region_id
        ) AS haha_test
      FROM expedia_airport_more c
      ORDER BY c.id
    "
    jdbc_paging_enabled => true
    jdbc_page_size => 100000
    tags => ["expedia_airport_more"]
  }

  # Full reload of expedia_region_continent every 10 minutes.
  jdbc {
    jdbc_connection_string => "jdbc:postgresql://IP:PORT/DB_NAME?useTimezone=true&useLegacyDatetimeCode=false&serverTimezone=UTC&useSSL=false&useUnicode=true&characterEncoding=utf8"
    jdbc_user => "atlas"
    jdbc_password => "atlaspw"
    jdbc_validate_connection => true
    jdbc_driver_library => "DRIVER_PATH"
    jdbc_driver_class => "org.postgresql.Driver"
    schedule => "*/10 * * * *"
    statement => "
      SELECT
        source_from,
        region_name_kr AS region_name_korean,
        region_name_full_kr AS region_name_korean_full,
        region_id,
        region_name_full,
        continent_code,
        region_type,
        descendants::text AS descendants_text
      FROM expedia_region_continent
    "
    tags => ["expedia_region_continent"]
  }

  # Manual/interactive input, read as UTF-8 plain text.
  stdin { codec => plain { charset => "UTF-8" } }
}
# Filter stage: post-processing applied only to events tagged "expedia_region_union".
#   1. ruby  - turns the GeoJSON text into a 'point' object, builds a 'location'
#              {lat, lon} object, and flattens the keys of the 'jsonn' document onto
#              the event.
#   2. date  - parses the 'time' field as a UNIX epoch into @timestamp.
#              NOTE(review): 'time' is presumably one of the keys flattened out of
#              'jsonn' - confirm.
#   3. clone - emits one additional copy of the event of type 'test-union-007'.
filter {
  if "expedia_region_union" in [tags] {
    ruby {
      # The script is wrapped in double quotes, so it must only use single quotes.
      code => "
        require 'json'

        # GeoJSON text -> structured 'point' field; a missing value becomes {}.
        begin
          geo_text = event.get('center_geo_point_text')
          event.set('point', JSON.parse(geo_text || '{}'))
          event.remove('center_geo_point_text')
        rescue StandardError
          event.tag('_center_geo_point_parse_failure')
        end

        # Build a lat/lon object with string keys.
        # NOTE(review): nil coordinates become 0.0 via to_f - confirm that is intended.
        begin
          event.set('location', {
            'lat' => event.get('center_latitude').to_f,
            'lon' => event.get('center_longitude').to_f
          })
        rescue StandardError
          event.tag('_location_build_failure')
        end

        # Promote every key of the 'jsonn' document to an event field.
        begin
          raw_jsonn = event.get('jsonn')
          extra_fields = raw_jsonn.nil? ? {} : JSON.parse(raw_jsonn.to_s)
          extra_fields.each { |key, value| event.set(key, value) }
          event.remove('jsonn')
        rescue StandardError
          event.tag('_jsonn_parse_failure')
        end
      "
    }
    date {
      match => ['time', 'UNIX']
    }
    clone {
      clones => ['test-union-007']
    }
  }
}
# Output stage: no output plugins are configured in this section.
output {
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
You can’t perform that action at this time.