@gkousiouris
Last active February 8, 2023 07:30
Semaphore Service Subflow

This subflow creates a semaphore service that acts as a distributed lock/synchronization mechanism. It stores the state of each semaphore in flow variables. Changes to a semaphore value are uninterruptible at the level of the Node.js process: the function nodes run them synchronously on the single-threaded event loop, which does not interrupt their execution unless an async method or worker thread is used in the function's implementation.
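
The point about uninterruptible updates can be illustrated with a minimal sketch. It is not taken from the subflow: `store` is a plain Map standing in for the flow context, and `tryDown` is a hypothetical helper. Because the check and the decrement happen in one synchronous function, two queued "requests" cannot interleave.

```javascript
// Hypothetical in-memory stand-in for the Node-RED flow context.
const store = new Map();
store.set("demo", { name: "demo", value: 1 });

// Synchronous check-and-decrement: there is no await in here, so the
// event loop cannot run another callback between the check and the update.
function tryDown(name) {
  const sem = store.get(name);
  if (!sem) return "not found";
  if (sem.value === 0) return "locked";
  sem.value -= 1;
  return "ok";
}

// Two "requests" queued on the event loop still run strictly one after the other.
const results = [];
setImmediate(() => results.push(tryDown("demo")));
setImmediate(() => results.push(tryDown("demo")));
setImmediate(() => console.log(results)); // logs [ 'ok', 'locked' ]
```

If `tryDown` awaited something between the check and the decrement, both callbacks could pass the check before either one decremented, which is the case the paragraph above warns about.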

main

The subflow defines 5 endpoints:

  • POST /semaphore creates a new semaphore. The body should contain the name and initial value of the semaphore: {"name":"<semname>","value":1}. The value should be a positive integer. A negative value returns a 400 HTTP error code, and a positive float is truncated to an integer. If the semaphore already exists, a 303 HTTP status code is returned.
  • DELETE /semaphore/:name deletes a semaphore and needs only the name parameter in the URL. If the semaphore does not exist, a 404 HTTP error code is returned.
  • GET /semaphore/value/:name retrieves the current value. If the semaphore does not exist, a 404 HTTP error code is returned.
  • POST /semaphore/up increases the value by 1. The body contains the semaphore name: {"name":"<semname>"}. If the semaphore does not exist, a 404 HTTP error code is returned.
  • POST /semaphore/down decreases the value by 1. The body contains the semaphore name: {"name":"<semname>"}. If the semaphore is already at 0, the message "Semaphore locked" is returned with a 409 HTTP error code. If the semaphore does not exist, a 404 HTTP error code is returned.
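
As a rough illustration of how a client might call these endpoints, here is a minimal sketch. It assumes Node.js 18+ (for the global `fetch`) and the `http://localhost:1880` base URL used by the test injects in the flow below. The helper names (`semaphoreCall`, `createSemaphore`, and so on) and the injectable `fetchImpl` parameter are hypothetical, not part of the subflow.

```javascript
// Base URL taken from the test injects in the flow; adjust to your Node-RED host.
const BASE_URL = "http://localhost:1880";

// Sends one request and resolves to { status, body }.
// fetchImpl is injectable so the helper can be exercised without a server.
async function semaphoreCall(method, path, body, fetchImpl = fetch) {
  const options = { method, headers: { "Content-Type": "application/json" } };
  if (body !== undefined) options.body = JSON.stringify(body);
  const res = await fetchImpl(BASE_URL + path, options);
  return { status: res.status, body: await res.text() };
}

// One thin wrapper per endpoint listed above.
const createSemaphore = (name, value, f) =>
  semaphoreCall("POST", "/semaphore", { name, value }, f);
const deleteSemaphore = (name, f) =>
  semaphoreCall("DELETE", "/semaphore/" + encodeURIComponent(name), undefined, f);
const getValue = (name, f) =>
  semaphoreCall("GET", "/semaphore/value/" + encodeURIComponent(name), undefined, f);
const up = (name, f) => semaphoreCall("POST", "/semaphore/up", { name }, f);
const down = (name, f) => semaphoreCall("POST", "/semaphore/down", { name }, f);
```

With the subflow deployed, `createSemaphore("myfirst4", 1)` followed by two `down("myfirst4")` calls would be expected to resolve with statuses 200, 200 and 409, following the list above.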

API

A created semaphore can be used as a lock (if initialized at 1). As with any semaphore library, the clients of the distributed application are responsible for using the up/down calls in the correct sequence, so that each client knows whether or not it may proceed to what is considered the critical section.

For example, a semaphore locked by one client (a down at 1, resulting in a value of 0) can be unlocked by another client that later performs an up on the same semaphore. There is no notion of lock ownership by the client that performed the initial down. Unlike typical semaphore libraries, this implementation cannot put the calling process to sleep while the semaphore is locked.
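
Since the service cannot put a caller to sleep, a client that wants to wait for the lock has to poll. The following is a minimal sketch of that idea, not part of the subflow. `withLock`, its `retries`/`delayMs` options and the `api` object (anything exposing `down(name)` and `up(name)` that resolve to `{ status }`) are hypothetical names, and `localApi` is an in-memory stand-in so the sketch runs without Node-RED.

```javascript
// Hypothetical helper: polls down until it succeeds, runs the critical
// section, then releases the semaphore with up.
async function withLock(name, criticalSection, api, { retries = 10, delayMs = 200 } = {}) {
  for (let attempt = 0; attempt < retries; attempt++) {
    const { status } = await api.down(name);
    if (status === 200) {
      try {
        return await criticalSection();
      } finally {
        // Release even if the critical section throws. The service would
        // accept this up from any client, since there is no lock ownership.
        await api.up(name);
      }
    }
    if (status !== 409) throw new Error(`down failed with HTTP ${status}`);
    // 409 means "Semaphore locked": wait a little and poll again.
    await new Promise((resolve) => setTimeout(resolve, delayMs));
  }
  throw new Error(`could not acquire semaphore "${name}" after ${retries} attempts`);
}

// In-memory stand-in for the service (one semaphore initialized at 1).
let lockValue = 1;
const localApi = {
  down: async () => (lockValue === 0 ? { status: 409 } : (lockValue--, { status: 200 })),
  up: async () => (lockValue++, { status: 200 }),
};

withLock("mylock", async () => "done", localApi).then((result) => console.log(result)); // logs done
```

Polling with a fixed delay is the simplest choice here; a real client might prefer a backoff, and must still make sure that only the client holding the lock calls up.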

A created semaphore can also be used as a synchronization counter (any initialization value > 0) in a producer/consumer type of problem. However, as is common in semaphores, the lock activates at 0, so reverse semantics are needed: for example, initialize the semaphore to the maximum number of available slots and reduce it by one for each producer client. An example producer/consumer implementation using this subflow is included here.
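
The reverse semantics can be sketched as follows. This is an illustration only and is not the linked producer/consumer flow: `makeSemaphore` is a local stand-in that mimics the 200/409 results of up/down, and `makeBoundedBuffer` is a hypothetical wrapper in which the semaphore counts free slots.

```javascript
// Local stand-in for one semaphore of the service: down returns 409 at 0,
// up always succeeds (the service enforces no upper bound).
function makeSemaphore(initial) {
  let count = initial;
  return {
    down: () => (count === 0 ? 409 : (count--, 200)),
    up: () => (count++, 200),
    value: () => count,
  };
}

// Bounded buffer guarded by a "free slots" semaphore (reverse semantics).
function makeBoundedBuffer(capacity) {
  const freeSlots = makeSemaphore(capacity);
  const items = [];
  return {
    // Producer: claim a slot first; a 409 means the buffer is full.
    produce(item) {
      if (freeSlots.down() !== 200) return false;
      items.push(item);
      return true;
    },
    // Consumer: take an item, then hand the slot back with an up.
    consume() {
      if (items.length === 0) return undefined;
      const item = items.shift();
      freeSlots.up();
      return item;
    },
    freeSlots: () => freeSlots.value(),
  };
}

const buffer = makeBoundedBuffer(2);
console.log(buffer.produce("a"), buffer.produce("b"), buffer.produce("c")); // true true false
console.log(buffer.consume(), buffer.produce("c")); // a true
```

Because up has no upper bound, a consumer that calls up without having consumed anything would raise the count above the intended capacity, so the call sequence remains the clients' responsibility.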

The GET method is included for informational purposes only. There is no guarantee that the value has not changed by the time the client receives the response.

Notifying the calling processes when a semaphore gets unlocked (e.g. through a callback URL) is not implemented at the moment, but it can be added in the future if needed. Add a GitHub comment if you think it would be useful.

[
{
"id": "3fd93e9558924dee",
"type": "subflow",
"name": "Semaphore Service",
"info": "\nThis is a subflow to create a semaphore service to act as a distributed lock/synchronization mechanism. It uses flow variables in order to store the state of a semaphore. Changes in the semaphore value are uninterruptible at the level of the Node.js process. This is due to the fact that the latter executes them in the single-threaded event loop and does not interrupt their execution unless an async method or worker thread is used in the function's implementation.\n\nThe subflow defines 5 endpoints:\n * `POST /semaphore` for new semaphore creation. The body should contain name and initialization value of the created semaphore: `{\"name\":\"<semname>\",\"value\":1}`. The value needs to be a positive integer. If negative, a `400 HTTP error code` is returned. A positive float will be truncated to an integer. If the semaphore already exists, a `303 HTTP status code` is returned. \n * `DELETE /semaphore/:name` needs to have only the name parameter in the URL. If the semaphore does not exist, a `404 HTTP error code` is returned.\n * `GET /semaphore/value/:name` for retrieving the current value. If the semaphore does not exist, a `404 HTTP error code` is returned.\n * `POST /semaphore/up` for increasing the value by 1 with a body of the semaphore name `{\"name\":\"<semname>\"}`. If the semaphore does not exist, a `404 HTTP error code` is returned.\n * `POST /semaphore/down` for decreasing the value by 1 with a body of the semaphore name `{\"name\":\"<semname>\"}`. If the semaphore is already at 0, a relevant message `Semaphore locked` with a `409 HTTP error code` is returned. If the semaphore does not exist, a `404 HTTP error code` is returned.\n\nA created semaphore can be used as a lock (if initialized at 1). As in any semaphore related library, it is the responsibility of the clients of the distributed application to correctly use a call sequence that will indicate if the client can proceed or not to what is considered the critical section or to correctly use the up/down methods. \n\nFor example, a semaphore locked by one client (with a down at 1, resulting in the value being 0) can be unlocked by another client that afterwards performs an up on the same semaphore. There is no notion of lock ownership by a specific client that performed the initial down. Compared to the typical semaphore libraries, this implementation does not have the ability to make the calling process sleep if the semaphore is locked. \n\nA created semaphore can also be used as a synchronization counter (any initialization value>0 can be used) in a type of producer/consumer problem. However the lock gets activated at 0, as is common in semaphores, thus a reverse semantics semaphore needs to be used. For example, define the max available slots and then reduce by one for each producer client. \n\nThe GET method is included only for informational purposes. There is no guarantee that the value has not changed by the time the response is received by the client.\n\nNotifying the calling processes when a semaphore gets unlocked is not implemented at the moment (e.g. through a callback URL) but it is a feature that can be added in the future if needed. Add a GitHub comment if you think it would be useful. \n \n\n",
"category": "PHYSICS Helpers",
"in": [],
"out": [],
"env": [],
"meta": {},
"color": "#E9967A",
"icon": "font-awesome/fa-lock"
},
{
"id": "e20b29f5c8914dd0",
"type": "http in",
"z": "3fd93e9558924dee",
"name": "",
"url": "/semaphore",
"method": "post",
"upload": false,
"swaggerDoc": "",
"x": 370,
"y": 120,
"wires": [
[
"c8b282531e638f3b"
]
]
},
{
"id": "baa9998a904ab1b9",
"type": "http response",
"z": "3fd93e9558924dee",
"name": "",
"statusCode": "200",
"headers": {},
"x": 1080,
"y": 60,
"wires": []
},
{
"id": "0ae9a7bd4967d8ee",
"type": "http response",
"z": "3fd93e9558924dee",
"name": "",
"statusCode": "303",
"headers": {},
"x": 920,
"y": 180,
"wires": []
},
{
"id": "e24756a645f1032f",
"type": "http in",
"z": "3fd93e9558924dee",
"name": "",
"url": "/semaphore/value/:name",
"method": "get",
"upload": false,
"swaggerDoc": "",
"x": 400,
"y": 560,
"wires": [
[
"493f59ece63ecbf2"
]
]
},
{
"id": "1722d49bbed4ab42",
"type": "http in",
"z": "3fd93e9558924dee",
"name": "",
"url": "/semaphore/up",
"method": "post",
"upload": false,
"swaggerDoc": "",
"x": 380,
"y": 740,
"wires": [
[
"5fd9e5c5e202d585"
]
]
},
{
"id": "f6eee1f0a68aae33",
"type": "comment",
"z": "3fd93e9558924dee",
"name": "INPUT FORMAT: NAME AND INIT VALUE",
"info": "{\n \"name\": \"<sem_name>\",\n \"value\": 1 //or any positive integer\n}",
"x": 260,
"y": 60,
"wires": []
},
{
"id": "9e264d0b278a8561",
"type": "http in",
"z": "3fd93e9558924dee",
"name": "",
"url": "/semaphore/down",
"method": "post",
"upload": false,
"swaggerDoc": "",
"x": 380,
"y": 860,
"wires": [
[
"26ee2ee0c7a67793"
]
]
},
{
"id": "c8b282531e638f3b",
"type": "function",
"z": "3fd93e9558924dee",
"name": "check previous",
"func": "\nmsg.exists=false;\nmsg.isnegative=false;\nif (flow.get(msg.payload.name)){\n //exists\n msg.exists=true;\n} else{\n if (msg.payload.value<0){\n msg.isnegative=true;\n msg.payload={};\n msg.payload=\"Value needs to be positive\";\n } else {\n msg.payload.value=parseInt(msg.payload.value,10);\n flow.set(msg.payload.name,msg.payload);\n }\n}\n\nreturn msg;",
"outputs": 1,
"noerr": 0,
"initialize": "",
"finalize": "",
"libs": [],
"x": 600,
"y": 120,
"wires": [
[
"ed351f537268c28b"
]
]
},
{
"id": "ed351f537268c28b",
"type": "switch",
"z": "3fd93e9558924dee",
"name": "exists",
"property": "exists",
"propertyType": "msg",
"rules": [
{
"t": "false"
},
{
"t": "else"
}
],
"checkall": "true",
"repair": false,
"outputs": 2,
"x": 770,
"y": 120,
"wires": [
[
"0269c9d4a0e8db15"
],
[
"0ae9a7bd4967d8ee"
]
]
},
{
"id": "5fd9e5c5e202d585",
"type": "function",
"z": "3fd93e9558924dee",
"name": "UP",
"func": "\nmsg.exists=false;\n\nif (flow.get(msg.payload.name)){\n //exists\n var sem=flow.get(msg.payload.name);\n sem.value++;\n msg.exists=true;\n flow.set(msg.payload.name,sem);\n msg.payload=flow.get(msg.payload.name);\n} else{\n msg.payload=\"semaphore not found\";\n \n}\n\n\nreturn msg;",
"outputs": 1,
"noerr": 0,
"initialize": "",
"finalize": "",
"libs": [],
"x": 570,
"y": 740,
"wires": [
[
"efd55cb64b7c59bb"
]
]
},
{
"id": "493f59ece63ecbf2",
"type": "function",
"z": "3fd93e9558924dee",
"name": "GET",
"func": "\nmsg.exists=false;\nif (flow.get(msg.req.params.name)){\n //exists\n msg.exists=true;\n msg.payload=flow.get(msg.req.params.name);\n} else{\n msg.payload=\"semaphore not found\";\n \n}\n\n\nreturn msg;",
"outputs": 1,
"noerr": 0,
"initialize": "",
"finalize": "",
"libs": [],
"x": 630,
"y": 560,
"wires": [
[
"20b5c180fbee73ab"
]
]
},
{
"id": "20b5c180fbee73ab",
"type": "switch",
"z": "3fd93e9558924dee",
"name": "exists",
"property": "exists",
"propertyType": "msg",
"rules": [
{
"t": "true"
},
{
"t": "else"
}
],
"checkall": "true",
"repair": false,
"outputs": 2,
"x": 770,
"y": 560,
"wires": [
[
"f017ed4b5beb12c1"
],
[
"516e90ee21897008"
]
]
},
{
"id": "f017ed4b5beb12c1",
"type": "http response",
"z": "3fd93e9558924dee",
"name": "",
"statusCode": "200",
"headers": {},
"x": 940,
"y": 520,
"wires": []
},
{
"id": "516e90ee21897008",
"type": "http response",
"z": "3fd93e9558924dee",
"name": "",
"statusCode": "404",
"headers": {},
"x": 940,
"y": 580,
"wires": []
},
{
"id": "26ee2ee0c7a67793",
"type": "function",
"z": "3fd93e9558924dee",
"name": "DOWN",
"func": "msg.exists=false;\nmsg.locked=false;\nif (flow.get(msg.payload.name)){\n //exists\n msg.exists=true;\n var sem=flow.get(msg.payload.name);\n if (sem.value!=0){\n sem.value--;\n flow.set(msg.payload.name,sem);\n msg.payload=flow.get(msg.payload.name);\n } else {\n msg.locked=true;\n msg.payload='Semaphore locked';\n }\n} else{\n msg.payload=\"semaphore not found\";\n \n}\n\nreturn msg;",
"outputs": 1,
"noerr": 0,
"initialize": "",
"finalize": "",
"libs": [],
"x": 580,
"y": 860,
"wires": [
[
"477b0099e82e1db7"
]
]
},
{
"id": "efd55cb64b7c59bb",
"type": "switch",
"z": "3fd93e9558924dee",
"name": "exists",
"property": "exists",
"propertyType": "msg",
"rules": [
{
"t": "true"
},
{
"t": "else"
}
],
"checkall": "true",
"repair": false,
"outputs": 2,
"x": 730,
"y": 740,
"wires": [
[
"dfe23e7e89991261"
],
[
"c413c977e3137760"
]
]
},
{
"id": "dfe23e7e89991261",
"type": "http response",
"z": "3fd93e9558924dee",
"name": "",
"statusCode": "200",
"headers": {},
"x": 900,
"y": 700,
"wires": []
},
{
"id": "c413c977e3137760",
"type": "http response",
"z": "3fd93e9558924dee",
"name": "",
"statusCode": "404",
"headers": {},
"x": 900,
"y": 760,
"wires": []
},
{
"id": "477b0099e82e1db7",
"type": "switch",
"z": "3fd93e9558924dee",
"name": "exists",
"property": "exists",
"propertyType": "msg",
"rules": [
{
"t": "true"
},
{
"t": "else"
}
],
"checkall": "true",
"repair": false,
"outputs": 2,
"x": 730,
"y": 860,
"wires": [
[
"f56927749049be1f"
],
[
"6d25340b25b49348"
]
]
},
{
"id": "585130516407d9f3",
"type": "http response",
"z": "3fd93e9558924dee",
"name": "",
"statusCode": "200",
"headers": {},
"x": 1060,
"y": 820,
"wires": []
},
{
"id": "6d25340b25b49348",
"type": "http response",
"z": "3fd93e9558924dee",
"name": "",
"statusCode": "404",
"headers": {},
"x": 920,
"y": 920,
"wires": []
},
{
"id": "f56927749049be1f",
"type": "switch",
"z": "3fd93e9558924dee",
"name": "locked",
"property": "locked",
"propertyType": "msg",
"rules": [
{
"t": "false"
},
{
"t": "else"
}
],
"checkall": "true",
"repair": false,
"outputs": 2,
"x": 870,
"y": 840,
"wires": [
[
"585130516407d9f3"
],
[
"2320e6935d39792c"
]
]
},
{
"id": "2320e6935d39792c",
"type": "http response",
"z": "3fd93e9558924dee",
"name": "",
"statusCode": "409",
"headers": {},
"x": 1060,
"y": 860,
"wires": []
},
{
"id": "b935e803799059e3",
"type": "http in",
"z": "3fd93e9558924dee",
"name": "",
"url": "/semaphore/:name",
"method": "delete",
"upload": false,
"swaggerDoc": "",
"x": 390,
"y": 340,
"wires": [
[
"b1d0e34e546440b4"
]
]
},
{
"id": "b1d0e34e546440b4",
"type": "function",
"z": "3fd93e9558924dee",
"name": "check previous",
"func": "\nmsg.exists=true;\nif (flow.get(msg.req.params.name)){\n //no delete method available for flow context, bypass with undefined\n //use the URL parameter, the same key the existence check reads\n flow.set(msg.req.params.name,undefined);\n msg.payload=\"Deletion Successful\";\n \n} else{\n msg.payload=\"Semaphore not found\";\n msg.exists=false;\n}\n\nreturn msg;",
"outputs": 1,
"noerr": 0,
"initialize": "",
"finalize": "",
"libs": [],
"x": 600,
"y": 340,
"wires": [
[
"76d822d9f7a2b3d2"
]
]
},
{
"id": "76d822d9f7a2b3d2",
"type": "switch",
"z": "3fd93e9558924dee",
"name": "exists",
"property": "exists",
"propertyType": "msg",
"rules": [
{
"t": "true"
},
{
"t": "else"
}
],
"checkall": "true",
"repair": false,
"outputs": 2,
"x": 770,
"y": 340,
"wires": [
[
"a371ee130532d2ca"
],
[
"36512c117d7e6cbc"
]
]
},
{
"id": "a371ee130532d2ca",
"type": "http response",
"z": "3fd93e9558924dee",
"name": "",
"statusCode": "200",
"headers": {},
"x": 920,
"y": 300,
"wires": []
},
{
"id": "36512c117d7e6cbc",
"type": "http response",
"z": "3fd93e9558924dee",
"name": "",
"statusCode": "404",
"headers": {},
"x": 920,
"y": 400,
"wires": []
},
{
"id": "0269c9d4a0e8db15",
"type": "switch",
"z": "3fd93e9558924dee",
"name": "isnegative",
"property": "isnegative",
"propertyType": "msg",
"rules": [
{
"t": "false"
},
{
"t": "else"
}
],
"checkall": "true",
"repair": false,
"outputs": 2,
"x": 920,
"y": 80,
"wires": [
[
"baa9998a904ab1b9"
],
[
"edef503521df3454"
]
]
},
{
"id": "edef503521df3454",
"type": "http response",
"z": "3fd93e9558924dee",
"name": "",
"statusCode": "400",
"headers": {},
"x": 1080,
"y": 120,
"wires": []
},
{
"id": "7dc15d8ba9702218",
"type": "comment",
"z": "3fd93e9558924dee",
"name": "Limitation",
"info": "In Node-RED there is no ability to define two different methods on the same endpoint, thus no DELETE method can be defined directly in the /semaphore endpoint. A new endpoint was defined for the delete operation.",
"x": 340,
"y": 300,
"wires": []
},
{
"id": "77f418dcee9bd21b",
"type": "inject",
"z": "64647354bddcd607",
"name": "TEST NEW",
"props": [
{
"p": "payload"
},
{
"p": "url",
"v": "http://localhost:1880/semaphore",
"vt": "str"
},
{
"p": "method",
"v": "POST",
"vt": "str"
}
],
"repeat": "",
"crontab": "",
"once": false,
"onceDelay": 0.1,
"topic": "",
"payload": "{\"name\":\"myfirst4\",\"value\":1.5}",
"payloadType": "json",
"x": 310,
"y": 700,
"wires": [
[
"7bf3eaa1ac3d1e74"
]
]
},
{
"id": "7bf3eaa1ac3d1e74",
"type": "http request",
"z": "64647354bddcd607",
"name": "",
"method": "use",
"ret": "txt",
"paytoqs": "ignore",
"url": "",
"tls": "",
"persist": false,
"proxy": "",
"authType": "",
"x": 550,
"y": 700,
"wires": [
[
"5503979ebf75dc98"
]
]
},
{
"id": "5503979ebf75dc98",
"type": "debug",
"z": "64647354bddcd607",
"name": "",
"active": true,
"tosidebar": true,
"console": false,
"tostatus": false,
"complete": "true",
"targetType": "full",
"statusVal": "",
"statusType": "auto",
"x": 730,
"y": 700,
"wires": []
},
{
"id": "784f40a4f9e865c6",
"type": "inject",
"z": "64647354bddcd607",
"name": "GET VALUE",
"props": [
{
"p": "url",
"v": "http://localhost:1880/semaphore/value/myfirst4",
"vt": "str"
},
{
"p": "method",
"v": "GET",
"vt": "str"
},
{
"p": "payload"
}
],
"repeat": "",
"crontab": "",
"once": false,
"onceDelay": 0.1,
"topic": "",
"payload": "{}",
"payloadType": "json",
"x": 310,
"y": 820,
"wires": [
[
"7bf3eaa1ac3d1e74"
]
]
},
{
"id": "871d8b154c55c8ee",
"type": "inject",
"z": "64647354bddcd607",
"name": "TEST UP",
"props": [
{
"p": "payload"
},
{
"p": "url",
"v": "http://localhost:1880/semaphore/up",
"vt": "str"
},
{
"p": "method",
"v": "POST",
"vt": "str"
}
],
"repeat": "",
"crontab": "",
"once": false,
"onceDelay": 0.1,
"topic": "",
"payload": "{\"name\":\"myfirst4\"}",
"payloadType": "json",
"x": 300,
"y": 880,
"wires": [
[
"7bf3eaa1ac3d1e74"
]
]
},
{
"id": "67d067d192043e9d",
"type": "inject",
"z": "64647354bddcd607",
"name": "TEST DOWN",
"props": [
{
"p": "payload"
},
{
"p": "url",
"v": "http://localhost:1880/semaphore/down",
"vt": "str"
},
{
"p": "method",
"v": "POST",
"vt": "str"
}
],
"repeat": "",
"crontab": "",
"once": false,
"onceDelay": 0.1,
"topic": "",
"payload": "{\"name\":\"myfirst4\"}",
"payloadType": "json",
"x": 310,
"y": 940,
"wires": [
[
"7bf3eaa1ac3d1e74"
]
]
},
{
"id": "fd3774e001a54827",
"type": "comment",
"z": "64647354bddcd607",
"name": "TESTING SECTION",
"info": "",
"x": 320,
"y": 640,
"wires": []
},
{
"id": "378e095fd97d3020",
"type": "subflow:3fd93e9558924dee",
"z": "64647354bddcd607",
"name": "",
"env": [],
"x": 570,
"y": 520,
"wires": []
},
{
"id": "a02038f4f17655fa",
"type": "inject",
"z": "64647354bddcd607",
"name": "TEST DELETE",
"props": [
{
"p": "payload"
},
{
"p": "url",
"v": "http://localhost:1880/semaphore/myfirst4",
"vt": "str"
},
{
"p": "method",
"v": "DELETE",
"vt": "str"
}
],
"repeat": "",
"crontab": "",
"once": false,
"onceDelay": 0.1,
"topic": "",
"payload": "{\"name\":\"myfirst4\"}",
"payloadType": "json",
"x": 320,
"y": 760,
"wires": [
[
"7bf3eaa1ac3d1e74"
]
]
}
]