Skip to content

Instantly share code, notes, and snippets.

@i-sam
Last active August 29, 2015 14:12
Show Gist options
  • Save i-sam/351b2e886d9d2b3f03ed to your computer and use it in GitHub Desktop.
Save i-sam/351b2e886d9d2b3f03ed to your computer and use it in GitHub Desktop.
#!/usr/bin/python
"""
@author Sam (i-sam on github)
@description: tool for managing AWS security groups (SG) across regions.
It requires the boto package and the following environment variables:
AWS_ACCESS_KEY_ID,
AWS_SECRET_ACCESS_KEY
AWS_REGIONS (optional)
Usage example to add new rule for DEV-ENV SG:
sg_manage.py open -n DEV-ENV -p 1200-1205 -c 210.168.120.23/32
sg_manage.py close -n DEV-ENV -p 1200-1205 -c 210.168.120.23/32
sg_manage.py show -n DEV-ENV
sg_manage.py list
"""
import boto.ec2
import argparse
import os
# CIDR matching every IPv4 address; used as the default source range.
ANY = '0.0.0.0/0'
# Comma-separated region names used when none are given on the command line.
# A non-empty AWS_REGIONS environment variable overrides the built-in pair.
DEFAULT_REGIONS = os.getenv('AWS_REGIONS') or 'eu-west-1,ap-southeast-1'
def connect(region="eu-west-1", aws_id='', aws_key=''):
    """Open an EC2 connection to ``region`` through boto.

    Credentials given as empty/falsy values are replaced by the
    AWS_ACCESS_KEY_ID / AWS_SECRET_ACCESS_KEY environment variables
    (which yield None when unset).

    Returns whatever ``boto.ec2.connect_to_region`` returns for the region.
    """
    if not aws_id:
        aws_id = os.environ.get('AWS_ACCESS_KEY_ID')
    if not aws_key:
        aws_key = os.environ.get('AWS_SECRET_ACCESS_KEY')
    credentials = {
        'aws_access_key_id': aws_id,
        'aws_secret_access_key': aws_key,
    }
    return boto.ec2.connect_to_region(region, **credentials)
def get_sg_list(creds, region=''):
    """Return all security groups visible in one region.

    Args:
        creds: dict with 'aws_id' and 'aws_key' entries; empty values let
            connect() fall back to the environment variables.
        region: AWS region name. When empty, connect()'s own default
            region is used.

    Returns:
        The result of ``get_all_security_groups()`` on the new connection.

    Raises:
        ValueError: if no connection object was obtained for ``region``.
    """
    if region:
        conn = connect(region, creds['aws_id'], creds['aws_key'])
    else:
        # An empty region used to leave ``conn`` unbound and crash with
        # UnboundLocalError; defer to connect()'s default region instead.
        conn = connect(aws_id=creds['aws_id'], aws_key=creds['aws_key'])
    if conn is None:
        # boto documents connect_to_region as returning None for an invalid
        # region name; fail with a clear message rather than AttributeError.
        raise ValueError('Unknown AWS region: %r' % (region,))
    if region:
        print('Reconnect to region %s' % region)
    return conn.get_all_security_groups()
def add_rule(sg, from_port, to_port=None, cidr=ANY, proto='tcp'):
    """Authorize ``proto`` traffic from ``cidr`` on a port range of ``sg``.

    A falsy ``to_port`` means a single port: the range ends at ``from_port``.
    The rule is echoed to stdout before ``sg.authorize`` is called.
    """
    last_port = to_port or from_port
    rule = (proto, from_port, last_port, cidr)
    print('adding rule %s %s %s %s' % rule)
    sg.authorize(*rule)
def remove_rule(sg, from_port, to_port=None, cidr=ANY, proto='tcp'):
    """Revoke ``proto`` access for ``cidr`` on a port range of ``sg``.

    A falsy ``to_port`` means a single port: the range ends at ``from_port``.
    The rule is echoed to stdout before ``sg.revoke`` is called.
    """
    last_port = to_port or from_port
    rule = (proto, from_port, last_port, cidr)
    print('revoking rule %s %s %s %s' % rule)
    sg.revoke(*rule)
def _split_port_spec(spec):
    """Split 'N' or 'N-M' into (from_port, to_port) strings; None if malformed."""
    parts = spec.split('-')
    if len(parts) == 1:
        parts = parts * 2
    if len(parts) != 2 or not all(part.isdigit() for part in parts):
        return None
    return parts[0], parts[1]


def main():
    """Parse the command line and run the requested security-group command.

    Commands:
        list  -- print the security-group names found in each region.
        open  -- authorize a TCP port (or port range) for a CIDR on a group.
        close -- revoke such a rule.
        show  -- print the rules of a group.

    For open/close/show the regions are searched in order and the search
    stops after the first region containing a group with the given name;
    every group with that name inside that region is processed. If no
    region contains the group, the process exits with status 1 and a
    message on stderr. Malformed port specifications are rejected through
    the parser's usual usage error.
    """
    argp = argparse.ArgumentParser(description=__doc__)
    subparsers = argp.add_subparsers(dest='command', help='List of commands')

    list_parser = subparsers.add_parser('list', help='List of SG in region')
    # NOTE(review): shares dest='regions' with the global --regions option
    # below; which default wins may depend on the argparse version -- confirm.
    list_parser.add_argument('-r', '--regions', dest='regions',
                             default='eu-west-1', help='regions')

    # 'open' adds a rule and 'close' revokes one; both take the same options.
    rule_commands = (
        ('open', 'allow access rule for SG'),
        ('close', 'revoke rule from SG'),
    )
    for command, command_help in rule_commands:
        rule_parser = subparsers.add_parser(command, help=command_help)
        rule_parser.add_argument('-n', '--name', dest='name', default='',
                                 help='SecurityGroup name', required=True)
        rule_parser.add_argument('-p', '--port', dest='port', default='',
                                 help='port number', required=True)
        rule_parser.add_argument('-c', '--cidr', dest='cidr', default=ANY,
                                 help='cidr - ip range of allowed addresses')

    # show rules
    show_parser = subparsers.add_parser('show', help='show rules of SG')
    show_parser.add_argument('-n', '--name', dest='name', default='',
                             help='SecurityGroup name', required=True)

    argp.add_argument('--id', dest='aws_id', default='',
                      help='AWS_ACCESS_KEY_ID will try to take from environment')
    argp.add_argument('--key', dest='aws_key', default='',
                      help='AWS_SECRET_ACCESS_KEY will try to take from environment')
    argp.add_argument('--regions', dest='regions', default=DEFAULT_REGIONS,
                      help='regions that we use')
    args = argp.parse_args()

    if not args.command:
        # Python 3 argparse does not require a subcommand by default.
        argp.error('a command is required')

    conn_creds = {'aws_id': args.aws_id, 'aws_key': args.aws_key}

    from_p = to_p = None
    if hasattr(args, 'port'):
        ports = _split_port_spec(args.port)
        if ports is None:
            argp.error('invalid port %r: expected PORT or FROM-TO' % args.port)
        # Ports stay strings, exactly as they were typed on the command line.
        from_p, to_p = ports

    # Drop blank entries such as the one produced by a trailing comma.
    regions = [el.strip() for el in args.regions.split(',') if el.strip()]

    sg_detected = False
    for region in regions:
        sgs = get_sg_list(conn_creds, region)
        if args.command == 'list':
            print([el.name for el in sgs])
            continue
        for sg in sgs:
            if args.name != sg.name:
                continue
            sg_detected = True
            if args.command == 'open':
                add_rule(sg, from_port=from_p, to_port=to_p, cidr=args.cidr)
            elif args.command == 'close':
                remove_rule(sg, from_port=from_p, to_port=to_p, cidr=args.cidr)
            elif args.command == 'show':
                print([(el.ip_protocol, el.from_port, el.to_port, el.grants)
                       for el in sg.rules])
        if sg_detected:
            # Only the first region that contains the group is touched.
            break

    if args.command != 'list' and not sg_detected:
        # Previously a mistyped group name was a silent no-op.
        argp.exit(1, 'security group %s not found in regions: %s\n'
                  % (args.name, ', '.join(regions)))
# Run the command-line interface only when executed as a script.
if __name__ == '__main__':
    main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment