@gkousiouris
Last active February 4, 2023 13:15
Request Aggregator Service Flow

This flow is meant to run as a service that aggregates requests towards a target endpoint. The purpose is to reduce the load on the back-end system by lowering the arrival rate of requests and the number of serving threads or containers needed (in the case of FaaS systems), which in turn can reduce costs and improve performance. It is mostly beneficial when each request has to set up a rather heavyweight environment (thread or container) to perform a relatively small computation (e.g. model inference, simulation).

[Figure: initial]

The Request Aggregator (RA) receives as input the target endpoint, the method to be invoked and the input data that will form the payload of the aggregated call. It holds incoming requests until a threshold number of messages is reached (set through the setBatchSize endpoint), at which point it collates the inputs of the held calls into one array and sends a single request to the target endpoint. It then decomposes the outputs and responds to each individual caller.
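The hold-until-threshold behavior described above can be sketched in plain JavaScript, outside Node-RED. This is only an illustration: `createBatcher`, `add` and `size` are hypothetical names, not part of the flow, and the sketch releases a batch purely on message count, as the flow does through its count and target flow variables.

```javascript
// Minimal count-based batcher (illustrative sketch; all names are hypothetical).
function createBatcher(batchSize) {
  let pending = [];
  return {
    // Queue one request. Returns the full batch once the threshold is
    // reached, or null while the batcher is still holding requests.
    add(request) {
      pending.push(request);
      if (pending.length < batchSize) {
        return null; // keep holding
      }
      const released = pending;
      pending = []; // start a fresh batch
      return released;
    },
    // Number of requests currently held.
    size() {
      return pending.length;
    },
  };
}

const batcher = createBatcher(2);
const first = batcher.add({ id: "a", input: [1, 2] }); // held
const second = batcher.add({ id: "b", input: [3] });   // releases both requests
```

In the flow itself the holding is done by a join node that is released by msg.complete, so the sketch only mirrors the counting logic, not the Node-RED message handling.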

[Figure: generic]

The body of the POST call to the RA should include:
-msg.payload.targetEndpoint // the target endpoint whose calls should be reduced
-msg.payload.method // the method of the target endpoint to be invoked
-msg.payload.input // body of the call towards the target endpoint, in array form
-msg.payload.creds // in the form user:pwd for basic authentication, if needed by the target endpoint
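As a rough illustration of the fields listed above, the following sketch builds a request body and the Basic authentication header derived from creds. The helper names `buildAggregatorRequest` and `basicAuthHeader` are hypothetical; the endpoint URL, method and credentials are the placeholder values used by the flow's own local test node.

```javascript
// Build the JSON body expected by POST /requestAggregator.
// (Hypothetical helper; the field names come from the list above.)
function buildAggregatorRequest(targetEndpoint, method, input, creds) {
  const body = { targetEndpoint, method, input };
  if (creds) {
    body.creds = creds; // optional, only when the target needs basic auth
  }
  return body;
}

// Turn a "user:pwd" string into a Basic Authorization header value.
function basicAuthHeader(creds) {
  return "Basic " + Buffer.from(creds).toString("base64");
}

const body = buildAggregatorRequest(
  "http://localhost:1880/testRA2", // target endpoint of the local test flow
  "POST",
  [0.1, 0.2],                      // input must be an array
  "user:pwd"
);
const header = basicAuthHeader(body.creds);
```

The body would then be sent as JSON to the /requestAggregator endpoint of the Node-RED instance hosting the flow.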

[Figure: mainflow]

The input data should be in array form inside the msg.payload.input field. The target endpoint must therefore be able to process arrays passed as JSON values.

The target service should return an array of JSON objects in the msg.payload.output field. The msg.payload in the response also includes the target input of each call, so that the correct routing of responses to each caller can be tested.

The message batch size can be set through a POST /setBatchSize method, which takes the number in the payload and sets the corresponding flow variable. In future versions, the batch size is planned to be set by a model, in which case the setting should include other aspects such as the ID of the model to be used and the location of the model inference service.

The flow also includes local testing through the testRA2 endpoint, as well as a set of flows for initialization, batch size setting, and getting info on the current state of the batch (current count of messages and target batch size).
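The array contract above is what allows outputs to be routed back to their callers. The sketch below roughly mirrors what the "merge+creds", "add ids" and "group by id" function nodes do, under the assumption that the target returns exactly one output element per input element, in the same order. `mergeInputs` and `splitOutputs` are hypothetical names, and the target is simulated by an echo, as the testRA2 endpoint does.

```javascript
// Flatten the per-caller input arrays into one array and remember,
// for every element, the id of the caller that supplied it.
function mergeInputs(batch) {
  const ids = [];
  const merged = [];
  for (const request of batch) {
    for (const item of request.input) {
      ids.push(request.id);
      merged.push(item);
    }
  }
  return { ids, merged };
}

// Split the target's flat output array back into one list per caller id.
// Assumes outputs[i] corresponds to the input element at position i.
function splitOutputs(ids, outputs) {
  const byId = {};
  outputs.forEach((output, index) => {
    const id = ids[index];
    byId[id] = byId[id] || [];
    byId[id].push(output);
  });
  return byId;
}

const held = [
  { id: "a", input: [1, 2] },
  { id: "b", input: [3] },
];
const { ids, merged } = mergeInputs(held);
const echoed = merged.slice(); // stand-in for the target's msg.payload.output
const perCaller = splitOutputs(ids, echoed);
```

If the target reorders or drops elements, this positional mapping breaks, which is why the one-output-per-input assumption matters.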

[Figure: helperflows]

At the moment the flow supports one instance of request aggregation, meaning a single endpoint whose calls are aggregated. Support for multiple concurrent endpoints may be considered in future versions.

A relevant paper that investigates the behavior of this flow can be found below: G. Kousiouris, “A self-adaptive batch request aggregation pattern for improving resource management, response time and costs in microservice and serverless environments,” in 40th IEEE International Performance Computing and Communications Conference (IPCCC 2021), IEEE, 2021.

[
{
"id": "a3ac0cc2.19c668",
"type": "join",
"z": "51b5b7dc.30089",
"name": "",
"mode": "custom",
"build": "array",
"property": "payload",
"propertyType": "msg",
"key": "topic",
"joiner": "\\n",
"joinerType": "str",
"accumulate": false,
"timeout": "",
"count": "",
"reduceRight": false,
"reduceExp": "",
"reduceInit": "",
"reduceInitType": "num",
"reduceFixup": "",
"x": 630,
"y": 320,
"wires": [
[
"e23a1311.6fb55",
"1a74d983.2b654e"
]
]
},
{
"id": "8290230f.2d6cb8",
"type": "function",
"z": "51b5b7dc.30089",
"name": "transform",
"func": "// flatten the per-caller input arrays into a single array for the aggregated call\nvar merged = [].concat.apply([], msg.payload);\nmsg.payload={input: merged};\nmsg.startTime=Date.now();\nreturn msg;",
"outputs": 1,
"noerr": 0,
"x": 940,
"y": 320,
"wires": [
[
"7524694d.85b5f",
"27b9c22d.373196"
]
]
},
{
"id": "cbd011cd.a29928",
"type": "http in",
"z": "51b5b7dc.30089",
"name": "",
"url": "/requestAggregator",
"method": "post",
"upload": false,
"swaggerDoc": "",
"x": 190,
"y": 320,
"wires": [
[
"cc8e9fa4.88b5c",
"1fedc1f4.c070d6"
]
]
},
{
"id": "afc59082.3248c",
"type": "http response",
"z": "51b5b7dc.30089",
"name": "RESPO",
"statusCode": "",
"headers": {},
"x": 1410,
"y": 540,
"wires": []
},
{
"id": "e23a1311.6fb55",
"type": "function",
"z": "51b5b7dc.30089",
"name": "merge+creds",
"func": "// Collect per-caller ids and prepare the aggregated call towards the target endpoint\nvar ids=[];\nvar temp=[];\n// the first message of the batch defines the target endpoint, method and credentials\nmsg.url=msg.payload[0].targetEndpoint;\nmsg.method=msg.payload[0].method;\nmsg.creds=msg.payload[0].creds;\n\nmsg.headers={};\nif (msg.creds){\n    // Buffer.from replaces the deprecated new Buffer()\n    msg.headers.Authorization='Basic ' + Buffer.from(msg.creds).toString('base64');\n}\n\nflow.set('batchMeasuredSize',msg.payload.length);\nfor (var i=0;i<msg.payload.length;i++){\n    // one id per input element, so that outputs can be mapped back to their caller\n    for (var k=0;k<msg.payload[i].input.length;k++) {\n        ids.push(msg.payload[i].id);\n    }\n    delete msg.payload[i].id;\n    delete msg.payload[i].oldmsg;\n    temp=msg.payload[i].input;\n    msg.payload[i]=temp;\n}\nmsg.batchids=ids;\n\nreturn msg;",
"outputs": 1,
"noerr": 0,
"x": 790,
"y": 320,
"wires": [
[
"8290230f.2d6cb8",
"3d2ea52a.23883a"
]
]
},
{
"id": "f33d0d51.398d6",
"type": "join",
"z": "51b5b7dc.30089",
"name": "",
"mode": "auto",
"build": "object",
"property": "payload",
"propertyType": "msg",
"key": "topic",
"joiner": "\\n",
"joinerType": "str",
"accumulate": false,
"timeout": "",
"count": "2",
"reduceRight": false,
"reduceExp": "",
"reduceInit": "",
"reduceInitType": "num",
"reduceFixup": "",
"x": 1210,
"y": 540,
"wires": [
[
"afc59082.3248c",
"a5c89b67.6f32c"
]
]
},
{
"id": "8ac6a54c.9ffbc",
"type": "function",
"z": "51b5b7dc.30089",
"name": "add ids",
"func": "delete msg.headers;\n// pair each output element with the id of the caller that supplied the matching input\nvar outputs=msg.payload.output;\nvar array=[];\nfor (var i=0;i<outputs.length;i++){\n    array.push({id: msg.batchids[i], output: outputs[i]});\n}\nmsg.payload=array;\nreturn msg;",
"outputs": 1,
"noerr": 0,
"x": 1220,
"y": 380,
"wires": [
[
"c05bd7de.4909b",
"4b2dddc4.3b82dc"
]
]
},
{
"id": "c05bd7de.4909b",
"type": "function",
"z": "51b5b7dc.30089",
"name": "group by id",
"func": "\nvar myArray=msg.payload;\n\nvar group_to_values = myArray.reduce(function (obj, item) {\n obj[item.id] = obj[item.id] || [];\n obj[item.id].push(item.output);\n return obj;\n}, {});\n\nvar groups = Object.keys(group_to_values).map(function (key) {\n return {id: key, output: group_to_values[key]};\n});\n\nmsg.payload={};\nmsg.payload=groups;\n\nreturn msg;",
"outputs": 1,
"noerr": 0,
"x": 1250,
"y": 420,
"wires": [
[
"1105d025.abf4e8",
"5cac8271.9cb5a4"
]
]
},
{
"id": "1105d025.abf4e8",
"type": "split",
"z": "51b5b7dc.30089",
"name": "",
"splt": "\\n",
"spltType": "str",
"arraySplt": 1,
"arraySpltType": "len",
"stream": false,
"addname": "",
"x": 1230,
"y": 460,
"wires": [
[
"40bb3684.3f921",
"522eba7d.b6de94"
]
]
},
{
"id": "40bb3684.3f921",
"type": "function",
"z": "51b5b7dc.30089",
"name": "trim",
"func": "\nmsg.topic=msg.payload.id;\n//delete msg.parts;\n\nmsg.parts.id=msg.payload.id;\nmsg.parts.count=2;\nmsg.parts.index=0;\nmsg.parts.len=1;\nmsg.parts.type=\"array\";\ndelete msg.headers;\ndelete msg.req;\ndelete msg.res;\ndelete msg.responseUrl;\ndelete msg.redirectList;\n\nreturn msg;",
"outputs": 1,
"noerr": 0,
"x": 1230,
"y": 500,
"wires": [
[
"f33d0d51.398d6",
"7bd21cf1.4357b4"
]
]
},
{
"id": "1f200a96.c2e46d",
"type": "function",
"z": "51b5b7dc.30089",
"name": "create artificial msg.parts",
"func": "msg.parts={};\nmsg.parts.id=msg._msgid;\nmsg.parts.count=2;\nmsg.parts.index=1;\nmsg.parts.len=1;\nmsg.parts.type=\"array\";\n\ndelete msg.complete;\nreturn msg;",
"outputs": 1,
"noerr": 0,
"x": 850,
"y": 540,
"wires": [
[
"f33d0d51.398d6"
]
]
},
{
"id": "1fedc1f4.c070d6",
"type": "function",
"z": "51b5b7dc.30089",
"name": "msg.complete",
"func": "\n//increase call counter (defaults to 0 if not yet initialized)\nflow.set('requestCounter',(flow.get('requestCounter')||0)+1);\n\nmsg.payload.id=msg._msgid;\nmsg.topic=msg._msgid;\n\n//msg.complete logic\n//could be extended to include time delay\nvar count=(flow.get('count')||0)+1;\nvar now=Date.now();\nvar diff=now-(flow.get('startTime')||now); //time spent in the current batch, currently unused\n\nif (count>=flow.get('target')){//||(diff>flow.get('lockInterval'))){\n    //threshold reached: tell the join node to release the batch\n    msg.complete=1;\n    count=0;\n    flow.set('startTime',Date.now());\n}\n//otherwise the join node keeps holding the message\n\nif (msg.payload.hasOwnProperty('creds')){\n    //Buffer.from replaces the deprecated new Buffer()\n    msg.headers={\n        \"Authorization\": 'Basic ' + Buffer.from(msg.payload.creds).toString('base64')\n    };\n}\n\nflow.set('count',count);\nreturn msg;",
"outputs": 1,
"noerr": 0,
"x": 440,
"y": 320,
"wires": [
[
"a3ac0cc2.19c668",
"1f200a96.c2e46d"
]
]
},
{
"id": "3341291a.4beb8e",
"type": "comment",
"z": "51b5b7dc.30089",
"name": "INPUT SPEC",
"info": "This is a flow to be run as a service, in order to perform request aggregation towards a target endpoint. The purpose\nis to reduce the stress towards the back-end system, by minimizing the arrival rate of requests and the number \nof needed serving threads or containers (in the case of FaaS systems). Thus it also reduces costs and improves\nperformance. It is mostly beneficial in cases where the request needs to set up a rather heavyweight environment\n(thread or container) for performing a relatively small computation (e.g. in model inference, simulation etc.).\n\nThe RA gets as input the target endpoint, as well as the method to be invoked and the input data to be the payload\nof the aggregated call. It holds the requests until a threshold of incoming messages is \nreached (set in the setBatchSize endpoint), at which point it collates the inputs of the calls into one array request towards\nthe target endpoint. Then it decomposes the outputs and responds to each individual caller.\n\nThe Body of the POST call to the RA should include: \n-msg.payload.targetEndpoint // the target endpoint that needs call reducing\n-msg.payload.method //of the target endpoint to be invoked\n-msg.payload.input //body of the call towards the target Endpoint, in an array form\n-msg.payload.creds //in the form of user:pwd for basic authentication, if needed by the target endpoint\n\nThe input data should be in an array form inside the msg.payload.input JSON object. Thus the target endpoint\nshould be able to process arrays coming as JSON values.\n\nThe output of the target service should return array of JSON objects into the msg.payload.output field. \nThe msg.payload in the response also includes the target input of each call, for testing purposes of correct\nreturn of responses to each caller\n\nThe message batch size can be set through a POST /setBatchSize method, that gets the payload number \nand sets the according flow variable. \nIn future versions, link with model based setting of the batch size \nwill be performed, in which case the setting should include other aspects such as model ID to be used as well as \nthe location of the model inference service.\n\nAt the moment the flow supports one instance of request aggregation, meaning a single endpoint for which the \ncalls will be aggregated. In future versions it will be considered to include the ability to support multiple\nconcurrent endpoints.\n\nThe flow includes also local testing through the testRA2 endpoint, as well as a set of flows for initialization,\nbatch size setting, and getting info on the current state of the batch (current count of messages and target batch \nsize).",
"x": 150,
"y": 380,
"wires": []
},
{
"id": "10edfa39.7f1266",
"type": "comment",
"z": "51b5b7dc.30089",
"name": "Endpoint for setting batch size",
"info": "",
"x": 220,
"y": 660,
"wires": []
},
{
"id": "f285f64f.b5717",
"type": "http in",
"z": "51b5b7dc.30089",
"name": "",
"url": "/testRA2",
"method": "post",
"upload": false,
"swaggerDoc": "",
"x": 200,
"y": 1220,
"wires": [
[
"c43de88e.f52b6",
"def350ea.094808"
]
]
},
{
"id": "c43de88e.f52b6",
"type": "function",
"z": "51b5b7dc.30089",
"name": "move input to output",
"func": "var output=msg.payload.input;\nmsg.payload={};\nmsg.payload.output=output;\nreturn msg;",
"outputs": 1,
"noerr": 0,
"x": 430,
"y": 1220,
"wires": [
[
"1a5c4c1d.ebd334"
]
]
},
{
"id": "1a5c4c1d.ebd334",
"type": "http response",
"z": "51b5b7dc.30089",
"name": "RESPO",
"statusCode": "",
"headers": {},
"x": 620,
"y": 1220,
"wires": []
},
{
"id": "b5545cf.2cc97a",
"type": "function",
"z": "51b5b7dc.30089",
"name": "rand input for testRA2",
"func": "msg.payload={};\nmsg.payload.input=[Math.random(),Math.random()];\nmsg.payload.targetEndpoint='http://localhost:1880/testRA2';\nmsg.payload.method='POST';\nmsg.payload.creds='user:pwd';\nmsg.input=msg.payload.input;\nreturn msg;",
"outputs": 1,
"noerr": 0,
"x": 320,
"y": 1300,
"wires": [
[
"4ef21af8.2e98ec"
]
]
},
{
"id": "fb640395.db2a98",
"type": "inject",
"z": "51b5b7dc.30089",
"name": "",
"topic": "",
"payload": "",
"payloadType": "date",
"repeat": "",
"crontab": "",
"once": false,
"onceDelay": 0.1,
"x": 130,
"y": 1300,
"wires": [
[
"b5545cf.2cc97a"
]
]
},
{
"id": "5613bf1.164d7c",
"type": "debug",
"z": "51b5b7dc.30089",
"name": "",
"active": true,
"tosidebar": true,
"console": false,
"tostatus": false,
"complete": "true",
"targetType": "full",
"x": 720,
"y": 1300,
"wires": []
},
{
"id": "4ef21af8.2e98ec",
"type": "http request",
"z": "51b5b7dc.30089",
"name": "to /requestAggregator",
"method": "POST",
"ret": "obj",
"paytoqs": true,
"url": "http://localhost:1880/requestAggregator",
"tls": "",
"proxy": "",
"authType": "basic",
"x": 540,
"y": 1300,
"wires": [
[
"5613bf1.164d7c"
]
]
},
{
"id": "f1fb2e95.afcec",
"type": "comment",
"z": "51b5b7dc.30089",
"name": "LOCAL TESTING",
"info": "",
"x": 170,
"y": 1160,
"wires": []
},
{
"id": "237ab8f0.e82148",
"type": "function",
"z": "51b5b7dc.30089",
"name": "SET TARGET ",
"func": "\nflow.set('target',msg.payload);\n\nreturn msg;",
"outputs": 1,
"noerr": 0,
"x": 430,
"y": 800,
"wires": [
[
"3cdee18.fe2451e"
]
]
},
{
"id": "9c9d4eb4.230cb8",
"type": "http in",
"z": "51b5b7dc.30089",
"name": "",
"url": "/setBatchSize",
"method": "post",
"upload": false,
"swaggerDoc": "",
"x": 160,
"y": 800,
"wires": [
[
"237ab8f0.e82148",
"3a6e34c4.5f69fc"
]
]
},
{
"id": "3cdee18.fe2451e",
"type": "http response",
"z": "51b5b7dc.30089",
"name": "RESPO",
"statusCode": "",
"headers": {},
"x": 610,
"y": 800,
"wires": []
},
{
"id": "fc8283c5.f605b",
"type": "inject",
"z": "51b5b7dc.30089",
"name": "",
"topic": "",
"payload": "",
"payloadType": "date",
"repeat": "",
"crontab": "",
"once": false,
"onceDelay": 0.1,
"x": 160,
"y": 720,
"wires": [
[
"e21b4180.ad70e"
]
]
},
{
"id": "e21b4180.ad70e",
"type": "function",
"z": "51b5b7dc.30089",
"name": "set batch size",
"func": "msg.payload={}\nmsg.payload=2;\nreturn msg;",
"outputs": 1,
"noerr": 0,
"x": 360,
"y": 720,
"wires": [
[
"ea48932c.0367c"
]
]
},
{
"id": "ea48932c.0367c",
"type": "http request",
"z": "51b5b7dc.30089",
"name": "to /setBatchSize",
"method": "POST",
"ret": "obj",
"paytoqs": true,
"url": "http://localhost:1880/setBatchSize",
"tls": "",
"proxy": "",
"authType": "basic",
"x": 550,
"y": 720,
"wires": [
[
"46c60292.e86f0c"
]
]
},
{
"id": "46c60292.e86f0c",
"type": "debug",
"z": "51b5b7dc.30089",
"name": "",
"active": true,
"tosidebar": true,
"console": false,
"tostatus": false,
"complete": "true",
"targetType": "full",
"x": 760,
"y": 720,
"wires": []
},
{
"id": "e69e9b81.28cb18",
"type": "comment",
"z": "51b5b7dc.30089",
"name": "STATE",
"info": "",
"x": 130,
"y": 900,
"wires": []
},
{
"id": "e6fe53d2.849fe8",
"type": "inject",
"z": "51b5b7dc.30089",
"name": "",
"topic": "",
"payload": "",
"payloadType": "date",
"repeat": "",
"crontab": "",
"once": false,
"onceDelay": 0.1,
"x": 140,
"y": 940,
"wires": [
[
"bfe511c2.d31bd"
]
]
},
{
"id": "bfe511c2.d31bd",
"type": "function",
"z": "51b5b7dc.30089",
"name": "PRINT COUNT AND TARGET",
"func": "msg.payload={};\nmsg.payload.count=flow.get('count');//current counter\nmsg.payload.target=flow.get('target'); //target batch size\nreturn msg;",
"outputs": 1,
"noerr": 0,
"x": 360,
"y": 940,
"wires": [
[
"f1fd0a7e.2df3c8"
]
]
},
{
"id": "f1fd0a7e.2df3c8",
"type": "debug",
"z": "51b5b7dc.30089",
"name": "",
"active": true,
"tosidebar": true,
"console": false,
"tostatus": false,
"complete": "false",
"x": 610,
"y": 940,
"wires": []
},
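{
"id": "516d73af.93d044",
"type": "function",
"z": "51b5b7dc.30089",
"name": "INITIALIZATION",
"func": "// initialize all flow variables read by the msg.complete node\nflow.set('count',0);\nflow.set('target',1);\nflow.set('requestCounter',0);\nflow.set('startTime',Date.now());\n\nreturn msg;",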
"outputs": 1,
"noerr": 0,
"x": 380,
"y": 140,
"wires": [
[]
]
},
{
"id": "9dfbf2d3.57c89",
"type": "inject",
"z": "51b5b7dc.30089",
"name": "",
"topic": "",
"payloadType": "date",
"repeat": "",
"crontab": "",
"once": true,
"onceDelay": 0.1,
"x": 170,
"y": 140,
"wires": [
[
"516d73af.93d044"
]
]
},
{
"id": "7524694d.85b5f",
"type": "debug",
"z": "51b5b7dc.30089",
"name": "",
"active": true,
"tosidebar": true,
"console": false,
"tostatus": false,
"complete": "true",
"targetType": "full",
"x": 1050,
"y": 240,
"wires": []
},
{
"id": "7cf63318.b988ac",
"type": "debug",
"z": "51b5b7dc.30089",
"name": "",
"active": true,
"tosidebar": true,
"console": false,
"tostatus": false,
"complete": "true",
"targetType": "full",
"x": 1250,
"y": 240,
"wires": []
},
{
"id": "5cac8271.9cb5a4",
"type": "debug",
"z": "51b5b7dc.30089",
"name": "",
"active": true,
"tosidebar": true,
"console": false,
"tostatus": false,
"complete": "true",
"targetType": "full",
"x": 1390,
"y": 420,
"wires": []
},
{
"id": "522eba7d.b6de94",
"type": "debug",
"z": "51b5b7dc.30089",
"name": "",
"active": true,
"tosidebar": true,
"console": false,
"tostatus": false,
"complete": "true",
"targetType": "full",
"x": 1390,
"y": 460,
"wires": []
},
{
"id": "4b2dddc4.3b82dc",
"type": "debug",
"z": "51b5b7dc.30089",
"name": "",
"active": true,
"tosidebar": true,
"console": false,
"tostatus": false,
"complete": "true",
"targetType": "full",
"x": 1390,
"y": 380,
"wires": []
},
{
"id": "3602ffce.50a23",
"type": "comment",
"z": "51b5b7dc.30089",
"name": "INITIALIZATION",
"info": "",
"x": 140,
"y": 180,
"wires": []
},
{
"id": "cc8e9fa4.88b5c",
"type": "debug",
"z": "51b5b7dc.30089",
"name": "",
"active": true,
"tosidebar": true,
"console": false,
"tostatus": false,
"complete": "true",
"targetType": "full",
"x": 350,
"y": 400,
"wires": []
},
{
"id": "3a6e34c4.5f69fc",
"type": "debug",
"z": "51b5b7dc.30089",
"name": "",
"active": true,
"tosidebar": true,
"console": false,
"tostatus": false,
"complete": "true",
"targetType": "full",
"x": 270,
"y": 840,
"wires": []
},
{
"id": "3d2ea52a.23883a",
"type": "debug",
"z": "51b5b7dc.30089",
"name": "",
"active": true,
"tosidebar": true,
"console": false,
"tostatus": false,
"complete": "true",
"targetType": "full",
"x": 870,
"y": 240,
"wires": []
},
{
"id": "a5c89b67.6f32c",
"type": "debug",
"z": "51b5b7dc.30089",
"name": "",
"active": true,
"tosidebar": true,
"console": false,
"tostatus": false,
"complete": "true",
"targetType": "full",
"x": 1310,
"y": 620,
"wires": []
},
{
"id": "7bd21cf1.4357b4",
"type": "debug",
"z": "51b5b7dc.30089",
"name": "",
"active": true,
"tosidebar": true,
"console": false,
"tostatus": false,
"complete": "true",
"targetType": "full",
"x": 1390,
"y": 500,
"wires": []
},
{
"id": "1a74d983.2b654e",
"type": "debug",
"z": "51b5b7dc.30089",
"name": "",
"active": true,
"tosidebar": true,
"console": false,
"tostatus": false,
"complete": "true",
"targetType": "full",
"x": 710,
"y": 240,
"wires": []
},
{
"id": "27b9c22d.373196",
"type": "http request",
"z": "51b5b7dc.30089",
"name": "",
"method": "use",
"ret": "obj",
"paytoqs": false,
"url": "",
"tls": "51c6f98.b2bf308",
"proxy": "",
"authType": "",
"x": 1150,
"y": 320,
"wires": [
[
"8ac6a54c.9ffbc",
"7cf63318.b988ac"
]
]
},
{
"id": "def350ea.094808",
"type": "debug",
"z": "51b5b7dc.30089",
"name": "",
"active": true,
"tosidebar": true,
"console": false,
"tostatus": false,
"complete": "true",
"targetType": "full",
"x": 350,
"y": 1160,
"wires": []
},
{
"id": "51c6f98.b2bf308",
"type": "tls-config",
"z": "",
"name": "",
"cert": "",
"key": "",
"ca": "",
"certname": "",
"keyname": "",
"caname": "",
"servername": "",
"verifyservercert": false
}
]