Skip to content

Instantly share code, notes, and snippets.

@shaon
Created March 20, 2014 06:24
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save shaon/7007ecfc63d7d023aa04 to your computer and use it in GitHub Desktop.
Save shaon/7007ecfc63d7d023aa04 to your computer and use it in GitHub Desktop.
#!/usr/bin/env python
from boto.exception import EC2ResponseError
from eucaops import Eucaops
from eutester.eutestcase import EutesterTestCase
import time
import os
class EC2IamTest(EutesterTestCase):
    """Exercise IAM group policies against EC2 actions in a throw-away account.

    The three test methods are meant to run in order:
    ``CreateResourcesTest`` -> ``IAMPolicyTest`` -> ``RemoveResourcesTest``.
    The later tests read ``self.groups`` and ``self.account_tester``, which
    are only set by ``CreateResourcesTest``.
    """

    def __init__(self):
        """Parse command-line arguments and connect as the cloud administrator."""
        self.setuptestcase()
        self.setup_parser()
        self.parser.add_argument("--user-name", default="admin")
        # Timestamp suffix so the default account name differs between runs.
        self.unique_number = str(int(time.time()))
        self.parser.add_argument("--user-account",
                                 default="eutester-account" + self.unique_number)
        self.get_args()
        self.tester = Eucaops(config_file=self.args.config,
                              password=self.args.password)

    def clean_method(self):
        """No-op cleanup hook; teardown is done by ``RemoveResourcesTest``."""
        pass

    def _group_policies(self):
        """Return a dict mapping each group name to its IAM policy document (JSON text)."""
        return {
            # Deny everything unless the request originates from 10.0.0.0/8.
            'eutester_allusers': """
            {
              "Statement": [{
                "Effect": "Deny",
                "Action": "*",
                "Resource": "*",
                "Condition": {
                  "NotIpAddress": {
                    "aws:SourceIp": ["10.0.0.0/8"]
                  }
                }
              }]
            }
            """,
            # NOTE: the last array element must not be followed by a comma;
            # a trailing comma makes the document invalid JSON.
            'eutester_developers': """
            {
              "Statement": [{
                "Action": [
                  "ec2:DescribeInstances",
                  "ec2:CreateKeyPair",
                  "ec2:DeleteKeyPair",
                  "ec2:DescribeKeyPairs",
                  "ec2:DescribeImages",
                  "ec2:RunInstances",
                  "ec2:StartInstances",
                  "ec2:StopInstances",
                  "ec2:TerminateInstances",
                  "ec2:AuthorizeSecurityGroupEgress",
                  "ec2:AuthorizeSecurityGroupIngress"
                ],
                "Effect": "Allow",
                "Resource": "*"
              }]
            }
            """,
            'eutester_managers': """
            {
              "Statement": [{
                "Action": [
                  "ec2:Describe*"
                ],
                "Effect": "Allow",
                "Resource": "*"
              }]
            }
            """,
            'eutester_admins': """
            {
              "Statement": [{
                "Action": "*",
                "Effect": "Allow",
                "Resource": "*"
              }]
            }
            """,
            'eutester_sysadmins': """
            {
              "Statement": [{
                "Action": [
                  "ec2:AuthorizeSecurityGroupEgress",
                  "ec2:AuthorizeSecurityGroupIngress",
                  "ec2:CreateImage",
                  "ec2:CreateSecurityGroup",
                  "ec2:CreateSnapshot",
                  "ec2:CreateVolume",
                  "ec2:DeleteSecurityGroup",
                  "ec2:DeleteSnapshot",
                  "ec2:DeleteVolume",
                  "ec2:DeregisterImage",
                  "ec2:DescribeImages",
                  "ec2:DescribeInstances",
                  "ec2:DescribeSecurityGroups",
                  "ec2:DescribeSnapshots",
                  "ec2:DescribeVolumes",
                  "ec2:RevokeSecurityGroupEgress",
                  "ec2:RevokeSecurityGroupIngress"
                ],
                "Effect": "Allow",
                "Resource": "*"
              }]
            }
            """,
        }

    def _tester_for_user(self, user):
        """Create an access key for ``user`` and return a ``Eucaops`` connection using it.

        Requires ``self.account_tester`` (set by ``CreateResourcesTest``).
        """
        keys = self.account_tester.create_access_key(user)
        return Eucaops(aws_access_key_id=keys['access_key_id'],
                       aws_secret_access_key=keys['secret_access_key'],
                       ec2_ip=self.tester.ec2.host,
                       s3_ip=self.tester.s3.host,
                       s3_path=self.tester.get_s3_path(),
                       username=user,
                       account=self.args.user_account)

    def CreateResourcesTest(self):
        """Create the test account, its groups and users, and attach one policy per group.

        Sets ``self.groups``, ``self.account`` and ``self.account_tester``.
        """
        self.groups = {
            'eutester_allusers': [],
            'eutester_admins': ['user_admin01', 'user_admin02', 'user_admin03'],
            'eutester_developers': ['user_developer01', 'user_developer02',
                                    'user_developer03', 'user_developer04',
                                    'user_developer05'],
            'eutester_managers': ['user_manager01', 'user_manager02',
                                  'user_manager03'],
            'eutester_sysadmins': ['user_sysadmin01', 'user_sysadmin02'],
        }

        self.account = self.tester.create_account(self.args.user_account)
        keys = self.tester.create_access_key(self.args.user_name,
                                             self.args.user_account)
        self.account_tester = Eucaops(aws_access_key_id=keys['access_key_id'],
                                      aws_secret_access_key=keys['secret_access_key'],
                                      ec2_ip=self.tester.ec2.host,
                                      s3_ip=self.tester.s3.host,
                                      s3_path=self.tester.get_s3_path(),
                                      username=self.args.user_name,
                                      account=self.args.user_account)

        for group, users in self.groups.items():
            self.account_tester.create_group(group)
            for user in users:
                self.account_tester.create_user(user)
                self.account_tester.add_user_to_group(group, user)

        # Policy name convention: "<group>_policy". RemoveResourcesTest
        # relies on the same convention when detaching.
        policies = self._group_policies()
        for group in self.groups:
            self.account_tester.attach_policy_group(group, group + '_policy',
                                                    policies[group])

    def IAMPolicyTest(self):
        """Check that a developer can use the allowed EC2 actions and a manager cannot create a keypair.

        Requires ``CreateResourcesTest`` to have run first.
        """
        # --- developer: every action below is expected to succeed ---
        test_user = self.groups['eutester_developers'][0]
        self.iam_tester = self._tester_for_user(test_user)

        # "ec2:CreateKeyPair"
        try:
            self.keypair = self.iam_tester.add_keypair("keypair-" + str(time.time()))
            self.keypath = '%s/%s.pem' % (os.curdir, self.keypair.name)
        except EC2ResponseError:
            self.fail("Was unable to create keypair as developer '" + test_user + "'")

        # "ec2:AuthorizeSecurityGroup*"
        try:
            self.iam_tester.authorize_group_by_name(group_name='default')
            self.iam_tester.authorize_group_by_name(group_name='default',
                                                    port=-1, protocol="icmp")
        except EC2ResponseError:
            self.fail("Was unable to authorize security group rules as developer '"
                      + test_user + "'")

        # "ec2:RunInstances"
        try:
            self.image = self.iam_tester.get_emi(root_device_type="ebs")
            self.reservation = self.iam_tester.run_instance(self.image,
                                                            keypair=self.keypair.name)
            instance = self.reservation.instances[0]
            self.debug("Was able to run instance as developer '" + test_user + "'")
        except EC2ResponseError:
            self.fail("Was unable to run instance as developer '" + test_user + "'")

        # "ec2:StopInstances"
        try:
            self.iam_tester.stop_instances(self.reservation)
            self.debug("Was able to stop instance '" + instance.id
                       + "' as developer '" + test_user + "'")
        except EC2ResponseError:
            self.fail("Was unable to stop instance as developer '" + test_user + "'")

        # "ec2:StartInstances"
        try:
            self.iam_tester.start_instances(self.reservation)
            self.debug("Was able to start instance '" + instance.id
                       + "' as developer '" + test_user + "'")
        except EC2ResponseError:
            self.fail("Was unable to start instance as developer '" + test_user + "'")

        # "ec2:TerminateInstances"
        try:
            self.debug("Terminating instance '" + instance.id
                       + "' as developer '" + test_user + "'")
            self.iam_tester.terminate_instances(reservation=self.reservation)
        except EC2ResponseError:
            self.fail("Was unable to terminate instance from developer '"
                      + test_user + "'")

        # --- manager: only ec2:Describe* is allowed, so CreateKeyPair must be rejected ---
        test_user = self.groups['eutester_managers'][0]
        self.iam_tester = self._tester_for_user(test_user)

        # Keep the attempted name in a local: when the call is rejected,
        # self.keypair still refers to the developer's keypair from above.
        keypair_name = "keypair-" + str(time.time())
        try:
            self.keypair = self.iam_tester.add_keypair(keypair_name)
        except EC2ResponseError as e:
            self.iam_tester.debug("Failed to create '" + keypair_name
                                  + "' as expected. Reason: " + str(e.error_code))
        else:
            # A successful call is a policy violation and must fail the test,
            # not merely be logged.
            self.fail("Failed because user '" + test_user
                      + "' was able to create keypair.")

    def RemoveResourcesTest(self):
        """Detach policies, delete users and groups, then delete the test account.

        Requires ``CreateResourcesTest`` to have run first.
        """
        for group, users in self.groups.items():
            # Same "<group>_policy" name that CreateResourcesTest attached.
            self.account_tester.detach_policy_group(group, group + '_policy')
            for user in users:
                self.account_tester.remove_user_from_group(group, user)
                self.account_tester.delete_user(user)
            # Delete the group once it is empty; this also covers groups that
            # never had any users (e.g. 'eutester_allusers').
            self.account_tester.delete_group(group)
        self.tester.delete_account(self.args.user_account, True)
# ipython
## add resources
# for group, users in groups.iteritems():
# account_tester.create_group(group)
# for user in users:
# account_tester.create_user(user)
# account_tester.add_user_to_group(group, user)
## remove resources
# for group, users in groups.iteritems():
# for i, user in enumerate(users):
# account_tester.remove_user_from_group(group, user)
# account_tester.delete_user(user)
# if (i+1) == len(users):
# account_tester.delete_group(group)
if __name__ == "__main__":
    # Local import: sys is only needed for the script entry point.
    import sys

    testcase = EC2IamTest()
    # Use the tests passed from config/command line to decide what subset to
    # run, or fall back to the predefined sequence. The order matters: the
    # later tests rely on state set up by CreateResourcesTest.
    test_names = testcase.args.tests or ["CreateResourcesTest",
                                         "IAMPolicyTest",
                                         "RemoveResourcesTest"]
    # Convert test suite method names to EutesterUnitTest objects.
    unit_list = [testcase.create_testunit_by_name(name) for name in test_names]
    # Run the EutesterUnitTest objects and use the result as the exit status.
    result = testcase.run_test_case_list(unit_list, clean_on_exit=True)
    sys.exit(result)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment