Skip to content

Instantly share code, notes, and snippets.

@andrewkroh
Last active March 6, 2023 19:09
Show Gist options
  • Save andrewkroh/3ecef990b31f7021f139471b785903f1 to your computer and use it in GitHub Desktop.
Save andrewkroh/3ecef990b31f7021f139471b785903f1 to your computer and use it in GitHub Desktop.
Adding event.ingested and lag calculations to Winlogbeat events

Adding event.ingested and lag calculations to Winlogbeat events

Create an Ingest Pipeline that will add four fields:

  • event.ingested - Time when the event was processed by Elasticsearch.
  • event.lag.read - Time difference in milliseconds between @timestamp and event.created. This measures how long it took for Winlogbeat read the event from the event log (for WEC this includes the delivery time from forwarder to collector).
  • event.lag.ingest - Time difference in milliseconds between event.created and event.ingested. This measures the time between Winlogbeat reading the event (time when it "created" the document) to when it was written to Elasticsearch.
  • event.lag.total - Time difference in milleseconds between @timestamp and event.ingested.
PUT _ingest/pipeline/winlogbeat-final-pipeline
{
  "processors": [
    {
      "set": {
        "field": "event.ingested",
        "value": "{{_ingest.timestamp}}"
      }
    },
    {
      "script": {
        "source": "def created = Instant.parse(ctx.event.created);\ndef ingested = Instant.parse(ctx.event.ingested);\ndef timestamp = Instant.parse(ctx['@timestamp']);\n\ndef lag1 = ChronoUnit.MILLIS.between(timestamp, created);\ndef lag2 = ChronoUnit.MILLIS.between(created, ingested);\ndef lag3 = ChronoUnit.MILLIS.between(timestamp, ingested);\n\nctx.event.lag = [\"read\":lag1, \"ingest\":lag2, \"total\":lag3];\n\n"
      }
    }
  ],
  "on_failure": [
    {
      "append": {
        "field": "error.message",
        "value": "{{ _ingest.on_failure_message }}"
      }
    }
  ]
}

Add an index template so that the final_pipeline setting is automatically added to new winlogbeat indices. The final_pipeline forces all events written to the index to be processed by the specified pipeline.

PUT _template/winlogbeat-final-pipeline-overlay
{
  "index_patterns": [
    "winlogbeat*"
  ],
  "order": 6,
  "settings": {
    "index": {
      "final_pipeline": "winlogbeat-final-pipeline"
    }
  }
}

Edit the existing index where events are being written now to have the final_pipeline setting. Now incoming events will have those new fields. To be able to use them in a Kibana visualization you will need to refresh the winlogbeat-* index pattern from Kibana.

PUT {use your current winlogbeat write index here}/_settings
{
  "index" : {
    "final_pipeline" : "winlogbeat-final-pipeline"
  }
}
{
"event": {
"ingested": "2020-10-30T16:21:55.848092952Z",
"code": 23,
"lag": {
"total": 2975,
"read": 1960,
"ingest": 1015
},
"provider": "Microsoft-Windows-Sysmon",
"kind": "event",
"created": "2020-10-30T16:21:54.833Z",
"module": "sysmon",
"action": "File Delete (rule: FileDelete)",
"type": [
"deletion"
],
"category": [
"file"
]
}
}
@andrewkroh
Copy link
Author

andrewkroh commented May 17, 2022

Elasticsearch runtime fields can also be used to calculate the difference between timestamps. You can add a new runtime field to a Kibana Data View through the Stack Management UI. This example computes the difference between @timestamp and event.created similar to the event.lag.read defined above.

def timestamp = doc['@timestamp'].value.toInstant();
def created = doc['event.created'].value.toInstant();
def lag = ChronoUnit.MILLIS.between(timestamp, created);
emit(lag);

@veritasr3x
Copy link

Thank you for this! My organization implemented the ingest pipeline for non-beats indexes and it works for those as well.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment