Using Filebeat to ingest data from Splunk

@leehinman · Last active September 29, 2023

Intro

Filebeat's httpjson input can be used to query Splunk's REST API and ingest the original data. This doesn't make use of any Splunk processing (CIM or apps). This doc covers 2 common use cases. The first is when you want to ingest from the current moment forward, for example if you want to compare how Splunk and Elastic differ in detections on the same data. The second use case is when you want to ingest historical data, for example pulling in the last 6 months of data to make the same comparison. This method is good for comparing Elastic and Splunk, but shouldn't be considered a final ingest strategy for customers.

Limitations

This will only work for Filebeat modules where the raw message stored in Splunk is the same as what the input provides. Examples that won't work are netflow and Winlogbeat, because each of those does heavy processing on the raw data.

Architecture

At its most basic, Filebeat sends a query to the Splunk search/jobs/export REST API endpoint and ingests the returned JSON. Contained in this query is a Splunk search term to get the data you are interested in. For example, "search=search sourcetype=access*" might be sufficient to find all the Apache access logs on the Splunk server. The challenge becomes using httpjson to query in intervals, either to get new information or to break up large data sets into manageable chunks.
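
For illustration, each interval httpjson ends up sending something like the sketch below. The endpoint and search string come from the config in this doc; the parameters are shown decoded for readability (the real body is form-urlencoded) and the epoch timestamps are made up:

POST https://example.com:8089/services/search/jobs/export
Content-Type: application/x-www-form-urlencoded

search=search sourcetype=access* | streamstats max(_indextime) AS max_indextime
output_mode=json
index_earliest=1695945600
index_latest=1695945610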

Current Data Configuration

In this example Filebeat will first get all events ingested into Splunk from 10 seconds ago until now. Every call after that will get the new events that have arrived since. The events will be limited to ones with a sourcetype matching access*. This is a good starting place for ingesting Apache logs.

Annotated Config

Version

You need to use version 2 so we have access to the necessary httpjson features.

config_version: "2"

Interval

This is how often httpjson will query the Splunk REST API endpoint for new data.

interval: 10s

Authentication

Splunk requires a username and password for access. By default Splunk administrators have rights to use the REST API; you can also assign the permission to a normal user.

auth.basic.user: username
auth.basic.password: password
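
If you would rather not keep credentials in the config file, Filebeat can pull them from its keystore (or the environment) via ${VAR} expansion. A minimal sketch, assuming the keys were added beforehand with "filebeat keystore add SPLUNK_USER" and "filebeat keystore add SPLUNK_PASSWORD":

auth.basic.user: ${SPLUNK_USER}
auth.basic.password: ${SPLUNK_PASSWORD}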

Cursor

The cursor is what is used to track what information has been received. In this case we create a new variable called index_earliest, and on the last event received we update it with the value of result.max_indextime.

cursor:
  index_earliest:
    value: '[[.last_event.result.max_indextime]]'
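
To see where that value comes from, here is a trimmed, hypothetical example of a returned event; max_indextime is present only because the streamstats clause in the search (shown later) adds it to every result:

{
  "result": {
    "_raw": "127.0.0.1 - - [10/Oct/2023:13:55:36 -0700] \"GET / HTTP/1.1\" 200 2326",
    "_indextime": "1696005336",
    "max_indextime": "1696005336",
    "host": "web01",
    "source": "/var/log/apache2/access.log"
  }
}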

Request

The request contains the method and URL of the Splunk services/search/jobs/export endpoint. It may also contain any custom SSL settings. Splunk servers often have a self-signed certificate, and TLS should be used because a password is being sent on every query.

request.url: https://example.com:8089/services/search/jobs/export
request.method: POST
request.ssl:
  certificate_authorities:
    - |-
      -----BEGIN CERTIFICATE-----
      MIIDejCCAmICCQCNHBN8tj/FwzANBgkqhkiG9w0BAQsFADB/MQswCQYDVQQGEwJV
      UzELMAkGA1UECAwCQ0ExFjAUBgNVBAcMDVNhbiBGcmFuY2lzY28xDzANBgNVBAoM
      BlNwbHVuazEXMBUGA1UEAwwOU3BsdW5rQ29tbW9uQ0ExITAfBgkqhkiG9w0BCQEW
      EnN1cHBvcnRAc3BsdW5rLmNvbTAeFw0xNzAxMzAyMDI2NTRaFw0yNzAxMjgyMDI2
      NTRaMH8xCzAJBgNVBAYTAlVTMQswCQYDVQQIDAJDQTEWMBQGA1UEBwwNU2FuIEZy
      YW5jaXNjbzEPMA0GA1UECgwGU3BsdW5rMRcwFQYDVQQDDA5TcGx1bmtDb21tb25D
      QTEhMB8GCSqGSIb3DQEJARYSc3VwcG9ydEBzcGx1bmsuY29tMIIBIjANBgkqhkiG
      9w0BAQEFAAOCAQ8AMIIBCgKCAQEAzB9ltVEGk73QvPlxXtA0qMW/SLDQlQMFJ/C/
      tXRVJdQsmcW4WsaETteeWZh8AgozO1LqOa3I6UmrWLcv4LmUAh/T3iZWXzHLIqFN
      WLSVU+2g0Xkn43xSgQEPSvEK1NqZRZv1SWvx3+oGHgu03AZrqTj0HyLujqUDARFX
      sRvBPW/VfDkomHj9b8IuK3qOUwQtIOUr+oKx1tM1J7VNN5NflLw9NdHtlfblw0Ys
      5xI5Qxu3rcCxkKQuwz9KRe4iijOIRMAKX28pbakxU9Nk38Ac3PNadgIk0s7R829k
      980sqGWkd06+C17OxgjpQbvLOR20FtmQybttUsXGR7Bp07YStwIDAQABMA0GCSqG
      SIb3DQEBCwUAA4IBAQCxhQd6KXP2VzK2cwAqdK74bGwl5WnvsyqdPWkdANiKksr4
      ZybJZNfdfRso3fA2oK1R8i5Ca8LK3V/UuAsXvG6/ikJtWsJ9jf+eYLou8lS6NVJO
      xDN/gxPcHrhToGqi1wfPwDQrNVofZcuQNklcdgZ1+XVuotfTCOXHrRoNmZX+HgkY
      gEtPG+r1VwSFowfYqyFXQ5CUeRa3JB7/ObF15WfGUYplbd3wQz/M3PLNKLvz5a1z
      LMNXDwN5Pvyb2epyO8LPJu4dGTB4jOGpYLUjG1UUqJo9Oa6D99rv6sId+8qjERtl
      ZZc1oaC0PKSzBmq+TpbR27B8Zra3gpoA+gavdRZj
      -----END CERTIFICATE-----
  verification_mode: certificate

Request Transforms

This is where most of the logic is. Each will be discussed separately.

  • url.params.search This is sent as the "search" request parameter in the POST body. The part before the "|" selects the kind of logs you want; for example "search sourcetype=access*" is a good start at selecting Apache access logs. The part after the "|" populates every record returned with the maximum index time of this set of results. This is stored in the cursor so we know the latest results we have received.
  • url.params.output_mode This is sent as the "output_mode" request parameter in the POST body. "json" is used to tell Splunk to send JSON responses. Note that Splunk actually sends ndjson responses from this endpoint even though you requested json.
  • url.params.index_earliest This is sent as the "index_earliest" request parameter in the POST body. For Splunk this controls the earliest record that will be returned. The default value when we start is the current time minus 10 seconds. For subsequent calls the value of cursor.index_earliest is used, which is the max_indextime from the last successful query. This is what allows us to select "new" data.
  • url.params.index_latest This is sent as the "index_latest" request parameter in the POST body. For Splunk this controls the latest record that will be returned. This is always the current time.
  • header.Content-Type is set to "application/x-www-form-urlencoded" because this is what Splunk expects and it encodes the parameters in the POST body.

request.transforms:
  - set:
      target: url.params.search
      value: |-
        search sourcetype=access* | streamstats max(_indextime) AS max_indextime
  - set:
      target: url.params.output_mode
      value: "json"
  - set:
      target: url.params.index_earliest
      value: '[[ .cursor.index_earliest ]]'
      default: '[[(now (parseDuration "-10s")).Unix]]'
  - set:
      target: url.params.index_latest
      value: '[[(now).Unix]]'
  - set:
      target: header.Content-Type
      value: application/x-www-form-urlencoded

Response

These variables control how httpjson treats the response it gets. Since multiple events are returned per request, these settings tell httpjson how to break the response up into individual events. decode_as is set to "application/x-ndjson" because this is what Splunk actually returns, even though it doesn't set the Content-Type header correctly. split is set because Splunk can occasionally send multiple raw events inside a single JSON object; those events are separated by newlines.

response.decode_as: application/x-ndjson
response.split:
  target: body.result._raw
  type: string
  delimiter: "\n"
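
For example, a single response object whose _raw field carries two newline-separated raw events (hypothetical, trimmed) would be split into two Filebeat events:

{
  "result": {
    "_raw": "127.0.0.1 - - [10/Oct/2023:13:55:36 -0700] \"GET / HTTP/1.1\" 200 2326\n127.0.0.1 - - [10/Oct/2023:13:55:37 -0700] \"GET /icons/ HTTP/1.1\" 200 4523",
    "host": "web01",
    "source": "/var/log/apache2/access.log"
  }
}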

Publisher Pipeline

We don't want Filebeat to add its own host information to the event, so we turn that off.

publisher_pipeline.disable_host: true

Processors

Most of the Filebeat pipelines expect the raw message to be in the "message" field. The following processors move the raw message into the correct place and delete the unused fields. Additionally, if no results are returned the drop_event processor drops the event, and to make sure we don't index data twice, the fingerprint processor produces a unique hash for each event.

processors:
  - decode_json_fields:
      fields: message
      target: json
      add_error_key: true
  - drop_event:
      when:
        not:
          has_fields: ['json.result']
  - fingerprint:
      fields:
        - json.result._cd
        - json.result._indextime
        - json.result._raw
        - json.result._time
        - json.result.host
        - json.result.source
      target_field: "@metadata._id"
  - drop_fields:
      fields: message
  - rename:
      fields:
        - from: json.result._raw
          to: message
        - from: json.result.host
          to: host.name
        - from: json.result.source
          to: file.path
      ignore_missing: true
      fail_on_error: false
  - drop_fields:
      fields: json
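
After these processors run, a published event looks roughly like this (hypothetical values); the fingerprint stored in @metadata._id becomes the Elasticsearch document ID, which is what prevents the same Splunk record from being indexed twice:

{
  "@metadata": { "_id": "b2c9e3f0..." },
  "message": "127.0.0.1 - - [10/Oct/2023:13:55:36 -0700] \"GET / HTTP/1.1\" 200 2326",
  "host": { "name": "web01" },
  "file": { "path": "/var/log/apache2/access.log" }
}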

Full Config

config_version: "2"
interval: 10s
auth.basic.user: username
auth.basic.password: password
cursor:
  index_earliest:
    value: '[[.last_event.result.max_indextime]]'
request.url: https://example.com:8089/services/search/jobs/export
request.ssl:
  certificate_authorities:
    - |-
      -----BEGIN CERTIFICATE-----
      MIIDejCCAmICCQCNHBN8tj/FwzANBgkqhkiG9w0BAQsFADB/MQswCQYDVQQGEwJV
      UzELMAkGA1UECAwCQ0ExFjAUBgNVBAcMDVNhbiBGcmFuY2lzY28xDzANBgNVBAoM
      BlNwbHVuazEXMBUGA1UEAwwOU3BsdW5rQ29tbW9uQ0ExITAfBgkqhkiG9w0BCQEW
      EnN1cHBvcnRAc3BsdW5rLmNvbTAeFw0xNzAxMzAyMDI2NTRaFw0yNzAxMjgyMDI2
      NTRaMH8xCzAJBgNVBAYTAlVTMQswCQYDVQQIDAJDQTEWMBQGA1UEBwwNU2FuIEZy
      YW5jaXNjbzEPMA0GA1UECgwGU3BsdW5rMRcwFQYDVQQDDA5TcGx1bmtDb21tb25D
      QTEhMB8GCSqGSIb3DQEJARYSc3VwcG9ydEBzcGx1bmsuY29tMIIBIjANBgkqhkiG
      9w0BAQEFAAOCAQ8AMIIBCgKCAQEAzB9ltVEGk73QvPlxXtA0qMW/SLDQlQMFJ/C/
      tXRVJdQsmcW4WsaETteeWZh8AgozO1LqOa3I6UmrWLcv4LmUAh/T3iZWXzHLIqFN
      WLSVU+2g0Xkn43xSgQEPSvEK1NqZRZv1SWvx3+oGHgu03AZrqTj0HyLujqUDARFX
      sRvBPW/VfDkomHj9b8IuK3qOUwQtIOUr+oKx1tM1J7VNN5NflLw9NdHtlfblw0Ys
      5xI5Qxu3rcCxkKQuwz9KRe4iijOIRMAKX28pbakxU9Nk38Ac3PNadgIk0s7R829k
      980sqGWkd06+C17OxgjpQbvLOR20FtmQybttUsXGR7Bp07YStwIDAQABMA0GCSqG
      SIb3DQEBCwUAA4IBAQCxhQd6KXP2VzK2cwAqdK74bGwl5WnvsyqdPWkdANiKksr4
      ZybJZNfdfRso3fA2oK1R8i5Ca8LK3V/UuAsXvG6/ikJtWsJ9jf+eYLou8lS6NVJO
      xDN/gxPcHrhToGqi1wfPwDQrNVofZcuQNklcdgZ1+XVuotfTCOXHrRoNmZX+HgkY
      gEtPG+r1VwSFowfYqyFXQ5CUeRa3JB7/ObF15WfGUYplbd3wQz/M3PLNKLvz5a1z
      LMNXDwN5Pvyb2epyO8LPJu4dGTB4jOGpYLUjG1UUqJo9Oa6D99rv6sId+8qjERtl
      ZZc1oaC0PKSzBmq+TpbR27B8Zra3gpoA+gavdRZj
      -----END CERTIFICATE-----
  verification_mode: certificate
request.method: POST
request.transforms:
  - set:
      target: url.params.search
      value: |-
        search sourcetype=access* | streamstats max(_indextime) AS max_indextime
  - set:
      target: url.params.output_mode
      value: "json"
  - set:
      target: url.params.index_earliest
      value: '[[ .cursor.index_earliest ]]'
      default: '[[(now (parseDuration "-10s")).Unix]]'
  - set:
      target: url.params.index_latest
      value: '[[(now).Unix]]'
  - set:
      target: header.Content-Type
      value: application/x-www-form-urlencoded
response.decode_as: application/x-ndjson
response.split:
  target: body.result._raw
  type: string
  delimiter: "\n"
publisher_pipeline.disable_host: true
processors:
  - decode_json_fields:
      fields: message
      target: json
      add_error_key: true
  - drop_event:
      when:
        not:
          has_fields: ['json.result']
  - fingerprint:
      fields:
        - json.result._cd
        - json.result._indextime
        - json.result._raw
        - json.result._time
        - json.result.host
        - json.result.source
      target_field: "@metadata._id"
  - drop_fields:
      fields: message
  - rename:
      fields:
        - from: json.result._raw
          to: message
        - from: json.result.host
          to: host.name
        - from: json.result.source
          to: file.path
      ignore_missing: true
      fail_on_error: false
  - drop_fields:
      fields: json