Skip to content

Instantly share code, notes, and snippets.

@gkousiouris
gkousiouris / README.md
Last active May 14, 2022 07:42
Poll2Push Converter and example with Openwhisk

The flow inserts a subflow named Poll2PushConverter, in order to simplify the process of polling towards an endpoint that does not support blocking calls up to completion. The initial call returns after a specified period and is then fed to the Poll2PushConverter in order to undertake the polling. Upon successful completion (or reaching of max number of attempts), the subflow triggers a message to one of the respective outputs. poll2pushsubflow

An example flow is also included with relation to how to apply it to an Openwhisk web action invocation that defaults to 65 seconds of waiting time in the initial blocking call, while the action takes longer to complete. poll2pushexampleflow

The docker image of the used action can be found here and is a delay action that takes as inputs the iterations and the

@gkousiouris
gkousiouris / README.md
Last active May 14, 2022 07:43
Artificial Delay flow with Openwhisk Action Wrapper

The core flow handles a control loop where delay and iteration numbers are dynamically passed on as input variables during the action invocation. This is needed to enhance experiment automation, so that we can pass through arguments the number of n repetitions of the main delay loop, in order to simulate a flow of n functions, each with the set delay. The respective flow should also be executable as an Openwhisk function, which means it needs to abide by the respective interface of the latter (one POST /init method and one POST /run method. The function node in the middle subflow extracts parameters from the incoming message and passes them through the msg.delay and msg.iterations fields. The follow-up node is a Node-RED built-in delay node that gets the needed delay from the assigned msg.delay field. The whole process is iterated until the number of needed delays are met.

dynamicdelayflow

@gkousiouris
gkousiouris / README.md
Last active May 14, 2022 07:43
Split Join Parallelized Execution Pattern

This is a template subflow for parallelizing the execution of a function or process on the provided input data (Single Program Multiple Data pattern). The subflow gets the initial message and chunks it down based on the msg.payload.value.splitsize value. Each batch of input rows is forwared as input to one of three means of execution (configured in the msg.payload.value.execution as one of "faas","local_multithread","local_multiprocess"):

  • an Openwhisk action (whose name is included in the msg.payload.value.action field)
  • a local thread or
  • a newly spawned local process (name of script to execute in msg.payload.value.shellscript)

Pattern Images-Split and Join drawio (1)

Therefore if an array of 1000 rows (included in the msg.payload.value.values) is inserted and the splitsize is set to 10, it will create 100 inner calls of the msg.payload.value.action if the execution is set to "faas". In this cas

@gkousiouris
gkousiouris / README.md
Last active July 9, 2022 16:57
Edge Extract Transform Load (ETL) Data Collection Pattern

This subflow handles the pushing of data to an HTTP out endpoint. It accepts JSON input (object or array) in the msg.payload, which is considered the data to be pushed. The flow is intended to be run as a service at the edge, serving as an intermediate data collector. The main goal is to ensure a robust and reliable process for data collection, that is not affected by network connectivity loss or other disturbing factors at the edge. The main logic of the flow appears in the following figure.

Pattern Images-Edge ETL Service drawio

The Node-RED input layer can be used to adapt to diverse protocols needed at the Edge IoT side. An example of an HTTP In local interface that then uses the Edge ETL subflow appears in the following figure. Of course any other type of Node-RED connector node can be used (AMQP, MQTT etc) for getting the data in the msg.payload.

![usage](https://user-images.githubusercontent.

@gkousiouris
gkousiouris / README.md
Last active November 18, 2022 18:20
Set Rate Load Generator for Openwhisk with Function Chain

The flow can be used to generate a set rate load towards an endpoint. It has been adapted to Openwhisk endpoint rationale, but it can also be used in generic endpoints with minor modifications. The main requirement is to have a status endpoint, in which the initial call result will be made available after the end of the request processing.

Detailed information for the input and output data specification is included in the flow. image

The flow can be utilized directly in Node-RED, generating the traffic from the local flow, or it can be deployed and executed as an Openwhisk action. image

For this purpose it has been wrapped around a container available here:

@gkousiouris
gkousiouris / README.md
Last active February 4, 2023 13:10
Producer-Consumer Pattern Implementation using the Semaphore Service Subflow

This is an example distributed producer/consumer problem implementation using the Semaphore Service Subflow.

This is based on a typical implementation that includes the use of a mutex, a full counter (initialized at 0) and an empty counter (initialized at the MAX needed slots, in this case 3). Pattern Images-SEMAPHORE SERVICE drawio

The logic of the producer/consumer appears below:

producer() {
	while (TRUE) {
@gkousiouris
gkousiouris / README.md
Last active February 4, 2023 13:15
Request Aggregator Service Flow

This is a flow to be run as a service, in order to perform request aggregation towards a target endpoint. The purpose is to reduce the stress towards the back-end system, by minimizing the arrival rate of requests and the number of needed serving threads or containers (in the case of FaaS systems). Thus it also reduces costs and improves performance. It is mostly beneficial in cases where the request needs to set up a rather heavyweight environment (thread or container) for performing a relatively small computation (e.g. in model inference, simulation etc.).

initial

The RA gets as input the target endpoint, as well as the method to be invoked and the input data to be the payload of the aggregated call. It holds the requests until a threshold of incoming messages is reached (set in the setBatchSize endpoint), at which point it collates the inputs of the calls into one array request towards the target e

@gkousiouris
gkousiouris / README.md
Last active February 4, 2023 13:29
OpenWhisk Skeleton Interface for Node-RED flows as functions

This flow implements the necessary interface needed by a Node-RED flow, in order to be executed as a function inside Openwhisk . Openwhisk can execute in a serverless manner any type of Docker container, as long as it implements two methods on port 8080:

owskeletonflow_witherror

-A POST /init , used in the typical OW runtime images (such as node.js etc) to retrieve the function code from the OW function DB. In the Node-RED function runtime this is not needed for functional purposes, since the code is already embedded in the flow, but it is needed in order to abide by the OW interface that expects this method. It needs to be stressed that in a warm execution (reuse of existing container from previous same function execution), the /init method is not re-executed. Hence any needed initialization per invocation should not be performed inside this method.

-A POST /run method, which is called upon inv

@gkousiouris
gkousiouris / README.md
Last active February 8, 2023 07:30
Semaphore Service Subflow

This is a subflow to create a semaphore service to act as a distributed lock/synchronization mechanism. It uses flow variables in order to store the state of a semaphore. Changes in the semaphore value are uninterruptable at the level of the node.js process. This is due to the fact that the latter executes them in the single threaded eventloop and does not interrupt their execution unless an async method or worker thread is used in the function's implementation.

main

The subflow defines 5 endpoints:

  • POST /semaphore for new semaphore creation. The body should contain name and initialization value of the created semaphore: {"name":"<semname>","value":1}. The value needs to be a positive integer. If not positive, a 400 HTTP error code is returned. A positive float will be converted to integer. If the semaphore already exists, a 303 HTTP error code is returned.
  • DELETE /semaphore/:name nee
@gkousiouris
gkousiouris / README.md
Last active April 25, 2023 17:26
Minio Webhook Automated Annotator

The specific subflow can be used as a webhook endpoint (/events) for getting notifications from a Minio installation and automating the enrichment of a newly inserted object with metadata.

It filters notification messages based on new objects put but also if they already have the specific metadatum needed. This is because the flow puts the object again with the new metadata, so it creates a new notification and thus an infinite loop if this comparison is not made.

The flow requires the node-red collection @reggae_ulli/node-red-contrib-minio-all-fix for the interface with the Minio server.

Pattern Images-MINIO WEBHOOK ANNOTATOR drawio

The user can configure which metadata field this skip comparison can be performed in the relevant switch node 'if already has metadata skip'.