Skip to content

Instantly share code, notes, and snippets.

@zdeslav
Last active August 29, 2015 13:59
Show Gist options
  • Save zdeslav/10879666 to your computer and use it in GitHub Desktop.
Save zdeslav/10879666 to your computer and use it in GitHub Desktop.
stack-bound pipeline implementation
#include <string>
#include <map>
#include <functional>
#include <memory>
typedef int ErrorCode;
enum RequestType {
Auth,
CashIn,
Undefined
};
struct Request
{
RequestType request_type;
std::string payload;
};
struct Response {};
class filter_exception : public std::runtime_error
{
ErrorCode m_error;
public:
filter_exception(ErrorCode error, const char* what)
: m_error(error), std::runtime_error(what) {}
ErrorCode error() const { return m_error; }
};
struct Context
{
Request request;
Response response;
ErrorCode error_code;
std::string error;
std::string last_stage;
int receive_time, queue_time, processing_time;
};
template <char id, int error_code>
struct Filter
{
std::string name() const { return std::string("filter") + std::to_string(id - '0'); }
ErrorCode process(Context& ctxt) { return error_code; }
};
typedef Filter<'1', 0> Filter1;
typedef Filter<'2', 0> Filter2;
typedef Filter<'3', 0> Filter3;
typedef Filter<'4', 0> Filter4;
typedef Filter<'5', 4> FilterError;
template <typename First, typename... Rest>
class Pipeline
{
struct Terminator {};
public:
void Process(Context& ctxt) {
ctxt.error_code = 0;
Handle<First, Rest..., Terminator>(ctxt);
}
private:
template<typename Current, typename ...Following>
typename std::enable_if<!std::is_same<Current, Terminator>::value, void>::type
Handle(Context& ctxt, int level = 1)
{
try
{
{
Current filter;
ctxt.last_stage = filter.name();
ctxt.error_code = filter.process(ctxt);
int rest = sizeof...(Following)-1;
printf("%*c%*c stage: %s | err: %d | to go: %d\n", level, '.', rest+1, ' ',
filter.name().c_str(), ctxt.error_code, rest);
}
if (!ctxt.error_code) Handle<Following...>(ctxt, level + 1); // no error, proceed to next filter
}
catch (filter_exception& e) {
ctxt.error_code = e.error();
ctxt.error = e.what();
}
catch (std::exception& e) {
ctxt.error_code = -1;
ctxt.error = e.what();
}
}
template<typename Current>
typename std::enable_if<std::is_same<Current, Terminator>::value, void>::type
Handle(Context& ctxt, int level) { /* NOP */ }
};
typedef std::function< void(Context&)> pipeline_fn;
std::map<RequestType, pipeline_fn> pipelineHandlers;
template<class T>
void RegisterPipeline(RequestType rq)
{
pipelineHandlers[rq] = [](Context& c) { T pipeline; pipeline.Process(c); };
}
typedef Pipeline<Filter1,
Filter2,
Filter3,
Filter1>
CashInPipeline;
typedef Pipeline<Filter3,
Filter4,
FilterError,
Filter3>
BrokenPipeline;
int _tmain(int argc, _TCHAR* argv[])
{
RegisterPipeline<CashInPipeline>(CashIn);
RegisterPipeline<BrokenPipeline>(Auth);
Request requests[] = { { CashIn }, { Auth }, { CashIn } };
for (auto& rq : requests)
{
pipeline_fn handler = pipelineHandlers[rq.request_type];
Context ctxt = { rq };
handler(ctxt);
printf("processed context - rq: %d, err: %d (%s), last stage: %s\n",
rq.request_type, ctxt.error_code, ctxt.error.c_str(), ctxt.last_stage.c_str());
printf("-----------------------------------------------------------\n");
}
return 0;
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment