Skip to content

Instantly share code, notes, and snippets.

@kapilt
Created July 23, 2019 21:12
Show Gist options
  • Star 1 You must be signed in to star a gist
  • Fork 1 You must be signed in to fork a gist
  • Save kapilt/eb8f9b82473d7fa8295fbf1d10c11c11 to your computer and use it in GitHub Desktop.
Save kapilt/eb8f9b82473d7fa8295fbf1d10c11c11 to your computer and use it in GitHub Desktop.
import boto3
import random
def getconfig(key):
return {
'api-delay-services': ['ec2'],
'api-delay-range': [0, 3],
'api-delay-rate': 10,
'aws-fault-services': ['s3', 'dynamodb'],
'api-fault-rate': 10
}.get(key)
class BotoFaultInjector(object):
def __init__(self):
self._regid = None
self.delay_services = getconfig('api-delay-services')
self.delay_range = getconfig('api-delay-range')
self.delay_rate = getconfig('api-delay-rate')
self.fault_service = getconfig('api-fault-services')
self.fault_rate = getconfig('api-fault-rate')
def attach(self, s):
self._regid = s.register('after-call.*.*', self.invoke)
def invoke(self, http_response, parsed, model, **kwargs):
if model.name in delay_services:
if random.randrange(0, 100) < self.delay_rate:
time.sleep(random.randrange(*self.delay_range))
if model.name in fault_services:
if random.rangrange(0, 100) < self.delay_rate:
http_response["Error"] = {
"Message": "Not Found",
"Code": "ResourceNotFoundException"
}
http_response['status_code'] = 400
if __name__ == '__main__':
boto3.client('ec2')
fault_injector = BotoFaultInjector()
fault_injector.attach(boto3.DEFAULT_SESSION)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment