Created
April 20, 2011 10:56
-
-
Save akihiroy/930988 to your computer and use it in GitHub Desktop.
overwrite_queue for Microsoft Asynchronous Agents Library
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
namespace Concurrency | |
{ | |
template<class _Type> | |
class overwrite_queue : public propagator_block<multi_link_registry<ITarget<_Type>>, multi_link_registry<ISource<_Type>>> | |
{ | |
private: | |
message<_Type> * _M_pMessage; ///< The message being stored | |
message<_Type> * _M_pReservedMessage; ///< The message being reserved | |
// non-copyable | |
overwrite_queue(const overwrite_queue&); | |
overwrite_queue& operator=(const overwrite_queue&); | |
protected: | |
// Target block | |
virtual message_status propagate_message(message<_Type> *_PMessage, ISource<_Type> *_PSource) | |
{ | |
_PMessage = _PSource->accept(_PMessage->msg_id(), this); | |
if (!_PMessage) return missed; | |
async_send(_PMessage); | |
return accepted; | |
} | |
virtual message_status send_message(message<_Type> *_PMessage, ISource<_Type> *_PSource) | |
{ | |
_PMessage = _PSource->accept(_PMessage->msg_id(), this); | |
if (!_PMessage) return missed; | |
sync_send(_PMessage); | |
return accepted; | |
} | |
// Source block | |
virtual void propagate_to_any_targets(message<_Type> *_PMessage) | |
{ | |
if (_PMessage) { | |
delete _M_pMessage; | |
_M_pMessage = _PMessage; | |
} | |
if (_M_pReservedFor) { | |
return; | |
} | |
if (_M_pMessage) { | |
for (target_iterator it = _M_connectedTargets.begin(); *it != NULL; ++it) { | |
message_status status = (*it)->propagate(_M_pMessage, this); | |
if (status == accepted || _M_pReservedFor) { | |
break; | |
} | |
} | |
} | |
} | |
virtual message<_Type> * accept_message(runtime_object_identity _MsgId) | |
{ | |
if (_M_pReservedFor) { | |
return NULL; | |
} | |
message<_Type> *msg = NULL; | |
if (_M_pMessage && _M_pMessage->msg_id() == _MsgId) { | |
msg = _M_pMessage; | |
_M_pMessage = NULL; | |
} | |
return msg; | |
} | |
virtual bool reserve_message(runtime_object_identity _MsgId) | |
{ | |
if (!_M_pMessage || _M_pMessage->msg_id() != _MsgId) { | |
return false; | |
} | |
_ASSERTE(_M_pReservedMessage == NULL); | |
// Move message to reserved message | |
_M_pReservedMessage = _M_pMessage; | |
_M_pMessage = NULL; | |
return true; | |
} | |
virtual message<_Type> * consume_message(runtime_object_identity _MsgId) | |
{ | |
message<_Type> *msg = NULL; | |
if (_M_pReservedMessage && _M_pReservedMessage->msg_id() == _MsgId) { | |
msg = _M_pReservedMessage; | |
_M_pReservedMessage = NULL; | |
} | |
return msg; | |
} | |
virtual void release_message(runtime_object_identity _MsgId) | |
{ | |
if (!_M_pReservedMessage || _M_pReservedMessage->msg_id() != _MsgId) { | |
throw message_not_found(); | |
} | |
if (_M_pMessage) { | |
// Arrived new message | |
delete _M_pReservedMessage; | |
} else { | |
_M_pMessage = _M_pReservedMessage; | |
} | |
_M_pReservedMessage = NULL; | |
} | |
virtual void resume_propagation() | |
{ | |
if (_M_pMessage) { | |
async_send(NULL); | |
} | |
} | |
public: | |
overwrite_queue() : _M_pMessage(NULL), _M_pReservedMessage(NULL) | |
{ | |
initialize_source_and_target(); | |
} | |
overwrite_queue(filter_method const& filter) : _M_pMessage(NULL), _M_pReservedMessage(NULL) | |
{ | |
initialize_source_and_target(); | |
register_filter(filter); | |
} | |
overwrite_queue(Scheduler& scheduler) : _M_pMessage(NULL), _M_pReservedMessage(NULL) | |
{ | |
initialize_source_and_target(&scheduler); | |
} | |
overwrite_queue(Scheduler& scheduler, filter_method const& filter) : _M_pMessage(NULL), _M_pReservedMessage(NULL) | |
{ | |
initialize_source_and_target(&scheduler); | |
register_filter(filter); | |
} | |
overwrite_queue(ScheduleGroup& schedule_group) : _M_pMessage(NULL), _M_pReservedMessage(NULL) | |
{ | |
initialize_source_and_target(NULL, &schedule_group); | |
} | |
overwrite_queue(ScheduleGroup& schedule_group, filter_method const& filter) : _M_pMessage(NULL), _M_pReservedMessage(NULL) | |
{ | |
initialize_source_and_target(NULL, &schedule_group); | |
register_filter(filter); | |
} | |
~overwrite_queue() | |
{ | |
// Remove all links. | |
remove_network_links(); | |
delete _M_pMessage; | |
delete _M_pReservedMessage; | |
} | |
bool enqueue(const _Type& item) | |
{ | |
return Concurrency::asend<_Type>(this, item); | |
} | |
_Type dequeue() | |
{ | |
return receive<_Type>(this); | |
} | |
}; | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment