Skip to content

Instantly share code, notes, and snippets.

@alexwoolford
Last active October 10, 2019 04:53
Show Gist options
  • Save alexwoolford/749307e571ecf5512db27037c51e7af8 to your computer and use it in GitHub Desktop.
Save alexwoolford/749307e571ecf5512db27037c51e7af8 to your computer and use it in GitHub Desktop.
# extract source/destination IPs and ports, byte counts, and the destination geolocation from syslog
-- Flatten the syslog EXTENSION map into typed columns and enrich the destination IP with geo data.
-- The original "WITH AS SELECT" was a syntax error: WITH must be followed by a parenthesised
-- property list, e.g. WITH (KAFKA_TOPIC='...', VALUE_FORMAT='AVRO'). No properties are set here,
-- so topic name and value format are inherited from the defaults / source stream.
-- NOTE(review): GETGEOFORIP is a custom UDF (not built into ksqlDB); it must be deployed in the ext dir.
CREATE STREAM SRC_DST_GEO AS
SELECT
    SYSLOG.EXTENSION['src']                           AS "SRC",
    CAST(SYSLOG.EXTENSION['spt'] AS INTEGER)          AS "SPT",
    SYSLOG.EXTENSION['dst']                           AS "DST",
    CAST(SYSLOG.EXTENSION['dpt'] AS INTEGER)          AS "DPT",
    GETGEOFORIP(SYSLOG.EXTENSION['dst'])              AS "DST_GEO",
    CAST(SYSLOG.EXTENSION['in'] AS BIGINT)            AS "BYTES_IN",
    CAST(SYSLOG.EXTENSION['out'] AS BIGINT)           AS "BYTES_OUT"
FROM SYSLOG;
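# create a stream whose destination location is a "lat,lon" string (a format Elasticsearch accepts for geo_point), dropping rows without coordinates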
-- Reset step: IF EXISTS keeps this from failing on a first run, before the stream has been created.
-- NOTE(review): ksqlDB refuses to drop a stream while a persistent query still writes to it;
-- TERMINATE that query first (its id is listed by SHOW QUERIES).
drop stream if exists src_dst_geo_elastic;
# remove the backing topic so the recreated stream starts from an empty topic
kafka-topics --delete \
  --topic SRC_DST_GEO_ELASTIC \
  --bootstrap-server cp01.woolford.io:9092
-- Elasticsearch-ready projection of SRC_DST_GEO:
--   RECORD_TIMESTAMP : the record's ROWTIME
--   DST_LOCATION     : destination coordinates rendered as the string "lat,lon"
-- Rows lacking either coordinate are filtered out so DST_LOCATION is never partially null.
-- Column order is kept as-is because it determines the field order of the output schema.
CREATE STREAM SRC_DST_GEO_ELASTIC AS
SELECT
    ROWTIME AS RECORD_TIMESTAMP,
    SRC,
    SPT,
    DST,
    DPT,
    CAST(DST_GEO->LOCATION->LAT AS STRING)
        + ','
        + CAST(DST_GEO->LOCATION->LON AS STRING) AS DST_LOCATION,
    BYTES_IN,
    BYTES_OUT
FROM SRC_DST_GEO
WHERE DST_GEO->LOCATION->LAT IS NOT NULL
  AND DST_GEO->LOCATION->LON IS NOT NULL;
# check that the messages are flowing
# read the derived topic from the start, decoding Avro values via Schema Registry
kafka-avro-console-consumer \
  --from-beginning \
  --topic SRC_DST_GEO_ELASTIC \
  --bootstrap-server cp01.woolford.io:9092 \
  --property "schema.registry.url=http://cp01.woolford.io:8081"
# create an Elasticsearch index template named "kafkaconnect" that maps *TIMESTAMP fields to date and *LOCATION fields to geo_point
# Remove any previous version of the template; a 404 on a first run is harmless because the PUT recreates it.
curl -X DELETE "http://elastic.woolford.io:9200/_template/kafkaconnect"
# The legacy "template" key was deprecated in Elasticsearch 6.0 in favour of "index_patterns" (an array).
# The typeless "mappings" below implies ES 7.x, where "index_patterns" is the supported key.
# NOTE(review): the "*" pattern applies these settings (1 shard, 1 replica) to EVERY new index;
# narrow it (e.g. to the sink's index names) if other indices live on this cluster.
# Dynamic-template "match" is case-sensitive; ksqlDB upper-cases unquoted column names, so
# RECORD_TIMESTAMP and DST_LOCATION are the fields expected to match these patterns.
curl -X PUT "http://elastic.woolford.io:9200/_template/kafkaconnect" -H 'Content-Type: application/json' -d '
{
  "index_patterns": ["*"],
  "settings": {
    "number_of_shards": 1,
    "number_of_replicas": 1
  },
  "mappings": {
    "dynamic_templates": [
      {
        "dates": {
          "match": "*TIMESTAMP",
          "mapping": {
            "type": "date"
          }
        }
      },
      {
        "locations": {
          "match": "*LOCATION",
          "mapping": {
            "type": "geo_point"
          }
        }
      }
    ]
  }
}'
# delete the existing index so that it is recreated with the template's mappings
# drop the sink's index; the connector recreates it on start-up (auto.create.indices.at.start)
curl -X DELETE "http://elastic.woolford.io:9200/src_dst_geo_elastic"
# create elastic sink connector
# Kafka Connect REST endpoint used for BOTH calls. The original DELETE hit cp01 while the PUT hit cp02;
# if those workers are not in the same Connect cluster, the DELETE removed nothing.
# NOTE(review): cp02 is kept because the PUT is the call that must succeed; confirm it is the intended worker.
CONNECT_URL="http://cp02.woolford.io:8083"

# Remove the existing connector (404 on a first run is harmless), then (re)create it via PUT .../config.
curl -X DELETE "${CONNECT_URL}/connectors/geoip-lookup"
# errors.tolerance=all skips records that fail conversion; errors.log.* makes those skips visible in the
# worker log instead of dropping them silently.
# Note: key.converter.schema.registry.url has no effect with StringConverter (kept for compatibility).
curl -X PUT -H "Content-Type: application/json" --data '
{
  "name": "geoip-lookup",
  "connector.class": "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector",
  "type.name": "_doc",
  "auto.create.indices.at.start": "true",
  "topics": "SRC_DST_GEO_ELASTIC",
  "batch.size": "20",
  "max.in.flight.requests": "5",
  "key.ignore": "true",
  "schema.ignore": "false",
  "value.converter.schema.registry.url": "http://cp01.woolford.io:8081",
  "value.converter": "io.confluent.connect.avro.AvroConverter",
  "connection.url": "http://elastic.woolford.io:9200",
  "key.converter": "org.apache.kafka.connect.storage.StringConverter",
  "key.converter.schema.registry.url": "http://cp01.woolford.io:8081",
  "errors.tolerance": "all",
  "errors.log.enable": "true",
  "errors.log.include.messages": "true"
}' "${CONNECT_URL}/connectors/geoip-lookup/config"
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment