This article is based on Boost 1.69. When presenting source code, some blocks that are deprecated, belong to other configurations, or are otherwise unrelated to the topic have been trimmed.

This installment discusses the concurrent programming practices inside Asio, again by walking through the source code.
scheduler
The operation queue inevitably has to deal with multithreading: the relationship between the operation queue and threads, the thread safety of the queue itself, and the execution of operations in a multithreaded environment.
`call_stack` and `context`. From the source we can see that `call_stack` contains a static data member `top_` of type `tss_ptr<context>`, where `tss_ptr` is a thread-specific-storage pointer that, on Unix platforms, binds an address to a thread-specific key through the `::pthread_xxxxxx` interfaces. `context` is a nested class of `call_stack`; interestingly, `context`'s constructor is a push operation and its destructor is a pop operation, with `top_` as the operand.
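To make the mechanism concrete, here is a minimal sketch of the pattern, not Asio's actual source: it substitutes standard `thread_local` for Asio's `tss_ptr`, and the member names mirror the description above.

```cpp
// Sketch of the call_stack idea: a thread-local linked list of
// (key, value) frames; the nested `context` pushes on construction
// and pops on destruction.
template <typename Key, typename Value = unsigned char>
class call_stack
{
public:
  class context
  {
  public:
    context(Key* k, Value& v)
      : key_(k), value_(&v), next_(call_stack<Key, Value>::top_)
    {
      call_stack<Key, Value>::top_ = this;   // push on construction
    }

    ~context()
    {
      call_stack<Key, Value>::top_ = next_;  // pop on destruction
    }

  private:
    friend class call_stack<Key, Value>;
    Key* key_;
    Value* value_;
    context* next_;
  };

  // Walk the current thread's stack; return the value recorded for
  // key `k`, or null if this thread is not inside `k`.
  static Value* contains(Key* k)
  {
    for (context* c = top_; c; c = c->next_)
      if (c->key_ == k)
        return c->value_;
    return nullptr;
  }

private:
  // Asio stores this in a tss_ptr<context>; thread_local is the
  // standard-C++ equivalent used for this sketch.
  static thread_local context* top_;
};

template <typename Key, typename Value>
thread_local typename call_stack<Key, Value>::context*
call_stack<Key, Value>::top_ = nullptr;
```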
`conditionally_enabled_mutex` and `conditionally_enabled_event`. Built on `std::condition_variable` (or a similar implementation), these provide the usual thread-control facilities. `conditionally_enabled_mutex` additionally wraps a data member `enabled_`; when `enabled_` is false, the corresponding operations are simply skipped.
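A minimal sketch of that idea, assuming a plain `std::mutex` underneath (the real class also pairs with `conditionally_enabled_event` and exposes a `scoped_lock`):

```cpp
#include <mutex>

// A mutex whose lock/unlock become no-ops when disabled, e.g. for a
// single-threaded io_context where locking would be pure overhead.
class conditionally_enabled_mutex
{
public:
  explicit conditionally_enabled_mutex(bool enabled)
    : enabled_(enabled)
  {
  }

  void lock()
  {
    if (enabled_)     // skip the real lock when mutexing is disabled
      mutex_.lock();
  }

  void unlock()
  {
    if (enabled_)
      mutex_.unlock();
  }

private:
  std::mutex mutex_;
  const bool enabled_;
};
```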
The scheduling process can be analyzed from two angles: users submitting tasks (production), and `io_context`'s event processing loop (consumption plus production).
The two typical internal interfaces through which Asio submits tasks are the `scheduler::post_immediate_completion` function (for general tasks, as the `boost::asio::post` source shows) and the `reactor::start_op` method (for I/O-related tasks, as the `basic_stream_socket` source shows). Looking at the `scheduler::post_immediate_completion` source, the concurrency-related work is simple: lock, push the task onto the `scheduler` data member `op_queue_`, then unlock and wake one thread.
```cpp
// file: <boost/asio/detail/impl/scheduler.ipp>
...
void scheduler::post_immediate_completion(
    scheduler::operation* op, bool is_continuation)
{
#if defined(BOOST_ASIO_HAS_THREADS)
  if (one_thread_ || is_continuation)
  {
    if (thread_info_base* this_thread = thread_call_stack::contains(this))
    {
      ++static_cast<thread_info*>(this_thread)->private_outstanding_work;
      static_cast<thread_info*>(this_thread)->private_op_queue.push(op);
      return;
    }
  }
#else // defined(BOOST_ASIO_HAS_THREADS)
  (void)is_continuation;
#endif // defined(BOOST_ASIO_HAS_THREADS)

  work_started();
  mutex::scoped_lock lock(mutex_);
  op_queue_.push(op);
  wake_one_thread_and_unlock(lock);
}
...
```
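As a point of reference, a small user-side sketch (the handler body and variable names are ours): a plain `boost::asio::post` reaches `post_immediate_completion` through the `io_context` executor.

```cpp
#include <boost/asio.hpp>
#include <iostream>

int main()
{
  boost::asio::io_context ctx;

  // Submit a general-purpose task; internally this allocates an
  // operation object and hands it to the scheduler.
  boost::asio::post(ctx, []{ std::cout << "posted task\n"; });

  // Run the event processing loop until the queue is drained.
  ctx.run();
}
```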
Now look at the `reactor::start_op` source. Note that the RAII-style mutex wrapper `descriptor_lock` acquires a lock on one particular descriptor, so `reactor::start_op` calls for different sockets can execute in parallel. Since the topic of this article is concurrency, the body of `reactor::start_op` is not examined in detail here; just note the `scheduler_.work_started()` at the end, which merely executes `++outstanding_work_`.
```cpp
// file: <boost/asio/detail/impl/epoll_reactor.ipp>
...
void epoll_reactor::start_op(int op_type, socket_type descriptor,
    epoll_reactor::per_descriptor_data& descriptor_data, reactor_op* op,
    bool is_continuation, bool allow_speculative)
{
  if (!descriptor_data)
  {
    op->ec_ = boost::asio::error::bad_descriptor;
    post_immediate_completion(op, is_continuation);
    return;
  }

  mutex::scoped_lock descriptor_lock(descriptor_data->mutex_);

  if (descriptor_data->shutdown_)
  {
    post_immediate_completion(op, is_continuation);
    return;
  }

  if (descriptor_data->op_queue_[op_type].empty())
  {
    if (allow_speculative
        && (op_type != read_op
          || descriptor_data->op_queue_[except_op].empty()))
    {
      if (descriptor_data->try_speculative_[op_type])
      {
        if (reactor_op::status status = op->perform())
        {
          if (status == reactor_op::done_and_exhausted)
            if (descriptor_data->registered_events_ != 0)
              descriptor_data->try_speculative_[op_type] = false;
          descriptor_lock.unlock();
          scheduler_.post_immediate_completion(op, is_continuation);
          return;
        }
      }

      if (descriptor_data->registered_events_ == 0)
      {
        op->ec_ = boost::asio::error::operation_not_supported;
        scheduler_.post_immediate_completion(op, is_continuation);
        return;
      }

      if (op_type == write_op)
      {
        if ((descriptor_data->registered_events_ & EPOLLOUT) == 0)
        {
          epoll_event ev = { 0, { 0 } };
          ev.events = descriptor_data->registered_events_ | EPOLLOUT;
          ev.data.ptr = descriptor_data;
          if (epoll_ctl(epoll_fd_, EPOLL_CTL_MOD, descriptor, &ev) == 0)
          {
            descriptor_data->registered_events_ |= ev.events;
          }
          else
          {
            op->ec_ = boost::system::error_code(errno,
                boost::asio::error::get_system_category());
            scheduler_.post_immediate_completion(op, is_continuation);
            return;
          }
        }
      }
    }
    else if (descriptor_data->registered_events_ == 0)
    {
      op->ec_ = boost::asio::error::operation_not_supported;
      scheduler_.post_immediate_completion(op, is_continuation);
      return;
    }
    else
    {
      if (op_type == write_op)
      {
        descriptor_data->registered_events_ |= EPOLLOUT;
      }
      epoll_event ev = { 0, { 0 } };
      ev.events = descriptor_data->registered_events_;
      ev.data.ptr = descriptor_data;
      epoll_ctl(epoll_fd_, EPOLL_CTL_MOD, descriptor, &ev);
    }
  }

  descriptor_data->op_queue_[op_type].push(op);
  scheduler_.work_started();
}
...
```
Next comes the part that both "consumes and produces": `io_context`'s event processing loop, which mainly calls its member `scheduler::run`. Starting from `scheduler::run`, the flow through the operation queue is:

- Initialize the thread-private data `this_thread` (which includes the member `private_op_queue`).
- Push the `scheduler` address and the address of the local variable `this_thread` onto the call stack.
- Lock `mutex_`, a `scheduler` data member.
- Call `do_run_one` in a loop, re-locking `mutex_` after each iteration.
- On exit, pop the `scheduler` address and the address of the local `this_thread` off the call stack.

Now let's analyze the execution of `scheduler::do_run_one`:
When the `scheduler`'s operation queue `op_queue_` is not empty:

- Take the front element `o` of `op_queue_` and pop it.
- If `o` equals `&task_operation_`: if there are more handlers (and more than one thread), `unlock_and_signal_one`, otherwise just `unlock`. The rest can then run concurrently with other threads: construct a `task_cleanup` instance, call `reactor::run` passing the thread-private queue as the operation queue, then destroy the `task_cleanup` instance, which performs cleanup (explained below).
- If `o` does not equal `&task_operation_`: if there are more handlers (and more than one thread), `wake_one_thread_and_unlock`, otherwise `unlock`. The rest can run concurrently: construct a `work_cleanup` instance, call `o->complete` (completing the operation that was at the front of the queue), then destroy the `work_cleanup` instance, which performs cleanup.

When the `scheduler`'s operation queue `op_queue_` is empty:

- `wakeup_event_` clear and wait: wait for another thread to wake this one up.

A word on the `task_cleanup` class: its only member function is its destructor, which also carries its main functionality:

- Increment `scheduler_->outstanding_work_` by `this_thread_->private_outstanding_work` (the non-atomic, thread-private counter), then reset the private counter to zero.
- After re-acquiring the lock, execute `scheduler_->op_queue_.push(this_thread_->private_op_queue)` and related operations (re-enqueueing `&task_operation_` at the end of the queue).

`work_cleanup` is slightly different; readers can consult the source themselves.
```cpp
// file: <boost/asio/detail/impl/scheduler.ipp>
...
std::size_t scheduler::run(boost::system::error_code& ec)
{
  ec = boost::system::error_code();
  if (outstanding_work_ == 0)
  {
    stop();
    return 0;
  }

  thread_info this_thread;
  this_thread.private_outstanding_work = 0;
  thread_call_stack::context ctx(this, this_thread);

  mutex::scoped_lock lock(mutex_);

  std::size_t n = 0;
  for (; do_run_one(lock, this_thread, ec); lock.lock())
    if (n != (std::numeric_limits<std::size_t>::max)())
      ++n;
  return n;
}
...
std::size_t scheduler::do_run_one(mutex::scoped_lock& lock,
    scheduler::thread_info& this_thread,
    const boost::system::error_code& ec)
{
  while (!stopped_)
  {
    if (!op_queue_.empty())
    {
      // Prepare to execute first handler from queue.
      operation* o = op_queue_.front();
      op_queue_.pop();
      bool more_handlers = (!op_queue_.empty());

      if (o == &task_operation_)
      {
        task_interrupted_ = more_handlers;

        if (more_handlers && !one_thread_)
          wakeup_event_.unlock_and_signal_one(lock);
        else
          lock.unlock();

        task_cleanup on_exit = { this, &lock, &this_thread };
        (void)on_exit;

        // Run the task. May throw an exception. Only block if the operation
        // queue is empty and we're not polling, otherwise we want to return
        // as soon as possible.
        task_->run(more_handlers ? 0 : -1, this_thread.private_op_queue);
      }
      else
      {
        std::size_t task_result = o->task_result_;

        if (more_handlers && !one_thread_)
          wake_one_thread_and_unlock(lock);
        else
          lock.unlock();

        // Ensure the count of outstanding work is decremented on block exit.
        work_cleanup on_exit = { this, &lock, &this_thread };
        (void)on_exit;

        // Complete the operation. May throw an exception. Deletes the object.
        o->complete(this, ec, task_result);

        return 1;
      }
    }
    else
    {
      wakeup_event_.clear(lock);
      wakeup_event_.wait(lock);
    }
  }

  return 0;
}
...
~task_cleanup()
{
  if (this_thread_->private_outstanding_work > 0)
  {
    boost::asio::detail::increment(
        scheduler_->outstanding_work_,
        this_thread_->private_outstanding_work);
  }
  this_thread_->private_outstanding_work = 0;

  // Enqueue the completed operations and reinsert the task at the end of
  // the operation queue.
  lock_->lock();
  scheduler_->task_interrupted_ = true;
  scheduler_->op_queue_.push(this_thread_->private_op_queue);
  scheduler_->op_queue_.push(&scheduler_->task_operation_);
}
...
```
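A usage sketch of this loop from the user's side (thread count and names are ours): each thread that calls `io_context::run` enters `scheduler::run` and competes for operations on the shared queue.

```cpp
#include <boost/asio.hpp>
#include <thread>
#include <vector>

int main()
{
  boost::asio::io_context ctx;

  // Keep run() from returning while the queue is momentarily empty.
  auto work = boost::asio::make_work_guard(ctx);

  std::vector<std::thread> pool;
  for (int i = 0; i != 4; ++i)
    pool.emplace_back([&ctx]{ ctx.run(); });  // each thread runs the loop

  boost::asio::post(ctx, []{ /* runs on one of the pool threads */ });

  work.reset();  // allow run() to exit once no work remains
  for (auto& t : pool)
    t.join();
}
```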
Studying the `scheduler` source, its concurrency characteristics are as follows:

- Operations on the `scheduler` data member `op_queue_` must be performed while holding the `scheduler`'s own lock; they cannot run concurrently.
- Operations on the (atomic) data member `outstanding_work_` are atomic.
- `reactor::run` takes a thread-private queue as its queue argument, and the `epoll_wait` inside it executes concurrently.
- `reactor::start_op` must acquire the per-descriptor lock, so calls for different descriptors can run concurrently.

It is worth noting that almost all operations on `op_queue_` must be done under a mutex, which does not sound very "concurrent". Boost does have a lock-free queue implementation, and although it avoids locks, such algorithms usually perform worse in practice than lock-based ones. Moreover, the `scheduler` lock is held only for the short period of fetching the front element (a pointer) and popping it from `op_queue_`; executing the user's operation requires no lock. All things considered, the concurrency is not bad.
strand

When we need several user operations to be mutually exclusive, we can use a strand. Mutually exclusive operations are submitted through `strand::dispatch`, whose concrete implementation is `detail::strand_executor_service::dispatch`; it proceeds as follows:

- If the current thread is already running inside this strand, invoke the function immediately.
- Otherwise, call `strand_executor_service::enqueue` and save the return value in `first`.
- If `first` is true, dispatch the strand implementation wrapped in an `invoker` object.

```cpp
// file: <boost/asio/strand.hpp>
...
template <typename Function, typename Allocator>
void dispatch(BOOST_ASIO_MOVE_ARG(Function) f, const Allocator& a) const
{
  detail::strand_executor_service::dispatch(impl_,
      executor_, BOOST_ASIO_MOVE_CAST(Function)(f), a);
}
...

// file: <boost/asio/detail/impl/strand_executor_service.hpp>
...
template <typename Executor, typename Function, typename Allocator>
void strand_executor_service::dispatch(const implementation_type& impl,
    Executor& ex, BOOST_ASIO_MOVE_ARG(Function) function, const Allocator& a)
{
  typedef typename decay<Function>::type function_type;

  // If we are already in the strand then the function can run immediately.
  if (call_stack<strand_impl>::contains(impl.get()))
  {
    // Make a local, non-const copy of the function.
    function_type tmp(BOOST_ASIO_MOVE_CAST(Function)(function));

    fenced_block b(fenced_block::full);
    boost_asio_handler_invoke_helpers::invoke(tmp, tmp);
    return;
  }

  // Allocate and construct an operation to wrap the function.
  typedef executor_op<function_type, Allocator> op;
  typename op::ptr p = { detail::addressof(a), op::ptr::allocate(a), 0 };
  p.p = new (p.v) op(BOOST_ASIO_MOVE_CAST(Function)(function), a);

  BOOST_ASIO_HANDLER_CREATION((impl->service_->context(), *p.p,
        "strand_executor", impl.get(), 0, "dispatch"));

  // Add the function to the strand and schedule the strand if required.
  bool first = enqueue(impl, p.p);
  p.v = p.p = 0;
  if (first)
    ex.dispatch(invoker<Executor>(impl, ex), a);
}
...
```
Continuing from above, the key functions are `strand_executor_service::enqueue` and `invoker::operator()`:

- `strand_executor_service::enqueue` pushes the operation onto a queue under the lock, and determines, by testing and setting a bool variable, whether this caller is the first to acquire the strand lock.
- `invoker::operator()`:
  - Pushes the `strand_impl` onto `call_stack<strand_impl>`.
  - Executes all operations in `ready_queue_`. Note that thanks to `call_stack<strand_impl>`, if an operation calls `dispatch` on the same `strand_impl` during its execution, the dispatched operation runs immediately.
  - Runs the `on_invoker_exit` destructor, which:
    - moves the members of `waiting_queue_` into `ready_queue_`;
    - if `ready_queue_` is empty, clears `locked_` (indicating that the thread acting as the "current first" holder of the lock has finished the related work);
    - if `ready_queue_` is not empty, posts the `invoker`.
```cpp
// file: <boost/asio/detail/impl/strand_executor_service.ipp>
...
bool strand_executor_service::enqueue(const implementation_type& impl,
    scheduler_operation* op)
{
  impl->mutex_->lock();
  if (impl->shutdown_)
  {
    impl->mutex_->unlock();
    op->destroy();
    return false;
  }
  else if (impl->locked_)
  {
    // Some other function already holds the strand lock. Enqueue for later.
    impl->waiting_queue_.push(op);
    impl->mutex_->unlock();
    return false;
  }
  else
  {
    // The function is acquiring the strand lock and so is responsible for
    // scheduling the strand.
    impl->locked_ = true;
    impl->mutex_->unlock();
    impl->ready_queue_.push(op);
    return true;
  }
}
...

// file: <boost/asio/detail/impl/strand_executor_service.hpp>
~on_invoker_exit()
{
  this_->impl_->mutex_->lock();
  this_->impl_->ready_queue_.push(this_->impl_->waiting_queue_);
  bool more_handlers = this_->impl_->locked_ =
    !this_->impl_->ready_queue_.empty();
  this_->impl_->mutex_->unlock();

  if (more_handlers)
  {
    Executor ex(this_->work_.get_executor());
    recycling_allocator<void> allocator;
    ex.post(BOOST_ASIO_MOVE_CAST(invoker)(*this_), allocator);
  }
}
...

void operator()()
{
  // Indicate that this strand is executing on the current thread.
  call_stack<strand_impl>::context ctx(impl_.get());

  // Ensure the next handler, if any, is scheduled on block exit.
  on_invoker_exit on_exit = { this };
  (void)on_exit;

  // Run all ready handlers. No lock is required since the ready queue is
  // accessed only within the strand.
  boost::system::error_code ec;
  while (scheduler_operation* o = impl_->ready_queue_.front())
  {
    impl_->ready_queue_.pop();
    o->complete(impl_.get(), ec, 0);
  }
}
...
```
In short, with a strand multiple threads take the strand lock and append operations to a queue, and a single thread then dispatches the strand (as an op that contains ops). Let's look at the strand's impact on concurrency:

(todo: the author's understanding of Asio is not yet deep enough; this part remains unfinished)
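What can already be said is the basic usage guarantee. A sketch (handler bodies and names are ours): handlers posted through the same strand never run concurrently, even with several threads running the context, so `shared_counter` below needs no mutex.

```cpp
#include <boost/asio.hpp>
#include <thread>

int main()
{
  boost::asio::io_context ctx;
  boost::asio::strand<boost::asio::io_context::executor_type>
      my_strand(ctx.get_executor());

  int shared_counter = 0;  // protected by the strand, not by a mutex

  for (int i = 0; i != 100; ++i)
    boost::asio::post(my_strand, [&]{ ++shared_counter; });

  // Two threads service the context; the strand still serializes
  // its handlers, so the increments never race.
  std::thread t([&ctx]{ ctx.run(); });
  ctx.run();
  t.join();
}
```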
fenced_block

Recall the source of `executor_op::do_complete`: before invoking `handler`, it constructs a `fenced_block` instance, and this is concurrency-related code. The std version of `fenced_block` is shown below; the class is fairly simple, mainly calling (or not calling) the function `std::atomic_thread_fence` in its constructor and destructor. That function establishes memory synchronization ordering. A global search of the Asio sources shows that the `xxxxxxx_op` classes construct `fenced_block b(fenced_block::half)` before executing `complete`, while the `dispatch` member functions of `io_context::executor_type`, `strand_service`, and `thread_pool`, which may execute the operation directly, construct `fenced_block b(fenced_block::full)` before doing so. Abstractly speaking, a fence constrains the ordering of the memory operations before and after it, given that the CPU or the compiler may reorder them for optimization. As a result, other threads observe the side effects this thread produces on memory in a certain order.
To explain the role fences play in Asio, some related background first.

Implicit strand, quoting the official documentation:

> Where there is a single chain of asynchronous operations associated with a connection (e.g. in a half duplex protocol implementation like HTTP) there is no possibility of concurrent execution of the handlers. This is an implicit strand.
Concurrency hints. When the concurrency hint is `BOOST_ASIO_CONCURRENCY_HINT_UNSAFE`, some mutexes are not used, yet running multiple threads is still possible; the user then needs extra measures to keep `io_context`'s internal state safe. The official documentation says:

> BOOST_ASIO_CONCURRENCY_HINT_UNSAFE
>
> This special concurrency hint disables locking in both the scheduler and reactor I/O. This hint has the following restrictions:
>
> — Care must be taken to ensure that all operations on the io_context and any of its associated I/O objects (such as sockets and timers) occur in only one thread at a time.
>
> — Asynchronous resolve operations fail with operation_not_supported.
>
> — If a signal_set is used with the io_context, signal_set objects cannot be used with any other io_context in the program.
When the scheduler and reactor have their mutexes disabled and the user's operations form an implicit strand, how does Asio guarantee that data modified by handler A on thread 1 is visible to handler B running later on thread 2? Consider a possible execution of handler A and handler B:

1. Thread 1 invokes handler A, which writes data `x`; before the invocation, a `fenced_block b(fenced_block::half);` was constructed.
2. Handler A returns and `b` is destroyed, executing `std::atomic_thread_fence(std::memory_order_release);`.
3. Thread 1 performs an atomic write, e.g. on `outstanding_work_`, or in the `xxxx_cleanup` destructor that runs after the previous task.
4. Thread 2 performs an atomic (acquire) read of the same variable.
5. Thread 2 invokes handler B, which reads `x`.

Note the ordering: handler A's writes -> fence destruction -> atomic write; atomic read -> handler B's reads. This is exactly fence-atomic synchronization, and it guarantees that B can see the data A wrote to `x`.
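The pattern is easy to demonstrate outside Asio. A self-contained sketch (all names are ours) of fence-atomic synchronization: plain writes, then a release fence, then a relaxed atomic store on the writer side; an acquire load on the reader side.

```cpp
#include <atomic>
#include <cassert>
#include <thread>

int x = 0;                        // plain, non-atomic data ("handler A's write")
std::atomic<int> outstanding{0};  // stands in for a counter like outstanding_work_

void thread1()                    // plays the role of "handler A"
{
  x = 42;                                               // handler A writes x
  std::atomic_thread_fence(std::memory_order_release);  // ~fenced_block (half)
  outstanding.store(1, std::memory_order_relaxed);      // later atomic write
}

void thread2()                    // plays the role of "handler B"
{
  // Atomic acquire read; once it observes the store, the release fence
  // in thread1 synchronizes-with this load (fence-atomic synchronization).
  while (outstanding.load(std::memory_order_acquire) != 1)
    ;
  assert(x == 42);  // guaranteed visible, no data race
}

int main()
{
  std::thread a(thread1), b(thread2);
  a.join();
  b.join();
}
```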
(todo) Next, consider the use of `fenced_block b(fenced_block::full);` in Asio.
```cpp
// file: <boost/asio/detail/executor_op.hpp>
...
static void do_complete(void* owner, Operation* base,
    const boost::system::error_code& /*ec*/,
    std::size_t /*bytes_transferred*/)
{
  ...
  // Make the upcall if required.
  if (owner)
  {
    fenced_block b(fenced_block::half);
    BOOST_ASIO_HANDLER_INVOCATION_BEGIN(());
    boost_asio_handler_invoke_helpers::invoke(handler, handler);
    BOOST_ASIO_HANDLER_INVOCATION_END;
  }
}
...

// file: <boost/asio/detail/std_fenced_block.hpp>
...
class std_fenced_block
  : private noncopyable
{
public:
  enum half_t { half };
  enum full_t { full };

  // Constructor for a half fenced block.
  explicit std_fenced_block(half_t)
  {
  }

  // Constructor for a full fenced block.
  explicit std_fenced_block(full_t)
  {
    std::atomic_thread_fence(std::memory_order_acquire);
  }

  // Destructor.
  ~std_fenced_block()
  {
    std::atomic_thread_fence(std::memory_order_release);
  }
};
...

// file: <boost/asio/impl/io_context.hpp>
template <typename Function, typename Allocator>
void io_context::executor_type::dispatch(
    BOOST_ASIO_MOVE_ARG(Function) f, const Allocator& a) const
{
  typedef typename decay<Function>::type function_type;

  // Invoke immediately if we are already inside the thread pool.
  if (io_context_.impl_.can_dispatch())
  {
    // Make a local, non-const copy of the function.
    function_type tmp(BOOST_ASIO_MOVE_CAST(Function)(f));

    detail::fenced_block b(detail::fenced_block::full);
    boost_asio_handler_invoke_helpers::invoke(tmp, tmp);
    return;
  }

  // Allocate and construct an operation to wrap the function.
  ....
}
...
```
The `io_context` constructor accepts a parameter called the concurrency hint, which affects `io_context`'s concurrency behavior; see the official documentation for details. From this we can summarize the division of responsibility for thread safety.

Asio guarantees:

- the thread safety of `io_context`'s internal state, or, in the other configurations, documentation telling the user how to ensure that safety.

The user is responsible for:

- ensuring the safety of `io_context`'s internal state under those special configurations (by following the documentation).
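For illustration, a sketch of passing concurrency hints (variable names are ours; the exact set of locks elided for each hint is described in the official documentation):

```cpp
#include <boost/asio.hpp>

int main()
{
  // Default: full internal locking, safe for any number of threads.
  boost::asio::io_context safe_ctx;

  // Hint of 1: tell Asio the context will be run from a single thread,
  // allowing it to elide some internal locking.
  boost::asio::io_context one_thread_ctx(1);

  // Disable locking in scheduler and reactor entirely; the documented
  // restrictions quoted above then apply to all use of this context.
  boost::asio::io_context unsafe_ctx(BOOST_ASIO_CONCURRENCY_HINT_UNSAFE);

  (void)safe_ctx; (void)one_thread_ctx; (void)unsafe_ctx;
}
```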