Created
December 25, 2023 07:07
-
-
Save cm-kazup0n/7b3642334d6f4eb02202672979ac780f to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from flask import Flask, jsonify | |
from flask.logging import default_handler | |
import logging | |
import logging.handlers | |
import requests | |
from requests.exceptions import Timeout | |
import time | |
import random | |
import os | |
import json | |
import sys | |
import socket | |
import boto3 | |
# Mapping from Availability Zone name to the backend service host name.
BACKENDS = {
    f"ap-northeast-1{suffix}": f"backend-1{suffix}.gray-failure-app"
    for suffix in ("a", "c", "d")
}
# Entry used when metadata() reports the "local" zone (no ECS metadata URI set).
BACKENDS["local"] = "localhost"

# Cache for the task metadata; filled by the first metadata() call.
META_CACHE = {}

app = Flask(__name__)

# Dedicated logger for metric records emitted by put_metrics().
logger = logging.getLogger("metrics")
logger.setLevel(logging.DEBUG)
# Formatter that renders every log record as a single JSON object.
class Formatter(logging.Formatter):
    """Serialize log records to JSON strings.

    The output object always contains ``name`` (logger name) and ``level``
    (level name). If the rendered message is itself a JSON object, its keys
    are merged into the output; otherwise the message is stored under ``msg``.
    """

    def format(self, record):
        """Return ``record`` rendered as a JSON string.

        Args:
            record: The ``logging.LogRecord`` to render.

        Returns:
            A JSON-encoded string describing the record.
        """
        log = {"name": record.name, "level": record.levelname}
        # getMessage() stringifies non-str messages (e.g. exception objects)
        # and applies %-style arguments, so the result is always a plain str.
        message = record.getMessage()
        try:
            payload = json.loads(message)
        except json.JSONDecodeError:
            payload = None
        # Only JSON *objects* can be merged; scalars and arrays that happen to
        # parse as JSON (e.g. "200" or "[1, 2]") are kept as ordinary text.
        if isinstance(payload, dict):
            log.update(payload)
        else:
            log["msg"] = message
        return json.dumps(log)
# Stream-like object that sends log text to a UDP socket.
class SockStream:
    """Minimal write/flush stream that forwards each write as a UDP datagram.

    It provides the ``write``/``flush`` pair that ``logging.StreamHandler``
    calls on its stream.
    """

    def __init__(self, host, port):
        """Create the UDP socket and remember the destination.

        Args:
            host: Destination host name or IPv4 address.
            port: Destination UDP port.
        """
        self.sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
        self.dest = (host, port)

    def write(self, data):
        """Encode ``data`` and send it to the destination as one datagram.

        Sending is best effort: any failure is logged through ``app.logger``
        and otherwise ignored so that logging never breaks the caller.
        """
        try:
            self.sock.sendto(data.encode(), self.dest)
        except Exception as e:
            # Logger.warn is a deprecated alias; warning() is the supported name.
            app.logger.warning(e)

    def flush(self):
        """Do nothing; datagrams are sent immediately by ``write``."""
        pass
# Helper that decides the backend latency for this task's Availability Zone.
# The settings are re-read from the parameter store every 30 calls.
class LatencyInjector:
    """Pick a random latency from the range configured for the local AZ.

    The configuration is the JSON value of the SSM parameter
    ``latencies_for_az``; it is looked up by AZ name and each entry is indexed
    as ``[lower, upper]``. AZs without an entry use ``[200, 500]``.
    """

    def __init__(self):
        """Create the SSM client and record the AZ reported by ``metadata()``."""
        self.client = boto3.client("ssm")
        self.cache = None  # no configuration fetched yet
        self.ttl = 0  # number of calls since the last refresh
        self.az = metadata()["AvailabilityZone"]

    def get_latency_for(self):
        """Return a random integer within the latency range of this AZ.

        The configuration is fetched on the first call and again whenever
        30 calls have been counted since the previous refresh.
        """
        self.ttl += 1
        stale = self.cache is None or self.ttl >= 30
        if stale:
            self.refresh()
        bounds = self.cache.get(self.az, [200, 500])
        lower, upper = bounds[0], bounds[1]
        return random.randint(lower, upper)

    def refresh(self):
        """Reload the latency configuration and reset the call counter.

        Any failure while fetching or parsing is logged and replaced by an
        empty configuration, so every AZ falls back to the default range.
        """
        app.logger.info("refresh latency cache")
        try:
            response = self.client.get_parameter(Name="latencies_for_az")
            fetched = json.loads(response["Parameter"]["Value"])
            app.logger.info(json.dumps(fetched))
        except Exception as err:
            app.logger.error(str(err))
            fetched = {}
        self.cache, self.ttl = fetched, 0
# Routes
@app.route("/")
def index():
    """Run the backend probe and return its health value as JSON."""
    status = index_handler()
    return jsonify({"status": status})
@app.route("/backend")
def backend():
    """Serve ``/backend``: apply the injected latency, then answer ``ok``.

    NOTE(review): a helper that is also named ``backend`` is defined further
    down in this file and rebinds the module-level name. The route decorator
    has already received this function by then, but the duplicate name is
    easy to misread and worth renaming in a follow-up.
    """
    # Called for its side effects (metric emission and sleep); the latency it
    # returns is not part of the response.
    backend_handler()
    return jsonify({"status": "ok"})
# Handlers
def index_handler():
    """Call the backend of this AZ and publish the result as ``BackendHealth``.

    Returns:
        0 when the request finished without raising, 1 when any exception
        (including a timeout; the request uses ``timeout=0.5``) occurred.
        The HTTP status code of the response is not inspected.
    """
    try:
        # Host resolution stays inside the try so that a lookup failure is
        # also reported as an unhealthy backend.
        requests.get(f"http://{backend()}/backend", timeout=0.5)
    except Exception as err:
        app.logger.error(str(err))
        health = 1
    else:
        health = 0
    put_metrics([{"Name": "BackendHealth", "Unit": "None", "Value": health}])
    return health
def backend_handler():
    """Pick a latency, publish it as ``BackendLatency``, sleep, and return it.

    The value is reported with the unit ``Milliseconds`` and converted to
    seconds for the sleep. ``latency_injector`` is the module-level instance
    created in the ``__main__`` block.
    """
    delay_ms = latency_injector.get_latency_for()
    metric = {"Name": "BackendLatency", "Unit": "Milliseconds", "Value": delay_ms}
    put_metrics([metric])
    time.sleep(delay_ms / 1000)
    return delay_ms
# Helpers
# Look up the task metadata.
def metadata(timeout=2.0):
    """Return the cached ``AvailabilityZone`` and ``DockerId`` of this task.

    On the first call the values are read from the endpoint named by the
    ``ECS_CONTAINER_METADATA_URI_V4`` environment variable. When that variable
    is not set, fixed local placeholder values are used instead. Later calls
    return the module-level ``META_CACHE`` dict directly.

    Args:
        timeout: Timeout in seconds passed to each metadata HTTP request, so
            an unresponsive endpoint raises instead of blocking forever.

    Returns:
        The ``META_CACHE`` dict with the keys ``AvailabilityZone`` and
        ``DockerId``.

    Raises:
        requests.exceptions.RequestException: If a metadata request fails,
            times out, or returns an HTTP error status.
        KeyError: If a response lacks the expected key.
    """
    if META_CACHE:
        return META_CACHE

    base_url = os.environ.get("ECS_CONTAINER_METADATA_URI_V4")
    if base_url is None:
        META_CACHE["AvailabilityZone"] = "local"
        META_CACHE["DockerId"] = "de516a9c-2013-0d82-d413-1de1181276c6"
        return META_CACHE

    task_res = requests.get(f"{base_url}/task", timeout=timeout)
    task_res.raise_for_status()
    container_res = requests.get(base_url, timeout=timeout)
    container_res.raise_for_status()

    # Read both values before touching the cache: a failure must not leave a
    # half-filled cache behind that later calls would treat as complete.
    zone = task_res.json()["AvailabilityZone"]
    docker_id = container_res.json()["DockerId"]
    META_CACHE["AvailabilityZone"] = zone
    META_CACHE["DockerId"] = docker_id
    return META_CACHE
def backend():
    """Return the backend service host name for this task's AZ.

    The AZ name comes from the task metadata and is looked up in ``BACKENDS``;
    an AZ without an entry raises ``KeyError``.
    """
    zone = metadata()["AvailabilityZone"]
    return BACKENDS[zone]
# Format the metrics as an EMF record and write it to the log.
def put_metrics(metrics):
    """Log ``metrics`` as one JSON record in embedded metric format.

    Args:
        metrics: Sequence of dicts, each with the keys ``Name``, ``Unit`` and
            ``Value``.

    The record carries the metric definitions under ``_aws``, the task
    metadata values as top-level members (referenced by the dimension sets),
    and each metric value under its own name. It is written to the
    ``metrics`` logger at INFO level.
    """
    meta = metadata()
    now_ms = int(time.time() * 1000)
    definitions = [{"Name": m["Name"], "Unit": m["Unit"]} for m in metrics]
    directive = {
        "Namespace": "app",
        "Dimensions": [["AvailabilityZone"], ["AvailabilityZone", "DockerId"]],
        "Metrics": definitions,
    }
    record = {
        "time": now_ms,
        "_aws": {
            "LogGroupName": "gray-failure-emf",
            "Timestamp": now_ms,
            "CloudWatchMetrics": [directive],
        },
    }
    # Metadata first, then metric values, so later keys win on a name clash.
    record.update(meta)
    record.update({m["Name"]: m["Value"] for m in metrics})
    logger.info(json.dumps(record))
if __name__ == "__main__":
    # Script entry point: create the latency helper, wire up JSON logging,
    # then start the Flask server.
    latency_injector = LatencyInjector()

    stdout = logging.StreamHandler(sys.stdout)
    # Metric records are also sent as UDP datagrams to port 25888
    # (presumably a local CloudWatch agent listener; confirm the target).
    cwa = logging.StreamHandler(SockStream("0.0.0.0", 25888))

    # Every handler gets its own JSON formatter and passes DEBUG and above.
    for handler in (default_handler, stdout, cwa):
        handler.setFormatter(Formatter())
        handler.setLevel(logging.DEBUG)

    # Only the metrics logger writes to stdout and the UDP stream.
    for handler in (stdout, cwa):
        logger.addHandler(handler)

    # NOTE(review): debug=True together with host 0.0.0.0 exposes the Werkzeug
    # debugger on all interfaces; confirm this is never used outside a demo.
    app.run(host="0.0.0.0", port=80, debug=True)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment