Last active
April 1, 2022 15:59
-
-
Save richcollier/7e5603c366b9fcece6f1a8b1b3cf4d3f to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#chained watch for anomalies across jobs" | |
POST _xpack/watcher/watch/_execute | |
{ | |
"watch": { | |
"trigger": { | |
"schedule": { | |
"interval": "1m" | |
} | |
}, | |
"metadata": { | |
"watch_timespan" : "20m", //how far back watch looks each invocation (should be > 2x bucket_span) | |
"lookback_window" : "10m", //how far back to look in other jobs for related anomalies | |
"job1_name" : "it_ops_kpi", | |
"job1_min_anomaly_score": 75, //minimum anomaly score (bucket score) for job1 | |
"job2_name" : "it_ops_network", | |
"job2_min_record_score" : 10, //minimum record score for anomalies in job2 | |
"job3_name" : "it_ops_sql", | |
"job3_min_record_score" : 5 //minimum record score for anomalies in job3 | |
}, | |
"input": { | |
"chain": { | |
"inputs": [ | |
{ | |
"job1": { | |
"search": { | |
"request": { | |
"indices": [ | |
".ml-anomalies-*" | |
], | |
"body": { | |
"query": { | |
"bool": { | |
"filter": [ | |
{ "range": { "timestamp": {"gte": "now-{{ctx.metadata.watch_timespan}}"}}}, | |
{ "term": {"result_type": "bucket"}}, | |
{ "term": {"job_id": "{{ctx.metadata.job1_name}}"}}, | |
{ "range": {"anomaly_score": {"gte": "{{ctx.metadata.job1_min_anomaly_score}}"}}} | |
] | |
} | |
} | |
} | |
} | |
} | |
} | |
}, | |
{ | |
"job2": { | |
"search": { | |
"request": { | |
"indices": [ | |
".ml-anomalies-*" | |
], | |
"body": { | |
"query": { | |
"bool": { | |
"filter": [ | |
{ "range": { "timestamp": {"gte": "{{ctx.payload.job1.hits.hits.0._source.timestamp}}||-{{ctx.metadata.lookback_window}}", "lte": "{{ctx.payload.job1.hits.hits.0._source.timestamp}}"}}}, | |
{ "term": {"result_type": "record"}}, | |
{ "term": {"job_id": "{{ctx.metadata.job2_name}}"}}, | |
// example of optional filters | |
// { "regexp":{"field_name":{"value":"Out.*"}}}, | |
// { "range": {"actual": {"gte":"1000.0"}}}, | |
{ "range": {"record_score": {"gte": "{{ctx.metadata.job2_min_record_score}}"}}} | |
] | |
} | |
} | |
} | |
} | |
} | |
} | |
}, | |
{ | |
"job3": { | |
"search": { | |
"request": { | |
"indices": [ | |
".ml-anomalies-*" | |
], | |
"body": { | |
"query": { | |
"bool": { | |
"filter": [ | |
{ "range": { "timestamp": {"gte": "{{ctx.payload.job1.hits.hits.0._source.timestamp}}||-{{ctx.metadata.lookback_window}}", "lte": "{{ctx.payload.job1.hits.hits.0._source.timestamp}}"}}}, | |
{ "term": {"result_type": "record"}}, | |
{ "term": {"job_id": "{{ctx.metadata.job3_name}}"}}, | |
// example of optional filters | |
// { "regexp":{"hostname":{"value":"dbserver.*"}}}, | |
{ "range": {"record_score": {"gte": "{{ctx.metadata.job3_min_record_score}}"}}} | |
] | |
} | |
} | |
} | |
} | |
} | |
} | |
} | |
] | |
} | |
}, | |
"condition" : { | |
"script" : { | |
// return true only if all 3 jobs returned "hits", thus all had anomalies matching input conditions | |
"source" : "return ctx.payload.job1.hits.total > 0 && ctx.payload.job2.hits.total > 0 && ctx.payload.job3.hits.total > 0" | |
} | |
}, | |
"actions": { | |
"log": { | |
// use java stream() to collect hits from each into collections | |
"transform": { | |
"script": "return ['anomaly_score': ctx.payload.job1.hits.hits.0._source.anomaly_score, 'bucket_time': Instant.ofEpochMilli(ctx.payload.job1.hits.hits.0._source.timestamp).atZone(ZoneOffset.UTC).format(DateTimeFormatter.ofPattern('yyyy-MM-dd HH:mm:ss')),'job2_anomaly_details':ctx.payload.job2.hits.hits.stream().map(p -> ['bucket_time': Instant.ofEpochMilli(ctx.payload.job2.hits.hits.0._source.timestamp).atZone(ZoneOffset.UTC).format(DateTimeFormatter.ofPattern('yyyy-MM-dd HH:mm:ss')),'field_name':p._source.field_name,'score':p._source.record_score,'actual':p._source.actual.0,'typical':p._source.typical.0]).collect(Collectors.toList()),'job3_anomaly_details':ctx.payload.job3.hits.hits.stream().map(p -> ['bucket_time': Instant.ofEpochMilli(ctx.payload.job3.hits.hits.0._source.timestamp).atZone(ZoneOffset.UTC).format(DateTimeFormatter.ofPattern('yyyy-MM-dd HH:mm:ss')),'hostname':p._source.hostname.0,'field_name':p._source.field_name,'score':p._source.record_score,'actual':p._source.actual.0,'typical':p._source.typical.0]).collect(Collectors.toList())]" | |
}, | |
"logging": { | |
"text": "[CRITICAL] Anomaly Alert for job {{ctx.metadata.job1_name}}: score={{ctx.payload.anomaly_score}} at {{ctx.payload.bucket_time}} UTC \nPossibly influenced by these other anomalous metrics (within the prior 10 minutes):\njob:{{ctx.metadata.job2_name}}: (anomalies with at least a record score of {{ctx.metadata.job2_min_record_score}}):\n{{#ctx.payload.job2_anomaly_details}}field={{field_name}}: score={{score}}, value={{actual}} (typical={{typical}}) at {{bucket_time}} UTC\n{{/ctx.payload.job2_anomaly_details}}\njob:{{ctx.metadata.job3_name}}: (anomalies with at least a record score of {{ctx.metadata.job3_min_record_score}}):\n{{#ctx.payload.job3_anomaly_details}}hostname={{hostname}} field={{field_name}}: score={{score}}, value={{actual}} (typical={{typical}}) at {{bucket_time}} UTC\n{{/ctx.payload.job3_anomaly_details}}" | |
} | |
} | |
} | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Example output: