Skip to content

Instantly share code, notes, and snippets.

@kapilt
Created November 5, 2016 16:13
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save kapilt/b50060c4a563d6861ee2977cf6d1bacc to your computer and use it in GitHub Desktop.
Save kapilt/b50060c4a563d6861ee2977cf6d1bacc to your computer and use it in GitHub Desktop.
from common import BaseTest
class UnitTest(BaseTest):
def test_ingress_remove(self):
# replay a recorded session
factory = self.replay_flight_data(
'test_security_group_ingress_filter')
# record a new session
#factory = self.record_flight_data(
# 'test_security_group_ingress_filter')
# factory corresponds to c7n.credentials.SessionFactory
# Create the test infrastructure that the test will checking
# this is best practice, we do have many tests that setup the
# environment external to the test for resources that take time
# to create (rds, etc), ideally for them is to document the setup
# in comments/docstrings
# This test setups a vpc, and security group to test a policy against
client = factory().client('ec2')
vpc_id = client.create_vpc(CidrBlock="10.4.0.0/16")['Vpc']['VpcId']
sg_id = client.create_security_group(
GroupName="web-tier",
VpcId=vpc_id,
Description="for apps")['GroupId']
# The tests that create infrastructure, should leave the environment clean
# by cleaning up after themselves.
self.addCleanup(client.delete_vpc, VpcId=vpc_id)
client.authorize_security_group_ingress(
GroupId=sg_id,
IpProtocol='tcp',
FromPort=0,
ToPort=62000,
CidrIp='10.2.0.0/16')
self.addCleanup(client.delete_security_group, GroupId=sg_id)
# Now run the policy
p = self.load_policy({
'name': 'sg-find',
'resource': 'security-group',
'filters': [
{'VpcId': vpc_id},
{'type': 'ingress',
'IpProtocol': 'tcp',
'FromPort': 0},
{'GroupName': 'web-tier'}],
'actions': [
{'type': 'remove-permissions',
'ingress': 'matched'}]},
# Note we pass in the session factory thats placebo attached
session_factory=factory)
resources = p.run()
# Verify that we matched the resource and our action worked.
self.assertEqual(len(resources), 1)
self.assertEqual(resources[0]['GroupId'], sg_id)
group_info = client.describe_security_groups(
GroupIds=[sg_id])['SecurityGroups'][0]
self.assertEqual(group_info.get('IpPermissions', []), [])
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment