boost.asio代码学习

一、placement new进行内存重用react

boost/asio/detail/reactive_socket_service_base.hpp中,async_receive须要建立一个reactive_socket_recv_op,该对象不是直接从系统new出来的,而是先查找空闲列表,找到一块可以放得下该op对象的内存(boost_asio_handler_alloc_helper::allocate),而后对该内存进行placement new构造对象。windows

// Allocate and construct an operation to wrap the handler.
    typedef reactive_socket_recv_op<MutableBufferSequence, Handler> op;
    typename op::ptr p = { boost::asio::detail::addressof(handler),
      boost_asio_handler_alloc_helpers::allocate(
        sizeof(op), handler), 0 };
    p.p = new (p.v) op(impl.socket_, impl.state_, buffers, flags, handler);

二、hash_map的实现多线程

在boost/detail/hash_map.hpp中,利用std::list实现了hash_map。做者并无为每一个bucket建一个容器来存放拉链的值,而是只用了一个std::list用于存放全部的值,每一个bucket存放的是该bucket中元素在list中的起始与终止iterator。hash_map在select_reactor.ipp中被用到,用于存储socket到对应op的映射。app

三、async_send/async_receive中的传入的bufferssocket

template <typename ConstBufferSequence, typename Handler>
  void async_send(base_implementation_type& impl,
      const ConstBufferSequence& buffers,
      socket_base::message_flags flags, Handler& handler)

async_send/async_receive是模板方法,发送的内容是一个buffers的序列,最终底层调用async

ssize_t sendmsg(int sockfd, const struct msghdr *msg, int flags);

或者tcp

int WSAAPI WSASend (

  SOCKET s,

  LPWSABUF lpBuffers,

  DWORD dwBufferCount,

  LPDWORD lpNumberOfBytesSent,

  int iFlags,

  LPWSAOVERLAPPED lpOverlapped,

  LPWSAOVERLAPPED_COMPLETION_ROUTINE lpCompletionRoutine

  );

这样多个buffer不用先合成一个大的buffer,需是直接交给底层发送,相似于writev/readv。函数

这里的BufferSequence能够是简单的const_buffers_1/mutable_buffers_1,底层只包含一个const_buffer或mutable_buffer。用boost.asio.buffer对char*,len包装生成const_buffers_1或mutable_buffers_1。BufferSequence也能够是std::vector<const_buffer> 或是boost::array<mutable_buffer, 3>等。全部这些BufferSequence须要支持begin()与end()。this

因为 typedef reactive_socket_send_op<ConstBufferSequence, Handler> op也是模板定义,不一样的ConstBufferSequence会具现化不一样的reactive_socket_sendop,在reactive_socket_sendop中会存放ConstBufferSequence,当socket可写时,回调reactive_socket_send_op_base的perform。线程

static bool do_perform(reactor_op* base)
  {
    reactive_socket_send_op_base* o(
        static_cast<reactive_socket_send_op_base*>(base));

    buffer_sequence_adapter<boost::asio::const_buffer,
        ConstBufferSequence> bufs(o->buffers_);

    return socket_ops::non_blocking_send(o->socket_,
          bufs.buffers(), bufs.count(), o->flags_,
          o->ec_, o->bytes_transferred_);
  }

由模板类buffer_sequence_adapter使用偏特化机制,将BufferSequence中每一个Buffer的指针、长度信息写入LPWSABUF或是struct iovec*中,再由non_blocking_send调用WSASend或sendmsg发送数据。

四、  win_iocp_io_service        --------  task_io_service

       win_iocp_socket_service --------  reactive_socket_service   

       其中reactive_socket_service使用reactor来模拟proactor效果。

       (reactor主要有epoll_reactor/select_reactor/dev_poll_reactor/kqueue_reactor)

五、asio中的超时

      async_send/async_receive中都不能带超时,只能用另一个deadline_timer来实现,这样形成超时的代码与发送接收的回调代码只能分开来写,很不方便。 实际上,在reactor上加上超时仍是比较容易的,但多是windows的iocp却没有什么好的办法,咱们不能在iocp上面自由地取消一个操做,而只能取消一个socket上的全部操做或是关闭套节字,因此只能取交集了。

    windows下的超时使用CreateWaitableTimer/SetWaitableTimer并用独立线程来实现超时机制。(是否能够用RegisterWaitForSingleObject与UnregisterWaitEx函数来实现或者用timeSetEvent/timeKillEvent来实现?)

六、epoll_reactor中的per_descriptor_data

每一个套节字会在epoll_reactor中注册,由allocate_descriptor_state分配一个per_descriptor_data,存放在object_pool<descriptor_state> registered_descriptors_中; 而object_pool中包含两个list,live_list_一个用于存放当前已分配的descriptor_state,free_list_用于存放释放的descriptor_state,实现循环利用。

int epoll_reactor::register_descriptor(socket_type descriptor,
    epoll_reactor::per_descriptor_data& descriptor_data)
{
  descriptor_data = allocate_descriptor_state();

  {
    mutex::scoped_lock descriptor_lock(descriptor_data->mutex_);

    descriptor_data->reactor_ = this;
    descriptor_data->descriptor_ = descriptor;
    descriptor_data->shutdown_ = false;
  }

  epoll_event ev = { 0, { 0 } };
  ev.events = EPOLLIN | EPOLLERR | EPOLLHUP | EPOLLPRI | EPOLLET;
  descriptor_data->registered_events_ = ev.events;
  ev.data.ptr = descriptor_data;
  int result = epoll_ctl(epoll_fd_, EPOLL_CTL_ADD, descriptor, &ev);
  if (result != 0)
    return errno;

  return 0;
}
epoll_reactor::descriptor_state* epoll_reactor::allocate_descriptor_state()
{
  mutex::scoped_lock descriptors_lock(registered_descriptors_mutex_);
  return registered_descriptors_.alloc();
}
epoll_event ev = { 0, { 0 } };
          ev.events = descriptor_data->registered_events_ | EPOLLOUT;
          ev.data.ptr = descriptor_data;
          if (epoll_ctl(epoll_fd_, EPOLL_CTL_MOD, descriptor, &ev) == 0)
          {
            descriptor_data->registered_events_ |= ev.events;
          }

当epoll_wait返回时,直接将ev.data.ptr做为descriptor_data,而后进行处理。 这在单线程下没什么问题。但若是在多线程环境下,epoll_wait期间,该descriptor_data可能先被被释放(如调用tcp.socket.close)存入free_list_,而后再被另外一个tcp.socket分配,这样会形成另外一个socket产生错误的回调。这应该是个bug吧。。。

七、boost.asio.write与async_write_some

boost.asio.write循环调用async_write_some实现数据发送. asio/impl/write.hpp中的write_op的代码用于回调,operator()中第一次是start=1,之后的start都是0。这里的代码有点奇怪,switch里的default在for循环中。

void operator()(const boost::system::error_code& ec,
        std::size_t bytes_transferred, int start = 0)
    {
      switch (start_ = start)
      {
        case 1:
        buffers_.prepare(this->check_for_completion(ec, total_transferred_));
        for (;;)
        {
          stream_.async_write_some(buffers_,
              BOOST_ASIO_MOVE_CAST(write_op)(*this));
          return; default:
          total_transferred_ += bytes_transferred;
          buffers_.consume(bytes_transferred);
          buffers_.prepare(this->check_for_completion(ec, total_transferred_));
          if ((!ec && bytes_transferred == 0)
              || buffers_.begin() == buffers_.end())
            break;
        }

若是是本身写的话,估计是这样(boost的代码中能够节省一行async_write_some代码,大牛的思想果真不同凡响):

void operator()(const boost::system::error_code& ec,
        std::size_t bytes_transferred, int start = 0)
    {
      switch (start_ = start)
      {
	    case 1:
        	buffers_.prepare(this->check_for_completion(ec, total_transferred_));
        	stream_.async_write_some(buffers_,BOOST_ASIO_MOVE_CAST(write_op)(*this));
          	return; 
	    default:
        	total_transferred_ += bytes_transferred;
        	buffers_.consume(bytes_transferred);
        	buffers_.prepare(this->check_for_completion(ec, total_transferred_));
        	total_transferred_ += bytes_transferred;
          	if ((!ec && bytes_transferred == 0)|| buffers_.begin() == buffers_.end())
			    break;
        	stream_.async_write_some(buffers_,BOOST_ASIO_MOVE_CAST(write_op)(*this));
          	return; 
        }
    }

[to be continue]

相关文章
相关标签/搜索