@tonisuter
Last active August 29, 2015 14:13
FastFlow & Liquefier

FastFlow

Pipeline

  • Type: ff_pipe
  • Header: <ff/ffpipeline.hpp>

A pipeline is a stream-parallel skeleton. It consists of multiple stages and has a task type that defines the type of the elements (tasks) passed from stage to stage. Each stage runs in its own thread and processes one task at a time, so different stages can work on different tasks concurrently.

The following example shows how to create a pipeline with three stages and task type int. The pipeline is started with a call to the member function run_and_wait_end(), which returns once all stages have finished:

Stage1 first{};
Stage2 second{};
Stage3 third{};
ff_pipe<int> pipe(&first, &second, &third);
pipe.run_and_wait_end();
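The execution model described above (one thread per stage, tasks handed on as pointers) can be sketched without FastFlow. The following is a minimal standard C++ sketch, not FastFlow's implementation: Channel and run_three_stage_pipeline are hypothetical names, each channel is a simple blocking FIFO queue, and nullptr plays the role of the end-of-stream marker. Joining the three threads corresponds roughly to what run_and_wait_end() waits for.

```cpp
#include <condition_variable>
#include <mutex>
#include <queue>
#include <thread>
#include <vector>

// Hypothetical stand-in for a stream between two stages: an unbounded
// blocking FIFO of task pointers. nullptr is used as "end of stream".
template <typename T>
class Channel {
 public:
  void push(T* task) {
    {
      std::lock_guard<std::mutex> lock(mutex_);
      queue_.push(task);
    }
    ready_.notify_one();
  }

  T* pop() {
    std::unique_lock<std::mutex> lock(mutex_);
    ready_.wait(lock, [this] { return !queue_.empty(); });
    T* task = queue_.front();
    queue_.pop();
    return task;
  }

 private:
  std::mutex mutex_;
  std::condition_variable ready_;
  std::queue<T*> queue_;
};

// Three stages, each on its own thread. The function returns only after
// every stage has finished, similar to run_and_wait_end().
std::vector<int> run_three_stage_pipeline(int n) {
  Channel<int> first_to_second;
  Channel<int> second_to_third;
  std::vector<int> results;  // written only by the third stage

  std::thread first([&] {  // generates tasks 0..n-1
    for (int i = 0; i < n; ++i) first_to_second.push(new int{i});
    first_to_second.push(nullptr);  // end of stream
  });
  std::thread second([&] {  // doubles each task in place
    while (int* task = first_to_second.pop()) {
      *task *= 2;
      second_to_third.push(task);
    }
    second_to_third.push(nullptr);  // forward end of stream
  });
  std::thread third([&] {  // consumes and frees each task
    while (int* task = second_to_third.pop()) {
      results.push_back(*task);
      delete task;
    }
  });

  first.join();
  second.join();
  third.join();
  return results;
}
```

Because each channel is FIFO and has a single producer, tasks reach the last stage in the order the first stage generated them; with n = 5 the returned vector holds 0, 2, 4, 6, 8.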

If the stages are allocated with new, the programmer can call the member function cleanup_nodes(), which makes the pipe delete the stages automatically once it is done:

ff_pipe<int> pipe(new Stage1{}, new Stage2{}, new Stage3{});
pipe.cleanup_nodes();
pipe.run_and_wait_end();
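The ownership rule behind cleanup_nodes() can be illustrated with a small stand-in. This is a hypothetical sketch, not FastFlow code: OwningPipe, StageBase and CountingStage are invented names, and the sketch assumes the deletion happens when the pipe object is destroyed.

```cpp
#include <utility>
#include <vector>

// Hypothetical common base so the pipe can delete stages polymorphically.
struct StageBase {
  virtual ~StageBase() = default;
};

// Hypothetical pipe that holds raw stage pointers and deletes them in its
// destructor only if cleanup_nodes() was called.
class OwningPipe {
 public:
  explicit OwningPipe(std::vector<StageBase*> stages)
      : stages_(std::move(stages)) {}
  OwningPipe(const OwningPipe&) = delete;
  OwningPipe& operator=(const OwningPipe&) = delete;

  void cleanup_nodes() { cleanup_ = true; }

  ~OwningPipe() {
    if (!cleanup_) return;  // the caller still owns the stages
    for (StageBase* stage : stages_) delete stage;
  }

 private:
  std::vector<StageBase*> stages_;
  bool cleanup_ = false;
};

// Counts destructions so the effect of cleanup_nodes() is observable.
struct CountingStage : StageBase {
  static int destroyed;
  ~CountingStage() override { ++destroyed; }
};
int CountingStage::destroyed = 0;
```

Stages that live on the stack, as in the first example, must not be handed over this way; in this sketch the pipe simply leaves them alone when cleanup_nodes() was not called.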

Stage

  • Type: Subclass of ff_node_t
  • Header: <ff/node.hpp>

Each stage has an input stream and an output stream. The stage receives one task at a time from its predecessor, as a pointer to the task type. The incoming task is passed as the argument of the virtual member function <task_type>* svc(<task_type>* in). This function is the "body" of the stage and does the main work: it modifies the task in and returns it, which passes it through the output stream to the next stage. The following example defines a stage with task type int that multiplies each incoming task by 2 and passes it on to the next stage:

struct Stage: ff_node_t<int> {
  // Called once per incoming task: doubles it in place and forwards it
  int* svc(int* in) override {
    *in = *in * 2;
    return in;
  }
};
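To see the svc() contract in isolation, the sketch below replaces ff_node_t<int> with a hypothetical Node<T> base that only declares svc() as pure virtual, and chains two Stage objects sequentially (no threads) through a hypothetical run_sequentially() helper. It only illustrates that the same task pointer is modified and handed on.

```cpp
#include <vector>

// Hypothetical stand-in for ff_node_t<T>: svc() is pure virtual and
// receives and returns a pointer to the task.
template <typename T>
struct Node {
  virtual ~Node() = default;
  virtual T* svc(T* in) = 0;
};

// Same body as Stage above.
struct Stage : Node<int> {
  int* svc(int* in) override {
    *in = *in * 2;
    return in;  // the same pointer travels on to the next stage
  }
};

// Feeds one task through a chain of stages, in order, without threads.
template <typename T>
T* run_sequentially(const std::vector<Node<T>*>& stages, T* task) {
  for (Node<T>* stage : stages) {
    task = stage->svc(task);
  }
  return task;
}
```

Two doubling stages applied to 3 yield 12, and the returned pointer is still the address of the original task.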

There are two other virtual member functions, svc_init() and svc_end(), which are called when the stage starts up and shuts down, respectively. Unlike svc(), they are not pure virtual and are therefore optional to override.
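The call order of the three hooks can be sketched with a hypothetical Node<T> base that gives svc_init() and svc_end() default bodies and leaves svc() pure virtual. The driver run_node() and the convention that svc_init() returns 0 on success are assumptions of this sketch, not a description of FastFlow's runtime.

```cpp
#include <string>
#include <vector>

// Hypothetical stand-in for the node lifecycle: only svc() must be overridden.
template <typename T>
struct Node {
  virtual ~Node() = default;
  virtual int svc_init() { return 0; }  // assumption: 0 means "ready"
  virtual T* svc(T* in) = 0;
  virtual void svc_end() {}
};

// Hypothetical driver: svc_init() once, svc() per task, svc_end() once.
template <typename T>
bool run_node(Node<T>& node, const std::vector<T*>& tasks) {
  if (node.svc_init() != 0) return false;  // setup failed, skip the tasks
  for (T* task : tasks) node.svc(task);
  node.svc_end();
  return true;
}

// A stage that records the order in which its hooks are called.
struct TracingStage : Node<int> {
  std::vector<std::string> log;
  int svc_init() override {
    log.push_back("init");
    return 0;
  }
  int* svc(int* in) override {
    log.push_back("svc");
    return in;
  }
  void svc_end() override { log.push_back("end"); }
};
```

For two tasks the log reads init, svc, svc, end.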

Initial stage

The initial stage doesn't receive any tasks through its input stream because it has no predecessor. Instead, it is responsible for generating the stream of tasks that the following stages of the pipeline process:

struct InitialStage: ff_node_t<int> {
  int* svc(int*) {
    for(int i = 0; i < 100; ++i) {
      ff_send_out(new int{i});
    }
    return EOS;
  }
};

In the above example, ff_send_out(<task_type>*) passes a task to the output stream without returning from svc(). The constant EOS is a special value meaning "end of stream": returning it signals that no further tasks will follow, so InitialStage shuts down after it has sent all 100 tasks.
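The interplay of ff_send_out() and EOS can be mimicked in plain C++. In this hypothetical sketch, SourceNode::send_out() stands in for ff_send_out(), kEndOfStream is a sentinel pointer standing in for EOS, and run_source() is an invented driver that calls svc() once with nullptr.

```cpp
#include <memory>
#include <vector>

// Hypothetical end-of-stream sentinel: a unique address no real task can have.
static int end_of_stream_tag;
int* const kEndOfStream = &end_of_stream_tag;

// Hypothetical first-stage base: send_out() appends the task to a buffer
// that represents the stage's output stream.
struct SourceNode {
  virtual ~SourceNode() = default;
  virtual int* svc(int* in) = 0;
  void send_out(int* task) { stream.emplace_back(task); }
  std::vector<std::unique_ptr<int>> stream;  // owns the tasks in this sketch
};

// Same body as InitialStage above, with send_out() in place of ff_send_out().
struct InitialStage : SourceNode {
  int* svc(int*) override {
    for (int i = 0; i < 100; ++i) {
      // One heap copy per task: sending &i would hand every consumer a
      // pointer to the same loop variable, which keeps changing and then
      // goes out of scope.
      send_out(new int{i});
    }
    return kEndOfStream;  // no more tasks will follow
  }
};

// Hypothetical driver: a first stage has no input, so svc() is called once.
bool run_source(SourceNode& node) {
  return node.svc(nullptr) == kEndOfStream;
}
```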

Last stage

The last stage shouldn't send any tasks to its output stream because there is no subsequent stage. It also can't return EOS, because that would shut it down after the first task has been processed. Instead, it returns another constant, GO_ON, which keeps the stage alive and waiting for new tasks:

struct LastStage: ff_node_t<int> {
  int* svc(int* in) {
    std::cout << *in << std::endl;
    delete in;
    return GO_ON;
  }
};

As the example above shows, the last stage may also have to delete the incoming tasks to avoid memory leaks, since no later stage will use them.
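The difference between returning GO_ON and returning EOS from the last stage can be sketched with two stand-in sinks. kGoOn, kEndOfStream and drain() are hypothetical; drain() plays the role of the runtime feeding one task after another to the stage and stopping when the stage returns the end-of-stream value.

```cpp
#include <cstddef>
#include <vector>

// Hypothetical sentinels standing in for EOS and GO_ON.
static int eos_tag;
static int go_on_tag;
int* const kEndOfStream = &eos_tag;
int* const kGoOn = &go_on_tag;

// Like LastStage above, but it sums the tasks instead of printing them.
struct LastStage {
  int sum = 0;
  int* svc(int* in) {
    sum += *in;
    delete in;     // no later stage will use the task
    return kGoOn;  // nothing to forward; keep waiting for more tasks
  }
};

// What would happen if the last stage returned the end-of-stream value.
struct StoppingLastStage {
  int* svc(int* in) {
    delete in;
    return kEndOfStream;  // asks to shut down after this single task
  }
};

// Hypothetical runtime loop for a sink: hands over tasks until the input runs
// out or the stage returns kEndOfStream. Returns how many tasks it saw.
template <typename Stage>
int drain(Stage& stage, const std::vector<int*>& input) {
  std::size_t next = 0;
  while (next < input.size()) {
    int* result = stage.svc(input[next++]);
    if (result == kEndOfStream) break;
  }
  const int processed = static_cast<int>(next);
  // Sketch only: free the tasks the stopped stage never received.
  while (next < input.size()) delete input[next++];
  return processed;
}
```

With three input tasks, the GO_ON-style stage sees all of them, while the stage that returns the end-of-stream value stops after the first one, which is why the last stage cannot return EOS.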

Liquefier

Usage

  • Place the cursor inside the pipeline code. If the cursor is on the pipeline attribute itself or anywhere outside the pipeline, the pipeline won't be found.
  • Start the refactoring via Refactor -> Generate Pipeline.

Implementation

  • Some settings can be configured in RefactoringConfig.java.
  • If the refactoring is applied in a file called <filename>.cpp, the plug-in generates a new file with the name pipeline_<filename>.cpp.
  • The original file (<filename>.cpp) won't be modified.
  • The new file contains a copy of the code in the original file with the following modifications:
    • One stage struct is generated for each statement with the rpr::kernel attribute (kernel statement). The stages are named Stage1, Stage2, etc.
    • The statement with the rpr::pipeline attribute (pipeline statement) is replaced with the code that instantiates the stages and the pipeline.
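The naming rule for the generated file is simple enough to state as code. The plug-in itself is written in Java, so the C++ helper below is only an illustration; generated_path() is a hypothetical name, and keeping the generated file in the same directory as the original is an assumption.

```cpp
#include <string>

// Hypothetical helper: <dir>/<filename>.cpp -> <dir>/pipeline_<filename>.cpp.
// Only the file name gets the prefix; the directory part is kept unchanged.
std::string generated_path(const std::string& original) {
  const std::size_t slash = original.find_last_of('/');
  const std::size_t name_start = (slash == std::string::npos) ? 0 : slash + 1;
  return original.substr(0, name_start) + "pipeline_" + original.substr(name_start);
}
```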

Problems & missing features

  • Validation of the pipeline only checks whether there is a parent statement with a pipeline attribute. More analysis may be required to check whether this is a valid pipeline.
  • When should the generated code use ff_send_out(new <task_type>{x}) and when ff_send_out(&x)? => detect this automatically
  • If necessary, add code in the last stage to delete the tasks to avoid memory leaks.
  • Avoid name clashes (e.g. the svc input parameter is always named "in", which may conflict with a variable of the same name in the kernel code).
  • If a variable is used in a kernel statement but declared outside of it, it is assumed to be a dependency: it is passed as an argument to the stage constructor and stored in a member variable, so that the kernel code can still access it. Right now, all such variables are passed to the constructor by reference => check whether the variable is modified or only read?
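The last point can be made concrete with a hypothetical generated stage. Stage1 below is not the plug-in's actual output; it assumes a kernel statement that multiplies each task by a variable factor declared outside the kernel. Storing the dependency by reference means the stage sees later writes to factor, which in a running pipeline could also be a data race. If analysis shows the kernel only reads the variable, a copy (as in the hypothetical Stage1ByValue) or a const reference would be alternatives, although a copy changes the semantics if the kernel is meant to see later updates.

```cpp
// Hypothetical generated stage: the dependency is captured by reference,
// as the plug-in currently does for every dependency.
struct Stage1 {
  int& factor;
  explicit Stage1(int& f) : factor(f) {}
  int* svc(int* in) {
    *in *= factor;  // the kernel only reads factor
    return in;
  }
};

// Possible alternative for read-only dependencies: store a copy, so later
// writes to the original variable cannot affect the running stage.
struct Stage1ByValue {
  int factor;
  explicit Stage1ByValue(int f) : factor(f) {}
  int* svc(int* in) {
    *in *= factor;
    return in;
  }
};
```

If factor is set to 3, both stages are constructed, and factor is then changed to 5, a task with value 2 becomes 10 in Stage1 but 6 in Stage1ByValue.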