
@kiwiz kiwiz/ Secret
Created Jan 14, 2020

#!/usr/bin/env python3
import os
import argparse
import json
import yaml
import tests.common
import c7n.credentials
import c7n.utils
import placebo
import shutil
import boto3
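# Command-line interface: an optional role to assume plus one sub-command
# (record or replay), each taking a cache directory, a policy file and the
# name of the policy to pick from that file.
parser = argparse.ArgumentParser('Policy Tester')
# The value is handed to TestPolicy as assume_role; when it is set, sessions
# are created through c7n.credentials.assumed_session for that role.
parser.add_argument('--assume', dest='assume_role', help='Role to assume')
subparsers = parser.add_subparsers(dest='command', help='Execution mode')

# record: placebo is attached to <cache>/<policy name>.
# NOTE(review): data captured there presumably comes from the live account
# (or the assumed role); check it for account identifiers and resource data
# before committing or sharing the cache directory.
record_parser = subparsers.add_parser('record')
record_parser.add_argument('cache', type=str, help='Cache dir to write to')
record_parser.add_argument('policy', type=argparse.FileType('r'), help='Policy file to consume')
record_parser.add_argument('name', type=str, help='Name of the policy')

# replay: same positional arguments, but the cache directory is read.
test_parser = subparsers.add_parser('replay')
test_parser.add_argument('cache', type=str, help='Cache dir to consume')
test_parser.add_argument('policy', type=argparse.FileType('r'), help='Policy file to consume')
test_parser.add_argument('name', type=str, help='Name of the policy')

args = parser.parse_args()
# Sub-commands are optional by default on Python 3: without this check a bare
# invocation leaves args.command as None and args.policy/cache/name unset,
# and the script fails later with an AttributeError instead of a usage error.
if args.command is None:
    parser.error('a command is required: record or replay')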
# NOTE(review): this listing has lost its indentation and appears to be missing
# some lines (flagged below); restore them from the original file before
# running. The comments here describe only what the visible lines show.
#
# Records or replays the boto3 traffic of a single policy on top of
# tests.zpill.PillTest.
# NOTE(review): only tests.common is imported at the top of the file, so
# tests.zpill is presumably loaded as a side effect of that import - confirm.
class TestPolicy(tests.zpill.PillTest):
# Stores the policy dict, the optional role to assume, an empty cleanup list
# and the cache directory; test_dir is <cache>/<policy name>.
def __init__(self, policy, cache, assume_role=None):
self.policy = policy
self.assume_role = assume_role
self.cleanup = []
self.placebo_dir = cache
self.test_dir = os.path.join(self.placebo_dir, self.policy['name'])
# Creates the base boto3 session, attaches placebo to it and returns a
# session factory (the nested factory() below).
def record_flight_data(self, test_case, zdata=False, augment=False):
# NOTE(review): the next line reads like docstring text whose quotes were lost.
Patched to add support for assumed_role
self.recording = True
if not (zdata or augment):
# NOTE(review): the body of this existence check is not visible here.
if os.path.exists(self.test_dir):
session = boto3.Session()
default_region = session.region_name
if not zdata:
pill = placebo.attach(session, self.test_dir)
# NOTE(review): this second assignment presumably belongs to an `else:` that
# is missing here; `attach` and `self.archive_path` are not defined in the
# visible code.
pill = attach(session, self.archive_path, test_case)
self.pill = pill
# Returns the base session unless a different region or an assumed role
# requires a new one.
def factory(region=None, assume=None):
new_session = None
if region and region != default_region:
new_session = boto3.Session(region_name=region)
# The role comes from self.assume_role (the --assume option); the `assume`
# argument of factory() is not read in the visible lines.
if self.assume_role is not None:
if new_session is None:
new_session = boto3.Session()
new_session = c7n.credentials.assumed_session(self.assume_role, 'CloudCustodian', new_session)
if new_session is not None:
# assert statements are removed under `python -O`, so this guard is not
# enforced in optimized runs.
assert not zdata
# placebo is attached to the new session with the same test_dir as the base
# session.
# NOTE(review): anything recorded there presumably reflects what the assumed
# role can read; review the directory before sharing it.
new_pill = placebo.attach(new_session, self.test_dir)
return new_session
return session
return factory
# Loads the policy with dryrun enabled, then in replay mode compares the
# resources with the stored resources.json, and finally runs the cleanups.
def run(self, replay):
if replay:
session_factory = self.replay_flight_data(self.policy['name'])
output_dir = None
# NOTE(review): an `else:` for the record path appears to be missing before
# the next two lines; as shown they would overwrite the replay values.
session_factory = self.record_flight_data(self.policy['name'])
output_dir = self.placebo_dir
# NOTE(review): the load_policy call and the first `resources =` assignment
# are cut off in this listing.
policy = self.load_policy(
config={'dryrun': True, 'output_dir': output_dir},
resources =
# Round-trip through c7n.utils.dumps/loads before the comparison below.
resources = c7n.utils.loads(c7n.utils.dumps(resources))
if replay:
with open(os.path.join(self.test_dir, 'resources.json'), 'r') as fh:
# NOTE(review): this call is cut off; presumably it parses the contents of fh.
old_resources = c7n.utils.loads(
# Prints True when the current resources equal the stored ones.
print(resources == old_resources)
# Run every callback registered through addCleanup, in registration order.
for func, args, kw in self.cleanup:
func(*args, **kw)
# Queues a callback with its arguments; run() calls the queued callbacks.
def addCleanup(self, func, *args, **kw):
self.cleanup.append((func, args, kw))
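def get_policy(policies, name):
    """Return the policy entry called *name* from a parsed policy document.

    Args:
        policies: Mapping produced by parsing the policy file; its
            ``policies`` key holds the list of policy mappings. ``None``
            (what ``yaml.safe_load`` returns for an empty file) and a
            ``policies`` key with no value are treated as "no policies".
        name: Value compared with each entry's ``name`` key.

    Returns:
        The first matching policy mapping, or ``None`` when nothing matches.
    """
    # An empty or null document has no .get(); report "not found" instead of
    # failing with AttributeError.
    if not policies:
        return None
    # `policies:` with no value parses to None, which cannot be iterated.
    entries = policies.get('policies') or []
    for entry in entries:
        # Skip list items that are not mappings (e.g. a stray scalar).
        if isinstance(entry, dict) and entry.get('name') == name:
            return entry
    return None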
# Parse the policy file with yaml.safe_load and look up the requested entry.
# NOTE(review): this call is cut off in the listing; its second argument is
# not visible.
policy = get_policy(yaml.safe_load(args.policy),
if policy is None:
print("Policy not found!")
# NOTE(review): no exit after the message is visible here; as shown, a missing
# policy would still reach TestPolicy(...) below, whose __init__ reads
# policy['name'].
replay = args.command == 'replay'
runner = TestPolicy(policy, args.cache, args.assume_role)
# NOTE(review): the call that starts the run is not visible in this listing.