@derickson
Last active March 10, 2021 14:26
Logstash config that uses http_poller to pull DC Capital Bikeshare station data and does an XML split of the contents.
## Example of pulling data from DC Capital bikeshare to Elasticsearch in real time
## HTTP Poller -> XML Splitting -> Elasticsearch
input {
  ## pull data from Capital Bikeshare every 60 seconds
  http_poller {
    urls => {
      bikeshare_dc => "https://www.capitalbikeshare.com/data/stations/bikeStations.xml"
    }
    request_timeout => 30
    interval => 60
    codec => "plain"
    metadata_target => "http_poller_metadata"
  }
}
filter {
  ## interpret the message payload as XML
  xml {
    source => "message"
    target => "parsed"
  }

  ## split out each "station" record in the XML into its own event
  split {
    field => "[parsed][station]"
    add_field => {
      ## generate a unique id (station id x sensor timestamp) to prevent duplicates
      id => "%{[parsed][station][id]}-%{[parsed][station][lastCommWithServer]}"
      stationName => "%{[parsed][station][name]}"
      lastCommWithServer => "%{[parsed][station][lastCommWithServer]}"
      lat => "%{[parsed][station][lat]}"
      long => "%{[parsed][station][long]}"
      numBikes => "%{[parsed][station][nbBikes]}"
      numEmptyDocks => "%{[parsed][station][nbEmptyDocks]}"
    }
  }

  mutate {
    ## convert the numeric fields from strings to the appropriate data types
    convert => {
      "numBikes" => "integer"
      "numEmptyDocks" => "integer"
      "lat" => "float"
      "long" => "float"
    }
    ## put the geospatial value in the correct [ longitude, latitude ] format
    add_field => { "location" => [ "%{[long]}", "%{[lat]}" ] }
    ## get rid of the extra fields we don't need
    remove_field => [ "message", "parsed", "lat", "long", "host", "http_poller_metadata" ]
  }

  ## use the embedded Unix timestamp as the event timestamp
  date {
    match => ["lastCommWithServer", "UNIX_MS"]
    remove_field => ["lastCommWithServer"]
  }
}
output {
  # stdout { codec => rubydebug }
  stdout { codec => dots }

  elasticsearch {
    ## use a time-aware index name
    index => "bikestatus-dc-%{+YYYY.MM.dd}"
    protocol => "http"
    ## not super important, but it makes sense to override the default, which is "log"
    document_type => "bikestatus"
    ## use the generated id as the document id to prevent duplicates
    document_id => "%{[id]}"
  }
}
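## For reference, a single event emitted by this pipeline looks roughly like the
## following (values are illustrative only; field names come from the filter block above):
# {
#   "@timestamp": "2015-06-09T07:55:25.238Z",
#   "id": "31000-1433836525238",
#   "stationName": "Eads St & 15th St S",
#   "location": [ "-77.053230", "38.858971" ],
#   "numBikes": 7,
#   "numEmptyDocks": 8
# }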
## INDEX Template (apply this before loading data!)
# PUT _template/bikestatus
# {
#   "template": "bikestatus-*",
#   "settings": {
#     "number_of_shards": 1,
#     "number_of_replicas": 0
#   },
#   "mappings": {
#     "_default_": {
#       "dynamic_templates": [
#         {
#           "string_fields": {
#             "mapping": {
#               "index": "not_analyzed",
#               "omit_norms": true,
#               "type": "string",
#               "doc_values": true
#             },
#             "match_mapping_type": "string",
#             "match": "*"
#           }
#         }
#       ],
#       "_all": {
#         "enabled": false
#       },
#       "properties": {
#         "@timestamp": {
#           "type": "date",
#           "format": "dateOptionalTime",
#           "doc_values": true
#         },
#         "location": {
#           "type": "geo_point",
#           "geohash": true,
#           "fielddata": {
#             "format": "compressed",
#             "precision": "20m"
#           }
#         },
#         "numBikes": { "type": "integer", "doc_values": true },
#         "numEmptyDocks": { "type": "integer", "doc_values": true }
#       }
#     }
#   }
# }
## Kibana index pattern (time-based)
## [bikestatus-dc-]YYYY.MM.DD
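## Optional sanity checks once data is flowing (illustrative console requests;
## the exact index name varies with the current date)
# GET _template/bikestatus
# GET bikestatus-dc-*/_search?size=1&sort=@timestamp:desc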
@vigorousblessings

Hello derickson, thanks so much for this. It was really enlightening, and I was able to recreate the scenario in my environment using the XML data you referred to.
However, the XML generated by my log files is nested within header and body tags that refer to the same event. Is there a way around using split {} for nested elements?

Sample log file:

FEBA_1465455325238getCorpRetailIndicator10.2EBKNGGMT+05:002016-06-09T07:55:25.2380123456789123456789

JSON format:

{
  "FIXML": {
    "Header": {
      "RequestHeader": {
        "MessageKey": {
          "RequestUUID": "FEBA_1465449977260",
          "ServiceRequestId": "RetCustInq",
          "ServiceRequestVersion": "10.2",
          "ChannelId": "CIF"
        },
        "RequestMessageInfo": {
          "BankId": "NG",
          "TimeZone": "GMT+05:00",
          "EntityId": "",
          "EntityType": "",
          "ArmCorrelationId": "",
          "MessageDateTime": "2016-06-09T06:26:17.260"
        },
        "Security": {
          "Token": {
            "PasswordToken": {
              "UserId": "0123456789",
              "Password": ""
            }
          },
          "FICertToken": "",
          "RealUserLoginSessionId": "",
          "RealUser": "",
          "RealUserPwd": "",
          "SSOTransferToken": ""
        }
      }
    },
    "Body": {
      "RetCustInqRequest": {
        "RetCustInqRq": {
          "CustId": "123456789"
        }
      }
    },
    "_xmlns": "http://www.jsp.com/fixml",
    "_xmlns:xsi": "http://www.w3.org/2001/XMLSchema-instance",
    "_xsi:schemaLocation": "http://www.jsp.com/fixml xxxx.xsd"
  }
}
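If the whole FIXML document describes a single event, a split may not be needed at all: once the xml filter has parsed the payload, the nested values can be referenced directly by their field paths. A rough, untested sketch (the field paths are guesses based on the JSON above, and force_array is turned off so single elements are not wrapped in one-element arrays):

filter {
  xml {
    source => "message"
    target => "parsed"
    ## keep single elements as plain values rather than one-element arrays
    force_array => false
  }
  mutate {
    ## copy the interesting nested values up to top-level fields
    add_field => {
      "requestUUID" => "%{[parsed][Header][RequestHeader][MessageKey][RequestUUID]}"
      "custId" => "%{[parsed][Body][RetCustInqRequest][RetCustInqRq][CustId]}"
    }
    remove_field => [ "message", "parsed" ]
  }
}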

@immortalsolitude

For some reason this mapping doesn't seem to work with ELK 6.6

Could not index event to Elasticsearch. {:status=>400, :action=>["index", {:_id=>"%{[id]}", :_index=>"index-2019.02.11", :_type=>"bikestatus", :routing=>nil}, #<LogStash::Event:0x1ac6e8e1>], :response=>{"index"=>{"_index"=>"bikestatus-dc-2019.02.11", "_type"=>"bikestatus", "_id"=>"%{[id]}", "status"=>400, "error"=>{"type"=>"mapper_parsing_exception", "reason"=>"failed to parse", "caused_by"=>{"type"=>"illegal_argument_exception", "reason"=>"Could not convert [location.index] to boolean", "caused_by"=>{"type"=>"illegal_argument_exception", "reason"=>"Failed to parse value [not_analyzed] as only [true] or [false] are allowed."}}}}}}
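The template above uses the pre-5.x string mapping syntax ("type": "string" with "index": "not_analyzed"), which Elasticsearch 6.x rejects: string fields are now keyword or text, and index only accepts true or false. An untested sketch of what a 6.x-compatible version of the template might look like (the mapping type name is assumed to match the document_type set in the Logstash output):

PUT _template/bikestatus
{
  "index_patterns": ["bikestatus-*"],
  "settings": {
    "number_of_shards": 1,
    "number_of_replicas": 0
  },
  "mappings": {
    "bikestatus": {
      "dynamic_templates": [
        {
          "string_fields": {
            "match_mapping_type": "string",
            "match": "*",
            "mapping": { "type": "keyword" }
          }
        }
      ],
      "properties": {
        "@timestamp": { "type": "date" },
        "location": { "type": "geo_point" },
        "numBikes": { "type": "integer" },
        "numEmptyDocks": { "type": "integer" }
      }
    }
  }
}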
