文章信息:html
基于的 ABP vNext 版本:1.0.0数据库
创做日期:2019 年 10 月 24 日晚缓存
更新日期:暂无安全
ABP vNext 提供了后台工做者和后台做业的支持,基本实现与原来的 ABP 框架相似,而且 ABP vNext 还提供了对 HangFire 和 RabbitMQ 的后台做业集成。开发人员在使用这些第三方库的时候,基本就是开箱即用,不须要作其余复杂的配置。多线程
后台做业在系统开发的过程中,是比较经常使用的功能。由于老是有一些长耗时的任务,而这些任务咱们不是当即响应的,例如 Excel 文档导入、批量发送短信通知等。app
后台工做者 的话,ABP vNext 的实现就是在 CLR 的 Timer
之上封装了一层,周期性地执行用户逻辑。ABP vNext 默认提供的 后台任务管理器,就是在后台工做者基础之上进行的封装。框架
涉及到后台任务、后台工做者的模块一共有 6 个,它们分别是:async
AbpTimer
就是在里面实现的。Volo.Abp.BackgroundWorkers :后台工做者的定义和实现。分布式
Volo.Abp.BackgroundJobs.RabbitMQ : 基于 RabbitMQ 实现的后台任务管理器。ide
CLR 为咱们提供了多种计时器,咱们通常使用的是 System.Threading.Timer
,它是基于 CLR 线程池的一个周期计时器,会根据咱们配置的 Period
(周期) 定时执行。在 CLR 线程池中,全部的 Timer
只有 1 个线程为其服务。这个线程直到下一个计时器的触发时间,当下一个 Timer
对象到期时,这个线程就会将 Timer
的回调方法经过 ThreadPool.QueueUserWorkItem()
扔到线程池去执行。
不过这带来了一个问题,即你的回调方法执行时间超过了计时器的周期,那么就会形成上一个任务还没执行完成又开始执行新的任务。
解决这个方法其实很简单,即启动以后,将周期设置为 Timeout.Infinite
,这样只会执行一次。当回调方法执行完成以后,就设置 dueTime
参数说明下次执行要等待多久,而且周期仍是 Timeout.Infinite
。
ABP vNext 已经为咱们提供了健壮的计时器,该类型的定义是 AbpTimer
,在内部用到了 volatile
关键字和 Monitor
实现 条件变量模式 解决多线程环境下的问题。
public class AbpTimer : ITransientDependency { // 回调事件。 public event EventHandler Elapsed; // 执行周期。 public int Period { get; set; } // 定时器启动以后就开始运行,默认为 Fasle。 public bool RunOnStart { get; set; } // 日志记录器。 public ILogger<AbpTimer> Logger { get; set; } private readonly Timer _taskTimer; // 定时器是否在执行任务,默认为 false。 private volatile bool _performingTasks; // 定时器的运行状态,默认为 false。 private volatile bool _isRunning; public AbpTimer() { Logger = NullLogger<AbpTimer>.Instance; // 回调函数是 TimerCallBack,执行周期为永不执行。 _taskTimer = new Timer(TimerCallBack, null, Timeout.Infinite, Timeout.Infinite); } public void Start(CancellationToken cancellationToken = default) { // 若是传递的周期小于等于 0 ,则抛出异常。 if (Period <= 0) { throw new AbpException("Period should be set before starting the timer!"); } // 使用互斥锁,保证线程安全。 lock (_taskTimer) { // 若是启动以后就须要立刻执行,则设置为 0,立刻执行任务,不然会等待 Period 毫秒以后再执行(1 个周期)。 _taskTimer.Change(RunOnStart ? 0 : Period, Timeout.Infinite); // 定时器成功运行了。 _isRunning = true; } // 释放 _taskTimer 的互斥锁。 } public void Stop(CancellationToken cancellationToken = default) { // 使用互斥锁。 lock (_taskTimer) { // 将内部定时器设置为永不执行的状态。 _taskTimer.Change(Timeout.Infinite, Timeout.Infinite); // 检测当前是否还有正在执行的任务,若是有则等待任务执行完成。 while (_performingTasks) { // 临时释放锁,阻塞当前线程。可是其余线程能够获取 _timer 的互斥锁。 Monitor.Wait(_taskTimer); } // 须要表示中止状态,因此标记状态为 false。 _isRunning = false; } } private void TimerCallBack(object state) { lock (_taskTimer) { // 若是有任务正在运行,或者内部定时器已经中止了,则不作任何事情。 if (!_isRunning || _performingTasks) { return; } // 临时中止内部定时器。 _taskTimer.Change(Timeout.Infinite, Timeout.Infinite); // 代表立刻须要执行任务了。 _performingTasks = true; } try { // 调用绑定的事件。 Elapsed.InvokeSafely(this, new EventArgs()); } catch { // 注意,这里将会吞噬异常。 } finally { lock (_taskTimer) { // 任务执行完成,更改状态。 _performingTasks = false; // 若是定时器还在运行,没有被中止,则启动下一个 Period 周期。 if (_isRunning) { _taskTimer.Change(Period, Timeout.Infinite); } // 解除由于释放锁而阻塞的线程。 // 若是已经调用了 Stop,则会唤醒那个由于 Wait 阻塞的线程,就会使 _isRunning 置为 false。 Monitor.Pulse(_taskTimer); } } } }
这里对 _performingTasks
和 _isRunning
字段设置为 volatile
防止指令重排和寄存器缓存。这是由于在 Stop
方法内部使用到的 _performingTasks
可能会被优化,因此将该字段设置为了易失的。
IRunnable
接口ABP vNext 为任务的启动和中止,抽象了一个 IRunnable
接口。虽然描述说的是对线程的行为进行抽象,但千万千万不要手动调用 Thread.Abort()
。关于 Thread.Abort()
的坏处,这里再也不多加赘述,能够参考 这篇文章 的描述,或者搜索其余的相关文章。
public interface IRunnable { // 启动这个服务。 Task StartAsync(CancellationToken cancellationToken = default); /// <summary> /// 中止这个服务。 /// </summary> Task StopAsync(CancellationToken cancellationToken = default); }
后台工做者的模块行为比较简单,它定义了在应用程序初始化和销毁时的行为。在初始化时,后台工做者管理器 得到全部 后台工做者,并开始启动它们。在销毁时,后台工做者管理器得到全部后台工做者,并开始中止他们,这样才可以作到优雅退出。
[DependsOn( typeof(AbpThreadingModule) )] public class AbpBackgroundWorkersModule : AbpModule { public override void OnApplicationInitialization(ApplicationInitializationContext context) { var options = context.ServiceProvider.GetRequiredService<IOptions<AbpBackgroundWorkerOptions>>().Value; // 若是启用了后台工做者,那么得到后台工做者管理器的实例,并调用 StartAsync 启动全部后台工做者。 if (options.IsEnabled) { AsyncHelper.RunSync( () => context.ServiceProvider .GetRequiredService<IBackgroundWorkerManager>() .StartAsync() ); } } public override void OnApplicationShutdown(ApplicationShutdownContext context) { var options = context.ServiceProvider.GetRequiredService<IOptions<AbpBackgroundWorkerOptions>>().Value; // 若是启用了后台工做者,那么得到后台工做者管理器的实例,并调用 StopAsync 中止全部后台工做者。 if (options.IsEnabled) { AsyncHelper.RunSync( () => context.ServiceProvider .GetRequiredService<IBackgroundWorkerManager>() .StopAsync() ); } } }
首先看看 IBackgroundWorker
接口的定义,是空的。不过继承了 ISingletonDependency
接口,说明咱们的每一个后台工做者都是 单例 的。
/// <summary> /// 在后台运行,执行某些任务的工做程序(线程)的接口定义。 /// </summary> public interface IBackgroundWorker : IRunnable, ISingletonDependency { }
ABP vNext 为咱们定义了一个抽象的后台工做者类型 BackgroundWorkerBase
,这个基类的设计目的是提供一些经常使用组件(和 ApplicationService
同样)。
public abstract class BackgroundWorkerBase : IBackgroundWorker { //TODO: Add UOW, Localization and other useful properties..? //TODO: 是否应该提供工做单元、本地化以及其余经常使用的属性? public ILogger<BackgroundWorkerBase> Logger { protected get; set; } protected BackgroundWorkerBase() { Logger = NullLogger<BackgroundWorkerBase>.Instance; } public virtual Task StartAsync(CancellationToken cancellationToken = default) { Logger.LogDebug("Started background worker: " + ToString()); return Task.CompletedTask; } public virtual Task StopAsync(CancellationToken cancellationToken = default) { Logger.LogDebug("Stopped background worker: " + ToString()); return Task.CompletedTask; } public override string ToString() { return GetType().FullName; } }
ABP vNext 内部只有一个默认的后台工做者实现 PeriodicBackgroundWorkerBase
。从名字上来看,意思是就是周期执行的后台工做者,内部就是用的 AbpTimer
来实现,ABP vNext 将其包装起来是为了实现统一的模式(后台工做者)。
public abstract class PeriodicBackgroundWorkerBase : BackgroundWorkerBase { protected readonly AbpTimer Timer; // 也就意味着子类必须在其构造函数,指定 timer 的执行周期。 protected PeriodicBackgroundWorkerBase(AbpTimer timer) { Timer = timer; Timer.Elapsed += Timer_Elapsed; } // 启动后台工做者。 public override async Task StartAsync(CancellationToken cancellationToken = default) { await base.StartAsync(cancellationToken); Timer.Start(cancellationToken); } // 中止后台工做者。 public override async Task StopAsync(CancellationToken cancellationToken = default) { Timer.Stop(cancellationToken); await base.StopAsync(cancellationToken); } // Timer 关联的周期事件,之因此不直接挂载 DoWork,是为了捕获异常。 private void Timer_Elapsed(object sender, System.EventArgs e) { try { DoWork(); } catch (Exception ex) { Logger.LogException(ex); } } // 你要周期执行的任务。 protected abstract void DoWork(); }
咱们若是要实现本身的后台工做者,只须要继承该类,实现 DoWork()
方法便可。
public class TestWorker : PeriodicBackgroundWorkerBase { public TestWorker(AbpTimer timer) : base(timer) { // 每五分钟执行一次。 timer.Period = (int)TimeSpan.FromMinutes(5).TotalMilliseconds; } protected override void DoWork() { Console.WriteLine("后台工做者被执行了。"); } }
而后在咱们本身模块的 OnPreApplicationInitialization()
方法内解析出后台做业管理器(IBackgroundWorkerManager
),调用它的 Add()
方法,将咱们定义的 TestWorker
添加到管理器当中便可。
全部的后台工做者都是经过 IBackgroundWorkerManager
进行管理的,它提供了 StartAsync()
、StopAsync()
、Add()
方法。前面两个方法就是 IRunnable
接口定义的,后台工做者管理器直接集成了该接口,后面的 Add()
方法就是用来动态添加咱们的后台工做者。
后台工做者管理器的默认实现是 BackgroundWorkerManager
类型,它内部作的事情很简单,就是维护一个后台工做者集合。每当调用 StartAsync()
或 StopAsync()
方法的时候,都从这个集合遍历后台工做者,执行他们的启动和中止方法。
这里值得注意的一点是,当咱们调用 Add()
方法添加了一个后台工做者以后,后台工做者管理器就会启动这个后台工做者。
public class BackgroundWorkerManager : IBackgroundWorkerManager, ISingletonDependency, IDisposable { protected bool IsRunning { get; private set; } private bool _isDisposed; private readonly List<IBackgroundWorker> _backgroundWorkers; public BackgroundWorkerManager() { _backgroundWorkers = new List<IBackgroundWorker>(); } public virtual void Add(IBackgroundWorker worker) { _backgroundWorkers.Add(worker); // 若是当先后台工做者管理器还处于运行状态,则调用工做者的 StartAsync() 方法启动。 if (IsRunning) { AsyncHelper.RunSync( () => worker.StartAsync() ); } } public virtual void Dispose() { if (_isDisposed) { return; } _isDisposed = true; //TODO: ??? } // 启动,则遍历集合启动。 public virtual async Task StartAsync(CancellationToken cancellationToken = default) { IsRunning = true; foreach (var worker in _backgroundWorkers) { await worker.StartAsync(cancellationToken); } } // 中止, 则遍历集合中止。 public virtual async Task StopAsync(CancellationToken cancellationToken = default) { IsRunning = false; foreach (var worker in _backgroundWorkers) { await worker.StopAsync(cancellationToken); } } }
上述代码其实存在一个问题,即后台工做者被释放之后,是否还能执行 Add()
操做。参考我 以前的文章 ,其实当对象被释放以后,就应该抛出 ObjectDisposedException
异常。
比起后台工做者,咱们执行一次性任务的时候,通常会使用后台做业进行处理。比起只能设置固定周期的 PeriodicBackgroundWorkerBase
,集成了 Hangfire 的后台做业管理器,可以让咱们使用 Cron 表达式,更加灵活地设置任务的执行周期。
关于后台做业的模块,咱们须要说道两处。第一处是位于 Volo.Abp.BackgroundJobs.Abstractions 项目的 AbpBackgroundJobsAbstractionsModule
,第二出则是位于 Volo.Abp.BackgroundJobs 项目的 AbpBackgroundJobsModule
。
AbpBackgroundJobsAbstractionsModule
的主要行为是将符合条件的后台做业,添加到 AbpBackgroundJobOptions
配置当中,以便后续进行使用。
public override void PreConfigureServices(ServiceConfigurationContext context) { RegisterJobs(context.Services); } private static void RegisterJobs(IServiceCollection services) { var jobTypes = new List<Type>(); // 若是注册的类型符合 IBackgroundJob<> 泛型,则添加到集合当中。 services.OnRegistred(context => { if (ReflectionHelper.IsAssignableToGenericType(context.ImplementationType, typeof(IBackgroundJob<>))) { jobTypes.Add(context.ImplementationType); } }); services.Configure<AbpBackgroundJobOptions>(options => { // 将数据赋值给配置类。 foreach (var jobType in jobTypes) { options.AddJob(jobType); } }); }
Volo.Abp.BackgroundJobs 内部是 ABP vNext 为咱们提供的 默认后台做业管理器,这个后台做业管理器 本质上是一个后台工做者。
这个后台工做者会周期性(取决于 AbpBackgroundJobWorkerOptions.JobPollPeriod
值,默认为 5 秒种)地从 IBackgroundJobStore
捞出一堆后台任务,而且在后台执行。至于每次执行多少个后台任务,这也取决于 AbpBackgroundJobWorkerOptions.MaxJobFetchCount
的值,默认值是 1000 个。
注意:
这里的 Options 类是
AbpBackgroundJobWorkerOptions
,别和AbpBackgroundWorkerOptions
混淆了。
因此在 AbpBackgroundJobsModule
模块里面,只作了一件事情,就是将负责后台做业的后台工做者,添加到后台工做者管理器种,并开始周期性地执行。
public override void OnApplicationInitialization(ApplicationInitializationContext context) { var options = context.ServiceProvider.GetRequiredService<IOptions<AbpBackgroundJobOptions>>().Value; if (options.IsJobExecutionEnabled) { // 得到后台工做者管理器,并将负责后台做业的工做者添加进去。 context.ServiceProvider .GetRequiredService<IBackgroundWorkerManager>() .Add(context.ServiceProvider.GetRequiredService<IBackgroundJobWorker>() ); } }
在上一节里面看到,只要是实现 IBackgroundJob<TArgs>
类型的都视为一个后台做业。这个后台做业接口,只定义了一个行为,那就是执行(Execute(TArgs)
)。这里的 TArgs
泛型做为执行后台做业时,须要传递的参数类型。
// 由于是传入的参数,因此泛型参数是逆变的。 public interface IBackgroundJob<in TArgs> { void Execute(TArgs args); }
检查源码,发现 ABP vNext 的邮箱模块定义了一个邮件发送任务 BackgroundEmailSendingJob
,它的实现大概以下。
public class BackgroundEmailSendingJob : BackgroundJob<BackgroundEmailSendingJobArgs>, ITransientDependency { // ... public override void Execute(BackgroundEmailSendingJobArgs args) { AsyncHelper.RunSync(() => EmailSender.SendAsync(args.To, args.Subject, args.Body, args.IsBodyHtml)); } }
后台做业都是经过一个后台做业管理器(IBackgroundJobManager
)进行管理的,这个接口定义了一个入队方法(EnqueueAsync()
),注意,咱们的后台做业在入队后,不是立刻执行的。
说一下这个入队处理逻辑:
BackgroundJobNameAttribute
特性,那么任务的名称就是参数类型的 FullName
。)BackgroundJobInfo
对象。IBackgroundJobStore
持久化任务信息。public virtual async Task<string> EnqueueAsync<TArgs>(TArgs args, BackgroundJobPriority priority = BackgroundJobPriority.Normal, TimeSpan? delay = null) { // 获取任务名称。 var jobName = BackgroundJobNameAttribute.GetName<TArgs>(); var jobId = await EnqueueAsync(jobName, args, priority, delay); return jobId.ToString(); } protected virtual async Task<Guid> EnqueueAsync(string jobName, object args, BackgroundJobPriority priority = BackgroundJobPriority.Normal, TimeSpan? delay = null) { var jobInfo = new BackgroundJobInfo { Id = GuidGenerator.Create(), JobName = jobName, // 经过序列化器,序列化参数值,方便存储。这里内部其实使用的是 JSON.NET 进行序列化。 JobArgs = Serializer.Serialize(args), Priority = priority, CreationTime = Clock.Now, NextTryTime = Clock.Now }; // 若是任务有执行延迟,则任务的初始执行时间要加上这个延迟。 if (delay.HasValue) { jobInfo.NextTryTime = Clock.Now.Add(delay.Value); } // 持久化任务信息,方便后面执行后台做业的工做者可以取到。 await Store.InsertAsync(jobInfo); return jobInfo.Id; }
BackgroundJobNameAttribute
相关的方法:
public static string GetName<TJobArgs>() { return GetName(typeof(TJobArgs)); } public static string GetName([NotNull] Type jobArgsType) { Check.NotNull(jobArgsType, nameof(jobArgsType)); // 判断参数类型上面是否标注了特性,而且特性实现了 IBackgroundJobNameProvider 接口。 return jobArgsType .GetCustomAttributes(true) .OfType<IBackgroundJobNameProvider>() .FirstOrDefault() ?.Name // 拿不到名字,则使用类型的 FullName。 ?? jobArgsType.FullName; }
后台做业的存储默认是放在内存的,这点能够从 InMemoryBackgroundJobStore
类型实现看出来。在它的内部使用了一个并行字典,经过做业的 Guid 与做业进行关联绑定。
除了内存实现,在 Volo.Abp.BackgroundJobs.Domain 模块还有一个 BackgroundJobStore
实现,基本套路与 SettingStore
同样,都是存储到数据库里面。
public class BackgroundJobStore : IBackgroundJobStore, ITransientDependency { protected IBackgroundJobRepository BackgroundJobRepository { get; } // ... public BackgroundJobInfo Find(Guid jobId) { return ObjectMapper.Map<BackgroundJobRecord, BackgroundJobInfo>( BackgroundJobRepository.Find(jobId) ); } // ... public void Insert(BackgroundJobInfo jobInfo) { BackgroundJobRepository.Insert( ObjectMapper.Map<BackgroundJobInfo, BackgroundJobRecord>(jobInfo) ); } // ... }
默认的后台做业管理器是经过一个后台工做者来执行后台任务的,这个实现叫作 BackgroundJobWorker
,这个后台工做者的生命周期也是单例的。后台做业的具体执行逻辑里面,涉及到了如下几个类型的交互。
类型 | 做用 |
---|---|
AbpBackgroundJobOptions |
提供每一个后台任务的配置信息,包括任务的类型、参数类型、任务名称数据。 |
AbpBackgroundJobWorkerOptions |
提供后台做业工做者的配置信息,例如每一个周期 最大执行的做业数量、后台 工做者的 执行周期、做业执行 超时时间 等。 |
BackgroundJobConfiguration |
后台任务的配置信息,做用是将持久化存储的做业信息与运行时类型进行绑定 和实例化,以便 ABP vNext 来执行具体的任务。 |
IBackgroundJobExecuter |
后台做业的执行器,当咱们从持久化存储获取到后台做业信息时,将会经过 这个执行器来执行具体的后台做业。 |
IBackgroundJobSerializer |
后台做业序列化器,用于后台做业持久化时进行序列化的工具,默认采用的 是 JSON.NET 进行实现。 |
JobExecutionContext |
执行器在执行后台做业时,是经过这个上下文参数进行执行的,在这个上下 文内部,包含了后台做业的具体类型、后台做业的参数值。 |
IBackgroundJobStore |
前面已经讲过了,这个是用于后台做业的持久化存储,默认实现是存储在内存。 |
BackgroundJobPriority |
后台做业的执行优先级定义,ABP vNext 在执行后台任务时,会根据任务的优 先级进行排序,以便在后面执行的时候优先级高的任务先执行。 |
咱们来按照逻辑顺序走一遍它的实现,首前后台做业的执行工做者会从持久化存储内,获取 MaxJobFetchCount
个任务用于执行。从持久化存储获取后台做业信息(BackgroundJobInfo
),是由 IBackgroundJobStore
提供的。
var store = scope.ServiceProvider.GetRequiredService<IBackgroundJobStore>(); var waitingJobs = store.GetWaitingJobs(WorkerOptions.MaxJobFetchCount); // 不存在任何后台做业,则直接结束本次调用。 if (!waitingJobs.Any()) { return; }
InMemoryBackgroundJobStore
的相关实现:
public List<BackgroundJobInfo> GetWaitingJobs(int maxResultCount) { return _jobs.Values .Where(t => !t.IsAbandoned && t.NextTryTime <= Clock.Now) .OrderByDescending(t => t.Priority) .ThenBy(t => t.TryCount) .ThenBy(t => t.NextTryTime) .Take(maxResultCount) .ToList(); }
上面的代码能够看出来,首先排除 被放弃的任务 ,包含达到执行时间的任务,而后根据任务的优先级从高到低进行排序。重试次数少的优先执行,预计执行时间越早的越先执行。最后从这些数据中,筛选出 maxResultCount
结果并返回。
说到这里,咱们来看一下这个 NextTryTime
是如何被计算出来的?回想起最开始的后台做业管理器,咱们在添加一个后台任务的时候,就会设置这个后台任务的 预计执行时间。第一个任务被添加到执行队列中时,它的值通常是 Clock.Now
,也就是它被添加到队列的时间。
不过 ABP vNext 为了让那些常常执行失败的任务,有比较低的优先级再执行,就在每次任务执行失败以后,会将 NextTryTime
的值指数级进行增长。这块代码能够在 CalculateNextTryTime
里面看到,也就是说某个任务的执行 失败次数越高,那么它下一次的预期执行时间就会越远。
protected virtual DateTime? CalculateNextTryTime(BackgroundJobInfo jobInfo, IClock clock) { // 通常来讲,这个 DefaultWaitFactor 因子的值是 2.0 。 var nextWaitDuration = WorkerOptions.DefaultFirstWaitDuration * (Math.Pow(WorkerOptions.DefaultWaitFactor, jobInfo.TryCount - 1)); // 同执行失败的次数进行挂钩。 var nextTryDate = jobInfo.LastTryTime?.AddSeconds(nextWaitDuration) ?? clock.Now.AddSeconds(nextWaitDuration); if (nextTryDate.Subtract(jobInfo.CreationTime).TotalSeconds > WorkerOptions.DefaultTimeout) { return null; } return nextTryDate; }
当预期的执行时间都超过 DefaultTimeout
的超时时间时(默认为 2 天),说明这个任务确实没救了,就不要再执行了。
咱们以前说到,从 IBackgroundJobStore
拿到了须要执行的后台任务信息集合,接下来咱们就要开始执行后台任务了。
foreach (var jobInfo in waitingJobs) { jobInfo.TryCount++; jobInfo.LastTryTime = clock.Now; try { // 根据任务名称获取任务的配置参数。 var jobConfiguration = JobOptions.GetJob(jobInfo.JobName); // 根据配置里面存储的任务类型,将参数值进行反序列化。 var jobArgs = serializer.Deserialize(jobInfo.JobArgs, jobConfiguration.ArgsType); // 构造一个新的执行上下文,让执行器执行任务。 var context = new JobExecutionContext(scope.ServiceProvider, jobConfiguration.JobType, jobArgs); try { jobExecuter.Execute(context); // 若是任务执行成功则删除该任务。 store.Delete(jobInfo.Id); } catch (BackgroundJobExecutionException) { // 发生任务执行失败异常时,根据指定的公式计算下一次的执行时间。 var nextTryTime = CalculateNextTryTime(jobInfo, clock); if (nextTryTime.HasValue) { jobInfo.NextTryTime = nextTryTime.Value; } else { // 超过超时时间的时候,公式计算函数返回 null,该任务置为废弃任务。 jobInfo.IsAbandoned = true; } TryUpdate(store, jobInfo); } } catch (Exception ex) { // 执行过程当中,产生了未知异常,设置为废弃任务,并打印日志。 Logger.LogException(ex); jobInfo.IsAbandoned = true; TryUpdate(store, jobInfo); } }
执行后台任务的时候基本分为 5 步,它们分别是:
至于执行器里面的真正执行操做,你都拿到了参数值和任务类型了。就能够经过类型用 IoC 获取后台任务对象的实例,而后经过反射匹配方法签名,在实例上调用这个方法传入参数便可。
public virtual void Execute(JobExecutionContext context) { // 构造具体的后台做业实例对象。 var job = context.ServiceProvider.GetService(context.JobType); if (job == null) { throw new AbpException("The job type is not registered to DI: " + context.JobType); } // 得到须要执行的方法签名。 var jobExecuteMethod = context.JobType.GetMethod(nameof(IBackgroundJob<object>.Execute)); if (jobExecuteMethod == null) { throw new AbpException($"Given job type does not implement {typeof(IBackgroundJob<>).Name}. The job type was: " + context.JobType); } try { // 直接经过 MethodInfo 的 Invoke 方法调用,传入具体的实例对象和参数值便可。 jobExecuteMethod.Invoke(job, new[] { context.JobArgs }); } catch (Exception ex) { Logger.LogException(ex); // 若是是执行方法内的异常,则包装进行处理,而后抛出。 throw new BackgroundJobExecutionException("A background job execution is failed. See inner exception for details.", ex) { JobType = context.JobType.AssemblyQualifiedName, JobArgs = context.JobArgs }; } }
ABP vNext 对于 Hangfire 的集成代码分布在 Volo.Abp.HangFire 和 Volo.Abp.BackgroundJobs.HangFire 模块内部,前者是在模块配置里面,调用 Hangfire 库的相关方法,注入组件到 IoC 容器当中。后者则是对后台做业进行了适配处理,替换了默认的 IBackgroundJobManager
实现。
在 AbpHangfireModule
模块内部,经过工厂建立出来一个 BackgroudJobServer
实例,并将它的生命周期与应用程序的生命周期进行绑定,以便进行销毁处理。
public class AbpHangfireModule : AbpModule { private BackgroundJobServer _backgroundJobServer; public override void ConfigureServices(ServiceConfigurationContext context) { context.Services.AddHangfire(configuration => { context.Services.ExecutePreConfiguredActions(configuration); }); } public override void OnApplicationInitialization(ApplicationInitializationContext context) { var options = context.ServiceProvider.GetRequiredService<IOptions<AbpHangfireOptions>>().Value; _backgroundJobServer = options.BackgroundJobServerFactory.Invoke(context.ServiceProvider); } public override void OnApplicationShutdown(ApplicationShutdownContext context) { //TODO: ABP may provide two methods for application shutdown: OnPreApplicationShutdown & OnApplicationShutdown _backgroundJobServer.SendStop(); _backgroundJobServer.Dispose(); } }
咱们直奔主题,看一下基于 Hangfire 的后台做业管理器是怎么实现的。
public class HangfireBackgroundJobManager : IBackgroundJobManager, ITransientDependency { public Task<string> EnqueueAsync<TArgs>(TArgs args, BackgroundJobPriority priority = BackgroundJobPriority.Normal, TimeSpan? delay = null) { // 若是没有延迟参数,则直接经过 Enqueue() 方法扔进执行对了。 if (!delay.HasValue) { return Task.FromResult( BackgroundJob.Enqueue<HangfireJobExecutionAdapter<TArgs>>( adapter => adapter.Execute(args) ) ); } else { return Task.FromResult( BackgroundJob.Schedule<HangfireJobExecutionAdapter<TArgs>>( adapter => adapter.Execute(args), delay.Value ) ); } }
上述代码中使用 HangfireJobExecutionAdapter
进行了一个适配操做,由于 Hangfire 要将一个后台任务扔进队列执行,不是用 TArgs
就能解决的。
转到这个适配器定义,提供了一个 Execute(TArgs)
方法,当被添加到 Hangfire 队列执行的时候。实际 Hangfire 会调用适配器的 Excetue(TArgs)
方法,而后内部仍是使用的 IBackgroundJobExecuter
来执行具体定义的任务。
public class HangfireJobExecutionAdapter<TArgs> { protected AbpBackgroundJobOptions Options { get; } protected IServiceScopeFactory ServiceScopeFactory { get; } protected IBackgroundJobExecuter JobExecuter { get; } public HangfireJobExecutionAdapter( IOptions<AbpBackgroundJobOptions> options, IBackgroundJobExecuter jobExecuter, IServiceScopeFactory serviceScopeFactory) { JobExecuter = jobExecuter; ServiceScopeFactory = serviceScopeFactory; Options = options.Value; } public void Execute(TArgs args) { using (var scope = ServiceScopeFactory.CreateScope()) { var jobType = Options.GetJob(typeof(TArgs)).JobType; var context = new JobExecutionContext(scope.ServiceProvider, jobType, args); JobExecuter.Execute(context); } } }
基于 RabbitMQ 的后台做业实现,我想放在分布式事件总线里面,对其一块儿进行讲解。
ABP vNext 为咱们提供了多种后台做业管理器的实现,你能够根据本身的需求选用不一样的后台做业管理器,又或者是本身动手造轮子。
须要看其余的 ABP vNext 相关文章?点击我 便可跳转到总目录。