Skip to content

Instantly share code, notes, and snippets.

@nbigot
Created April 2, 2017 12:59
Show Gist options
  • Save nbigot/4e72a52f4a77db3bbbd435a2d1b8ac4c to your computer and use it in GitHub Desktop.
Save nbigot/4e72a52f4a77db3bbbd435a2d1b8ac4c to your computer and use it in GitHub Desktop.
Python script that queries Elasticsearch for IP attacks reported by fail2ban and stored in an Elasticsearch index via Filebeat
# -*- coding: utf-8 -*-
"""Find stats about ip of attackers in filebeat from fail2ban
Python script that queries Elasticsearch for IP attacks reported by fail2ban
and stored in an Elasticsearch index via Filebeat.
"""
import ipaddress
from elasticsearch import Elasticsearch
from elasticsearch.exceptions import NotFoundError, TransportError
def search_attacks(hostname,
                   max_pages=100,
                   start_page=1,
                   items_per_page=1000,
                   index_name='filebeat-*',
                   doc_type='log',
                   timeout='30s'):
    """Print statistics about attacker IPs found in fail2ban log messages.

    Pages through the Elasticsearch hits matching the fail2ban/sshd query,
    extracts the date and the IP address from every message and prints four
    reports: attacks by date, dates by IP, attack count by IP and the top 50
    IPs by attack count.

    Args:
        hostname: Elasticsearch host passed to the client as ``hosts=[hostname]``.
        max_pages: Number of the last page to fetch (inclusive).
        start_page: Number of the first page to fetch; pages are 1-based, so
            page 1 starts at hit offset 0. Values below 1 are treated as 1.
        items_per_page: Number of hits requested per page.
        index_name: Index (or index pattern) to search.
        doc_type: Document type forwarded to the search call.
        timeout: Search timeout forwarded to the search call.

    Returns:
        None. All results are printed to standard output. Errors are reported
        by printing them, not by raising.
    """
    try:
        # Page 1 must map to offset 0; a smaller value would yield a negative
        # offset, so clamp it.
        first_page = max(start_page, 1)
        tokenized_messages = _fetch_attack_messages(
            hostname, first_page, max_pages, items_per_page,
            index_name, doc_type, timeout)

        # Plain dicts (not Counter/defaultdict) so the printed repr stays
        # a regular dict literal.
        attacks_by_date = {}   # date -> {ip: count}
        dates_by_ip = {}       # ip -> [distinct dates, first-seen order]
        count_by_ip = {}       # ip -> total count

        for tokens in tokenized_messages:
            parsed = _parse_attack(tokens)
            if parsed is None:
                print("error: skip line with invalid ip address: {}".format(tokens))
                continue
            attack_date, attack_from_ip = parsed

            per_date = attacks_by_date.setdefault(attack_date, {})
            per_date[attack_from_ip] = per_date.get(attack_from_ip, 0) + 1

            # Keep every distinct date for the IP instead of overwriting the
            # list each time a new date shows up.
            seen_dates = dates_by_ip.setdefault(attack_from_ip, [])
            if attack_date not in seen_dates:
                seen_dates.append(attack_date)

            count_by_ip[attack_from_ip] = count_by_ip.get(attack_from_ip, 0) + 1

        _print_attack_report(attacks_by_date, dates_by_ip, count_by_ip)
    except NotFoundError:
        return None
    except Exception as ex:
        print(ex)


def _fetch_attack_messages(hostname, first_page, last_page, items_per_page,
                           index_name, doc_type, timeout):
    """Return the whitespace-split ``message`` field of every fetched hit."""
    query = {
        "query": {
            "bool": {
                "must": [
                    {"term": {"message": "fail2ban.filter"}},
                    {"term": {"message": "sshd"}},
                    {"match": {"input_type": "log"}},
                    {"match": {"source": "/var/log/fail2ban.log"}},
                ]
            }
        }
    }
    messages = []
    try:
        # One client is enough for all pages.
        es = Elasticsearch(hosts=[hostname])
        for page in range(first_page, last_page + 1):
            res = es.search(
                index=index_name,
                doc_type=doc_type,
                _source=True,
                from_=(page - 1) * items_per_page,
                size=items_per_page,
                request_cache=True,
                timeout=timeout,
                body=query,
            )
            hits = res['hits']['hits']
            for hit in hits:
                message = (hit.get('_source') or {}).get('message') or ''
                messages.append(message.split())
            # An empty or short page means there is nothing left to fetch.
            if not hits or len(hits) < items_per_page:
                break
    except TransportError as err:
        # Best effort: keep what was fetched so far, but say why paging
        # stopped instead of hiding the failure.
        print("warning: search stopped early: {}".format(err))
    return messages


def _parse_attack(tokens):
    """Return ``(date, ip)`` for one split log line, or None if unparsable."""
    # The IP is read from token 6 or 7, so shorter lines cannot be parsed.
    if len(tokens) < 8:
        return None
    # When token 7 is the word 'already', the address is the token before it.
    candidate_ip = tokens[6] if tokens[7] == 'already' else tokens[7]
    try:
        ipaddress.ip_address(candidate_ip)
    except ValueError:
        return None
    return tokens[0], candidate_ip


def _print_attack_report(attacks_by_date, dates_by_ip, count_by_ip):
    """Print the four attack reports to standard output."""
    print("\nattacks by date:")
    for attack_date in sorted(attacks_by_date):
        print(attack_date, attacks_by_date[attack_date])

    print("\nattacks by ip and by date:")
    for ip in dates_by_ip:
        print(ip, dates_by_ip[ip])

    print("\nattacks by ip:")
    for ip in count_by_ip:
        print(ip, count_by_ip[ip])

    print("\ntop 50 recidivists attacks by ip:")
    recidivists = sorted(count_by_ip.items(), key=lambda item: item[1], reverse=True)
    for ip, count in recidivists[:50]:
        print(ip, count)
if __name__ == '__main__':
    import sys

    # The Elasticsearch host is the only command-line argument; without it
    # there is nothing to query, so exit with a usage message instead of an
    # IndexError traceback.
    if len(sys.argv) < 2:
        sys.exit("usage: {} <elasticsearch-host>".format(sys.argv[0]))
    search_attacks(hostname=sys.argv[1])
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment