# Forwards RDS Enhanced Monitoring metrics from CloudWatch Logs to Datadog.
# Gist by @lantier, created September 5, 2022.

import gzip
import json
import os
import re
import time
import base64
from io import BufferedReader, BytesIO
from urllib.request import Request, urlopen
from urllib.parse import urlencode

import boto3
import botocore

DD_SITE = os.getenv('DD_SITE', default='datadoghq.com')
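# DD_SITE selects the Datadog site metrics are submitted to, e.g. the default
# 'datadoghq.com' or 'datadoghq.eu'; Stats.flush() below builds the v1 series
# endpoint URL from it.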


def _datadog_keys():
    if 'kmsEncryptedKeys' in os.environ:
        KMS_ENCRYPTED_KEYS = os.environ['kmsEncryptedKeys']
        kms = boto3.client('kms')
        # kmsEncryptedKeys should be created through the Lambda's encryption
        # helpers and as such will have the EncryptionContext
        return json.loads(kms.decrypt(
            CiphertextBlob=base64.b64decode(KMS_ENCRYPTED_KEYS),
            EncryptionContext={'LambdaFunctionName': os.environ['AWS_LAMBDA_FUNCTION_NAME']},
        )['Plaintext'])

    if 'DD_API_KEY_SECRET_ARN' in os.environ:
        SECRET_ARN = os.environ['DD_API_KEY_SECRET_ARN']
        DD_API_KEY = json.loads(
            boto3.client('secretsmanager').get_secret_value(SecretId=SECRET_ARN)['SecretString']
        )['DD_API_KEY']
        return {'api_key': DD_API_KEY}

    if 'DD_API_KEY_SSM_NAME' in os.environ:
        SECRET_NAME = os.environ['DD_API_KEY_SSM_NAME']
        DD_API_KEY = boto3.client('ssm').get_parameter(
            Name=SECRET_NAME, WithDecryption=True
        )['Parameter']['Value']
        return {'api_key': DD_API_KEY}

    if 'DD_KMS_API_KEY' in os.environ:
        ENCRYPTED = os.environ['DD_KMS_API_KEY']
        # For interop with other DD Lambdas taking in DD_KMS_API_KEY, we'll
        # optionally try the EncryptionContext associated with this Lambda.
        try:
            DD_API_KEY = boto3.client('kms').decrypt(
                CiphertextBlob=base64.b64decode(ENCRYPTED),
                EncryptionContext={'LambdaFunctionName': os.environ['AWS_LAMBDA_FUNCTION_NAME']},
            )['Plaintext']
        except botocore.exceptions.ClientError:
            DD_API_KEY = boto3.client('kms').decrypt(
                CiphertextBlob=base64.b64decode(ENCRYPTED),
            )['Plaintext']
        if type(DD_API_KEY) is bytes:
            DD_API_KEY = DD_API_KEY.decode('utf-8')
        return {'api_key': DD_API_KEY}

    if 'DD_API_KEY' in os.environ:
        DD_API_KEY = os.environ['DD_API_KEY']
        return {'api_key': DD_API_KEY}

    raise ValueError("Datadog API key is not defined, see documentation for environment variable options")
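
# Any one of the following settings is sufficient; _datadog_keys() above checks
# them in this order (the ARN below is a hypothetical example value):
#   kmsEncryptedKeys      - KMS-encrypted JSON blob created with the Lambda encryption helpers
#   DD_API_KEY_SECRET_ARN - Secrets Manager secret whose SecretString is JSON containing
#                           DD_API_KEY, e.g. arn:aws:secretsmanager:us-east-1:123456789012:secret:dd-api-key
#   DD_API_KEY_SSM_NAME   - name of an encrypted SSM parameter
#   DD_KMS_API_KEY        - KMS-encrypted API key
#   DD_API_KEY            - plaintext API key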
# Preload the keys so we can bail out early if they're misconfigured
datadog_keys = _datadog_keys()
print('INFO Lambda function initialized, ready to send metrics')


def _process_rds_enhanced_monitoring_message(ts, message, account, region):
    instance_id = message["instanceID"]
    host_id = message["instanceResourceID"]
    tags = [
        'dbinstanceidentifier:%s' % instance_id,
        'aws_account:%s' % account,
        'engine:%s' % message["engine"],
    ]
    # metrics generation

    # uptime: "54 days, 1:53:04" is converted into seconds
    uptime = 0
    uptime_msg = re.split(' days?, ', message["uptime"])  # singular edge case: "1 day, 1:53:04"
    if len(uptime_msg) == 2:
        uptime += 24 * 3600 * int(uptime_msg[0])
    uptime_day = uptime_msg[-1].split(':')
    uptime += 3600 * int(uptime_day[0])
    uptime += 60 * int(uptime_day[1])
    uptime += int(uptime_day[2])
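    # e.g. "54 days, 1:53:04" -> 54*86400 + 1*3600 + 53*60 + 4 = 4672384 seconds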
    stats.gauge(
        'aws.rds.uptime', uptime, timestamp=ts, tags=tags, host=host_id
    )

    stats.gauge(
        'aws.rds.virtual_cpus', message["numVCPUs"], timestamp=ts, tags=tags, host=host_id
    )

    if "loadAverageMinute" in message:
        stats.gauge(
            'aws.rds.load.1', message["loadAverageMinute"]["one"],
            timestamp=ts, tags=tags, host=host_id
        )
        stats.gauge(
            'aws.rds.load.5', message["loadAverageMinute"]["five"],
            timestamp=ts, tags=tags, host=host_id
        )
        stats.gauge(
            'aws.rds.load.15', message["loadAverageMinute"]["fifteen"],
            timestamp=ts, tags=tags, host=host_id
        )
    for namespace in ["cpuUtilization", "memory", "tasks", "swap"]:
        for key, value in message.get(namespace, {}).items():
            stats.gauge(
                'aws.rds.%s.%s' % (namespace.lower(), key), value,
                timestamp=ts, tags=tags, host=host_id
            )

    for network_stats in message.get("network", []):
        if "interface" in network_stats:
            network_tag = ["interface:%s" % network_stats.pop("interface")]
        else:
            network_tag = []
        for key, value in network_stats.items():
            stats.gauge(
                'aws.rds.network.%s' % key, value,
                timestamp=ts, tags=tags + network_tag, host=host_id
            )

    for disk_stats in message.get("diskIO", []):
        disk_tag = []
        if "device" in disk_stats:
            disk_tag.append("%s:%s" % ("device", disk_stats.pop("device")))
        for key, value in disk_stats.items():
            stats.gauge(
                'aws.rds.diskio.%s' % key, value,
                timestamp=ts, tags=tags + disk_tag, host=host_id
            )

    for fs_stats in message.get("fileSys", []):
        fs_tag = []
        for tag_key in ["name", "mountPoint"]:
            if tag_key in fs_stats:
                fs_tag.append("%s:%s" % (tag_key, fs_stats.pop(tag_key)))
        for key, value in fs_stats.items():
            stats.gauge(
                'aws.rds.filesystem.%s' % key, value,
                timestamp=ts, tags=tags + fs_tag, host=host_id
            )

    for process_stats in message.get("processList", []):
        process_tag = []
        for tag_key in ["name", "id"]:
            if tag_key in process_stats:
                process_tag.append("%s:%s" % (tag_key, process_stats.pop(tag_key)))
        for key, value in process_stats.items():
            stats.gauge(
                'aws.rds.process.%s' % key, value,
                timestamp=ts, tags=tags + process_tag, host=host_id
            )

    for pd_stats in message.get("physicalDeviceIO", []):
        pd_tag = []
        if "device" in pd_stats:
            pd_tag.append("%s:%s" % ("device", pd_stats.pop("device")))
        for key, value in pd_stats.items():
            stats.gauge(
                'aws.rds.physicaldeviceio.%s' % key, value,
                timestamp=ts, tags=tags + pd_tag, host=host_id
            )


def lambda_handler(event, context):
    '''Process an RDS Enhanced Monitoring DATA_MESSAGE
    coming from CloudWatch Logs.
    '''
    # event["awslogs"]["data"] is a base64-encoded, gzipped CloudWatch Logs
    # payload: {"owner": ..., "logEvents": [{"timestamp": ..., "message": ...}]}
    with gzip.GzipFile(
        fileobj=BytesIO(base64.b64decode(event["awslogs"]["data"]))
    ) as decompress_stream:
        data = b"".join(BufferedReader(decompress_stream))
    event = json.loads(data)

    account = event['owner']
    region = context.invoked_function_arn.split(':', 4)[3]

    log_events = event['logEvents']
    for log_event in log_events:
        message = json.loads(log_event['message'])
        ts = log_event['timestamp'] / 1000  # CloudWatch timestamps are in milliseconds
        _process_rds_enhanced_monitoring_message(ts, message, account, region)

    stats.flush()
    return {'Status': 'OK'}


# Helpers to send data to Datadog, inspired by https://github.com/DataDog/datadogpy
class Stats(object):
    def __init__(self):
        self.series = []

    def gauge(self, metric, value, timestamp=None, tags=None, host=None):
        base_dict = {
            'metric': metric,
            'points': [(int(timestamp or time.time()), value)],
            'type': 'gauge',
            'tags': tags,
        }
        if host:
            base_dict.update({'host': host})
        self.series.append(base_dict)

    def flush(self):
        metrics_dict = {
            'series': self.series,
        }
        self.series = []

        # The API key is passed as a query-string parameter on the v1 series endpoint
        creds = urlencode(datadog_keys)
        data = json.dumps(metrics_dict).encode('ascii')
        url = '%s?%s' % (datadog_keys.get('api_host', 'https://app.%s/api/v1/series' % DD_SITE), creds)
        req = Request(url, data, {'Content-Type': 'application/json'})
        response = urlopen(req)
        print('INFO Submitted data with status %s' % response.getcode())


stats = Stats()
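

if __name__ == '__main__':
    # Minimal local smoke-test sketch, not part of the original gist. It wraps
    # one hypothetical Enhanced Monitoring record in a synthetic CloudWatch
    # Logs envelope and invokes the handler with a fake context. Reaching this
    # point already requires one of the API-key variables above to be set (the
    # module-level _datadog_keys() call runs at import time), and flush()
    # performs a real HTTP request to Datadog.
    sample_message = {
        "instanceID": "db-test",                      # hypothetical instance
        "instanceResourceID": "db-ABCDEFGHIJKLMNOP",  # hypothetical resource ID
        "engine": "MYSQL",
        "uptime": "54 days, 1:53:04",
        "numVCPUs": 2,
    }
    log_payload = {
        "owner": "123456789012",  # placeholder AWS account ID
        "logEvents": [
            {"timestamp": int(time.time() * 1000), "message": json.dumps(sample_message)}
        ],
    }
    compressed = gzip.compress(json.dumps(log_payload).encode('utf-8'))
    fake_event = {"awslogs": {"data": base64.b64encode(compressed).decode('ascii')}}

    class _FakeContext(object):
        invoked_function_arn = 'arn:aws:lambda:us-east-1:123456789012:function:test'

    print(lambda_handler(fake_event, _FakeContext()))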