@richcollier
Created July 31, 2018 18:52
# Example chain watch that passes an array of results from one input to the next
POST _xpack/watcher/watch/_execute
{
  "watch": {
    "trigger": {
      "schedule": {
        "interval": "5m"
      }
    },
    "input": {
      "chain": {
        "inputs": [
          {
            "first": {
              "search": {
                "request": {
                  "indices": [
                    ".ml-anomalies-*"
                  ],
                  "body": {
                    "size": 100,
                    "query": {
                      "bool": {
                        "filter": [
                          {
                            "range": {
                              "timestamp": {
                                "gte": "now-2y"
                              }
                            }
                          },
                          {
                            "term": {
                              "result_type": "record"
                            }
                          },
                          {
                            "term": {
                              "job_id": "farequote_count_split"
                            }
                          },
                          {
                            "range": {
                              "record_score": {
                                "gte": "5"
                              }
                            }
                          }
                        ]
                      }
                    }
                  }
                }
              }
            }
          },
          {
            "second": {
              "transform": {
                "script": """
                  ctx.payload.first.hits.hits.stream()
                    .map(h -> "(timestamp:" + h._source.timestamp + " AND airline:" + h._source.partition_field_value + ")")
                    .collect(Collectors.joining(" OR "))
                """
              }
            }
          },
          {
            "third": {
              "search": {
                "request": {
                  "indices": [
                    ".ml-anomalies-*"
                  ],
                  "body": {
                    "size": 100,
                    "query": {
                      "bool": {
                        "filter": [
                          {
                            "range": {
                              "timestamp": {
                                "gte": "now-2y"
                              }
                            }
                          },
                          {
                            "term": {
                              "result_type": "record"
                            }
                          },
                          {
                            "term": {
                              "job_id": "farequote_responsetime"
                            }
                          },
                          {
                            "range": {
                              "record_score": {
                                "gte": "5"
                              }
                            }
                          },
                          {
                            "query_string": {
                              "query": "{{ctx.payload.second._value}}"
                            }
                          }
                        ]
                      }
                    }
                  }
                }
              }
            }
          }
        ]
      }
    },
    "condition": {
      "compare": {
        "ctx.payload.third.hits.total": {
          "gt": 0
        }
      }
    },
    "actions": {
      "log": {
        "transform": {
          "script": "return ctx.payload.third.hits.hits.stream().map(p -> ['airline':p._source.partition_field_value,'score':p._source.record_score,'timestamp':p._source.timestamp]).collect(Collectors.toList());"
        },
        "logging": {
          "text": """
Anomalies:
{{#ctx.payload._value}}
airline={{airline}} at timestamp={{timestamp}}
{{/ctx.payload._value}}
"""
        }
      }
    }
  }
}
@richcollier
Author

The "magic" here happens in the second chain input, where the query string for the third input is built dynamically from the results of the first input.

So, when run on the sample farequote data, where the first job (farequote_count_split) is a count detector partitioned on airline, the first input returns 3 hits, which are then transformed into the following string:

(timestamp:1486656900000 AND airline:AAL) OR (timestamp:1486656000000 AND airline:AAL) OR (timestamp:1486638900000 AND airline:ACA)
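To see exactly what the second input produces, its Painless transform can be mirrored in plain Python. This is a minimal sketch, not the watch itself: it assumes each hit is a dict with a `_source` holding only the two fields the transform reads (`timestamp` and `partition_field_value`), and the helper name `build_query_string` is hypothetical. The three sample hits are the ones encoded in the string above.

```python
# Mirrors the Painless transform in the "second" chain input:
#   hits.stream().map(h -> "(timestamp:" + ts + " AND airline:" + pfv + ")")
#       .collect(Collectors.joining(" OR "))
# build_query_string is a hypothetical helper name used only for this sketch.
def build_query_string(hits):
    """Join one (timestamp AND airline) clause per hit with OR."""
    clauses = [
        "(timestamp:{} AND airline:{})".format(
            h["_source"]["timestamp"], h["_source"]["partition_field_value"]
        )
        for h in hits
    ]
    return " OR ".join(clauses)


# The three hits returned by the first input, reduced to the fields the transform reads.
first_hits = [
    {"_source": {"timestamp": 1486656900000, "partition_field_value": "AAL"}},
    {"_source": {"timestamp": 1486656000000, "partition_field_value": "AAL"}},
    {"_source": {"timestamp": 1486638900000, "partition_field_value": "ACA"}},
]

print(build_query_string(first_hits))
```

One consequence the sketch makes easy to spot: if the first input returns no hits, the joined string is empty, so it is worth checking how the third search's `query_string` behaves in that case before relying on the watch.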

When this string is passed as the query string for the third input (whose job, farequote_responsetime, detects anomalies in mean(responsetime) partitioned on airline), the end result is only 2 records:

Anomalies:
airline=AAL at timestamp=1486656000000
airline=AAL at timestamp=1486656900000

In other words, only 2 of the 3 anomalies from the first ML job have a matching anomaly (same airline and same timestamp) in the second ML job's results.
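The net effect of the third input is a join on the (timestamp, airline) pair. A minimal Python sketch of that logic follows, assuming the other filters (job_id, result_type, time range, record_score) have already been applied, and assuming each record's `airline` field, which the real query string matches on, holds the same value as its `partition_field_value`. The function names are hypothetical, and the second job's record list is a toy stand-in: it contains the two matches reported above plus one made-up record that exists only in this sketch.

```python
def correlate(first_records, second_records):
    """Keep the second job's records whose (timestamp, airline) pair appears in the first job.

    Models only the query_string clause of the "third" input.
    """
    wanted = {(r["timestamp"], r["partition_field_value"]) for r in first_records}
    return [
        r
        for r in second_records
        if (r["timestamp"], r["partition_field_value"]) in wanted
    ]


def render_log(records):
    """Roughly what the "log" action prints: a header, then one line per record."""
    lines = ["Anomalies:"]
    for r in records:
        lines.append(
            "airline={} at timestamp={}".format(r["partition_field_value"], r["timestamp"])
        )
    return "\n".join(lines)


# The three anomalies from farequote_count_split (taken from the query string above).
first_records = [
    {"timestamp": 1486656900000, "partition_field_value": "AAL"},
    {"timestamp": 1486656000000, "partition_field_value": "AAL"},
    {"timestamp": 1486638900000, "partition_field_value": "ACA"},
]

# Toy stand-in for farequote_responsetime records: the two matches reported
# above plus one HYPOTHETICAL record ("ZZZ") that has no counterpart.
second_records = [
    {"timestamp": 1486656000000, "partition_field_value": "AAL"},
    {"timestamp": 1486656900000, "partition_field_value": "AAL"},
    {"timestamp": 1486638900000, "partition_field_value": "ZZZ"},
]

matches = correlate(first_records, second_records)
print(render_log(matches))
```

The set lookup makes the pairing explicit; in the watch itself, Elasticsearch performs the same matching through the OR-ed `(timestamp AND airline)` clauses, so no client-side join is needed.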
