Skip to content

Instantly share code, notes, and snippets.

@steder
Created December 19, 2011 19:11
Show Gist options
  • Star 34 You must be signed in to star a gist
  • Fork 22 You must be signed in to fork a gist
  • Save steder/1498451 to your computer and use it in GitHub Desktop.
Save steder/1498451 to your computer and use it in GitHub Desktop.
Create and update AWS security groups using Python and Boto.
#!/usr/bin/env python
"""
Recipe for creating and updating security groups programmatically.
"""
import collections
import boto
# Follow the instructions at http://www.datastax.com/docs/1.0/install/install_ami
# to define the cluster security group rules and the client security group rules.
# One ingress permission: protocol, port range, source CIDR and, optionally,
# the name of a source security group.
SecurityGroupRule = collections.namedtuple(
    "SecurityGroupRule",
    "ip_protocol from_port to_port cidr_ip src_group_name",
)

# Rules expected on the "Cassandra Cluster" group. Entries whose last field is
# a group name reference that group as the traffic source.
CASSANDRA_RULES = [
    SecurityGroupRule("tcp", "22", "22", "0.0.0.0/0", None),
    SecurityGroupRule("tcp", "1024", "65535", "0.0.0.0/0", "Cassandra Cluster"),
    SecurityGroupRule("tcp", "7000", "7000", "0.0.0.0/0", "Cassandra Cluster"),
    SecurityGroupRule("tcp", "61620", "61621", "0.0.0.0/0", "Cassandra Cluster"),
    SecurityGroupRule("tcp", "7199", "7199", "0.0.0.0/0", None),
    SecurityGroupRule("tcp", "8888", "8888", "0.0.0.0/0", None),
    SecurityGroupRule("tcp", "8983", "8983", "0.0.0.0/0", None),
    SecurityGroupRule("tcp", "9160", "9160", "0.0.0.0/0", None),
]

# Rules expected on the "Test" group.
TEST_RULES = [
    # ssh makes life possible
    SecurityGroupRule("tcp", "22", "22", "0.0.0.0/0", None),
]

# (group name, expected rules) pairs, processed in this order.
SECURITY_GROUPS = [
    ("Cassandra Cluster", CASSANDRA_RULES),
    ("Test", TEST_RULES),
]
def get_or_create_security_group(c, group_name, description=""):
    """Return the security group named `group_name`, creating it if absent.

    Args:
        c: EC2 connection; must provide `get_all_security_groups()` and
            `create_security_group(name, description)`.
        group_name: Name of the security group to look up or create.
        description: Description used only when the group has to be created.
            When empty, "A group for <group_name>" is used instead.

    Returns:
        The first existing group whose `name` equals `group_name`, otherwise
        the newly created group.
    """
    for group in c.get_all_security_groups():
        if group.name == group_name:
            return group

    print("Creating group '%s'..." % (group_name,))
    # Honour the caller's description; fall back to the generated one so that
    # calls without a description behave as before.
    return c.create_security_group(
        group_name,
        description or ("A group for %s" % (group_name,)),
    )
def modify_sg(c, group, rule, authorize=False, revoke=False):
    """Authorize or revoke a single `rule` on `group`.

    Exactly one of `authorize` / `revoke` must be true.

    Args:
        c: EC2 connection; `get_all_security_groups([name])` is called to
            resolve `rule.src_group_name` when that field is set.
        group: Security group object exposing `authorize(...)` and
            `revoke(...)` taking `ip_protocol`, `from_port`, `to_port`,
            `cidr_ip` and `src_group` keyword arguments.
        rule: Object with `ip_protocol`, `from_port`, `to_port`, `cidr_ip`
            and `src_group_name` attributes (e.g. a `SecurityGroupRule`).
        authorize: Add the rule to the group.
        revoke: Remove the rule from the group.

    Returns:
        Whatever `group.authorize` / `group.revoke` returns.
        NOTE(review): boto is expected to report success as a boolean here;
        confirm against the installed version.

    Raises:
        ValueError: If both or neither of `authorize` and `revoke` are set.
    """
    # Reject contradictory requests before touching the API instead of
    # silently doing nothing.
    if bool(authorize) == bool(revoke):
        raise ValueError(
            "exactly one of authorize/revoke must be set "
            "(got authorize=%r, revoke=%r)" % (authorize, revoke))

    src_group = None
    if rule.src_group_name:
        src_group = c.get_all_security_groups([rule.src_group_name])[0]

    if authorize:
        print("Authorizing missing rule %s..." % (rule,))
        action = group.authorize
    else:
        print("Revoking unexpected rule %s..." % (rule,))
        action = group.revoke

    return action(ip_protocol=rule.ip_protocol,
                  from_port=rule.from_port,
                  to_port=rule.to_port,
                  cidr_ip=rule.cidr_ip,
                  src_group=src_group)
def authorize(c, group, rule):
    """Add `rule` to `group` by delegating to `modify_sg`.

    Returns whatever `modify_sg` returns.
    """
    return modify_sg(c, group, rule, authorize=True, revoke=False)
def revoke(c, group, rule):
    """Remove `rule` from `group` by delegating to `modify_sg`.

    Returns whatever `modify_sg` returns.
    """
    return modify_sg(c, group, rule, authorize=False, revoke=True)
def update_security_group(c, group, expected_rules):
    """Make the rules of `group` match `expected_rules`.

    Rules found on the group that are not expected are revoked; expected
    rules that are not found are authorized.

    Args:
        c: EC2 connection, passed through to `revoke` / `authorize`.
        group: Security group object (not a `(name, rules)` pair from
            `SECURITY_GROUPS`) exposing `name` and `rules`. Each rule needs
            `ip_protocol`, `from_port`, `to_port` and `grants`; each grant
            needs `cidr_ip` and `name`.
        expected_rules: Collection of `SecurityGroupRule` values the group
            should end up with.
    """
    import pprint

    print('Updating group "%s"...' % (group.name,))
    print("Expected Rules:")
    pprint.pprint(expected_rules)

    # Flatten every grant of every permission into one SecurityGroupRule.
    # Looking only at the first grant would miss permissions that carry
    # several CIDRs or source groups. Iterate over copies because revoking
    # below may alter `group.rules` / `rule.grants` (boto is understood to
    # drop revoked entries from the local object -- confirm).
    found_rules = []
    for rule in list(group.rules):
        for grant in list(rule.grants):
            if grant.cidr_ip:
                found = SecurityGroupRule(rule.ip_protocol,
                                          rule.from_port,
                                          rule.to_port,
                                          grant.cidr_ip,
                                          None)
            else:
                # Group-based grant: use the same "0.0.0.0/0" placeholder
                # the expected rule tables use for such entries.
                # NOTE(review): grant.name has been reported to be None for
                # some groups; such grants will not match and get revoked.
                found = SecurityGroupRule(rule.ip_protocol,
                                          rule.from_port,
                                          rule.to_port,
                                          "0.0.0.0/0",
                                          grant.name)
            found_rules.append(found)

    current_rules = []
    for found in found_rules:
        if found in expected_rules:
            current_rules.append(found)
        else:
            revoke(c, group, found)

    print("Current Rules:")
    pprint.pprint(current_rules)

    for rule in expected_rules:
        if rule not in current_rules:
            authorize(c, group, rule)
def create_security_groups():
    """Bring every group listed in SECURITY_GROUPS in line with its rules.

    Meant to be safe to run repeatedly: a group that does not exist yet is
    created first, and every group then has its rules compared with the
    expected ones and adjusted where they differ.
    """
    conn = boto.connect_ec2()
    for name, expected in SECURITY_GROUPS:
        security_group = get_or_create_security_group(conn, name)
        update_security_group(conn, security_group, expected)
if __name__ == "__main__":
    # Only synchronise the groups when run as a script, not on import.
    create_security_groups()
@ozbillwang
Copy link

The src_group_name format has changed to sg-XXXXXXXX-owner_id, but in your code rule.grants[0].name is always None, so I get this error:

InvalidPermission.NotFound: The specified rule does not exist in this security group.

Creating a new security group works fine, but I then see the same error as @gyoza if I change one rule (for example, change a from_port).

Another issue is that cidr_ip can be a list. This line needs to be fixed.

https://gist.github.com/steder/1498451#file-aws_sg_recipe-py-L100

@jpatallah
Copy link

Lines 89-101 should be:

for rule in group.rules:
    for grant in rule.grants:
        if not grant.cidr_ip:
            current_rule = SecurityGroupRule(rule.ip_protocol,
                                             rule.from_port,
                                             rule.to_port,
                                             "0.0.0.0/0",
                                             grant.name)
        else:
            current_rule = SecurityGroupRule(rule.ip_protocol,
                                             rule.from_port,
                                             rule.to_port,
                                             grant.cidr_ip,
                                             None)

@presho-anna
Copy link

I am trying to run the update method, and I have received "AttributeError: 'tuple' object has no attribute 'name' ".
This is what I have tried:
update_security_group(boto.connect_ec2(), SECURITY_GROUPS[0], ["tcp", "5000", "5000", "0.0.0.0/0", "Test"])

Any reason why it's failing?

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment