@gkousiouris
Last active February 4, 2023 13:10
Producer-Consumer Pattern Implementation using the Semaphore Service Subflow

This is an example implementation of the distributed producer/consumer problem using the Semaphore Service Subflow.

It is based on the typical implementation that uses a mutex, a full counter (initialized at 0) and an empty counter (initialized at the maximum number of slots needed, in this case 3).

[Figure: Semaphore Service pattern diagram]

The logic of the producer/consumer appears below:

producer() {
	while (TRUE) {
		produce_item(&item);
		down(&empty);
		down(&mutex);
		enter_item(&item);
		up(&mutex);
		up(&full);
	}
}


consumer() {
	while (TRUE) {
		down(&full);
		down(&mutex);
		remove_item(&item);
		up(&mutex);
		up(&empty);
		consume_item(&item);
	}
}


The call sequence to implement it is included in the flow.
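
Because the service's down call does not block (it answers 409 when the semaphore is at 0), each step of the sequence has to be retried by the client until it succeeds. The following is a minimal sketch of that idea, not the flow's actual code: `makeSemaphores`, `PRODUCER_STEPS`, `CONSUMER_STEPS` and `runStep` are hypothetical names, and the semaphores are kept in memory instead of being reached over HTTP.

```javascript
// Hypothetical in-memory stand-in for the Semaphore Service (illustration only).
// down() answers 409 when the value is already 0, up() always succeeds.
function makeSemaphores(initial) {
  const values = { ...initial };
  return {
    down(name) {
      if (!(name in values)) return 404;
      if (values[name] === 0) return 409; // "Semaphore locked"
      values[name] -= 1;
      return 200;
    },
    up(name) {
      if (!(name in values)) return 404;
      values[name] += 1;
      return 200;
    },
    value(name) {
      return values[name];
    },
  };
}

// Call sequences taken from the producer() and consumer() pseudocode.
const PRODUCER_STEPS = [
  { op: "down", sem: "empty" },
  { op: "down", sem: "mutex" },
  { op: "critical" },
  { op: "up", sem: "mutex" },
  { op: "up", sem: "full" },
];

const CONSUMER_STEPS = [
  { op: "down", sem: "full" },
  { op: "down", sem: "mutex" },
  { op: "critical" },
  { op: "up", sem: "mutex" },
  { op: "up", sem: "empty" },
];

// Execute one step and return the index of the step to run next.
// Returning the same index means "retry this step later", which is the
// role the delay nodes play in the flow.
function runStep(sems, steps, index, critical) {
  const step = steps[index];
  if (step.op === "critical") {
    critical();
    return index + 1;
  }
  const status = sems[step.op](step.sem);
  return status === 200 ? index + 1 : index;
}
```

A caller would keep invoking `runStep` with the returned index, waiting between attempts whenever the index does not advance. Only the failed step is retried, so a producer that already holds an empty slot keeps it while it waits for the mutex.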

To run the example, first trigger the Initialization inject, which creates the 3 needed semaphores. Then a periodic inject can be set for the producer and the consumer flows. You can monitor the value of the full semaphore through the QUEUE STATUS section. Once the test is complete and you have deactivated the producer/consumer injects, trigger the Cleanup inject to delete any remaining semaphore state.
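
For readers who want to drive the service from outside Node-RED, the requests issued by the Initialization and Cleanup chains can be sketched as plain data. This is an illustration under assumptions: the base URL is the one used in the example flow, the helper names `initRequests`, `cleanupRequests` and `sendAll` are hypothetical, and `sendAll` assumes a runtime with a global `fetch` (for example Node.js 18 or later).

```javascript
// Base URL as used in the example flow; adjust it for your Node-RED instance.
const BASE_URL = "http://localhost:1880";

// Requests of the Initialization chain, in the flow's order: mutex, empty, full.
function initRequests(slots = 3, baseUrl = BASE_URL) {
  const create = (name, value) => ({
    method: "POST",
    url: `${baseUrl}/semaphore`,
    body: { name, value },
  });
  return [create("mutex", 1), create("empty", slots), create("full", 0)];
}

// Requests of the Cleanup chain: one DELETE per semaphore.
function cleanupRequests(baseUrl = BASE_URL) {
  return ["mutex", "empty", "full"].map((name) => ({
    method: "DELETE",
    url: `${baseUrl}/semaphore/${name}`,
    body: { name }, // the example flow sends the name in the body as well
  }));
}

// Send the requests one by one and stop at the first non-200 reply,
// which is what the switch nodes after each http request node do.
async function sendAll(requests) {
  for (const r of requests) {
    const res = await fetch(r.url, {
      method: r.method,
      headers: { "Content-Type": "application/json" },
      body: JSON.stringify(r.body),
    });
    if (res.status !== 200) return false;
  }
  return true;
}
```

With a running instance, `sendAll(initRequests())` would create the three semaphores and `sendAll(cleanupRequests())` would remove them again.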

prodcons_example

[
{
"id": "3fd93e9558924dee",
"type": "subflow",
"name": "Semaphore Service",
"info": "\nThis is a subflow that creates a semaphore service acting as a distributed lock/synchronization mechanism. It uses flow variables to store the state of each semaphore. Changes to a semaphore value are uninterruptible at the level of the Node.js process, because Node.js executes them in the single-threaded event loop and does not interrupt their execution unless an async method or worker thread is used in the function's implementation.\n\nThe subflow defines 5 endpoints:\n * `POST /semaphore` for new semaphore creation. The body should contain the name and initialization value of the created semaphore: `{\"name\":\"<semname>\",\"value\":1}`. The value needs to be a non-negative integer (0 is allowed, for example for a counter that starts empty). If it is negative, a `400 HTTP error code` is returned. A non-negative float will be converted to an integer. If the semaphore already exists, a `303 HTTP error code` is returned. \n * `DELETE /semaphore/:name` for deleting a semaphore, with the name given as a path parameter. The example flow also sends the name in the body (`{\"name\":\"<semname>\"}`). If the semaphore does not exist a `404 HTTP error code` is returned.\n * `GET /semaphore/value/:name` for retrieving the current value. If the semaphore does not exist a `404 HTTP error code` is returned.\n * `POST /semaphore/up` for increasing the value by 1 with a body of the semaphore name `{\"name\":\"<semname>\"}`. If the semaphore does not exist a `404 HTTP error code` is returned.\n * `POST /semaphore/down` for decreasing the value by 1 with a body of the semaphore name `{\"name\":\"<semname>\"}`. If the semaphore is already at 0, a relevant message `Semaphore locked` with a `409 HTTP error code` is returned. If the semaphore does not exist a `404 HTTP error code` is returned.\n\nA created semaphore can be used as a lock (if initialized at 1). \nAs in any semaphore related library, it is the responsibility of the clients of the distributed application to use a correct call sequence, one that indicates whether the client can proceed to what is considered the critical section, and to use the up/down methods correctly. \n\nFor example, a semaphore locked by one client (with a down at 1, resulting in the value being 0) can be unlocked by another client that afterwards performs an up from 0 on the same semaphore. There is no notion of lock ownership by the specific client that performed the initial down. Compared to typical semaphore libraries, this implementation cannot make the calling process sleep if the semaphore is locked. \n\n A created semaphore can also be used as a synchronization counter (any initialization value>0 can be used) in a producer/consumer type of problem. However, as is common in semaphores, the lock gets activated at 0, so reverse semantics need to be used: for example, define the max available slots and then reduce the value by one for each producer client. \n\nThe GET method is included for informational purposes only. There is no guarantee that the value will not have changed by the time the response is received by the client.\n\nNotifying the calling processes when a semaphore gets unlocked is not implemented at the moment (e.g. through a callback URL) but it is a feature that can be added in the future if needed. Add a GitHub comment if you think it would be useful. \n \n\n",
"category": "PHYSICS Helpers",
"in": [],
"out": [],
"env": [],
"meta": {},
"color": "#E9967A",
"icon": "font-awesome/fa-lock"
},
{
"id": "e20b29f5c8914dd0",
"type": "http in",
"z": "3fd93e9558924dee",
"name": "",
"url": "/semaphore",
"method": "post",
"upload": false,
"swaggerDoc": "",
"x": 370,
"y": 120,
"wires": [
[
"c8b282531e638f3b"
]
]
},
{
"id": "baa9998a904ab1b9",
"type": "http response",
"z": "3fd93e9558924dee",
"name": "",
"statusCode": "200",
"headers": {},
"x": 1080,
"y": 60,
"wires": []
},
{
"id": "0ae9a7bd4967d8ee",
"type": "http response",
"z": "3fd93e9558924dee",
"name": "",
"statusCode": "303",
"headers": {},
"x": 920,
"y": 180,
"wires": []
},
{
"id": "e24756a645f1032f",
"type": "http in",
"z": "3fd93e9558924dee",
"name": "",
"url": "/semaphore/value/:name",
"method": "get",
"upload": false,
"swaggerDoc": "",
"x": 400,
"y": 560,
"wires": [
[
"493f59ece63ecbf2"
]
]
},
{
"id": "1722d49bbed4ab42",
"type": "http in",
"z": "3fd93e9558924dee",
"name": "",
"url": "/semaphore/up",
"method": "post",
"upload": false,
"swaggerDoc": "",
"x": 380,
"y": 740,
"wires": [
[
"5fd9e5c5e202d585"
]
]
},
{
"id": "f6eee1f0a68aae33",
"type": "comment",
"z": "3fd93e9558924dee",
"name": "INPUT FORMAT: NAME AND INIT VALUE",
"info": "{\n \"name\": \"<sem_name>\",\n \"value\": 1 //or any non-negative integer\n}",
"x": 260,
"y": 60,
"wires": []
},
{
"id": "9e264d0b278a8561",
"type": "http in",
"z": "3fd93e9558924dee",
"name": "",
"url": "/semaphore/down",
"method": "post",
"upload": false,
"swaggerDoc": "",
"x": 380,
"y": 860,
"wires": [
[
"26ee2ee0c7a67793"
]
]
},
{
"id": "c8b282531e638f3b",
"type": "function",
"z": "3fd93e9558924dee",
"name": "check previous",
"func": "\nmsg.exists=false;\nmsg.isnegative=false;\nif (flow.get(msg.payload.name)){\n //exists\n msg.exists=true;\n} else{\n if (msg.payload.value<0){\n msg.isnegative=true;\n msg.payload=\"Value needs to be non-negative\";\n } else {\n msg.payload.value=parseInt(msg.payload.value,10);\n flow.set(msg.payload.name,msg.payload);\n }\n}\n\nreturn msg;",
"outputs": 1,
"noerr": 0,
"initialize": "",
"finalize": "",
"libs": [],
"x": 600,
"y": 120,
"wires": [
[
"ed351f537268c28b"
]
]
},
{
"id": "ed351f537268c28b",
"type": "switch",
"z": "3fd93e9558924dee",
"name": "exists",
"property": "exists",
"propertyType": "msg",
"rules": [
{
"t": "false"
},
{
"t": "else"
}
],
"checkall": "true",
"repair": false,
"outputs": 2,
"x": 770,
"y": 120,
"wires": [
[
"0269c9d4a0e8db15"
],
[
"0ae9a7bd4967d8ee"
]
]
},
{
"id": "5fd9e5c5e202d585",
"type": "function",
"z": "3fd93e9558924dee",
"name": "UP",
"func": "\nmsg.exists=false;\n\nif (flow.get(msg.payload.name)){\n //exists\n var sem=flow.get(msg.payload.name);\n sem.value++;\n msg.exists=true;\n flow.set(msg.payload.name,sem);\n msg.payload=flow.get(msg.payload.name);\n} else{\n msg.payload=\"semaphore not found\";\n \n}\n\n\nreturn msg;",
"outputs": 1,
"noerr": 0,
"initialize": "",
"finalize": "",
"libs": [],
"x": 570,
"y": 740,
"wires": [
[
"efd55cb64b7c59bb"
]
]
},
{
"id": "493f59ece63ecbf2",
"type": "function",
"z": "3fd93e9558924dee",
"name": "GET",
"func": "\nmsg.exists=false;\nif (flow.get(msg.req.params.name)){\n //exists\n msg.exists=true;\n msg.payload=flow.get(msg.req.params.name);\n} else{\n msg.payload=\"semaphore not found\";\n \n}\n\n\nreturn msg;",
"outputs": 1,
"noerr": 0,
"initialize": "",
"finalize": "",
"libs": [],
"x": 630,
"y": 560,
"wires": [
[
"20b5c180fbee73ab"
]
]
},
{
"id": "20b5c180fbee73ab",
"type": "switch",
"z": "3fd93e9558924dee",
"name": "exists",
"property": "exists",
"propertyType": "msg",
"rules": [
{
"t": "true"
},
{
"t": "else"
}
],
"checkall": "true",
"repair": false,
"outputs": 2,
"x": 770,
"y": 560,
"wires": [
[
"f017ed4b5beb12c1"
],
[
"516e90ee21897008"
]
]
},
{
"id": "f017ed4b5beb12c1",
"type": "http response",
"z": "3fd93e9558924dee",
"name": "",
"statusCode": "200",
"headers": {},
"x": 940,
"y": 520,
"wires": []
},
{
"id": "516e90ee21897008",
"type": "http response",
"z": "3fd93e9558924dee",
"name": "",
"statusCode": "404",
"headers": {},
"x": 940,
"y": 580,
"wires": []
},
{
"id": "26ee2ee0c7a67793",
"type": "function",
"z": "3fd93e9558924dee",
"name": "DOWN",
"func": "msg.exists=false;\nmsg.locked=false;\nif (flow.get(msg.payload.name)){\n //exists\n msg.exists=true;\n var sem=flow.get(msg.payload.name);\n if (sem.value!=0){\n sem.value--;\n flow.set(msg.payload.name,sem);\n msg.payload=flow.get(msg.payload.name);\n } else {\n msg.locked=true;\n msg.payload='Semaphore locked';\n }\n} else{\n msg.payload=\"semaphore not found\";\n \n}\n\nreturn msg;",
"outputs": 1,
"noerr": 0,
"initialize": "",
"finalize": "",
"libs": [],
"x": 580,
"y": 860,
"wires": [
[
"477b0099e82e1db7"
]
]
},
{
"id": "efd55cb64b7c59bb",
"type": "switch",
"z": "3fd93e9558924dee",
"name": "exists",
"property": "exists",
"propertyType": "msg",
"rules": [
{
"t": "true"
},
{
"t": "else"
}
],
"checkall": "true",
"repair": false,
"outputs": 2,
"x": 730,
"y": 740,
"wires": [
[
"dfe23e7e89991261"
],
[
"c413c977e3137760"
]
]
},
{
"id": "dfe23e7e89991261",
"type": "http response",
"z": "3fd93e9558924dee",
"name": "",
"statusCode": "200",
"headers": {},
"x": 900,
"y": 700,
"wires": []
},
{
"id": "c413c977e3137760",
"type": "http response",
"z": "3fd93e9558924dee",
"name": "",
"statusCode": "404",
"headers": {},
"x": 900,
"y": 760,
"wires": []
},
{
"id": "477b0099e82e1db7",
"type": "switch",
"z": "3fd93e9558924dee",
"name": "exists",
"property": "exists",
"propertyType": "msg",
"rules": [
{
"t": "true"
},
{
"t": "else"
}
],
"checkall": "true",
"repair": false,
"outputs": 2,
"x": 730,
"y": 860,
"wires": [
[
"f56927749049be1f"
],
[
"6d25340b25b49348"
]
]
},
{
"id": "585130516407d9f3",
"type": "http response",
"z": "3fd93e9558924dee",
"name": "",
"statusCode": "200",
"headers": {},
"x": 1060,
"y": 820,
"wires": []
},
{
"id": "6d25340b25b49348",
"type": "http response",
"z": "3fd93e9558924dee",
"name": "",
"statusCode": "404",
"headers": {},
"x": 920,
"y": 920,
"wires": []
},
{
"id": "f56927749049be1f",
"type": "switch",
"z": "3fd93e9558924dee",
"name": "locked",
"property": "locked",
"propertyType": "msg",
"rules": [
{
"t": "false"
},
{
"t": "else"
}
],
"checkall": "true",
"repair": false,
"outputs": 2,
"x": 870,
"y": 840,
"wires": [
[
"585130516407d9f3"
],
[
"2320e6935d39792c"
]
]
},
{
"id": "2320e6935d39792c",
"type": "http response",
"z": "3fd93e9558924dee",
"name": "",
"statusCode": "409",
"headers": {},
"x": 1060,
"y": 860,
"wires": []
},
{
"id": "b935e803799059e3",
"type": "http in",
"z": "3fd93e9558924dee",
"name": "",
"url": "/semaphore/:name",
"method": "delete",
"upload": false,
"swaggerDoc": "",
"x": 390,
"y": 340,
"wires": [
[
"b1d0e34e546440b4"
]
]
},
{
"id": "b1d0e34e546440b4",
"type": "function",
"z": "3fd93e9558924dee",
"name": "check previous",
"func": "\nmsg.exists=true;\nif (flow.get(msg.req.params.name)){\n //no delete method available for flow context, bypass with undefined\n //use the path parameter for both the check and the removal\n flow.set(msg.req.params.name,undefined);\n msg.payload=\"Deletion Successful\";\n \n} else{\n msg.payload=\"Semaphore not found\";\n msg.exists=false;\n}\n\nreturn msg;",
"outputs": 1,
"noerr": 0,
"initialize": "",
"finalize": "",
"libs": [],
"x": 600,
"y": 340,
"wires": [
[
"76d822d9f7a2b3d2"
]
]
},
{
"id": "76d822d9f7a2b3d2",
"type": "switch",
"z": "3fd93e9558924dee",
"name": "exists",
"property": "exists",
"propertyType": "msg",
"rules": [
{
"t": "true"
},
{
"t": "else"
}
],
"checkall": "true",
"repair": false,
"outputs": 2,
"x": 770,
"y": 340,
"wires": [
[
"a371ee130532d2ca"
],
[
"36512c117d7e6cbc"
]
]
},
{
"id": "a371ee130532d2ca",
"type": "http response",
"z": "3fd93e9558924dee",
"name": "",
"statusCode": "200",
"headers": {},
"x": 920,
"y": 300,
"wires": []
},
{
"id": "36512c117d7e6cbc",
"type": "http response",
"z": "3fd93e9558924dee",
"name": "",
"statusCode": "404",
"headers": {},
"x": 920,
"y": 400,
"wires": []
},
{
"id": "0269c9d4a0e8db15",
"type": "switch",
"z": "3fd93e9558924dee",
"name": "isnegative",
"property": "isnegative",
"propertyType": "msg",
"rules": [
{
"t": "false"
},
{
"t": "else"
}
],
"checkall": "true",
"repair": false,
"outputs": 2,
"x": 920,
"y": 80,
"wires": [
[
"baa9998a904ab1b9"
],
[
"edef503521df3454"
]
]
},
{
"id": "edef503521df3454",
"type": "http response",
"z": "3fd93e9558924dee",
"name": "",
"statusCode": "400",
"headers": {},
"x": 1080,
"y": 120,
"wires": []
},
{
"id": "7dc15d8ba9702218",
"type": "comment",
"z": "3fd93e9558924dee",
"name": "Limitation",
"info": "In Node-RED there is no ability to define two different methods on the same endpoint, thus no DELETE method can be defined directly in the /semaphore endpoint. A new endpoint was defined for the delete operation.",
"x": 340,
"y": 300,
"wires": []
},
{
"id": "ebf7968b4fbea43b",
"type": "inject",
"z": "d4a43bb856c71cab",
"name": "create lock",
"props": [
{
"p": "payload"
},
{
"p": "url",
"v": "http://localhost:1880/semaphore",
"vt": "str"
},
{
"p": "method",
"v": "POST",
"vt": "str"
}
],
"repeat": "",
"crontab": "",
"once": false,
"onceDelay": 0.1,
"topic": "",
"payload": "{\"name\":\"mutex\",\"value\":1}",
"payloadType": "json",
"x": 140,
"y": 140,
"wires": [
[
"38a33f6327e61640"
]
]
},
{
"id": "38a33f6327e61640",
"type": "http request",
"z": "d4a43bb856c71cab",
"name": "",
"method": "use",
"ret": "txt",
"paytoqs": "ignore",
"url": "",
"tls": "",
"persist": false,
"proxy": "",
"authType": "",
"x": 290,
"y": 140,
"wires": [
[
"b73ffbde19fba2fa"
]
]
},
{
"id": "b73ffbde19fba2fa",
"type": "switch",
"z": "d4a43bb856c71cab",
"name": "",
"property": "statusCode",
"propertyType": "msg",
"rules": [
{
"t": "eq",
"v": "200",
"vt": "num"
},
{
"t": "neq",
"v": "200",
"vt": "str"
}
],
"checkall": "true",
"repair": false,
"outputs": 2,
"x": 430,
"y": 140,
"wires": [
[
"622310cb8c75f354"
],
[]
]
},
{
"id": "622310cb8c75f354",
"type": "function",
"z": "d4a43bb856c71cab",
"name": "create empty",
"func": "msg.headers={};\nmsg.payload={\"name\":\"empty\",\"value\":3};\nreturn msg;",
"outputs": 1,
"noerr": 0,
"initialize": "",
"finalize": "",
"libs": [],
"x": 590,
"y": 140,
"wires": [
[
"710650d7fceb82db"
]
]
},
{
"id": "710650d7fceb82db",
"type": "http request",
"z": "d4a43bb856c71cab",
"name": "",
"method": "use",
"ret": "txt",
"paytoqs": "ignore",
"url": "",
"tls": "",
"persist": false,
"proxy": "",
"authType": "",
"x": 750,
"y": 140,
"wires": [
[
"a6f0256241cefc61"
]
]
},
{
"id": "1ebd81a3e2946b9f",
"type": "http request",
"z": "d4a43bb856c71cab",
"name": "",
"method": "use",
"ret": "txt",
"paytoqs": "ignore",
"url": "",
"tls": "",
"persist": false,
"proxy": "",
"authType": "",
"x": 730,
"y": 240,
"wires": [
[
"1ab44ea2da0df9e1"
]
]
},
{
"id": "84df3d4522acb707",
"type": "function",
"z": "d4a43bb856c71cab",
"name": "create full",
"func": "msg.headers={};\nmsg.payload={\"name\":\"full\",\"value\":0};\nreturn msg;",
"outputs": 1,
"noerr": 0,
"initialize": "",
"finalize": "",
"libs": [],
"x": 560,
"y": 240,
"wires": [
[
"1ebd81a3e2946b9f"
]
]
},
{
"id": "82f6d059c895d039",
"type": "comment",
"z": "d4a43bb856c71cab",
"name": "Initialization",
"info": "",
"x": 150,
"y": 100,
"wires": []
},
{
"id": "a6f0256241cefc61",
"type": "switch",
"z": "d4a43bb856c71cab",
"name": "",
"property": "statusCode",
"propertyType": "msg",
"rules": [
{
"t": "eq",
"v": "200",
"vt": "num"
},
{
"t": "neq",
"v": "200",
"vt": "str"
}
],
"checkall": "true",
"repair": false,
"outputs": 2,
"x": 910,
"y": 140,
"wires": [
[
"84df3d4522acb707"
],
[]
]
},
{
"id": "1ab44ea2da0df9e1",
"type": "switch",
"z": "d4a43bb856c71cab",
"name": "",
"property": "statusCode",
"propertyType": "msg",
"rules": [
{
"t": "eq",
"v": "200",
"vt": "num"
},
{
"t": "neq",
"v": "200",
"vt": "str"
}
],
"checkall": "true",
"repair": false,
"outputs": 2,
"x": 890,
"y": 240,
"wires": [
[
"ffd37de7db1c3069"
],
[]
]
},
{
"id": "ffd37de7db1c3069",
"type": "debug",
"z": "d4a43bb856c71cab",
"name": "INIT OK",
"active": true,
"tosidebar": true,
"console": false,
"tostatus": false,
"complete": "payload",
"targetType": "msg",
"statusVal": "",
"statusType": "auto",
"x": 1020,
"y": 240,
"wires": []
},
{
"id": "369d0336b2d39852",
"type": "http request",
"z": "d4a43bb856c71cab",
"name": "",
"method": "use",
"ret": "txt",
"paytoqs": "ignore",
"url": "",
"tls": "",
"persist": false,
"proxy": "",
"authType": "",
"x": 290,
"y": 920,
"wires": [
[
"cb1cae76d1c85cda"
]
]
},
{
"id": "cb1cae76d1c85cda",
"type": "switch",
"z": "d4a43bb856c71cab",
"name": "",
"property": "statusCode",
"propertyType": "msg",
"rules": [
{
"t": "eq",
"v": "200",
"vt": "num"
},
{
"t": "neq",
"v": "200",
"vt": "str"
}
],
"checkall": "true",
"repair": false,
"outputs": 2,
"x": 430,
"y": 920,
"wires": [
[
"8b73c7b5e9178d8c"
],
[]
]
},
{
"id": "8b73c7b5e9178d8c",
"type": "function",
"z": "d4a43bb856c71cab",
"name": "delete empty",
"func": "msg.headers={};\nmsg.payload={\"name\":\"empty\"};\nmsg.method='DELETE';\nmsg.url='http://localhost:1880/semaphore/empty';\nreturn msg;",
"outputs": 1,
"noerr": 0,
"initialize": "",
"finalize": "",
"libs": [],
"x": 590,
"y": 920,
"wires": [
[
"5dc764edbe7060da"
]
]
},
{
"id": "5dc764edbe7060da",
"type": "http request",
"z": "d4a43bb856c71cab",
"name": "",
"method": "use",
"ret": "txt",
"paytoqs": "ignore",
"url": "",
"tls": "",
"persist": false,
"proxy": "",
"authType": "",
"x": 750,
"y": 920,
"wires": [
[
"3a1b70c6f43996db"
]
]
},
{
"id": "f60b6b122b6bf1de",
"type": "http request",
"z": "d4a43bb856c71cab",
"name": "",
"method": "use",
"ret": "txt",
"paytoqs": "ignore",
"url": "",
"tls": "",
"persist": false,
"proxy": "",
"authType": "",
"x": 730,
"y": 1020,
"wires": [
[
"8da92f8f33d14529"
]
]
},
{
"id": "8b03f3aab3727570",
"type": "comment",
"z": "d4a43bb856c71cab",
"name": "Cleanup",
"info": "",
"x": 140,
"y": 880,
"wires": []
},
{
"id": "3a1b70c6f43996db",
"type": "switch",
"z": "d4a43bb856c71cab",
"name": "",
"property": "statusCode",
"propertyType": "msg",
"rules": [
{
"t": "eq",
"v": "200",
"vt": "num"
},
{
"t": "neq",
"v": "200",
"vt": "str"
}
],
"checkall": "true",
"repair": false,
"outputs": 2,
"x": 910,
"y": 920,
"wires": [
[
"24bc0decc5db7a77"
],
[]
]
},
{
"id": "8da92f8f33d14529",
"type": "switch",
"z": "d4a43bb856c71cab",
"name": "",
"property": "statusCode",
"propertyType": "msg",
"rules": [
{
"t": "eq",
"v": "200",
"vt": "num"
},
{
"t": "neq",
"v": "200",
"vt": "str"
}
],
"checkall": "true",
"repair": false,
"outputs": 2,
"x": 890,
"y": 1020,
"wires": [
[
"2fad09b14eb67be1"
],
[]
]
},
{
"id": "2fad09b14eb67be1",
"type": "debug",
"z": "d4a43bb856c71cab",
"name": "CLEANUP OK",
"active": true,
"tosidebar": true,
"console": false,
"tostatus": false,
"complete": "payload",
"targetType": "msg",
"statusVal": "",
"statusType": "auto",
"x": 1040,
"y": 1020,
"wires": []
},
{
"id": "d38a817a0361466e",
"type": "inject",
"z": "d4a43bb856c71cab",
"name": "TEST DELETE",
"props": [
{
"p": "payload"
},
{
"p": "url",
"v": "http://localhost:1880/semaphore/mutex",
"vt": "str"
},
{
"p": "method",
"v": "DELETE",
"vt": "str"
}
],
"repeat": "",
"crontab": "",
"once": false,
"onceDelay": 0.1,
"topic": "",
"payload": "{\"name\":\"mutex\"}",
"payloadType": "json",
"x": 120,
"y": 920,
"wires": [
[
"369d0336b2d39852"
]
]
},
{
"id": "24bc0decc5db7a77",
"type": "function",
"z": "d4a43bb856c71cab",
"name": "delete full",
"func": "msg.headers={};\nmsg.payload={\"name\":\"full\"};\nmsg.method='DELETE';\nmsg.url='http://localhost:1880/semaphore/full';\nreturn msg;",
"outputs": 1,
"noerr": 0,
"initialize": "",
"finalize": "",
"libs": [],
"x": 560,
"y": 1020,
"wires": [
[
"f60b6b122b6bf1de"
]
]
},
{
"id": "96357dc6a5b05193",
"type": "comment",
"z": "d4a43bb856c71cab",
"name": "Producer",
"info": "",
"x": 140,
"y": 340,
"wires": []
},
{
"id": "9c0a24bb1ac844dd",
"type": "inject",
"z": "d4a43bb856c71cab",
"name": "DOWN EMPTY",
"props": [
{
"p": "payload"
},
{
"p": "url",
"v": "http://localhost:1880/semaphore/down",
"vt": "str"
},
{
"p": "method",
"v": "POST",
"vt": "str"
}
],
"repeat": "",
"crontab": "",
"once": false,
"onceDelay": 0.1,
"topic": "",
"payload": "{\"name\":\"empty\"}",
"payloadType": "json",
"x": 120,
"y": 420,
"wires": [
[
"daf16c869a981355"
]
]
},
{
"id": "d0372c941e51a738",
"type": "http request",
"z": "d4a43bb856c71cab",
"name": "",
"method": "use",
"ret": "txt",
"paytoqs": "ignore",
"url": "",
"tls": "",
"persist": false,
"proxy": "",
"authType": "",
"x": 450,
"y": 420,
"wires": [
[
"51f6fe232dda2417"
]
]
},
{
"id": "51f6fe232dda2417",
"type": "switch",
"z": "d4a43bb856c71cab",
"name": "",
"property": "statusCode",
"propertyType": "msg",
"rules": [
{
"t": "eq",
"v": "200",
"vt": "num"
},
{
"t": "neq",
"v": "200",
"vt": "str"
}
],
"checkall": "true",
"repair": false,
"outputs": 2,
"x": 590,
"y": 420,
"wires": [
[
"955a833b3f1e3f6f"
],
[
"a07f7339b919c944"
]
]
},
{
"id": "daf16c869a981355",
"type": "function",
"z": "d4a43bb856c71cab",
"name": "down empty",
"func": "msg.headers={};\nmsg.payload={\"name\":\"empty\"};\nmsg.method='POST';\nmsg.url='http://localhost:1880/semaphore/down';\nreturn msg;",
"outputs": 1,
"noerr": 0,
"initialize": "",
"finalize": "",
"libs": [],
"x": 290,
"y": 420,
"wires": [
[
"d0372c941e51a738"
]
]
},
{
"id": "955a833b3f1e3f6f",
"type": "function",
"z": "d4a43bb856c71cab",
"name": "down mutex",
"func": "msg.headers={};\nmsg.payload={\"name\":\"mutex\"};\nmsg.method='POST';\nmsg.url='http://localhost:1880/semaphore/down';\nreturn msg;",
"outputs": 1,
"noerr": 0,
"initialize": "",
"finalize": "",
"libs": [],
"x": 750,
"y": 420,
"wires": [
[
"a46d5b4f8ca3fefd"
]
]
},
{
"id": "a07f7339b919c944",
"type": "delay",
"z": "d4a43bb856c71cab",
"name": "",
"pauseType": "delay",
"timeout": "5",
"timeoutUnits": "seconds",
"rate": "1",
"nbRateUnits": "1",
"rateUnits": "second",
"randomFirst": "1",
"randomLast": "5",
"randomUnits": "seconds",
"drop": false,
"allowrate": false,
"x": 400,
"y": 500,
"wires": [
[
"daf16c869a981355"
]
]
},
{
"id": "a46d5b4f8ca3fefd",
"type": "http request",
"z": "d4a43bb856c71cab",
"name": "",
"method": "use",
"ret": "txt",
"paytoqs": "ignore",
"url": "",
"tls": "",
"persist": false,
"proxy": "",
"authType": "",
"x": 910,
"y": 420,
"wires": [
[
"881c9e3c11f4ab02"
]
]
},
{
"id": "97aeeafc81238ef4",
"type": "function",
"z": "d4a43bb856c71cab",
"name": "CRITICAL OPERATION",
"func": "\nreturn msg;",
"outputs": 1,
"noerr": 0,
"initialize": "",
"finalize": "",
"libs": [],
"x": 1250,
"y": 420,
"wires": [
[
"b040e53986846e56"
]
]
},
{
"id": "881c9e3c11f4ab02",
"type": "switch",
"z": "d4a43bb856c71cab",
"name": "",
"property": "statusCode",
"propertyType": "msg",
"rules": [
{
"t": "eq",
"v": "200",
"vt": "num"
},
{
"t": "neq",
"v": "200",
"vt": "str"
}
],
"checkall": "true",
"repair": false,
"outputs": 2,
"x": 1050,
"y": 420,
"wires": [
[
"97aeeafc81238ef4",
"1d53ad49f0c780c7"
],
[
"c3d539f65d5e4c03"
]
]
},
{
"id": "c3d539f65d5e4c03",
"type": "delay",
"z": "d4a43bb856c71cab",
"name": "",
"pauseType": "delay",
"timeout": "5",
"timeoutUnits": "seconds",
"rate": "1",
"nbRateUnits": "1",
"rateUnits": "second",
"randomFirst": "1",
"randomLast": "5",
"randomUnits": "seconds",
"drop": false,
"allowrate": false,
"x": 880,
"y": 500,
"wires": [
[
"955a833b3f1e3f6f"
]
]
},
{
"id": "b040e53986846e56",
"type": "function",
"z": "d4a43bb856c71cab",
"name": "up mutex",
"func": "msg.headers={};\nmsg.payload={\"name\":\"mutex\"};\nmsg.method='POST';\nmsg.url='http://localhost:1880/semaphore/up';\nreturn msg;",
"outputs": 1,
"noerr": 0,
"initialize": "",
"finalize": "",
"libs": [],
"x": 1500,
"y": 420,
"wires": [
[
"659dccb5c3b588bc"
]
]
},
{
"id": "659dccb5c3b588bc",
"type": "http request",
"z": "d4a43bb856c71cab",
"name": "",
"method": "use",
"ret": "txt",
"paytoqs": "ignore",
"url": "",
"tls": "",
"persist": false,
"proxy": "",
"authType": "",
"x": 1650,
"y": 420,
"wires": [
[
"d2ed7b58ddb5864e"
]
]
},
{
"id": "d2ed7b58ddb5864e",
"type": "switch",
"z": "d4a43bb856c71cab",
"name": "",
"property": "statusCode",
"propertyType": "msg",
"rules": [
{
"t": "eq",
"v": "200",
"vt": "num"
},
{
"t": "neq",
"v": "200",
"vt": "str"
}
],
"checkall": "true",
"repair": false,
"outputs": 2,
"x": 1810,
"y": 420,
"wires": [
[
"429285cf6d596a2a",
"6c5d68ad34a4e61e"
],
[
"fd0ccdcf1d54f8ae"
]
]
},
{
"id": "fd0ccdcf1d54f8ae",
"type": "delay",
"z": "d4a43bb856c71cab",
"name": "",
"pauseType": "delay",
"timeout": "5",
"timeoutUnits": "seconds",
"rate": "1",
"nbRateUnits": "1",
"rateUnits": "second",
"randomFirst": "1",
"randomLast": "5",
"randomUnits": "seconds",
"drop": false,
"allowrate": false,
"x": 1640,
"y": 500,
"wires": [
[
"b040e53986846e56"
]
]
},
{
"id": "429285cf6d596a2a",
"type": "function",
"z": "d4a43bb856c71cab",
"name": "up full",
"func": "msg.headers={};\nmsg.payload={\"name\":\"full\"};\nmsg.method='POST';\nmsg.url='http://localhost:1880/semaphore/up';\nreturn msg;",
"outputs": 1,
"noerr": 0,
"initialize": "",
"finalize": "",
"libs": [],
"x": 1990,
"y": 420,
"wires": [
[
"2a586e6ffeff6ddc"
]
]
},
{
"id": "2a586e6ffeff6ddc",
"type": "http request",
"z": "d4a43bb856c71cab",
"name": "",
"method": "use",
"ret": "txt",
"paytoqs": "ignore",
"url": "",
"tls": "",
"persist": false,
"proxy": "",
"authType": "",
"x": 2150,
"y": 420,
"wires": [
[
"97834eb7562165f5"
]
]
},
{
"id": "97834eb7562165f5",
"type": "switch",
"z": "d4a43bb856c71cab",
"name": "",
"property": "statusCode",
"propertyType": "msg",
"rules": [
{
"t": "eq",
"v": "200",
"vt": "num"
},
{
"t": "neq",
"v": "200",
"vt": "str"
}
],
"checkall": "true",
"repair": false,
"outputs": 2,
"x": 2310,
"y": 420,
"wires": [
[],
[
"99573a33cd16b6d1"
]
]
},
{
"id": "99573a33cd16b6d1",
"type": "delay",
"z": "d4a43bb856c71cab",
"name": "",
"pauseType": "delay",
"timeout": "5",
"timeoutUnits": "seconds",
"rate": "1",
"nbRateUnits": "1",
"rateUnits": "second",
"randomFirst": "1",
"randomLast": "5",
"randomUnits": "seconds",
"drop": false,
"allowrate": false,
"x": 2140,
"y": 500,
"wires": [
[
"429285cf6d596a2a"
]
]
},
{
"id": "1d53ad49f0c780c7",
"type": "debug",
"z": "d4a43bb856c71cab",
"name": "IN CRITICAL SECTION",
"active": true,
"tosidebar": true,
"console": false,
"tostatus": false,
"complete": "true",
"targetType": "full",
"statusVal": "",
"statusType": "auto",
"x": 1210,
"y": 360,
"wires": []
},
{
"id": "6c5d68ad34a4e61e",
"type": "debug",
"z": "d4a43bb856c71cab",
"name": "OUT CRITICAL SECTION",
"active": true,
"tosidebar": true,
"console": false,
"tostatus": false,
"complete": "true",
"targetType": "full",
"statusVal": "",
"statusType": "auto",
"x": 1730,
"y": 360,
"wires": []
},
{
"id": "cd80f06155194fdf",
"type": "comment",
"z": "d4a43bb856c71cab",
"name": "Consumer",
"info": "",
"x": 140,
"y": 660,
"wires": []
},
{
"id": "79fccd63af4c0c02",
"type": "inject",
"z": "d4a43bb856c71cab",
"name": "DOWN FULL",
"props": [],
"repeat": "",
"crontab": "",
"once": false,
"onceDelay": 0.1,
"topic": "",
"x": 110,
"y": 720,
"wires": [
[
"991a8ff6e37a437b"
]
]
},
{
"id": "7f9d85ca2bad99f6",
"type": "http request",
"z": "d4a43bb856c71cab",
"name": "",
"method": "use",
"ret": "txt",
"paytoqs": "ignore",
"url": "",
"tls": "",
"persist": false,
"proxy": "",
"authType": "",
"x": 450,
"y": 720,
"wires": [
[
"c3794d12247356dc"
]
]
},
{
"id": "c3794d12247356dc",
"type": "switch",
"z": "d4a43bb856c71cab",
"name": "",
"property": "statusCode",
"propertyType": "msg",
"rules": [
{
"t": "eq",
"v": "200",
"vt": "num"
},
{
"t": "neq",
"v": "200",
"vt": "str"
}
],
"checkall": "true",
"repair": false,
"outputs": 2,
"x": 590,
"y": 720,
"wires": [
[
"6cb4217f5a94a22a"
],
[
"cc537e413f2c9f74"
]
]
},
{
"id": "991a8ff6e37a437b",
"type": "function",
"z": "d4a43bb856c71cab",
"name": "down full",
"func": "msg.headers={};\nmsg.payload={\"name\":\"full\"};\nmsg.method='POST';\nmsg.url='http://localhost:1880/semaphore/down';\nreturn msg;",
"outputs": 1,
"noerr": 0,
"initialize": "",
"finalize": "",
"libs": [],
"x": 280,
"y": 720,
"wires": [
[
"7f9d85ca2bad99f6"
]
]
},
{
"id": "6cb4217f5a94a22a",
"type": "function",
"z": "d4a43bb856c71cab",
"name": "down mutex",
"func": "msg.headers={};\nmsg.payload={\"name\":\"mutex\"};\nmsg.method='POST';\nmsg.url='http://localhost:1880/semaphore/down';\nreturn msg;",
"outputs": 1,
"noerr": 0,
"initialize": "",
"finalize": "",
"libs": [],
"x": 750,
"y": 720,
"wires": [
[
"15380a23740ca1a7"
]
]
},
{
"id": "cc537e413f2c9f74",
"type": "delay",
"z": "d4a43bb856c71cab",
"name": "",
"pauseType": "delay",
"timeout": "5",
"timeoutUnits": "seconds",
"rate": "1",
"nbRateUnits": "1",
"rateUnits": "second",
"randomFirst": "1",
"randomLast": "5",
"randomUnits": "seconds",
"drop": false,
"allowrate": false,
"x": 400,
"y": 800,
"wires": [
[
"991a8ff6e37a437b"
]
]
},
{
"id": "15380a23740ca1a7",
"type": "http request",
"z": "d4a43bb856c71cab",
"name": "",
"method": "use",
"ret": "txt",
"paytoqs": "ignore",
"url": "",
"tls": "",
"persist": false,
"proxy": "",
"authType": "",
"x": 910,
"y": 720,
"wires": [
[
"e026f41b3480dc4f"
]
]
},
{
"id": "049a8488327975c6",
"type": "function",
"z": "d4a43bb856c71cab",
"name": "CRITICAL OPERATION",
"func": "\nreturn msg;",
"outputs": 1,
"noerr": 0,
"initialize": "",
"finalize": "",
"libs": [],
"x": 1250,
"y": 720,
"wires": [
[
"3bf64408ef74b133"
]
]
},
{
"id": "e026f41b3480dc4f",
"type": "switch",
"z": "d4a43bb856c71cab",
"name": "",
"property": "statusCode",
"propertyType": "msg",
"rules": [
{
"t": "eq",
"v": "200",
"vt": "num"
},
{
"t": "neq",
"v": "200",
"vt": "str"
}
],
"checkall": "true",
"repair": false,
"outputs": 2,
"x": 1050,
"y": 720,
"wires": [
[
"049a8488327975c6",
"e54eab58b7d49eb4"
],
[
"e99295203d2483e5"
]
]
},
{
"id": "e99295203d2483e5",
"type": "delay",
"z": "d4a43bb856c71cab",
"name": "",
"pauseType": "delay",
"timeout": "5",
"timeoutUnits": "seconds",
"rate": "1",
"nbRateUnits": "1",
"rateUnits": "second",
"randomFirst": "1",
"randomLast": "5",
"randomUnits": "seconds",
"drop": false,
"allowrate": false,
"x": 880,
"y": 800,
"wires": [
[
"6cb4217f5a94a22a"
]
]
},
{
"id": "3bf64408ef74b133",
"type": "function",
"z": "d4a43bb856c71cab",
"name": "up mutex",
"func": "msg.headers={};\nmsg.payload={\"name\":\"mutex\"};\nmsg.method='POST';\nmsg.url='http://localhost:1880/semaphore/up';\nreturn msg;",
"outputs": 1,
"noerr": 0,
"initialize": "",
"finalize": "",
"libs": [],
"x": 1500,
"y": 720,
"wires": [
[
"157d4c9a19d5bf83"
]
]
},
{
"id": "157d4c9a19d5bf83",
"type": "http request",
"z": "d4a43bb856c71cab",
"name": "",
"method": "use",
"ret": "txt",
"paytoqs": "ignore",
"url": "",
"tls": "",
"persist": false,
"proxy": "",
"authType": "",
"x": 1650,
"y": 720,
"wires": [
[
"23bdbff8425355c6"
]
]
},
{
"id": "23bdbff8425355c6",
"type": "switch",
"z": "d4a43bb856c71cab",
"name": "",
"property": "statusCode",
"propertyType": "msg",
"rules": [
{
"t": "eq",
"v": "200",
"vt": "num"
},
{
"t": "neq",
"v": "200",
"vt": "str"
}
],
"checkall": "true",
"repair": false,
"outputs": 2,
"x": 1810,
"y": 720,
"wires": [
[
"ab02d3ed8399b67b",
"4bf50c4fce3ae25a"
],
[
"a5535315569e5dcc"
]
]
},
{
"id": "a5535315569e5dcc",
"type": "delay",
"z": "d4a43bb856c71cab",
"name": "",
"pauseType": "delay",
"timeout": "5",
"timeoutUnits": "seconds",
"rate": "1",
"nbRateUnits": "1",
"rateUnits": "second",
"randomFirst": "1",
"randomLast": "5",
"randomUnits": "seconds",
"drop": false,
"allowrate": false,
"x": 1640,
"y": 800,
"wires": [
[
"3bf64408ef74b133"
]
]
},
{
"id": "ab02d3ed8399b67b",
"type": "function",
"z": "d4a43bb856c71cab",
"name": "up empty",
"func": "msg.headers={};\nmsg.payload={\"name\":\"empty\"};\nmsg.method='POST';\nmsg.url='http://localhost:1880/semaphore/up';\nreturn msg;",
"outputs": 1,
"noerr": 0,
"initialize": "",
"finalize": "",
"libs": [],
"x": 2000,
"y": 720,
"wires": [
[
"101713813f84cbaf"
]
]
},
{
"id": "101713813f84cbaf",
"type": "http request",
"z": "d4a43bb856c71cab",
"name": "",
"method": "use",
"ret": "txt",
"paytoqs": "ignore",
"url": "",
"tls": "",
"persist": false,
"proxy": "",
"authType": "",
"x": 2150,
"y": 720,
"wires": [
[
"206522090f2c9fc6"
]
]
},
{
"id": "206522090f2c9fc6",
"type": "switch",
"z": "d4a43bb856c71cab",
"name": "",
"property": "statusCode",
"propertyType": "msg",
"rules": [
{
"t": "eq",
"v": "200",
"vt": "num"
},
{
"t": "neq",
"v": "200",
"vt": "str"
}
],
"checkall": "true",
"repair": false,
"outputs": 2,
"x": 2310,
"y": 720,
"wires": [
[],
[
"6b89c5aa5c791035"
]
]
},
{
"id": "6b89c5aa5c791035",
"type": "delay",
"z": "d4a43bb856c71cab",
"name": "",
"pauseType": "delay",
"timeout": "5",
"timeoutUnits": "seconds",
"rate": "1",
"nbRateUnits": "1",
"rateUnits": "second",
"randomFirst": "1",
"randomLast": "5",
"randomUnits": "seconds",
"drop": false,
"allowrate": false,
"x": 2140,
"y": 800,
"wires": [
[
"ab02d3ed8399b67b"
]
]
},
{
"id": "e54eab58b7d49eb4",
"type": "debug",
"z": "d4a43bb856c71cab",
"name": "IN CRITICAL SECTION",
"active": true,
"tosidebar": true,
"console": false,
"tostatus": false,
"complete": "true",
"targetType": "full",
"statusVal": "",
"statusType": "auto",
"x": 1210,
"y": 660,
"wires": []
},
{
"id": "4bf50c4fce3ae25a",
"type": "debug",
"z": "d4a43bb856c71cab",
"name": "OUT CRITICAL SECTION",
"active": true,
"tosidebar": true,
"console": false,
"tostatus": false,
"complete": "true",
"targetType": "full",
"statusVal": "",
"statusType": "auto",
"x": 1750,
"y": 660,
"wires": []
},
{
"id": "1340c757b279fb16",
"type": "inject",
"z": "d4a43bb856c71cab",
"name": "GET SIZE OF FULL VALUE",
"props": [
{
"p": "url",
"v": "http://localhost:1880/semaphore/value/full",
"vt": "str"
},
{
"p": "method",
"v": "GET",
"vt": "str"
},
{
"p": "payload"
}
],
"repeat": "",
"crontab": "",
"once": false,
"onceDelay": 0.1,
"topic": "",
"payload": "{}",
"payloadType": "json",
"x": 140,
"y": 1200,
"wires": [
[
"37b599fd03593626"
]
]
},
{
"id": "37b599fd03593626",
"type": "http request",
"z": "d4a43bb856c71cab",
"name": "",
"method": "use",
"ret": "txt",
"paytoqs": "ignore",
"url": "",
"tls": "",
"persist": false,
"proxy": "",
"authType": "",
"x": 350,
"y": 1200,
"wires": [
[
"18de4ef8d2a2a44d"
]
]
},
{
"id": "18de4ef8d2a2a44d",
"type": "debug",
"z": "d4a43bb856c71cab",
"name": "STATUS",
"active": true,
"tosidebar": true,
"console": false,
"tostatus": false,
"complete": "payload",
"targetType": "msg",
"statusVal": "",
"statusType": "auto",
"x": 520,
"y": 1200,
"wires": []
},
{
"id": "d7524d2229baa120",
"type": "comment",
"z": "d4a43bb856c71cab",
"name": "QUEUE STATUS",
"info": "",
"x": 160,
"y": 1140,
"wires": []
},
{
"id": "86c873351055feda",
"type": "subflow:3fd93e9558924dee",
"z": "d4a43bb856c71cab",
"name": "",
"env": [],
"x": 170,
"y": 40,
"wires": []
}
]
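
The behaviour of the subflow's function nodes can be summarised in a few lines of plain JavaScript. The sketch below is an in-memory model written for illustration, not the Node-RED code itself: `createService` is a hypothetical name, a `Map` stands in for the flow context, and only the status codes and values are modelled (response bodies are simplified).

```javascript
// In-memory model of the Semaphore Service handlers (illustrative sketch).
function createService() {
  const store = new Map(); // stands in for the Node-RED flow context

  return {
    // POST /semaphore
    create(name, value) {
      if (store.has(name)) return { status: 303 }; // already exists
      if (value < 0) return { status: 400 }; // negative values are rejected
      const sem = { name, value: parseInt(value, 10) }; // floats are truncated
      store.set(name, sem);
      return { status: 200, value: sem.value };
    },
    // DELETE /semaphore/:name
    remove(name) {
      return store.delete(name) ? { status: 200 } : { status: 404 };
    },
    // GET /semaphore/value/:name
    value(name) {
      const sem = store.get(name);
      return sem ? { status: 200, value: sem.value } : { status: 404 };
    },
    // POST /semaphore/up
    up(name) {
      const sem = store.get(name);
      if (!sem) return { status: 404 };
      sem.value += 1;
      return { status: 200, value: sem.value };
    },
    // POST /semaphore/down: never blocks, answers 409 when the value is 0
    down(name) {
      const sem = store.get(name);
      if (!sem) return { status: 404 };
      if (sem.value === 0) return { status: 409 }; // "Semaphore locked"
      sem.value -= 1;
      return { status: 200, value: sem.value };
    },
  };
}
```

Each method runs to completion without yielding, which mirrors the reason the subflow's updates are not interleaved: a synchronous function node is not interrupted by the event loop.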