Created
July 11, 2013 02:03
-
-
Save masami256/5971920 to your computer and use it in GitHub Desktop.
HTTPのベーシック認証に辞書アタックしてきたらiptablesで弾く。
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python | |
from datetime import timedelta | |
from datetime import datetime | |
import os | |
import io | |
import re | |
import csv | |
class LogData(object):
    """One parsed entry of the web server error log.

    Attributes:
        access_date: timestamp text of the entry, as found in the log line.
        level: log level text of the entry.
        client_ip: address of the client the entry refers to.
        detail: remaining message text of the entry.
    """

    def __init__(self, access_date, level, client_ip, detail):
        self.access_date = access_date
        self.level = level
        self.client_ip = client_ip
        self.detail = detail

    def __repr__(self):
        return "LogData(access_date=%r, level=%r, client_ip=%r, detail=%r)" % (
            self.access_date, self.level, self.client_ip, self.detail)
class LogDataList(object):
    """Ordered, appendable collection of log entries.

    Iterating the collection (``for entry in log_list``) always starts from
    the first element and does not disturb any other iteration in progress.
    The ``next()`` / ``__next__()`` methods are kept for callers that advance
    the collection's own cursor directly.
    """

    def __init__(self):
        self.data = []
        # Cursor used only by next()/__next__(); plain iteration ignores it.
        self.index = 0

    def append(self, logData):
        """Add ``logData`` to the end of the collection."""
        self.data.append(logData)

    def __iter__(self):
        # Hand out an independent iterator each time. Returning ``self`` with
        # a shared cursor made a loop that exited early (or a nested loop)
        # corrupt the starting position of the next one.
        return iter(self.data)

    def next(self):
        """Return the element under the cursor and advance it.

        Raises:
            StopIteration: when the cursor is past the last element; the
                cursor is reset to the beginning first.
        """
        if self.index >= len(self.data):
            self.index = 0
            raise StopIteration
        ret = self.data[self.index]
        self.index += 1
        return ret

    # Python 3 spelling of the same method.
    __next__ = next
def get_log_file_name():
    """Return the path of the error log that is scanned."""
    log_path = "/var/log/httpd/error_log"
    return log_path
def get_last_timestamp_file_name():
    """Return the path of the file holding the time of the last check."""
    # NOTE(review): state is kept under /tmp; confirm that location is
    # acceptable for the account this script runs as.
    stamp_path = "/tmp/last_timestamp.txt"
    return stamp_path
def get_blocked_list_file_name():
    """Return the path of the file listing the currently blocked IPs."""
    # NOTE(review): state is kept under /tmp; confirm that location is
    # acceptable for the account this script runs as.
    blocked_path = "/tmp/blocked_ip_list.txt"
    return blocked_path
def get_datetime_from_string(s):
    """Parse a timestamp such as ``Thu Jul 11 02:03:00 2013`` into a datetime.

    Raises:
        ValueError: if ``s`` does not match that layout.
    """
    layout = "%a %b %d %H:%M:%S %Y"
    parsed = datetime.strptime(s, layout)
    return parsed
def get_last_timestamp():
    """Return the time of the previous check as a datetime.

    When the timestamp file does not exist, ``datetime(1970, 1, 1)`` is
    returned so that every log entry counts as new.

    Raises:
        ValueError: if the file content is not a timestamp in the layout
            understood by ``get_datetime_from_string``.
    """
    name = get_last_timestamp_file_name()
    if not os.path.isfile(name):
        return datetime(1970, 1, 1)
    # Read through a context manager so the handle is closed right away.
    with open(name) as f:
        text = f.read()
    return get_datetime_from_string(text.strip())
def set_last_timestamp():
    """Overwrite the timestamp file with the current local time."""
    stamp = datetime.now().strftime("%a %b %d %H:%M:%S %Y")
    with open(get_last_timestamp_file_name(), 'w') as out:
        out.write(stamp)
def read_log_file():
    """Collect "user ... not found" entries logged since the last check.

    Each line of the error log is expected to look like
    ``[<timestamp>] [<level>] [client <ip>] <detail>``. Lines whose timestamp
    is at or after the last checked time and whose text matches
    ``user (.*) not found`` are turned into LogData objects.

    Lines that do not start with a parseable timestamp, or that have fewer
    bracketed fields than expected, are skipped instead of aborting the run.

    Returns:
        LogDataList with the matching entries in file order.
    """
    last_timestamp = get_last_timestamp()
    print("Last checked time is %(timestamp)s" % {'timestamp': last_timestamp})

    result = LogDataList()
    # Context manager: the log handle was previously never closed.
    with open(get_log_file_name(), 'r') as log:
        for line in log:
            data = line.split(']')
            try:
                target_date = get_datetime_from_string(data[0][1:])
            except ValueError:
                # Not a "[timestamp] ..." line (e.g. continuation output).
                continue

            # Only entries that are new since the previous run.
            if target_date < last_timestamp:
                continue

            # Only failed logins for an unknown user.
            if re.search(r'user (.*) not found', line) is None:
                continue

            try:
                logData = LogData(data[0].split('[')[1],
                                  data[1].split('[')[1],
                                  data[2].split(' ')[2],
                                  data[3].strip())
            except IndexError:
                # Fewer fields than the expected layout; ignore the line.
                continue
            result.append(logData)
    return result
def exec_iptables_cmd(cmd):
    """Echo ``cmd``, run it through the shell and report a non-zero status."""
    print("Do %(cmd)s" % {'cmd': cmd})
    status = os.system(cmd)
    succeeded = (status == 0)
    if not succeeded:
        print("cmd failed")
def add_iptables_rule(ip):
    """Insert an iptables rule rejecting TCP port 80 traffic from ``ip``.

    ``ip`` ends up inside a shell command line, so it is only accepted when
    it consists of address characters (hex digits, ``.``, ``:``) with an
    optional ``/prefix``. Anything else is reported and no command is run.
    """
    # \A ... \Z (not ^ ... $) so a trailing newline cannot slip through.
    if re.match(r'\A[0-9A-Fa-f:.]+(?:/[0-9]{1,3})?\Z', ip) is None:
        print("Invalid IP %(ip)r, iptables rule not added" % {'ip': ip})
        return

    # Only HTTP (tcp/80) is rejected; dropping "-p tcp --dport 80" would
    # reject every packet from the address instead.
    cmd = "/sbin/iptables -I INPUT -s %(ip)s -j REJECT -p tcp --dport 80" % {'ip': ip}
    exec_iptables_cmd(cmd)
def del_iptables_rule(ip):
    """Delete the iptables rule rejecting TCP port 80 traffic from ``ip``.

    ``ip`` ends up inside a shell command line, so it is only accepted when
    it consists of address characters (hex digits, ``.``, ``:``) with an
    optional ``/prefix``. Anything else is reported and no command is run.
    """
    # \A ... \Z (not ^ ... $) so a trailing newline cannot slip through.
    if re.match(r'\A[0-9A-Fa-f:.]+(?:/[0-9]{1,3})?\Z', ip) is None:
        print("Invalid IP %(ip)r, iptables rule not deleted" % {'ip': ip})
        return

    # Must mirror the rule text used when the address was blocked
    # (REJECT on tcp/80 only), otherwise iptables finds nothing to delete.
    cmd = "/sbin/iptables -D INPUT -s %(ip)s -j REJECT -p tcp --dport 80" % {'ip': ip}
    exec_iptables_cmd(cmd)
def create_need_block_list(blocked_ips, suspect_list):
    """Pick the IPs that should be blocked now.

    ``suspect_list`` is walked in order and runs of consecutive entries from
    the same ``client_ip`` are counted. The counter is 0 on the first entry
    of a run, so an IP qualifies on its fourth consecutive entry.

    IPs already present in ``blocked_ips`` are reported but never returned:
    re-adding them would insert a second identical iptables rule that a
    single later delete does not fully remove.

    Args:
        blocked_ips: mapping keyed by the IPs that are currently blocked.
        suspect_list: iterable of objects with ``client_ip`` and
            ``access_date`` attributes.

    Returns:
        dict mapping IP -> ``(access_date, ip)`` of the latest qualifying
        entry for that IP.
    """
    ret = {}
    cnt = 0
    last_ip = ""
    for data in suspect_list:
        ip = data.client_ip
        already_blocked = ip in blocked_ips
        if already_blocked:
            print("IP %(ip)s is already in blocked list" % {'ip': ip})

        # Count consecutive entries from the same address.
        if ip == last_ip:
            cnt += 1
        else:
            cnt = 0
            last_ip = ip

        if cnt >= 3 and not already_blocked:
            ret[ip] = (data.access_date, ip)
    return ret
def block_ips(need_block_list):
    """Add an iptables rule for every entry of ``need_block_list``.

    Each value is a ``(date, ip)`` pair; only the ip part is used here.
    """
    for ip_key in need_block_list:
        entry = need_block_list[ip_key]
        target_ip = entry[1]
        add_iptables_rule(target_ip)
def unblock_ips(blocked_list, block_minutes=30):
    """Lift blocks that are older than ``block_minutes``.

    For every entry whose recorded date lies more than ``block_minutes``
    minutes in the past, the iptables rule is deleted and the entry is
    removed from ``blocked_list``.

    Args:
        blocked_list: dict mapping IP -> ``(date_string, ip)``; modified in
            place.
        block_minutes: how long a block lasts, in minutes (default 30).

    Returns:
        The same ``blocked_list`` object, now holding only active blocks.

    Raises:
        ValueError: if a recorded date string cannot be parsed.
    """
    curdatetime = datetime.now()
    lifetime = timedelta(minutes=block_minutes)

    # Iterate over a snapshot so entries can be deleted while looping.
    for key, data in list(blocked_list.items()):
        blocked_at = get_datetime_from_string(data[0])
        if curdatetime > blocked_at + lifetime:
            print("Unblock IP %(ip)s" % {'ip': data[1]})
            del_iptables_rule(data[1])
            del blocked_list[key]
    return blocked_list
def get_blocked_ips():
    """Load the blocked-IP list file.

    The file is CSV with one ``date, ip`` row per blocked address. Rows whose
    first field starts with ``#`` are comments. Blank rows and rows with
    fewer than two fields are skipped. When an IP appears more than once the
    first row wins.

    Returns:
        dict mapping IP -> ``(date_string, ip)``; empty when the file does
        not exist.
    """
    ip_map = {}
    path = get_blocked_list_file_name()
    if not os.path.exists(path):
        print("File %(blocked_file)s is not found" % {'blocked_file': path})
        return ip_map

    # open() in a context manager: the handle used to be left open, and the
    # file() builtin only exists on Python 2.
    with open(path, 'r') as f:
        for row in csv.reader(f):
            # A blank row used to end parsing and silently drop the rest of
            # the list; skip it instead.
            if not row:
                continue
            if row[0].startswith('#'):
                continue
            if len(row) < 2:
                continue
            d = row[0]
            ip = row[1].strip()
            if ip not in ip_map:
                ip_map[ip] = (d, ip)
    return ip_map
def write_back_blocked_ips(need_block_list, blocked_ips):
    """Rewrite the blocked-IP list file.

    The values of ``need_block_list`` are written first, followed by the
    values of ``blocked_ips``, one CSV row per value.
    """
    with open(get_blocked_list_file_name(), 'w') as out:
        rows = csv.writer(out)
        for table in (need_block_list, blocked_ips):
            for ip_key in table:
                rows.writerow(table[ip_key])
def send_log_mail(need_block_list):
    """Mail one "Block <ip>" line per entry of ``need_block_list``.

    The message is piped to the ``mail`` command through the shell.
    """
    report_lines = ["Block %(ip)s \n" % {'ip': need_block_list[ip_key][1]}
                    for ip_key in need_block_list]
    body = "".join(report_lines)
    cmd = 'echo "%(data)s" | mail -s "block log" xxx@xxx.xxx' % {'data': body}
    os.system(cmd)
def start():
    """Run one pass: lift expired blocks, block new offenders, persist state."""
    # Load the persisted block list and lift the blocks that have expired;
    # what comes back no longer contains the unblocked IPs.
    still_blocked = unblock_ips(get_blocked_ips())

    # Scan the error log for suspect entries.
    suspects = read_log_file()

    # Decide which IPs need a block, then apply the iptables rules.
    to_block = create_need_block_list(still_blocked, suspects)
    block_ips(to_block)

    # Persist the new and the still-active blocks.
    write_back_blocked_ips(to_block, still_blocked)

    # Record when this check ran.
    set_last_timestamp()

    # Mail a report of the newly blocked IPs.
    send_log_mail(to_block)
# Run a single check when the file is executed as a script.
if __name__ == "__main__":
    start()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment