Skip to content

Instantly share code, notes, and snippets.

@Heidistein
Created November 22, 2022 08:59
Show Gist options
  • Save Heidistein/114baada5dd02cc174398d77418c2afd to your computer and use it in GitHub Desktop.
Save Heidistein/114baada5dd02cc174398d77418c2afd to your computer and use it in GitHub Desktop.
cidrmerge
#!/usr/bin/python
# Author: Arjen Heidinga <dexter@beetjevreemd.nl>
# Licence: GPL
# A script that compresses/merges/consolidates a list of CIDR addresses.
# It will remove prefixes that are already contained in a 'supernet'.
# It will try to combine smaller subnets into a supernet (i.e. find both /24s of a /23).
# It will toss out prefixes that are more specific than the given prefix length
# (by default nothing more specific than /24 for IPv4 or /48 for IPv6).
# The code is tried and tested and is used in production, but it probably still contains bugs.
# It is not very pythonic, for speed reasons: it modifies the same set in place, because
# the set type is very close to the CPython interpreter.
import sys
import ipaddress
from itertools import product
def dbg(*parts):
    """
    Write a diagnostic message to stderr.

    The positional arguments are formatted exactly as print() would format
    them: converted with str(), joined by a single space and terminated by
    a newline.
    """
    print(*parts, sep=" ", end="\n", file=sys.stderr)
class CidrCompress:
    """
    Compress a set of ipaddress network objects in place.

    Compression removes prefixes already covered by a supernet in the set,
    merges sibling prefixes into their common supernet, and finally drops
    prefixes that are more specific than a given prefix length.

    All methods mutate the set they are given; nothing is copied.
    """

    def _remove_from_set(self, needlebag: set, haystack: set):
        """
        Remove all needles in needlebag from the haystack
        """
        dbg(f"removing {len(needlebag)} items")
        haystack.difference_update(needlebag)

    def _add_to_set(self, needlebag: set, haystack: set):
        """
        Add all needles in needlebag to the haystack
        """
        dbg(f"adding {len(needlebag)} items")
        haystack.update(needlebag)

    def _remove_larger_prefixes(self, lst: set, pref: int) -> int:
        """
        remove all prefixes that are larger (more specific) than pref
        @param lst: Work on this set (modified in place)
        @param pref: the longest prefix length that is kept.
        @return: The number of items removed
        """
        dbg(f"looking for larger than {pref} prefixed")
        rem = {net for net in lst if net.prefixlen > pref}
        self._remove_from_set(rem, lst)
        return len(rem)

    def _has_supernet_in_list(self, net, lst: set):
        """
        Check if the net has any supernet in lst
        @param net: the network to look up supernets for
        @param lst: the set of networks to search
        @return: True if any strict supernet of net (up to and including /0)
                 is a member of lst
        """
        # Walk upwards one bit at a time. The prefix length is tested before
        # stepping, because supernet() of a /0 returns the /0 itself, and the
        # candidate is tested after stepping, so the /0 is checked as well.
        candidate = net
        while candidate.prefixlen > 0:
            candidate = candidate.supernet()
            if candidate in lst:
                return True
        return False

    def _remove_with_superset(self, lst: set) -> int:
        """
        remove entries that are contained within another.
        @param lst: Work on this set (modified in place)
        @return: The number of items removed
        """
        dbg("Looking for containing prefixes")
        # Collect first and remove afterwards: a set must not change size
        # while it is being iterated.
        rem = {net for net in lst if self._has_supernet_in_list(net, lst)}
        self._remove_from_set(rem, lst)
        return len(rem)

    def _combine_subnets_in_supernet(self, lst: set) -> (int, int):
        """
        Combine smaller subnets into larger supernets.

        A single pass only merges one level; call repeatedly until it
        returns (0, 0) to merge as far as possible.
        @param lst: Work on this set (modified in place)
        @return (removed, added)
        """
        dbg("Looking for combineable prefixes")
        rem = set()
        add = set()
        for net in lst:
            # A /0 has no sibling to merge with, and a net already scheduled
            # for removal was handled together with its sibling.
            if net.prefixlen == 0 or net in rem:
                continue
            supernet = net.supernet()
            # The one-bit-shorter supernet splits into exactly two halves:
            # net itself and its sibling.
            sibling = [half for half in supernet.subnets() if half != net][0]
            if sibling in lst:
                rem.add(net)
                rem.add(sibling)
                add.add(supernet)
        self._remove_from_set(rem, lst)
        self._add_to_set(add, lst)
        return (len(rem), len(add))

    def _compress(self, lst: set, minpreflen: int) -> (int, int):
        """
        Run the full compression on lst.
        @param lst: Work on this set (modified in place)
        @param minpreflen: prefixes more specific than this are dropped
        @return (added, removed)
        """
        dbg("compressing")
        removed = self._remove_with_superset(lst)
        added = 0
        # Merging siblings can create new siblings one level up, so repeat
        # until a pass changes nothing.
        while True:
            (rc, ac) = self._combine_subnets_in_supernet(lst)
            removed += rc
            added += ac
            if ac == 0 and rc == 0:
                break
        # Done last, so that small prefixes get the chance to merge into an
        # acceptable supernet before being thrown away.
        removed += self._remove_larger_prefixes(lst, minpreflen)
        return (added, removed)

    def compress4(self, lst: set, minpreflen: int = 24) -> (int, int):
        """
        Compress a set of IPv4 networks in place.
        @param lst: Work on this set (modified in place)
        @param minpreflen: prefixes more specific than this are dropped
        @return (added, removed)
        """
        return self._compress(lst, minpreflen)

    def compress6(self, lst: set, minpreflen: int = 48) -> (int, int):
        """
        Compress a set of IPv6 networks in place.
        @param lst: Work on this set (modified in place)
        @param minpreflen: prefixes more specific than this are dropped
        @return (added, removed)
        """
        return self._compress(lst, minpreflen)
def main():
    """
    Read CIDR prefixes from stdin, compress them and print the result.

    One prefix per line is read from stdin; blank lines are skipped. IPv4
    and IPv6 prefixes are compressed separately, each with the default
    minimum prefix length of its own address family. A summary is written
    to stderr and the resulting prefixes are written to stdout, one per
    line, in no particular order within a family.

    A line that is not a valid network raises ValueError from
    ipaddress.ip_network.
    """
    dbg("reading from stdin")
    nets4 = set()
    nets6 = set()
    for line in sys.stdin:
        text = line.strip()
        if not text:
            # A blank line is not a prefix; ip_network("") would raise.
            continue
        net = ipaddress.ip_network(text)
        if isinstance(net, ipaddress.IPv4Network):
            nets4.add(net)
        else:
            nets6.add(net)
    startlen = len(nets4) + len(nets6)

    # Each family is handled on its own so that a mixed input is not
    # filtered with the other family's minimum prefix length. An empty
    # family is skipped, which also covers completely empty input.
    compressor = CidrCompress()
    added = 0
    removed = 0
    if nets4:
        added4, removed4 = compressor.compress4(nets4)
        added += added4
        removed += removed4
    if nets6:
        added6, removed6 = compressor.compress6(nets6)
        added += added6
        removed += removed6

    resultlen = len(nets4) + len(nets6)
    dbg(f"Entries:\n orig: {startlen}\n removed: {removed}\n added: {added}\n result: {resultlen}\n diff: {resultlen-startlen}")
    for family in (nets4, nets6):
        for net in family:
            print(net.with_prefixlen)


if __name__ == "__main__":
    main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment