Navigation Menu

Skip to content

Instantly share code, notes, and snippets.

@holly
Last active October 7, 2021 04:02
Show Gist options
  • Star 2 You must be signed in to star a gist
  • Fork 3 You must be signed in to fork a gist
  • Save holly/6c08ce8756e4c2257068 to your computer and use it in GitHub Desktop.
Save holly/6c08ce8756e4c2257068 to your computer and use it in GitHub Desktop.
make afrinic/apnic/arin/ripe/lacnic delegated-latest ipv4 and ipv6 list script
#!/usr/bin/env python
# vim:fileencoding=utf-8
""" [NAME] script or package easy description
[DESCRIPTION] script or package description
"""
from datetime import datetime
from argparse import ArgumentParser
import pprint
import time
import warnings
import os, sys, io
import signal
import urllib
import urllib.request
import tempfile
import os.path
import re
import ipaddress
import traceback
from subprocess import Popen, PIPE, STDOUT
from concurrent.futures import ThreadPoolExecutor, as_completed
from multiprocessing import cpu_count
__author__ = 'holly'
__version__ = '1.0'
DESCRIPTION = 'this is description'
DELEGATED_LATEST_MAP = {
'afrinic' : 'http://ftp.afrinic.net/pub/stats/afrinic/delegated-afrinic-extended-latest',
'apnic' : 'http://ftp.apnic.net/pub/stats/apnic/delegated-apnic-extended-latest',
'arin' : 'http://ftp.arin.net/pub/stats/arin/delegated-arin-extended-latest',
'ripe' : 'http://ftp.ripe.net/pub/stats/ripencc/delegated-ripencc-extended-latest',
'lacnic' : 'http://ftp.lacnic.net/pub/stats/lacnic/delegated-lacnic-extended-latest',
}
DOWNLOAD_FILE_CACHE = 86400 # 1day
parser = ArgumentParser(description=DESCRIPTION)
parser.add_argument('--version', action='version', version='%(prog)s ' + __version__)
parser.add_argument('--registry', '-r', action='store', nargs='*', default=sorted(DELEGATED_LATEST_MAP.keys()), help='delegated registries(afrinic|apnic|arin|ripe|lacnic)')
parser.add_argument('--output-dir', '-d', action='store', default=os.getcwd(), help='new file output directory')
parser.add_argument('--force', '-f', action='store_true', help='force execution')
args = parser.parse_args()
class NetworkData(object):
SPLIT_SEP = '\|'
LINE_FORMAT = "{cc}\t{start}/{cidr}\t{start}/{mask}"
NETWORK_CIDR_MAP = {
# allocated : cidr prefix : mask
"2147483648" : { "cidr": "1", "mask": "128.0.0.0" },
"1073741824" : { "cidr": "2", "mask": "192.0.0.0" },
"536870912" : { "cidr": "3", "mask": "224.0.0.0" },
"268435456" : { "cidr": "4", "mask": "240.0.0.0" },
"134217728" : { "cidr": "5", "mask": "248.0.0.0" },
"67108864" : { "cidr": "6", "mask": "252.0.0.0" },
"33554432" : { "cidr": "7", "mask": "254.0.0.0" },
"16777216" : { "cidr": "8", "mask": "255.0.0.0" },
"8388608" : { "cidr": "9", "mask": "255.128.0.0" },
"4194304" : { "cidr": "10", "mask": "255.192.0.0" },
"2097152" : { "cidr": "11", "mask": "255.224.0.0" },
"1048576" : { "cidr": "12", "mask": "255.240.0.0" },
"524288" : { "cidr": '13', "mask": "255.248.0.0" },
"262144" : { "cidr": '14', "mask": "255.252.0.0" },
"131072" : { "cidr": '15', "mask": "255.254.0.0" },
"65536" : { "cidr": '16', "mask": "255.255.0.0" },
"32768" : { "cidr": '17', "mask": "255.255.128.0" },
"16384" : { "cidr": '18', "mask": "255.255.192.0" },
"8192" : { "cidr": '19', "mask": "255.255.224.0" },
"4096" : { "cidr": '20', "mask": "255.255.240.0" },
"2048" : { "cidr": '21', "mask": "255.255.248.0" },
"1024" : { "cidr": '22', "mask": "255.255.252.0" },
"512" : { "cidr": '23', "mask": "255.255.254.0" },
"256" : { "cidr": '24', "mask": "255.255.255.0" },
"128" : { "cidr": '25', "mask": "255.255.255.128" },
"64" : { "cidr": '26', "mask": "255.255.255.192" },
"32" : { "cidr": '27', "mask": "255.255.255.224" },
"16" : { "cidr": '28', "mask": "255.255.255.240" },
"8" : { "cidr": '29', "mask": "255.255.255.248" },
"4" : { "cidr": '30', "mask": "255.255.255.252" },
"2" : { "cidr": '31', "mask": "255.255.255.254" },
"1" : { "cidr": '32', "mask": "255.255.255.255" },
}
def __init__(self, line):
self.__line = line
self.__rir = None
self.__cc = None
self.__iptype = None
self.__start = None
self.__value = None
self.__status = None
self.__mask = None
self.__lines = []
# make line string
self.__initialize()
def analyzed(self):
return True if len(self.__lines) > 0 else False
def make_lines(self, iptype, cc, start, value):
lines = []
format_dict = { "cc": cc, "start": start, "mask": value, "cidr": value }
if iptype == "ipv6":
print("[iptype:ipv6 cc:{0} start:{1} allocated:{2} range:{1}/{2}] valid allocated. OK".format(cc, start, value))
lines.append(self.__class__.LINE_FORMAT.format(**format_dict))
elif iptype == "ipv4":
# for ipv4
subnet_map = self.__class__.NETWORK_CIDR_MAP.get(value)
if subnet_map:
print("[iptype:ipv4 cc:{0} start:{1} allocated:{2} range:{1}/{3}] valid allocated. OK".format(cc, start, value, subnet_map["cidr"]))
format_dict["cidr"] = subnet_map["cidr"]
format_dict["mask"] = subnet_map["mask"]
lines.append(self.__class__.LINE_FORMAT.format(**format_dict))
if len(lines) == 0:
# for ipv4 invalid allocated
print("[iptype:ipv4 cc:{0} start:{1} allocated:{2}] invalid allocated... start adhoc calclation".format(cc, start, value))
allocated = int(value)
for check_allocated in sorted(self.__class__.NETWORK_CIDR_MAP.keys(), key=int, reverse=True):
if allocated < int(check_allocated):
before_check_allocated = check_allocated
continue
try:
subnet_map = self.__class__.NETWORK_CIDR_MAP.get(check_allocated)
network_str = "{0}/{1}".format(start, subnet_map["cidr"])
print(" trying {0}...".format(network_str))
network = ipaddress.ip_network(network_str)
rest_allocated = allocated - int(check_allocated)
except ValueError as e:
print(type(e), str(e), "challenge next cidr")
continue
else:
format_dict["cidr"] = subnet_map["cidr"]
format_dict["mask"] = subnet_map["mask"]
lines.append(self.__class__.LINE_FORMAT.format(**format_dict))
next_start = network[-1] + 1
print(" calclate network segment", network)
print(" cc:{0} next network start value:{1} rest_allocated:{2}".format(cc, next_start, rest_allocated))
if rest_allocated > 0:
print(" -> [iptype:ipv4 cc:{0} start:{1} allocated:{2}] start recursive make_lines method".format(cc, next_start, str(rest_allocated)))
lines.extend(self.make_lines(iptype, cc, next_start, str(rest_allocated)))
break
return lines
@property
def iptype(self):
return self.__iptype
def __initialize(self):
line = self.__line.strip()
if re.match("^#", line):
return
fields = re.split(self.__class__.SPLIT_SEP, line)
if len(fields) < 7:
return
rir = fields[0]
cc = fields[1]
iptype = fields[2]
start = fields[3]
value = fields[4]
status = fields[6]
if cc == "*" or cc == "":
return
if (iptype == "ipv4" or iptype == "ipv6") and (status == "assigned" or status == "allocated"):
pass
else:
return
lines = self.make_lines(iptype, cc, start, value)
# set property
self.__rir = rir
self.__cc = cc
self.__iptype = iptype
self.__start = start
self.__value = value
self.__status = status
self.__lines.extend(lines)
def __str__(self):
if not self.analyzed():
return ""
return "\n".join(self.__lines) + "\n"
def is_download_real_file(download_file):
if args.force:
return True
if not os.path.exists(download_file):
return True
current_time = time.time()
latest_mtime = os.path.getmtime(download_file)
if (current_time - latest_mtime) > DOWNLOAD_FILE_CACHE:
return True
else:
return False
def download_delegated_latest(delegated_latest_url, download_file):
req = urllib.request.Request(delegated_latest_url)
with urllib.request.urlopen(req) as res:
with open(download_file, "w") as f:
for line in res:
print(line.decode("utf-8").strip(), file=f)
def make_list(rir, delegated_latest_url):
if not rir in args.registry:
return
print("{0}: make_list start".format(rir))
popen_args = ["sort", "-k", "1,1", "-k", "2n"]
download_file = os.path.join(tempfile.gettempdir(), rir) + ".txt.tmp"
if is_download_real_file(download_file):
print("{0}: {1} download start".format(rir, delegated_latest_url))
download_delegated_latest(delegated_latest_url, download_file)
print("{0}: {1} download end".format(rir, delegated_latest_url))
else:
print("{0}: download skip. exit".format(rir))
return
new_ipv4_file = os.path.join(args.output_dir, "{0}-ipv4-latest.txt".format(rir))
new_ipv6_file = os.path.join(args.output_dir, "{0}-ipv6-latest.txt".format(rir))
map = {
'ipv4': { 'new_file': new_ipv4_file },
'ipv6': { 'new_file': new_ipv6_file },
}
for key in map.keys():
map[key]['f'] = open(map[key]['new_file'], "w")
map[key]['proc'] = Popen(popen_args, stdin=PIPE, stdout=map[key]['f'], stderr=STDOUT, universal_newlines=True)
print("{0}: create new file start({1})".format(rir, map[key]['new_file']))
with open(download_file, "r") as f:
for line in f:
# afrinic|KE|ipv4|213.150.96.0|8192|20100301|allocated
# arin|US|ipv6|2001:400::|32|19990803|allocated|04f048163e37eef48d891498545eefc0
data = NetworkData(line)
if not data.analyzed():
continue
map[data.iptype]['proc'].stdin.write(str(data))
for key in map.keys():
map[key]['f'].close()
map[key]['proc'].communicate()
print("{0}: create new file end({1})".format(rir, map[key]['new_file']))
print("{0}: make_list end".format(rir))
def main():
""" [FUNCTIONS] method or functon description
"""
with ThreadPoolExecutor(max_workers=cpu_count()) as executor:
futures = { executor.submit(make_list, rir, delegated_latest_url): rir for rir, delegated_latest_url in DELEGATED_LATEST_MAP.items() }
for future in as_completed(futures):
rir = futures[future]
try:
result = future.result()
except urllib.error.HTTPError as e:
print('{0}: http error'.format(rir))
print(e.read().decode("utf-8").strip())
except Exception as e:
print(type(e))
print(traceback.format_exc())
print('{0}: generated an exception: {1}'.format(rir, e))
else:
print('{0}: completed'.format(rir))
sys.exit(0)
if __name__ == "__main__":
main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment