Created
December 6, 2023 16:06
-
-
Save airween/3626ad32188ed27a514385acc9d613be to your computer and use it in GitHub Desktop.
Collect CAPEC tags from CRS
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/python3 | |
import argparse | |
import sys | |
import msc_pyparser | |
import json | |
# use: | |
# ./crs-data-collector.py -r ~/src/coreruleset/rules/*.conf | jq . | less | |
def collect(s, idtag, tagid): | |
# walk a parsed structure | |
# for the details see the project's site: | |
# https://github.com/digitalwave/msc_pyparser#inside-of-structure | |
# 's' is a list of structures | |
# each member represents a rule (or comment or other directive) | |
for r in s: | |
# we want to handle only SecRule and SecAction directives | |
if r['type'].lower() in ["secrule", "secaction"]: | |
# if the item has actions | |
if "actions" in r: | |
acts = {'id': 0, 'capectags': []} | |
# iterate over the list of actions | |
for a in r['actions']: | |
# find the 'id:...' | |
if a['act_name'].lower() == "id": | |
acts['id'] = int(a['act_arg']) | |
# find the 'tag' | |
if a['act_name'].lower() == "tag": | |
# and if the tag is 'capec/...' | |
if a['act_arg'].lower()[0:6] == "capec/": | |
acts['capectags'].append(a['act_arg']) | |
if a['act_arg'] not in tagid: | |
tagid[a['act_arg']] = {'cnt': 0, 'ids': []} | |
tagid[a['act_arg']]['cnt'] += 1 | |
tagid[a['act_arg']]['ids'].append(acts['id']) | |
if len(acts['capectags']) > 0: | |
idtag[acts['id']] = acts['capectags'] | |
return True | |
if __name__ == "__main__": | |
# parse arguments | |
parser = argparse.ArgumentParser(description="CRS data collector") | |
#parser.add_argument("-o", "--output", dest="output", help="Output format json[default]|csv", required=False) | |
parser.add_argument("-r", "--rules", metavar='/path/to/coreruleset/*.conf', type=str, | |
nargs='*', help='Directory path to CRS rules', required=True, | |
action="append") | |
args = parser.parse_args() | |
# append crs paths to a list | |
crspath = [] | |
for l in args.rules: | |
crspath += l | |
# check output format - later if needs | |
"""if args.output is not None: | |
if args.output not in ["json", "csv"]: | |
print("--output can be one of the 'json' or 'csv'. Default value is 'json'") | |
sys.exit(1) | |
oformat = args.output""" | |
# try to read files from given path | |
try: | |
flist = crspath | |
flist.sort() | |
except: | |
print("Can't open files in given path!") | |
sys.exit(1) | |
if len(flist) == 0: | |
print("List of files is empty!") | |
sys.exit(1) | |
# list for data collection | |
idtag = {} | |
tagid = {} | |
# check files | |
for f in flist: | |
if f[-5:] != ".conf": | |
continue | |
try: | |
with open(f, 'r') as inputfile: | |
data = inputfile.read() | |
except: | |
print("Can't open file: %s" % f) | |
sys.exit(1) | |
# create a parser and parse the file into a struct | |
try: | |
mparser = msc_pyparser.MSCParser() | |
mparser.parser.parse(data) | |
except Exception as e: | |
err = e.args[1] | |
if err['cause'] == "lexer": | |
cause = "Lexer" | |
else: | |
cause = "Parser" | |
print("Can't parse config file: %s, cause: %s" % (f, cause)) | |
sys.exit(1) | |
collect(mparser.configlines, idtag, tagid) | |
print(json.dumps({'ids': idtag, 'tags': tagid})) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment