Created
May 12, 2023 17:57
-
-
Save lanbugs/2d8163c782d3d0aca289b38231fb7358 to your computer and use it in GitHub Desktop.
Broadcom ProxySG CPL Multimerge
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import re | |
import yaml | |
from loguru import logger | |
import glob | |
from pprint import pprint | |
import json | |
from netaddr import cidr_merge | |
import ipaddress | |
@logger.catch | |
def cpl_parser(lines): | |
root = {} | |
START = False | |
END = False | |
for line in lines: | |
if re.match('^\;.*', line): | |
continue | |
if re.match('define (condition|subnet|category|action) (.*)', line): | |
# FIND START | |
get_start = re.compile('define (condition|subnet|category|action) (.*)') | |
match = get_start.match(line) | |
TYPE = match.group(1) | |
GROUP_NAME = match.group(2).replace("\"","") | |
if TYPE not in root.keys(): | |
root[TYPE] = {} | |
root[TYPE][GROUP_NAME] = [] | |
START = True | |
END = False | |
if re.match('^(.*)$', line) and START is True and END is False and not re.match('define (condition|subnet|category|action) (.*)', line) and not re.match('^end.*', line): | |
# CONTENT | |
get_content = re.compile('^(.*)$') | |
match = get_content.match(line) | |
if len(match.group(1).strip()) > 1: | |
root[TYPE][GROUP_NAME].append(match.group(1).strip()) | |
if re.match('^end .*$', line) and START is True: | |
# FIND END | |
print(line) | |
END = True | |
return root | |
@logger.catch | |
def main(): | |
buffer = {} | |
buffer['condition'] = {} | |
buffer['subnet'] = {} | |
buffer['category'] = {} | |
buffer['action'] = {} | |
for FILE in glob.glob("src/*.cpl"): | |
with open(FILE, "r") as cpl: | |
for xtype, value in cpl_parser(cpl.readlines()).items(): | |
# extract each group of given type | |
for xgroup, xvalue in value.items(): | |
# create group if not existing | |
if xgroup not in buffer[xtype].keys(): | |
buffer[xtype][xgroup] = [] | |
# merge lists | |
buffer[xtype][xgroup] = buffer[xtype][xgroup] + xvalue | |
with open("results_raw.json", "w") as f: | |
json.dump(buffer, f, indent=4) | |
# remove duplicates | |
for xtype, value in buffer.items(): | |
for xgroup, xvalue in value.items(): | |
buffer[xtype][xgroup] = list(set(xvalue)) | |
with open("results_remove_dup.json", "w") as f: | |
json.dump(buffer, f, indent=4) | |
# merge subnets | |
for xtype, value in buffer.items(): | |
for xgroup, xvalue in value.items(): | |
try: | |
tmp = cidr_merge(xvalue) | |
tmp_list = [] | |
for x in tmp: | |
# for single host addr /32 not required | |
if "/32" in str(x.cidr): | |
tmp_list.append(str(x.cidr).replace("/32","")) | |
else: | |
tmp_list.append(str(x.cidr)) | |
buffer[xtype][xgroup] = tmp_list | |
except: | |
buffer[xtype][xgroup] = xvalue | |
with open("results_merge_subnets.json", "w") as f: | |
json.dump(buffer, f, indent=4) | |
# merge client.address | |
for xtype, value in buffer.items(): | |
for xgroup, xvalue in value.items(): | |
try: | |
# temp buffers | |
buffer_subnets = [] | |
buffer_other = [] | |
for element in xvalue: | |
if "client.address" in element: | |
# split client address | |
head, value = element.split("=") | |
try: # is single ip? | |
ipaddress.ip_address(value) | |
buffer_subnets.append(value) | |
except: | |
try: # is subnet ? | |
ipaddress.ip_network(value) | |
buffer_subnets.append(value) | |
except: # add to crap | |
buffer_other.append(value) | |
#print(buffer_subnets) | |
#print(buffer_other) | |
# merge subnets | |
tmp = cidr_merge(buffer_subnets) | |
tmp_list = [] | |
for x in tmp: | |
# for single host addr /32 not required | |
if "/32" in str(x.cidr): | |
tmp_list.append(str(x.cidr).replace("/32", "")) | |
else: | |
tmp_list.append(str(x.cidr)) | |
xresult = [] | |
for element in tmp_list: | |
xresult.append(f"client.address={element}") | |
for element in buffer_other: | |
xresult.append(f"client.address={element}") | |
if len(xresult) != 0: | |
buffer[xtype][xgroup] = xresult | |
except: | |
pass | |
with open("results_final.json", "w") as f: | |
json.dump(buffer, f, indent=4) | |
with open("results.txt", "w") as f: | |
# build final cpl | |
for xtype, value in buffer.items(): | |
for xgroup, xvalue in value.items(): | |
f.write(f"define {xtype} {xgroup}\n") | |
for line in xvalue: | |
f.write(f" {line}\n") | |
f.write(f"end {xtype} {xgroup}\n") | |
f.write("\n") | |
if __name__ == "__main__": | |
main() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment