Filebeat's httpjson input can be used to query Splunk's REST API and ingest the original raw data. This doesn't make use of any Splunk processing (CIM or apps). This doc covers two common use cases. The first is ingesting from the current point forward, for example to compare how Splunk and Elastic differ in detections on the same data. The second is ingesting historical data, for example pulling in the last 6 months of data for the same comparison. This method is good for comparing Elastic and Splunk, but it shouldn't be considered a final ingest strategy for customers.
This will only work for Filebeat modules where the raw message stored in Splunk is the same as what the input provides. Modules that won't work include netflow and Winlogbeat, because each of those does heavy processing on the raw data.
At its most basic, Filebeat sends a query to Splunk's search/jobs/export REST API endpoint and ingests the returned JSON. Contained in this query is a Splunk search term to get the data you are interested in. For example, "search=search sourcetype=access*" might be sufficient to find all the Apache access logs on the Splunk server. The challenge becomes using httpjson to query in intervals, either to get new information or to break large data sets into manageable chunks.
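To sanity-check such a query outside of Filebeat, the same form-encoded POST body can be built with Python's standard library. This is a sketch only; the search string and epoch times are placeholder values, not part of the Filebeat config.

```python
from urllib.parse import urlencode

# Form-encoded body as POSTed to /services/search/jobs/export.
# The search string and time range are example values.
params = {
    "search": "search sourcetype=access*",
    "output_mode": "json",
    "index_earliest": 1700000000,  # epoch seconds, example value
    "index_latest": 1700000060,
}
body = urlencode(params)
# body is sent with Content-Type: application/x-www-form-urlencoded
print(body)
```

The same body could be sent with curl against port 8089 to confirm the search returns the events you expect before wiring it into Filebeat.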
In this example Filebeat will first get all events ingested into Splunk from 10 seconds ago until now. Every call after that will get only the new events that have arrived. The events will be limited to ones with a sourcetype of access. This would be a good starting place for ingesting Apache logs.
You need to use version 2 so we have access to the necessary httpjson features.
config_version: "2"
This is how often httpjson will query the Splunk REST API endpoint for new data.
interval: 10s
Splunk requires a username and password for access. By default Splunk administrators have rights to use the REST API; the permission can also be assigned to a normal user.
auth.basic.user: username
auth.basic.password: password
The cursor is used to track what information has been received. In this case we create a new variable called index_earliest, and on the last event received we update it with the value of result.max_indextime.
cursor:
  index_earliest:
    value: '[[.last_event.result.max_indextime]]'
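The cursor mechanics can be sketched in plain Python. This is an illustration only; advance_cursor and the sample batch are hypothetical, not Filebeat code.

```python
# Each batch of Splunk results carries max_indextime on every record;
# the last event's value becomes the next query's index_earliest.
def advance_cursor(cursor, batch):
    """Return the updated cursor after processing a batch of results."""
    if batch:
        cursor["index_earliest"] = batch[-1]["result"]["max_indextime"]
    return cursor

cursor = {"index_earliest": 1700000000}
batch = [
    {"result": {"max_indextime": 1700000042}},
    {"result": {"max_indextime": 1700000042}},
]
cursor = advance_cursor(cursor, batch)
```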
The request contains the method and URL of the Splunk services/search/jobs/export endpoint. It may also contain any custom SSL settings. Splunk servers often have a self-signed certificate, and TLS should be used because the password is sent on every query.
request.url: https://example.com:8089/services/search/jobs/export
request.method: POST
request.ssl:
  certificate_authorities:
    - |-
      -----BEGIN CERTIFICATE-----
      MIIDejCCAmICCQCNHBN8tj/FwzANBgkqhkiG9w0BAQsFADB/MQswCQYDVQQGEwJV
      UzELMAkGA1UECAwCQ0ExFjAUBgNVBAcMDVNhbiBGcmFuY2lzY28xDzANBgNVBAoM
      BlNwbHVuazEXMBUGA1UEAwwOU3BsdW5rQ29tbW9uQ0ExITAfBgkqhkiG9w0BCQEW
      EnN1cHBvcnRAc3BsdW5rLmNvbTAeFw0xNzAxMzAyMDI2NTRaFw0yNzAxMjgyMDI2
      NTRaMH8xCzAJBgNVBAYTAlVTMQswCQYDVQQIDAJDQTEWMBQGA1UEBwwNU2FuIEZy
      YW5jaXNjbzEPMA0GA1UECgwGU3BsdW5rMRcwFQYDVQQDDA5TcGx1bmtDb21tb25D
      QTEhMB8GCSqGSIb3DQEJARYSc3VwcG9ydEBzcGx1bmsuY29tMIIBIjANBgkqhkiG
      9w0BAQEFAAOCAQ8AMIIBCgKCAQEAzB9ltVEGk73QvPlxXtA0qMW/SLDQlQMFJ/C/
      tXRVJdQsmcW4WsaETteeWZh8AgozO1LqOa3I6UmrWLcv4LmUAh/T3iZWXzHLIqFN
      WLSVU+2g0Xkn43xSgQEPSvEK1NqZRZv1SWvx3+oGHgu03AZrqTj0HyLujqUDARFX
      sRvBPW/VfDkomHj9b8IuK3qOUwQtIOUr+oKx1tM1J7VNN5NflLw9NdHtlfblw0Ys
      5xI5Qxu3rcCxkKQuwz9KRe4iijOIRMAKX28pbakxU9Nk38Ac3PNadgIk0s7R829k
      980sqGWkd06+C17OxgjpQbvLOR20FtmQybttUsXGR7Bp07YStwIDAQABMA0GCSqG
      SIb3DQEBCwUAA4IBAQCxhQd6KXP2VzK2cwAqdK74bGwl5WnvsyqdPWkdANiKksr4
      ZybJZNfdfRso3fA2oK1R8i5Ca8LK3V/UuAsXvG6/ikJtWsJ9jf+eYLou8lS6NVJO
      xDN/gxPcHrhToGqi1wfPwDQrNVofZcuQNklcdgZ1+XVuotfTCOXHrRoNmZX+HgkY
      gEtPG+r1VwSFowfYqyFXQ5CUeRa3JB7/ObF15WfGUYplbd3wQz/M3PLNKLvz5a1z
      LMNXDwN5Pvyb2epyO8LPJu4dGTB4jOGpYLUjG1UUqJo9Oa6D99rv6sId+8qjERtl
      ZZc1oaC0PKSzBmq+TpbR27B8Zra3gpoA+gavdRZj
      -----END CERTIFICATE-----
  verification_mode: certificate
This is where most of the logic is. Each transform will be discussed separately.

url.params.search
This is sent as the "search" request parameter in the POST body. The part before the "|" selects the kind of logs you want. For example, search sourcetype="access*" is a good start at selecting Apache access logs. The part after the "|" populates every record returned with the maximum index time of this set of results. That value is stored in the cursor so we know the latest results we have received.

url.params.output_mode
This is sent as the "output_mode" request parameter in the POST body. "json" tells Splunk to send JSON responses. FYI, Splunk actually sends ndjson responses from this endpoint even though you requested json.

url.params.index_earliest
This is sent as the "index_earliest" request parameter in the POST body. For Splunk this controls the earliest record that will be returned. The default value on the first run is the current time minus 10 seconds. For subsequent calls the value of cursor.index_earliest is used, which is the max_indextime from the last successful query. This is what allows us to select only "new" data.

url.params.index_latest
This is sent as the "index_latest" request parameter in the POST body. For Splunk this controls the latest record that will be returned. This is always the current time.

header.Content-Type
This is set to "application/x-www-form-urlencoded" because that is what Splunk expects, and it encodes the parameters in the POST body.
request.transforms:
  - set:
      target: url.params.search
      value: |-
        search sourcetype=access* | streamstats max(_indextime) AS max_indextime
  - set:
      target: url.params.output_mode
      value: "json"
  - set:
      target: url.params.index_earliest
      value: '[[ .cursor.index_earliest ]]'
      default: '[[(now (parseDuration "-10s")).Unix]]'
  - set:
      target: url.params.index_latest
      value: '[[(now).Unix]]'
  - set:
      target: header.Content-Type
      value: application/x-www-form-urlencoded
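The two templated time values above can be illustrated in Python, as an approximation of what the Go templates compute on the first run:

```python
import time

# On the first run index_earliest defaults to "now minus 10 seconds"
# and index_latest is always "now", both as Unix epoch seconds.
now = int(time.time())
index_earliest = now - 10   # [[(now (parseDuration "-10s")).Unix]]
index_latest = now          # [[(now).Unix]]
```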
These settings control how httpjson treats the response it gets. Since multiple events are returned per request, they tell httpjson how to break the response up into individual events. decode_as is set to "application/x-ndjson" because this is what Splunk returns, even though it doesn't set the Content-Type correctly. split is set because Splunk can occasionally send multiple raw events inside each JSON object; those events are separated by newlines.
response.decode_as: application/x-ndjson
response.split:
  target: body.result._raw
  type: string
  delimiter: "\n"
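The response handling can be sketched in Python. This is an illustration of the principle only, with made-up sample data: the endpoint returns ndjson (one JSON object per line), and each object's result._raw may itself hold several raw events separated by newlines.

```python
import json

# Hypothetical two-line ndjson response; the first object's _raw
# contains two raw events separated by a newline.
ndjson = (
    '{"result": {"_raw": "event one\\nevent two"}}\n'
    '{"result": {"_raw": "event three"}}\n'
)

events = []
for line in ndjson.splitlines():
    obj = json.loads(line)  # decode_as: application/x-ndjson
    # response.split with delimiter "\n" breaks _raw into single events
    events.extend(obj["result"]["_raw"].split("\n"))
```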
We don't want Filebeat to add its own host information to the event, so we turn that off.
publisher_pipeline.disable_host: true
Most of the Filebeat pipelines expect the raw message to be in the "message" field. The following processors move the raw message into the correct place and delete the unused fields. Additionally, if no results are returned, the drop_event processor drops the event. And to make sure we don't index data twice, the fingerprint processor produces a unique hash for each event.
processors:
  - decode_json_fields:
      fields: ["message"]
      target: json
      add_error_key: true
  - drop_event:
      when:
        not:
          has_fields: ['json.result']
  - fingerprint:
      fields:
        - json.result._cd
        - json.result._indextime
        - json.result._raw
        - json.result._time
        - json.result.host
        - json.result.source
      target_field: "@metadata._id"
  - drop_fields:
      fields: ["message"]
  - rename:
      fields:
        - from: json.result._raw
          to: message
        - from: json.result.host
          to: host.name
        - from: json.result.source
          to: file.path
      ignore_missing: true
      fail_on_error: false
  - drop_fields:
      fields: ["json"]
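The deduplication idea behind the fingerprint processor can be sketched in Python. Hashing a stable set of Splunk fields yields the same document _id on re-ingest, so Elasticsearch overwrites instead of duplicating. The exact hash Filebeat computes may differ; this only shows the principle, with made-up sample records.

```python
import hashlib

def event_id(result):
    """Stable id from the same Splunk fields the fingerprint processor uses."""
    parts = [str(result.get(f, "")) for f in
             ("_cd", "_indextime", "_raw", "_time", "host", "source")]
    return hashlib.sha256("|".join(parts).encode()).hexdigest()

# Re-ingesting the identical record produces the identical id.
a = event_id({"_cd": "4:2", "_indextime": "1700000000", "_raw": "GET /"})
b = event_id({"_cd": "4:2", "_indextime": "1700000000", "_raw": "GET /"})
```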
Putting it all together, the complete configuration is:

config_version: "2"
interval: 10s
auth.basic.user: username
auth.basic.password: password
cursor:
  index_earliest:
    value: '[[.last_event.result.max_indextime]]'
request.url: https://example.com:8089/services/search/jobs/export
request.ssl:
  certificate_authorities:
    - |-
      -----BEGIN CERTIFICATE-----
      MIIDejCCAmICCQCNHBN8tj/FwzANBgkqhkiG9w0BAQsFADB/MQswCQYDVQQGEwJV
      UzELMAkGA1UECAwCQ0ExFjAUBgNVBAcMDVNhbiBGcmFuY2lzY28xDzANBgNVBAoM
      BlNwbHVuazEXMBUGA1UEAwwOU3BsdW5rQ29tbW9uQ0ExITAfBgkqhkiG9w0BCQEW
      EnN1cHBvcnRAc3BsdW5rLmNvbTAeFw0xNzAxMzAyMDI2NTRaFw0yNzAxMjgyMDI2
      NTRaMH8xCzAJBgNVBAYTAlVTMQswCQYDVQQIDAJDQTEWMBQGA1UEBwwNU2FuIEZy
      YW5jaXNjbzEPMA0GA1UECgwGU3BsdW5rMRcwFQYDVQQDDA5TcGx1bmtDb21tb25D
      QTEhMB8GCSqGSIb3DQEJARYSc3VwcG9ydEBzcGx1bmsuY29tMIIBIjANBgkqhkiG
      9w0BAQEFAAOCAQ8AMIIBCgKCAQEAzB9ltVEGk73QvPlxXtA0qMW/SLDQlQMFJ/C/
      tXRVJdQsmcW4WsaETteeWZh8AgozO1LqOa3I6UmrWLcv4LmUAh/T3iZWXzHLIqFN
      WLSVU+2g0Xkn43xSgQEPSvEK1NqZRZv1SWvx3+oGHgu03AZrqTj0HyLujqUDARFX
      sRvBPW/VfDkomHj9b8IuK3qOUwQtIOUr+oKx1tM1J7VNN5NflLw9NdHtlfblw0Ys
      5xI5Qxu3rcCxkKQuwz9KRe4iijOIRMAKX28pbakxU9Nk38Ac3PNadgIk0s7R829k
      980sqGWkd06+C17OxgjpQbvLOR20FtmQybttUsXGR7Bp07YStwIDAQABMA0GCSqG
      SIb3DQEBCwUAA4IBAQCxhQd6KXP2VzK2cwAqdK74bGwl5WnvsyqdPWkdANiKksr4
      ZybJZNfdfRso3fA2oK1R8i5Ca8LK3V/UuAsXvG6/ikJtWsJ9jf+eYLou8lS6NVJO
      xDN/gxPcHrhToGqi1wfPwDQrNVofZcuQNklcdgZ1+XVuotfTCOXHrRoNmZX+HgkY
      gEtPG+r1VwSFowfYqyFXQ5CUeRa3JB7/ObF15WfGUYplbd3wQz/M3PLNKLvz5a1z
      LMNXDwN5Pvyb2epyO8LPJu4dGTB4jOGpYLUjG1UUqJo9Oa6D99rv6sId+8qjERtl
      ZZc1oaC0PKSzBmq+TpbR27B8Zra3gpoA+gavdRZj
      -----END CERTIFICATE-----
  verification_mode: certificate
request.method: POST
request.transforms:
  - set:
      target: url.params.search
      value: |-
        search sourcetype=access* | streamstats max(_indextime) AS max_indextime
  - set:
      target: url.params.output_mode
      value: "json"
  - set:
      target: url.params.index_earliest
      value: '[[ .cursor.index_earliest ]]'
      default: '[[(now (parseDuration "-10s")).Unix]]'
  - set:
      target: url.params.index_latest
      value: '[[(now).Unix]]'
  - set:
      target: header.Content-Type
      value: application/x-www-form-urlencoded
response.decode_as: application/x-ndjson
response.split:
  target: body.result._raw
  type: string
  delimiter: "\n"
publisher_pipeline.disable_host: true
processors:
  - decode_json_fields:
      fields: ["message"]
      target: json
      add_error_key: true
  - drop_event:
      when:
        not:
          has_fields: ['json.result']
  - fingerprint:
      fields:
        - json.result._cd
        - json.result._indextime
        - json.result._raw
        - json.result._time
        - json.result.host
        - json.result.source
      target_field: "@metadata._id"
  - drop_fields:
      fields: ["message"]
  - rename:
      fields:
        - from: json.result._raw
          to: message
        - from: json.result.host
          to: host.name
        - from: json.result.source
          to: file.path
      ignore_missing: true
      fail_on_error: false
  - drop_fields:
      fields: ["json"]