Last active December 17, 2022 11:34
FluentD to Elasticsearch Index with Custom Timestamp

This is an example of forwarding logs to elasticsearch using fluentd. In the process, it does use a custom time key.

The setup

Run Elasticsearch and FluentD locally

#run elasticsearch
docker run -e discovery.type=single-node -e -p 9200:9200 -d elasticsearch:7.2.0
#install fluentd

gem install fluentd
fluentd -s conf
fluentd -c conf/fluentd.conf

Consistent timestamp format for better performance

Java Timestamp Format:
GoLang Timestamp Format:

match configuration for fluentd.conf

<match elasticsearch.**>
  @type elasticsearch
  host localhost
  port 9200
  reconnect_on_error true
  compression_level default_compression
  flush_interval 10s
  num_threads 2

  logstash_format true
  utc_index false
  time_key_format "%Y-%m-%dT%H:%M:%S.%N%z"
  time_key time
  time_key_exclude_timestamp true
  include_timestamp false

Sample log matching elasticsearch

echo '{"client_addr":"","level":"info","msg":"Sent response.","req_id":3,"req_method":"POST","req_path":"/v1/data/secret/policy/allow","resp_bytes":15,"resp_duration":3.9623,"resp_status":200,"time":"2020-05-11T08:49:27.999+1000"}'|fluent-cat elasticsearch.test
echo '{"client_addr":"","level":"info","msg":"Sent response.","req_id":3,"req_method":"POST","req_path":"/v1/data/secret/policy/allow","resp_bytes":15,"resp_duration":3.9623,"resp_status":200,"time":"2020-05-11T08:49:27.999666+1000"}'|fluent-cat elasticsearch.test
echo '{"client_addr":"","level":"info","msg":"Sent response.","req_id":3,"req_method":"POST","req_path":"/v1/data/secret/policy/allow","resp_bytes":15,"resp_duration":3.9623,"resp_status":200,"time":"2020-05-11T09:45:54.069-0500"}'|fluent-cat elasticsearch.test
echo '{"client_addr":"","level":"info","msg":"Sent response.","req_id":6,"req_method":"POST","req_path":"/v1/data/secret/policy/allow","resp_bytes":15,"resp_duration":1.427,"resp_status":200,"time":"2020-05-11T14:47:52.425+0000"}'|fluent-cat elasticsearch.test

Verify that the date range query is working for time field

d=$(date +%Y.%m.%d)
curl -X GET "localhost:9200/logstash-$d/_search?pretty" -H 'Content-Type: application/json' -d'
    "query": {
        "range" : {
            "time" : {
                "gt" : "2020-05-11T00:00:00+0000",
                "lte" :  "now/d"

Expected Output

  "took" : 172,
  "timed_out" : false,
  "_shards" : {
    "total" : 1,
    "successful" : 1,
    "skipped" : 0,
    "failed" : 0
  "hits" : {
    "total" : {
      "value" : 2,
      "relation" : "eq"
    "max_score" : 1.0,
    "hits" : [
        "_index" : "logstash-2020.05.11",
        "_type" : "_doc",
        "_id" : "B8REBHIBnzeqJrvolFjN",
        "_score" : 1.0,
        "_source" : {
          "client_addr" : "",
          "level" : "info",
          "msg" : "Sent response.",
          "req_id" : 3,
          "req_method" : "POST",
          "req_path" : "/v1/data/secret/policy/allow",
          "resp_bytes" : 15,
          "resp_duration" : 3.9623,
          "resp_status" : 200,
          "time" : "2020-05-11T09:45:54.069-0500"
        "_index" : "logstash-2020.05.11",
        "_type" : "_doc",
        "_id" : "CMREBHIBnzeqJrvolFjN",
        "_score" : 1.0,
        "_source" : {
          "client_addr" : "",
          "level" : "info",
          "msg" : "Sent response.",
          "req_id" : 6,
          "req_method" : "POST",
          "req_path" : "/v1/data/secret/policy/allow",
          "resp_bytes" : 15,
          "resp_duration" : 1.427,
          "resp_status" : 200,
          "time" : "2020-05-11T14:47:52.425+0000"
