Skip to content

Instantly share code, notes, and snippets.

@cm-kazup0n
Created December 25, 2023 07:07
Show Gist options
  • Save cm-kazup0n/7b3642334d6f4eb02202672979ac780f to your computer and use it in GitHub Desktop.
Save cm-kazup0n/7b3642334d6f4eb02202672979ac780f to your computer and use it in GitHub Desktop.
from flask import Flask, jsonify
from flask.logging import default_handler
import logging
import logging.handlers
import requests
from requests.exceptions import Timeout
import time
import random
import os
import json
import sys
import socket
import boto3
# AZ名とサービス名のマッピング
BACKENDS = {
"ap-northeast-1a": "backend-1a.gray-failure-app",
"ap-northeast-1c": "backend-1c.gray-failure-app",
"ap-northeast-1d": "backend-1d.gray-failure-app",
"local": "localhost",
}
# メタ情報のキャッシュ
META_CACHE = {}
logger = logging.getLogger("metrics")
logger.setLevel(logging.DEBUG)
app = Flask(__name__)
# ログをJSONに整形するフォーマッター
class Formatter(logging.Formatter):
def format(self, record):
log = {}
log["name"] = record.name
log["level"] = record.levelname
try:
log.update(**json.loads(record.msg))
except json.JSONDecodeError:
log["msg"] = record.msg
return json.dumps(log)
# UDPソケットにテキストでログを送信するストリーム
class SockStream:
def __init__(self, host, port):
self.sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
self.dest = (host, port)
def write(self, data):
try:
self.sock.sendto(data.encode(), self.dest)
except Exception as e:
app.logger.warn(e)
def flush(self):
pass
# AZごとにバックエンドのレイテンシを決定するヘルパー
# 呼び出し30回ごとにパラメータストアから設定を取得する
class LatencyInjector:
def __init__(self):
self.client = boto3.client("ssm")
self.cache = None
self.ttl = 0
self.az = metadata()["AvailabilityZone"]
def get_latency_for(self):
self.ttl = self.ttl + 1
if self.cache is None or self.ttl >= 30:
self.refresh()
latency = self.cache.get(self.az, [200, 500])
return random.randint(latency[0], latency[1])
def refresh(self):
app.logger.info("refresh latency cache")
try:
param = self.client.get_parameter(Name="latencies_for_az")
cache = json.loads(param["Parameter"]["Value"])
app.logger.info(json.dumps(cache))
except Exception as e:
app.logger.error(str(e))
cache = {}
self.cache = cache
self.ttl = 0
# Routes
@app.route("/")
def index():
return jsonify({"status": index_handler()})
@app.route("/backend")
def backend():
latency = backend_handler()
return jsonify({"status": "ok"})
# Handlers
def index_handler():
try:
requests.get(f"http://{backend()}/backend", timeout=0.5)
health = 0
except Exception as e:
app.logger.error(str(e))
health = 1
put_metrics([{"Name": "BackendHealth", "Unit": "None", "Value": health}])
return health
def backend_handler():
latency = latency_injector.get_latency_for()
put_metrics([{"Name": "BackendLatency", "Unit": "Milliseconds", "Value": latency}])
time.sleep(latency / 1000)
return latency
# Helpers
# タスクメタデータを参照する
def metadata():
if len(META_CACHE) > 0:
return META_CACHE
if "ECS_CONTAINER_METADATA_URI_V4" in os.environ:
res = requests.get(f"{os.environ['ECS_CONTAINER_METADATA_URI_V4']}/task")
task = res.json()
res = requests.get(f"{os.environ['ECS_CONTAINER_METADATA_URI_V4']}")
container = res.json()
META_CACHE["AvailabilityZone"] = task["AvailabilityZone"]
META_CACHE["DockerId"] = container["DockerId"]
else:
META_CACHE["AvailabilityZone"] = "local"
META_CACHE["DockerId"] = "de516a9c-2013-0d82-d413-1de1181276c6"
return META_CACHE
def backend():
# メタデータから取得したAZ名でバックエンドのサービス名を決定する
return BACKENDS[metadata()["AvailabilityZone"]]
# メトリクスをEMF形式に整形してログに出力する
def put_metrics(metrics):
meta = metadata()
dimensions = [["AvailabilityZone"], ["AvailabilityZone", "DockerId"]]
ts = int(time.time() * 1000)
data = {
"time": ts,
"_aws": {
"LogGroupName": "gray-failure-emf",
"Timestamp": ts,
"CloudWatchMetrics": [
{
"Namespace": "app",
"Dimensions": dimensions,
"Metrics": list(
map(lambda m: {"Name": m["Name"], "Unit": m["Unit"]}, metrics)
),
}
],
},
}
for k, v in meta.items():
data[k] = v
for m in metrics:
data[m["Name"]] = m["Value"]
logger.info(json.dumps(data))
if __name__ == "__main__":
latency_injector = LatencyInjector()
default_handler.setFormatter(Formatter())
default_handler.setLevel(logging.DEBUG)
stdout = logging.StreamHandler(sys.stdout)
stdout.setFormatter(Formatter())
stdout.setLevel(logging.DEBUG)
cwa = logging.StreamHandler(SockStream("0.0.0.0", 25888))
cwa.setFormatter(Formatter())
cwa.setLevel(logging.DEBUG)
logger.addHandler(stdout)
logger.addHandler(cwa)
app.run(host="0.0.0.0", port=80, debug=True)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment