- Type: ff_pipe
- Header: <ff/ffpipeline.hpp>
A pipeline is a stream parallel skeleton. It has multiple stages and a task type which defines the type of the elements (tasks) that are passed from stage to stage. Each stage runs on a separate thread and works on one task at a time.
The following example shows how to create a pipeline with 3 stages and task type int. The pipe is started with a call to the member function run_and_wait_end():
Stage1 first{};
Stage2 second{};
Stage3 third{};
ff_pipe<int> pipe(&first, &second, &third);
pipe.run_and_wait_end();
If the stages are allocated with new, the programmer can call the member function cleanup_nodes() which causes the pipe to automatically delete the stages once it is done:
ff_pipe<int> pipe(new Stage1{}, new Stage2{}, new Stage3{});
pipe.cleanup_nodes();
pipe.run_and_wait_end();
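To make the "one thread per stage" idea concrete without depending on FastFlow, here is a minimal sketch in standard C++17. It is not how ff_pipe is implemented: Channel and run_three_stage_pipeline are hypothetical names introduced only for illustration, and an empty std::optional stands in for the end-of-stream marker.

```cpp
#include <cassert>
#include <condition_variable>
#include <mutex>
#include <optional>
#include <queue>
#include <thread>
#include <utility>
#include <vector>

// Hypothetical blocking FIFO that connects two stages (not a FastFlow type).
template <typename T>
class Channel {
public:
    // An empty optional marks the end of the stream.
    void push(std::optional<T> item) {
        {
            std::lock_guard<std::mutex> lock(mutex_);
            items_.push(std::move(item));
        }
        ready_.notify_one();
    }

    // Blocks until the predecessor has pushed something.
    std::optional<T> pop() {
        std::unique_lock<std::mutex> lock(mutex_);
        ready_.wait(lock, [this] { return !items_.empty(); });
        std::optional<T> item = std::move(items_.front());
        items_.pop();
        return item;
    }

private:
    std::mutex mutex_;
    std::condition_variable ready_;
    std::queue<std::optional<T>> items_;
};

// Three stages, one thread each: generate 0..count-1, double, collect.
std::vector<int> run_three_stage_pipeline(int count) {
    assert(count >= 0);
    Channel<int> first_to_second;
    Channel<int> second_to_third;
    std::vector<int> results;

    std::thread first([&] {
        for (int i = 0; i < count; ++i) first_to_second.push(i);
        first_to_second.push(std::nullopt);  // end of stream
    });
    std::thread second([&] {
        while (auto task = first_to_second.pop()) second_to_third.push(*task * 2);
        second_to_third.push(std::nullopt);  // propagate end of stream
    });
    std::thread third([&] {
        while (auto task = second_to_third.pop()) results.push_back(*task);
    });

    first.join();
    second.join();
    third.join();
    return results;  // only read after the collecting thread has finished
}
```

Each channel has a single producer and a single consumer, so tasks leave the pipeline in the order they entered it. Depending on the toolchain, linking may require -pthread.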
- Type: Subclass of ff_node_t
- Header: <ff/node.hpp>
Each stage has an input and an output stream. Tasks are passed to the stage from its predecessor one at a time, each as a pointer to the task type. The incoming task is passed as an argument to the virtual member function <task_type>* svc(<task_type>* in). This function is the "body" of the stage and does the main work. It modifies the task passed as in and returns it, which sends it through the output stream to the next stage. The following example defines a stage with task type int that multiplies each incoming task by 2 and passes it to the next stage:
struct Stage: ff_node_t<int> {
int* svc(int* in) {
*in = *in * 2;
return in;
}
};
There are two other virtual member functions, svc_init() and svc_end(), which are called when the stage starts up and shuts down, respectively. Unlike svc(), they are not pure virtual, so overriding them is optional.
The initial stage doesn't get any tasks through its input stream because there are no preceding stages. Instead, it is responsible for generating the stream of tasks that is supposed to be processed by the stages of the pipeline:
struct InitialStage: ff_node_t<int> {
int* svc(int*) {
for(int i = 0; i < 100; ++i) {
ff_send_out(new int{i});
}
return EOS;
}
};
In the above example, the call to ff_send_out(<task_type>*) passes a task to the output stream without returning from svc(). The constant EOS is a special value that means "end of stream"; returning it shuts InitialStage down.
The last stage shouldn't send any tasks to its output stream because there are no subsequent stages. It also can't return EOS because this would cause it to shut down after the first task has been processed. Instead, it returns another constant, GO_ON, which keeps the stage alive and waiting for new tasks:
struct LastStage: ff_node_t<int> {
int* svc(int* in) {
std::cout << *in << std::endl;
delete in;
return GO_ON;
}
};
As shown in the example above, it may also be necessary to delete the incoming tasks in the last stage in order to avoid memory leaks.
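The three possible results of svc() described above (a task pointer, GO_ON, or EOS) can be modelled in a few lines of plain C++. This is only a single-threaded sketch of the contract, not FastFlow's implementation: SketchNode, kGoOn, kEos, drive and run_sketch are hypothetical names, and the sentinels are ordinary addresses chosen for illustration.

```cpp
#include <cassert>
#include <vector>

// Hypothetical sentinels standing in for FastFlow's GO_ON and EOS.
static int go_on_marker;
static int eos_marker;
int* const kGoOn = &go_on_marker;
int* const kEos = &eos_marker;

// Hypothetical stand-in for ff_node_t<int>.
struct SketchNode {
    virtual ~SketchNode() = default;
    virtual int* svc(int* in) = 0;
};

// Middle stage: modifies the task and returns it, so it is forwarded.
struct Doubler : SketchNode {
    int* svc(int* in) override {
        *in *= 2;
        return in;
    }
};

// Last stage: consumes and frees the task, then asks to stay alive.
struct Collector : SketchNode {
    std::vector<int> seen;
    int* svc(int* in) override {
        seen.push_back(*in);
        delete in;
        return kGoOn;
    }
};

// Calls svc() once per input. A returned task is forwarded, kGoOn forwards
// nothing and keeps going, kEos stops the node immediately.
std::vector<int*> drive(SketchNode& node, const std::vector<int*>& inputs) {
    std::vector<int*> forwarded;
    for (int* task : inputs) {
        int* result = node.svc(task);
        if (result == kEos) break;
        if (result == kGoOn) continue;
        forwarded.push_back(result);
    }
    return forwarded;
}

// Pushes 1, 2, 3 through Doubler and then Collector.
std::vector<int> run_sketch() {
    std::vector<int*> stream = {new int{1}, new int{2}, new int{3}};
    Doubler middle;
    Collector last;
    std::vector<int*> doubled = drive(middle, stream);
    std::vector<int*> leftover = drive(last, doubled);
    assert(leftover.empty());  // the last stage forwards nothing
    return last.seen;
}
```

If Collector returned kEos instead of kGoOn, drive would stop after the first task, which mirrors why the last stage of a pipeline returns GO_ON.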
- Place the cursor inside the pipeline code. If the cursor is on the pipeline attribute or anywhere outside the pipeline, the pipeline won't be found.
- Start the refactoring via Refactor -> Generate Pipeline
- Some things can be configured in RefactoringConfig.java
- If the refactoring is applied in a file called <filename>.cpp, the plug-in generates a new file with the name pipeline_<filename>.cpp.
  - The original file (<filename>.cpp) won't be modified.
  - The new file contains a copy of the code in the original file with the following modifications:
    - For each statement with the rpr::kernel attribute (kernel statement), one stage struct is generated. The names of the stages are Stage1, Stage2, etc.
    - The statement with the rpr::pipeline attribute (pipeline statement) is replaced with the code that instantiates the stages and the pipeline.
- Validation of the pipeline only checks if there is a parent statement with a pipeline attribute (maybe more analysis is required to check if this is a valid pipeline)
- When should we use ff_send_out(new <task_type>{x}) and when ff_send_out(&x)? => detect automatically
- If necessary, add code in the last stage to delete the tasks to avoid memory leaks.
- Avoid name clashes (e.g. the name of the svc input parameter is always "in" => may cause conflict)
- If a variable is used in a kernel statement and its declaration is outside of that statement, it is assumed to be a dependency. This means that it is passed as an argument to the stage constructor and stored in a member variable so that the kernel code can still access the variable. Right now, the variables are all passed by reference to the constructor => check if the variable is modified or only read?
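The dependency handling described in the last item can be pictured with a small sketch. This is a guess at the general shape of such a stage, not the plug-in's actual output: NodeSketch is a hypothetical stand-in for ff_node_t<int> so that the code compiles without FastFlow, and the kernel statement (multiplying the task by an outer variable factor) and the helper apply_stage_once are invented for illustration.

```cpp
#include <cassert>

// Hypothetical stand-in for ff_node_t<int>.
struct NodeSketch {
    virtual ~NodeSketch() = default;
    virtual int* svc(int* in) = 0;
};

// Possible shape of a stage for a kernel statement that uses `factor`,
// a variable declared outside the kernel statement (assumed example).
struct Stage1 : NodeSketch {
    // The dependency is passed by reference to the constructor...
    explicit Stage1(int& factor_ref) : factor(factor_ref) {}

    int* svc(int* in) override {
        *in = *in * factor;  // kernel code still reads the outer variable
        return in;
    }

    // ...and stored in a member so svc() can reach it.
    int& factor;
};

// Runs one task through a freshly constructed stage.
int apply_stage_once(int value, int& factor) {
    Stage1 stage(factor);
    int task = value;
    return *stage.svc(&task);
}
```

Because the member is a reference, a later change to the outer variable is visible inside the stage. For a dependency that is only read, a copy or a const reference would arguably be safer, which is the question the last item raises.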