1. Memory reuse via placement new
In boost/asio/detail/reactive_socket_service_base.hpp, async_receive needs to create a reactive_socket_recv_op. This object is not allocated directly from the system with new; instead, a free list is searched first for a block large enough to hold the op object (boost_asio_handler_alloc_helpers::allocate), and the object is then constructed in that memory with placement new.
// Allocate and construct an operation to wrap the handler.
typedef reactive_socket_recv_op<MutableBufferSequence, Handler> op;
typename op::ptr p = { boost::asio::detail::addressof(handler),
    boost_asio_handler_alloc_helpers::allocate(
      sizeof(op), handler), 0 };
p.p = new (p.v) op(impl.socket_, impl.state_, buffers, flags, handler);
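To make the pattern concrete, here is a minimal, self-contained sketch of free-list reuse plus placement new; op_pool and recv_op are illustrative names, not asio's:

#include <cstddef>
#include <new>

struct recv_op {
  int socket_;
  explicit recv_op(int s) : socket_(s) {}
};

class op_pool {
  struct free_block { free_block* next; };
  free_block* head_;
public:
  op_pool() : head_(0) {}

  void* allocate(std::size_t size) {
    // A real implementation also checks that the recycled block is
    // large enough for the requested op; omitted here.
    if (head_) {                     // reuse a block from the free list
      void* v = head_;
      head_ = head_->next;
      return v;
    }
    if (size < sizeof(free_block))   // blocks must be able to hold a link
      size = sizeof(free_block);
    return ::operator new(size);
  }

  void deallocate(void* v) {         // push the block back for reuse
    free_block* b = static_cast<free_block*>(v);
    b->next = head_;
    head_ = b;
  }
};

int main() {
  op_pool pool;
  void* mem = pool.allocate(sizeof(recv_op));
  recv_op* op = new (mem) recv_op(42);  // placement new into reused memory
  op->~recv_op();                       // destroy explicitly; memory survives
  pool.deallocate(mem);                 // next allocate() will hand it back
}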
2. The hash_map implementation
In boost/asio/detail/hash_map.hpp, hash_map is implemented on top of std::list. Rather than giving each bucket its own container for the chained values, the author uses a single std::list to hold all values; each bucket stores only the begin and end iterators of that bucket's elements within the list. hash_map is used in select_reactor.ipp to store the mapping from a socket to its ops.
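A toy version of this layout might look as follows (flat_hash_map and bucket are hypothetical names; asio's real hash_map also handles rehashing and erasure, which this sketch omits):

#include <list>
#include <utility>
#include <vector>
#include <boost/functional/hash.hpp>

template <typename K, typename V>
class flat_hash_map {
  typedef std::list<std::pair<K, V> > list_type;
  typedef typename list_type::iterator iterator;

  struct bucket { iterator first, last; };  // inclusive range in values_

  list_type values_;             // one list holds every value
  std::vector<bucket> buckets_;  // buckets only remember iterator ranges
public:
  explicit flat_hash_map(std::size_t n) : buckets_(n) {
    for (std::size_t i = 0; i < n; ++i)
      buckets_[i].first = buckets_[i].last = values_.end();
  }

  void insert(const K& k, const V& v) {
    bucket& b = buckets_[boost::hash<K>()(k) % buckets_.size()];
    if (b.first == values_.end()) {  // empty bucket: start a new range
      b.first = b.last = values_.insert(values_.end(), std::make_pair(k, v));
    } else {                         // extend this bucket's range by one
      iterator pos = b.last;
      ++pos;
      b.last = values_.insert(pos, std::make_pair(k, v));
    }
  }

  iterator find(const K& k) {
    bucket& b = buckets_[boost::hash<K>()(k) % buckets_.size()];
    if (b.first == values_.end()) return values_.end();
    iterator it = b.first, end = b.last;
    for (++end; it != end; ++it)     // scan only this bucket's range
      if (it->first == k) return it;
    return values_.end();
  }
};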
3. The buffers passed to async_send/async_receive
template <typename ConstBufferSequence, typename Handler>
void async_send(base_implementation_type& impl,
    const ConstBufferSequence& buffers,
    socket_base::message_flags flags, Handler& handler)
async_send/async_receive are template methods; what gets sent is a sequence of buffers, which ultimately reaches the underlying call
ssize_t sendmsg(int sockfd, const struct msghdr *msg, int flags);
or
int WSAAPI WSASend(
    SOCKET s,
    LPWSABUF lpBuffers,
    DWORD dwBufferCount,
    LPDWORD lpNumberOfBytesSent,
    int iFlags,
    LPWSAOVERLAPPED lpOverlapped,
    LPWSAOVERLAPPED_COMPLETION_ROUTINE lpCompletionRoutine
);
This way multiple buffers need not be merged into one large buffer first; they are handed directly to the lower layer for sending, similar to writev/readv.
The BufferSequence here can be the simple const_buffers_1/mutable_buffers_1, which wrap just a single const_buffer or mutable_buffer underneath; boost::asio::buffer wraps a (char*, len) pair into a const_buffers_1 or mutable_buffers_1. A BufferSequence can also be a std::vector<const_buffer>, a boost::array<mutable_buffer, 3>, and so on. All these BufferSequences must support begin() and end().
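For example, all of the following calls are valid, assuming sock is a connected boost::asio::ip::tcp::socket:

#include <boost/asio.hpp>
#include <boost/array.hpp>
#include <vector>

void send_examples(boost::asio::ip::tcp::socket& sock) {
  char header[16] = {0};
  char body[256] = {0};

  // A single buffer: boost::asio::buffer(ptr, len) yields const_buffers_1.
  sock.send(boost::asio::buffer(header, sizeof(header)));

  // A std::vector of const_buffers, gathered in one call (like writev).
  std::vector<boost::asio::const_buffer> vec;
  vec.push_back(boost::asio::buffer(header, sizeof(header)));
  vec.push_back(boost::asio::buffer(body, sizeof(body)));
  sock.send(vec);

  // A boost::array of buffers works too; only begin()/end() are required.
  boost::array<boost::asio::const_buffer, 2> arr = {{
    boost::asio::buffer(header, sizeof(header)),
    boost::asio::buffer(body, sizeof(body))
  }};
  sock.send(arr);
}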
Since typedef reactive_socket_send_op<ConstBufferSequence, Handler> op is itself a template definition, different ConstBufferSequence types instantiate different reactive_socket_send_ops. The reactive_socket_send_op stores the ConstBufferSequence, and when the socket becomes writable, reactive_socket_send_op_base's perform is called back.
static bool do_perform(reactor_op* base)
{
  reactive_socket_send_op_base* o(
      static_cast<reactive_socket_send_op_base*>(base));

  buffer_sequence_adapter<boost::asio::const_buffer,
      ConstBufferSequence> bufs(o->buffers_);

  return socket_ops::non_blocking_send(o->socket_,
      bufs.buffers(), bufs.count(), o->flags_,
      o->ec_, o->bytes_transferred_);
}
The template class buffer_sequence_adapter uses partial specialization to write each buffer's pointer and length from the BufferSequence into an LPWSABUF or a struct iovec*, and non_blocking_send then calls WSASend or sendmsg to send the data.
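A simplified POSIX-side equivalent of that adaptation might look like this (send_gather is a hypothetical helper; asio itself avoids the std::vector and caps the entry count at max_iov_len):

#include <sys/socket.h>
#include <sys/uio.h>
#include <vector>
#include <boost/asio.hpp>

template <typename ConstBufferSequence>
ssize_t send_gather(int sockfd, const ConstBufferSequence& buffers, int flags) {
  // Copy each buffer's pointer/length into an iovec array.
  std::vector<iovec> iov;
  for (typename ConstBufferSequence::const_iterator it = buffers.begin();
       it != buffers.end(); ++it) {
    boost::asio::const_buffer b(*it);
    iovec v;
    v.iov_base = const_cast<void*>(boost::asio::buffer_cast<const void*>(b));
    v.iov_len = boost::asio::buffer_size(b);
    iov.push_back(v);
  }
  // Hand the whole array to the kernel in a single gather-send.
  msghdr msg = msghdr();  // value-initialise to zero
  msg.msg_iov = iov.empty() ? 0 : &iov[0];
  msg.msg_iovlen = iov.size();
  return ::sendmsg(sockfd, &msg, flags);
}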
4. win_iocp_io_service -------- task_io_service
   win_iocp_socket_service -------- reactive_socket_service
Here reactive_socket_service uses a reactor to emulate the proactor pattern.
(The reactors are mainly epoll_reactor/select_reactor/dev_poll_reactor/kqueue_reactor.)
5. Timeouts in asio
Neither async_send nor async_receive can take a timeout; you have to use a separate deadline_timer, which forces the timeout code and the send/receive callback code to be written apart, which is quite inconvenient. Adding timeouts on top of a reactor is actually fairly easy, but Windows IOCP offers no good way to do it: you cannot freely cancel a single operation on an IOCP, only cancel all operations on a socket or close the socket, so asio has to settle for the intersection of the two platforms' capabilities.
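The usual workaround looks roughly like the sketch below: pair the receive with a deadline_timer and let whichever completes first cancel the other (timed_reader is a hypothetical class; error handling is trimmed):

#include <boost/asio.hpp>
#include <boost/bind.hpp>

class timed_reader {
  boost::asio::ip::tcp::socket& sock_;
  boost::asio::deadline_timer timer_;
  char buf_[1024];
public:
  timed_reader(boost::asio::io_service& io, boost::asio::ip::tcp::socket& s)
    : sock_(s), timer_(io) {}

  void start(int seconds) {
    timer_.expires_from_now(boost::posix_time::seconds(seconds));
    timer_.async_wait(boost::bind(&timed_reader::on_timeout, this, _1));
    sock_.async_receive(boost::asio::buffer(buf_),
        boost::bind(&timed_reader::on_read, this, _1, _2));
  }

  void on_read(const boost::system::error_code& ec, std::size_t n) {
    timer_.cancel();   // data arrived (or error): stop the timer
    // ... handle ec / n ...
  }

  void on_timeout(const boost::system::error_code& ec) {
    if (ec != boost::asio::error::operation_aborted) {
      // Deadline hit: abort the pending receive. Note that on
      // Windows/IOCP this cancels *all* operations on the socket.
      sock_.cancel();
    }
  }
};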
On Windows, the timeout mechanism is implemented with CreateWaitableTimer/SetWaitableTimer plus a dedicated thread. (Could this be done instead with RegisterWaitForSingleObject and UnregisterWaitEx, or with timeSetEvent/timeKillEvent?)
6. per_descriptor_data in epoll_reactor
Every socket gets registered in the epoll_reactor: allocate_descriptor_state allocates a per_descriptor_data, stored in object_pool<descriptor_state> registered_descriptors_. The object_pool contains two lists: live_list_ holds the descriptor_states currently allocated, and free_list_ holds released descriptor_states so they can be recycled.
int epoll_reactor::register_descriptor(socket_type descriptor,
    epoll_reactor::per_descriptor_data& descriptor_data)
{
  descriptor_data = allocate_descriptor_state();

  {
    mutex::scoped_lock descriptor_lock(descriptor_data->mutex_);
    descriptor_data->reactor_ = this;
    descriptor_data->descriptor_ = descriptor;
    descriptor_data->shutdown_ = false;
  }

  epoll_event ev = { 0, { 0 } };
  ev.events = EPOLLIN | EPOLLERR | EPOLLHUP | EPOLLPRI | EPOLLET;
  descriptor_data->registered_events_ = ev.events;
  ev.data.ptr = descriptor_data;
  int result = epoll_ctl(epoll_fd_, EPOLL_CTL_ADD, descriptor, &ev);
  if (result != 0)
    return errno;

  return 0;
}
epoll_reactor::descriptor_state* epoll_reactor::allocate_descriptor_state()
{
  mutex::scoped_lock descriptors_lock(registered_descriptors_mutex_);
  return registered_descriptors_.alloc();
}
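The recycling scheme can be sketched like this (a hypothetical simplification; asio's object_pool uses doubly linked lists so it can unlink in O(1)):

#include <cstddef>

struct descriptor_state {
  descriptor_state* next_;   // intrusive link used by the pool
  int descriptor_;           // ... mutex_, op queues etc. omitted
  descriptor_state() : next_(0), descriptor_(-1) {}
};

class object_pool {
  descriptor_state* live_list_;  // states currently in use
  descriptor_state* free_list_;  // released states awaiting reuse
public:
  object_pool() : live_list_(0), free_list_(0) {}

  descriptor_state* alloc() {
    descriptor_state* o = free_list_;
    if (o)
      free_list_ = o->next_;       // recycle a released state
    else
      o = new descriptor_state();  // nothing to recycle: allocate fresh
    o->next_ = live_list_;         // push onto the live list
    live_list_ = o;
    return o;
  }

  void free(descriptor_state* o) {
    // Unlink from the live list (linear scan here; asio unlinks in O(1)
    // via prev_ pointers), then push onto the free list for reuse.
    for (descriptor_state** p = &live_list_; *p; p = &(*p)->next_)
      if (*p == o) { *p = o->next_; break; }
    o->next_ = free_list_;
    free_list_ = o;
  }
};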
epoll_event ev = { 0, { 0 } };
ev.events = descriptor_data->registered_events_ | EPOLLOUT;
ev.data.ptr = descriptor_data;
if (epoll_ctl(epoll_fd_, EPOLL_CTL_MOD, descriptor, &ev) == 0)
{
  descriptor_data->registered_events_ |= ev.events;
}
When epoll_wait returns, ev.data.ptr is used directly as the descriptor_data and processed. In a single-threaded setting this is fine. But in a multithreaded environment, while epoll_wait is in progress the descriptor_data may first be released (e.g. by calling tcp::socket::close) and put on free_list_, then handed out to a different tcp::socket, causing a spurious callback on the wrong socket. This looks like a bug...
7. boost::asio::write and async_write_some
boost::asio::write sends data by calling async_write_some in a loop. The write_op code in asio/impl/write.hpp serves as the callback; in operator(), start is 1 on the first invocation and 0 on every subsequent one. The code looks a bit odd: the switch's default label sits inside the for loop.
void operator()(const boost::system::error_code& ec,
    std::size_t bytes_transferred, int start = 0)
{
  switch (start_ = start)
  {
    case 1:
      buffers_.prepare(this->check_for_completion(ec, total_transferred_));
      for (;;)
      {
        stream_.async_write_some(buffers_,
            BOOST_ASIO_MOVE_CAST(write_op)(*this));
        return;
      default:
        total_transferred_ += bytes_transferred;
        buffers_.consume(bytes_transferred);
        buffers_.prepare(this->check_for_completion(ec, total_transferred_));
        if ((!ec && bytes_transferred == 0)
            || buffers_.begin() == buffers_.end())
          break;
      }

      handler_(ec, static_cast<const std::size_t&>(total_transferred_));
  }
}
Had I written it myself, it would probably look like the following (boost's version saves one async_write_some call in the source; the master's thinking really is on another level):
void operator()(const boost::system::error_code& ec,
    std::size_t bytes_transferred, int start = 0)
{
  switch (start_ = start)
  {
    case 1:
      buffers_.prepare(this->check_for_completion(ec, total_transferred_));
      stream_.async_write_some(buffers_,
          BOOST_ASIO_MOVE_CAST(write_op)(*this));
      return;
    default:
      total_transferred_ += bytes_transferred;
      buffers_.consume(bytes_transferred);
      buffers_.prepare(this->check_for_completion(ec, total_transferred_));
      if ((!ec && bytes_transferred == 0)
          || buffers_.begin() == buffers_.end())
        break;
      stream_.async_write_some(buffers_,
          BOOST_ASIO_MOVE_CAST(write_op)(*this));
      return;
  }
  handler_(ec, static_cast<const std::size_t&>(total_transferred_));
}
[to be continued]