Everything Elasticsearch

Architecture

Work in progress

If you install Elasticsearch as-is, the node will by default have the following roles:

  • Data
  • Master
  • Ingest

These roles can be decoupled and run separately, which of course means that you can scale the different node types to fit your needs.
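
As a sketch of what that decoupling looks like in elasticsearch.yml (node role settings as they look in Elasticsearch 6.x; adjust to your version), a dedicated master-eligible node could be configured like this:

node.master: true
node.data: false
node.ingest: false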

Troubleshooting

List max_file_descriptors:

GET /_nodes/stats/process?filter_path=**.max_file_descriptors
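
The response should look roughly like this (node id and value below are made up):

{
  "nodes": {
    "aUXKd2EoRCi2kQxvXw1Bzw": {
      "process": {
        "max_file_descriptors": 65536
      }
    }
  }
}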

Allocation API explaining more on shard allocation status:

GET /_cluster/allocation/explain
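
Without a request body the API explains an arbitrary unassigned shard; you can also ask about a specific shard, something like this (the index name here is just an example):

GET /_cluster/allocation/explain
{
  "index": "filebeat-2018.09.10",
  "shard": 0,
  "primary": true
}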

How-to use Elasticsearch Ingest pipelines to parse logs sent with Filebeat

We'll use Artifactory logs throughout this guide

Pipelines pre-process documents before indexing. The ingest node type in Elasticsearch includes a subset of Logstash functionality, and ingest pipelines are part of that.

At the end of this guide I'll use this pipeline when shipping the log data with Filebeat. The reason for creating the pipeline in the first place is that when Filebeat reads a log it doesn't have a module for, it sends a JSON document containing the raw log entry in a message field. We need to parse that field to extract the interesting parts that we'll use in dashboards and visualizations.
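
As an illustration, a raw event from Filebeat looks roughly like the document below (trimmed; Filebeat also adds beat and host metadata), with the unparsed log line in the message field:

{
  "@timestamp": "2018-09-06T08:17:31.105Z",
  "source": "/logs/access.log",
  "offset": 0,
  "message": "2018-09-06 08:17:31,105 [ACCEPTED DOWNLOAD] pip-remote-cache:0f/14/e3112808b727f72df9531fc2f00b84d4966e66001748b6883a21c767e902/smmap2-2.0.4-py2.py3-none-any.whl for anonymous/192.168.1.10."
}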

The examples below can be executed (copy-pasted) in Dev Tools within Kibana. The example includes the pipeline and three example documents containing faked log entries that we want to parse; the pipeline simulation API is a nice way of testing pipelines. This pipeline uses the grok, date and remove processors:

POST _ingest/pipeline/_simulate
{
  "pipeline" : {
    "description": "rtf access log pipeline",
    "processors": [ 
      {
        "grok": {
          "field": "message",
          "patterns": [ 
              "%{TIMESTAMP_ISO8601:rtf_timestamp} \\[%{DATA:rtf_action_resp} %{DATA:rtf_action_type}\\] %{DATA:rtf_repo_name}\\:%{DATA:rtf_file_path}\\s+.* %{USERNAME:username}\/%{IP:client_ip}",
              "%{TIMESTAMP_ISO8601:rtf_timestamp} \\[%{DATA:rtf_action_resp} %{DATA:rtf_action_type}\\] %{DATA:rtf_repo_name}\\:%{DATA:rtf_file_path}\\s+.* %{GREEDYDATA:other}",
              "%{TIMESTAMP_ISO8601:rtf_timestamp} \\[%{DATA:rtf_action_resp} %{DATA:rtf_action_type}\\]  for %{USERNAME:username}\/%{IP:client_ip}"
          ],
          "on_failure" : [
            {
              "set" : {
                "field" : "_index",
                "value" : "failed-{{ _index }}"
              }
            }
          ]
        }
      },
      {
        "date": {
          "field" : "rtf_timestamp",
          "target_field": "@timestamp",
          "formats" : [ "yyyy-MM-dd HH:mm:ss,SSS" ],
          "timezone" : "Europe/Stockholm",
          "on_failure" : [
            {
              "set" : {
                "field" : "_index",
                "value" : "failed-{{ _index }}"
              }
            }
          ]
        }
      },
      {
        "remove": {
          "field": "message"
        }
      }
    ]
  },
  "docs": [
    {
      "_index": "index",
      "_type": "_doc",
      "_id": "1",
      "_source": {
        "message": "2018-09-06 08:17:31,105 [ACCEPTED DOWNLOAD] pip-remote-cache:0f/14/e3112808b727f72df9531fc2f00b84d4966e66001748b6883a21c767e902/smmap2-2.0.4-py2.py3-none-any.whl for anonymous/192.168.1.10."
      }
    },
    {
      "_index": "index",
      "_type": "_doc",
      "_id": "2",
      "_source": {
        "message": "2018-09-05 12:00:15,178 [ACCEPTED DELETE] auto-trashcan:something.local/a/b/c/file.jar for _system_."
      }
    },
    {
      "_index": "index",
      "_type": "_doc",
      "_id": "3",
      "_source": {
        "message": "2018-09-10 15:27:10,330 [DENIED LOGIN]  for userid1234/1.2.3.4."
      }
    }
  ]
}

So what's going on within the pipeline block above?

  1. The grok processor has three different patterns that are tried when parsing the incoming data; if any of the patterns matches, the document will be indexed accordingly. If the matching fails for some reason the document will instead be stored in another index with a name like failed-filebeat-2018.09.10, which makes it easy to keep track of errors and to add e.g. alerting when parsing fails (see the example query after this list).
  2. The date processor will use the rtf_timestamp field and set it as the @timestamp of the indexed document. If this parsing fails we send the document to the failure index, just as with the grok processor.
  3. The remove processor removes the message field that contains the raw log entry; we don't need to index it since we've successfully parsed it.
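
To have a look at documents that failed parsing you could run something like this in Dev Tools (the failed-filebeat-* pattern assumes the failed- prefix set by the on_failure processors and a filebeat-* index name, adjust to your setup):

GET failed-filebeat-*/_search
{
  "size": 5
}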

An example of a resulting document, as it would be indexed in Elasticsearch, looks like this:

{
  "docs": [
    {
      "doc": {
        "_index": "index",
        "_type": "_doc",
        "_id": "id",
        "_source": {
          "rtf_timestamp": "2018-09-06 08:17:31,105",
          "rtf_file_path": "0f/14/e3112808b727f72df9531fc2f00b84d4966e66001748b6883a21c767e902/smmap2-2.0.4-py2.py3-none-any.whl",
          "rtf_action_resp": "ACCEPTED",
          "@timestamp": "2018-09-06T08:17:31.105+02:00",
          "rtf_action_type": "DOWNLOAD",
          "rtf_repo_name": "pip-remote-cache",
          "client_ip": "192.168.1.10",
          "username": "anonymous"
        },
        },
        "_ingest": {
          "timestamp": "2018-09-06T20:53:22.588333Z"
        }
      }
    }
  ]
}

To start using the pipeline in Elasticsearch you'll need to create it; this was done through Dev Tools in Kibana:

PUT _ingest/pipeline/rtf-access-log-pipeline
{
  "description": "rtf access log pipeline",
  "processors": [ 
    {
      "grok": {
        "field": "message",
        "patterns": [ 
            "%{TIMESTAMP_ISO8601:rtf_timestamp} \\[%{DATA:rtf_action_resp} %{DATA:rtf_action_type}\\] %{DATA:rtf_repo_name}\\:%{DATA:rtf_file_path}\\s+.* %{USERNAME:username}\/%{IP:client_ip}",
            "%{TIMESTAMP_ISO8601:rtf_timestamp} \\[%{DATA:rtf_action_resp} %{DATA:rtf_action_type}\\] %{DATA:rtf_repo_name}\\:%{DATA:rtf_file_path}\\s+.* %{GREEDYDATA:other}",
            "%{TIMESTAMP_ISO8601:rtf_timestamp} \\[%{DATA:rtf_action_resp} %{DATA:rtf_action_type}\\]  for %{USERNAME:username}\/%{IP:client_ip}"
        ],
        "on_failure" : [
          {
            "set" : {
              "field" : "_index",
              "value" : "failed-{{ _index }}"
            }
          }
        ]
      }
    },
    {
      "date": {
        "field" : "rtf_timestamp",
        "target_field": "@timestamp",
        "formats" : [ "yyyy-MM-dd HH:mm:ss,SSS" ],
        "timezone" : "Europe/Stockholm",
        "on_failure" : [
          {
            "set" : {
              "field" : "_index",
              "value" : "failed-{{ _index }}"
            }
          }
        ]
      }
    },
    {
      "remove": {
        "field": "message"
      }
    }
  ]
}
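
To verify that the pipeline was created you can fetch it back:

GET _ingest/pipeline/rtf-access-log-pipeline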

Use Filebeat to send log data to Elasticsearch

Now that we have a pipeline ready to parse the log entries, I did the following to start sending some real data from the Artifactory access.log into Elasticsearch:

  1. Copied the access.log (latest log) from the Artifactory logs directory
  2. Created a directory to keep all relevant files in the same place
  3. Created a bare minimum filebeat.yml configuration file. Note that we add a pipeline parameter with the name of the pipeline we created in the previous section:
filebeat.inputs:
- type: log
  enabled: true
  paths:
    - /logs/access.log
  pipeline: "rtf-access-log-pipeline"

setup.template.enabled: false

output.elasticsearch:
  enabled: true
  hosts: ["HOSTNAME_OR_IP_OF_ELASTICSEARCH:9200"]
  4. Ran the following command from the working directory containing our log and configuration file: docker run --rm -v $PWD/filebeat.yml:/usr/share/filebeat/filebeat.yml -v $PWD/access.log:/logs/access.log docker.elastic.co/beats/filebeat:6.4.0

When the container runs, Filebeat will read the whole log through the input and, when the data is sent to Elasticsearch, it will be parsed with the provided pipeline. If you re-run the container it'll repeat the same procedure, which is quite handy for back-filling Elasticsearch with log data.

When you run Filebeat to ship live logs it's good to know that there is a registry (state) file used internally to keep track of which log entries have already been sent.
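
If you want a containerized Filebeat to remember its position between runs you can mount the data directory as well; a sketch assuming the default data path of the official image (/usr/share/filebeat/data):

docker run --rm \
  -v $PWD/filebeat.yml:/usr/share/filebeat/filebeat.yml \
  -v $PWD/access.log:/logs/access.log \
  -v $PWD/data:/usr/share/filebeat/data \
  docker.elastic.co/beats/filebeat:6.4.0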

Todo

  • Fix the grok pattern so that it works for all kinds of messages in the access.log
  • Test the stdin input of Filebeat
  • Give the parsed fields searchable and descriptive names e.g. artifactory.repo_name