Last active
March 28, 2019 12:21
-
-
Save mateobur/61b86311cee43e596f1a06d725f16a04 to your computer and use it in GitHub Desktop.
Kube-system Kubernetes security with Sysdig Secure (Autogenerate Python script)
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/python | |
import yaml | |
import sys | |
import collections | |
from collections import OrderedDict as OD | |
def dict_representer(dumper, data):
    """Represent a mapping by handing its items to the dumper.

    Meant to be registered with ``yaml.add_representer`` for
    ``collections.OrderedDict`` so that rule entries are dumped as plain
    YAML mappings.

    Args:
        dumper: Object exposing a ``represent_dict`` method.
        data: Mapping whose ``(key, value)`` pairs are passed on in
            iteration order.

    Returns:
        Whatever ``dumper.represent_dict`` returns.
    """
    # ``items()`` is available on both Python 2 and Python 3, whereas
    # ``iteritems()`` only exists on Python 2.
    return dumper.represent_dict(data.items())
def create_rule(rule, desc, condition, output, priority):
    """Build a single rule entry as an ordered mapping.

    Args:
        rule: Value stored under the ``rule`` key.
        desc: Value stored under the ``desc`` key.
        condition: Value stored under the ``condition`` key.
        output: Value stored under the ``output`` key.
        priority: Value stored under the ``priority`` key.

    Returns:
        An ``OrderedDict`` with the keys ``rule``, ``desc``, ``condition``,
        ``output`` and ``priority``, in that order.
    """
    return OD([
        ('rule', rule),
        ('desc', desc),
        ('condition', condition),
        ('output', output),
        ('priority', priority),
    ])
def generate_pod_rules(pod):
    """Generate the YAML rule text for one pod description.

    Emits, in order: the authorized-process list and rule, an optional
    write-directory macro plus the write rule, an optional outbound-process
    list plus the outbound rule, and the listening-process list and rule.

    Args:
        pod: Mapping describing the pod. ``podname``, ``proc`` and
            ``listen_proc`` are required. ``write_dir`` and
            ``outbound_proc`` may be missing or empty, in which case the
            corresponding rule is emitted without a whitelist.

    Returns:
        The entries dumped as a block-style YAML string via ``yaml.dump``.
        The entries are ``OrderedDict`` instances, so the caller is expected
        to have registered a representer for ``collections.OrderedDict``.

    Raises:
        KeyError: If ``podname``, ``proc`` or ``listen_proc`` is missing.
    """
    pod_rules = []
    podname = pod['podname']

    # Authorized process list
    pod_rules.append(OD([('list', podname + '_authorized_processes'),
                         ('items', pod['proc'])]))
    pod_rules.append(create_rule(
        podname + ' allowed processes',
        'Whitelist of authorized ' + podname + ' processes',
        'spawned_process and not proc.name in (' + podname +
        '_authorized_processes)',
        'Unauthorized process (%proc.cmdline) running in (%container.id)',
        'ERROR'))

    # Write directories: with no whitelist, every write matches the rule.
    write_dirs = pod.get('write_dir')
    if write_dirs:
        write_dir_macro = ' or '.join(
            'evt.arg[1] startswith ' + directory for directory in write_dirs)
        pod_rules.append(OD(
            [('macro', podname + '_write_allowed_directories'),
             ('condition', write_dir_macro)]))
        write_condition = ('open_write and not ' + podname +
                           '_write_allowed_directories')
    else:
        write_condition = 'open_write'
    pod_rules.append(create_rule(
        'Write to non write allowed dir (' + podname + ')',
        'attempt to write to directories that should be immutable',
        write_condition,
        'Writing to non write allowed dir (user=%user.name command=%proc.cmdline file=%fd.name)',
        'ERROR'))

    # Outbound connections: with no whitelist, every outbound event matches.
    outbound_condition = 'outbound'
    outbound_procs = pod.get('outbound_proc')
    if outbound_procs:
        pod_rules.append(OD([('list', podname + '_outbound_processes'),
                             ('items', outbound_procs)]))
        outbound_condition += (' and not proc.name in (' + podname +
                               '_outbound_processes)')
    pod_rules.append(create_rule(
        'Unauthorized process opened an outbound connection (' + podname + ')',
        'A ' + podname + ' process tried to open an outbound connection and is not whitelisted',
        outbound_condition,
        'Non-whitelisted process opened an outbound connection (command=%proc.cmdline connection=%fd.name)',
        'WARNING'))

    # Listening ports: the list is always emitted, even when it is empty.
    pod_rules.append(OD([('list', podname + '_listen_port_proc'),
                         ('items', pod['listen_proc'])]))
    pod_rules.append(create_rule(
        'Unauthorized process opened a port (' + podname + ')',
        'A ' + podname + ' process tried to open a port and is not whitelisted',
        'evt.type=listen and not proc.name in (' + podname + '_listen_port_proc)',
        'Non-whitelisted process opened a port (command=%proc.cmdline connection=%fd.name)',
        'WARNING'))

    return yaml.dump(pod_rules, default_flow_style=False)
# Register the representer first so every OrderedDict entry produced by
# generate_pod_rules() can be dumped.
yaml.add_representer(collections.OrderedDict, dict_representer)

# NOTE(review): the original called yaml.load() without a Loader, which can
# construct arbitrary Python objects from the input and is rejected by
# PyYAML >= 6. safe_load() is used instead; this assumes the pod description
# file contains only plain mappings, lists and scalars.
with open(sys.argv[1], 'r') as rule_input:
    yaml_input = yaml.safe_load(rule_input)

# An empty YAML document loads as None; treat it as "no pods".
ruleset = "".join(generate_pod_rules(pod) for pod in yaml_input or [])

# Insert a blank line before each top-level entry for readability.
ruleset = ruleset.replace('- list', '\n- list')
ruleset = ruleset.replace('- macro', '\n- macro')
ruleset = ruleset.replace('- rule', '\n- rule')

# Parenthesised single-argument print behaves the same on Python 2 and 3.
print(ruleset)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment