-
-
Save kiwiz/885aa726e50f452bca6113808e4866e4 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python3 | |
import os | |
import argparse | |
import json | |
import yaml | |
import tests.common | |
import c7n.credentials | |
import c7n.utils | |
import placebo | |
import shutil | |
import boto3 | |
def _add_mode_parser(modes, mode, cache_help):
    """Register sub-command *mode* with the positionals shared by all modes.

    Only the help text of the ``cache`` argument differs between modes.
    Returns the newly created sub-parser.
    """
    mode_parser = modes.add_parser(mode)
    mode_parser.add_argument('cache', type=str, help=cache_help)
    mode_parser.add_argument(
        'policy', type=argparse.FileType('r'), help='Policy file to consume')
    mode_parser.add_argument('name', type=str, help='Name of the policy')
    return mode_parser


# Command-line interface: an optional role to assume plus a mandatory
# execution mode ("record" or "replay").
parser = argparse.ArgumentParser('Policy Tester')
parser.add_argument('--assume', dest='assume_role', help='Role to assume')

subparsers = parser.add_subparsers(dest='command', help='Execution mode')
# Setting the attribute makes omitting the sub-command a usage error.
subparsers.required = True

record_parser = _add_mode_parser(subparsers, 'record', 'Cache dir to write to')
test_parser = _add_mode_parser(subparsers, 'replay', 'Cache dir to consume')

args = parser.parse_args()
class TestPolicy(tests.zpill.PillTest):
    """Record or replay the AWS API traffic of a single Custodian policy.

    Builds on ``tests.zpill.PillTest`` but is driven directly through
    :meth:`run`, and keeps its own list of cleanup callbacks (see
    :meth:`addCleanup`).
    """

    def __init__(self, policy, cache, assume_role=None):
        """Store the policy and derive the flight-data directory.

        Args:
            policy: Policy definition mapping; must contain a ``'name'`` key.
            cache: Directory holding the recorded flight data; it is also
                used as ``placebo_dir``.
            assume_role: Optional role handed to
                ``c7n.credentials.assumed_session`` while recording.
        """
        # NOTE(review): the base-class __init__ is not invoked here, matching
        # the original code -- confirm PillTest needs no further setup.
        self.policy = policy
        self.assume_role = assume_role
        self.cleanup = []
        self.placebo_dir = cache
        self.test_dir = os.path.join(self.placebo_dir, self.policy['name'])

    def record_flight_data(self, test_case, zdata=False, augment=False):
        """Start recording API calls and return a session factory.

        Patched to add support for assumed_role.

        Args:
            test_case: Name forwarded to the zip-archive pill when ``zdata``
                is true.
            zdata: Record through ``tests.zpill.attach`` and
                ``self.archive_path`` instead of into ``self.test_dir``.
            augment: Keep existing recordings instead of wiping
                ``self.test_dir`` first.

        Returns:
            A callable ``factory(region=None, assume=None)`` that returns a
            boto3 session with a recording pill attached.
        """
        self.recording = True
        if not (zdata or augment):
            # Fresh recording: discard whatever an earlier run left behind.
            if os.path.exists(self.test_dir):
                shutil.rmtree(self.test_dir)
            os.makedirs(self.test_dir)

        session = boto3.Session()
        default_region = session.region_name
        if not zdata:
            pill = placebo.attach(session, self.test_dir)
        else:
            # The bare name ``attach`` is not defined in this module; the
            # helper is reached through the already-imported tests package.
            pill = tests.zpill.attach(session, self.archive_path, test_case)
        pill.record()
        self.pill = pill
        self.addCleanup(pill.stop)
        self.addCleanup(self.cleanUp)

        def factory(region=None, assume=None):
            """Return a recording session for *region*.

            ``assume`` is accepted but not used; the role comes from
            ``self.assume_role``.
            """
            new_session = None
            if region and region != default_region:
                new_session = boto3.Session(region_name=region)
            if self.assume_role is not None:
                if new_session is None:
                    new_session = boto3.Session()
                new_session = c7n.credentials.assumed_session(
                    self.assume_role, 'CloudCustodian', new_session)
            if new_session is None:
                return session
            # Extra sessions are always recorded into test_dir, which is not
            # compatible with zip-archive recording.
            assert not zdata
            new_pill = placebo.attach(new_session, self.test_dir)
            new_pill.record()
            self.addCleanup(new_pill.stop)
            return new_session

        return factory

    def run(self, replay):
        """Execute the policy in dry-run mode, recording or replaying calls.

        In replay mode the resulting resources are compared with
        ``resources.json`` in ``self.test_dir`` and the boolean outcome is
        printed. Registered cleanups run even when the policy fails.

        Args:
            replay: ``True`` to replay recorded data, ``False`` to record.
        """
        try:
            if replay:
                session_factory = self.replay_flight_data(self.policy['name'])
                output_dir = None
            else:
                session_factory = self.record_flight_data(self.policy['name'])
                # presumably the policy run writes resources.json below this
                # directory, which replay mode later reads -- TODO confirm
                output_dir = self.placebo_dir

            policy = self.load_policy(
                self.policy,
                config={'dryrun': True, 'output_dir': output_dir},
                session_factory=session_factory,
                validate=True,
                output_dir=output_dir,
            )
            resources = policy.run()
            # Round-trip through the c7n serializer so both sides of the
            # comparison below have been through the same encoding.
            resources = c7n.utils.loads(c7n.utils.dumps(resources))

            if replay:
                expected_path = os.path.join(self.test_dir, 'resources.json')
                with open(expected_path, 'r') as fh:
                    old_resources = c7n.utils.loads(fh.read())
                print(resources == old_resources)
        finally:
            self._run_cleanups()

    def _run_cleanups(self):
        """Invoke every registered cleanup once, in registration order."""
        # Swap the list out first so a later run() never repeats callbacks.
        pending, self.cleanup = self.cleanup, []
        for func, args, kw in pending:
            func(*args, **kw)

    def addCleanup(self, func, *args, **kw):
        """Register ``func(*args, **kw)`` to be called when run() finishes."""
        self.cleanup.append((func, args, kw))
def get_policy(policies, name):
    """Return the policy called *name* from a parsed policy document.

    Args:
        policies: Mapping loaded from a policy file, expected to hold a
            ``'policies'`` list of policy mappings. ``None`` (an empty YAML
            document) is accepted.
        name: Value to match against each policy's ``'name'`` key.

    Returns:
        The first matching policy mapping, or ``None`` when the document is
        empty, has no policies, or none of them matches.
    """
    if not policies:
        # An empty YAML file parses to None; treat it like "no policies".
        return None
    # ``policies:`` with no value parses to None, hence the ``or []``.
    for policy in policies.get('policies') or []:
        if policy.get('name') == name:
            return policy
    return None
# Load the requested policy, closing the file handle argparse opened.
with args.policy as policy_file:
    policy = get_policy(yaml.safe_load(policy_file), args.name)

if policy is None:
    # Stop here: TestPolicy reads policy['name'] and would otherwise fail
    # with a TypeError right after this message.
    raise SystemExit("Policy not found!")

# The sub-command selects between replaying and recording flight data.
replay = args.command == 'replay'
runner = TestPolicy(policy, args.cache, args.assume_role)
runner.run(replay=replay)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment