Last active
March 2, 2022 22:39
-
-
Save lanbugs/0e35ebbd3055d95522b2dd6a9b2c7558 to your computer and use it in GitHub Desktop.
Ripe CIDR collapsed network list generator with country filter
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python3
# ----------------------------------------------------------------------------------------------------------------------
# Ripe CIDR list generator with country filter
# --------------------------------------------
# Version 1.0
# Written by Maximilian Thoma 2022
#
# Parts used from: https://github.com/FireFart/network_info
# Only python3 required, no additional libraries
#
# You can download the RIPE database dumps via FTP:
# wget ftp://ftp.ripe.net/ripe/dbase/split/ripe.db.inetnum.gz
# wget ftp://ftp.ripe.net/ripe/dbase/split/ripe.db.inet6num.gz
#
# Usage:
# ripe_cidr.py -i <db_file> -o <output_file> -c <country code>
#
# ----------------------------------------------------------------------------------------------------------------------
import gzip | |
import ipaddress | |
import argparse | |
import os.path | |
import re | |
def get_records(input_file):
    """
    Get all records from ripe.db.inetnum/inet6num file

    Records are separated by blank lines. Only records whose first line
    starts with ``inetnum:`` or ``inet6num:`` are kept; lines starting with a
    space (continuation lines) are dropped from the record text.

    :param input_file: path to the database file, plain or gzipped (``.gz``)
    :return: list(records), each record being the joined text of its lines
    """
    opener = gzip.open if input_file.endswith('.gz') else open
    wanted_prefixes = ('inetnum:', 'inet6num:')

    records = []
    current = []

    def flush():
        # Store the record collected so far if it is an inetnum/inet6num
        # object, then start a fresh one.
        if current and current[0].startswith(wanted_prefixes):
            records.append(''.join(current))
        del current[:]

    # The context manager guarantees the handle is closed even if decoding
    # or iteration fails half-way through the file.
    with opener(input_file, mode='rt', encoding='ISO-8859-1') as handle:
        for line in handle:
            if line.startswith(' '):
                continue
            if line.strip() == '':
                flush()
            else:
                current.append(line)

    # A file that does not end with a blank line would otherwise lose its
    # last record.
    flush()

    print("Total records in db: %s" % len(records))
    return records
def parse_property(block, name):
    """
    Parse property from block

    Every ``<name>:`` line of the block is collected; the values are stripped
    of surrounding whitespace and joined with a single space.

    :param block: ripe info block
    :param name: attribute name to look up (matched literally)
    :return: joined value(s) of the attribute, or None if it is not present
    """
    # Only horizontal whitespace may follow the colon: with re.MULTILINE a
    # plain \s* would run across the newline of an empty attribute and
    # capture the *next* line as its value.
    pattern = r'^{0}:[^\S\r\n]*(.*)$'.format(re.escape(name))
    values = [value.strip() for value in re.findall(pattern, block, re.MULTILINE)]
    if not values:
        return None
    return " ".join(value for value in values if value)
# First "inetnum: <start> - <end>" line of a record; groups are start and end.
_IPV4_RANGE_RE = re.compile(
    r'^inetnum:\s*((?:\d{1,3}\.){3}\d{1,3})\s*-\s*((?:\d{1,3}\.){3}\d{1,3})',
    re.MULTILINE,
)
# First "inet6num: <prefix>" line of a record; group 1 is the prefix text.
_IPV6_PREFIX_RE = re.compile(r'^inet6num:\s*([0-9a-fA-F:/]{1,43})', re.MULTILINE)


def _parse_inetnum_cidrs(block):
    """
    Return every CIDR needed to cover the address range of a record

    :param block: ripe info block
    :return: list of cidr strings, empty if the block has no inetnum/inet6num
    :raises ValueError: if an IPv4 address is invalid or the range is reversed
    """
    v4_range = _IPV4_RANGE_RE.search(block)
    if v4_range:
        first, last = (ipaddress.IPv4Address(text) for text in v4_range.groups())
        # A start-end range is not necessarily one network, e.g.
        # 192.168.0.0 - 192.168.2.255 needs a /23 plus a /24.
        return [str(net) for net in ipaddress.summarize_address_range(first, last)]

    v6_prefix = _IPV6_PREFIX_RE.search(block)
    if v6_prefix:
        return [v6_prefix.group(1)]
    return []


def parse_property_inetnum(block):
    """
    Parse property inetnum

    For an IPv4 range that cannot be expressed as a single network only the
    first covering CIDR is returned.

    :param block: ripe info block
    :return: cidr as string, or None if the block has no inetnum/inet6num
    :raises ValueError: if an IPv4 address is invalid or the range is reversed
    """
    cidrs = _parse_inetnum_cidrs(block)
    return cidrs[0] if cidrs else None


def generate_list(records, country_code=None):
    """
    Generate cidr list of records

    Each record contributes all CIDRs needed to cover its range. Records
    without a parsable inetnum/inet6num are skipped instead of adding None.
    With a country filter, a record matches if any of its country values
    equals the code (case-insensitive).

    :param records: list of records
    :param country_code: Country code filter, None to keep every record
    :return: filtered list of cidrs
    """
    wanted = None if country_code is None else country_code.upper()

    result = []
    matched_records = 0
    for record in records:
        if wanted is not None:
            cc = parse_property(record, "country")
            # parse_property joins repeated "country:" attributes with
            # spaces, so compare against each token separately.
            if cc is None or wanted not in cc.upper().split():
                continue
        cidrs = _parse_inetnum_cidrs(record)
        if not cidrs:
            continue
        matched_records += 1
        result.extend(cidrs)

    print("Found %s records filtered by country code %s" % (matched_records, country_code))
    return result
def merge_cidrs(cidrs):
    """
    Merge subnets (IPv4 and IPv6)

    Adjacent and overlapping networks are collapsed. IPv4 and IPv6 entries
    are collapsed separately because ipaddress.collapse_addresses() refuses
    mixed address families; the result lists IPv4 networks first, then IPv6.
    Empty/None entries are ignored.

    :param cidrs: list of cidrs as strings
    :return: merged list of cidrs as strings
    :raises ValueError: if an entry is not a valid network
    """
    v4_nets = []
    v6_nets = []
    for cidr in cidrs:
        if not cidr:
            continue
        if ":" in cidr:
            v6_nets.append(ipaddress.IPv6Network(cidr))
        else:
            v4_nets.append(ipaddress.IPv4Network(cidr))

    result = [
        str(net)
        for family in (v4_nets, v6_nets)
        for net in ipaddress.collapse_addresses(family)
    ]
    print("Records after merge: %s" % len(result))
    return result
def main(input_file, output_file, country_code=None):
    """
    Main program

    Reads the records, optionally filters them by country, collapses the
    resulting networks and writes one CIDR per line to the output file.

    :param input_file: Input file
    :param output_file: Output file
    :param country_code: Country code filter, None to keep every record
    :return: True if the output file was written, False if the input file
        does not exist
    """
    if not os.path.exists(input_file):
        print("Error: input file not exist.")
        return False

    records = get_records(input_file)
    net_list = generate_list(records, country_code)
    merge_list = merge_cidrs(net_list)

    with open(output_file, "w") as f:
        f.writelines("%s\n" % line for line in merge_list)
    return True
if __name__ == "__main__":
    # Command line definition: (flag, destination, help text, required)
    option_table = (
        ("-i", "input_file", "input file extracted or gziped", True),
        ("-o", "output_file", "output file", True),
        ("-c", "country_code", "filter output on country code", False),
    )
    cli = argparse.ArgumentParser(description="RipeQ")
    for flag, destination, help_text, is_required in option_table:
        cli.add_argument(flag, dest=destination, help=help_text, required=is_required)
    options = cli.parse_args()

    # Hand the parsed options over to the main program
    main(options.input_file, options.output_file, options.country_code)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment