Last active
July 25, 2018 13:36
-
-
Save giwa/41fd15476c2ebeb42db3 to your computer and use it in GitHub Desktop.
トラフィックモニタリング with Kibana, ElasticSearch and Python ref: http://qiita.com/giwa/items/a24c722d8bc1b9a428cb
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
$ brew install elasticsearch
$ elasticsearch -v
Version: 1.4.4, Build: c88f77f/2015-02-19T13:05:36Z, JVM: 1.7.0_72
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
$ wget https://download.elasticsearch.org/kibana/kibana/kibana-4.0.1-darwin-x64.tar.gz
$ tar zxvf kibana-4.0.1-darwin-x64.tar.gz
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
$ pip install pyshark elasticsearch requests
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
$ python packet_cap_es.py <interface>
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
""" | |
This app captures packets and extract five tupels. | |
Store these data to elastic search. | |
Elastic search and kibana creates real time packet monitering | |
bashbord. | |
""" | |
import json | |
import sys | |
import datetime | |
import time | |
import pyshark | |
import requests | |
from elasticsearch import Elasticsearch | |
from elasticsearch import helpers | |
URL = "http://localhost:9200" | |
INDEX_URL = URL + "/packets" | |
TYPE_URL = INDEX_URL + "/packet" | |
ACTION = {"_index" : "packets", | |
"_type" : "packet", | |
"_source": {} | |
} | |
def delete_index():
    """Delete the whole "packets" index (and all its documents) from
    Elasticsearch via an HTTP DELETE on the index URL.
    """
    requests.delete(INDEX_URL)
def create_index():
    """Create the "packets" index with a ``_timestamp`` mapping enabled.

    The mapping PUT is retried roughly once per second for up to 99
    attempts, because Elasticsearch may not yet be accepting requests
    right after startup.
    """
    requests.put(INDEX_URL)
    # Mapping notes:
    #  - capture_timestamp feeds the document _timestamp field;
    #  - the IP fields are "not_analyzed" so Kibana aggregates on whole
    #    addresses instead of tokenized fragments.
    setting = {"packet": {
        "_timestamp": {
            "enabled": True,
            "path": "capture_timestamp",
        },
        "numeric_detection": False,
        "properties": {
            "dstip": {"type": "string",
                      "index": "not_analyzed",
                      "store": True},
            "srcip": {"type": "string",
                      "index": "not_analyzed",
                      "store": True}
        }
    }}
    for _ in range(1, 100):
        try:
            # Fix: catch only connection-level errors (the original bare
            # "except:" hid every bug), and drop the unused response var.
            requests.put(TYPE_URL + "/_mapping", data=json.dumps(setting))
            break
        except requests.exceptions.RequestException:
            # Elasticsearch not ready yet — wait a moment and retry.
            time.sleep(1)
def main():
    """Capture packets on ``sys.argv[1]`` and bulk-index them into ES.

    For every TCP/UDP packet, extract the five-tuple (version, protocol,
    src/dst IP, src/dst port) plus the capture timestamp, queue it, and
    flush the queue to Elasticsearch at most every 3 seconds.
    """
    capture = pyshark.LiveCapture(interface=sys.argv[1])
    packet_que = list()
    es = Elasticsearch()
    last_flush = None
    for packet in capture.sniff_continuously():
        if packet.transport_layer not in ("UDP", "TCP"):
            continue
        try:
            # Fix: convert the epoch timestamp with utcfromtimestamp()
            # instead of subtracting a hard-coded 9 hours — the "Z" suffix
            # tells ES the value is UTC, and the old local-time hack was
            # only correct on a machine running in JST (GMT+9).
            utc_time = datetime.datetime.utcfromtimestamp(
                float(packet.sniff_timestamp))
            timestamp = utc_time.strftime("%Y-%m-%dT%H:%M:%SZ")
            version = int(packet[1].version)
            # IPv6 has no "proto" field; its "next header" carries the
            # equivalent protocol number.
            if version == 4:
                protocol = int(packet[1].proto)
            elif version == 6:
                protocol = int(packet[1].nxt)
            else:
                protocol = None
            parsed_packet = dict(version=version, protocol=protocol,
                                 dstip=packet[1].dst, srcip=packet[1].src,
                                 dstport=int(packet[2].dstport),
                                 srcport=int(packet[2].srcport),
                                 capture_timestamp=timestamp)
            # "@timestamp" is what Kibana uses for historical graphs.
            parsed_packet["@timestamp"] = timestamp
            # BUG FIX: the original did ACTION.copy() — a *shallow* copy —
            # then mutated the shared "_source" dict, so every queued
            # action pointed at the same dict and the bulk request indexed
            # N copies of the most recent packet. Build a fresh action.
            action = {"_index": ACTION["_index"],
                      "_type": ACTION["_type"],
                      "_source": parsed_packet}
            packet_que.append(action)
            now = time.time()
            # Flush immediately on the first packet, then at most every 3s
            # (the original "while ... break" was an if in disguise).
            if last_flush is None or now - last_flush >= 3:
                helpers.bulk(es, packet_que)
                del packet_que[0:len(packet_que)]
                last_flush = time.time()
        except Exception:
            # Best-effort: packets missing the expected layers/fields are
            # skipped. The brief sleep (kept from the original) avoids
            # busy-looping on a stream of unparsable packets.
            time.sleep(1)
if __name__ == "__main__": | |
if len(sys.argv) != 2: | |
print >>sys.stderr, "python packet_cap_es.py <interface>" | |
exit(1) | |
delete_index() | |
create_index() | |
main() | |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment