import os
import tempfile
import uuid

from c7n.policy import load
from c7n.resources import load_resources
from c7n.utils import Bag, yaml_load
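def _convert_rule(rule, group_id_key, owner_id_key=None, prefix_list_key=None):
    """Convert one CloudFormation ingress/egress rule to an IpPermission dict.

    The output mirrors the shape of one element of ``IpPermissions`` /
    ``IpPermissionsEgress`` in ``ec2.describe_security_groups``:
    ``IpRanges`` is a list of ``{'CidrIp': ...}`` dicts, ``UserIdGroupPairs`` a
    list of ``{'GroupId': ..., 'UserId': ...}`` dicts and ``PrefixListIds`` a
    list of ``{'PrefixListId': ...}`` dicts.  Getting that shape right matters:
    a policy filter that cannot see a rule (because a CIDR is a bare string or
    a group id was split into characters) never matches, so a non-compliant
    group passes silently (fail-open).

    Args:
        rule: one entry of ``SecurityGroupIngress`` / ``SecurityGroupEgress``.
        group_id_key: CFN property holding the peer security-group id
            (``SourceSecurityGroupId`` / ``DestinationSecurityGroupId``).
        owner_id_key: CFN property holding the peer group's account id, if the
            direction has one (``SourceSecurityGroupOwnerId``), else ``None``.
        prefix_list_key: CFN property holding a prefix-list id, if the
            direction has one (``DestinationPrefixListId``), else ``None``.

    Returns:
        dict in IpPermission shape.

    Raises:
        ValueError: if only one of ``FromPort`` / ``ToPort`` is present; the
            old code crashed with ``int(None)`` on that, which is a schema
            problem worth naming explicitly.
    """
    from_port = rule.get('FromPort')
    to_port = rule.get('ToPort')
    perm = {}
    # Compare against None, not truthiness: port 0 is a real port (e.g. a
    # 0-65535 range) and must not be read as "no port range".
    if from_port is None and to_port is None:
        # No port range means "all traffic" (-1) in the Describe output.
        perm['IpProtocol'] = '-1'
    elif from_port is None or to_port is None:
        raise ValueError(
            'FromPort and ToPort must be given together: %r' % (rule,))
    else:
        perm['IpProtocol'] = rule.get('IpProtocol')
        # CFN may carry ports as strings; the API returns ints.
        perm['FromPort'] = int(from_port)
        perm['ToPort'] = int(to_port)

    cidr = rule.get('CidrIp')
    # `[x] or []` was always truthy, so a missing CidrIp used to yield [None].
    perm['IpRanges'] = [{'CidrIp': cidr}] if cidr else []

    pairs = []
    group_id = rule.get(group_id_key)
    if group_id:
        pair = {'GroupId': group_id}
        owner = rule.get(owner_id_key) if owner_id_key else None
        if owner:
            pair['UserId'] = owner
        pairs.append(pair)
    perm['UserIdGroupPairs'] = pairs

    prefix_list = rule.get(prefix_list_key) if prefix_list_key else None
    perm['PrefixListIds'] = [{'PrefixListId': prefix_list}] if prefix_list else []
    return perm


def convert_sg_to_describe(event):
    """Convert a CloudFormation SecurityGroup event into describe_security_groups shape.

    ``event['ResourceProperties']`` is read as the ``AWS::EC2::SecurityGroup``
    property block (``GroupDescription``, ``GroupName``, ``VpcId``,
    ``SecurityGroupIngress``, ``SecurityGroupEgress``, ``Tags``).
    ``event['accountId']`` and ``event['groupId']`` become ``OwnerId`` and
    ``GroupId``; presumably the caller adds them to the event, since a plain
    CFN custom-resource event does not carry them -- confirm against the caller.

    Returns:
        ``{'SecurityGroups': [<group dict>]}``, the same shape custodian gets
        back from ``ec2.describe_security_groups``, so its security-group
        filters can be applied to a group that does not exist yet.

    Raises:
        KeyError: if ``GroupDescription``, ``accountId`` or ``groupId`` is
            missing.
        ValueError: for a rule with only one of FromPort/ToPort.

    NOTE(review): there is still no schema validation of the event beyond the
    port check; unknown keys are ignored rather than rejected.
    """
    sg = event['ResourceProperties']

    egress = [
        _convert_rule(rule,
                      group_id_key='DestinationSecurityGroupId',
                      prefix_list_key='DestinationPrefixListId')
        for rule in sg.get('SecurityGroupEgress') or []
    ]
    ingress = [
        _convert_rule(rule,
                      group_id_key='SourceSecurityGroupId',
                      owner_id_key='SourceSecurityGroupOwnerId')
        for rule in sg.get('SecurityGroupIngress') or []
    ]

    # CFN tags already come as {'Key': ..., 'Value': ...} dicts (see the
    # fixture in main()); iterating their items produced two bogus tags per
    # tag.  A bare {name: value} mapping is still accepted as a fallback.
    tags = []
    for tag in sg.get('Tags') or []:
        if 'Key' in tag and 'Value' in tag:
            tags.append({'Key': tag['Key'], 'Value': tag['Value']})
        else:
            tags.extend({'Key': k, 'Value': v} for k, v in tag.items())

    return {
        'SecurityGroups': [{
            'IpPermissionsEgress': egress,
            'Description': sg['GroupDescription'],
            'Tags': tags,
            'IpPermissions': ingress,
            # GroupName and VpcId are optional in CFN; None rather than KeyError.
            'GroupName': sg.get('GroupName'),
            'VpcId': sg.get('VpcId'),
            'OwnerId': event['accountId'],
            'GroupId': event['groupId'],
        }]
    }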
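def main():
    """Run the policies in ``sg-rules.yml`` against a hand-built CFN SG event.

    Demonstrates pre-deployment checking: the CloudFormation properties are
    converted to describe_security_groups shape and fed straight to each
    policy's resource-manager filters, so no real group has to exist.  What
    a non-empty result means (violation vs. match) depends on how
    ``sg-rules.yml`` is written; the script only prints it.
    """
    load_resources()
    config = Bag({
        'region': os.environ.get('AWS_DEFAULT_REGION', 'us-east-1'),
        'cache': '',
        'profile': None,
        'assume_role': None,
        'log_group': None,
        'metrics_enabled': True,
        # mkdtemp creates the directory atomically with mode 0700, instead of
        # handing custodian a not-yet-existing name under world-writable /tmp.
        'output_dir': tempfile.mkdtemp(prefix='c7n-sg-'),
        'cache_period': 0,
        'dryrun': False})

    # Fixture standing in for a CFN SecurityGroup event.  accountId/groupId
    # are the envelope the converter reads for OwnerId/GroupId.
    incoming_event = {
        "ResourceProperties": {
            "GroupName": "TestInternetSG",
            "GroupDescription": "Typical Internet-Facing Security Group",
            "VpcId": "vpc-1234abcd",
            "SecurityGroupIngress": [{
                "CidrIp": '10.0.0.0/8',
                "FromPort": 53,
                "IpProtocol": 'tcp',
                "ToPort": 53
            }],
            "SecurityGroupEgress": [],
            "Tags": [{
                "Key": "Name",
                "Value": "InternetSecurityGroup"
            }]
        },
        'accountId': '123456789012',
        'groupId': 'sg-abcd1234'
    }

    cfn_dict = convert_sg_to_describe(incoming_event)
    # Policy file is read by custodian's own YAML loader.
    policies = load(config, 'sg-rules.yml')
    fake_security_groups = cfn_dict['SecurityGroups']
    for policy in policies:
        # print() form works as a statement in Python 2 and a call in Python 3.
        print(policy.name)
        print(policy.get_resource_manager().filter_resources(fake_security_groups))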
# Script entry point; importing the module (e.g. from a Lambda handler) runs nothing.
if __name__ == '__main__':
    main()