Last active
February 18, 2017 07:56
-
-
Save akagisho/559637c7bd4b445adb06e44bf7b6819e to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python
'''
Create the SQLite database once before first use:

mkdir db
echo 'CREATE TABLE cidrs(id INTEGER PRIMARY KEY AUTOINCREMENT, start INTEGER, end INTEGER, data TEXT, created_at INTEGER);' | sqlite3 db/whois.db
echo 'CREATE TABLE ass(id INTEGER PRIMARY KEY AUTOINCREMENT, asn INTEGER UNIQUE, data TEXT, created_at INTEGER);' | sqlite3 db/whois.db
'''
import sys | |
import argparse | |
import re | |
import json | |
import datetime | |
import time | |
import sqlite3 | |
import ipaddress | |
import urllib2 | |
from ipwhois import IPWhois | |
import cymruwhois | |
# Path of the SQLite database file, relative to the working directory.
dbname = 'db/whois.db'
def now():
    '''Return the current time as whole seconds since the Unix epoch.'''
    # time.time() is epoch-based already. Converting a naive local datetime
    # back through mktime() can be off by an hour in the ambiguous DST hour.
    return int(time.time())
def parse_whois(data):
    '''Reduce a whois lookup result to the four fields this tool reports.

    The last entry of the NIR network list is used when the result carries a
    non-empty one; otherwise the last entry of the regular ``nets`` list.

    Args:
        data: dict with ``nets``, ``asn``, ``asn_cidr`` and
            ``asn_country_code`` keys, and optionally ``nir``.

    Returns:
        dict with the keys ``asn``, ``cc``, ``cidr`` and ``name``.
    '''
    # dict.has_key() does not exist on Python 3; .get() handles both a
    # missing key and an explicit None on every Python version.
    nir = data.get('nir')
    if nir is not None and len(nir['nets']) > 0:
        nets = nir['nets']
    else:
        nets = data['nets']
    net = nets[-1]

    name = net['description'] if 'description' in net else net['name']

    # A value containing a space is not a single CIDR (presumably a list of
    # several), so use the ASN-level prefix instead.
    if ' ' not in str(net['cidr']):
        cidr = net['cidr']
    else:
        cidr = data['asn_cidr']

    # Empty or None country falls back to the ASN-level country code.
    cc = net['country'] or data['asn_country_code']

    return {
        'asn': data['asn'],
        'cc': cc,
        'cidr': cidr,
        'name': name
    }
def save_db(cidr, ip_data):
    '''Store the lookup result for one network, replacing any existing row.

    Rows in ``cidrs`` are keyed by the integer network and broadcast
    addresses of ``cidr``.

    Args:
        cidr: network in CIDR notation.
        ip_data: JSON-serialisable lookup result to store.
    '''
    net = ipaddress.ip_network(unicode(cidr))
    start = int(net.network_address)
    end = int(net.broadcast_address)
    # Serialise before touching the database so a bad payload cannot leave
    # the old row deleted with nothing written in its place.
    payload = json.dumps(ip_data)

    conn = sqlite3.connect(dbname)
    try:
        # `with conn` wraps both statements in one transaction: committed
        # together on success, rolled back together on error.
        with conn:
            conn.execute(
                'DELETE FROM cidrs WHERE start = ? AND end = ?',
                (start, end)
            )
            conn.execute(
                'INSERT INTO cidrs (start, end, data, created_at) values (?, ?, ?, ?)',
                (start, end, payload, now())
            )
    finally:
        # The context manager above does not close the connection.
        conn.close()
def save_asn_db(asn, as_data):
    '''Store the owner string for one AS number, replacing any existing row.

    Args:
        asn: AS number; anything ``int()`` accepts.
        as_data: value written to the ``data`` column.

    Raises:
        ValueError: if ``asn`` cannot be converted to an integer. This is
            raised before the database is touched.
    '''
    # Convert once so DELETE and INSERT bind the same value.
    asn_number = int(asn)

    conn = sqlite3.connect(dbname)
    try:
        # `with conn` wraps both statements in one transaction: committed
        # together on success, rolled back together on error.
        with conn:
            conn.execute('DELETE FROM ass WHERE asn = ?', (asn_number,))
            conn.execute(
                'INSERT INTO ass (asn, data, created_at) values (?, ?, ?)',
                (asn_number, as_data, now())
            )
    finally:
        # The context manager above does not close the connection.
        conn.close()
def get_ipinfo(ip):
    '''Return parsed whois information for an IP address.

    The ``cidrs`` table is consulted first: the smallest stored network that
    contains ``ip`` and was saved within the last 182 days is used, unless
    ``args.force`` is set. Otherwise a live whois lookup is made and its
    result saved, unless ``args.local`` is set.

    Args:
        ip: IP address as a string.

    Returns:
        The dict produced by ``parse_whois()``, or ``''`` when no usable
        stored row exists and live lookups are disabled.
    '''
    ttl = now() - (60 * 60 * 24 * 182)
    ip_int = int(ipaddress.ip_address(unicode(ip)))
    sql = 'SELECT id, start, end, data, (end - start) AS size FROM cidrs WHERE start <= ? AND end >= ? AND created_at > ? ORDER BY (end - start) ASC, created_at DESC'

    conn = sqlite3.connect(dbname)
    try:
        rows = conn.execute(sql, (ip_int, ip_int, ttl)).fetchall()
    finally:
        conn.close()

    if len(rows) != 0 and not args.force:
        debug('from db')
        return parse_whois(json.loads(rows[0][3]))

    if args.local:
        return ''

    debug('from whois')
    # Query the address that was passed in, not the global command-line value.
    obj = IPWhois(ip)
    result = obj.lookup_whois(inc_nir=False)
    data = parse_whois(result)
    if data['cc'] == 'JP':
        # Repeat the lookup with NIR data when the first answer names only
        # the registry itself or a network larger than 1024 addresses.
        if data['name'] == 'Japan Network Information Center' or ipaddress.ip_network(unicode(data['cidr'])).num_addresses > 1024:
            debug('from jpnic whois')
            result = obj.lookup_whois(nir_field_list=['name'])
            data = parse_whois(result)
    save_db(data['cidr'], result)
    return data
def get_asinfo(asn, ip):
    '''Return the owner name of an autonomous system.

    A row in the ``ass`` table saved within the last 182 days is used unless
    ``args.force`` is set; otherwise the owner is fetched through cymruwhois
    by IP address and, when non-empty, saved.

    Args:
        asn: AS number used as the key in the ``ass`` table.
        ip: IP address used for the cymruwhois lookup.

    Returns:
        The owner string with its first whitespace-delimited token (and an
        optional following "- ") and a trailing ", XX" suffix removed.
    '''
    ttl = now() - (60 * 60 * 24 * 182)
    # The expiry cut-off was computed but never applied, so stored owners
    # never expired; filter on it the same way the cidrs lookup does.
    sql = 'SELECT id, data FROM ass WHERE asn = ? AND created_at > ? ORDER BY created_at DESC'

    conn = sqlite3.connect(dbname)
    try:
        rows = conn.execute(sql, (asn, ttl)).fetchall()
    finally:
        conn.close()

    if len(rows) != 0 and not args.force:
        owner = rows[0][1]
    else:
        debug('from cymruwhois')
        client = cymruwhois.Client()
        owner = client.lookup(unicode(ip)).owner
        if owner != '':
            save_asn_db(asn, owner)

    owner = re.sub(r'^[^\s]+\s(- )?', '', owner)
    owner = re.sub(r', ..$', '', owner)
    return owner
def debug(msg):
    '''Write msg followed by a newline to stderr when args.debug is set.'''
    if not args.debug:
        return
    line = "%s\n" % msg
    sys.stderr.write(line)
if __name__ == '__main__':
    # Command-line entry point: look up one address and print the combined
    # IP and AS information as a single JSON object.
    p = argparse.ArgumentParser()
    p.add_argument('-l', '--local',
                   action='store_true',
                   default=False,
                   help='use local database only')
    p.add_argument('-f', '--force',
                   action='store_true',
                   default=False,
                   help='not use local database')
    p.add_argument('-d', '--debug',
                   action='store_true',
                   default=False,
                   help='debug')
    p.add_argument('key')
    # `args` must remain a module-level name: get_ipinfo(), get_asinfo() and
    # debug() read it as a global.
    args = p.parse_args()

    ip_info = get_ipinfo(args.key)
    if not ip_info:
        # get_ipinfo() returns '' when nothing is stored and live lookups are
        # disabled; indexing that string would raise TypeError.
        sys.stderr.write("no data found for %s\n" % args.key)
        sys.exit(1)

    as_info = get_asinfo(ip_info['asn'], args.key)
    ip_info['as_owner'] = as_info
    print(json.dumps(ip_info))
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment