Skip to content

Instantly share code, notes, and snippets.

@akagisho
Last active February 18, 2017 07:56
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save akagisho/559637c7bd4b445adb06e44bf7b6819e to your computer and use it in GitHub Desktop.
Save akagisho/559637c7bd4b445adb06e44bf7b6819e to your computer and use it in GitHub Desktop.
#!/usr/bin/env python
'''
mkdir db
echo 'CREATE TABLE cidrs(id INTEGER PRIMARY KEY AUTOINCREMENT, start INTEGER, end INTEGER, data TEXT, created_at INTEGER);' | sqlite3 db/whois.db
echo 'CREATE TABLE ass(id INTEGER PRIMARY KEY AUTOINCREMENT, asn INTEGER UNIQUE, data TEXT, created_at INTEGER);' | sqlite3 db/whois.db
'''
import sys
import argparse
import re
import json
import datetime
import time
import sqlite3
import ipaddress
import urllib2
from ipwhois import IPWhois
import cymruwhois
dbname = 'db/whois.db'
def now():
return int(time.mktime(datetime.datetime.now().timetuple()))
def parse_whois(data):
if data.has_key('nir') and data['nir'] is not None and len(data['nir']['nets']) > 0:
nets = data['nir']['nets']
else:
nets = data['nets']
if 'description' in nets[-1]:
name = nets[-1]['description']
else:
name = nets[-1]['name']
if str(nets[-1]['cidr']).count(' ') == 0:
cidr = nets[-1]['cidr']
else:
cidr = data['asn_cidr']
if nets[-1]['country']:
cc = nets[-1]['country']
else:
cc = data['asn_country_code']
return {
'asn': data['asn'],
'cc': cc,
'cidr': cidr,
'name': name
}
def save_db(cidr, ip_data):
net = ipaddress.ip_network(unicode(cidr))
conn = sqlite3.connect(dbname)
c = conn.cursor()
sql = 'DELETE FROM cidrs WHERE start = ? AND end = ?'
data = (
int(net.network_address),
int(net.broadcast_address)
)
c.execute(sql, data)
conn.commit()
sql = 'INSERT INTO cidrs (start, end, data, created_at) values (?, ?, ?, ?)'
data = (
int(net.network_address),
int(net.broadcast_address),
json.dumps(ip_data),
now()
)
c.execute(sql, data)
conn.commit()
conn.close()
def save_asn_db(asn, as_data):
conn = sqlite3.connect(dbname)
c = conn.cursor()
sql = 'DELETE FROM ass WHERE asn = ?'
data = (asn,)
c.execute(sql, data)
conn.commit()
sql = 'INSERT INTO ass (asn, data, created_at) values (?, ?, ?)'
data = (
int(asn),
as_data,
now()
)
c.execute(sql, data)
conn.commit()
conn.close()
def get_ipinfo(ip):
ttl = now() - (60 * 60 * 24 * 182)
sql = 'SELECT id, start, end, data, (end - start) AS size FROM cidrs WHERE start <= ? AND end >= ? AND created_at > ? ORDER BY (end - start) ASC, created_at DESC'
data = (
int(ipaddress.ip_address(unicode(ip))),
int(ipaddress.ip_address(unicode(ip))),
ttl
)
conn = sqlite3.connect(dbname)
c = conn.cursor()
c.execute(sql, data)
rows = c.fetchall()
conn.close()
if len(rows) != 0 and not args.force:
debug('from db')
result = json.loads(rows[0][3])
data = parse_whois(result)
elif not args.local:
debug('from whois')
obj = IPWhois(args.key)
result = obj.lookup_whois(inc_nir = False)
data = parse_whois(result)
if data['cc'] == 'JP':
if data['name'] == 'Japan Network Information Center' or ipaddress.ip_network(unicode(data['cidr'])).num_addresses > 1024:
debug('from jpnic whois')
result = obj.lookup_whois(nir_field_list = ['name'])
data = parse_whois(result)
save_db(data['cidr'], result)
else:
data = ''
return data
def get_asinfo(asn, ip):
ttl = now() - (60 * 60 * 24 * 182)
sql = 'SELECT id, data FROM ass WHERE asn = ? ORDER BY created_at DESC'
data = (asn,)
conn = sqlite3.connect(dbname)
c = conn.cursor()
c.execute(sql, data)
rows = c.fetchall()
conn.close()
if len(rows) != 0 and not args.force:
owner = rows[0][1]
else:
debug('from cymruwhois')
c = cymruwhois.Client()
r = c.lookup(unicode(ip))
owner = r.owner
if owner != '':
save_asn_db(asn, owner)
owner = re.sub(r'^[^\s]+\s(- )?', '', owner)
owner = re.sub(r', ..$', '', owner)
return owner
def debug(msg):
if args.debug:
sys.stderr.write("%s\n" % msg)
if __name__ == '__main__':
p = argparse.ArgumentParser()
p.add_argument('-l', '--local',
action='store_true',
default=False,
help='use local database only')
p.add_argument('-f', '--force',
action='store_true',
default=False,
help='not use local database')
p.add_argument('-d', '--debug',
action='store_true',
default=False,
help='debug')
p.add_argument('key')
args = p.parse_args()
ip_info = get_ipinfo(args.key)
as_info = get_asinfo(ip_info['asn'], args.key)
ip_info['as_owner'] = as_info
print(json.dumps(ip_info))
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment