State-Thread(如下简称st),是一个由C语言编写的小巧、简洁却高效的开源协程库。这个库基于单线程运做、不强制占用用户线程,给予了开发者最大程度的轻量级和较低的侵入性。本篇文章中,网易云信音视频研发大神将为你们简要分析State-Thread,欢迎你们积极留言,和咱们共同讨论。
在开始这个话题以前,咱们先来聊一聊协程。
什么是协程?
协程是一种程序组件。一般咱们把协程理解为是一种程序本身实现调度、用于提升运行效率、下降开发复杂度的东西。提升运行效率很好理解,由于在程序层本身完成了部分的调度,下降了对系统调度的依赖,减小了大量的中断和换页操做。而下降了开发复杂度,则是指对于开发者而言,可使用同步的方式去进行代码开发(不须要考虑异步模型的诸多回调),也不须要考虑多线程模型的线程调度和诸多的临界资源问题。
不少语言都拥有协程,例如python或者golang。而对于c/c++而言,一般实现协程的常见方式,一般是依赖于glibc提供的setjump&longjump或者基于汇编语言,固然还有基于语义实现(protothread)。linux上使用协程库的方式,一般也会分为替换函数和更为暴力的替换so来实现。固然而各类方式有各自的优劣。而st选用的汇编语言实现setjump&longjump和要求用户调用st_打头的函数来嵌入程序。因此st具有了跨平台的能力,以及让开发者们更开心的“与容许调用者自行选择切换时机”的能力。
st到底是如何实现了这一切?
首先咱们先看看st的总体工做流程:python
在宏观的来看,ST的结构主要分红:
vp_schedule。主要是负责了一个调度的能力。有点相似于linux内核当中的schedule()函数。每次当这个函数被调用的时候,都会完成一次线程的切换。
各类Queue。用于保存各类状态下等待被调度协程(st_thread)
Timer。用于记录各类超时和sleep。
poll。用于监听各类io事件,会根据系统能力不一样而进行切换(kqueue、epoll、poll、select)。
st_thread。用于保存各类协程的信息。
其中比较重要的是schedule模块和thread模块二者。这二者实现了一个完整的协程切换和调度。属于st的核心。而schedule部分一般是开发者们最须要关心的部分。
接下来咱们会深刻到代码层,看一下具体在这个过程里作了些什么。
一般对于st而言,全部暴露给用户的除了init函数,就是一系列的st_xxx函数了。那么先看看init函数。
int st_init(void)
{
_st_thread_t *thread;linux
if (_st_active_count) {
/ Already initialized /
return 0;
}c++
/ We can ignore return value here /
st_set_eventsys(ST_EVENTSYS_DEFAULT);golang
if (_st_io_init() < 0)
return -1;多线程
memset(&_st_this_vp, 0, sizeof(_st_vp_t));架构
ST_INIT_CLIST(&_ST_RUNQ);
ST_INIT_CLIST(&_ST_IOQ);
ST_INIT_CLIST(&_ST_ZOMBIEQ);框架
if ((*_st_eventsys->init)() < 0)
return -1;异步
_st_this_vp.pagesize = getpagesize();
_st_this_vp.last_clock = st_utime();socket
/*函数
*/
_st_this_vp.idle_thread = st_thread_create(_st_idle_thread_start,
NULL, 0, 0);
if (!_st_this_vp.idle_thread)
return -1;
_st_this_vp.idle_thread->flags = _ST_FL_IDLE_THREAD;
_st_active_count--;
_ST_DEL_RUNQ(_st_this_vp.idle_thread);
/*
*/
thread = (_st_thread_t *) calloc(1, sizeof(_st_thread_t) +
(ST_KEYS_MAX sizeof(void )));
if (!thread)
return -1;
thread->private_data = (void **) (thread + 1);
thread->state = _ST_ST_RUNNING;
thread->flags = _ST_FL_PRIMORDIAL;
_ST_SET_CURRENT_THREAD(thread);
_st_active_count++;
return 0;
}
这段函数一共作了3事情,建立了一个idle_thread, 初始化了_ST_RUNQ、_ST_IOQ、
_ST_ZOMBIEQ三个队列,把当前调用者初始化成原始函数(一般st_init会在main里面调用,因此这个原始的thread至关因而主线程)。idle_thread函数,其实就是整个IO和定时器相关的本体函数了。st会在每一次_ST_RUNQ运行完成后,调用idle_thread来获取可读写的io和定时器。这个咱们后续再说。
那么,st_xxx通常会分红io类和延迟类(sleep)。二者入口实际上是同一个,只不过在io类的会多调用一层。咱们这里选择st_send为表明。
int st_sendmsg(_st_netfd_t fd, const struct msghdr msg, int flags,
st_utime_t timeout)
{
int n;
while ((n = sendmsg(fd->osfd, msg, flags)) < 0) {
if (errno == EINTR)
continue;
if (!_IO_NOT_READY_ERROR)
return -1;
/ Wait until the socket becomes writable /
if (st_netfd_poll(fd, POLLOUT, timeout) < 0)
return -1;
}
return n;
}
本质上全部的st函数都是以异步接口+ st_netfd_poll来实现的。在st_netfd_poll之内,会去调用st_poll,而st_poll本质上会调用而且切换线程。
int st_netfd_poll(_st_netfd_t *fd, int how, st_utime_t timeout)
{
struct pollfd pd;
int n;
pd.fd = fd->osfd;
pd.events = (short) how;
pd.revents = 0;
if ((n = st_poll(&pd, 1, timeout)) < 0)
return -1;
if (n == 0) {
/ Timed out /
errno = ETIME;
return -1;
}
if (pd.revents & POLLNVAL) {
errno = EBADF;
return -1;
}
return 0;
}
int st_poll(struct pollfd *pds, int npds, st_utime_t timeout)
{
struct pollfd *pd;
struct pollfd *epd = pds + npds;
_st_pollq_t pq;
_st_thread_t *me = _ST_CURRENT_THREAD();
int n;
if (me->flags & _ST_FL_INTERRUPT) {
me->flags &= ~_ST_FL_INTERRUPT;
errno = EINTR;
return -1;
}
if ((*_st_eventsys->pollset_add)(pds, npds) < 0)
return -1;
pq.pds = pds;
pq.npds = npds;
pq.thread = me;
pq.on_ioq = 1;
_ST_ADD_IOQ(pq);
if (timeout != ST_UTIME_NO_TIMEOUT)
_ST_ADD_SLEEPQ(me, timeout);
me->state = _ST_ST_IO_WAIT;
_ST_SWITCH_CONTEXT(me);
n = 0;
if (pq.on_ioq) {
/ If we timed out, the pollq might still be on the ioq. Remove it /
_ST_DEL_IOQ(pq);
(*_st_eventsys->pollset_del)(pds, npds);
} else {
/ Count the number of ready descriptors /
for (pd = pds; pd < epd; pd++) {
if (pd->revents)
n++;
}
}
if (me->flags & _ST_FL_INTERRUPT) {
me->flags &= ~_ST_FL_INTERRUPT;
errno = EINTR;
return -1;
}
return n;
}
那么到此为止,st_poll中就出现了咱们最关心的调度部分了。
当一个线程进行调度的时候通常都是poll_add(若是是io操做),add_queue, _ST_SWITCH_CONTEXT完成一次调度。根据不一样的类型,会add到不一样的queue。例如须要超时,则会add到IOQ和SLEEPQ。而_ST_SWITCH_CONTEXT,则是最关键的切换线程操做了。
_ST_SWITCH_CONTEXT实际上是一个宏,它的本质是调用了MD_SETJMP和_st_vp_schedule().
ST_BEGIN_MACRO \
ST_SWITCH_OUT_CB(_thread); \
if (!MD_SETJMP((_thread)->context)) { \
_st_vp_schedule(); \
} \
ST_DEBUG_ITERATE_THREADS(); \
ST_SWITCH_IN_CB(_thread); \
ST_END_MACRO
这个函数其实就是一个完成的线程切换了。在st里线程的切换会使用MD_SETJMP->_st_vp_schedule->MD_LONGJMP。MD_SETJMP和MD_LONGJMP其实就是st使用汇编本身写的setjmp和longjmp函数(glibc),效果也是几乎等效的。(由于st自己会作平台适配,因此咱们以x86-64的汇编为例)
/*
*/
.file "md.S"
.text
/ _st_md_cxt_save(__jmp_buf env) /
.globl _st_md_cxt_save
.type _st_md_cxt_save, @function
.align 16
_st_md_cxt_save:
/*
*/
movq %rbx, (JB_RBX*8)(%rdi)
movq %rbp, (JB_RBP*8)(%rdi)
movq %r12, (JB_R12*8)(%rdi)
movq %r13, (JB_R13*8)(%rdi)
movq %r14, (JB_R14*8)(%rdi)
movq %r15, (JB_R15*8)(%rdi)
/ Save SP /
leaq 8(%rsp), %rdx
movq %rdx, (JB_RSP*8)(%rdi)
/ Save PC we are returning to /
movq (%rsp), %rax
movq %rax, (JB_PC*8)(%rdi)
xorq %rax, %rax
ret
.size _st_md_cxt_save, .-_st_md_cxt_save
//
/ _st_md_cxt_restore(__jmp_buf env, int val) /
.globl _st_md_cxt_restore
.type _st_md_cxt_restore, @function
.align 16
_st_md_cxt_restore:
/*
*/
movq (JB_RBX*8)(%rdi), %rbx
movq (JB_RBP*8)(%rdi), %rbp
movq (JB_R12*8)(%rdi), %r12
movq (JB_R13*8)(%rdi), %r13
movq (JB_R14*8)(%rdi), %r14
movq (JB_R15*8)(%rdi), %r15
/ Set return value /
test %esi, %esi
mov $01, %eax
cmove %eax, %esi
mov %esi, %eax
movq (JB_PC*8)(%rdi), %rdx
movq (JB_RSP*8)(%rdi), %rsp
/ Jump to saved PC /
jmpq *%rdx
.size _st_md_cxt_restore, .-_st_md_cxt_restore
//
MD_SETJMP的时候,会使用汇编把全部寄存器的信息保留下来,而MD_LONGJMP则会把全部的寄存器信息从新加载出来。二者配合使用的时候,能够完成一次函数间的跳转。
那么咱们已经看到了MD_SETJMP的调用,MD_LONGJMP调用在哪儿呢?
让咱们继续看下去,在最一开始,咱们就说起过_st_vp_schedule()这个核心函数。
void _st_vp_schedule(void)
{
_st_thread_t *thread;
if (_ST_RUNQ.next != &_ST_RUNQ) {
/ Pull thread off of the run queue /
thread = _ST_THREAD_PTR(_ST_RUNQ.next);
_ST_DEL_RUNQ(thread);
} else {
/ If there are no threads to run, switch to the idle thread /
thread = _st_this_vp.idle_thread;
}
ST_ASSERT(thread->state == _ST_ST_RUNNABLE);
/ Resume the thread /
thread->state = _ST_ST_RUNNING;
_ST_RESTORE_CONTEXT(thread);
}
这个函数其实很是简单,基本工做原理能够认为是执行如下几步: 1.查看当前RUNQ是否有能够调用的,若是有,则RUNQ pop一个thread。 2. 若是没有,则运行idle_thread。 3. 调用_ST_RESTORE_CONTEXT。
那么_ST_RESTORE_CONTEXT作了什么呢?
ST_BEGIN_MACRO \
_ST_SET_CURRENT_THREAD(_thread); \
MD_LONGJMP((_thread)->context, 1); \
ST_END_MACRO
简单来讲,_ST_RESTORE_CONTEXT就是调用了咱们以前所没有看到的MD_LONGJMP。
因此,咱们能够简单地认为,在携程须要schedule的时候,会先把自身当前的栈经过MD_SETJMP保存起来,当线程被schedule再次调度出来的时候,则会使用MD_SETJMP来还原栈,完成一次协程切换。
而后咱们来看看idle_thread作了什么。
虽然这个协程名字叫作idle,可是其实作了不少的事情。
void _st_idle_thread_start(void arg)
{
_st_thread_t *me = _ST_CURRENT_THREAD();
while (_st_active_count > 0) {
/ Idle vp till I/O is ready or the smallest timeout expired /
_ST_VP_IDLE();
/ Check sleep queue for expired threads /
_st_vp_check_clock();
me->state = _ST_ST_RUNNABLE;
_ST_SWITCH_CONTEXT(me);
}
/ No more threads /
exit(0);
/ NOTREACHED /
return NULL;
}
总的来讲,idle_thread作了两件事情。1. _ST_VP_IDLE() 2. _st_vp_check_clock()。_st_vp_check_clock很好理解,就是检查定时器是否超时,若是超时了,则设置超时标记以后,放回RUNQ。而_ST_VP_IDLE,其实就是查看io是否已经ready了。例如linux的话,则会调用epoll_wait(_st_epoll_data->epfd, _st_epoll_data->evtlist,
_st_epoll_data->evtlist_size, timeout)去查看是否有可响应的io。timeout值会根据当前空闲状况进行变化,一般来讲会是一个极小的值。
那么看到这里,总体的线程调度已经所有走完了。(详见前面最一开始的流程图)整体流程总结来讲基本上是func() -> st_xxxx() -> AddQ -> MD_SETJMP -> schedule() -> MD_LONG -> func()。
因此对于st而言,因此的调度,是基于用户调用。那么若是用户一直不调用st_xxx()(例如计算密集性服务),st也就没法进行协程切换,那么其余协程也就产生极大的阻塞了。这也是为何st并不太合适计算密集型的缘由(其实单线程框架大多都不合适计算密集型)
想要阅读更多技术干货文章,欢迎关注网易云信博客。了解网易云信,来自网易核心架构的通讯与视频云服务。