import aws_cdk.aws_sns as sns
from aws_cdk import Duration, aws_apigateway
from aws_cdk import aws_dynamodb as dynamodb
from aws_cdk import aws_lambda as _lambda
from cdk_monitoring_constructs import (
    AlarmFactoryDefaults,
    CustomMetricGroup,
    ErrorRateThreshold,
    MetricStatistic,
    MonitoringFacade,
    SnsAlarmActionStrategy,
)
from constructs import Construct


class CrudMonitoring(Construct):
    """Monitoring construct: an alarm SNS topic plus a high-level dashboard.

    On construction it creates the notification topic (exposed as
    ``notification_topic``) and a ``MonitoringFacade`` that monitors the
    given REST API and a custom daily KPI metric, with alarms routed to
    that topic.
    """

    def __init__(
        self,
        scope: Construct,
        id_: str,
        crud_api: aws_apigateway.RestApi,
        db: dynamodb.Table,
        idempotency_table: dynamodb.Table,
        functions: list[_lambda.Function],
    ) -> None:
        """Create the topic and the high-level dashboard.

        Args:
            scope: Parent construct.
            id_: Construct id; also used as the alarm name prefix and as
                the prefix of child construct ids.
            crud_api: REST API monitored by the high-level dashboard.
            db: Currently unused here; kept for interface compatibility.
            idempotency_table: Currently unused here; kept for interface
                compatibility.
            functions: Currently unused here; kept for interface
                compatibility.
        """
        super().__init__(scope, id_)
        self.id_ = id_
        self.notification_topic = self._build_topic()
        self._build_high_level_dashboard(crud_api, self.notification_topic)

    def _build_topic(self) -> sns.Topic:
        """Create the SNS topic that receives alarm notifications.

        Returns:
            The newly created topic, scoped under this construct.
        """
        # NOTE(review): the topic is created without a KMS master key or any
        # subscriptions; add encryption/subscribers here if the project's
        # security and on-call policies require them.
        topic_name = f'{self.id_}alarms'
        return sns.Topic(self, topic_name, display_name=topic_name)

    def _build_high_level_dashboard(self, crud_api: aws_apigateway.RestApi, topic: sns.Topic) -> None:
        """Build the high-level dashboard: API Gateway health and daily KPIs.

        Args:
            crud_api: REST API to monitor.
            topic: SNS topic passed to the alarm action strategy as the
                on-alarm topic.
        """
        # Alarm defaults shared by everything added to this facade.
        alarm_defaults = AlarmFactoryDefaults(
            actions_enabled=True,
            alarm_name_prefix=self.id_,
            action=SnsAlarmActionStrategy(on_alarm_topic=topic),
        )
        high_level_facade = MonitoringFacade(
            self,
            f'{self.id_}HighFacade',
            alarm_factory_defaults=alarm_defaults,
        )
        high_level_facade.add_large_header('Order REST API High Level Dashboard')

        # API Gateway section with a 5XX fault-rate alarm.
        # NOTE(review): confirm the unit of max_error_rate against the
        # cdk-monitoring-constructs documentation.
        high_level_facade.monitor_api_gateway(
            api=crud_api,
            add5_xx_fault_rate_alarm={'internal_error': ErrorRateThreshold(max_error_rate=1)},
        )

        # Custom KPI section: 'ValidCreateOrderEvents' in the 'orders_kpi'
        # namespace, aggregated over a one-day period.
        metric_factory = high_level_facade.create_metric_factory()
        create_metric = metric_factory.create_metric(
            metric_name='ValidCreateOrderEvents',
            namespace='orders_kpi',
            statistic=MetricStatistic.N,
            dimensions_map={'service': 'Orders'},
            label='create order events',
            period=Duration.days(1),
        )
        group = CustomMetricGroup(metrics=[create_metric], title='Daily Order Requests')
        high_level_facade.monitor_custom(
            metric_groups=[group],
            human_readable_name='Daily KPIs',
            alarm_friendly_name='KPIs',
        )