---
schemaVersion: '2.2'
description: Run a CPU stress on an instance
parameters:
  duration:
    type: String
    description: 'Specify the duration - in seconds - of the CPU burn. (Required)'
    default: "60"
  cpu:
    type: String

import boto3
import random


def getconfig(key):
    return {
        'api-delay-services': ['ec2'],
        'api-delay-range': [0, 3],
        'api-delay-rate': 10,
        'aws-fault-services': ['s3', 'dynamodb'],
        'api-fault-rate': 10
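
The preview above is cut off before it shows how these keys are consumed. As a minimal sketch, assuming `api-delay-rate` is a percentage of calls and `api-delay-range` is a range in seconds (neither is confirmed by the source), a delay for one API call could be chosen like this. `CONFIG` and `pick_delay` are hypothetical names introduced only for illustration.

```python
import random

# Hypothetical config mirroring the keys in the preview; values are illustrative.
CONFIG = {
    "api-delay-services": ["ec2"],
    "api-delay-range": [0, 3],  # assumed to be seconds
    "api-delay-rate": 10,       # assumed to be a percentage of calls
}


def pick_delay(service, config=CONFIG, rng=random):
    """Return the delay (in seconds) to inject for one call to `service`."""
    # Only services listed in the config are ever delayed.
    if service not in config["api-delay-services"]:
        return 0.0
    # Inject on roughly `api-delay-rate` percent of calls.
    if rng.random() * 100 >= config["api-delay-rate"]:
        return 0.0
    low, high = config["api-delay-range"]
    return rng.uniform(low, high)
```

Passing `rng` explicitly (for example `random.Random(42)`) makes the choice reproducible in tests.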
def corrupt_statuscode(func):
    def wrapper(*args, **kwargs):
        result = func(*args, **kwargs)
        error_code, rate = get_config('error_code')
        if not error_code:
            return result
        print("Error from config {0} at a rate of {1}".format(error_code, rate))
        # inject on roughly a `rate` fraction of calls (rate is between 0 and 1)
        if random.random() <= rate:
            print("corrupting now")
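
The preview stops before the corruption itself. The following is a sketch of how the decorator might finish, not the author's code: it assumes the wrapped handler returns a dict with a `statusCode` key (as a Lambda proxy response would) and takes the config lookup as a parameter so it can run without SSM. The `get_config` and `rng` arguments are hypothetical additions.

```python
import functools
import random


def corrupt_statuscode(get_config, rng=random):
    """Build a decorator that overwrites `statusCode` on some responses.

    `get_config` is a hypothetical callable returning (error_code, rate),
    where rate is a fraction between 0 and 1.
    """
    def decorator(func):
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            result = func(*args, **kwargs)
            error_code, rate = get_config("error_code")
            if not error_code:
                return result
            if rng.random() <= rate:
                # Copy so the handler's own dict is left untouched.
                corrupted = dict(result)
                corrupted["statusCode"] = error_code
                return corrupted
            return result
        return wrapper
    return decorator
```

With `get_config=lambda key: (404, 1.0)`, every response from the wrapped handler comes back with `statusCode` set to 404.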
def corrupt_exception(func):
    def wrapper(*args, **kwargs):
        result = func(*args, **kwargs)
        exception_msg, rate = get_config('exception_msg')
        if not exception_msg:
            return result
        print("exception_msg from config {0} with a rate of {1}".format(exception_msg, rate))
        # inject on roughly a `rate` fraction of calls (rate is between 0 and 1)
        if random.random() <= rate:
            print("corrupting now")
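
What happens after "corrupting now" is not visible in the preview. One plausible completion, shown here as a sketch under assumptions, is to raise a generic `Exception` carrying the configured message. As in the preview, the wrapped function runs first, so its side effects still occur before the failure is injected. The `get_config` and `rng` parameters are hypothetical and exist only to make the example runnable without SSM.

```python
import functools
import random


def corrupt_exception(get_config, rng=random):
    """Build a decorator that raises after the wrapped call on some invocations.

    `get_config` is a hypothetical callable returning (exception_msg, rate),
    where rate is a fraction between 0 and 1.
    """
    def decorator(func):
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            # The real work happens first; only the outcome is corrupted.
            result = func(*args, **kwargs)
            exception_msg, rate = get_config("exception_msg")
            if exception_msg and rng.random() <= rate:
                raise Exception(exception_msg)
            return result
        return wrapper
    return decorator
```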
def corrupt_delay(func):
    def latency(*args, **kw):
        delay, rate = get_config('delay')
        if not delay:
            return func(*args, **kw)
        print("delay: {0}, rate: {1}".format(delay, rate))
        # if delay and rate exist, delay by that value at that rate
        start = time.time()
        if delay > 0 and rate >= 0:
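
The body of the final `if` is truncated. A minimal sketch of the idea follows, assuming the delay is expressed in milliseconds (the preview does not state the unit) and that the sleep happens before the wrapped call. The `get_config`, `rng`, and `sleep` parameters are hypothetical; injecting `sleep` lets the behavior be checked without actually waiting.

```python
import functools
import random
import time


def corrupt_delay(get_config, rng=random, sleep=time.sleep):
    """Build a decorator that sleeps before the wrapped call on some invocations.

    `get_config` is a hypothetical callable returning (delay_ms, rate);
    the millisecond unit is an assumption.
    """
    def decorator(func):
        @functools.wraps(func)
        def latency(*args, **kwargs):
            delay_ms, rate = get_config("delay")
            if delay_ms and delay_ms > 0 and rng.random() <= rate:
                sleep(delay_ms / 1000.0)  # convert milliseconds to seconds
            return func(*args, **kwargs)
        return latency
    return decorator
```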
service: ChaosInjectionLayer
frameworkVersion: ">=1.34.0 <2.0.0"
provider:
  name: aws
layers:
  latencyInjection:
    path: ./

def get_config(config_key):
    param = SSMParameter(os.environ['FAILURE_INJECTION_PARAM'])
    try:
        value = json.loads(param.value)
        if not value["isEnabled"]:
            return 0, 1
        return value[config_key], value.get('rate', 1)
    except InvalidParameterError as e:
        # key does not exist in SSM
        raise InvalidParameterError("{} does not exist in SSM".format(e))
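
To make the expected parameter shape concrete, the parsing step can be exercised without SSM. This sketch isolates the JSON handling from `get_config` into a hypothetical helper, `parse_config`. The sample payload is illustrative: only `isEnabled`, `rate`, and the looked-up key are implied by the code above.

```python
import json


def parse_config(raw_value, config_key):
    """Mirror get_config's parsing for a raw JSON string (no SSM involved)."""
    value = json.loads(raw_value)
    # When injection is switched off, return a falsy value and a rate of 1.
    if not value["isEnabled"]:
        return 0, 1
    # `rate` is optional and defaults to 1, meaning "always inject".
    return value[config_key], value.get("rate", 1)


# Illustrative payload, as it might be stored in the SSM parameter.
sample = '{"isEnabled": true, "delay": 400, "rate": 0.5}'
delay, rate = parse_config(sample, "delay")
```

Because a disabled config returns `0` as the value, callers such as `if not delay:` skip injection without any extra flag.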
from __future__ import unicode_literals
import sys
import logging
import boto3
import os

log = logging.getLogger()
log.setLevel(logging.DEBUG)
sys.path.insert(0, './vendor')

from __future__ import division, unicode_literals
import sys
sys.path.insert(0, '/opt/python/.vendor')
from ssm_cache import SSMParameter
import time
import random
import json
import requests

def cust_fun():
    print("Hello from the deep layers!!:")
    return 1