[超简洁]EasyQ框架-应对WEB高并发业务(秒杀、抽奖)等业务

背景介绍git

    这几年一直在摸索一种框架,足够简单,又能应付不少高并发高性能的需求。研究过一些框架思想如DDD DCI,也实践过CQRS框架。github

可是总以为复杂度高,门槛也高,本身学都吃力,若是团队新人更难接受。因此自从写了最简单的BaseContext类以后很长一段时间内都没有加任何代码。(basecontext只有10行内代码)数据库

以前有个秒杀业务要作,用了MVC的异步Action队列处理请求,感受仍是蛮不错,因此跟另一位同事一同把这个功能整合进这个baseContext里面,既没有用第三方的Queue(如 RabbitMQ )也没有另外开一个宿主进程Exe。api

总之“simple is good!”安全

EasyQ服务器

EasyQ是一个轻量级的专门用来处理高并发HTTP请求的框架。
应用MVC异步Action机制实现了相同业务单线程排队处理,没有用任何读写锁。
能够指定某一种业务建立一条队列,也能够指定某一种业务+某一种数据ID做为一条队列。
如秒杀商品业务,能够指定全部秒杀业务都使用单线程排队处理,避免脏读 脏写。
可是这样作的话,全部秒杀商品都会进入排队,显然是不科学的。
因此扩展一种方式是: 秒杀业务+商品ID 做为队列名。
固然不止商品ID,也能够是用户ID,商品分类等任意字符串做为队列名的后缀。
  GITHUB地址:https://github.com/BTteam/EasyQ多线程

如能占用您一点时间,提出一点改进的意见,不胜感激!并发


使用说明框架

HomeController 是入口页面,须要继承AsyncController,使用MVC的异步Action
BT.Contexts项目放置业务代码,全部Context须要继承抽象类QueueBaseContext,而且实现3个方法
1,InitData 初始化数据,数据库获取数据的方法应该写在此处
2,Interact 交互操做,数据模型之间的交互,业务代码的各类计算、判断等
3,Persist 持久化操做,数据保存到数据库的操做应当写在此处。
这3个方法的默认执行步骤很是简单 1=》2=》3
  这个类是封装了队列、线程的操做,是EasyQ的核心类。
在HomeController使用Context时,首先应该分开2个Action 如 TestAsync TestCompleted。这是MVC异步Action的机制决定
TestAsync用来启动异步,TestCompleted是异步完成后的回调操做。这2个方法必须成对出现。具体原理请参考MSDN异步

调用是URL为:{host}/home/text 注意Async后缀在路由时会被去掉。
SetAsync方法必须传入AsyncManager对象,key是可选参数,如上所述是用来细分队列的。
若是想根据商品ID生成队列,不一样商品的秒杀行为在不一样的队列中排队,就在此处用SetAsync传入key是商品ID

public void TestAsync(string key)
{
//GET DATA
TestContext context = new TestContext(1);
context.SetAsync(AsyncManager, key);//参数为产品队列标识
context.Execute();
}

 

再看回调方法

public ActionResult TestCompleted()
{
var result = AsyncManager.Parameters["response"];

return Content(JsonConvert.SerializeObject( result));
}

 全部Context执行后的结果以Parameters["response"]返回

 

核心解析

QueueBaseContext类

public abstract class  QueueBaseContext:BaseContext
    {
       private ILog log = LogManager.GetLogger(typeof(QueueBaseContext));
       private static ConcurrentDictionary<string, ConcurrentQueue<AsyncManager>> killQueues = new ConcurrentDictionary<string, ConcurrentQueue<AsyncManager>>();
        private static ConcurrentDictionary<string, Task> taskDic = new ConcurrentDictionary<string, Task>();
       //场景使用步骤  编写好 1.Interact() 2.Persist() 3.在api调用初始场景后,调用QueueContextAsync()
        private AsyncManager AsyncManager;
        private string quenekey;

       public void SetAsync(AsyncManager _AsyncManager)
       {
           SetAsync(_AsyncManager, "");
       }
       public void SetAsync(AsyncManager _AsyncManager, string _quenekey)
       {
           quenekey = _quenekey;
           this.AsyncManager = _AsyncManager;
       }

       public abstract void InitData();
       public override string Execute()
       {
           if (AsyncManager == null)
           {
               throw new Exception("必须调用SetAsync 设置AsyncManager对象");

           }
         var runtimeType=  this.GetType();
           var qKey = runtimeType.FullName + quenekey;
          // typeof().
           AsyncManager.OutstandingOperations.Increment();
           //开一个队列 判断是否有队列 
           if (killQueues.ContainsKey(qKey) == false)
           {
               killQueues.TryAdd(qKey, new ConcurrentQueue<AsyncManager>(new[] {AsyncManager}));
           }
           else
           {
               killQueues[qKey].Enqueue(AsyncManager);

           }
           Action ac = () =>
           {
              
               while (killQueues[qKey].IsEmpty == false)
               {
                 //  Thread.Sleep(15000);
                   log.DebugFormat("while 进来了  killQueueitemCount length:{0} ,Q num{1}", killQueues[qKey].Count, killQueues.Count);
                   AsyncManager item;
                   killQueues[qKey].TryDequeue(out item);//取出队列的一个进行处理
                   try
                   {

                       InitData();
                       if (Interact())//对应业务逻辑
                           Persist();

                       AsyncManager.Parameters["response"] = new { Code = this.StatusCode};
                       AsyncManager.OutstandingOperations.Decrement();
                   }
                   catch (Exception e)
                   {
                       log.ErrorFormat("出错,e msg:{0} ,trace:{1}", e.Message, e.StackTrace);
                       AsyncManager.Parameters["response"] = new { Code = ResponseCode.DataError, Description = "服务器错误,请重试" };
                       AsyncManager.OutstandingOperations.Decrement();
                   }
               }
               //remove q
           };
           if (taskDic.ContainsKey(qKey) == false)
           {
               taskDic.TryAdd(qKey, Task.Factory.StartNew(ac));
           }
           if (taskDic[qKey].IsCompleted || taskDic[qKey].IsFaulted)
           {
               taskDic[qKey] = Task.Factory.StartNew(ac);
           }
           return "";
       }
     

    }

构建了2个字典 

killQueues :队列名做为KEY 队列实例做为Value
taskDic:队列名做为KEY 队列指定的执行Task做为Value

处理逻辑是建立一个Task 循环队列每次取出一个项,执行业务操做。直到队列为空。
若是在上一个Task运行过程当中有新的请求加入,则不须要新建Task,只须要继续加入队列尾部。上一个Task会执行对应的业务操做。
队列中的每个元素表明一次业务操做,操做完毕以后会调用
AsyncManager.OutstandingOperations.Decrement();
用来返回异步结果给请求线程。这样HTTP请求结果就返回给用户了。不须要等到队列完毕。
qKey能够设置任意字符串,用来细分队列名称。

队列最好用
ConcurrentQueue
由于在入列出列操做时处于多线程共享队列,必需要用线程安全的队列类。


存在缺陷

1 目前是单点设计,只能在单机上运行,还在研究横向扩展。 2 性能还须要优化 3 因为使用异步Action 致使每一个Action必须一分为二。

相关文章
相关标签/搜索