Skip to content

Instantly share code, notes, and snippets.

@renestein
Created May 4, 2020 08:19
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save renestein/e85bde8c461b839b51f195892aae92b9 to your computer and use it in GitHub Desktop.
Save renestein/e85bde8c461b839b51f195892aae92b9 to your computer and use it in GitHub Desktop.
Tasks::Task<size_t> WhenAsyncForkJoinDataflowThenAllInputsProcessedImpl(int inputItemsCount)
{
//Create TransformBlock. As the name of the block suggests, TransformBlock transforms input to output.
//Following block transforms int to string.
auto transform1 = DataFlowAsyncFactory::CreateTransformBlock<int, int>([](const int& item)-> Tasks::Task<int>
{
//Simulate work
co_await Tasks::GetCompletedTask();
auto message = "int: " + to_string(item) + "\n";
cout << message;
co_return item;
});
//Fork dataflow (even numbers are processed in one TransformBlock, for odd numbers create another transformBlock)
auto transform2 = DataFlowAsyncFactory::CreateTransformBlock<int, string>([](const int& item)->Tasks::Task<string>
{
//Simulate work
co_await Tasks::GetCompletedTask();
auto message = "Even number: " + to_string(item) + "\n";
cout << message;
co_return to_string(item);
},
//Accept only even numbers.
//Condition is evaluated for every input.
//If the condition evaluates to true, input is accepted; otherwise input is ignored.
[](const int& item)
{
return item % 2 == 0;
});
auto transform3 = DataFlowAsyncFactory::CreateTransformBlock<int, string>([](const int& item)->Tasks::Task<string>
{
//Simulate work
co_await Tasks::GetCompletedTask();
auto message = "Odd number: " + to_string(item) + "\n";
cout << message;
co_return to_string(item);
},
//Accept only odd numbers.
//Condition is evaluated for every input.
//If the condition evaluates to true, input is accepted; otherwise input is ignored.
[](const int& item)
{
return item % 2 != 0;
});
//End fork.
vector<string> _processedItems{};
auto finalAction = DataFlowSyncFactory::CreateActionBlock<string>([&_processedItems](const string& item)
{
auto message = "Final action: " + item + "\n";
cout << message;
_processedItems.push_back(item);
});
//Fork
transform1->ConnectTo(transform2);
transform1->ConnectTo(transform3);
//end fork
//Join
transform3->ConnectTo(finalAction);
transform2->ConnectTo(finalAction);
//End join
//Start dataflow
transform1->Start();
//Add input data to the first block.
for (int i = 0; i < inputItemsCount; ++i)
{
co_await transform1->AcceptInputAsync(i);
}
//All input data are in the dataflow. Send notification that no more data will be added.
//This does not mean that all data in the dataflow are processed!
transform1->Complete();
//Wait for last block.
co_await finalAction->Completion();
const auto processedItemsCount = _processedItems.size();
co_return processedItemsCount;
}
};
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment