Created
November 22, 2022 08:59
-
-
Save Heidistein/114baada5dd02cc174398d77418c2afd to your computer and use it in GitHub Desktop.
cidrmerge
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/python
# Author: Arjen Heidinga <dexter@beetjevreemd.nl>
# Licence: GPL
# A script that compresses/merges/consolidates a list of CIDR addresses.
# It will remove prefixes that are already contained in a 'supernet'.
# It will try to combine smaller subnets into a supernet (i.e. find both /24's of a /23).
# It will toss out prefixes that are longer (more specific) than specified (nothing more specific than /24).
# The code is tried and tested and is used in production, but it probably still contains bugs.
# It is not very pythonic, for speed reasons. It modifies the same set in place, because the set is
# very close to the CPython interpreter.
import sys | |
import ipaddress | |
from itertools import product | |
def dbg(*msg):
    """Print all positional arguments to standard error, like print() does."""
    # Look the stream up at call time so a redirected sys.stderr is honoured.
    stream = sys.stderr
    print(*msg, file=stream)
class CidrCompress:
    """
    Compress a set of CIDR networks of a single address family, in place.

    The set passed to compress4()/compress6() is modified directly:
    networks covered by another network in the set are dropped, sibling
    networks are merged into their common supernet, and finally networks
    more specific than the requested prefix length are thrown away.
    """

    def _remove_from_set(self, needlebag: set, haystack: set):
        """
        Remove all needles in needlebag from the haystack (in place).
        """
        dbg(f"removing {len(needlebag)} items")
        haystack.difference_update(needlebag)

    def _add_to_set(self, needlebag: set, haystack: set):
        """
        Add all needles in needlebag to the haystack (in place).
        """
        dbg(f"adding {len(needlebag)} items")
        haystack.update(needlebag)

    def _remove_larger_prefixes(self, lst: set, pref: int) -> int:
        """
        Remove all prefixes that are longer (more specific) than pref.
        @param lst: Work on this set (modified in place)
        @param pref: the longest prefix length that is kept.
        @return: The number of items removed
        """
        dbg(f"looking for larger than {pref} prefixed")
        rem = {net for net in lst if net.prefixlen > pref}
        self._remove_from_set(rem, lst)
        return len(rem)

    def _has_supernet_in_list(self, net, lst: set) -> bool:
        """
        Check if any strict supernet of net is present in lst.
        @param net: an ipaddress network object
        @param lst: the set of networks to search
        @return: True when a shorter prefix covering net is in lst
        """
        # Walk up one bit at a time, all the way to and including /0, so a
        # default route in the set is recognised as covering everything.
        # Stepping inside the loop means a /0 network is never compared with
        # itself (supernet() of a /0 returns the very same network).
        current = net
        while current.prefixlen > 0:
            current = current.supernet()
            if current in lst:
                return True
        return False

    def _remove_with_superset(self, lst: set) -> int:
        """
        Remove entries that are contained within another entry.
        @param lst: Work on this set (modified in place)
        @return: The number of items removed
        """
        dbg("Looking for containing prefixes")
        # Collect first, remove afterwards: the set must not change size
        # while it is being iterated.
        rem = {net for net in lst if self._has_supernet_in_list(net, lst)}
        self._remove_from_set(rem, lst)
        return len(rem)

    def _combine_subnets_in_supernet(self, lst: set) -> "tuple[int, int]":
        """
        Combine smaller subnets into larger supernets (one level per call).
        @param lst: Work on this set (modified in place)
        @return: (removed, added)
        """
        dbg("Looking for combineable prefixes")
        rem = set()
        add = set()
        for net in lst:
            # Already handled as the sibling of an earlier network.
            if net in rem:
                continue
            # A /0 has no sibling and no larger supernet to merge into.
            if net.prefixlen == 0:
                continue
            supernet = net.supernet()
            # The supernet splits into exactly two halves; the one that is
            # not net is its sibling.
            low, high = supernet.subnets()
            sibling = high if net == low else low
            if sibling in lst:
                rem.add(net)
                rem.add(sibling)
                add.add(supernet)
        self._remove_from_set(rem, lst)
        self._add_to_set(add, lst)
        return (len(rem), len(add))

    def _compress(self, lst: set, minpreflen: int) -> "tuple[int, int]":
        """
        Run the full compression on lst (modified in place).
        @param lst: set of networks of one address family
        @param minpreflen: networks more specific than this prefix length
                           are dropped after merging
        @return: (added, removed)
        """
        dbg("compressing")
        removed = 0
        added = 0
        removed += self._remove_with_superset(lst)
        # Each pass merges one level; repeat until nothing changes so that
        # e.g. four /24's end up as a single /22.
        while True:
            (rc, ac) = self._combine_subnets_in_supernet(lst)
            removed += rc
            added += ac
            if ac == 0 and rc == 0:
                break
        # Only now drop the too-specific prefixes, so small networks get a
        # chance to be merged into an acceptable supernet first.
        removed += self._remove_larger_prefixes(lst, minpreflen)
        return (added, removed)

    def compress4(self, lst: set, minpreflen: int = 24) -> "tuple[int, int]":
        """Compress a set of IPv4 networks; returns (added, removed)."""
        return self._compress(lst, minpreflen)

    def compress6(self, lst: set, minpreflen: int = 48) -> "tuple[int, int]":
        """Compress a set of IPv6 networks; returns (added, removed)."""
        return self._compress(lst, minpreflen)
def main():
    """
    Read CIDR prefixes from stdin (one per line), compress them and print
    the resulting prefixes to stdout. Statistics go to stderr.

    Blank lines are ignored. IPv4 and IPv6 prefixes are compressed
    separately, each with its own default prefix-length limit, so mixed
    input is handled correctly and empty input yields empty output.
    """
    networks = set()
    dbg("reading from stdin")
    for line in sys.stdin:
        text = line.strip()
        if not text:
            # A blank line is not a prefix; ip_network('') would raise.
            continue
        networks.add(ipaddress.ip_network(text))
    startlen = len(networks)

    # Four or six? Split per address family instead of peeking at one
    # arbitrary element, so each family gets the right limit.
    v4 = {net for net in networks if isinstance(net, ipaddress.IPv4Network)}
    v6 = networks - v4

    compressor = CidrCompress()
    added = 0
    removed = 0
    if v4:
        a, r = compressor.compress4(v4)
        added += a
        removed += r
    if v6:
        a, r = compressor.compress6(v6)
        added += a
        removed += r

    deset = v4 | v6
    dbg(f"Entries:\n orig: {startlen}\n removed: {removed}\n added: {added}\n result: {len(deset)}\n diff: {len(deset)-startlen}")
    for net in deset:
        print(net.with_prefixlen)


if __name__ == "__main__":
    main()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment