Sync any FileMaker Table to ElasticSearch using Logstash with a JDBC pipeline. Make sure all fields are ISO compliant (keep naming in legacy SQL scope).
Create a jdbc simple pipeline...
input {
jdbc {
jdbc_driver_library => "fmjdbc.jar"
jdbc_driver_class => "com.filemaker.jdbc.Driver"
jdbc_connection_string => "jdbc:filemaker://your_filemaker_host:2399/Contacts"
jdbc_user => "admin"
jdbc_password => "verysecret"
schedule => "* * * * *"
statement => "SELECT * FROM your_table_name"
}
}
output {
elasticsearch {
hosts => ["your_elasticsearch_host:9200"]
index => "contact"
user => "elastic"
password => "verysecret"
}
stdout {
codec => json
}
}
notes by air.fsans@gmail.com 16AUG2022
- input.jdbc.jdbc_driver_library: add jar to elastic classpath or give an absolute path.
- input.jdbc.statement: do not end with ";"
- output.elasticsearch: give user and password if the elk stack has been securized.
- the last statement is critical, re-check it if the elk stack has been securized.
Save to file (anywhere), as pipeline_name.conf
Add the pipeline to logstash.yml...
...
- pipeline.id: fm_pipeline_name
path.config: "path to the pipeline_name.conf file"
...
Run logstash and verify the log output in stdio, or just use Kibana.
TODO: REVIEW: using pagination throws some sql query errors
- Add a timestamp field update_time with autoenter modification timestamp
- Add a new field update_unix_time = FM Timestamp to unixTine from update_time as calculation (number), see this custom function to properly format it: FM Custom Function: UnixTime()
(update_time is just to make a human readable reference, not really required making update_unix_time autoenter using the same calculation to store the unixtime)
input {
jdbc {
jdbc_driver_library => "/full_path_to_driver/fmjdbc.jar"
jdbc_driver_class => "com.filemaker.jdbc.Driver"
jdbc_connection_string => "jdbc:filemaker://your_filemaker_host:2399/contact"
jdbc_user => "admin"
jdbc_password => "verysecret"
schedule => "* * * * *"
statement => "SELECT * FROM contact WHERE update_unix_time > :sql_last_value ORDER BY update_unix_time ASC"
record_last_run => true
last_run_metadata_path => "/full_path_to_logstash_root/logstash/fm_contact_metadata.txt"
clean_run => false
tracking_column => "update_unix_time"
tracking_column_type => "numeric"
use_column_value => true
sql_log_level => "debug"
}
}
filter{
mutate{
copy => {"id" => "[@metadata][_id]"}
remove_field => ["id", "@version", "update_unix_time"]
}
}
output {
elasticsearch {
hosts => ["your_elasticsearch_host:9200"]
index => "contact"
user => "elastic"
password => "verysecret"
doc_as_upsert => true
action => update
document_id => "%{[@metadata][_id]}"
}
stdout {
codec => json
}
}
Remember that FileMaker JDBC uses your Table Ocurrence instance names, not the real table names. Its a good idea to cretate dedicated TOs with some significative prefix (i.e. xdbc_contact)
Save to file (anywhere) as pipeline_name.conf
Add the pipeline to logstash.yml...
...
- pipeline.id: fm_pipeline_name
path.config: "path to the pipeline_name.conf file"
...
Run logstash and verify log output in stdio, or just use Kibana.