Skip to content

Instantly share code, notes, and snippets.

@aojea
Created February 20, 2015 13:41
Show Gist options
  • Save aojea/cf77bb1d84216f59395f to your computer and use it in GitHub Desktop.
Save aojea/cf77bb1d84216f59395f to your computer and use it in GitHub Desktop.
Import Netflow CSV Elasticsearch
#!/usr/bin/python2.7
import csv, sys, time, json, elasticsearch
from elasticsearch import Elasticsearch
from elasticsearch import helpers
# Elasticsearch mapping for the "fnf1x" document type of the netflow index.
# It uses the legacy (pre-5.x) string-mapping syntax: "string"/"integer"
# fields with index "not_analyzed" are indexed as exact values rather than
# being run through an analyzer.  "ts" is declared as a date whose format
# (Joda-style pattern) describes "2013-04-01 08:00:00"-style strings, i.e.
# the '%Y-%m-%d %H:%M:%S' text the loader writes.
mapping = {
    "fnf1x": {
        "properties": dict(
            ts=dict(type="date", format="YYYY-MM-dd HH:mm:ss"),
            # Source / destination address and port of the flow.
            sa=dict(type="string", index="not_analyzed"),
            sp=dict(type="integer", index="not_analyzed"),
            da=dict(type="string", index="not_analyzed"),
            dp=dict(type="integer", index="not_analyzed"),
            duration=dict(type="long"),
            proto=dict(type="string"),
            # Packet and byte counters.
            pkt_in=dict(type="long"),
            pkt_out=dict(type="long"),
            byte_in=dict(type="long"),
            byte_out=dict(type="long"),
        ),
    },
}
# Connect with the client's default settings, then make sure the
# "netflowlab" index exists and carries the fnf1x mapping.
es = Elasticsearch()
# HTTP 400/404 responses are ignored -- presumably so that re-running the
# script against an index that already exists does not abort.
es.indices.create(index="netflowlab", ignore=[400, 404])
es.indices.put_mapping(
    index="netflowlab",
    doc_type="fnf1x",
    body=mapping,
)
def row_to_doc(rec):
    """Convert one netflow CSV record into an Elasticsearch document.

    :param rec: mapping of column heading -> cell text, as yielded by
        ``csv.DictReader`` for the netflow CSV.
    :returns: dict whose keys are the fields of the ``fnf1x`` mapping.
    :raises KeyError: if a required column is missing.
    :raises ValueError: if the date or a numeric column cannot be parsed.
    """
    # Only the first 19 characters ("YYYY-MM-DD HH:MM:SS") are parsed, so any
    # trailing part (e.g. fractional seconds) is dropped; re-formatting the
    # parsed value normalizes the text and rejects malformed dates.
    parsed = time.strptime(rec['parsedDate'][0:19], "%Y-%m-%d %H:%M:%S")
    return {
        'ts': time.strftime('%Y-%m-%d %H:%M:%S', parsed),
        'sa': rec['firstSeenSrcIp'],
        # Ports go through float() so values written like "80.0" are accepted.
        'sp': int(float(rec['firstSeenSrcPort'])),
        'da': rec['firstSeenDestIp'],
        'dp': int(float(rec['firstSeenDestPort'])),
        'duration': int(rec['durationSeconds']),
        # Source-side counters are stored as *_in, destination-side as *_out.
        'pkt_in': int(rec['firstSeenSrcPacketCount']),
        'pkt_out': int(rec['firstSeenDestPacketCount']),
        'byte_in': int(rec['firstSeenSrcTotalBytes']),
        'byte_out': int(rec['firstSeenDestTotalBytes']),
        'proto': rec['ipLayerProtocolCode'],
    }


def iter_actions(fin, index="netflowlab", doc_type="fnf1x"):
    """Yield one bulk-index action per data row of the netflow CSV ``fin``.

    :param fin: any iterable of CSV lines (e.g. an open file) whose first
        line is the column-heading row.
    :param index: target index name.
    :param doc_type: target document type.

    Documents get sequential ``_id`` values starting at 0, in file order.
    """
    # csv.DictReader consumes the heading line itself to build its field
    # names, so every record it yields is data -- none may be skipped.
    for doc_id, rec in enumerate(csv.DictReader(fin)):
        yield {
            '_index': index,
            '_type': doc_type,
            '_id': doc_id,
            # A plain dict; the client serializes it to JSON itself.
            '_source': row_to_doc(rec),
        }


def main(argv=None, client=None, batch_size=100000):
    """Bulk-index the netflow CSV whose path is ``argv[1]``.

    :param argv: command-line arguments; defaults to ``sys.argv``.
    :param client: Elasticsearch client; defaults to the module-level ``es``.
    :param batch_size: number of documents sent per bulk request.
    :raises ValueError: if batch_size is not positive.

    Exits with a usage message when no CSV path is given.
    """
    argv = sys.argv if argv is None else argv
    if len(argv) < 2:
        sys.exit("usage: %s <netflow.csv>" % (argv[0] if argv else "script"))
    if batch_size < 1:
        raise ValueError("batch_size must be positive, got %r" % (batch_size,))
    client = es if client is None else client

    # Bulk requests are used because one es.index() call per row was too
    # slow; batching keeps memory bounded on large files.
    count = 0
    batch = []
    with open(argv[1], 'r') as fin:
        for action in iter_actions(fin):
            batch.append(action)
            count += 1
            if len(batch) >= batch_size:
                helpers.bulk(client, batch)
                print("Indexed %d, working on next %d" % (count, batch_size))
                batch = []
    # Send the final partial batch; nothing is left when the row count is an
    # exact multiple of batch_size, so no empty request is issued.
    if batch:
        helpers.bulk(client, batch)
    print("Indexed %d, finishing." % count)


if __name__ == "__main__":
    main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment