Skip to content

Instantly share code, notes, and snippets.

@Jackarain
Last active March 16, 2019 18:09
Show Gist options
  • Save Jackarain/d227ca5da23df741d64f00b8579b4748 to your computer and use it in GitHub Desktop.
Save Jackarain/d227ca5da23df741d64f00b8579b4748 to your computer and use it in GitHub Desktop.
asio中associated_allocator和associated_executor的精巧设计

在asio中,设计了associated_allocator和associated_executor用于用户管理handler统一分配内存和统一调用,通过adl实现hook结构。

如用户提交一个异步操作,此时用户的handler需要保存在asio的内部op队列中,待操作完成,asio将从op队列中取出该op(不一定要直接取,像iocp中,可以绑定在OVERLAPPED结构中,事件完成,直接可以OVERLAPPED中拿到),然后通过op中的handler回调用户,通知异步操作完成。

其中,asio使用了associated_allocator和associated_executor这2个设计,在asio内部存取用户handler到op队列的时候用到,以实现用户定义的allocator和executor的调用。

下面是关于iocp(默认proactor模型更易于分析)中async_receive异步处理的分析(async_send类似),非关键部分代码省略,只例出与主题相关的代码。

// 文件 boost/boost/asio/detail/win_iocp_operation.hpp
class win_iocp_operation
  : public OVERLAPPED
{
public:
  typedef win_iocp_operation operation_type;

  typedef void (*func_type)(
      void*, win_iocp_operation*,
      const boost::system::error_code&, std::size_t);

  void complete(void* owner, const boost::system::error_code& ec,
      std::size_t bytes_transferred)
  {
    func_(owner, this, ec, bytes_transferred);
  }

  void destroy()
  {
    func_(0, this, boost::system::error_code(), 0);
  }

  win_iocp_operation(func_type func)
    : next_(0),
      func_(func)
  {
    reset();
  }

   win_iocp_operation* next_;
   func_type func_;
};

// 文件 boost/boost/asio/detail/operation.hpp
typedef win_iocp_operation operation;

// 文件 boost/boost/asio/detail/win_iocp_socket_recv_op.hpp
template <typename MutableBufferSequence, typename Handler>
class win_iocp_socket_recv_op : public operation
{
	// BOOST_ASIO_DEFINE_HANDLER_PTR(win_iocp_socket_recv_op);
	struct ptr
	{
		Handler* h;  // 用户handler的实际地址指针.
		win_iocp_socket_recv_op* v; // 分配的handler内存, 通过adl来调用分配内存, 如果用户handler有定义自己的分配器, 则调用用户自己的分配器.
		win_iocp_socket_recv_op* p; // 指向v, 构造好的win_iocp_socket_recv_op对象, 通过构造函数保存了buffers_和handler_, move/ref。
		
		// 这一大段就是为了从用户handler中调用allocate分配内存,如果用户handler不存在allocate.
		// 则使用asio自己的allocate,这里是adl方法实现.
		static op* allocate(Handler& handler)
		{
		  typedef typename ::boost::asio::associated_allocator<
			Handler>::type associated_allocator_type;
		  typedef typename ::boost::asio::detail::get_hook_allocator<
			Handler, associated_allocator_type>::type hook_allocator_type;
			
		  // 通过rebind定义分配op(即: win_iocp_socket_recv_op)类型的分配器a
		  // 然后使用a分配win_iocp_socket_recv_op内存空间.
		  BOOST_ASIO_REBIND_ALLOC(hook_allocator_type, op) a(
				::boost::asio::detail::get_hook_allocator<
				  Handler, associated_allocator_type>::get(
					handler, ::boost::asio::get_associated_allocator(handler)));
		  return a.allocate(1);
		}

		void reset()
		{
		  if (p)
		  {
			p->~op(); // 手工调用析构函数,销毁当前win_iocp_socket_recv_op对象, 注意,这时对象所处内存还在v上.
			p = 0;
		  }
		  if (v)
		  {
		    // 同上,调用deallocate释放内存.
			typedef typename ::boost::asio::associated_allocator<
			  Handler>::type associated_allocator_type;
			typedef typename ::boost::asio::detail::get_hook_allocator<
			  Handler, associated_allocator_type>::type hook_allocator_type;
			BOOST_ASIO_REBIND_ALLOC(hook_allocator_type, op) a(
				  ::boost::asio::detail::get_hook_allocator<
					Handler, associated_allocator_type>::get(
					  *h, ::boost::asio::get_associated_allocator(*h)));
			a.deallocate(static_cast<op*>(v), 1);
			v = 0;
		  }
		}
	  }
	  
	};
	
	win_iocp_socket_recv_op(
      const MutableBufferSequence& buffers, Handler& handler)
      // 注意&win_iocp_socket_recv_op::do_complete保存到了父类func_指针, 这是为了可以通过父类
      // win_iocp_operation 非模板类指针,实现win_iocp_socket_recv_op模板的类型擦除,然后通过
      // static win_iocp_socket_recv_op::do_complete函数恢复模板win_iocp_socket_recv_op类型.
    : operation(&win_iocp_socket_recv_op::do_complete),
      buffers_(buffers),
      handler_(BOOST_ASIO_MOVE_CAST(Handler)(handler))
  {
    handler_work<Handler>::start(handler_);
  }

static void do_complete(void* owner, operation* base,
      const boost::system::error_code& result_ec,
      std::size_t bytes_transferred)
  {
    boost::system::error_code ec(result_ec);
	
	// 将base转回win_iocp_socket_recv_op指针,即`this`.
	win_iocp_socket_recv_op* o(static_cast<win_iocp_socket_recv_op*>(base));
	
	handler_work<Handler> w(o->handler_);
	
	// 构造一个ptr, 其成员: h = this->handler_, v = this, p = this
	// 即恢复原来在async_receive中placement new的ptr对象.
	ptr p = { boost::asio::detail::addressof(o->handler_), o, o };
	
	// 这里将用户handler保存到binder2中, 然后调用p.reset释放`this`.
	detail::binder2<Handler, boost::system::error_code, std::size_t>
      handler(o->handler_, ec, bytes_transferred);
    p.h = boost::asio::detail::addressof(handler.handler_);
    p.reset();

	// 调用handler.
	w.complete(handler, handler.handler_);
  }

  MutableBufferSequence buffers_;
  Handler handler_;
};


  // 文件 boost/boost/asio/detail/win_iocp_socket_service_base.hpp
  template <typename MutableBufferSequence, typename Handler>
  void async_receive(base_implementation_type& impl,
      const MutableBufferSequence& buffers,
      socket_base::message_flags flags, Handler& handler)
  {
    typedef win_iocp_socket_recv_op<MutableBufferSequence, Handler> op;

	// 分配的handler内存, 通过adl来调用分配内存, 如果用户handler有定义自己的分配器, 则调用用户自己的分配器.
    typename op::ptr p = { boost::asio::detail::addressof(handler),
      op::ptr::allocate(handler), 0 };

	// placement new, 通过指向v的内存块, 构造好的win_iocp_socket_recv_op对象, 并通过构造函数保存了buffers_和handler_, move/ref。
    p.p = new (p.v) op(impl.state_, impl.cancel_token_, buffers, handler);
	
	// 通过os接口投递异步请求.
	// p.p 将作为OVERLAPPED参数在WSARecv,事件完成时可直接拿到op对象指针.
    start_receive_op(impl, bufs.buffers(), bufs.count(), flags,
        (impl.state_ & socket_ops::stream_oriented) != 0 && bufs.all_empty(),
        p.p);
    p.v = p.p = 0;
  }


// 文件 boost/boost/asio/detail/impl/win_iocp_io_context.ipp
// run 分析.
size_t win_iocp_io_context::do_one(DWORD msec, boost::system::error_code& ec)
{
    DWORD bytes_transferred = 0;
    dword_ptr_t completion_key = 0;
    LPOVERLAPPED overlapped = 0;
    ::SetLastError(0);
    BOOL ok = ::GetQueuedCompletionStatus(iocp_.handle,
        &bytes_transferred, &completion_key, &overlapped,
        msec < gqcs_timeout_ ? msec : gqcs_timeout_);
    DWORD last_error = ::GetLastError();

	if (overlapped)
	{
		// 强制转回win_iocp_operation类型,然而实际类型应该是win_iocp_socket_recv_op
		// 这里win_iocp_operation是win_iocp_socket_recv_op的父类.
		win_iocp_operation* op = static_cast<win_iocp_operation*>(overlapped);
		
		// 调用complete,注意,这是一个CRTP设计,通过complete调用win_iocp_socket_recv_op
		// 的do_complete,实际就是调用win_iocp_socket_recv_op的do_complete。
		// 在do_complete中处理完成事件,比如回调handler_,在那里类型已经恢复为win_iocp_socket_recv_op.
		op->complete(this, result_ec, bytes_transferred);
	}
}

阅读上面代码,需要一定c++和异步相关的基础,否则可能会看不明白,通过这些代码可以学习到很多c++的设计软件系统的思想,比如:关于类型擦除与类型恢复,rebind的使用,adl实现hook机制的调用,通过allocator的设计将库内部内存分配交由用户来分配,库做到内部无内存分配(用户若不想自己实现分配,asio调用默认c++分配器分配)。

asio设计非常精巧,通过一系列模板在编译期完成大量工作,并且做到了0内存分配(需要heap的部分可全交由用户自己实现分配),生成无任何多余开销的代码。

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment