@gkousiouris
Last active June 19, 2023 13:29
High Availability Router for Openwhisk

This pattern subflow implements routing logic: it diverts traffic between two locations based on a configurable condition. The first output is the default location and the second output is the fallback location.

[Image: router]

The user sets the following parameters through the subflow UI:

  • redirection metric: one of the parameters reported by the Openwhisk monitoring subflow, i.e. wait time, init time, duration or success percentage
  • redirection value: the threshold above or below which the redirection should occur
  • comparison logic, through the boolean redirectIfLarger parameter: this defines how the current value is compared with the redirection value. If redirectIfLarger is set to true, the redirection occurs when the redirection metric is larger than the redirection value; if it is set to false, the redirection occurs when the metric is smaller
  • primary ratio: defines whether the redirection acts as a pure fallback or as shared traffic. With the default value of 0, all traffic is redirected to the secondary output once the condition is met. A non-zero value sends that percentage of requests, chosen probabilistically, to the primary output and the remaining ones to the secondary output.
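
The four parameters above combine into a single routing decision. The following is a minimal sketch of that decision in plain JavaScript, modelled on the subflow's `probabilistic selection` function node. The function name `chooseOutput` and the injectable `random` argument are assumptions made for illustration; the subflow itself reads these values through `env.get()` and `flow.get()`.

```javascript
// Sketch of the Router decision. Returns 0 for the primary (default) output
// and 1 for the secondary (redirection) output.
// `chooseOutput` and `random` are hypothetical names, not part of the subflow.
function chooseOutput(currentValue, redirectionValue, redirectIfLarger, primaryPercent, random = Math.random) {
    // The condition is met when the metric crosses the threshold
    // in the direction selected by redirectIfLarger.
    const conditionMet = redirectIfLarger
        ? currentValue > redirectionValue
        : currentValue < redirectionValue;

    if (!conditionMet) {
        return 0; // no redirection: everything goes to the primary output
    }
    // Condition met: keep primaryPercent % of the requests on the primary output.
    return random() < primaryPercent / 100 ? 0 : 1;
}

// Example: wait time of 800 ms against a 500 ms limit with a 70 % primary ratio.
console.log(chooseOutput(800, 500, true, 70));
```

Passing the random source as an argument is only a convenience for experimenting with fixed values; with the default `Math.random`, the split holds on average rather than for any single request.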

The ratio can also be set at runtime through a POST /endpoints request with a body like { "endpoints":[ { "endpoint":"primary", "percent":90 } ] }. This overrides the value set in the UI and can be used for runtime adaptation, e.g. to implement Circuit Breaker approaches.
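
As a rough illustration of the runtime override, the sketch below builds the request body shown above and posts it to the Router. The helper names `buildPolicy` and `updateRatio` are hypothetical, and the base URL is an assumption that depends on where your Node-RED instance listens.

```javascript
// Builds the body expected by POST /endpoints, in the format shown above.
// `buildPolicy` and `updateRatio` are hypothetical helper names.
function buildPolicy(primaryPercent) {
    if (!Number.isFinite(primaryPercent) || primaryPercent < 0 || primaryPercent > 100) {
        throw new RangeError("primaryPercent must be between 0 and 100");
    }
    return { endpoints: [{ endpoint: "primary", percent: primaryPercent }] };
}

// Sends the policy to the Router. `baseUrl` is an assumption, for example
// "http://localhost:1880" for a local Node-RED instance.
// Relies on the global fetch available in Node.js 18 and later.
async function updateRatio(baseUrl, primaryPercent) {
    const response = await fetch(baseUrl + "/endpoints", {
        method: "POST",
        headers: { "Content-Type": "application/json" },
        body: JSON.stringify(buildPolicy(primaryPercent)),
    });
    return response.status;
}

// Print the body that would be sent for a 90 % primary ratio.
console.log(JSON.stringify(buildPolicy(90)));
```

The flow export also defines GET and DELETE handlers on /endpoints; DELETE clears the stored policy, after which the Router falls back to the ratio configured in the UI.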

[Image: options]

The Router node must also receive the output of the OW monitor and logger subflow that targets the default location. The routing decision is based on this input and on the user's selections.
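
To make the monitor input concrete, here is a small sketch of how a monitor report can be recognised and turned into the metrics the Router compares against. The field names follow the flow's `Switch if monitor` and `set monitor flow variables` function nodes; the helper names and the sample values are assumptions for illustration only.

```javascript
// Hypothetical helpers; the subflow performs these steps inside its function nodes.

// A message counts as a monitor report when its payload carries both
// `results` and `rawData`.
function isMonitorReport(msg) {
    const payload = msg.payload;
    return typeof payload === "object" && payload !== null
        && Object.prototype.hasOwnProperty.call(payload, "results")
        && Object.prototype.hasOwnProperty.call(payload, "rawData");
}

// Maps the monitor's moving averages onto the metric names offered in the Router UI.
function extractMetrics(msg) {
    const results = msg.payload.results;
    return {
        successPercentage: results.successPercentage,
        waitTime: results.movingAverageWaitTime,
        initTime: results.movingAverageInitTime,
        duration: results.movingAverageDuration,
    };
}

// Illustrative values only, not real measurements.
const report = {
    payload: {
        rawData: [],
        results: {
            successPercentage: 100,
            movingAverageWaitTime: 620,
            movingAverageInitTime: 0,
            movingAverageDuration: 35,
        },
    },
};

if (isMonitorReport(report)) {
    console.log(extractMetrics(report));
}
```

Any message that does not look like a monitor report is treated as a normal message to be routed.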

In the configuration shown in the screenshot above, if the wait time of the default Openwhisk location exceeds 500 milliseconds, 30% of the requests arriving from the Test Msg inject node are forwarded to the redirection output and the remaining 70% to the primary output. If the wait time is under 500 milliseconds, all messages go to the primary output. The decision is updated each time new metrics arrive from the OW monitor; how often that happens is set by the polling period option of the OW monitor subflow.

If the default Openwhisk endpoint cannot be reached (indicated by a failed value in the msg.statusCode property of the OW monitor output), the Router switches to the redirection output by default. Because the Router interprets msg.statusCode in this way, a normal message to be routed that carries a msg.statusCode property should have that property removed before it is fed into the Router node.
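
The sketch below illustrates why a stray msg.statusCode matters. The check mirrors the Router's `if OW unreachable` function node, which treats a status code above 202, or a non-integer value, as an unreachable signal. The helper names are hypothetical.

```javascript
// Mirrors the check in the Router's "if OW unreachable" function node:
// a message carrying statusCode is taken as an "unreachable" signal when
// the code is above 202 or is not an integer (e.g. an error string).
function isUnreachableSignal(msg) {
    if (!Object.prototype.hasOwnProperty.call(msg, "statusCode")) {
        return false; // ordinary message to be routed
    }
    return !Number.isInteger(msg.statusCode) || msg.statusCode > 202;
}

// Hypothetical helper: returns a copy of the message without statusCode,
// so an upstream HTTP status cannot be mistaken for a monitor signal.
function stripStatusCode(msg) {
    const { statusCode, ...rest } = msg;
    return rest;
}

const fromHttpNode = { payload: "test message", statusCode: 404 };
console.log(isUnreachableSignal(fromHttpNode));                  // true
console.log(isUnreachableSignal(stripStatusCode(fromHttpNode))); // false
```

Inside Node-RED, the same effect can be obtained with a function node containing `delete msg.statusCode; return msg;` placed in front of the Router.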

[
{
"id": "343bbf5623c098fc",
"type": "subflow",
"name": "Router",
"info": "This is a pattern subflow in order to implement a routing logic. The subflow is used to divert traffic among locations based on a condition set. The default location is the first output whereas the fallback location is the second output.\n\nThe user needs to select through the subflow UI the following parameters:\n - `redirection metric`: one of the monitored parameters of the Openwhisk monitoring subflow, i.e. `wait time`, `init time`, `duration` or `success percentage`\n- `redirection value` is the target value above or under which the redirection should occur\n- comparison logic through the boolean `redirectIfLarger` parameter. This is used to define the comparison between the target value and the current value. So if `redirectIfLarger`is set to true, the redirection will occur if the `redirection metric` is larger than the `redirection value`\n- `primary ratio` may be used to define whether the redirection will be of a fallback nature or a shared traffic option. Thus a default value of `0` will redirect all traffic to the secondary output. A non zero value will probabilistically redirect the according percentage of requests to the primary endpoint and the remaining ones to the second output.\n\nThe ratio can also be defined at runtime through a `POST /endpoints` method with a body like `{\n \"endpoints\":[\n {\n \"endpoint\":\"primary\",\n \"percent\":90 \n }\n ]\n}`. This will override the set value in the UI and can be used for runtime adaptation in order to support e.g. the implementation of Circuit Breaker approaches.\n\nThe Router node needs also to be fed with the output of the [OW monitor and logger subflow](https://flows.nodered.org/flow/a86475720659b3ed9eb5024052d94b1d/in/HXSkA2JJLcGA), targeting at the default location. 
Based on this input and the user selection, the decision is made.\n\nIn case of inability to reach the default Openwhisk endpoint (indicated by a relevant failed value in the incoming `msg.statusCode` property of the OW monitor output) the Router switches by default to the redirection output. Thus if the normal message that needs to be routed has a `msg.statusCode` property, it should be removed before feeding into the Router node.\n\n",
"category": "PHYSICS",
"in": [
{
"x": 160,
"y": 100,
"wires": [
{
"id": "b3334a9fce2e641d"
}
]
}
],
"out": [
{
"x": 900,
"y": 80,
"wires": [
{
"id": "1cd90e99e32a545a",
"port": 0
}
]
},
{
"x": 900,
"y": 140,
"wires": [
{
"id": "1cd90e99e32a545a",
"port": 1
}
]
}
],
"env": [
{
"name": "redirectionMetric",
"type": "str",
"value": "",
"ui": {
"type": "select",
"opts": {
"opts": [
{
"l": {
"en-US": "waitTime"
},
"v": "waitTime"
},
{
"l": {
"en-US": "initTime"
},
"v": "initTime"
},
{
"l": {
"en-US": "duration"
},
"v": "duration"
},
{
"l": {
"en-US": "successPercentage (%)"
},
"v": "successPercentage"
}
]
}
}
},
{
"name": "redirectIfLarger",
"type": "bool",
"value": "true"
},
{
"name": "redirectionValue",
"type": "num",
"value": ""
},
{
"name": "primary",
"type": "num",
"value": "0",
"ui": {
"label": {
"en-US": "Primary ratio (%) in case of failure"
}
}
}
],
"meta": {},
"color": "#FDD0A2",
"icon": "node-red/batch.svg"
},
{
"id": "8ea38f4cb0d69b72",
"type": "function",
"z": "343bbf5623c098fc",
"name": "set monitor flow variables",
"func": "flow.set('successPercentage',msg.payload.results.successPercentage);\nflow.set('waitTime',msg.payload.results.movingAverageWaitTime);\nflow.set('initTime',msg.payload.results.movingAverageInitTime);\nflow.set('duration',msg.payload.results.movingAverageDuration);\nreturn msg;",
"outputs": 1,
"noerr": 0,
"initialize": "",
"finalize": "",
"libs": [],
"x": 610,
"y": 280,
"wires": [
[]
]
},
{
"id": "1cd90e99e32a545a",
"type": "function",
"z": "343bbf5623c098fc",
"name": "probabilistic selection",
"func": "\nvar endpoints=flow.get('endpoints');\nif (env.get('redirectIfLarger')){ //redirection if limit larger than current \n if (env.get('redirectionValue')<flow.get(env.get('redirectionMetric'))){\n \n if(probability(endpoints.endpoints[0].percent/100)){//primary endpoint probability\n // Code to run if success\n return [msg,null] //primary endpoint\n \n } else {\n return [null,msg] //secondary endpoint\n }\n \n } else {\n return [msg,null] //do not redirect\n }\n} else { //if smaller than\n if (env.get('redirectionValue')>flow.get(env.get('redirectionMetric'))){ //target > current\n if(probability(endpoints.endpoints[0].percent/100)){//primary endpoint probability\n // Code to run if primary endpoint selected\n return [msg,null] //primary endpoint\n \n } else {\n return [null,msg] //secondary endpoint\n }\n } else {\n return [msg,null]\n }\n}\n\nfunction probability(n){\n return Math.random() < n;\n}\n\n\n\n\nreturn msg;",
"outputs": 2,
"noerr": 0,
"initialize": "",
"finalize": "",
"libs": [],
"x": 760,
"y": 100,
"wires": [
[],
[]
]
},
{
"id": "0fe910ae9e842aa0",
"type": "http in",
"z": "343bbf5623c098fc",
"name": "",
"url": "/endpoints",
"method": "delete",
"upload": false,
"swaggerDoc": "",
"x": 310,
"y": 560,
"wires": [
[
"e434fb6d22f068ee"
]
]
},
{
"id": "e434fb6d22f068ee",
"type": "function",
"z": "343bbf5623c098fc",
"name": "check previous",
"func": "\nflow.set(\"endpoints\",undefined);\nreturn msg;",
"outputs": 1,
"noerr": 0,
"initialize": "",
"finalize": "",
"libs": [],
"x": 530,
"y": 560,
"wires": [
[
"c8ced585843e8548"
]
]
},
{
"id": "c8ced585843e8548",
"type": "switch",
"z": "343bbf5623c098fc",
"name": "exists",
"property": "exists",
"propertyType": "msg",
"rules": [
{
"t": "true"
},
{
"t": "else"
}
],
"checkall": "true",
"repair": false,
"outputs": 2,
"x": 700,
"y": 560,
"wires": [
[
"e2974b79c1614578"
],
[
"69b6225e6524ac4b"
]
]
},
{
"id": "e2974b79c1614578",
"type": "http response",
"z": "343bbf5623c098fc",
"name": "",
"statusCode": "200",
"headers": {},
"x": 850,
"y": 520,
"wires": []
},
{
"id": "69b6225e6524ac4b",
"type": "http response",
"z": "343bbf5623c098fc",
"name": "",
"statusCode": "404",
"headers": {},
"x": 850,
"y": 620,
"wires": []
},
{
"id": "54d7bf70a2ebb2af",
"type": "comment",
"z": "343bbf5623c098fc",
"name": "Limitation",
"info": "In Node-RED there is no ability to define two different methods on the same endpoint, thus no DELETE method can be defined directly in the /semaphore endpoint. A new endpoint was defined for the delete operation.",
"x": 270,
"y": 520,
"wires": []
},
{
"id": "1f626a94bca88319",
"type": "http in",
"z": "343bbf5623c098fc",
"name": "",
"url": "/endpoints",
"method": "post",
"upload": false,
"swaggerDoc": "",
"x": 290,
"y": 780,
"wires": [
[
"c519fde194729c6c"
]
]
},
{
"id": "4b9e30c340af7c56",
"type": "http response",
"z": "343bbf5623c098fc",
"name": "",
"statusCode": "200",
"headers": {},
"x": 850,
"y": 740,
"wires": []
},
{
"id": "fefebfbe6b28c97b",
"type": "http response",
"z": "343bbf5623c098fc",
"name": "",
"statusCode": "303",
"headers": {},
"x": 850,
"y": 840,
"wires": []
},
{
"id": "646568ba8b9d7dc1",
"type": "comment",
"z": "343bbf5623c098fc",
"name": "INPUT FORMAT: NAME AND INIT VALUE",
"info": "{\n \"endpoints\":[\n {\n \"endpoint\":\"\",\n \"percent\":100 \n },\n {\n \"endpoint\":\"\",\n \"percent\":0 \n }\n ]\n \n \n}\n\nThe /endpoints should be used to include the overall\npolicy.So any policy included would override the previous one completely\n\n",
"x": 380,
"y": 720,
"wires": []
},
{
"id": "c519fde194729c6c",
"type": "function",
"z": "343bbf5623c098fc",
"name": "check previous",
"func": "flow.set('endpoints',msg.payload);\nreturn msg;",
"outputs": 1,
"noerr": 0,
"initialize": "",
"finalize": "",
"libs": [],
"x": 530,
"y": 780,
"wires": [
[
"f09c2049a68137cc"
]
]
},
{
"id": "f09c2049a68137cc",
"type": "switch",
"z": "343bbf5623c098fc",
"name": "exists",
"property": "exists",
"propertyType": "msg",
"rules": [
{
"t": "false"
},
{
"t": "else"
}
],
"checkall": "true",
"repair": false,
"outputs": 2,
"x": 700,
"y": 780,
"wires": [
[
"4b9e30c340af7c56"
],
[
"fefebfbe6b28c97b"
]
]
},
{
"id": "3a603d1a6cbccc8d",
"type": "http in",
"z": "343bbf5623c098fc",
"name": "",
"url": "/endpoints",
"method": "get",
"upload": false,
"swaggerDoc": "",
"x": 290,
"y": 1000,
"wires": [
[
"eba9e1613372d043"
]
]
},
{
"id": "12ee9c78e7c9ab2a",
"type": "http response",
"z": "343bbf5623c098fc",
"name": "",
"statusCode": "200",
"headers": {},
"x": 850,
"y": 960,
"wires": []
},
{
"id": "0c94016f9fde8a41",
"type": "http response",
"z": "343bbf5623c098fc",
"name": "",
"statusCode": "303",
"headers": {},
"x": 850,
"y": 1060,
"wires": []
},
{
"id": "eba9e1613372d043",
"type": "function",
"z": "343bbf5623c098fc",
"name": "check previous",
"func": "msg.payload=flow.get('endpoints');\n\nreturn msg;",
"outputs": 1,
"noerr": 0,
"initialize": "",
"finalize": "",
"libs": [],
"x": 530,
"y": 1000,
"wires": [
[
"79650c326c575f4e"
]
]
},
{
"id": "79650c326c575f4e",
"type": "switch",
"z": "343bbf5623c098fc",
"name": "exists",
"property": "exists",
"propertyType": "msg",
"rules": [
{
"t": "false"
},
{
"t": "else"
}
],
"checkall": "true",
"repair": false,
"outputs": 2,
"x": 700,
"y": 1000,
"wires": [
[
"12ee9c78e7c9ab2a"
],
[
"0c94016f9fde8a41"
]
]
},
{
"id": "cf79f3baec7d5d6a",
"type": "inject",
"z": "343bbf5623c098fc",
"name": "",
"props": [
{
"p": "payload"
},
{
"p": "topic",
"vt": "str"
}
],
"repeat": "",
"crontab": "",
"once": false,
"onceDelay": 0.1,
"topic": "",
"payloadType": "date",
"x": 230,
"y": 1180,
"wires": [
[
"5fe406b64ca1d58f"
]
]
},
{
"id": "5fe406b64ca1d58f",
"type": "function",
"z": "343bbf5623c098fc",
"name": "",
"func": "var endpoints={};\nif (!(flow.get('endpoints'))){\n flow.set('endpoints',endpoints); \n}\n\nreturn msg;",
"outputs": 1,
"noerr": 0,
"initialize": "",
"finalize": "",
"libs": [],
"x": 390,
"y": 1180,
"wires": [
[
"8c9c4314522d810e"
]
]
},
{
"id": "8c9c4314522d810e",
"type": "debug",
"z": "343bbf5623c098fc",
"name": "",
"active": true,
"tosidebar": true,
"console": false,
"tostatus": false,
"complete": "false",
"statusVal": "",
"statusType": "auto",
"x": 570,
"y": 1180,
"wires": []
},
{
"id": "3ca93063c77c91b1",
"type": "comment",
"z": "343bbf5623c098fc",
"name": "Behaviour",
"info": "User selects the desired metric and limit value, as well as percentage of redirection if the limit is met. If the limit is not met, the default behavior is to send the requests only to the main endpoint.\n\nPattern reroutes based on this .\n\nPattern should continue to monitor the initial endpoint and switch back if the limit goes down (this might create an oscillation, so we need to check the circuit breaker pattern--also the one implemented in GSoC 2022)\n\nor have a step-wise logic to decrease percentage every time window based on observed output\n\n",
"x": 100,
"y": 40,
"wires": []
},
{
"id": "85cd586fa89b738e",
"type": "comment",
"z": "343bbf5623c098fc",
"name": "Routing implementation- To include some form of oscillation avoidance",
"info": "Needs to call the get /endpoints to adapt the switch",
"x": 450,
"y": 340,
"wires": []
},
{
"id": "7fdf5c0b84efc82b",
"type": "comment",
"z": "343bbf5623c098fc",
"name": "Decision Logic",
"info": "Needs to contact the /post endpoints to enforce it",
"x": 600,
"y": 20,
"wires": []
},
{
"id": "36f62309b13fbf26",
"type": "function",
"z": "343bbf5623c098fc",
"name": "Switch if monitor",
"func": "if (msg.payload.hasOwnProperty('results')&&msg.payload.hasOwnProperty('rawData')){\n return [null,msg];\n} else {\n return [msg,null];\n}\n",
"outputs": 2,
"noerr": 0,
"initialize": "",
"finalize": "",
"libs": [],
"x": 540,
"y": 100,
"wires": [
[
"1cd90e99e32a545a"
],
[
"8ea38f4cb0d69b72"
]
]
},
{
"id": "dbdf0b9c534a3332",
"type": "comment",
"z": "343bbf5623c098fc",
"name": "DEFAULT",
"info": "",
"x": 860,
"y": 40,
"wires": []
},
{
"id": "7f018369a42d3e09",
"type": "comment",
"z": "343bbf5623c098fc",
"name": "REDIRECTION",
"info": "",
"x": 880,
"y": 180,
"wires": []
},
{
"id": "b3334a9fce2e641d",
"type": "function",
"z": "343bbf5623c098fc",
"name": "if OW unreachable",
"func": "\n//if flow variable exists, use that, else initialize with UI value \nvar endpoints=flow.get('endpoints') ;\nif (typeof endpoints==\"undefined\"){\n //var endpoints=new Array();\n var endpoint={'endpoints':[{'endpoint':'primary','percent':env.get('primary')}]};\n //endpoints.push(endpoint);\n flow.set('endpoints',endpoint)\n}\n\n\nif (msg.hasOwnProperty('statusCode')){//from monitor\n if (Number.isInteger(msg.statusCode)){\n if (msg.statusCode>202){\n return [null,msg];\n } else {\n return [msg,null];\n }\n }else {\n return [null,msg];\n }\n} else { //routed message\n return [msg,null];\n}\nreturn msg;",
"outputs": 2,
"noerr": 0,
"initialize": "",
"finalize": "",
"libs": [],
"x": 320,
"y": 100,
"wires": [
[
"36f62309b13fbf26"
],
[
"dbb4b67326c50f8a"
]
]
},
{
"id": "dbb4b67326c50f8a",
"type": "function",
"z": "343bbf5623c098fc",
"name": "set Inf flow variables",
"func": "flow.set('successPercentage',0);\nflow.set('waitTime',10000000);\nflow.set('initTime',10000000);\nflow.set('duration',10000000);\nreturn msg;",
"outputs": 1,
"noerr": 0,
"initialize": "",
"finalize": "",
"libs": [],
"x": 320,
"y": 280,
"wires": [
[]
]
},
{
"id": "903b50d050ef4be7",
"type": "function",
"z": "343bbf5623c098fc",
"name": "static_selection",
"func": "\n\nif (env.get('redirectIfLarger')){ //redirection if limit larger than current \n if (env.get('redirectionValue')<flow.get(env.get('redirectionMetric'))){\n return [null,msg] //redirect\n } else {\n return [msg,null] //do not redirect\n }\n} else { //if smaller than\n if (env.get('redirectionValue')>flow.get(env.get('redirectionMetric'))){ //target > current\n return [null,msg]\n } else {\n return [msg,null]\n }\n}\n\n\nreturn msg;",
"outputs": 2,
"noerr": 0,
"initialize": "",
"finalize": "",
"libs": [],
"x": 840,
"y": 400,
"wires": [
[],
[]
]
},
{
"id": "8262c8e75cfbabdd",
"type": "subflow",
"name": "OW monitor",
"info": "\n\nThis is a subflow node in order to monitor the latest performance (sliding window) of Openwhisk actions. The flow pings periodically the target Openwhisk installation in order to retrieve the last executed actions and extract statistics from their execution.\n\nConfiguration includes:\n - the polling time of the monitor (default 30 seconds, `msg.pollingPeriod`)\n - the target Openwhisk endpoint (`msg.targetEndpoint`)\n - credentials for that endpoint (`msg.creds`) in the form of user:key\n - the window of time (in minutes) in the past for which you want to retrieve results (`msg.window`)\n - an optional action name (`msg.action`), if one wants to filter specific action activations\n\nThe configuration can be set either via the flow UI or the incoming message. The incoming message prevails over the UI-set value.\n\nThe monitor can be stopped via a `msg.stop=true` message. The subflow uses a flow variable in order to control starting and stopping, hence only one OW monitor should be used in each flow.\n\nThe outputs are 4:\n - `output 1-MONITOR` is the normal performance report\n - `output 2-STOP` is the notification of the STOP operation\n - `output 3-ERRORS` is used for filtering ERRORS and according logs in the executed functions\n - `output 4-UNREACHABLE` is used when the Openwhisk endpoint is unreachable, either due to `msg.statusCode`>200 value in the Openwhisk API call for getting the activation or othe error cases such as `ECONNREFUSED` or `ENOTFOUND`\n\nThe results in Output 1 include the raw data acquired (`msg.payload.rawData`) in the last window of time, as well as the moving averages of duration (`msg.payload.results.movingAverageDuration`), init time (`msg.payload.results.movingAverageInitTime`) and wait time (`msg.payload.results.movingAverageWaitTime`) of the window. 
The number of cold starts are also included (`msg.payload.results.coldStarts`) as well as the percentage with relation to the total calls (`msg.payload.results.coldStartPercentage`). If no activations are found, then the averages return NaN values.\n\n",
"category": "PHYSICS PEF",
"in": [
{
"x": 60,
"y": 80,
"wires": [
{
"id": "408b32b6eb687a08"
}
]
}
],
"out": [
{
"x": 860,
"y": 160,
"wires": [
{
"id": "fdcd7c09df04efbb",
"port": 0
}
]
},
{
"x": 560,
"y": 80,
"wires": [
{
"id": "e3862e86d11b08d5",
"port": 0
}
]
},
{
"x": 1080,
"y": 320,
"wires": [
{
"id": "14511977ba16aed2",
"port": 0
}
]
},
{
"x": 860,
"y": 240,
"wires": [
{
"id": "9d53b42bbf75940d",
"port": 1
}
]
}
],
"env": [
{
"name": "targetEndpoint",
"type": "str",
"value": "http://10.100.59.182:3233/api/v1",
"ui": {
"label": {
"en-US": "Target Endpoint"
}
}
},
{
"name": "pollingPeriod",
"type": "num",
"value": "30",
"ui": {
"label": {
"en-US": "Polling Period (seconds)"
}
}
},
{
"name": "window",
"type": "num",
"value": "10",
"ui": {
"label": {
"en-US": "Window of Time (minutes)"
}
}
},
{
"name": "namespace",
"type": "str",
"value": "guest",
"ui": {
"label": {
"en-US": "Namespace"
}
}
},
{
"name": "action",
"type": "str",
"value": "",
"ui": {
"label": {
"en-US": "Optional Action Name"
}
}
},
{
"name": "creds",
"type": "cred",
"ui": {
"label": {
"en-US": "Credentials (user:key)"
}
}
}
],
"meta": {
"module": "node-red-contrib-owmonitor",
"version": "0.0.1",
"author": "George Kousiours <gkousiou@hua.gr>",
"desc": "This is a subflow node in order to monitor the latest performance (sliding window) of Openwhisk actions. The flow pings periodically the target Openwhisk installation in order to retrieve the last executed actions and extract statistics from their execution.",
"keywords": "openwhisk, performance, monitor, sliding window",
"license": "Apache-2.0"
},
"color": "#E9967A",
"icon": "node-red/status.svg"
},
{
"id": "f81580b2bf9ad891",
"type": "http request",
"z": "8262c8e75cfbabdd",
"name": "",
"method": "use",
"ret": "obj",
"paytoqs": "ignore",
"url": "",
"tls": "df3a306242e21be5",
"persist": false,
"proxy": "",
"authType": "",
"x": 190,
"y": 180,
"wires": [
[
"d2ded97115fa9473",
"9d53b42bbf75940d"
]
]
},
{
"id": "408b32b6eb687a08",
"type": "function",
"z": "8262c8e75cfbabdd",
"name": "defaults",
"func": "if (msg.hasOwnProperty('window')){\n \n} else {\n msg.window=env.get('window');\n}\n\nif (msg.hasOwnProperty('targetEndpoint')){\n \n} else {\n msg.targetEndpoint=env.get('targetEndpoint');\n}\n\nif (msg.hasOwnProperty('pollingPeriod')){\n \n} else {\n msg.pollingPeriod=env.get('pollingPeriod');\n}\n\nif (msg.hasOwnProperty('creds')){\n \n} else {\n msg.creds=env.get('creds');\n}\n\nif (msg.hasOwnProperty('namespace')){\n \n} else {\n msg.namespace=env.get('namespace');\n}\n\nif (msg.hasOwnProperty('action')){\n \n} else {\n msg.action=env.get('action');\n}\n\nif (msg.hasOwnProperty('stop')){\n flow.set('stop',msg.stop);\n} else {\n flow.set('stop',false);\n}\n\nmsg.headers={};\nvar auth = 'Basic ' + new Buffer(msg.creds).toString('base64');\nmsg.headers = {\n \"Authorization\": auth\n}\n\n\nreturn msg;",
"outputs": 1,
"noerr": 0,
"initialize": "",
"finalize": "",
"libs": [],
"x": 180,
"y": 80,
"wires": [
[
"e3862e86d11b08d5"
]
]
},
{
"id": "e3862e86d11b08d5",
"type": "function",
"z": "8262c8e75cfbabdd",
"name": "produce rate",
"func": "\nfunction sayHi(input) {\n \n if (flow.get('stop')) {\n msg.payload=\"STOPPED\";\n node.send([msg,null]);\n clearTimeout(timerId);\n \n } else {\n msg.since=Date.now()-msg.window*60*1000; //remove window of time from timestamp\n //add since filter\n msg.url=msg.baseurl+'?since='+msg.since;\n msg.url=msg.url+'&limit=0';//by default, OW returns 30 results. We need this to return without limit\n //add optional action name filter. This needs to be second since it may not always exist\n if (msg.action!==\"\"){\n msg.url=msg.url+'&name='+msg.action;\n }\n\n node.send([null,msg])\n \n }\n}\n\nmsg.baseurl=msg.targetEndpoint+'/namespaces/'+msg.namespace+'/activations';\n\nmsg.method='GET';\nvar timerId=setInterval(sayHi, msg.pollingPeriod*1000,msg);\n\n\n",
"outputs": "2",
"noerr": 0,
"initialize": "",
"finalize": "",
"libs": [],
"x": 370,
"y": 80,
"wires": [
[],
[
"f81580b2bf9ad891"
]
]
},
{
"id": "b61d171594e2cdf3",
"type": "function",
"z": "8262c8e75cfbabdd",
"name": "preprocess results",
"func": "\n\nvar resultsArray=[];\n\nvar results={};\n\nvar errorsArray=[];\n\nfor (k=0;k<msg.payload.length;k++){\n //from OW\n results.activationId=msg.payload[k].activationId;\n results.duration=msg.payload[k].duration;\n results.start=msg.payload[k].start;\n results.end=msg.payload[k].end;\n results.action=msg.payload[k].name;\n results.namespace=msg.payload[k].namespace;\n results.statusCode=msg.payload[k].statusCode;\n results.version=msg.payload[k].version;\n \n if (msg.payload[k].statusCode!=0){\n errorsArray.push(msg.payload[k].activationId);\n }\n \n //results.success=msg.payload[k].response.success; This is not included in the summary results\n for (i=0;i<msg.payload[k].annotations.length;i++){\n if (msg.payload[k].annotations[i].key==='waitTime'){\n results.waitTime=msg.payload[k].annotations[i].value;\n }\n if (msg.payload[k].annotations[i].key==='initTime'){\n results.initTime=msg.payload[k].annotations[i].value;\n }\n \n }\n \n if (!(results.hasOwnProperty('initTime'))){\n results.initTime=0;\n }\n if (msg.action===\"\"){\n resultsArray.push(results); \n } else {\n if (results.action===msg.action){\n resultsArray.push(results);\n }\n }\n \n results={};\n \n} \nmsg.payload={};\nmsg.payload=resultsArray;\nmsg.errors=errorsArray;\nreturn msg;",
"outputs": 1,
"noerr": 0,
"initialize": "",
"finalize": "",
"libs": [],
"x": 550,
"y": 160,
"wires": [
[
"fdcd7c09df04efbb",
"9a0a8ecbe2031562"
]
]
},
{
"id": "fdcd7c09df04efbb",
"type": "function",
"z": "8262c8e75cfbabdd",
"name": "averages",
"func": "var rawData=msg.payload;\nmsg.payload={};\nmsg.payload.rawData=rawData;\nvar samples=msg.payload.rawData.length;\n\nmsg.results={};\nmsg.results.averageDuration=0;\nmsg.results.averageWaitTime=0;\nmsg.results.averageInitTime=0;\nmsg.results.coldStarts=0;\nmsg.results.successPercentage=0;\n\nvar duration= [];//=new Array();\nvar waitTime=[];\nvar initTime=[];\n\n\nfor (i=0;i<msg.payload.rawData.length;i++){\n msg.results.averageDuration=msg.results.averageDuration+msg.payload.rawData[i].duration;\n duration.push(msg.payload.rawData[i].duration);\n \n msg.results.averageWaitTime=msg.results.averageWaitTime+msg.payload.rawData[i].waitTime;\n waitTime.push(msg.payload.rawData[i].waitTime);\n \n msg.results.averageInitTime=msg.results.averageInitTime+msg.payload.rawData[i].initTime;\n initTime.push(msg.payload.rawData[i].initTime);\n if (msg.payload.rawData[i].initTime!==0){\n msg.results.coldStarts=msg.results.coldStarts+1;\n }\n if (msg.payload.rawData[i].statusCode===0){\n msg.results.successPercentage=msg.results.successPercentage+1;\n }\n \n\n}\nmsg.results.successPercentage=(msg.results.successPercentage/msg.payload.rawData.length)*100;\nmsg.results.averageDuration=toFixedNumber(msg.results.averageDuration/samples,2);\nmsg.results.averageWaitTime=toFixedNumber(msg.results.averageWaitTime/samples,2);\nmsg.results.averageInitTime=toFixedNumber(msg.results.averageInitTime/samples,2);\nmsg.results.coldStartPercentage=toFixedNumber((msg.results.coldStarts/samples)*100,2);\n\nmsg.results.movingAverageDuration=msg.results.averageDuration;\ndelete msg.results.averageDuration;\n\nmsg.results.movingAverageWaitTime=msg.results.averageWaitTime;\ndelete msg.results.averageWaitTime;\n\nmsg.results.movingAverageInitTime=msg.results.averageInitTime;\ndelete msg.results.averageInitTime;\n\n\nmsg.payload.results=msg.results;\nreturn msg;\n\n\nfunction toFixedNumber(num, digits, base){\n var pow = Math.pow(base||10, digits);\n return Math.round(num*pow) / pow;\n}\n",
"outputs": 1,
"noerr": 0,
"initialize": "",
"finalize": "",
"libs": [],
"x": 740,
"y": 160,
"wires": [
[]
]
},
{
"id": "b3d54c50ea53f87c",
"type": "split",
"z": "8262c8e75cfbabdd",
"name": "",
"splt": "\\n",
"spltType": "str",
"arraySplt": 1,
"arraySpltType": "len",
"stream": false,
"addname": "",
"x": 370,
"y": 320,
"wires": [
[
"6be2109aa289fde8"
]
]
},
{
"id": "9a0a8ecbe2031562",
"type": "change",
"z": "8262c8e75cfbabdd",
"name": "",
"rules": [
{
"t": "set",
"p": "payload",
"pt": "msg",
"to": "errors",
"tot": "msg"
}
],
"action": "",
"property": "",
"from": "",
"to": "",
"reg": false,
"x": 220,
"y": 320,
"wires": [
[
"b3d54c50ea53f87c"
]
]
},
{
"id": "6be2109aa289fde8",
"type": "function",
"z": "8262c8e75cfbabdd",
"name": "prepare call",
"func": "msg.url=msg.targetEndpoint+'/namespaces/'+msg.namespace+'/activations/'+msg.payload;\nmsg.method='GET';\nmsg.headers={};\nvar auth = 'Basic ' + new Buffer(msg.creds).toString('base64');\nmsg.headers = {\n \"Authorization\": auth\n}\nreturn msg;",
"outputs": 1,
"noerr": 0,
"initialize": "",
"finalize": "",
"libs": [],
"x": 510,
"y": 320,
"wires": [
[
"29a1f27bad73001c"
]
]
},
{
"id": "29a1f27bad73001c",
"type": "http request",
"z": "8262c8e75cfbabdd",
"name": "",
"method": "use",
"ret": "obj",
"paytoqs": "ignore",
"url": "",
"tls": "df3a306242e21be5",
"persist": false,
"proxy": "",
"authType": "",
"x": 670,
"y": 320,
"wires": [
[
"75fded98834b7338"
]
]
},
{
"id": "14511977ba16aed2",
"type": "join",
"z": "8262c8e75cfbabdd",
"name": "",
"mode": "auto",
"build": "object",
"property": "payload",
"propertyType": "msg",
"key": "topic",
"joiner": "\\n",
"joinerType": "str",
"accumulate": "false",
"timeout": "",
"count": "",
"reduceRight": false,
"x": 970,
"y": 320,
"wires": [
[]
]
},
{
"id": "75fded98834b7338",
"type": "function",
"z": "8262c8e75cfbabdd",
"name": "logs string",
"func": "msg.payload.logs=msg.payload.logs.join('\\r\\n');\nreturn msg;",
"outputs": 1,
"noerr": 0,
"initialize": "",
"finalize": "",
"libs": [],
"x": 830,
"y": 320,
"wires": [
[
"14511977ba16aed2"
]
]
},
{
"id": "d26d863e7351f69d",
"type": "comment",
"z": "8262c8e75cfbabdd",
"name": "STOP",
"info": "",
"x": 650,
"y": 80,
"wires": []
},
{
"id": "5dd2549588fa70e5",
"type": "comment",
"z": "8262c8e75cfbabdd",
"name": "PERFORMANCE STATISTICS",
"info": "",
"x": 1030,
"y": 160,
"wires": []
},
{
"id": "fd5f7e1e8c09478f",
"type": "comment",
"z": "8262c8e75cfbabdd",
"name": "ERROR LOGS",
"info": "",
"x": 1000,
"y": 360,
"wires": []
},
{
"id": "d2ded97115fa9473",
"type": "debug",
"z": "8262c8e75cfbabdd",
"name": "",
"active": true,
"tosidebar": true,
"console": false,
"tostatus": false,
"complete": "true",
"targetType": "full",
"statusVal": "",
"statusType": "auto",
"x": 420,
"y": 500,
"wires": []
},
{
"id": "9d53b42bbf75940d",
"type": "switch",
"z": "8262c8e75cfbabdd",
"name": "if 200",
"property": "statusCode",
"propertyType": "msg",
"rules": [
{
"t": "eq",
"v": "200",
"vt": "num"
},
{
"t": "else"
}
],
"checkall": "true",
"repair": false,
"outputs": 2,
"x": 370,
"y": 200,
"wires": [
[
"b61d171594e2cdf3"
],
[]
]
},
{
"id": "7d70f42032c20622",
"type": "comment",
"z": "8262c8e75cfbabdd",
"name": "UNREACHABLE",
"info": "",
"x": 980,
"y": 240,
"wires": []
},
{
"id": "df3a306242e21be5",
"type": "tls-config",
"name": "",
"cert": "",
"key": "",
"ca": "",
"certname": "",
"keyname": "",
"caname": "",
"servername": "",
"verifyservercert": false,
"alpnprotocol": ""
},
{
"id": "dc2a7db83b0544b0",
"type": "subflow:8262c8e75cfbabdd",
"z": "7831a4c7222f0fe4",
"name": "OW MONITOR AND LOGGER",
"env": [
{
"name": "creds",
"type": "cred"
}
],
"x": 390,
"y": 300,
"wires": [
[
"aa84967bdd2c2b36",
"6c31c8cc842743f0"
],
[
"83069fd14364b127"
],
[
"97c64095d60494ab"
],
[
"780b18f5a28b3237",
"6c31c8cc842743f0"
]
]
},
{
"id": "263a31c9b704bc17",
"type": "inject",
"z": "7831a4c7222f0fe4",
"name": "start",
"props": [
{
"p": "payload"
},
{
"p": "topic",
"vt": "str"
}
],
"repeat": "",
"crontab": "",
"once": false,
"onceDelay": 0.1,
"topic": "",
"payloadType": "date",
"x": 160,
"y": 280,
"wires": [
[
"dc2a7db83b0544b0"
]
]
},
{
"id": "354759a8c5b688b8",
"type": "inject",
"z": "7831a4c7222f0fe4",
"name": "stop",
"props": [
{
"p": "stop",
"v": "true",
"vt": "bool"
},
{
"p": "topic",
"vt": "str"
}
],
"repeat": "",
"crontab": "",
"once": false,
"onceDelay": 0.1,
"topic": "",
"payloadType": "str",
"x": 160,
"y": 360,
"wires": [
[
"dc2a7db83b0544b0"
]
]
},
{
"id": "aa84967bdd2c2b36",
"type": "debug",
"z": "7831a4c7222f0fe4",
"name": "MONITOR",
"active": true,
"tosidebar": true,
"console": false,
"tostatus": false,
"complete": "true",
"targetType": "full",
"statusVal": "",
"statusType": "auto",
"x": 670,
"y": 220,
"wires": []
},
{
"id": "83069fd14364b127",
"type": "debug",
"z": "7831a4c7222f0fe4",
"name": "STOP",
"active": true,
"tosidebar": true,
"console": false,
"tostatus": false,
"complete": "payload",
"targetType": "msg",
"statusVal": "",
"statusType": "auto",
"x": 650,
"y": 260,
"wires": []
},
{
"id": "97c64095d60494ab",
"type": "debug",
"z": "7831a4c7222f0fe4",
"name": "ERRORS",
"active": true,
"tosidebar": true,
"console": false,
"tostatus": false,
"complete": "payload",
"targetType": "msg",
"statusVal": "",
"statusType": "auto",
"x": 660,
"y": 300,
"wires": []
},
{
"id": "6c31c8cc842743f0",
"type": "subflow:343bbf5623c098fc",
"z": "7831a4c7222f0fe4",
"name": "",
"env": [
{
"name": "redirectionMetric",
"value": "waitTime",
"type": "str"
},
{
"name": "redirectionValue",
"value": "500",
"type": "num"
},
{
"name": "primary",
"value": "70",
"type": "num"
},
{
"name": "strategy",
"value": "fallBack",
"type": "str"
},
{
"name": "redirectionLarger",
"value": "false",
"type": "bool"
}
],
"x": 350,
"y": 480,
"wires": [
[
"eeb78fc27003d3f1"
],
[
"96d013c327712248"
]
]
},
{
"id": "ed059505418bb835",
"type": "inject",
"z": "7831a4c7222f0fe4",
"name": "TEST MSG",
"props": [
{
"p": "payload"
},
{
"p": "topic",
"vt": "str"
}
],
"repeat": "",
"crontab": "",
"once": false,
"onceDelay": 0.1,
"topic": "",
"payload": "test message",
"payloadType": "str",
"x": 170,
"y": 480,
"wires": [
[
"6c31c8cc842743f0"
]
]
},
{
"id": "eeb78fc27003d3f1",
"type": "debug",
"z": "7831a4c7222f0fe4",
"name": "NORMAL",
"active": true,
"tosidebar": true,
"console": false,
"tostatus": false,
"complete": "true",
"targetType": "full",
"statusVal": "",
"statusType": "auto",
"x": 520,
"y": 460,
"wires": []
},
{
"id": "96d013c327712248",
"type": "debug",
"z": "7831a4c7222f0fe4",
"name": "REDIRECTED",
"active": true,
"tosidebar": true,
"console": false,
"tostatus": false,
"complete": "true",
"targetType": "full",
"statusVal": "",
"statusType": "auto",
"x": 540,
"y": 520,
"wires": []
},
{
"id": "14a3a427834a9ec9",
"type": "comment",
"z": "7831a4c7222f0fe4",
"name": "PLUG-IN NORMAL MSG",
"info": "",
"x": 170,
"y": 520,
"wires": []
},
{
"id": "923a844b7cab751a",
"type": "comment",
"z": "7831a4c7222f0fe4",
"name": "PLUG IN DEFAULT ENDPOINT",
"info": "",
"x": 570,
"y": 420,
"wires": []
},
{
"id": "cbd928806abdd7a7",
"type": "comment",
"z": "7831a4c7222f0fe4",
"name": "PLUG IN REDIRECTION ENDPOINT",
"info": "",
"x": 590,
"y": 560,
"wires": []
},
{
"id": "780b18f5a28b3237",
"type": "debug",
"z": "7831a4c7222f0fe4",
"name": "UNREACHABLE",
"active": true,
"tosidebar": true,
"console": false,
"tostatus": false,
"complete": "true",
"targetType": "full",
"statusVal": "",
"statusType": "auto",
"x": 690,
"y": 340,
"wires": []
}
]