Skip to content

Instantly share code, notes, and snippets.

@giwa
Last active July 25, 2018 13:36
Show Gist options
  • Star 1 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save giwa/41fd15476c2ebeb42db3 to your computer and use it in GitHub Desktop.
Save giwa/41fd15476c2ebeb42db3 to your computer and use it in GitHub Desktop.
トラフィックモニタリング with Kibana, ElasticSearch and Python ref: http://qiita.com/giwa/items/a24c722d8bc1b9a428cb
$ brew install elasticsearch
$ elasticsearch -v
Version: 1.4.4, Build: c88f77f/2015-02-19T13:05:36Z, JVM: 1.7.0_72
$ wget https://download.elasticsearch.org/kibana/kibana/kibana-4.0.1-darwin-x64.tar.gz
$ tar zxvf kibana-4.0.1-darwin-x64.tar.gz
$ pip install pyshark elasticsearch requests
$ python packet_cap_es.py <interface>
"""
This app captures packets and extract five tupels.
Store these data to elastic search.
Elastic search and kibana creates real time packet monitering
bashbord.
"""
import json
import sys
import datetime
import time
import pyshark
import requests
from elasticsearch import Elasticsearch
from elasticsearch import helpers
URL = "http://localhost:9200"
INDEX_URL = URL + "/packets"
TYPE_URL = INDEX_URL + "/packet"
ACTION = {"_index" : "packets",
"_type" : "packet",
"_source": {}
}
def delete_index():
"""Delete an index in elastic search."""
requests.delete(INDEX_URL)
def create_index():
"""Create an index in elastic search with timestamp enabled."""
requests.put(INDEX_URL)
setting = {"packet" : {
"_timestamp" : {
"enabled" : True,
"path" : "capture_timestamp",
},
"numeric_detection" : False,
"properties" : {
"dstip" : { "type":"string",
"index" : "not_analyzed",
"store" : True},
"srcip" : { "type":"string",
"index" : "not_analyzed",
"store" : True}
}
}}
for _ in range(1, 100):
try:
r = requests.put(TYPE_URL + "/_mapping", data=json.dumps(setting))
break
except:
time.sleep(1)
pass
def main():
"""Extract packets and store them to ES"""
capture = pyshark.LiveCapture(interface=sys.argv[1])
packet_que = list()
es = Elasticsearch()
end_time = None
for packet in capture.sniff_continuously():
if packet.transport_layer in ("UDP", "TCP"):
try:
# Why does ES add 9 hours automatically?
localtime = float(packet.sniff_timestamp) - 60 * 60 * 9 # GMT + 9
row_timestamp = datetime.datetime.fromtimestamp(localtime)
timestamp = row_timestamp.strftime("%Y-%m-%dT%H:%M:%SZ")
version = int(packet[1].version)
# ip v6 does not have protocol. It has next header instead.
if version == 4:
protocol = int(packet[1].proto)
elif version == 6:
protocol = int(packet[1].nxt)
else:
protocol = None
dstip = packet[1].dst
srcip = packet[1].src
dstport = int(packet[2].dstport)
srcport = int(packet[2].srcport)
parsed_packet = dict(version=version, protocol=protocol,
dstip=dstip, srcip=srcip,
dstport=dstport, srcport=srcport,
capture_timestamp=timestamp)
# For historical graph
parsed_packet["@timestamp"] = timestamp
action = ACTION.copy()
action["_source"].update(parsed_packet)
packet_que.append(action)
current = time.time()
while(end_time is None or current - end_time >= 3):
helpers.bulk(es, packet_que)
del packet_que[0:len(packet_que)]
end_time = time.time()
break
except Exception as e:
time.sleep(1)
if __name__ == "__main__":
if len(sys.argv) != 2:
print >>sys.stderr, "python packet_cap_es.py <interface>"
exit(1)
delete_index()
create_index()
main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment