Skip to content

Instantly share code, notes, and snippets.

@akihiroy
Created April 20, 2011 10:56
Show Gist options
  • Save akihiroy/930988 to your computer and use it in GitHub Desktop.
Save akihiroy/930988 to your computer and use it in GitHub Desktop.
overwrite_queue for Microsoft Asynchronous Agents Library
namespace Concurrency
{
    /// A propagator block (message target and message source at once) that
    /// holds at most one pending message.
    ///
    /// A newly delivered message replaces, and deletes, any message that is
    /// still waiting in the block. A message handed to a target through
    /// accept or consume is removed from the block.
    ///
    /// Members and nested types inherited from the dependent base class are
    /// reached through `this->` / `typename base_block_type::` so the template
    /// also compiles under two-phase name lookup (e.g. MSVC /permissive-).
    template<class _Type>
    class overwrite_queue : public propagator_block<multi_link_registry<ITarget<_Type>>, multi_link_registry<ISource<_Type>>>
    {
    private:
        /// Shorthand for the dependent base class, used to name its nested types.
        typedef propagator_block<multi_link_registry<ITarget<_Type>>, multi_link_registry<ISource<_Type>>> base_block_type;

        message<_Type> * _M_pMessage;         ///< The message being stored
        message<_Type> * _M_pReservedMessage; ///< The message being reserved

        // non-copyable: declared but intentionally never defined
        overwrite_queue(const overwrite_queue&);
        overwrite_queue& operator=(const overwrite_queue&);

    protected:
        // ---- Target block ---------------------------------------------------

        /// Takes the offered message from its source and forwards it with
        /// async_send.
        ///
        /// @param _PMessage  The message being offered.
        /// @param _PSource   The source offering the message.
        /// @return `accepted` if the source handed the message over, `missed`
        ///         if the source's accept returned null.
        virtual message_status propagate_message(message<_Type> *_PMessage, ISource<_Type> *_PSource) override
        {
            _PMessage = _PSource->accept(_PMessage->msg_id(), this);
            if (_PMessage == nullptr) {
                return missed;
            }
            this->async_send(_PMessage);
            return accepted;
        }

        /// Same as propagate_message, but forwards the accepted message with
        /// sync_send instead of async_send.
        ///
        /// @param _PMessage  The message being offered.
        /// @param _PSource   The source offering the message.
        /// @return `accepted` if the source handed the message over, `missed`
        ///         if the source's accept returned null.
        virtual message_status send_message(message<_Type> *_PMessage, ISource<_Type> *_PSource) override
        {
            _PMessage = _PSource->accept(_PMessage->msg_id(), this);
            if (_PMessage == nullptr) {
                return missed;
            }
            this->sync_send(_PMessage);
            return accepted;
        }

        // ---- Source block ---------------------------------------------------

        /// Stores an incoming message (if any) and offers the stored message to
        /// the connected targets.
        ///
        /// @param _PMessage  A newly arrived message, or null to only re-offer
        ///                   the message that is already stored.
        virtual void propagate_to_any_targets(message<_Type> *_PMessage) override
        {
            if (_PMessage != nullptr) {
                // Overwrite semantics: the previously stored message is dropped.
                delete _M_pMessage;
                _M_pMessage = _PMessage;
            }

            // While a reservation is outstanding nothing is offered; the new
            // message (if any) simply waits in _M_pMessage.
            if (this->_M_pReservedFor != nullptr) {
                return;
            }

            if (_M_pMessage != nullptr) {
                // Iteration ends at the first null entry yielded by the iterator.
                for (typename base_block_type::target_iterator it = this->_M_connectedTargets.begin(); *it != nullptr; ++it) {
                    message_status status = (*it)->propagate(_M_pMessage, this);
                    // Stop once a target took the message or placed a reservation.
                    if (status == accepted || this->_M_pReservedFor != nullptr) {
                        break;
                    }
                }
            }
        }

        /// Transfers the stored message to the caller.
        ///
        /// @param _MsgId  Identity of the message the caller wants.
        /// @return The stored message if its id matches and no reservation is
        ///         outstanding; otherwise null. On success the block no longer
        ///         holds the message.
        virtual message<_Type> * accept_message(runtime_object_identity _MsgId) override
        {
            if (this->_M_pReservedFor != nullptr) {
                return nullptr;
            }

            message<_Type> *msg = nullptr;
            if (_M_pMessage != nullptr && _M_pMessage->msg_id() == _MsgId) {
                msg = _M_pMessage;
                _M_pMessage = nullptr;
            }
            return msg;
        }

        /// Reserves the stored message by moving it into the reserved slot.
        ///
        /// @param _MsgId  Identity of the message to reserve.
        /// @return true if the stored message had that id and is now reserved;
        ///         false if nothing is stored or the id does not match.
        virtual bool reserve_message(runtime_object_identity _MsgId) override
        {
            if (_M_pMessage == nullptr || _M_pMessage->msg_id() != _MsgId) {
                return false;
            }

            _ASSERTE(_M_pReservedMessage == NULL);

            // Move message to reserved message; this frees _M_pMessage so a
            // newer message can arrive while the reservation is held.
            _M_pReservedMessage = _M_pMessage;
            _M_pMessage = nullptr;
            return true;
        }

        /// Hands the reserved message over to the caller.
        ///
        /// @param _MsgId  Identity of the reserved message.
        /// @return The reserved message if its id matches (the reserved slot is
        ///         then cleared); otherwise null.
        virtual message<_Type> * consume_message(runtime_object_identity _MsgId) override
        {
            message<_Type> *msg = nullptr;
            if (_M_pReservedMessage != nullptr && _M_pReservedMessage->msg_id() == _MsgId) {
                msg = _M_pReservedMessage;
                _M_pReservedMessage = nullptr;
            }
            return msg;
        }

        /// Gives up a reservation without consuming the message.
        ///
        /// If a newer message was stored while the reservation was held, the
        /// reserved message is deleted; otherwise it becomes the stored message
        /// again.
        ///
        /// @param _MsgId  Identity of the reserved message.
        /// @throws message_not_found  if no message is reserved or the id does
        ///                            not match the reserved message.
        virtual void release_message(runtime_object_identity _MsgId) override
        {
            if (_M_pReservedMessage == nullptr || _M_pReservedMessage->msg_id() != _MsgId) {
                throw message_not_found();
            }

            if (_M_pMessage != nullptr) {
                // Arrived new message: it supersedes the one that was reserved.
                delete _M_pReservedMessage;
            } else {
                _M_pMessage = _M_pReservedMessage;
            }
            _M_pReservedMessage = nullptr;
        }

        /// Requests another propagation pass when a message is stored.
        ///
        /// Sends a null message through async_send; propagate_to_any_targets
        /// treats a null argument as "re-offer what is stored".
        /// NOTE(review): presumably the base block routes async_send(nullptr)
        /// to propagate_to_any_targets - confirm against agents.h.
        virtual void resume_propagation() override
        {
            if (_M_pMessage != nullptr) {
                this->async_send(nullptr);
            }
        }

    public:
        /// Creates an empty block using initialize_source_and_target's defaults.
        overwrite_queue() : _M_pMessage(nullptr), _M_pReservedMessage(nullptr)
        {
            this->initialize_source_and_target();
        }

        /// Creates an empty block and registers @p filter with register_filter.
        overwrite_queue(typename base_block_type::filter_method const& filter) : _M_pMessage(nullptr), _M_pReservedMessage(nullptr)
        {
            this->initialize_source_and_target();
            this->register_filter(filter);
        }

        /// Creates an empty block initialized with @p scheduler.
        overwrite_queue(Scheduler& scheduler) : _M_pMessage(nullptr), _M_pReservedMessage(nullptr)
        {
            this->initialize_source_and_target(&scheduler);
        }

        /// Creates an empty block initialized with @p scheduler and registers
        /// @p filter with register_filter.
        overwrite_queue(Scheduler& scheduler, typename base_block_type::filter_method const& filter) : _M_pMessage(nullptr), _M_pReservedMessage(nullptr)
        {
            this->initialize_source_and_target(&scheduler);
            this->register_filter(filter);
        }

        /// Creates an empty block initialized with @p schedule_group (and no
        /// explicit scheduler).
        overwrite_queue(ScheduleGroup& schedule_group) : _M_pMessage(nullptr), _M_pReservedMessage(nullptr)
        {
            this->initialize_source_and_target(nullptr, &schedule_group);
        }

        /// Creates an empty block initialized with @p schedule_group (and no
        /// explicit scheduler) and registers @p filter with register_filter.
        overwrite_queue(ScheduleGroup& schedule_group, typename base_block_type::filter_method const& filter) : _M_pMessage(nullptr), _M_pReservedMessage(nullptr)
        {
            this->initialize_source_and_target(nullptr, &schedule_group);
            this->register_filter(filter);
        }

        /// Removes all network links, then deletes any message still held in
        /// the stored or reserved slot.
        ~overwrite_queue()
        {
            // Remove all links.
            this->remove_network_links();

            delete _M_pMessage;
            delete _M_pReservedMessage;
        }

        /// Posts @p item to this block with Concurrency::asend.
        ///
        /// @param item  The value to post.
        /// @return The result of asend.
        bool enqueue(const _Type& item)
        {
            return Concurrency::asend<_Type>(this, item);
        }

        /// Obtains a value from this block with Concurrency::receive.
        ///
        /// @return The value returned by receive.
        _Type dequeue()
        {
            return Concurrency::receive<_Type>(this);
        }
    };
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment