srs使用的开源c语言网络协程库 state thread 源码分析

state thread是一个开源的c语言网络协程库,它在用户空间实现了协程调度
st最初是由网景(Netscape)公司的MSPRNetscape Portable Runtime library)项目中剥离出来,后由SGISilicon Graphic Inc)和Yahoo!公司(前者是主力)共同开发维护。
2001年发布v1.0以来一直到2009v1.9稳定版后未再变更

State Threads:回调终结者(必读)
https://blog.csdn.net/caoshangpa/article/details/79565411

st-1.9.tar.gz 是原版, http://state-threads.sourceforge.net/
state-threads-1.9.1.tar.gz srs修改版, https://github.com/ossrs/state-threads

st源码编译
tar zxvf st-1.9.tar.gz

cd st-1.9
make linux-debug          // make命令能够查看支持的编译选项
obj目录有编译生成的文件st.h, lib*.solib*.a

examples目录有几个例子lookupdnsproxyserver

须要的知识点
汇编语言(非必需)
线程的栈管理(非必需)
线程的调度和同步(必须)线程不一样步的测试代码thread.c
4 setjmp/longjmp的使用(必须)。测试代码setjmp.c
5 epoll原理和使用(必须)。测试代码epoll_server.c 和 epoll_client.c

测试代码以及文档下载地址
连接: https://pan.baidu.com/s/1kQz3S1YIt6zUwMKScrnHaQ
提取码: pu9z


分析state_thread源码的目的,是为了正确的使用它
st中thread实际上是协程的概念
st_xxx分为 io  延迟类

一些重要的数据结构
_st_vp_t _st_this_vp;     virtual processor 虚拟处理器
_st_thread_t *_st_this_thread;
_st_clist_t  run_q, io_q, zombie_q, thread_q 
_st_thread_t  *idle_thread, *sleep_q

代码分析
st库自带的example业务逻辑较为复杂,有兴趣能够看下。
为了简化问题,编写了测试代码st-1.9/examples/st_epoll.c,依据此代码提出问题分析问题。
st_init()作了什么?
_st_idle_thread_start()作了什么
st_thread_create()作了什么?
st_thread_exit()作了什么?
st_usleep()作了什么?
主业务逻辑(无限循环)协程是如何调度的?
监听的文件描述符是如何调度的?
协程如何正常退出?
1 没有设置终止条件变量(不能够被join)的协程直接return便可退出; 
2 设置了终止条件变量(能够被join)的协程退出时,先把本身加入到zombie_q中,而后通知等待的协程,等待的协程退出后,本身在退出。html

协程的join(链接)是什么意思?
1 建立协程a的时候 st_thread_create(handle_cycle, NULL, 1, 0) 要设置为1, 表示该协程能够被join
2 协程b代码里要掉用st_thread_join(thread, retvalp),表示我要join到协程a上
3 join的意思是 协程a和协程b 有必定关联行,在协程退出时,要先退出协程b 才能退出协程a
4 st中一个协程只能被另外一个协程join,不能被多个协程join
5 能够被join的协程a,在没有其余协程join时,协程a没法正常退出linux

st里的mutex有什么用?
一般状况下st的多协程是不须要加锁的,可是在有些状况下须要锁来保证原子操做,下面会详细说明。
st_mutex_new(void); 建立锁
st_mutex_destroy(st_mutex_t lock); 等待队列必须为空才能销毁锁
st_mutex_lock(st_mutex_t lock); 第一次掉用能得到锁,之后掉用会加入锁的等待队列中(FIFO)
st_mutex_unlock(st_mutex_t lock); 释放锁并激活等待队列的协程
st_mutex_trylock(st_mutex_t lock); 尝试得到锁不会加入到等待队列git

st里的cond有什么用?
一般状况下st的多协程是不须要条件变量的,可是有些状况下须要条件变量来保证协程执行的前后顺序,好比:协程a要先于协程b执行
st_cond_new(void); 建立条件变量
st_cond_destroy(st_cond_t cvar); 等待队列必须为空才能销毁条件变量
st_cond_timedwait(st_cond_t cvar, st_utime_t timeout); 限时等待条件变量,会加入条件变量的等待队列中(FIFO),并加入到sleep_q队列中(可能先于FIFO的顺序被调度到)
st_cond_wait(st_cond_t cvar); 阻塞等待条件变量,会加入条件变量的等待队列中(FIFO)
st_cond_signal(st_cond_t cvar); 唤醒阻塞在条件变量上的一个协程
st_cond_broadcast(st_cond_t cvar); 唤醒阻塞在条件变量上的所有协程github

这个图要配合测试代码 st-1.9/examples/st_epoll.c网络

st中与调度有关的函数
st的setjmp
#define _ST_SWITCH_CONTEXT(_thread)   \                协程切换的两个宏函数之一,中止当前协程并运行其余协程
  ST_BEGIN_MACRO                      \
  ST_SWITCH_OUT_CB(_thread);          \                       协程切走时调用的函数,通常无论用
  if (!MD_SETJMP((_thread)->context)) \                         汇编语言实现 应该跟setjmp()同样 首次掉用返回0
  {                                   \
    _st_vp_schedule();                \                                    核心调度函数
  }                                   \
  ST_DEBUG_ITERATE_THREADS();         \
  ST_SWITCH_IN_CB(_thread);           \                          协程切回时调用的函数,通常无论用
  ST_END_MACRO

st的longjmp
#define _ST_RESTORE_CONTEXT(_thread) \               协程切换的两个宏函数之一,恢复线程运行
  ST_BEGIN_MACRO                     \
  _ST_SET_CURRENT_THREAD(_thread);   \                设置全局变量 _st_this_thread = _thread
  MD_LONGJMP((_thread)->context, 1); \                       汇编语言实现 应该跟longjmp()同样返回值永远为1
  ST_END_MACRO

MD_SETJMP的时候,会使用汇编把全部寄存器的信息保留下来,而MD_LONGJMP则会把全部的寄存器信息从新加载出来。二者配合使用的时候,能够完成函数间的跳转。

st的核心调度函数
void _st_vp_schedule(void)
{
  _st_thread_t *thread;
  printf("in _st_vp_schedule\n");
  printf("_st_active_count = %d\n", _st_active_count);
  if (_ST_RUNQ.next != &_ST_RUNQ)
  {
    printf("use runq\n");
    /* Pull thread off of the run queue */
    thread = _ST_THREAD_PTR(_ST_RUNQ.next);
    _ST_DEL_RUNQ(thread);
  }
  else
  {
    printf("use idle\n");
    /* If there are no threads to run, switch to the idle thread */
    thread = _st_this_vp.idle_thread;
  }
  ST_ASSERT(thread->state == _ST_ST_RUNNABLE);
  /* Resume the thread */
  thread->state = _ST_ST_RUNNING;
  _ST_RESTORE_CONTEXT(thread);
}

st辅助调度函数
void *_st_idle_thread_start(void *arg)
{
  printf("i'm in _st_idle_thread_start()\n");
  _st_thread_t *me = _ST_CURRENT_THREAD();

  while (_st_active_count > 0)
  {
    /* Idle vp till I/O is ready or the smallest timeout expired */
    printf("call _st_epoll_dispatch()\n");
    _ST_VP_IDLE();                                                       处理io类事件

    /* Check sleep queue for expired threads */
    _st_vp_check_clock();                                               处理延时类事件
    me->state = _ST_ST_RUNNABLE;
    _ST_SWITCH_CONTEXT(me);                                  从这里恢复运行,而后判断_st_active_count的值
  }
/* No more threads */
  exit(0);                                                                        整个程序退出
  /* NOTREACHED */
  return NULL;
}

会触发协程切换的函数有哪些?
sched.c:86: _ST_SWITCH_CONTEXT(me); 59 int st_poll(struct pollfd *pds, int npds, st_utime_t timeout)
sched.c:234: _ST_SWITCH_CONTEXT(me); 221 void *_st_idle_thread_start(void *arg)
sched.c:261: _ST_SWITCH_CONTEXT(thread); 244 void st_thread_exit(void *retval)
sched.c:276: _ST_SWITCH_CONTEXT(thread); 244 void st_thread_exit(void *retval)
sync.c:131: _ST_SWITCH_CONTEXT(me); 115 int st_usleep(st_utime_t usecs)
sync.c:198: _ST_SWITCH_CONTEXT(me); 180 int st_cond_timedwait(_st_cond_t *cvar, st_utime_t timeout)
sync.c:315: _ST_SWITCH_CONTEXT(me); 290 int st_mutex_lock(_st_mutex_t *lock)

sched.c:134: _ST_RESTORE_CONTEXT(thread); 115 void _st_vp_schedule(void)

st中的interrupt
显示调用void st_thread_interrupt(_st_thread_t *thread)会对协程设置interrupt状态,interrupt状态会中断协程的本次运行(多是个循环任务),是否致使协程退出,要看协程内部对interrupt返回值的处理。下面以st_usleep()函数为例进行说明。
[ykMac:st-1.9]# grep -nr "_ST_FL_INTERRUPT" *
common.h:311:#define _ST_FL_INTERRUPT 0x08        interrupt的宏定义
sched.c:68:  if (me->flags & _ST_FL_INTERRUPT)         59 int st_poll() ,调用函数时,判断是否设置interrupt
sched.c:70:    me->flags &= ~_ST_FL_INTERRUPT;       若是设置就退出,退出前对interrupt取反
sched.c:107:  if (me->flags & _ST_FL_INTERRUPT)       59 int st_poll(),变为运行协程时,判断是否设置interrupt
sched.c:109:    me->flags &= ~_ST_FL_INTERRUPT;     若是设置就退出,退出前对interrupt取反
sched.c:551:  thread->flags |= _ST_FL_INTERRUPT;     545 void st_thread_interrupt()中设置为interrupt
sync.c:119:  if (me->flags & _ST_FL_INTERRUPT) {       115 int st_usleep(st_utime_t usecs),调用函数时
sync.c:120:    me->flags &= ~_ST_FL_INTERRUPT;
sync.c:133:  if (me->flags & _ST_FL_INTERRUPT) {       115 int st_usleep(st_utime_t usecs),变为运行协程时
sync.c:134:    me->flags &= ~_ST_FL_INTERRUPT;
sync.c:185:  if (me->flags & _ST_FL_INTERRUPT) {       180 int st_cond_timedwait(),调用函数时
sync.c:186:    me->flags &= ~_ST_FL_INTERRUPT;
sync.c:208:  if (me->flags & _ST_FL_INTERRUPT) {       180 int st_cond_timedwait(),变为运行协程时
sync.c:209:    me->flags &= ~_ST_FL_INTERRUPT;
sync.c:294:  if (me->flags & _ST_FL_INTERRUPT) {       290 int st_mutex_lock(),调用函数时
sync.c:295:    me->flags &= ~_ST_FL_INTERRUPT;
sync.c:319:  if ((me->flags & _ST_FL_INTERRUPT) && lock->owner != me) {       290 int st_mutex_lock(),变运行时
sync.c:320:    me->flags &= ~_ST_FL_INTERRUPT;

115 int st_usleep(st_utime_t usecs)
116 {
117   _st_thread_t *me = _ST_CURRENT_THREAD();
118
119   if (me->flags & _ST_FL_INTERRUPT) {
120     me->flags &= ~_ST_FL_INTERRUPT;     退出前对interrupt取反
121     errno = EINTR;
122     return -1;                                                若是不对errno或返回值作处理,循环仍是会继续的
123   }
124
125   if (usecs != ST_UTIME_NO_TIMEOUT) {
126     me->state = _ST_ST_SLEEPING;
127     _ST_ADD_SLEEPQ(me, usecs);
128   } else
129     me->state = _ST_ST_SUSPENDED;
130
131   _ST_SWITCH_CONTEXT(me);
132
133   if (me->flags & _ST_FL_INTERRUPT) {
134     me->flags &= ~_ST_FL_INTERRUPT;
135     errno = EINTR;
136     return -1;
137   }
138
139   return 0;
140 }

st的优缺点
优势:
1 用户空间实现协程调度,下降了用户空间和内核空间的切换,必定程度上提升了程序效率。
2 因为是在单核上的单线程多协程,同一时间只会有一个协程在运行,因此对于全局变量也不须要作协程同步。
   共享资源释放函数只需作到可重入就行,所谓的可重入就是释放以前先判断是否为空值,释放后要赋空值。
3 协程使用完,直接return便可,st会回收协程资源并作协程切换。
能够经过向run_q链表头部加入协程,来实现优先调度。
5 st支持多个操做系统,好比 AIX,CYGWIN,DARWIN,FREEBSD,HPUX,IRIX,LINUX,NETBSD,OPENBSD,SOLARIS
缺点:
全部I/O操做必须使用st提供的API,只有这样协程才能被调度器管理。
2 全部协程里不能使用sleep(),sleep()会形成整个线程sleep。
被调度到的协程不会限制运行时长,若是有协程cpu密集型或死循环,就会严重阻碍其余协程运
4 单进程单线程,只能使用单核,想要经过多个cpu提升并发能力,只能开多个程序(进程),多进程通讯较麻烦。

补充知识点
线程为何要同步?
线程由内核自动调度
同一个进程上的线程共享该进程的整个虚拟地址空间
同一个进程上的线程代码区是共享的,即不一样的线程能够执行一样的函数
因此在并发环境中,多个线程同时对同一个内存地址进行写入,因为CPU寄存器时间调度上的问题,写入数据会被屡次的覆盖,会形成共享数据损坏,因此就要使线程同步。

2 什么状况下须要线程同步?
线程同步指的是 不一样时发生,就是线程要排队
1 多核,单进程多线程,不一样线程会对全局变量读写,这种状况才须要对线程作同步控制
2 单核,单进程多线程,不一样线程会对全局变量读写,这种状况不须要对线程作同步控制
3 多核,单进程多线程,不一样线程不对全局变量读写,这种状况不须要对线程作同步控制
4 多核,单进程多线程,不一样线程全局变量,这种状况不须要对线程作同步控制

问题:对于第2条,这个应该是有点儿片面,有依赖有优先级抢占也是要同步,除非像abcde这种几个线程干一摸同样的事情,并且项目之间不依赖。
有依赖 能够理解为 生产消费关系,虽然是 单核 单进程 多线程 可是同一时间只能有一个线程在运行,也就是说 生产和消费不会同时发生,一样多个生产也不会同时发生,因此不须要锁。

线程是有优先级控制,可是无论怎么控制,只要保证同一时间只能有一个线程在运行,就不须要锁了。
问题:对于第2条,这个应该是有点儿片面,有原子操做且原子操做过程当中有线程切换,这种是须要锁的。
好比,线程a 第一次读取全局变量x并作处理,而后发生线程切换(线程由内和自动调度)后切回,而后第二次读取全局变量x并作处理,咱们想确保两次读取
x值相同,可是发生了线程切换x值可能被改变。

如何 确保 第一次读取并处理和第二次读取并处理是原子操做呢? 使用st_mutex_t

3 accept()序列化
亦称惊群效应,亦亦称Zeeg难题
https://uwsgi-docs-zh.readthedocs.io/zh_CN/latest/articles/SerializingAccept.html
在屡次fork本身以后,每一个进程通常将会开始阻塞在 accept() 
每当socket上尝试进行一个链接,阻塞在 accept() 上的每一个进程的 accept() 都会被唤醒。
只有其中一个进程可以真正接收到这个链接,而剩余的进程将会得到一个无聊的 EAGAIN 这致使了大量的CPU周期浪费实际解决方法是把一个锁放在 accept() 调用以前,来序列化它的使用

4 Internet Applications网络程序架构
多进程架构 Multi-Process
    一个进程服务一个链接,要解决数据共享问题
单进程多线程架构 Multi-Threaded 
    一个线程服务一个链接,要解决数据同步问题
事件驱动的状态机架构 Event-Driven State Machine 
    事件触发回调函数(缺点是嵌套 用户空间实现协程调度
实际上 EDSM架构 用很复杂的方式模拟了多线程
st提供的就是EDSM机制,它在用户空间实现协程调度
https://blog.csdn.net/caoshangpa/article/details/53282330数据结构

相关文章
相关标签/搜索