Skip to content

Instantly share code, notes, and snippets.

@masami256
Created July 11, 2013 02:03
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save masami256/5971920 to your computer and use it in GitHub Desktop.
Save masami256/5971920 to your computer and use it in GitHub Desktop.
HTTPのベーシック認証に辞書アタックしてきたらiptablesで弾く。
#!/usr/bin/env python
from datetime import timedelta
from datetime import datetime
import os
import io
import re
import csv
class LogData:
def __init__(self, access_date, level, client_ip, detail):
self.access_date = access_date
self.level = level
self.client_ip = client_ip
self.detail = detail
class LogDataList:
def __init__(self):
self.data = []
self.index = 0
def append(self, logData):
self.data.append(logData)
def __iter__(self):
return self
def next(self):
if self.index >= len(self.data):
self.index = 0
raise StopIteration
ret = self.data[self.index]
self.index += 1
return ret
def get_log_file_name():
return "/var/log/httpd/error_log"
def get_last_timestamp_file_name():
return "/tmp/last_timestamp.txt"
def get_blocked_list_file_name():
return "/tmp/blocked_ip_list.txt"
def get_datetime_from_string(s):
return datetime.strptime(s, "%a %b %d %H:%M:%S %Y")
def get_last_timestamp():
name = get_last_timestamp_file_name()
if os.path.isfile(name) == False:
return datetime(1970, 1, 1)
return get_datetime_from_string(open(name).read().strip())
def set_last_timestamp():
with open(get_last_timestamp_file_name(), 'w') as f:
s = datetime.now().strftime("%a %b %d %H:%M:%S %Y")
f.write(s)
def read_log_file():
last_timestamp = get_last_timestamp()
print "Last checked time is %(timestamp)s" % {'timestamp':last_timestamp}
result = LogDataList()
for line in open(get_log_file_name(), 'r'):
target_date = get_datetime_from_string(line.split(']')[0][1:])
# Do I have new data?
if target_date >= last_timestamp:
# Check detail string
match = re.search(r'user (.*) not found', line)
if match is not None:
data = line.split(']')
logData = LogData(data[0].split('[')[1], data[1].split('[')[1], data[2].split(' ')[2], data[3].strip())
result.append(logData)
return result
def exec_iptables_cmd(cmd):
print "Do %(cmd)s" % {'cmd':cmd}
ret = os.system(cmd)
if ret != 0:
print "cmd failed"
def add_iptables_rule(ip):
# Reject all packet or http port
#cmd = "iptables -I INPUT -s %(ip)s -j REJECT" % {'ip':ip}
cmd = "/sbin/iptables -I INPUT -s %(ip)s -j REJECT -p tcp --dport 80" % {'ip':ip}
exec_iptables_cmd(cmd)
def del_iptables_rule(ip):
# Reject all packet or http port
#cmd = "iptables -D INPUT -s %(ip)s -j REJECT" % {'ip':ip}
cmd = "/sbin/iptables -D INPUT -s %(ip)s -j REJECT -p tcp --dport 80" % {'ip':ip}
exec_iptables_cmd(cmd)
def create_need_block_list(blocked_ips, suspect_list):
ret = {}
cnt = 0
last_ip = ""
for data in suspect_list:
if (data.client_ip in blocked_ips) == True:
print "IP %(ip)s is already in blocked list" % {'ip':data.client_ip}
if data.client_ip == last_ip:
cnt += 1
else:
cnt = 0
last_ip = data.client_ip
if cnt >= 3:
# I am going to block this IP.
# Add it to the list
ret[data.client_ip] = (data.access_date, data.client_ip)
return ret
def block_ips(need_block_list):
for key in need_block_list:
data = need_block_list[key]
# Let's block!
add_iptables_rule(data[1])
def unblock_ips(blocked_list):
ret = {}
curdatetime = datetime.now()
del_list = []
for key in blocked_list:
data = blocked_list[key]
d = get_datetime_from_string(data[0])
if curdatetime > d + timedelta(minutes=30):
print "Unblock IP %(ip)s" % {'ip':data[1]}
del_iptables_rule(data[1])
del_list.append(key)
# remove this ip from map
for k in del_list:
del blocked_list[k]
return blocked_list
def get_blocked_ips():
ip_map = {}
if os.path.exists(get_blocked_list_file_name()) == False:
print "File %(blocked_file)s is not found" % {'blocked_file':get_blocked_list_file_name()}
return ip_map
# file format should be
# date, ip
reader = csv.reader(file(get_blocked_list_file_name(), 'r'))
for line in reader:
if len(line) == 0:
break
if line[0][0] == '#':
continue
d = line[0]
ip = line[1].strip()
if (ip in ip_map) == False:
ip_map[ip] = (d, ip)
return ip_map
def write_back_blocked_ips(need_block_list, blocked_ips):
with open(get_blocked_list_file_name(), 'w') as f:
writer = csv.writer(f)
for key in need_block_list:
writer.writerow(need_block_list[key])
for key in blocked_ips:
writer.writerow(blocked_ips[key])
def send_log_mail(need_block_list):
s = ""
for key in need_block_list:
s += "Block %(ip)s \n" % {'ip':need_block_list[key][1]}
cmd = "echo \"%(data)s\" | mail -s \"block log\" xxx@xxx.xxx" % {'data':s}
os.system(cmd)
def start():
# Read blocked ip list file.
blocked_ips = get_blocked_ips()
# Unblock ip if this ip access date is 30 minutes before.
# And re-create blocked ip list that does not contain unblocked ip.
blocked_ips = unblock_ips(blocked_ips)
# Read error_log file to craete suspect ip list.
suspect_list = read_log_file()
# Get IPs that need block
need_block_list = create_need_block_list(blocked_ips, suspect_list)
# Do it!
block_ips(need_block_list)
# Write blocked ip list
write_back_blocked_ips(need_block_list, blocked_ips)
# write timestampe
set_last_timestamp()
# send mail
send_log_mail(need_block_list)
if __name__ == "__main__":
start()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment