Last active
October 10, 2019 04:53
-
-
Save alexwoolford/749307e571ecf5512db27037c51e7af8 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# Project source/destination endpoints and byte counters out of the SYSLOG
# stream's EXTENSION map, and geo-enrich the destination IP.
#   - ports are CAST to INTEGER and byte counters to BIGINT
#   - GETGEOFORIP is a UDF that is not defined in this file; the downstream
#     stream reads DST_GEO->location->lat / ->lon from its result, so it
#     presumably returns a nested struct -- verify against the UDF.
# Fixes: the bare WITH (no property list) was a syntax error, and the
# statement was missing its terminating semicolon.
CREATE STREAM SRC_DST_GEO AS
SELECT
    SYSLOG.EXTENSION['src'] AS SRC,
    CAST(SYSLOG.EXTENSION['spt'] AS INTEGER) AS SPT,
    SYSLOG.EXTENSION['dst'] AS DST,
    CAST(SYSLOG.EXTENSION['dpt'] AS INTEGER) AS DPT,
    GETGEOFORIP(SYSLOG.EXTENSION['dst']) AS DST_GEO,
    CAST(SYSLOG.EXTENSION['in'] AS BIGINT) AS BYTES_IN,
    CAST(SYSLOG.EXTENSION['out'] AS BIGINT) AS BYTES_OUT
FROM SYSLOG;
# create geoip stream with a "lat,lon" string location
#
# Step 1 (KSQL): drop the previous definition of the stream.
#   IF EXISTS keeps a first run from failing when the stream was never created.
#   NOTE: KSQL will not drop a stream while a persistent query still writes to
#   it; TERMINATE that query first.
drop stream if exists src_dst_geo_elastic;

# Step 2 (shell, not KSQL): delete the backing Kafka topic.
#   --if-exists makes the delete a no-op when the topic is already gone.
kafka-topics --bootstrap-server cp01.woolford.io:9092 --delete --if-exists --topic SRC_DST_GEO_ELASTIC

# Step 3 (KSQL): recreate the stream from src_dst_geo.
#   - rowtime is exposed as record_timestamp
#   - latitude and longitude are joined into one "lat,lon" string (dst_location)
#   - rows with a missing latitude or longitude are filtered out
create stream src_dst_geo_elastic as
select
    rowtime as record_timestamp,
    src,
    spt,
    dst,
    dpt,
    cast(dst_geo->location->lat as string) + ',' + cast(dst_geo->location->lon as string) as dst_location,
    bytes_in,
    bytes_out
from src_dst_geo
where dst_geo->location->lat is not null
  and dst_geo->location->lon is not null;
# Sanity check: read the SRC_DST_GEO_ELASTIC topic from its first offset and
# decode the Avro records via the schema registry, to confirm data is flowing.
kafka-avro-console-consumer \
    --bootstrap-server cp01.woolford.io:9092 \
    --topic SRC_DST_GEO_ELASTIC \
    --from-beginning \
    --property schema.registry.url="http://cp01.woolford.io:8081"
# create a "kafkaconnect" index template in elastic to auto-parse timestamp and location
#   - fields whose name ends in TIMESTAMP are mapped as "date"
#   - fields whose name ends in LOCATION are mapped as "geo_point"
# Any previous version of the template is deleted before it is recreated.
# "index_patterns" replaces the legacy "template" key, which has been
# deprecated since Elasticsearch 6.0.
# NOTE(review): the "*" pattern applies these settings and mappings to every
# newly created index, not only the Kafka Connect ones -- consider narrowing it.
curl -X DELETE "http://elastic.woolford.io:9200/_template/kafkaconnect/"
curl -X PUT "http://elastic.woolford.io:9200/_template/kafkaconnect/" -H 'Content-Type: application/json' -d '
{
  "index_patterns": ["*"],
  "settings": {
    "number_of_shards": 1,
    "number_of_replicas": 1
  },
  "mappings": {
    "dynamic_templates": [
      {
        "dates": {
          "match": "*TIMESTAMP",
          "mapping": {
            "type": "date"
          }
        }
      },
      {
        "locations": {
          "match": "*LOCATION",
          "mapping": {
            "type": "geo_point"
          }
        }
      }
    ]
  }
}'
# Remove the existing src_dst_geo_elastic index
# (presumably so it is recreated with fresh mappings -- confirm).
curl -X DELETE "http://elastic.woolford.io:9200/src_dst_geo_elastic"
# create elastic sink connector
# Removes any existing "geoip-lookup" connector, then (re)creates it through
# the Kafka Connect REST API. The connector reads Avro values from the
# SRC_DST_GEO_ELASTIC topic and writes them to Elasticsearch.
# Changes from the previous version:
#   - both requests now go to the same Connect worker (the DELETE went to cp01
#     while the PUT went to cp02)
#   - errors.log.enable is set, because errors.tolerance=all otherwise skips
#     failing records without leaving any trace in the worker log
curl -X DELETE "http://cp01.woolford.io:8083/connectors/geoip-lookup"
curl -X PUT -H "Content-Type: application/json" --data '
{
  "name": "geoip-lookup",
  "connector.class": "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector",
  "connection.url": "http://elastic.woolford.io:9200",
  "topics": "SRC_DST_GEO_ELASTIC",
  "type.name": "_doc",
  "auto.create.indices.at.start": "true",
  "batch.size": "20",
  "max.in.flight.requests": "5",
  "key.ignore": "true",
  "schema.ignore": "false",
  "key.converter": "org.apache.kafka.connect.storage.StringConverter",
  "key.converter.schema.registry.url": "http://cp01.woolford.io:8081",
  "value.converter": "io.confluent.connect.avro.AvroConverter",
  "value.converter.schema.registry.url": "http://cp01.woolford.io:8081",
  "errors.tolerance": "all",
  "errors.log.enable": "true"
}' "http://cp01.woolford.io:8083/connectors/geoip-lookup/config"
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment