ASP.Net Core 3.X并发限制简介

在Asp.net core 3.0 以前使用asp.net core 会出现线程池内线程不足的状况,出现并发冲突现象,web

并发冲突主要状况以下:数据库

一、用户导航到实体编辑页面;服务器

二、第一个用户的更改还未写入数据库以前,另外一个用户更新实体多线程

此时若是未启用并发检测,当发生更新时最后一个更新优先,即最后一个更新的值保存到数据库,而第一个保存的数据将丢失;并发

那么如何处理并发冲突呢?app

1】能够跟踪用户已经修改的属性,并只更新数据库中相应的列,这样,当两个用户更新了不一样的属性,下次查看时都将生效。asp.net

可是也存在如下问题:异步

【1】当对同一个属性进行竞争性更改的话,没法避免数据的流失async

【2】一般不适用于web应用。它须要维持重要的状态,以便跟踪全部提取值和新值。维持大量状态可能影响应用性能;性能

【3】可能会增长应用复杂性(与实体上的开发检测相比)

 

2】客户端优先

即客户端的值优先于数据库存储的值。而且若是不对并发处理进行任何编码,将自动进行客户端优先;

3】 存储优先

这种方式能够阻止在数据可对数据的更改,而且能够

 【1】显示错误消息

 【2】显示数据的当前状态

 【3】容许用户从新更新应用的更改

 

如何处理并发呢?

1】检测属性的并发冲突,能够使用ConcurrencyCheck特性在属性级别检测并发冲突。该特性可应用于模型上的多个属性

2】检测行的并发冲突,检测行的并发冲突,将rowversion耿总列添加到模型

3】当属性为并发令牌时:

   1)EFCore 验证提取属性后是否未更改属性,调用SaveChanges或者SaveChangesAsync时会执行并发检测

  2)若是提取属性后更改了属性,将引起DbUpdateConcurrencyException

   数据库和数据模型必须配置为支持引起DbUpdateConcurrencyException.

 

 

4]在Asp.net core 3.0以后  

 微软在asp.net core 3.0 增长了Microsoft.AspNetCore.ConcurrencyLimiter,用于传入的请求进行排队处理

避免线程池的不足;

Queue策略

一、添加Nuget    Install-Package Microsoft.AspNetCore.ConcurrencyLimiter

public void ConfigureServices(IServiceCollection services)
        {
            services.AddQueuePolicy(options =>
            {
                //最大并发请求数
                options.MaxConcurrentRequests = 2;
                //请求队列长度限制
                options.RequestQueueLimit = 1;
            });
            services.AddControllers();
        }
        public void Configure(IApplicationBuilder app, IWebHostEnvironment env)
        {
            //添加并发限制中间件
            app.UseConcurrencyLimiter();
            app.Run(async context =>
            {
                Task.Delay(100).Wait(); // 100ms sync-over-async

                await context.Response.WriteAsync("Hello World!");
            });
            if (env.IsDevelopment())
            {
                app.UseDeveloperExceptionPage();
            }

            app.UseHttpsRedirection();

            app.UseRouting();

            app.UseAuthorization();

            app.UseEndpoints(endpoints =>
            {
                endpoints.MapControllers();
            });
        }

经过上面简单的配置,咱们就能够将他引入到咱们的代码中,从而作并发量限制,以及队列的长度;那么问题来了,他是怎么实现的呢?

public static IServiceCollection AddQueuePolicy(this IServiceCollection services, Action<QueuePolicyOptions> configure)
{
        services.Configure(configure);
        services.AddSingleton<IQueuePolicy, QueuePolicy>();
        return services;
}

QueuePolicy  采用的是SemaphoreSlim 信号量设计,SemapHoreSlim、SemapHore(信号量)支持并发多线程进入被保护代码,对象在初始化时会指定最大任务数量,当线程请求访问资源,信号量递减而当他们释放时信号量数量递增。

public QueuePolicy(IOptions<QueuePolicyOptions> options)
        {
            _maxConcurrentRequests = options.Value.MaxConcurrentRequests;
            if (_maxConcurrentRequests <= 0)
            {
                throw new ArgumentException(nameof(_maxConcurrentRequests), "MaxConcurrentRequests must be a positive integer.");
            }

            _requestQueueLimit = options.Value.RequestQueueLimit;
            if (_requestQueueLimit < 0)
            {
                throw new ArgumentException(nameof(_requestQueueLimit), "The RequestQueueLimit cannot be a negative number.");
            }
            //使用SemaphoreSlim来限制任务最大个数
            _serverSemaphore = new SemaphoreSlim(_maxConcurrentRequests);
        }

 

ConcurrencyLimiterMiddleware中间件

public async Task Invoke(HttpContext context)
        {
            var waitInQueueTask = _queuePolicy.TryEnterAsync();

            // Make sure we only ever call GetResult once on the TryEnterAsync ValueTask b/c it resets.
            bool result;

            if (waitInQueueTask.IsCompleted)
            {
                ConcurrencyLimiterEventSource.Log.QueueSkipped();
                result = waitInQueueTask.Result;
            }
            else
            {
                using (ConcurrencyLimiterEventSource.Log.QueueTimer())
                {
                    result = await waitInQueueTask;
                }
            }

            if (result)
            {
                try
                {
                    await _next(context);
                }
                finally
                {
                    _queuePolicy.OnExit();
                }
            }
            else
            {
                ConcurrencyLimiterEventSource.Log.RequestRejected();
                ConcurrencyLimiterLog.RequestRejectedQueueFull(_logger);
                context.Response.StatusCode = StatusCodes.Status503ServiceUnavailable;
                await _onRejected(context);
            }
        }

每次当咱们请求的时候首先会调用_queuePolicy.TryEnterAsync(),进入该方法后先开启一个私有lock锁,再接着判断总请求量是否≥(请求队列限制的大小+最大并发请求数),若是当前数量超出了,那么我直接抛出,送你个503状态;

if (result)
  {
         try
         {
             await _next(context);
         }
         finally
        {
            _queuePolicy.OnExit();
        }
        }
        else
        {
            ConcurrencyLimiterEventSource.Log.RequestRejected();
            ConcurrencyLimiterLog.RequestRejectedQueueFull(_logger);
            context.Response.StatusCode = StatusCodes.Status503ServiceUnavailable;
            await _onRejected(context);
        }

问题来了,我这边若是说还没到你设置的大小呢,我这个请求没有给你服务器造不成压力,那么你给我处理一下吧.
await _serverSemaphore.WaitAsync();异步等待进入信号量,若是没有线程被授予对信号量的访问权限,则进入执行保护代码;不然此线程将在此处等待,直到信号量被释放为止

 

lock (_totalRequestsLock)
    {
        if (TotalRequests >= _requestQueueLimit + _maxConcurrentRequests)
        {
             return false;
        }
            TotalRequests++;
        }
        //异步等待进入信号量,若是没有线程被授予对信号量的访问权限,则进入执行保护代码;不然此线程将在此处等待,直到信号量被释放为止
        await _serverSemaphore.WaitAsync();
        return true;
    }

返回成功后那么中间件这边再进行处理,_queuePolicy.OnExit();经过该调用进行调用_serverSemaphore.Release();释放信号灯,再对总请求数递减

相关文章
相关标签/搜索