在Asp.net core 3.0 以前使用asp.net core 会出现线程池内线程不足的状况,出现并发冲突现象,web
并发冲突主要状况以下:数据库
一、用户导航到实体编辑页面;服务器
二、第一个用户的更改还未写入数据库以前,另外一个用户更新实体多线程
此时若是未启用并发检测,当发生更新时最后一个更新优先,即最后一个更新的值保存到数据库,而第一个保存的数据将丢失;并发
那么如何处理并发冲突呢?app
1】能够跟踪用户已经修改的属性,并只更新数据库中相应的列,这样,当两个用户更新了不一样的属性,下次查看时都将生效。asp.net
可是也存在如下问题:异步
【1】当对同一个属性进行竞争性更改的话,没法避免数据的流失async
【2】一般不适用于web应用。它须要维持重要的状态,以便跟踪全部提取值和新值。维持大量状态可能影响应用性能;性能
【3】可能会增长应用复杂性(与实体上的开发检测相比)
2】客户端优先
即客户端的值优先于数据库存储的值。而且若是不对并发处理进行任何编码,将自动进行客户端优先;
3】 存储优先
这种方式能够阻止在数据可对数据的更改,而且能够
【1】显示错误消息
【2】显示数据的当前状态
【3】容许用户从新更新应用的更改
如何处理并发呢?
1】检测属性的并发冲突,能够使用ConcurrencyCheck特性在属性级别检测并发冲突。该特性可应用于模型上的多个属性
2】检测行的并发冲突,检测行的并发冲突,将rowversion耿总列添加到模型
3】当属性为并发令牌时:
1)EFCore 验证提取属性后是否未更改属性,调用SaveChanges或者SaveChangesAsync时会执行并发检测
2)若是提取属性后更改了属性,将引起DbUpdateConcurrencyException
数据库和数据模型必须配置为支持引起DbUpdateConcurrencyException.
4]在Asp.net core 3.0以后
微软在asp.net core 3.0 增长了Microsoft.AspNetCore.ConcurrencyLimiter,用于传入的请求进行排队处理
避免线程池的不足;
Queue策略
一、添加Nuget Install-Package Microsoft.AspNetCore.ConcurrencyLimiter
public void ConfigureServices(IServiceCollection services) { services.AddQueuePolicy(options => { //最大并发请求数 options.MaxConcurrentRequests = 2; //请求队列长度限制 options.RequestQueueLimit = 1; }); services.AddControllers(); } public void Configure(IApplicationBuilder app, IWebHostEnvironment env) { //添加并发限制中间件 app.UseConcurrencyLimiter(); app.Run(async context => { Task.Delay(100).Wait(); // 100ms sync-over-async await context.Response.WriteAsync("Hello World!"); }); if (env.IsDevelopment()) { app.UseDeveloperExceptionPage(); } app.UseHttpsRedirection(); app.UseRouting(); app.UseAuthorization(); app.UseEndpoints(endpoints => { endpoints.MapControllers(); }); }
经过上面简单的配置,咱们就能够将他引入到咱们的代码中,从而作并发量限制,以及队列的长度;那么问题来了,他是怎么实现的呢?
public static IServiceCollection AddQueuePolicy(this IServiceCollection services, Action<QueuePolicyOptions> configure) { services.Configure(configure); services.AddSingleton<IQueuePolicy, QueuePolicy>(); return services; }
QueuePolicy 采用的是SemaphoreSlim 信号量设计,SemapHoreSlim、SemapHore(信号量)支持并发多线程进入被保护代码,对象在初始化时会指定最大任务数量,当线程请求访问资源,信号量递减而当他们释放时信号量数量递增。
public QueuePolicy(IOptions<QueuePolicyOptions> options) { _maxConcurrentRequests = options.Value.MaxConcurrentRequests; if (_maxConcurrentRequests <= 0) { throw new ArgumentException(nameof(_maxConcurrentRequests), "MaxConcurrentRequests must be a positive integer."); } _requestQueueLimit = options.Value.RequestQueueLimit; if (_requestQueueLimit < 0) { throw new ArgumentException(nameof(_requestQueueLimit), "The RequestQueueLimit cannot be a negative number."); } //使用SemaphoreSlim来限制任务最大个数 _serverSemaphore = new SemaphoreSlim(_maxConcurrentRequests); }
ConcurrencyLimiterMiddleware中间件
public async Task Invoke(HttpContext context) { var waitInQueueTask = _queuePolicy.TryEnterAsync(); // Make sure we only ever call GetResult once on the TryEnterAsync ValueTask b/c it resets. bool result; if (waitInQueueTask.IsCompleted) { ConcurrencyLimiterEventSource.Log.QueueSkipped(); result = waitInQueueTask.Result; } else { using (ConcurrencyLimiterEventSource.Log.QueueTimer()) { result = await waitInQueueTask; } } if (result) { try { await _next(context); } finally { _queuePolicy.OnExit(); } } else { ConcurrencyLimiterEventSource.Log.RequestRejected(); ConcurrencyLimiterLog.RequestRejectedQueueFull(_logger); context.Response.StatusCode = StatusCodes.Status503ServiceUnavailable; await _onRejected(context); } }
每次当咱们请求的时候首先会调用_queuePolicy.TryEnterAsync()
,进入该方法后先开启一个私有lock锁,再接着判断总请求量是否≥(请求队列限制的大小+最大并发请求数),若是当前数量超出了,那么我直接抛出,送你个503状态;
if (result) { try { await _next(context); } finally { _queuePolicy.OnExit(); } } else { ConcurrencyLimiterEventSource.Log.RequestRejected(); ConcurrencyLimiterLog.RequestRejectedQueueFull(_logger); context.Response.StatusCode = StatusCodes.Status503ServiceUnavailable; await _onRejected(context); }
问题来了,我这边若是说还没到你设置的大小呢,我这个请求没有给你服务器造不成压力,那么你给我处理一下吧.
await _serverSemaphore.WaitAsync();
异步等待进入信号量,若是没有线程被授予对信号量的访问权限,则进入执行保护代码;不然此线程将在此处等待,直到信号量被释放为止
lock (_totalRequestsLock) { if (TotalRequests >= _requestQueueLimit + _maxConcurrentRequests) { return false; } TotalRequests++; } //异步等待进入信号量,若是没有线程被授予对信号量的访问权限,则进入执行保护代码;不然此线程将在此处等待,直到信号量被释放为止 await _serverSemaphore.WaitAsync(); return true; }
返回成功后那么中间件这边再进行处理,_queuePolicy.OnExit();
经过该调用进行调用_serverSemaphore.Release();
释放信号灯,再对总请求数递减