Skip to content

Instantly share code, notes, and snippets.

@ru-rocker

ru-rocker/Async.java

Last active Apr 26, 2021
Embed
What would you like to do?
CDK CloudWatch Lambda Java
// RandomMessageProcessor.java
// snippet code
@Async("asyncExecutor")
public void process(RequestDto requestDto) {
if(reentrantLock.isLocked()) {
logger.info("Another thread is processing the processor!");
return;
}
reentrantLock.lock();
Random rand = new Random();
try {
Thread.sleep(rand.nextInt(2000));
logger.info("Message is: {}", requestDto);
} catch (InterruptedException e) {
logger.error("InterruptedException", e);
} finally {
reentrantLock.unlock();
}
}
# JobTriggerLambdaStack.py
# Dashboard
dashboard = _cw.Dashboard(
self,
id='JobTriggerLambdaDashboard',
dashboard_name='JobTriggerLambdaDashboard',
)
# Duration Widget
duration_widget = _cw.GraphWidget(
title='Lambda Duration',
width=12,
)
duration_metrics = _cw.Metric(
namespace='AWS/Lambda',
metric_name='Duration',
dimensions={
'FunctionName': my_lambda.function_name
},
statistic='p99.00',
period=core.Duration.seconds(60),
)
duration_widget.add_left_metric(duration_metrics)
# and the rest of widgets..
# add to dashboard
dashboard.add_widgets(duration_widget,stats_widget)
# alarm: anomaly detection for duration
duration_anomaly_cfnalarm = _cw.CfnAlarm(
self,
"DurationAnomalyAlarm",
actions_enabled=True,
alarm_actions=[topic.topic_arn],
alarm_name="CDK_DurationAnomalyAlarm",
comparison_operator="GreaterThanUpperThreshold",
datapoints_to_alarm=2,
evaluation_periods=2,
metrics=[
_cw.CfnAlarm.MetricDataQueryProperty(
expression="ANOMALY_DETECTION_BAND(m1, 2)",
id="ad1"
),
_cw.CfnAlarm.MetricDataQueryProperty(
id="m1",
metric_stat=_cw.CfnAlarm.MetricStatProperty(
metric=_cw.CfnAlarm.MetricProperty(
metric_name='Duration',
namespace='AWS/Lambda',
dimensions=[_cw.CfnAlarm.DimensionProperty(
name='FunctionName', value=my_lambda.function_name)],
),
period=core.Duration.minutes(5).to_seconds(),
stat="p99.00"
)
)
],
ok_actions=[topic.topic_arn],
threshold_metric_id="ad1",
treat_missing_data="missing",
)
# and the rest of alarms..
# JobTriggerLambdaStack.py
# Create cloudwatch events
rule = _events.Rule(
self,
id='event-trigger',
schedule=_events.Schedule.rate(core.Duration.minutes(1))
)
rule.add_target(target=_et.LambdaFunction(my_lambda))
# JobTriggerLambdaStack.py
# Create lambda
my_lambda = _lambda.Function(
self, 'testLambdaVPC_CDK',
function_name='CDK_test_job_trigger_lambda_vpc',
runtime=_lambda.Runtime.PYTHON_3_7,
code=_lambda.Code.asset('lambda'),
handler='testLambdaVPC_CDK.handler',
vpc=vpc,
log_retention=_logs.RetentionDays.THREE_DAYS,
)
core.Tags.of(my_lambda).add('src.projectKey', 'job-scheduler-poc')
my_lambda.add_to_role_policy(
_iam.PolicyStatement(
effect=_iam.Effect.ALLOW,
actions=[
"ssm:Describe*",
"ssm:Get*",
"ssm:List*"
],
resources=["*"],
)
)
# JobTriggerLambdaStack.py
# Create SNS
topic = _sns.Topic(
self,
"JobTriggerSNS_POC",
topic_name="CDK_JobTriggerSNS_POC",
)
core.Tags.of(topic).add('src.projectKey', 'job-scheduler-poc')
topic.add_subscription(_subs.EmailSubscription(
email_address='your-email@xyz.com'))
# JobTriggerLambdaStack.py
# Create random parameter and store into SSM
_ssm.StringParameter(self,
id="API_KEY",
string_value=''.join(random.choice(
string.ascii_uppercase + string.digits) for _ in range(32)),
parameter_name="/job-trigger-lambda/api_key",
)
# JobTriggerLambdaStack.py
# Get VPC
vpc = Vpc.from_vpc_attributes(
self,
id='vpc-dev',
vpc_id='YOUR_VPC_ID',
availability_zones=core.Fn.get_azs(),
private_subnet_ids=[
'YOUR_PRIVATE_SUBNET_ID1', 'YOUR_PRIVATE_SUBNET_ID2', 'YOUR_PRIVATE_SUBNET_ID2'],
private_subnet_route_table_ids=[
'YOUR_PRIVATE_ROUTE_TABLE_ID1', 'YOUR_PRIVATE_ROUTE_TABLE_ID2', 'YOUR_PRIVATE_ROUTE_TABLE_ID3']
)
// CloudWatchAgentAspect.java -- snippet code
@Around("execution(* com.rurocker.aws.lambdacloudwatch.processor..*(..)))")
public Object sendMetrics(ProceedingJoinPoint proceedingJoinPoint) {
Object result = null;
MethodSignature methodSignature = (MethodSignature) proceedingJoinPoint.getSignature();
String className = methodSignature.getDeclaringType().getSimpleName();
Dimension dimension = new Dimension()
.withName("JOB_LATENCY")
.withValue(className);
double status = 1d;
long start = System.currentTimeMillis();
try {
result = proceedingJoinPoint.proceed();
} catch (Throwable t) {
logger.error("Error!", t);
status = 0d;
}
long end = System.currentTimeMillis();
double value = (double) (end - start);
MetricDatum latencyDatum = new MetricDatum()
.withMetricName("JOB_LATENCY")
.withUnit(StandardUnit.Milliseconds)
.withValue(value)
.withDimensions(dimension);
MetricDatum statusDatum = new MetricDatum()
.withMetricName("JOB_STATUS")
.withUnit(StandardUnit.None)
.withValue(status)
.withDimensions(dimension);
PutMetricDataRequest request = new PutMetricDataRequest()
.withNamespace("job-scheduler-poc")
.withMetricData(latencyDatum,statusDatum);
amazonCloudWatch.putMetricData(request);
return result;
}
<!-- snippet -->
<dependency>
<groupId>com.amazonaws</groupId>
<artifactId>aws-java-sdk-cloudwatch</artifactId>
</dependency>
<dependency>
<groupId>com.amazonaws</groupId>
<artifactId>aws-java-sdk-cloudwatchmetrics</artifactId>
</dependency>
<dependency>
<groupId>com.amazonaws</groupId>
<artifactId>aws-java-sdk-events</artifactId>
</dependency>
<dependency>
<groupId>com.amazonaws</groupId>
<artifactId>aws-java-sdk-ssm</artifactId>
</dependency>
// ParameterRetrieval.java --snippet code
public char[] getApiKey() {
GetParameterRequest parametersRequest =
new GetParameterRequest()
.withName(paremeterStoreKey)
.withWithDecryption(false);
final GetParameterResult parameterResult =
awsSimpleSystemsManagement.getParameter(parametersRequest);
return parameterResult.getParameter().getValue().toCharArray();
}
import json
import requests
import boto3
# lambda/testLambdaVPC_CDK.py
def handler(event, context):
client = boto3.client('ssm')
url = 'http://your-server-url:port/aws-poc-notify'
api_key_response = client.get_parameter(
Name='/job-trigger-lambda/api_key',
WithDecryption=False
)
api_key = api_key_response['Parameter']['Value']
payload = {
'id': event['id'],
'account': event['account']
}
headers = {
'Content-Type': 'application/json',
'API_KEY': api_key,
}
data = json.dumps(payload);
r = requests.post(url=url,data=data,headers=headers)
status_code = r.status_code
if status_code < 200 or status_code > 299:
raise Exception('status_code is {}'.format(status_code))
return "OK"
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment