Created
February 21, 2013 09:25
-
-
Save Mortal/5003456 to your computer and use it in GitHub Desktop.
Patch that changes worker_state to a class-enum idiom (an enum nested in a struct, emulating the scoping of C++11 `enum class`), together with the resulting parallel.h file. See also https://github.com/thomasmoelhave/tpie/issues/52
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
diff --git a/tpie/pipelining/parallel.h b/tpie/pipelining/parallel.h | |
index b3735a4..716c434 100644 | |
--- a/tpie/pipelining/parallel.h | |
+++ b/tpie/pipelining/parallel.h | |
@@ -125,22 +125,24 @@ struct options { | |
/////////////////////////////////////////////////////////////////////////////// | |
/// \brief States of the parallel worker state machine. | |
/////////////////////////////////////////////////////////////////////////////// | |
-enum worker_state { | |
+struct worker_state { | |
+enum type { | |
/** The input is being written by the producer. */ | |
- INITIALIZING, | |
+ initializing, | |
/** The input is being written by the producer. */ | |
- IDLE, | |
+ idle, | |
/** The worker is writing output. */ | |
- PROCESSING, | |
+ processing, | |
/** The worker has filled its output buffer, but has not yet consumed the | |
* input buffer. */ | |
- PARTIAL_OUTPUT, | |
+ partial_output, | |
/** The output is being read by the consumer. */ | |
- OUTPUTTING | |
+ outputting | |
+}; | |
}; | |
/////////////////////////////////////////////////////////////////////////////// | |
@@ -418,12 +420,12 @@ public: | |
after_base & output(size_t idx) { return *m_outputs[idx]; } | |
/// Shared state, must have mutex to use. | |
- worker_state get_state(size_t idx) { | |
+ worker_state::type get_state(size_t idx) { | |
return m_states[idx]; | |
} | |
/// Shared state, must have mutex to use. | |
- void transition_state(size_t idx, worker_state from, worker_state to) { | |
+ void transition_state(size_t idx, worker_state::type from, worker_state::type to) { | |
if (m_states[idx] != from) { | |
std::stringstream ss; | |
ss << "Invalid state transition " << from << " -> " << to << "; current state is " << m_states[idx]; | |
@@ -436,7 +438,7 @@ public: | |
protected: | |
std::vector<node *> m_inputs; | |
std::vector<after_base *> m_outputs; | |
- std::vector<worker_state> m_states; | |
+ std::vector<worker_state::type> m_states; | |
state_base(const options opts) | |
: opts(opts) | |
@@ -444,7 +446,7 @@ protected: | |
, runningWorkers(0) | |
, m_inputs(opts.numJobs, 0) | |
, m_outputs(opts.numJobs, 0) | |
- , m_states(opts.numJobs, INITIALIZING) | |
+ , m_states(opts.numJobs, worker_state::initializing) | |
{ | |
workerCond = new cond_t[opts.numJobs]; | |
} | |
@@ -599,18 +601,18 @@ public: | |
private: | |
bool is_done() const { | |
switch (st.get_state(parId)) { | |
- case INITIALIZING: | |
+ case worker_state::initializing: | |
throw tpie::exception("INITIALIZING not expected in after::is_done"); | |
- case IDLE: | |
+ case worker_state::idle: | |
return true; | |
- case PROCESSING: | |
+ case worker_state::processing: | |
// The main thread may transition us from Outputting to Idle to | |
// Processing without us noticing, or it may transition us from | |
// Partial_Output to Processing. In either case, we are done | |
// flushing the buffer. | |
return true; | |
- case PARTIAL_OUTPUT: | |
- case OUTPUTTING: | |
+ case worker_state::partial_output: | |
+ case worker_state::outputting: | |
return false; | |
} | |
throw std::runtime_error("Unknown state"); | |
@@ -620,7 +622,7 @@ private: | |
if (m_buffer->m_outputSize == 0) | |
return; | |
lock_t lock(st.mutex); | |
- st.transition_state(parId, PROCESSING, complete ? OUTPUTTING : PARTIAL_OUTPUT); | |
+ st.transition_state(parId, worker_state::processing, complete ? worker_state::outputting : worker_state::partial_output); | |
// notify producer that output is ready | |
st.producerCond.notify_one(); | |
while (!is_done()) { | |
@@ -682,15 +684,15 @@ private: | |
/////////////////////////////////////////////////////////////////////////// | |
bool ready() { | |
switch (st.get_state(parId)) { | |
- case INITIALIZING: | |
+ case worker_state::initializing: | |
throw tpie::exception("INITIALIZING not expected in before::ready"); | |
- case IDLE: | |
+ case worker_state::idle: | |
return false; | |
- case PROCESSING: | |
+ case worker_state::processing: | |
return true; | |
- case PARTIAL_OUTPUT: | |
+ case worker_state::partial_output: | |
throw std::runtime_error("State 'partial_output' was not expected in before::ready"); | |
- case OUTPUTTING: | |
+ case worker_state::outputting: | |
throw std::runtime_error("State 'outputting' was not expected in before::ready"); | |
} | |
throw std::runtime_error("Unknown state"); | |
@@ -734,7 +736,7 @@ private: | |
// virtual invocation | |
st.output(parId).worker_initialize(); | |
- st.transition_state(parId, INITIALIZING, IDLE); | |
+ st.transition_state(parId, worker_state::initializing, worker_state::idle); | |
running_signal _(st.runningWorkers, st.producerCond); | |
while (true) { | |
// wait for transition IDLE -> PROCESSING | |
@@ -871,18 +873,18 @@ private: | |
bool has_ready_pipe() { | |
for (size_t i = 0; i < st->opts.numJobs; ++i) { | |
switch (st->get_state(i)) { | |
- case INITIALIZING: | |
- case PROCESSING: | |
+ case worker_state::initializing: | |
+ case worker_state::processing: | |
break; | |
- case PARTIAL_OUTPUT: | |
- case OUTPUTTING: | |
+ case worker_state::partial_output: | |
+ case worker_state::outputting: | |
// If we have to maintain order of items, the only | |
// outputting worker we consider to be waiting is the | |
// "front worker". | |
if (st->opts.maintainOrder == order::maintain && m_outputOrder.front() != i) | |
break; | |
// fallthrough | |
- case IDLE: | |
+ case worker_state::idle: | |
readyIdx = i; | |
return true; | |
} | |
@@ -903,12 +905,12 @@ private: | |
bool has_outputting_pipe() { | |
for (size_t i = 0; i < st->opts.numJobs; ++i) { | |
switch (st->get_state(i)) { | |
- case INITIALIZING: | |
- case IDLE: | |
- case PROCESSING: | |
+ case worker_state::initializing: | |
+ case worker_state::idle: | |
+ case worker_state::processing: | |
break; | |
- case PARTIAL_OUTPUT: | |
- case OUTPUTTING: | |
+ case worker_state::partial_output: | |
+ case worker_state::outputting: | |
if (st->opts.maintainOrder == order::maintain && m_outputOrder.front() != i) | |
break; | |
readyIdx = i; | |
@@ -931,12 +933,12 @@ private: | |
bool has_processing_pipe() { | |
for (size_t i = 0; i < st->opts.numJobs; ++i) { | |
switch (st->get_state(i)) { | |
- case INITIALIZING: | |
- case IDLE: | |
- case PARTIAL_OUTPUT: | |
- case OUTPUTTING: | |
+ case worker_state::initializing: | |
+ case worker_state::idle: | |
+ case worker_state::partial_output: | |
+ case worker_state::outputting: | |
break; | |
- case PROCESSING: | |
+ case worker_state::processing: | |
return true; | |
} | |
} | |
@@ -1024,35 +1026,35 @@ private: | |
st->producerCond.wait(lock); | |
} | |
switch (st->get_state(readyIdx)) { | |
- case INITIALIZING: | |
+ case worker_state::initializing: | |
throw tpie::exception("State 'INITIALIZING' not expected at this point"); | |
- case IDLE: | |
+ case worker_state::idle: | |
{ | |
// Send buffer to ready worker | |
item_type * first = &inputBuffer[0]; | |
item_type * last = first + written; | |
parallel_input_buffer<T1> & dest = *st->m_inputBuffers[readyIdx]; | |
dest.set_input(array_view<T1>(first, last)); | |
- st->transition_state(readyIdx, IDLE, PROCESSING); | |
+ st->transition_state(readyIdx, worker_state::idle, worker_state::processing); | |
st->workerCond[readyIdx].notify_one(); | |
written = 0; | |
if (st->opts.maintainOrder == order::maintain) | |
m_outputOrder.push(readyIdx); | |
break; | |
} | |
- case PROCESSING: | |
+ case worker_state::processing: | |
throw std::runtime_error("State 'processing' not expected at this point"); | |
- case PARTIAL_OUTPUT: | |
+ case worker_state::partial_output: | |
// Receive buffer (virtual invocation) | |
cons->consume(st->m_outputBuffers[readyIdx]->get_output()); | |
- st->transition_state(readyIdx, PARTIAL_OUTPUT, PROCESSING); | |
+ st->transition_state(readyIdx, worker_state::partial_output, worker_state::processing); | |
st->workerCond[readyIdx].notify_one(); | |
break; | |
- case OUTPUTTING: | |
+ case worker_state::outputting: | |
// Receive buffer (virtual invocation) | |
cons->consume(st->m_outputBuffers[readyIdx]->get_output()); | |
- st->transition_state(readyIdx, OUTPUTTING, IDLE); | |
+ st->transition_state(readyIdx, worker_state::outputting, worker_state::idle); | |
st->workerCond[readyIdx].notify_one(); | |
if (st->opts.maintainOrder == order::maintain) { | |
if (m_outputOrder.front() != readyIdx) { | |
@@ -1090,12 +1092,12 @@ public: | |
// virtual invocation | |
cons->consume(st->m_outputBuffers[readyIdx]->get_output()); | |
- if (st->get_state(readyIdx) == PARTIAL_OUTPUT) { | |
- st->transition_state(readyIdx, PARTIAL_OUTPUT, PROCESSING); | |
+ if (st->get_state(readyIdx) == worker_state::partial_output) { | |
+ st->transition_state(readyIdx, worker_state::partial_output, worker_state::processing); | |
st->workerCond[readyIdx].notify_one(); | |
continue; | |
} | |
- st->transition_state(readyIdx, OUTPUTTING, IDLE); | |
+ st->transition_state(readyIdx, worker_state::outputting, worker_state::idle); | |
if (st->opts.maintainOrder == order::maintain) { | |
if (m_outputOrder.front() != readyIdx) { | |
log_error() << "Producer: Expected " << readyIdx << " in front; got " |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// -*- mode: c++; tab-width: 4; indent-tabs-mode: t; eval: (progn (c-set-style "stroustrup") (c-set-offset 'innamespace 0)); -*- | |
// vi:set ts=4 sts=4 sw=4 noet : | |
// Copyright 2012, The TPIE development team | |
// | |
// This file is part of TPIE. | |
// | |
// TPIE is free software: you can redistribute it and/or modify it under | |
// the terms of the GNU Lesser General Public License as published by the | |
// Free Software Foundation, either version 3 of the License, or (at your | |
// option) any later version. | |
// | |
// TPIE is distributed in the hope that it will be useful, but WITHOUT ANY | |
// WARRANTY; without even the implied warranty of MERCHANTABILITY or | |
// FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public | |
// License for more details. | |
// | |
// You should have received a copy of the GNU Lesser General Public License | |
// along with TPIE. If not, see <http://www.gnu.org/licenses/> | |
#ifndef __TPIE_PIPELINING_PARALLEL_H__ | |
#define __TPIE_PIPELINING_PARALLEL_H__ | |
#include <tpie/pipelining/node.h> | |
#include <tpie/pipelining/factory_base.h> | |
#include <tpie/array_view.h> | |
#include <boost/shared_ptr.hpp> | |
#include <tpie/pipelining/order.h> | |
/////////////////////////////////////////////////////////////////////////////// | |
/// \file parallel.h Parallel execution of nodes. | |
/// | |
/// Given a sequential computation as a partial pipeline, this parallel | |
/// framework naively parallelizes it by having multiple thread handle some | |
/// items each. | |
/// | |
/// Throughout the code, the input type is named T1, and the output type is | |
/// named T2. | |
/// | |
/// NOTE about item_type: | |
/// If your pipeline to be parallelised is foo() | bar() | baz(), then the | |
/// item_type of foo() must not depend on the destination type that follows | |
/// baz(). This requirement makes it possible for the framework to get the | |
/// input type (T1) before doing the real pipeline instantiation. This is | |
/// required since the synthesized parallel_bits::after node inserted | |
/// after baz() in the above example takes T1 and T2 as template parameters. | |
/// The template code that makes this happen is in | |
/// parallel_bits::factory::generated. | |
/// | |
/// This means that item_type of foo must not be declared inside the foo class. | |
/// When this is done, the type signature of item_type will implicitly depend | |
/// on the destination type, meaning the item_type changes when the baz | |
/// destination type changes. | |
/// END NOTE. | |
/// | |
/// Each worker has a pipeline instance of a parallel_bits::before pushing items to | |
/// the user-supplied pipeline which pushes to an instance of parallel_bits::after. | |
/// | |
/// The producer sits in the main thread and distributes item buffers to | |
/// parallel_bits::befores running in different threads, and the consumer | |
/// receives the items pushed to each after instance. | |
/// | |
/// All nodes have access to a single parallel_bits::state instance | |
/// which has the mutex and the necessary condition variables. | |
/// It also has pointers to the parallel_bits::before and | |
/// parallel_bits::after instances and it holds an array of worker states (of | |
/// enum type parallel_bits::worker_state). | |
/// It also has a options struct which contains the user-supplied | |
/// parameters to the framework (size of item buffer and number of concurrent | |
/// workers). | |
/// | |
/// The TPIE job framework is insufficient for this parallelization code, | |
/// since we get deadlocks if some of the workers are allowed to wait for a | |
/// ready tpie::job worker. Instead, we use boost::threads directly. | |
/// | |
/// Parallel worker states. The main thread has a condition variable | |
/// (producerCond) which is signalled every time a worker changes its own | |
/// state. Each worker thread has a condition variable (workerCond[]) which is | |
/// signalled when the main thread changes the worker's state. | |
/// | |
/// Initializing: Before the input/output buffers are initialized. | |
/// -> Idle (worker thread) | |
/// | |
/// Idle: Input/output buffers are empty. | |
/// -> Processing (main thread) | |
/// | |
/// Processing: Input buffer is full; output buffer is empty. | |
/// -> Partial output (worker thread; signals main) | |
/// -> Outputting (worker thread) | |
/// | |
/// Partial output: Output buffer is full; input buffer is non-empty. | |
/// -> Processing (main thread) | |
/// | |
/// Outputting: Output buffer is full; input buffer is empty. | |
/// -> Idle (main thread) | |
/// | |
/// TODO at some future point: Optimize code for the case where the buffer size | |
/// is one. | |
/////////////////////////////////////////////////////////////////////////////// | |
namespace tpie { | |
namespace pipelining { | |
namespace parallel_bits { | |
// predeclare | |
template <typename T> | |
class before; | |
template <typename dest_t> | |
class before_impl; | |
template <typename T> | |
class after; | |
template <typename T1, typename T2> | |
class state; | |
///////////////////////////////////////////////////////////////////////////////
/// \brief User-supplied options to the parallelism framework.
///////////////////////////////////////////////////////////////////////////////
struct options {
	/** Whether the framework must emit items in the order they were input
	 * (order::maintain) or may reorder them. */
	order::type maintainOrder;
	/** Number of concurrent worker threads. */
	size_t numJobs;
	/** Capacity (in items) of each worker's input and output buffer. */
	size_t bufSize;
};
///////////////////////////////////////////////////////////////////////////////
/// \brief States of the parallel worker state machine.
///
/// Implemented as an enum nested in a struct rather than a plain C++03 enum,
/// so the enumerators must be qualified (worker_state::idle), emulating the
/// scoping of a C++11 `enum class`. Use worker_state::type as the value type.
///
/// See the file header comment for the full transition diagram and which
/// thread performs each transition.
///////////////////////////////////////////////////////////////////////////////
struct worker_state {
	enum type {
		/** Before the input/output buffers are initialized. */
		initializing,

		/** Input/output buffers are empty; the input is being written by the
		 * producer. */
		idle,

		/** The input buffer is full and the worker is processing it. */
		processing,

		/** The worker has filled its output buffer, but has not yet consumed
		 * the input buffer. */
		partial_output,

		/** The output is being read by the consumer. */
		outputting
	};
};
///////////////////////////////////////////////////////////////////////////////
/// \brief Aligned, uninitialized storage.
///
/// This class provides access to an array of items aligned to any boundary
/// (mostly useful for powers of two).
/// They are not constructed or destructed; only the memory resource is
/// handled.
/// This is used for the nodes that are instantiated once for each
/// parallel thread of pipeline computation. They should be stored in an array
/// aligned to a cache line, to avoid cache lock contention.
///
/// Non-copyable: the class owns a raw buffer, and copying would lead to a
/// double delete.
///////////////////////////////////////////////////////////////////////////////
template <typename T, size_t Align>
class aligned_array {
	// Compute the size of an item with alignment padding (round up to nearest
	// multiple of Align).
	static const size_t aligned_size = (sizeof(T)+Align-1)/Align*Align;

	uint8_t * m_data;
	size_t m_size;

	// Release the buffer. m_data is reset to null so that a subsequent
	// dealloc() - e.g. from the destructor after realloc()'s `new` threw -
	// is a harmless no-op rather than a double delete of a dangling pointer.
	void dealloc() {
		delete[] m_data;
		m_data = 0;
		m_size = 0;
	}

	// Declared private and left unimplemented (C++03 noncopyable idiom).
	aligned_array(const aligned_array &);
	aligned_array & operator=(const aligned_array &);

public:
	aligned_array() : m_data(0), m_size(0) {}

	~aligned_array() { realloc(0); }

	///////////////////////////////////////////////////////////////////////////
	/// \brief Pointer to the idx'th aligned element.
	///
	/// Only meaningful for idx < size(). The storage is uninitialized; the
	/// caller is responsible for constructing (placement new) and destructing
	/// the object at the returned address.
	///////////////////////////////////////////////////////////////////////////
	T * get(size_t idx) {
		const size_t addr = (size_t) m_data;

		// Find the aligned base of the array by rounding the pointer up to the
		// nearest multiple of Align.
		const size_t alignedBase = (addr + Align - 1)/Align*Align;

		// Find the address of the element.
		const size_t elmAddress = alignedBase + aligned_size * idx;

		return (T *) elmAddress;
	}

	///////////////////////////////////////////////////////////////////////////
	/// \brief Discard the current buffer (without destructing any elements)
	/// and allocate uninitialized storage for elms elements. realloc(0) just
	/// frees the buffer.
	///////////////////////////////////////////////////////////////////////////
	void realloc(size_t elms) {
		dealloc();
		m_size = elms;
		// The buffer we get is not guaranteed to be aligned to any boundary.
		// Request Align extra bytes to ensure we can find an aligned buffer of
		// size aligned_size*elms.
		m_data = m_size ? new uint8_t[aligned_size * elms + Align] : 0;
	}

	/// Number of elements there is room for (not number constructed).
	size_t size() const { return m_size; }
};
///////////////////////////////////////////////////////////////////////////////
/// \brief Class containing an array of node instances. We cannot use
/// tpie::array or similar, since we need to construct the elements in a
/// special way. This class is non-copyable since it resides in the refcounted
/// state class.
/// \tparam Input  Type of input items
/// \tparam Output Type of output items
///////////////////////////////////////////////////////////////////////////////
template <typename Input, typename Output>
class threads {
	typedef before<Input> before_t;

protected:
	/** Alignment boundary for per-worker data (64 bytes, to keep each
	 * worker's data on its own cache line - see aligned_array). */
	static const size_t alignment = 64;

	/** Progress indicator type. The parallel nodes get null progress
	 * indicators; their step counts are summed in the main thread via
	 * sum_steps(). */
	typedef progress_indicator_null pi_t;

	/** One progress indicator per worker, in aligned storage. Elements are
	 * placement-constructed by the subclass (threads_impl). */
	aligned_array<pi_t, alignment> m_progressIndicators;

	///////////////////////////////////////////////////////////////////////////
	/// \brief Factory hook that sets the progress indicator of the
	/// nodes run in parallel to the null progress indicator.
	/// This way, we can collect the number of steps in the main thread.
	///////////////////////////////////////////////////////////////////////////
	class progress_indicator_hook : public factory_init_hook {
		threads * t;
	public:
		progress_indicator_hook(threads * t)
			: t(t)
			, index(0)
		{
		}

		/// Called for each node the factory initializes; assigns the
		/// index'th per-worker progress indicator. The owner sets `index`
		/// before constructing each worker pipeline.
		virtual void init_node(node & r) /*override*/ {
			r.set_progress_indicator(t->m_progressIndicators.get(index));
		}

		/** Index of the worker currently being constructed. */
		size_t index;
	};
	friend class progress_indicator_hook;

	/** Pointers to the before instance of each worker; populated and owned
	 * by the subclass (threads_impl). */
	std::vector<before_t *> m_dests;

public:
	/// Access the before instance of the idx'th worker.
	before_t & operator[](size_t idx) {
		return *m_dests[idx];
	}

	/// Sum of the steps recorded by all workers' progress indicators.
	stream_size_type sum_steps() {
		stream_size_type res = 0;
		for (size_t i = 0; i < m_progressIndicators.size(); ++i) {
			res += m_progressIndicators.get(i)->get_current();
		}
		return res;
	}

	virtual ~threads() {}
};
///////////////////////////////////////////////////////////////////////////////
/// \brief Subclass of threads instantiating and managing the pipelines.
/// \tparam Input  Type of input items
/// \tparam Output Type of output items
/// \tparam fact_t Type of the factory constructing each worker pipeline
///////////////////////////////////////////////////////////////////////////////
template <typename Input, typename Output, typename fact_t>
class threads_impl : public threads<Input, Output> {
private:
	typedef threads<Input, Output> p_t;
	/** Progress indicator type */
	typedef typename p_t::pi_t pi_t;
	typedef after<Output> after_t;
	/** The user-supplied pipeline instantiated with after_t as destination. */
	typedef typename fact_t::template generated<after_t>::type worker_t;
	/** Input type deduced from the worker - cf. the NOTE about item_type in
	 * the file header. */
	typedef typename worker_t::item_type T1;
	typedef Output T2;
	typedef before_impl<worker_t> before_t;
	static const size_t alignment = p_t::alignment;
	typedef aligned_array<before_t, alignment> aligned_before_t;

	/** Size of the m_dests array. */
	size_t numJobs;

	/** Allocated array buffer. Elements are placement-constructed in the
	 * constructor and manually destructed in the destructor. */
	aligned_before_t m_data;

public:
	/// Construct one worker pipeline per job, placement-new'ed into the
	/// aligned storage, wiring each to the shared state `st`.
	threads_impl(fact_t fact,
				 state<T1, T2> & st)
		: numJobs(st.opts.numJobs)
	{
		typename p_t::progress_indicator_hook hook(this);
		fact.hook_initialization(&hook);
		// uninitialized allocation
		m_data.realloc(numJobs);
		this->m_progressIndicators.realloc(numJobs);
		this->m_dests.resize(numJobs);
		// construct elements manually
		for (size_t i = 0; i < numJobs; ++i) {
			// for debugging: check that pointer is aligned.
			if (((size_t) m_data.get(i)) % alignment != 0) {
				log_warning() << "Thread " << i << " is not aligned: Address "
					<< m_data.get(i) << " is off by " <<
					(((size_t) m_data.get(i)) % alignment) << " bytes"
					<< std::endl;
			}
			// Tell the hook which worker's progress indicator the nodes
			// constructed by fact.construct() below should use.
			hook.index = i;
			new (this->m_progressIndicators.get(i)) pi_t();
			this->m_dests[i] =
				new(m_data.get(i))
				before_t(st, i, fact.construct(after_t(st, i)));
		}
	}

	/// Manually destruct the placement-constructed elements, then release
	/// the raw storage.
	virtual ~threads_impl() {
		for (size_t i = 0; i < numJobs; ++i) {
			m_data.get(i)->~before_t();
			this->m_progressIndicators.get(i)->~pi_t();
		}
		m_data.realloc(0);
		this->m_progressIndicators.realloc(0);
	}
};
///////////////////////////////////////////////////////////////////////////////
/// \brief Non-templated virtual base class of after.
///
/// Lets state_base refer to the after instances without knowing the concrete
/// output item type.
/// NOTE(review): deletion through this base relies on the node base class
/// providing a virtual destructor - confirm in node.h.
///////////////////////////////////////////////////////////////////////////////
class after_base : public node {
public:
	///////////////////////////////////////////////////////////////////////////
	/// \brief Called by before::worker to initialize buffers.
	///////////////////////////////////////////////////////////////////////////
	virtual void worker_initialize() = 0;

	///////////////////////////////////////////////////////////////////////////
	/// \brief Called by before::worker after a batch of items has
	/// been pushed.
	/// \param complete Whether the entire input has been processed.
	///////////////////////////////////////////////////////////////////////////
	virtual void flush_buffer(bool complete) = 0;
};
///////////////////////////////////////////////////////////////////////////////
/// \brief Common state in parallel pipelining library.
/// This class is instantiated once and kept in a boost::shared_ptr, and it is
/// not copy constructible.
///
/// Unless noted otherwise, a thread must own the state mutex to access other
/// parts of this instance.
///////////////////////////////////////////////////////////////////////////////
class state_base {
public:
	typedef boost::mutex mutex_t;
	typedef boost::condition_variable cond_t;
	typedef boost::unique_lock<boost::mutex> lock_t;

	/** User-supplied options (ordering, number of jobs, buffer size). */
	const options opts;

	/** Single mutex. */
	mutex_t mutex;

	/** Condition variable.
	 *
	 * Who waits: The producer, with the single mutex (waits until at least one
	 * worker has state = idle or state = outputting).
	 *
	 * Who signals: The par_after, when a worker is outputting. */
	cond_t producerCond;

	/** Condition variable, one per worker.
	 *
	 * Who waits: The worker's par_before when waiting for input (wait for
	 * state = processing), the worker's par_after when waiting for output to
	 * be read (wait for state = idle). Waits with the single mutex.
	 *
	 * Who signals: par_producer, when input has been written (sets state to
	 * processing). par_consumer, when output has been read (sets state to
	 * idle).
	 *
	 * Array of opts.numJobs elements, allocated in the constructor and freed
	 * in the destructor. */
	cond_t * workerCond;

	/** Are we done? Shared state, must have mutex to write. */
	bool done;

	/** Number of workers currently running. Shared state, must have mutex to
	 * write. */
	size_t runningWorkers;

	/// Register the before instance of worker idx.
	/// Must not be used concurrently.
	void set_input_ptr(size_t idx, node * v) {
		m_inputs[idx] = v;
	}

	/// Register the after instance of worker idx.
	/// Must not be used concurrently.
	void set_output_ptr(size_t idx, after_base * v) {
		m_outputs[idx] = v;
	}

	///////////////////////////////////////////////////////////////////////////
	/// \brief Get the specified before instance.
	///
	/// Enables easy construction of the pipeline graph at runtime.
	///
	/// Shared state, must have mutex to use.
	///////////////////////////////////////////////////////////////////////////
	node & input(size_t idx) { return *m_inputs[idx]; }

	///////////////////////////////////////////////////////////////////////////
	/// \brief Get the specified after instance.
	///
	/// Serves two purposes:
	/// First, it enables easy construction of the pipeline graph at runtime.
	/// Second, it is used by before to send batch signals to
	/// after.
	///
	/// Shared state, must have mutex to use.
	///////////////////////////////////////////////////////////////////////////
	after_base & output(size_t idx) { return *m_outputs[idx]; }

	/// Current state of worker idx.
	/// Shared state, must have mutex to use.
	worker_state::type get_state(size_t idx) {
		return m_states[idx];
	}

	///////////////////////////////////////////////////////////////////////////
	/// \brief Transition worker idx from one state to another.
	///
	/// Throws if the worker is not currently in the expected `from` state,
	/// since that indicates a bug in the state machine.
	///
	/// Shared state, must have mutex to use.
	///////////////////////////////////////////////////////////////////////////
	void transition_state(size_t idx, worker_state::type from, worker_state::type to) {
		if (m_states[idx] != from) {
			std::stringstream ss;
			ss << "Invalid state transition " << from << " -> " << to << "; current state is " << m_states[idx];
			log_error() << ss.str() << std::endl;
			throw exception(ss.str());
		}
		m_states[idx] = to;
	}

protected:
	/** The before instance of each worker (see set_input_ptr). */
	std::vector<node *> m_inputs;
	/** The after instance of each worker (see set_output_ptr). */
	std::vector<after_base *> m_outputs;
	/** Current state of each worker; all start out initializing. */
	std::vector<worker_state::type> m_states;

	state_base(const options opts)
		: opts(opts)
		, done(false)
		, runningWorkers(0)
		, m_inputs(opts.numJobs, 0)
		, m_outputs(opts.numJobs, 0)
		, m_states(opts.numJobs, worker_state::initializing)
	{
		// One condition variable per worker; released in the destructor.
		workerCond = new cond_t[opts.numJobs];
	}

	virtual ~state_base() {
		delete[] workerCond;
	}
};
/////////////////////////////////////////////////////////////////////////////// | |
/// \brief Instantiated in each thread. | |
/////////////////////////////////////////////////////////////////////////////// | |
template <typename T> | |
class parallel_input_buffer { | |
memory_size_type m_inputSize; | |
array<T> m_inputBuffer; | |
public: | |
array_view<T> get_input() { | |
return array_view<T>(&m_inputBuffer[0], m_inputSize); | |
} | |
void set_input(array_view<T> input) { | |
if (input.size() > m_inputBuffer.size()) | |
throw tpie::exception(m_inputBuffer.size() ? "Input too large" : "Input buffer not initialized"); | |
memory_size_type items = | |
std::copy(input.begin(), input.end(), m_inputBuffer.begin()) | |
-m_inputBuffer.begin(); | |
m_inputSize = items; | |
} | |
parallel_input_buffer(const options & opts) | |
: m_inputSize(0) | |
, m_inputBuffer(opts.bufSize) | |
{ | |
} | |
}; | |
///////////////////////////////////////////////////////////////////////////////
/// \brief Instantiated in each thread.
///
/// Holds the items produced by one worker until the consumer in the main
/// thread reads them. m_outputSize is maintained directly by the after<T>
/// friend as it pushes items.
///////////////////////////////////////////////////////////////////////////////
template <typename T>
class parallel_output_buffer {
	/** Number of valid items currently in m_outputBuffer. */
	memory_size_type m_outputSize;
	/** Fixed-capacity storage of opts.bufSize items. */
	array<T> m_outputBuffer;
	friend class after<T>;
public:
	/// View of the valid items currently held in the buffer.
	array_view<T> get_output() {
		return array_view<T>(&m_outputBuffer[0], m_outputSize);
	}

	parallel_output_buffer(const options & opts)
		: m_outputSize(0)
		, m_outputBuffer(opts.bufSize)
	{
	}
};
///////////////////////////////////////////////////////////////////////////////
/// \brief State subclass containing the item type specific state, i.e. the
/// input/output buffers and the concrete pipes.
/// \tparam T1 Input item type
/// \tparam T2 Output item type
///////////////////////////////////////////////////////////////////////////////
template <typename T1, typename T2>
class state : public state_base {
public:
	/** Refcounted pointer type by which this instance is shared. */
	typedef boost::shared_ptr<state> ptr;
	typedef state_base::mutex_t mutex_t;
	typedef state_base::cond_t cond_t;
	typedef state_base::lock_t lock_t;

	/** Pointers to each worker's input buffer. The buffers themselves are
	 * owned by the before instances (thread-local allocation). */
	array<parallel_input_buffer<T1> *> m_inputBuffers;
	/** Pointers to each worker's output buffer; owned by the after
	 * instances. */
	array<parallel_output_buffer<T2> *> m_outputBuffers;

	/** The worker pipelines (a threads_impl instance). */
	std::auto_ptr<threads<T1, T2> > pipes;

	/// Construct the shared state and instantiate the worker pipelines from
	/// the given factory.
	template <typename fact_t>
	state(const options opts, const fact_t & fact)
		: state_base(opts)
		, m_inputBuffers(opts.numJobs)
		, m_outputBuffers(opts.numJobs)
	{
		typedef threads_impl<T1, T2, fact_t> pipes_impl_t;
		pipes.reset(new pipes_impl_t(fact, *this));
	}
};
///////////////////////////////////////////////////////////////////////////////
/// \brief Accepts output items and sends them to the main thread.
///
/// One instance per worker, sitting at the end of the worker's pipeline.
/// Items are collected in a thread-local output buffer; when the buffer
/// fills up (or the input batch is exhausted) it is handed to the consumer
/// in the main thread via the shared state machine.
///////////////////////////////////////////////////////////////////////////////
template <typename T>
class after : public after_base {
protected:
	/** Shared parallel state (mutex, condition variables, worker states). */
	state_base & st;
	/** Index of this worker. */
	size_t parId;
	/** Thread-local output buffer; allocated in worker_initialize. */
	std::auto_ptr<parallel_output_buffer<T> > m_buffer;
	/** The shared array of output buffer pointers (lives in state<.,T>). */
	array<parallel_output_buffer<T> *> & m_outputBuffers;
	typedef state_base::lock_t lock_t;
public:
	typedef T item_type;

	template <typename Input>
	after(state<Input, T> & state,
		  size_t parId)
		: st(state)
		, parId(parId)
		, m_outputBuffers(state.m_outputBuffers)
	{
		state.set_output_ptr(parId, this);
		set_name("Parallel after", PRIORITY_INSIGNIFICANT);
	}

	/// Copy constructor. The copy re-registers itself as the worker's
	/// canonical after instance. m_buffer starts out empty in the copy;
	/// worker_initialize allocates it.
	after(const after & other)
		: after_base(other)
		, st(other.st)
		, parId(other.parId)
		, m_outputBuffers(other.m_outputBuffers)
	{
		st.set_output_ptr(parId, this);
	}

	///////////////////////////////////////////////////////////////////////////
	/// \brief Push to thread-local buffer; flush it when full.
	///////////////////////////////////////////////////////////////////////////
	void push(const T & item) {
		// A full buffer is flushed as a partial output (complete = false)
		// before the new item is stored.
		if (m_buffer->m_outputSize >= m_buffer->m_outputBuffer.size())
			flush_buffer_impl(false);
		m_buffer->m_outputBuffer[m_buffer->m_outputSize++] = item;
	}

	///////////////////////////////////////////////////////////////////////////
	/// \brief Invoked by before::worker (in worker thread context).
	///
	/// Allocates the output buffer in the worker thread and publishes a
	/// pointer to it in the shared state.
	///////////////////////////////////////////////////////////////////////////
	virtual void worker_initialize() {
		m_buffer.reset(new parallel_output_buffer<T>(st.opts));
		m_outputBuffers[parId] = m_buffer.get();
	}

	///////////////////////////////////////////////////////////////////////////
	/// \brief Invoked by before::push_all when all input items have been
	/// pushed.
	/// \param complete Whether the entire input has been processed.
	///////////////////////////////////////////////////////////////////////////
	virtual void flush_buffer(bool complete) {
		flush_buffer_impl(complete);
	}

private:
	/// Whether the main thread has consumed our output, i.e. whether
	/// flush_buffer_impl may stop waiting. Caller must hold the mutex.
	bool is_done() const {
		switch (st.get_state(parId)) {
			case worker_state::initializing:
				throw tpie::exception("INITIALIZING not expected in after::is_done");
			case worker_state::idle:
				return true;
			case worker_state::processing:
				// The main thread may transition us from Outputting to Idle to
				// Processing without us noticing, or it may transition us from
				// Partial_Output to Processing. In either case, we are done
				// flushing the buffer.
				return true;
			case worker_state::partial_output:
			case worker_state::outputting:
				return false;
		}
		throw std::runtime_error("Unknown state");
	}

	/// Hand the output buffer to the main thread and wait until it has been
	/// consumed (or the pipeline is shutting down), then reset the buffer.
	/// \param complete Whether the input batch has been fully processed
	///                 (outputting) or not (partial_output).
	void flush_buffer_impl(bool complete) {
		// Nothing to flush; avoid a bogus state transition.
		if (m_buffer->m_outputSize == 0)
			return;
		lock_t lock(st.mutex);
		st.transition_state(parId, worker_state::processing, complete ? worker_state::outputting : worker_state::partial_output);
		// notify producer that output is ready
		st.producerCond.notify_one();
		while (!is_done()) {
			if (st.done) break;
			st.workerCond[parId].wait(lock);
		}
		m_buffer->m_outputSize = 0;
	}
};
/////////////////////////////////////////////////////////////////////////////// | |
/// \brief Accepts input items from the main thread and sends them down the | |
/// pipeline. This class contains the bulk of the code that is run in each | |
/// worker thread. | |
/////////////////////////////////////////////////////////////////////////////// | |
template <typename T>
class before : public node {
protected:
	/// Shared state machine; st.mutex must be held to inspect or change
	/// worker states through it.
	state_base & st;
	/// Index of this worker in the shared state arrays.
	size_t parId;
	/// Input buffer for this worker; allocated by the worker thread itself
	/// in worker().
	std::auto_ptr<parallel_input_buffer<T> > m_buffer;
	/// Shared index -> input buffer map through which the producer hands us
	/// a batch of items.
	array<parallel_input_buffer<T> *> & m_inputBuffers;
	/// Thread spawned in begin(); runs worker().
	boost::thread m_worker;
	///////////////////////////////////////////////////////////////////////////
	/// \brief Overridden in subclass to push a buffer of items.
	///////////////////////////////////////////////////////////////////////////
	virtual void push_all(array_view<T> items) = 0;
	template <typename Output>
	before(state<T, Output> & st, size_t parId)
		: st(st)
		, parId(parId)
		, m_inputBuffers(st.m_inputBuffers)
	{
		set_name("Parallel before", PRIORITY_INSIGNIFICANT);
	}
	// virtual dtor in node
	// Copy constructor shares st and m_inputBuffers but does not copy
	// m_buffer or m_worker; each copy allocates a fresh buffer when its own
	// worker thread runs worker().
	before(const before & other)
		: st(other.st)
		, parId(other.parId)
		, m_inputBuffers(other.m_inputBuffers)
	{
	}
public:
	typedef T item_type;
	// Spawn the worker thread for this node.
	virtual void begin() /*override*/ {
		node::begin();
		boost::thread t(run_worker, this);
		m_worker.swap(t);
	}
private:
	///////////////////////////////////////////////////////////////////////////
	/// \brief Check if we are ready to process a batch of input.
	/// Shared state; the caller must hold st.mutex.
	///////////////////////////////////////////////////////////////////////////
	bool ready() {
		switch (st.get_state(parId)) {
			case worker_state::initializing:
				throw tpie::exception("INITIALIZING not expected in before::ready");
			case worker_state::idle:
				return false;
			case worker_state::processing:
				return true;
			case worker_state::partial_output:
				throw std::runtime_error("State 'partial_output' was not expected in before::ready");
			case worker_state::outputting:
				throw std::runtime_error("State 'outputting' was not expected in before::ready");
		}
		throw std::runtime_error("Unknown state");
	}
	///////////////////////////////////////////////////////////////////////////
	/// \brief Class providing RAII-style bookkeeping of number of workers.
	///
	/// Increments the counter on construction and decrements it on
	/// destruction, notifying the producer both times.
	///////////////////////////////////////////////////////////////////////////
	class running_signal {
		typedef state_base::cond_t cond_t;
		memory_size_type & sig;
		cond_t & producerCond;
	public:
		running_signal(memory_size_type & sig, cond_t & producerCond)
			: sig(sig)
			, producerCond(producerCond)
		{
			++sig;
			producerCond.notify_one();
		}
		~running_signal() {
			--sig;
			producerCond.notify_one();
		}
	};
	// Trampoline used as the boost::thread entry point.
	static void run_worker(before * self) {
		self->worker();
	}
	///////////////////////////////////////////////////////////////////////////
	/// \brief Worker thread entry point.
	///
	/// Holds st.mutex except while push_all runs; loops until the producer
	/// sets st.done.
	///////////////////////////////////////////////////////////////////////////
	void worker() {
		state_base::lock_t lock(st.mutex);
		m_buffer.reset(new parallel_input_buffer<T>(st.opts));
		m_inputBuffers[parId] = m_buffer.get();
		// virtual invocation
		st.output(parId).worker_initialize();
		st.transition_state(parId, worker_state::initializing, worker_state::idle);
		// RAII: count this worker as running for the lifetime of the loop.
		running_signal _(st.runningWorkers, st.producerCond);
		while (true) {
			// wait for transition IDLE -> PROCESSING
			while (!ready()) {
				if (st.done) {
					return;
				}
				st.workerCond[parId].wait(lock);
			}
			// Process the batch without holding the lock so the producer and
			// the other workers can make progress meanwhile.
			lock.unlock();
			// virtual invocation
			push_all(m_buffer->get_input());
			lock.lock();
		}
	}
};
/////////////////////////////////////////////////////////////////////////////// | |
/// \brief Concrete before class. | |
/////////////////////////////////////////////////////////////////////////////// | |
template <typename dest_t> | |
class before_impl : public before<typename dest_t::item_type> { | |
typedef typename dest_t::item_type item_type; | |
dest_t dest; | |
public: | |
template <typename Output> | |
before_impl(state<item_type, Output> & st, | |
size_t parId, | |
dest_t dest) | |
: before<item_type>(st, parId) | |
, dest(dest) | |
{ | |
this->add_push_destination(dest); | |
st.set_input_ptr(parId, this); | |
} | |
/////////////////////////////////////////////////////////////////////////// | |
/// \brief Push all items from buffer and flush output buffer afterwards. | |
/// | |
/// If pipeline is one-to-one, that is, one item output for each item | |
/// input, then the flush at the end is not needed. | |
/////////////////////////////////////////////////////////////////////////// | |
virtual void push_all(array_view<item_type> items) { | |
for (size_t i = 0; i < items.size(); ++i) { | |
dest.push(items[i]); | |
} | |
// virtual invocation | |
this->st.output(this->parId).flush_buffer(true); | |
} | |
}; | |
/////////////////////////////////////////////////////////////////////////////// | |
/// \brief Node running in main thread, accepting an output buffer | |
/// from the managing producer and forwards them down the pipe. The overhead | |
/// concerned with switching threads dominates the overhead of a virtual method | |
/// call, so this class only depends on the output type and leaves the pushing | |
/// of items to a virtual subclass. | |
/////////////////////////////////////////////////////////////////////////////// | |
template <typename T>
class consumer : public node {
public:
	typedef T item_type;
	// Forward a buffer of finished items down the pipeline; invoked by the
	// producer in the main thread.
	virtual void consume(array_view<T>) = 0;
	// node has virtual dtor
};
/////////////////////////////////////////////////////////////////////////////// | |
/// \brief Concrete consumer implementation. | |
/////////////////////////////////////////////////////////////////////////////// | |
template <typename Input, typename Output, typename dest_t> | |
class consumer_impl : public consumer<typename dest_t::item_type> { | |
typedef state<Input, Output> state_t; | |
typedef typename state_t::ptr stateptr; | |
dest_t dest; | |
stateptr st; | |
public: | |
typedef typename dest_t::item_type item_type; | |
consumer_impl(const dest_t & dest, stateptr st) | |
: dest(dest) | |
, st(st) | |
{ | |
this->add_push_destination(dest); | |
this->set_name("Parallel output", PRIORITY_INSIGNIFICANT); | |
for (size_t i = 0; i < st->opts.numJobs; ++i) { | |
this->add_pull_destination(st->output(i)); | |
} | |
} | |
/////////////////////////////////////////////////////////////////////////// | |
/// \brief Push all items from output buffer to the rest of the pipeline. | |
/////////////////////////////////////////////////////////////////////////// | |
virtual void consume(array_view<item_type> a) /*override*/ { | |
for (size_t i = 0; i < a.size(); ++i) { | |
dest.push(a[i]); | |
} | |
} | |
}; | |
/////////////////////////////////////////////////////////////////////////////// | |
/// \brief Producer, running in main thread, managing the parallel execution. | |
/// | |
/// This class contains the bulk of the code that is run in the main thread. | |
/////////////////////////////////////////////////////////////////////////////// | |
template <typename T1, typename T2>
class producer : public node {
public:
	typedef T1 item_type;
private:
	typedef state<T1, T2> state_t;
	typedef typename state_t::ptr stateptr;
	/// Shared coordination state: worker states, buffers, mutex, condvars.
	stateptr st;
	/// Items accumulated from push() before being handed to a worker.
	array<T1> inputBuffer;
	/// Number of valid items currently in inputBuffer.
	size_t written;
	/// Index of the worker found by has_ready_pipe()/has_outputting_pipe().
	size_t readyIdx;
	/// Consumer forwarding finished output buffers down the pipeline.
	boost::shared_ptr<consumer<T2> > cons;
	/// With order::maintain: worker indices in dispatch order, so output
	/// buffers are consumed in the same order the input was sent.
	internal_queue<memory_size_type> m_outputOrder;
	/// Progress steps already reported; see flush_steps().
	stream_size_type m_steps;
	///////////////////////////////////////////////////////////////////////////
	/// \brief Check if a worker is waiting for the main thread.
	///
	/// A worker may wait for output to be fetched, or it may wait for input to
	/// be sent. If there is a worker waiting, this function returns true and
	/// sets the index of the waiting worker in this->readyIdx.
	/// Shared state; the caller must hold st->mutex.
	///////////////////////////////////////////////////////////////////////////
	bool has_ready_pipe() {
		for (size_t i = 0; i < st->opts.numJobs; ++i) {
			switch (st->get_state(i)) {
				case worker_state::initializing:
				case worker_state::processing:
					break;
				case worker_state::partial_output:
				case worker_state::outputting:
					// If we have to maintain order of items, the only
					// outputting worker we consider to be waiting is the
					// "front worker".
					if (st->opts.maintainOrder == order::maintain && m_outputOrder.front() != i)
						break;
					// fallthrough
				case worker_state::idle:
					readyIdx = i;
					return true;
			}
		}
		return false;
	}
	///////////////////////////////////////////////////////////////////////////
	/// \brief Check if a worker is waiting for the main thread to process its
	/// output.
	///
	/// This is used in end() instead of has_ready_pipe, since we do not care
	/// about workers waiting for input when we don't have any input to send.
	///
	/// Like has_ready_pipe, this function sets this->readyIdx if and only if
	/// it returns true.
	///////////////////////////////////////////////////////////////////////////
	bool has_outputting_pipe() {
		for (size_t i = 0; i < st->opts.numJobs; ++i) {
			switch (st->get_state(i)) {
				case worker_state::initializing:
				case worker_state::idle:
				case worker_state::processing:
					break;
				case worker_state::partial_output:
				case worker_state::outputting:
					if (st->opts.maintainOrder == order::maintain && m_outputOrder.front() != i)
						break;
					readyIdx = i;
					return true;
			}
		}
		return false;
	}
	///////////////////////////////////////////////////////////////////////////
	/// \brief Check if any worker is still processing input.
	///
	/// This is used in end() when we are waiting for workers to finish up.
	/// When no worker is outputting and no worker is processing, all items
	/// have been processed.
	///
	/// Does not modify this->readyIdx.
	///////////////////////////////////////////////////////////////////////////
	bool has_processing_pipe() {
		for (size_t i = 0; i < st->opts.numJobs; ++i) {
			switch (st->get_state(i)) {
				case worker_state::initializing:
				case worker_state::idle:
				case worker_state::partial_output:
				case worker_state::outputting:
					break;
				case worker_state::processing:
					return true;
			}
		}
		return false;
	}
	///////////////////////////////////////////////////////////////////////////
	/// \brief Propagate progress information.
	///////////////////////////////////////////////////////////////////////////
	void flush_steps() {
		// The number of items has been forwarded along unchanged to all
		// the workers (it is still a valid upper bound).
		//
		// This means the workers each expect to handle all the items,
		// which means the number of steps reported in total is scaled up
		// by the number of workers.
		//
		// Therefore, we similarly scale up the number of times we call step.
		// In effect, every time step() is called once in a single worker,
		// we process this as if all workers called step().
		stream_size_type steps = st->pipes->sum_steps();
		if (steps != m_steps) {
			this->get_progress_indicator()->step(st->opts.numJobs*(steps - m_steps));
			m_steps = steps;
		}
	}
public:
	template <typename consumer_t>
	producer(stateptr st, const consumer_t & cons)
		: st(st)
		, written(0)
		, cons(new consumer_t(cons))
		, m_steps(0)
	{
		for (size_t i = 0; i < st->opts.numJobs; ++i) {
			this->add_push_destination(st->input(i));
		}
		this->set_name("Parallel input", PRIORITY_INSIGNIFICANT);
		// One input and one output buffer per worker, plus our own
		// accumulation buffer.
		memory_size_type usage =
			st->opts.numJobs * st->opts.bufSize * (sizeof(T1) + sizeof(T2)) // workers
			+ st->opts.bufSize * sizeof(item_type) // our buffer
			;
		this->set_minimum_memory(usage);
		this->add_push_destination(cons);
		if (st->opts.maintainOrder == order::maintain) {
			m_outputOrder.resize(st->opts.numJobs);
		}
	}
	virtual void begin() /*override*/ {
		node::begin();
		inputBuffer.resize(st->opts.bufSize);
	}
	///////////////////////////////////////////////////////////////////////////
	/// \brief Accumulate input buffer and send off to workers.
	///
	/// Since the parallel producer and parallel consumer run single-threaded
	/// in the main thread, producer::push is our only opportunity to have the
	/// consumer call push on its destination. Thus, when we accumulate an
	/// input buffer, before sending it off to a worker, we might want to have
	/// the consumer consume an output buffer to free up a parallel worker.
	///////////////////////////////////////////////////////////////////////////
	void push(item_type item) {
		inputBuffer[written++] = item;
		if (written < st->opts.bufSize) {
			// Wait for more items before doing anything expensive such as
			// locking.
			return;
		}
		state_base::lock_t lock(st->mutex);
		flush_steps();
		empty_input_buffer(lock);
	}
private:
	// Dispatch the accumulated input buffer to an idle worker, consuming
	// finished output buffers along the way. The caller holds st->mutex
	// (through `lock`); the lock is released only while waiting on the
	// condition variable.
	void empty_input_buffer(state_base::lock_t & lock) {
		while (written > 0) {
			while (!has_ready_pipe()) {
				st->producerCond.wait(lock);
			}
			switch (st->get_state(readyIdx)) {
				case worker_state::initializing:
					throw tpie::exception("State 'INITIALIZING' not expected at this point");
				case worker_state::idle:
				{
					// Send buffer to ready worker
					item_type * first = &inputBuffer[0];
					item_type * last = first + written;
					parallel_input_buffer<T1> & dest = *st->m_inputBuffers[readyIdx];
					dest.set_input(array_view<T1>(first, last));
					st->transition_state(readyIdx, worker_state::idle, worker_state::processing);
					st->workerCond[readyIdx].notify_one();
					written = 0;
					if (st->opts.maintainOrder == order::maintain)
						m_outputOrder.push(readyIdx);
					break;
				}
				case worker_state::processing:
					throw std::runtime_error("State 'processing' not expected at this point");
				case worker_state::partial_output:
					// Receive buffer (virtual invocation)
					cons->consume(st->m_outputBuffers[readyIdx]->get_output());
					st->transition_state(readyIdx, worker_state::partial_output, worker_state::processing);
					st->workerCond[readyIdx].notify_one();
					break;
				case worker_state::outputting:
					// Receive buffer (virtual invocation)
					cons->consume(st->m_outputBuffers[readyIdx]->get_output());
					st->transition_state(readyIdx, worker_state::outputting, worker_state::idle);
					st->workerCond[readyIdx].notify_one();
					if (st->opts.maintainOrder == order::maintain) {
						if (m_outputOrder.front() != readyIdx) {
							log_error() << "Producer: Expected " << readyIdx << " in front; got "
								<< m_outputOrder.front() << std::endl;
							throw tpie::exception("Producer got wrong entry from has_ready_pipe");
						}
						m_outputOrder.pop();
					}
					break;
			}
		}
	}
public:
	// Drain remaining input, wait for all workers to finish, consume their
	// final output buffers, and shut the worker threads down.
	virtual void end() /*override*/ {
		state_base::lock_t lock(st->mutex);
		flush_steps();
		empty_input_buffer(lock);
		bool done = false;
		while (!done) {
			while (!has_outputting_pipe()) {
				if (!has_processing_pipe()) {
					done = true;
					break;
				}
				// All items pushed; wait for processors to complete
				st->producerCond.wait(lock);
			}
			if (done) break;
			// virtual invocation
			cons->consume(st->m_outputBuffers[readyIdx]->get_output());
			if (st->get_state(readyIdx) == worker_state::partial_output) {
				// Worker still has input left; let it resume processing.
				st->transition_state(readyIdx, worker_state::partial_output, worker_state::processing);
				st->workerCond[readyIdx].notify_one();
				continue;
			}
			st->transition_state(readyIdx, worker_state::outputting, worker_state::idle);
			if (st->opts.maintainOrder == order::maintain) {
				if (m_outputOrder.front() != readyIdx) {
					log_error() << "Producer: Expected " << readyIdx << " in front; got "
						<< m_outputOrder.front() << std::endl;
					throw tpie::exception("Producer got wrong entry from has_ready_pipe");
				}
				m_outputOrder.pop();
			}
		}
		// Notify all workers that all processing is done
		st->done = true;
		for (size_t i = 0; i < st->opts.numJobs; ++i) {
			st->workerCond[i].notify_one();
		}
		while (st->runningWorkers > 0) {
			st->producerCond.wait(lock);
		}
		// All workers terminated
		flush_steps();
		inputBuffer.resize(0);
	}
};
/////////////////////////////////////////////////////////////////////////////// | |
/// \brief Factory instantiating a parallel multithreaded pipeline. | |
/////////////////////////////////////////////////////////////////////////////// | |
template <typename fact_t> | |
class factory : public factory_base { | |
fact_t fact; | |
const options opts; | |
public: | |
template <typename dest_t> | |
struct generated { | |
typedef typename dest_t::item_type T2; | |
typedef after<T2> after_t; | |
typedef typename fact_t::template generated<after_t>::type processor_t; | |
typedef typename processor_t::item_type T1; | |
typedef producer<T1, T2> type; | |
}; | |
factory(const fact_t & fact, const options opts) | |
: fact(fact) | |
, opts(opts) | |
{ | |
} | |
template <typename dest_t> | |
typename generated<dest_t>::type | |
construct(const dest_t & dest) const { | |
typedef generated<dest_t> gen_t; | |
typedef typename gen_t::T1 input_type; | |
typedef typename gen_t::T2 output_type; | |
typedef state<input_type, output_type> state_t; | |
typedef consumer_impl<input_type, output_type, dest_t> consumer_t; | |
typedef typename gen_t::type producer_t; | |
typename state_t::ptr st(new state_t(opts, fact)); | |
consumer_t consumer(dest, st); | |
this->init_node(consumer); | |
producer_t producer(st, consumer); | |
this->init_node(producer); | |
return producer; | |
} | |
}; | |
} // namespace parallel_bits | |
/////////////////////////////////////////////////////////////////////////////// | |
/// \brief Runs a pipeline in multiple threads. | |
/// \param maintainOrder Whether to make sure that items are processed and | |
/// output in the order they are input. | |
/// \param numJobs The number of threads to utilize for parallel execution. | |
/// \param bufSize The number of items to store in the buffer sent between | |
/// threads. | |
/////////////////////////////////////////////////////////////////////////////// | |
template <typename fact_t> | |
pipe_middle<parallel_bits::factory<fact_t> > | |
parallel(const pipe_middle<fact_t> & fact, order::type maintainOrder, size_t numJobs, size_t bufSize = 64) { | |
parallel_bits::options opts; | |
opts.maintainOrder = maintainOrder; | |
opts.numJobs = numJobs; | |
opts.bufSize = bufSize; | |
return pipe_middle<parallel_bits::factory<fact_t> > | |
(parallel_bits::factory<fact_t> | |
(fact.factory, opts)); | |
} | |
/////////////////////////////////////////////////////////////////////////////// | |
/// \brief Runs a pipeline in multiple threads, using the number of threads | |
/// reported by tpie::default_worker_count. | |
/// \param maintainOrder Whether to make sure that items are processed and | |
/// output in the order they are input. | |
/// \param numJobs The number of threads to utilize for parallel execution. | |
/// \param bufSize The number of items to store in the buffer sent between | |
/// threads. | |
/////////////////////////////////////////////////////////////////////////////// | |
template <typename fact_t> | |
pipe_middle<parallel_bits::factory<fact_t> > | |
parallel(const pipe_middle<fact_t> & fact, order::type maintainOrder = order::ignore) { | |
return parallel(fact, maintainOrder, default_worker_count()); | |
} | |
template <typename fact_t> | |
pipe_middle<parallel_bits::factory<fact_t> > | |
parallel(const pipe_middle<fact_t> & fact, bool maintainOrder, size_t numJobs, size_t bufSize = 64) { | |
log_fatal() << "The second argument to tpie::pipelining::parallel has changed.\n" | |
<< "Use maintain_order instead of true and arbitrary_order instead of false." | |
<< std::endl; | |
return parallel(fact, maintainOrder ? order::maintain : order::ignore, numJobs, bufSize); | |
} | |
template <typename fact_t> | |
pipe_middle<parallel_bits::factory<fact_t> > | |
parallel(const pipe_middle<fact_t> & fact, bool maintainOrder) { | |
log_fatal() << "The second argument to tpie::pipelining::parallel has changed.\n" | |
<< "Use maintain_order instead of true and arbitrary_order instead of false." | |
<< std::endl; | |
return parallel(fact, maintainOrder ? order::maintain : order::ignore); | |
} | |
} // namespace pipelining | |
} // namespace tpie | |
#endif |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment