关于高并发下多线程数据处理

1、Lock:多线程

C#中关键字lock(VB.NET中SyncLock,等同于try+finally的Monitor.Enter……Monitor.Exit)。原理是“每次线程进入后锁住当前全部的内存区块等相关区域,由该线程自行处理完毕所有的线程后自动释放”,接着其他线程抢先进入。并发

优势:最为大众所知的一种多线程处理方法,最为广泛的解决方案。函数

缺点:没法彻底适应高并发场合下处理需求——缘由:每次让大量线程在外边排队等候(由于一次只能一个线程处理lock区块),并且外边排队的线程老是要在Windows操做系统、临界区等上下文环境切换。(想想:假设有一个厕所,每次一我的进去大便,由于有人你进不了,也不知道人家啥时候出来……因此尴尬的你只能到休息区去暂时休息打盹、看报、聊天……,等一阵而后再去看看厕所是否处于可用状态……)。这里的“厕所”的门就是“临界区”,“临界区”里边就是受保护的代码,“厕所”和“休息区”是两个不一样的上下文地区,整个公司就是一个Windows操做系统。因此这种lock方式的上下文切换自己就是尝试性地试探是否能够进入,不能够则上下文切换休眠。固然消耗资源也是大的——除非你能够“估计”到每一个线程处理的时间差很少等于线程休眠从上下文切换的时间,并且外部排队线程不是不少的状况下。高并发

2、取代lock的“原子锁”:优化

本质上是CPU级别的锁,最小粒度的。用于保证一次性数据原子量的完成。在NET中有InterLock类能够提供最简单的诸如Increment,Decrement等原子性操做方法。若是要实现共享代码,彻底能够用如下方法尝试实现:spa

private volatile int _isFree = 0;

public void GeneralLockCode(Action act = null)
{
     try
     {
        if(InterLock.Exchange(ref _isFree, 1) == 0)  
        {
              if(act != null)
               act();
        }
     }
     finally
     {
           InterLock.Exchange(ref _isFree, 0);
     }
}

“_isFree”是一个类级别的变量,若是是0表示尚无线程使用act代码(表示容许线程进来)。每次进入此代码以后,有一个Exchange函数,这个函数是内存原子操做函数,它的做用是:1)把“1”赋值给_isFree。2)返回_isFree之前状态的值(在这两步中不容许其它线程干扰对_isFree进行变化)。操作系统

假设有2个线程(A和B)。A抢到了执行了Exchange方法,获得结果true;此时_isFree立马变成了1。此时B线程再次执行Exchange,_isFree从新被赋值为1,可是返回了线程A时候处理的状态1,由于1不会等于0,因此act方法不会执行——直到A执行了finally方法为止。线程

上面的代码加以改进,就彻底能够变成一个通用的,比lock更好一点的“排它锁”:code

private volatile int _isFree = 0;

public void GeneralLock(Action act)
{
      try
      {
            while(InterLock.Exchange(ref _isFree,1) == 1);
            act();
      }
      finally
      {
           InterLock.Exchange(ref _isFree,0);
      }
}

该锁一目了然——也只容许一个线程进入,其他线程外面等待。可是该方法的好处在于——若是act方法足够小(耗时小,时间短),那么当某个线程抢先进入的时候,其他的线程不是像Lock块同样上下文切换,而是直接在临界区“候着”自旋,直到act被执行完毕,锁标记释放为止。之因此要“act耗时足够小”,是由于这里若是高并发的话,没有抢到机会执行act的线程不得不一直处于“自旋”状态,空耗CPU资源。blog

咱们能够经过尝试在while里边加Thread.Sleep,Sleep的毫秒能够随着失败的轮询次数不断试探性增加,或者干脆使用SpinWait或者Thread的SpinUntil代替while,以便于这个CAS的锁更好地适用于通常高并发场合下的应用。

【方案1】

private volatile int _isFree = 0;

public void GeneralLock(Action act)
{
     SpinWait wait = new SpinWait();

      try
      {
             
            while(InterLock.Exchange(ref _isFree,1) == 1)                    
                  {wait.SpinOnce();}
   act();
      }
      finally
      {
           InterLock.Exchange(ref _isFree,0);
      }
}

【方案2】

private volatile int _isFree = 0;

public void GeneralLock(Action act)
{
      try
      {
           Thread.SpinUntil (() => InterLock.Exchange(ref _isFree,1) == 0);  //Until,直到isFree=0,即等到可用状态。
           act();
      }
      finally
      {
           InterLock.Exchange(ref _isFree,0);
      }
}

3、CAS法则:

InternLock中有一个Compare And Swap原子操做,其函数是(以NET为主):

InternLock.CompareExchange(ref 原来变量,替代变量,比较值),函数结果返回“原来变量”前一次的值(被“替代变量”取代前的数值)。

根据这个函数,咱们彻底能够在把每一个线程中拷贝本身的一份“原有变量”的值做为“比较值”,“替代变量”在“比较值”基础上进行某些操做,而后使用这个CAS方法保证每次操做的时候都是原子性的,该函数作3件事情:

1)判断“原来变量”是否等于“比较值”。

2)若是1)步返回true,则用“替代变量”替换“原来变量”。

3)返回“原有变量”前一次的值。

再次声明,这个方法也是原子性的。意味着在这个函数的时候不管如何不会被其它线程干扰打断,修改变量。那么咱们垂手可得能够获得结论——若是有A和B线程,同时修改变量a(假设a共用,初始值为1),那么A线程抢到进去(A线程对a作了拷贝,比较值为1,替代变量为2)。那么当A作CAS的时候,原来变量=比较值,而后把“替代变量”替换“原来变量”,以后返回以前的值1,1=A线程的比较值1,因此成功了。

假设A在执行CAS过程当中(或者以前某些步骤),B线程也进来了,(B的比较值也为1,替代变量为2),此时A率先作了CAS,“原来变量”变成了2,此时B作CAS发现原来变量不等于比较值,所以不会进行替代。固然返回的结果也也是被A线程替代后的原来变量的值2,天然也不等于B线程的比较值。因此B线程不得再也不次去抢——直到知足条件为止。

相似这样的方法,相对于以前讲的“原子锁”而言不存在空转(由于他们每次都尝试生产一个本身的方案,而后分别去抢;不像“原子锁”中的while处于Sleep空转或者忙转状况),可是他们也是死循环尝试探测是否能够抢到原子锁的,因此仍然不排除CPU资源被大量占用的状况。因此也彻底能够尝试先判断一次是否抢到,抢到直接退出循环;不然继续抢或者进行优化的“自旋”,下面给出伪代码结构:

volatile _globalVar = 初始化值;

public void GeneralCAS()
{
      声明 _copyVar和_repVar;
do
           {
              _copyVar = _globalVar;
              _repVar = 在_globalVar基础上改变或者新的值;
           }while(CAS(ref _globalVar,_repVar,_copyVar)!=_copyVar);
}

 最后给出一个多线程累加的例子:

public struct AtomicNumber
{
      private volatile int _number = 0;
      
      public int GetProcessedNumber (int stepToIncrease = 1)
      {
          int copyNum = _number,newNum = copyNum + stepToIncrease;
          for(;;copyNumber = _number,newNum = copyNum+stepToIncrease)
          {
                 if(Interlock.CompareExchange(ref _number, newNum, copyNum) == copyNum)
                 {
                       return newNum;
                 }
          }
      }
}
相关文章
相关标签/搜索