最近遇到了这样的场景:每隔一段时间,须要在后台使用队列对一批数据进行业务处理。数组
Quartz.NET是一种选择,在 .NET Core中,可使用IHostedService
执行后台定时任务。在本篇中,首先尝试把队列还原到最简单、原始的状态,而后给出以上场景问题的具体解决方案。安全
假设一个队列有8个元素。如今abcd依次进入队列。网络
0 | 1 | 2 | 3 | 4 | 5 | 6 | 7 |
---|---|---|---|---|---|---|---|
a | b | c | d | ||||
head | tail |
ab依次出队列。并发
0 | 1 | 2 | 3 | 4 | 5 | 6 | 7 |
---|---|---|---|---|---|---|---|
c | d | ||||||
head | tail |
能够想象,随着不断地入列出列,head和tail的位置不断日后,当tail在7号位的时候,虽然队列里还有空间,但此时数据就没法入队列了。app
如何才能够继续入队列呢?ide
首先想到的是数据搬移。当数据没法进入队列,首先让队列项出列,进入到另一个新队列,这个新队列就能够再次接收数据入队列了。可是,搬移整个队列中的数据的时间复杂度为O(n),而原先出队列的时间复杂度是O(1),这种方式不够理想。ui
还有一种思路是使用循环队列。当tail指向最后一个位置,此时有新的数据进入队列,tail就来到头部指向0号位置,这样这个队列就能够循环使用了。线程
0 | 1 | 2 | 3 | 4 | 5 | 6 | 7 |
---|---|---|---|---|---|---|---|
h | i | j | |||||
head | tail |
如今a入栈。code
0 | 1 | 2 | 3 | 4 | 5 | 6 | 7 |
---|---|---|---|---|---|---|---|
h | i | j | a | ||||
tail | head |
队列有不少种实现。接口
好比在生产消费模型中能够用阻塞队列。当生产队列为空的时候,为了避免让消费者取数据,生产队列的Dequeue行为会被阻塞;而当生产队列满的时候,为了避免让更多的数据进来,生产队列的Enqueue行为被阻塞。
线程安全的队列叫并发队列,如C#中的ConcurrentQueue
。
线程池内部也使用了队列机制。由于CPU的资源是有限的,过多的线程会致使CPU频繁地在线程之间切换。线程池内经过维护必定数量的线程来减轻CPU的负担。当线程池没有多余的线程可供使用,请求过来,一种方式是拒绝请求,另一种方式是让请求队列阻塞,等到线程池内有线程可供使用,请求队列出列执行。用链表实现的队列是无界队列(unbounded queue),这种作法可能会致使过多的请求在排队,等待响应时间过长。用数组实现的队列是有界队列(bounded queue),当线程池已满,请求过来就会被拒绝。对有界队列来讲数组的大小设置很讲究。
来模拟一个数组队列。
public class ArrayQueue { private string[] items; private int n = 0; //数组长度 private int head = 0; private int tail = 0; public ArrayQueue(int capacity) { n = capacity; items = new string[capacity]; } public bool Enqueue(string item) { if(tail==n){ return false; } items[tail] = item; ++tail; return true; } public string Dequeue() { if(head==null){ return null; } string ret = items[head]; ++head; return ret; } }
以上就是一个最简单的、用数组实现的队列。
再次回到要解决的场景问题。解决思路大体是:实现IHostedService
接口,在其中执行定时任务,每次把队列项放到队列中,并定义出队列的方法,在其中执行业务逻辑。
关于队列,经过如下的步骤使其在后台运行。
Dictionary<string, MessageQueueItem>
静态字典集合MessageQueueUtility
类:决定着如何运行,好比队列执行的间隔时间、垃圾回收MessageQueueThreadUtility
类:维护队列线程,提供队列在后台运行的方法Startup.cs
中的Configure
中调用MessageQueueThreadUtility
中的方法使队列在后台运行队列项(MessageQueueItem)
public class MessageQueueItem { public MessageQueueItem(string key, Action action, string description=null) { Key = key; Action = action; Description = description; AddTime = DateTime.Now; } public string Key{get;set;} public Actioin Action{get;set;} public DateTime AddTime{get;set;} public string Description{get;set;} }
队列(MessageQueue),维护着针对队列项的一个静态字典集合。
public class MessageQueue { public static Dictionary<string, MessageQueueItem> MessageQueueDictionary = new Dictionary<string, MessageQueueItem>(StringComparer.OrdinalIgnoreCase); public static object MessageQueueSyncLock = new object(); public static object OperateLock = new object(); public static void OperateQueue() { lock(OperateLock) { var mq = new MessageQueue(); var key = mq.GetCurrentKey(); while(!string.IsNullOrEmpty(key)) { var mqItem = mq.GetItem(key); mqItem.Action(); mq.Remove(key); key = mq.GetCurrentKey(); } } } public string GetCurrentKey() { lock(MessageQueueSyncLock) { return MessageQueueDictionary.Keys.FirstOrDefault(); } } public MessageQueueItem GetItem(string key) { lock(MessageQueueSyncLock) { if(MessageQueueDictionary.ContainsKey(key)) { return MessageQueueDictionary[key]; } return null; } } public void Remove(string key) { lock(MessageQueueSyncLock) { if(MessageQueueDictionary.ContainsKey(key)) { MessageQueueDictionary.Remove(key); } } } public MessageQueueItem Add(string key, Action actioin) { lock(MessageQueueSyncLock) { var mqItem = new MessageQueueItem(key, action); MessageQueueDictionary[key] = mqItem; return mqItem; } } public int GetCount() { lock(MessageQueueSyncLock) { return MessageQueueDictionary.Count; } } }
MessageQueueUtility
类, 决定着队列运行的节奏。
public class MessageQueueUtility { private readonly int _sleepMilliSeconds; public MessageQueueUtility(int sleepMilliSeconds=1000) { _sleepMilliSeconds = sleepMilliSeoncds; } ~MessageQueueUtility() { MessageQueue.OperateQueue(); } public void Run { do { MessageQueue.OperateQueue(); Thread.Sleep(_sleepMilliSeconds); } while(true) } }
MessageQueueThreadUtility
类,管理队列的线程,并让其在后台运行。
public static class MessageQueueThreadUtility { public static Dictionary<string, Thread> AsyncThreadCollection = new Dictioanry<string, Thread>(); public static void Register(string threadUniqueName) { { MessageQueueUtility messageQueueUtility = new MessageQueueUtility(); Thread messageQueueThread = new Thread(messageQueueUtility.Run){ Name = threadUniqueName }; AsyncThreadCollection.Add(messageQueueThread.Name, messageQueueThread); } AsyncThreadCollection.Values.ToList().ForEach(z => { z.IsBackground = true; z.Start(); }); } }
Startup.cs
中注册。
public class Startup { public IServiceProvider ConfigureServices(IServiceCollection services) { ... } public void Configure(IApplicationBuilder app, IHostingEnvironment env...) { RegisterMessageQueueThreads(); } private void RegisterMessageQueueThreads() { MessageQueueThreadUtility.Register(""); } }
最后在IHostedService
的实现类中把队列项丢给队列。
public class MyBackgroundSerivce : IHostedService, IDisposable { private Timer _timer; public IServiceProvider Services{get;} public MyBackgroundService(IServiceProvider services) { Serivces = services; } public void Dispose() { _timer?.Dispose(); } public Task StartAsync(CancellationToken cancellationToken) { _timer = new Timer(DoWork, null, TimeSpan.Zero, TimeSpan.FromSeconds(10)); return Task.CompletedTask; } public Task StopAsync(CancellationToken cancellationToken) { _timer?.Change(Timeout.Infinite,0); return Task.CompletedTask; } private void DoWork(object state) { using(var scope = Services.CreateScope()) { using(var db = scope.ServiceProvider.GetRequiredService<MyDbContext>()) { ... var mq = new MessageQueue(); mq.Add("somekey", DealQueueItem); } } } private void DealQueueItem() { var mq = new MessageQueue(); var key = mq.GetCurrentKey(); var item = mq.GetItem(key); if(item!=null) { using(var scope = Services.CreateScope()) { using(var db = scope.ServiceProvider.GetRequiredService<MyDbContext>()) { //执行业务逻辑 } } } } }
当须要使用上下文的时候,首先经过IServiceProvider
的CreateScope
方法获得ISerivceScope
,再经过它的ServiceProvider
属性获取依赖倒置容器中的上下文服务。
以上,用IHostedService
结合队列解决了开篇提到的场景问题,若是您有很好的想法,咱们一块儿交流吧。文中的队列部分来自"盛派网络"的Senparc.Weixin SDK源码。