Created
November 4, 2010 14:57
-
-
Save heartnet/662576 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/python | |
from __future__ import with_statement # for obsolete python | |
from types import * | |
import re | |
import sys | |
import socket | |
sys.path.insert(0, "lib") | |
from check_type import * | |
# set path | |
path_cidr_list ="/tmp/cidr.txt" | |
paths_class_list =["/tmp/class_A.txt", "/tmp/class_B.txt", "/tmp/class_C.txt"] | |
path_iptables_log ="/var/log/iptables.log" | |
path_iso_3166 ="/root/scripts/misc/iso-3166-1" | |
# Name: classify_address | |
# Explanation: | |
# return values: | |
# class A => 0 | |
# class B => 1 | |
# class C => 2 | |
def classify_address(ip_address): | |
ip_address =check_type(StringType, ip_address) | |
first_octet =int( ip_address.split(".")[0] ) | |
if 1 <= first_octet <= 126: | |
return 0 | |
elif 128 <= first_octet <= 191: | |
return 1 | |
elif 192 <= first_octet <= 223: | |
return 2 | |
# Name: gen_class_list | |
# Explanation: generate list of classified IP address from cidr.txt | |
def gen_class_list(): | |
with open(path_cidr_list, "r") as input_file: | |
class_A, class_B, class_C =[], [], [] | |
list_of_class =[class_A, class_B, class_C] | |
regex_spaces =re.compile("\s+") | |
for line in input_file: | |
line_as_array =regex_spaces.split( line.rstrip() )[1].split("/") | |
network_address =line_as_array[0] | |
first_octet =int( network_address.split(".")[0] ) | |
classified_value =classify_address(network_address) | |
list_of_class[classified_value].append(line) | |
for index in range(len(list_of_class)): | |
with open(paths_class_list[index], "w") as output_file: | |
for line in list_of_class[index]: | |
output_file.write(line) | |
def search_address(ip_address): | |
ip_address =check_type(StringType, ip_address) | |
address_class =classify_address(ip_address) | |
regex_spaces =re.compile("\s+") | |
binary_target_address ="" | |
for octet in ip_address.split("."): | |
octet =int(octet) | |
binary_octet =format(octet, "b").zfill(8) | |
binary_target_address +=binary_octet | |
with open(paths_class_list[address_class], "r") as file: | |
for line in file: | |
line_as_array =regex_spaces.split( line.rstrip() ) | |
country_code =line_as_array[0] | |
network_address =line_as_array[1].split("/")[0] | |
subnetmask =int( line_as_array[1].split("/")[1] ) | |
regex_subnetmask =re.compile( "^\d{%(subnetmask)d}" % locals() ) | |
binary_network_address ="" | |
for octet in network_address.split("."): | |
octet =int(octet) | |
binary_octet =format(octet, "b").zfill(8) | |
binary_network_address +=binary_octet | |
extracted_network_address =regex_subnetmask.match(binary_network_address).group() | |
extracted_target_address =regex_subnetmask.match(binary_target_address).group() | |
if int(extracted_network_address) ^ int(extracted_target_address) == 0: | |
# found | |
return (country_code, network_address, subnetmask) | |
def gen_country_dict(): | |
country_dict ={} | |
with open(path_iso_3166, "r") as file: | |
for line in file: | |
line_as_array =line.rstrip().split(";") | |
country_name =line_as_array[0].capitalize() | |
country_code =line_as_array[1] | |
country_dict[country_code] =country_name | |
return country_dict | |
def analyze_log(): | |
with open(path_iptables_log, "r") as file: | |
country_dict =gen_country_dict() | |
regex_spaces =re.compile("\s+") | |
regex_timestamp =re.compile("^(?P<year>\d{4})-(?P<month>\d{2})-(?P<date>\d{2})T(?P<hour>\d{2}):(?P<minute>\d{2}):(?P<second>\d{2})") | |
regex_src_ip_address =re.compile("SRC=(\d+\.\d+\.\d+\.\d+)") | |
regex_protocol =re.compile("PROTO=([a-zA-Z]+)") | |
regex_src_port =re.compile("SPT=(\d+)") | |
regex_dst_port =re.compile("DPT=(\d+)") | |
for line in file: | |
raw_timestamp =regex_spaces.split( line.rstrip() )[0] | |
match_timestamp =regex_timestamp.search(raw_timestamp) | |
dict_timestamp =match_timestamp.groupdict() | |
timestamp ="%s/%s/%s %s:%s:%s" % (dict_timestamp["year"], dict_timestamp["month"], dict_timestamp["date"], dict_timestamp["hour"], dict_timestamp["minute"], dict_timestamp["second"]) | |
src_ip_address =regex_src_ip_address.search( line.rstrip() ).group(1) | |
protocol =regex_protocol.search( line.rstrip() ).group(1) | |
src_port =int( regex_src_port.search( line.rstrip() ).group(1) ) | |
dst_port =int( regex_dst_port.search( line.rstrip() ).group(1) ) | |
result =search_address(src_ip_address) | |
country_code =result[0] | |
src_network =str(result[1]) + "/" + str(result[2]) | |
src_hostname ="" | |
dst_hostname ="localhost" | |
try: | |
src_hostname =socket.gethostbyaddr(src_ip_address)[0] | |
except: | |
src_hostname =src_ip_address | |
if country_code in country_dict.keys(): | |
print "%s [%s] %s: %s (%s):%d => %s:%d (%s)" % (timestamp, country_dict[country_code], src_network, src_ip_address, src_hostname, src_port, dst_hostname, dst_port, protocol) | |
else: | |
print "%s [%s] %s: %s (%s):%d => %s:%d (%s)" % (timestamp, country_code, src_network, src_ip_address, src_hostname, src_port, dst_hostname, dst_port, protocol) | |
gen_class_list() | |
analyze_log() | |
# [EOF] |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment