Skip to content

Instantly share code, notes, and snippets.

@heartnet
Created November 4, 2010 14:57
Show Gist options
  • Save heartnet/662576 to your computer and use it in GitHub Desktop.
Save heartnet/662576 to your computer and use it in GitHub Desktop.
#!/usr/bin/python
from __future__ import with_statement # for obsolete python
from types import *
import re
import sys
import socket
sys.path.insert(0, "lib")
from check_type import *
# set path
path_cidr_list ="/tmp/cidr.txt"
paths_class_list =["/tmp/class_A.txt", "/tmp/class_B.txt", "/tmp/class_C.txt"]
path_iptables_log ="/var/log/iptables.log"
path_iso_3166 ="/root/scripts/misc/iso-3166-1"
# Name: classify_address
# Explanation:
# return values:
# class A => 0
# class B => 1
# class C => 2
def classify_address(ip_address):
ip_address =check_type(StringType, ip_address)
first_octet =int( ip_address.split(".")[0] )
if 1 <= first_octet <= 126:
return 0
elif 128 <= first_octet <= 191:
return 1
elif 192 <= first_octet <= 223:
return 2
# Name: gen_class_list
# Explanation: generate list of classified IP address from cidr.txt
def gen_class_list():
with open(path_cidr_list, "r") as input_file:
class_A, class_B, class_C =[], [], []
list_of_class =[class_A, class_B, class_C]
regex_spaces =re.compile("\s+")
for line in input_file:
line_as_array =regex_spaces.split( line.rstrip() )[1].split("/")
network_address =line_as_array[0]
first_octet =int( network_address.split(".")[0] )
classified_value =classify_address(network_address)
list_of_class[classified_value].append(line)
for index in range(len(list_of_class)):
with open(paths_class_list[index], "w") as output_file:
for line in list_of_class[index]:
output_file.write(line)
def search_address(ip_address):
ip_address =check_type(StringType, ip_address)
address_class =classify_address(ip_address)
regex_spaces =re.compile("\s+")
binary_target_address =""
for octet in ip_address.split("."):
octet =int(octet)
binary_octet =format(octet, "b").zfill(8)
binary_target_address +=binary_octet
with open(paths_class_list[address_class], "r") as file:
for line in file:
line_as_array =regex_spaces.split( line.rstrip() )
country_code =line_as_array[0]
network_address =line_as_array[1].split("/")[0]
subnetmask =int( line_as_array[1].split("/")[1] )
regex_subnetmask =re.compile( "^\d{%(subnetmask)d}" % locals() )
binary_network_address =""
for octet in network_address.split("."):
octet =int(octet)
binary_octet =format(octet, "b").zfill(8)
binary_network_address +=binary_octet
extracted_network_address =regex_subnetmask.match(binary_network_address).group()
extracted_target_address =regex_subnetmask.match(binary_target_address).group()
if int(extracted_network_address) ^ int(extracted_target_address) == 0:
# found
return (country_code, network_address, subnetmask)
def gen_country_dict():
country_dict ={}
with open(path_iso_3166, "r") as file:
for line in file:
line_as_array =line.rstrip().split(";")
country_name =line_as_array[0].capitalize()
country_code =line_as_array[1]
country_dict[country_code] =country_name
return country_dict
def analyze_log():
with open(path_iptables_log, "r") as file:
country_dict =gen_country_dict()
regex_spaces =re.compile("\s+")
regex_timestamp =re.compile("^(?P<year>\d{4})-(?P<month>\d{2})-(?P<date>\d{2})T(?P<hour>\d{2}):(?P<minute>\d{2}):(?P<second>\d{2})")
regex_src_ip_address =re.compile("SRC=(\d+\.\d+\.\d+\.\d+)")
regex_protocol =re.compile("PROTO=([a-zA-Z]+)")
regex_src_port =re.compile("SPT=(\d+)")
regex_dst_port =re.compile("DPT=(\d+)")
for line in file:
raw_timestamp =regex_spaces.split( line.rstrip() )[0]
match_timestamp =regex_timestamp.search(raw_timestamp)
dict_timestamp =match_timestamp.groupdict()
timestamp ="%s/%s/%s %s:%s:%s" % (dict_timestamp["year"], dict_timestamp["month"], dict_timestamp["date"], dict_timestamp["hour"], dict_timestamp["minute"], dict_timestamp["second"])
src_ip_address =regex_src_ip_address.search( line.rstrip() ).group(1)
protocol =regex_protocol.search( line.rstrip() ).group(1)
src_port =int( regex_src_port.search( line.rstrip() ).group(1) )
dst_port =int( regex_dst_port.search( line.rstrip() ).group(1) )
result =search_address(src_ip_address)
country_code =result[0]
src_network =str(result[1]) + "/" + str(result[2])
src_hostname =""
dst_hostname ="localhost"
try:
src_hostname =socket.gethostbyaddr(src_ip_address)[0]
except:
src_hostname =src_ip_address
if country_code in country_dict.keys():
print "%s [%s] %s: %s (%s):%d => %s:%d (%s)" % (timestamp, country_dict[country_code], src_network, src_ip_address, src_hostname, src_port, dst_hostname, dst_port, protocol)
else:
print "%s [%s] %s: %s (%s):%d => %s:%d (%s)" % (timestamp, country_code, src_network, src_ip_address, src_hostname, src_port, dst_hostname, dst_port, protocol)
gen_class_list()
analyze_log()
# [EOF]
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment