Skip to content

Instantly share code, notes, and snippets.

@andrewkroh
Last active May 17, 2022
Embed
What would you like to do?
Adding event.ingested and lag calculations to Winlogbeat events

Adding event.ingested and lag calculations to Winlogbeat events

Create an Ingest Pipeline that will add four fields:

  • event.ingested - Time when the event was processed by Elasticsearch.
  • event.lag.read - Time difference in milliseconds between @timestamp and event.created. This measures how long it took for Winlogbeat read the event from the event log (for WEC this includes the delivery time from forwarder to collector).
  • event.lag.ingest - Time difference in milliseconds between event.created and event.ingested. This measures the time between Winlogbeat reading the event (time when it "created" the document) to when it was written to Elasticsearch.
  • event.lag.total - Time difference in milleseconds between @timestamp and event.ingested.
PUT _ingest/pipeline/winlogbeat-final-pipeline
{
  "processors": [
    {
      "set": {
        "field": "event.ingested",
        "value": "{{_ingest.timestamp}}"
      }
    },
    {
      "script": {
        "source": "def created = Instant.parse(ctx.event.created);\ndef ingested = Instant.parse(ctx.event.ingested);\ndef timestamp = Instant.parse(ctx['@timestamp']);\n\ndef lag1 = ChronoUnit.MILLIS.between(timestamp, created);\ndef lag2 = ChronoUnit.MILLIS.between(created, ingested);\ndef lag3 = ChronoUnit.MILLIS.between(timestamp, ingested);\n\nctx.event.lag = [\"read\":lag1, \"ingest\":lag2, \"total\":lag3];\n\n"
      }
    }
  ],
  "on_failure": [
    {
      "append": {
        "field": "error.message",
        "value": "{{ _ingest.on_failure_message }}"
      }
    }
  ]
}

Add an index template so that the final_pipeline setting is automatically added to new winlogbeat indices. The final_pipeline forces all events written to the index to be processed by the specified pipeline.

PUT _template/winlogbeat-final-pipeline-overlay
{
  "index_patterns": [
    "winlogbeat*"
  ],
  "order": 6,
  "settings": {
    "index": {
      "final_pipeline": "winlogbeat-final-pipeline"
    }
  }
}

Edit the existing index where events are being written now to have the final_pipeline setting. Now incoming events will have those new fields. To be able to use them in a Kibana visualization you will need to refresh the winlogbeat-* index pattern from Kibana.

PUT {use your current winlogbeat write index here}/_settings
{
  "index" : {
    "final_pipeline" : "winlogbeat-final-pipeline"
  }
}
{
"event": {
"ingested": "2020-10-30T16:21:55.848092952Z",
"code": 23,
"lag": {
"total": 2975,
"read": 1960,
"ingest": 1015
},
"provider": "Microsoft-Windows-Sysmon",
"kind": "event",
"created": "2020-10-30T16:21:54.833Z",
"module": "sysmon",
"action": "File Delete (rule: FileDelete)",
"type": [
"deletion"
],
"category": [
"file"
]
}
}
@andrewkroh
Copy link
Author

andrewkroh commented May 17, 2022

Elasticsearch runtime fields can also be used to calculate the difference between timestamps. You can add a new runtime field to a Kibana Data View through the Stack Management UI. This example computes the difference between @timestamp and event.created.

def timestamp = doc['@timestamp'].value.toInstant();
def created = doc['event.created'].value.toInstant();
def lag = ChronoUnit.MILLIS.between(timestamp, created);
emit(lag);

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment