Created
December 27, 2023 03:54
-
-
Save cm-kazup0n/fd519abef76a8857d835cacee5b151de to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import json
import logging
import os
import random
import time

import boto3
import requests
from flask import Flask, jsonify

# Add imports for OTel components into the application
from opentelemetry import metrics
from opentelemetry.exporter.otlp.proto.grpc.metric_exporter import OTLPMetricExporter
from opentelemetry.sdk.metrics import MeterProvider
from opentelemetry.sdk.metrics.export import PeriodicExportingMetricReader
# Samples appended by the request handlers as (time.time_ns(), value) tuples;
# the observable-gauge callbacks turn them into observations and drain them.
backend_health_metrics = []
backend_latency_metrics = []

# Metrics pipeline: an OTLP (gRPC) exporter driven by a periodic reader that
# collects and exports every 500 ms, installed as the global meter provider.
metric_reader = PeriodicExportingMetricReader(
    OTLPMetricExporter(),
    export_interval_millis=500,
)
metric_provider = MeterProvider(metric_readers=[metric_reader])
metrics.set_meter_provider(metric_provider)
meter = metrics.get_meter(__name__)
# Availability Zone name -> hostname of the backend service in that AZ.
# The "local" key matches the AZ value metadata() reports outside ECS.
BACKENDS = {
    "ap-northeast-1a": "backend-1a.gray-failure-app",
    "ap-northeast-1c": "backend-1c.gray-failure-app",
    "ap-northeast-1d": "backend-1d.gray-failure-app",
    "local": "localhost",
}

# Memoized task metadata; filled on the first call to metadata().
META_CACHE = {}
# The Flask application object that the routes below are registered on.
app = Flask(import_name=__name__)
def _drain_observations(buffer):
    """Convert every buffered (timestamp_ns, value) sample into an Observation.

    The PeriodicExportingMetricReader invokes the gauge callbacks from its own
    background thread while request threads keep appending to *buffer*. The
    previous "iterate, then ``clear()``" approach dropped any sample appended
    between the end of the loop and the clear. This helper instead snapshots
    the buffer, builds the observations, and then deletes exactly the samples
    it consumed. Under CPython the slice copy and the prefix deletion are each
    a single list operation, so no lock is required. If building the
    observations fails (for example, ``metadata()`` raises), nothing is removed
    and the samples are retried on the next collection.

    Each observation's attributes are ``time_unix_nano`` plus the task metadata.
    NOTE(review): the timestamp is sent as an *attribute*, not as the data
    point's own timestamp, which makes every attribute set unique. Confirm the
    downstream pipeline expects that.
    """
    samples = buffer[:]
    observations = [
        metrics.Observation(value, {"time_unix_nano": ts, **metadata()})
        for ts, value in samples
    ]
    # Remove only what was consumed; items appended meanwhile sit after it.
    del buffer[: len(samples)]
    return observations


def backend_health_metric_callback(options):
    """Callback for the ``backendhealth`` gauge: drain backend_health_metrics.

    :param options: callback options supplied by the SDK (unused).
    :returns: list of Observations, one per buffered health sample.
    """
    return _drain_observations(backend_health_metrics)


def backend_latency_metric_callback(options):
    """Callback for the ``backendlatency`` gauge: drain backend_latency_metrics.

    :param options: callback options supplied by the SDK (unused).
    :returns: list of Observations, one per buffered latency sample (ms).
    """
    return _drain_observations(backend_latency_metrics)
# Gauge of backend probe results (0 = ok, 1 = failed), fed by index_handler.
meter.create_observable_gauge(
    name="backendhealth",
    callbacks=[backend_health_metric_callback],
)
# Gauge of injected backend latency in milliseconds, fed by backend_handler.
# NOTE(review): OTel's unit convention is UCUM ("ms"); "MilliSecond" is passed
# through verbatim. Confirm what the downstream backend expects.
meter.create_observable_gauge(
    name="backendlatency",
    callbacks=[backend_latency_metric_callback],
    unit="MilliSecond",
)
# Helper that decides the injected backend latency for this task's AZ.
# The per-AZ settings are re-read from SSM Parameter Store every N calls.
class LatencyInjector:
    """Pick a random artificial latency (milliseconds) for this task's AZ.

    The per-AZ ranges are read from an SSM parameter holding a JSON object that
    maps an AZ name to ``[min_ms, max_ms]``. The parameter is re-read on the
    first call and then every ``refresh_every`` calls. An AZ without an entry,
    or a failed or invalid read, falls back to ``default_latency``.
    """

    def __init__(self, refresh_every=30, parameter_name="latencies_for_az",
                 default_latency=(200, 500)):
        """Create the SSM client and resolve this task's AZ.

        :param refresh_every: number of calls between parameter refreshes.
        :param parameter_name: SSM parameter holding the per-AZ JSON ranges.
        :param default_latency: ``(min_ms, max_ms)`` used when the AZ has no
            entry.
        """
        self.refresh_every = refresh_every
        self.parameter_name = parameter_name
        self.default_latency = default_latency
        self.client = boto3.client("ssm")
        self.cache = None  # None until the first refresh() call
        self.ttl = 0  # calls since the last refresh
        self.az = metadata()["AvailabilityZone"]

    def get_latency_for(self):
        """Return a random latency in milliseconds for ``self.az``.

        Both ends of the configured ``[min_ms, max_ms]`` range are inclusive.
        """
        self.ttl = self.ttl + 1
        if self.cache is None or self.ttl >= self.refresh_every:
            self.refresh()
        latency = self.cache.get(self.az, self.default_latency)
        return random.randint(latency[0], latency[1])

    def refresh(self):
        """Reload the per-AZ ranges and reset the call counter.

        This is best effort: any error (SSM failure, invalid JSON, or JSON that
        is not an object) is logged, and the cache becomes ``{}`` so every AZ
        uses ``default_latency`` until the next refresh.
        """
        app.logger.info("refresh latency cache")
        try:
            param = self.client.get_parameter(Name=self.parameter_name)
            cache = json.loads(param["Parameter"]["Value"])
            # A non-object value would make get_latency_for raise AttributeError
            # on every call until the next refresh.
            if not isinstance(cache, dict):
                raise ValueError(
                    f"{self.parameter_name} must be a JSON object, "
                    f"got {type(cache).__name__}"
                )
            app.logger.info(json.dumps(cache))
        except Exception as e:
            app.logger.error(str(e))
            cache = {}
        self.cache = cache
        self.ttl = 0
# Routes
@app.route("/")
def index():
    """Frontend endpoint: probe this AZ's backend and report 0 (ok) or 1 (failed)."""
    return jsonify({"status": index_handler()})


# The view function is deliberately not named ``backend``. The module-level
# helper ``backend()`` (which index_handler calls to get the backend hostname)
# would otherwise share that name, and whichever definition came last would
# silently win. ``endpoint="backend"`` keeps the Flask endpoint name, and so
# url_for("backend"), unchanged.
@app.route("/backend", endpoint="backend")
def backend_view():
    """Backend endpoint: reply "ok" after the injected per-AZ delay."""
    backend_handler()
    return jsonify({"status": "ok"})
# Handlers
def index_handler():
    """Probe this AZ's backend once and record the result.

    :returns: 0 if ``GET http://<backend>/backend`` answered with a non-error
        status within 0.5 s, otherwise 1. The ``(time_ns, health)`` sample is
        also appended to backend_health_metrics for the ``backendhealth`` gauge.
    """
    try:
        res = requests.get(f"http://{backend()}/backend", timeout=0.5)
        # A 4xx/5xx reply means the backend is not serving correctly either;
        # without this check such replies were counted as healthy.
        res.raise_for_status()
        health = 0
    except Exception as e:
        # Any failure (lookup, connect, timeout, HTTP error) marks it unhealthy.
        app.logger.error(str(e))
        health = 1
    backend_health_metrics.append((time.time_ns(), health))
    return health
def backend_handler():
    """Sleep for an injected, AZ-dependent latency and return it in milliseconds.

    The latency is also appended to backend_latency_metrics for the
    ``backendlatency`` gauge.
    """
    global latency_injector
    try:
        injector = latency_injector
    except NameError:
        # ``latency_injector`` is only assigned in the ``__main__`` block, so it
        # is missing when this module is imported by a WSGI server instead of
        # being run as a script. Create it on first use rather than failing
        # every /backend request.
        injector = latency_injector = LatencyInjector()
    latency = injector.get_latency_for()
    backend_latency_metrics.append((time.time_ns(), latency))
    time.sleep(latency / 1000)
    return latency
# Helpers
# Look up (and memoize) this task's metadata.
def metadata(timeout=2.0):
    """Return ``{"AvailabilityZone": ..., "DockerId": ...}`` for this task.

    When ``ECS_CONTAINER_METADATA_URI_V4`` is set, the values are read from
    that endpoint and its ``/task`` sub-path. Otherwise fixed placeholder
    values with AZ ``"local"`` are used. The result is stored in META_CACHE,
    and that same dict is returned on every later call.

    :param timeout: per-request timeout in seconds for the metadata endpoint.
    :raises requests.RequestException: if the endpoint is unreachable, times
        out, or returns an error status. Nothing is cached in that case.
    """
    if META_CACHE:
        return META_CACHE
    base_uri = os.environ.get("ECS_CONTAINER_METADATA_URI_V4")
    if base_uri is not None:
        # Bounded timeouts: a hung metadata endpoint must not block forever.
        task_res = requests.get(f"{base_uri}/task", timeout=timeout)
        task_res.raise_for_status()
        task = task_res.json()
        container_res = requests.get(base_uri, timeout=timeout)
        container_res.raise_for_status()
        container = container_res.json()
        fresh = {
            "AvailabilityZone": task["AvailabilityZone"],
            "DockerId": container["DockerId"],
        }
    else:
        fresh = {
            "AvailabilityZone": "local",
            "DockerId": "de516a9c-2013-0d82-d413-1de1181276c6",
        }
    # Fill the shared cache in one step, so a failure above can never leave a
    # half-populated dict that later calls would return as-is.
    META_CACHE.update(fresh)
    return META_CACHE
def backend():
    """Return the hostname of the backend service in this task's AZ.

    The AZ comes from metadata(). Raises KeyError when that AZ has no entry
    in BACKENDS.
    """
    zone = metadata()["AvailabilityZone"]
    host = BACKENDS[zone]
    return host
if __name__ == "__main__":
    latency_injector = LatencyInjector()
    # debug=True on host 0.0.0.0 exposes the Werkzeug interactive debugger,
    # which can execute arbitrary code, to anyone who can reach port 80.
    # Make it opt-in via FLASK_DEBUG=1 (or "true").
    debug = os.environ.get("FLASK_DEBUG", "").lower() in ("1", "true")
    if not debug:
        # Flask sets app.logger to DEBUG in debug mode. Keep the INFO-level
        # "refresh latency cache" logs visible when debug is off.
        app.logger.setLevel(logging.INFO)
    app.run(host="0.0.0.0", port=80, debug=debug)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment