[Abp 源码分析]十6、后台做业与后台工做者

0. 简介

在某些时候咱们可能会须要执行后台任务,或者是执行一些周期性的任务。好比说可能每隔 1 个小时要清除某个临时文件夹内的数据,可能用户会要针对某一个用户群来群发一组短信。前面这些就是典型的应用场景,在 Abp 框架里面为咱们准备了后台做业和后台工做者来帮助咱们解决这个问题。html

后台做业与后台工做者的区别是,前者主要用于某些耗时较长的任务,而不想阻塞用户的时候所使用。后者主要用于周期性的执行某些任务,从 “工做者” 的名字能够看出来,就是一个个工人,并且他们每一个工人都拥有单独的后台线程。数据库

0.1 典型场景

后台做业网络

  • 某个用户按下了报表按钮来生成一个须要长时间等待的报表。你添加这个工做到队列中,当报表生成完毕后,发送报表结果到该用户的邮箱。
  • 在后台做业中发送一封邮件,有些问题可能会致使发送失败(网络链接异常,或者主机宕机);因为有后台做业以及持久化机制,在问题排除后,能够重试以保证任务的成功执行。

后台工做者框架

  • 后台工做者可以周期性地执行旧日志的删除。
  • 后台工做者能够周期性地筛选出非活跃性用户,而且发送回归邮件给这些用户。

1. 启动流程

后台做业与后台工做者都是经过各自的 Manager(IBackgroundJobManager/IBackgroundWorkerManager) 来进行管理的。而这两个 Manager 分别继承了 ISingletonDependency 接口,因此在启动的时候就会自动注入这两个管理器以便开发人员管理操做。async

这里值得注意的一点是,IBackgroundJobManager 接口是 IBackgroundWorker 的派生接口,而 IBackgroudWorker 是归属于 IBackgroundWorkerManager 进行管理的。ide

因此,你能够在 AbpKernelModule 里面看到以下代码:函数

public sealed class AbpKernelModule : AbpModule
{
    public override void PostInitialize()
    {
        // 注册可能缺乏的组件
        RegisterMissingComponents();
        
        // ... 忽略的代码
        // 各类管理器的初始化操做

        // 从配置项中读取,是否启用了后台做业功能
        if (Configuration.BackgroundJobs.IsJobExecutionEnabled)
        {
            var workerManager = IocManager.Resolve<IBackgroundWorkerManager>();
            // 开始启动后台工做者
            workerManager.Start();
            // 增长后台做业管理器
            workerManager.Add(IocManager.Resolve<IBackgroundJobManager>());
        }
    }
}

能够看到,后台做业管理器是做为一个后台工做者被添加到了 IBackgroundWorkerManager 当中来执行的。this

2. 代码分析

2.1 后台工做者

2.1.1 后台工做者管理器

Abp 经过后台工做者管理器来管理后台做业队列,因此咱们首先来看一下后台工做者管理器接口的定义是什么样子的。线程

public interface IBackgroundWorkerManager : IRunnable
{
    void Add(IBackgroundWorker worker);
}

仍是至关简洁的,就一个 Add 方法用来添加一个新的后台工做者对象。只是在这个地方,能够看到该接口又是集成自 IRunnable 接口,那么该接口的做用又是什么呢?日志

转到其定义能够看到,IRunable 接口定义了三个基本的方法:Start()Stop()WaitStop() ,并且他拥有一个默认实现 RunableBase,其实就是用来标识一个任务的运行状态。

public interface IRunnable
{
    // 开始执行任务
    void Start();

    // 中止执行任务
    void Stop();

    // 阻塞线程,等待任务执行完成后标识为中止。
    void WaitToStop();
}

public abstract class RunnableBase : IRunnable
{
    // 用于标识任务是否运行的布尔值变量
    public bool IsRunning { get { return _isRunning; } }

    private volatile bool _isRunning;

    // 启动以后表示任务正在运行
    public virtual void Start()
    {
        _isRunning = true;
    }

    // 中止以后表示任务结束运行
    public virtual void Stop()
    {
        _isRunning = false;
    }

    public virtual void WaitToStop()
    {

    }
}

到目前为止整个代码都仍是比较简单清晰的,咱们接着看 IBackgroundWorkerManager 的默认实现 BackgroundWorkerManager 类,首先咱们看一下该类拥有哪些属性与字段。

public class BackgroundWorkerManager : RunnableBase, IBackgroundWorkerManager, ISingletonDependency, IDisposable
{
    private readonly IIocResolver _iocResolver;
    private readonly List<IBackgroundWorker> _backgroundJobs;
    
    public BackgroundWorkerManager(IIocResolver iocResolver)
    {
        _iocResolver = iocResolver;
        _backgroundJobs = new List<IBackgroundWorker>();
    }
}

在后台工做者管理器类的内部,默认有一个 List 集合,用于维护全部的后台工做者对象。那么其余的 Start() 等方法确定是基于这个集合进行操做的。

public override void Start()
{
    base.Start();

    _backgroundJobs.ForEach(job => job.Start());
}

public override void Stop()
{
    _backgroundJobs.ForEach(job => job.Stop());

    base.Stop();
}

public override void WaitToStop()
{
    _backgroundJobs.ForEach(job => job.WaitToStop());

    base.WaitToStop();
}

能够看到实现仍是比较简单的,接下来咱们继续看他的 Add() 方法是如何进行操做的?

public void Add(IBackgroundWorker worker)
{
    _backgroundJobs.Add(worker);

    if (IsRunning)
    {
        worker.Start();
    }
}

在这里咱们看到他会针对 IsRunning 进行断定是否当即启动加入的后台工做者对象。而这个 IsRunning 属性值惟一产生变化的状况就在于 Start() 方法与 Stop() 方法的调用。

最后确定也有相关的销毁方法,用于释放全部注入的后台工做者对象,并将集合清除。

private bool _isDisposed;

public void Dispose()
{
    if (_isDisposed)
    {
        return;
    }

    _isDisposed = true;

    // 遍历集合,经过 Ioc 解析器的 Release 方法释放对象
    _backgroundJobs.ForEach(_iocResolver.Release);
    // 清空集合
    _backgroundJobs.Clear();
}

因此,针对于全部后台工做者的管理,都是经过 IBackgroundWorkerManager 来进行操做的。

2.1.2 后台工做者

看完了管理器,咱们来看一下 IBackgroundWorker 后台工做者对象是怎样的构成。

public interface IBackgroundWorker : IRunnable
{

}

貌似只是一个空的接口,其做用主要是标识某个类型是否为后台工做者,转到其抽象类实现 BackgroundWorkerBase,里面只是注入了一些辅助对象与本地化的一些方法。

public abstract class BackgroundWorkerBase : RunnableBase, IBackgroundWorker
{
    // 配置管理器
    public ISettingManager SettingManager { protected get; set; }

    // 工做单元管理器
    public IUnitOfWorkManager UnitOfWorkManager
    {
        get
        {
            if (_unitOfWorkManager == null)
            {
                throw new AbpException("Must set UnitOfWorkManager before use it.");
            }

            return _unitOfWorkManager;
        }
        set { _unitOfWorkManager = value; }
    }
    private IUnitOfWorkManager _unitOfWorkManager;

    // 得到当前的工做单元
    protected IActiveUnitOfWork CurrentUnitOfWork { get { return UnitOfWorkManager.Current; } }

    // 本地化资源管理器
    public ILocalizationManager LocalizationManager { protected get; set; }

    // 默认的本地化资源的源名称
    protected string LocalizationSourceName { get; set; }

    protected ILocalizationSource LocalizationSource
    {
        get
        {
            // 若是没有配置源名称,直接抛出异常
            if (LocalizationSourceName == null)
            {
                throw new AbpException("Must set LocalizationSourceName before, in order to get LocalizationSource");
            }

            if (_localizationSource == null || _localizationSource.Name != LocalizationSourceName)
            {
                _localizationSource = LocalizationManager.GetSource(LocalizationSourceName);
            }

            return _localizationSource;
        }
    }
    private ILocalizationSource _localizationSource;

    // 日志记录器
    public ILogger Logger { protected get; set; }

    protected BackgroundWorkerBase()
    {
        Logger = NullLogger.Instance;
        LocalizationManager = NullLocalizationManager.Instance;
    }
    
    // ... 其余模板代码
}

咱们接着看继承并实现了 BackgroundWorkerBase 的类型 PeriodicBackgroundWorkerBase,从字面意思上来看,该类型应该是一个定时后台工做者基类。

重点在于 Periodic(定时),从其类型内部的定义能够看到,该类型使用了一个 AbpTimer 对象来进行周期计时与具体工做任务的触发。咱们暂时先不看这个 AbpTimer,仅仅看 PeriodicBackgroundWorkerBase 的内部实现。

public abstract class PeriodicBackgroundWorkerBase : BackgroundWorkerBase
{
    protected readonly AbpTimer Timer;

    // 注入 AbpTimer
    protected PeriodicBackgroundWorkerBase(AbpTimer timer)
    {
        Timer = timer;
        // 绑定周期执行的任务,这里是 DoWork()
        Timer.Elapsed += Timer_Elapsed;
    }

    public override void Start()
    {
        base.Start();
        Timer.Start();
    }

    public override void Stop()
    {
        Timer.Stop();
        base.Stop();
    }

    public override void WaitToStop()
    {
        Timer.WaitToStop();
        base.WaitToStop();
    }

    private void Timer_Elapsed(object sender, System.EventArgs e)
    {
        try
        {
            DoWork();
        }
        catch (Exception ex)
        {
            Logger.Warn(ex.ToString(), ex);
        }
    }

    protected abstract void DoWork();
}

能够看到,这里基类绑定了 DoWork() 做为其定时执行的方法,那么用户在使用的时候直接继承自该基类,而后重写 DoWork() 方法便可绑定本身的后台工做者的任务。

2.1.3 AbpTimer 定时器

在上面的基类咱们看到,基类的 Start()Stop()WaitTpStop() 方法都是调用的 AbpTimer 所提供的,因此说 AbpTimer 其实也继承了 RunableBase 基类并实现其具体的启动与中止操做。

其实 AbpTimer 的核心就是经过 CLR 的 Timer 来实现周期性任务执行的,不过默认的 Timer 类有两个比较大的问题。

  1. CLR 的 Timer 并不会等待你的任务执行完再执行下一个周期的任务,若是你的某个任务耗时过长,超过了 Timer 定义的周期。那么 Timer 会开启一个新的线程执行,这样的话最后咱们系统的资源会由于线程大量重复建立而被拖垮。
  2. 如何知道一个 Timer 所执行的业务方法已经真正地被结束了。

因此 Abp 才会从新封装一个 AbpTimer 做为一个基础的计时器。第一个问题的解决方法很简单,就是在执行具体绑定的业务方法以前,经过 Timer.Change() 方法来让 Timer 临时失效。等待业务方法执行完成以后,再将 Timer 的周期置为用户设定的周期。

// CLR Timer 绑定的回调方法
private void TimerCallBack(object state)
{
    lock (_taskTimer)
    {
        if (!_running || _performingTasks)
        {
            return;
        }
        
        // 暂时让 Timer 失效
        _taskTimer.Change(Timeout.Infinite, Timeout.Infinite);
        // 设置执行标识为 TRUE,表示当前的 AbpTimer 正在执行
        _performingTasks = true;
    }

    try
    {
        // 若是绑定了相应的触发事件
        if (Elapsed != null)
        {
            // 执行相应的业务方法,这里就是最开始绑定的 DoWork() 方法
            Elapsed(this, new EventArgs());
        }
    }
    catch
    {

    }
    finally
    {
        lock (_taskTimer)
        {
            // 标识业务方法执行完成
            _performingTasks = false;
            if (_running)
            {
                // 更改周期为用户指定的执行周期,等待下一次触发
                _taskTimer.Change(Period, Timeout.Infinite);
            }

            Monitor.Pulse(_taskTimer);
        }
    }
}

针对于第二个问题,Abp 经过 WaitToStop() 方法会阻塞调用这个 Timer 的线程,而且在 _performingTasks 标识位是 false 的时候释放。

public override void WaitToStop()
{
    // 锁定 CLR 的 Timer 对象
    lock (_taskTimer)
    {
        // 循环检测
        while (_performingTasks)
        {
            Monitor.Wait(_taskTimer);
        }
    }

    base.WaitToStop();
}

至于其余的 Start() 方法就是使用 CLR 的 Timer 更改其执行周期,而 Stop() 就是直接将 Timer 的周期设置为无限大,使计时器失效。

2.1.4 总结

Abp 后台工做者的核心就是经过 AbpTimer 来实现周期性任务的执行,用户只须要继承自 PeriodicBackgroundWorkerBase,而后将其添加到 IBackgroundWorkerManager 的集合当中。这样 Abp 在启动以后就会遍历这个工做者集合,而后周期执行这些后台工做者绑定的方法。

固然若是你继承了 PeriodicBackgroundWorkerBase 以后,能够经过设置构造函数的 AbpTimer 来指定本身的执行周期。

2.2 后台做业队列

后台工做队列的管理是经过 IBackgroundJobManager 来处理的,而该接口又继承自 IBackgroundWorker,因此一整个后台做业队列就是一个后台工做者,只不过这个工做者有点特殊。

2.2.1 后台做业管理器

IBackgroundJobManager 接口的定义其实就两个方法,一个 EnqueueAsync<TJob, TArgs>() 用于将一个后台做业加入到执行队列当中。而 DeleteAsync() 方法呢,顾名思义就是从队列当中移除指定的后台做业。

首先看一下其默认实现 BackgroundJobManager,该实现一样是继承自 PeriodicBackgroundWorkerBase 而且其默认周期为 5000 ms。

public class BackgroundJobManager : PeriodicBackgroundWorkerBase, IBackgroundJobManager, ISingletonDependency
{
        // 事件总线
        public IEventBus EventBus { get; set; }
        
        // 轮训后台做业的间隔,默认值为 5000 毫秒.
        public static int JobPollPeriod { get; set; }

        // IOC 解析器
        private readonly IIocResolver _iocResolver;
        
        // 后台做业队列存储
        private readonly IBackgroundJobStore _store;

        static BackgroundJobManager()
        {
            JobPollPeriod = 5000;
        }

        public BackgroundJobManager(
            IIocResolver iocResolver,
            IBackgroundJobStore store,
            AbpTimer timer)
            : base(timer)
        {
            _store = store;
            _iocResolver = iocResolver;

            EventBus = NullEventBus.Instance;

            Timer.Period = JobPollPeriod;
        }
}

基础结构基本上就这个样子,接下来看一下他的两个接口方法是如何实现的。

EnqueueAsync<TJob, TArgs> 方法经过传入指定的后台做业对象和相应的参数,同时还有任务的优先级。将其经过 IBackgroundJobStore 进行持久化,并返回一个任务的惟一 JobId 以便进行删除操做。

public async Task<string> EnqueueAsync<TJob, TArgs>(TArgs args, BackgroundJobPriority priority = BackgroundJobPriority.Normal, TimeSpan? delay = null)
    where TJob : IBackgroundJob<TArgs>
{
    // 经过 JobInfo 包装任务的基本信息
    var jobInfo = new BackgroundJobInfo
    {
        JobType = typeof(TJob).AssemblyQualifiedName,
        JobArgs = args.ToJsonString(),
        Priority = priority
    };

    // 若是须要延时执行的话,则用当前时间加上延时的时间做为任务下次运行的时间
    if (delay.HasValue)
    {
        jobInfo.NextTryTime = Clock.Now.Add(delay.Value);
    }

    // 经过 Store 进行持久话存储
    await _store.InsertAsync(jobInfo);

    // 返回后台任务的惟一标识
    return jobInfo.Id.ToString();
}

至于删除操做,在 Manager 内部其实也是经过 IBackgroundJobStore 进行实际的删除操做的。

public async Task<bool> DeleteAsync(string jobId)
{
    // 判断 jobId 的值是否有效
    if (long.TryParse(jobId, out long finalJobId) == false)
    {
        throw new ArgumentException($"The jobId '{jobId}' should be a number.", nameof(jobId));
    }

    // 使用 jobId 从 Store 处筛选到 JobInfo 对象的信息
    BackgroundJobInfo jobInfo = await _store.GetAsync(finalJobId);
    if (jobInfo == null)
    {
        return false;
    }

    // 若是存在有 JobInfo 则使用 Store 进行删除操做
    await _store.DeleteAsync(jobInfo);
    return true;
}

后台做业管理器实质上是一个周期性执行的后台工做者,那么咱们的后台做业是每 5000 ms 执行一次,那么他的 DoWork() 方法又在执行什么操做呢?

protected override void DoWork()
{
    // 从 Store 当中得到等待执行的后台做业集合
    var waitingJobs = AsyncHelper.RunSync(() => _store.GetWaitingJobsAsync(1000));

    // 遍历这些等待执行的后台任务,而后经过 TryProcessJob 进行执行
    foreach (var job in waitingJobs)
    {
        TryProcessJob(job);
    }
}

能够看到每 5 秒钟咱们的后台做业管理器就会从 IBackgroundJobStore 当中拿到最大 1000 条的后台做业信息,而后遍历这些信息。经过 TryProcessJob(job) 方法来执行后台做业。

TryProcessJob() 方法,本质上就是经过反射构建出一个 IBackgroundJob 对象,而后取得序列化的参数值,经过反射获得的 MethodInfo 对象来执行咱们的后台任务。执行完成以后,就会从 Store 当中移除掉执行完成的任务。

针对于在执行过程中所出现的异常,会经过 IEventBus 触发一个 AbpHandledExceptionData 事件记录后台做业执行失败时的异常信息。而且一旦在执行过程中出现了任何异常的状况,都会将该任务的 IsAbandoned 字段置为 true,当该字段为 true 时,该任务将再也不回被执行。

PS:就是在 GetWaitingJobsAsync() 方法时,会过滤掉 IsAbandoned 值为 true 的任务。

private void TryProcessJob(BackgroundJobInfo jobInfo)
{
    try
    {
        // 任务执行次数自增 1
        jobInfo.TryCount++;
        // 最后一次执行时间设置为当前时间
        jobInfo.LastTryTime = Clock.Now;

        // 经过反射取得后台做业的类型
        var jobType = Type.GetType(jobInfo.JobType);
        // 经过 Ioc 解析器获得一个临时的后台做业对象,执行完以后既被释放
        using (var job = _iocResolver.ResolveAsDisposable(jobType))
        {
            try
            {
                // 经过反射获得后台做业的 Execute 方法
                var jobExecuteMethod = job.Object.GetType().GetTypeInfo().GetMethod("Execute");
                var argsType = jobExecuteMethod.GetParameters()[0].ParameterType;
                var argsObj = JsonConvert.DeserializeObject(jobInfo.JobArgs, argsType);

                // 结合持久话存储的参数信息,调用 Execute 方法进行后台做业
                jobExecuteMethod.Invoke(job.Object, new[] { argsObj });

                // 执行完成以后从 Store 删除该任务的信息
                AsyncHelper.RunSync(() => _store.DeleteAsync(jobInfo));
            }
            catch (Exception ex)
            {
                Logger.Warn(ex.Message, ex);

                // 计算下一次执行的时间,一旦超过 2 天该任务都执行失败,则返回 null
                var nextTryTime = jobInfo.CalculateNextTryTime();
                if (nextTryTime.HasValue)
                {
                    jobInfo.NextTryTime = nextTryTime.Value;
                }
                else
                {
                    // 若是为 null 则说明该任务在 2 天的时间内都没有执行成功,则放弃继续执行
                    jobInfo.IsAbandoned = true;
                }

                // 更新 Store 存储的任务信息
                TryUpdate(jobInfo);

                // 触发异常事件
                EventBus.Trigger(
                    this,
                    new AbpHandledExceptionData(
                        new BackgroundJobException(
                            "A background job execution is failed. See inner exception for details. See BackgroundJob property to get information on the background job.", 
                            ex
                        )
                        {
                            BackgroundJob = jobInfo,
                            JobObject = job.Object
                        }
                    )
                );
            }
        }
    }
    catch (Exception ex)
    {
        Logger.Warn(ex.ToString(), ex);
        // 表示任务再也不执行
        jobInfo.IsAbandoned = true;
        // 更新 Store
        TryUpdate(jobInfo);
    }
}

2.2.2 后台做业

后台做业的默认接口定义为 IBackgroundJob<in TArgs> ,他只有一个 Execute(TArgs args) 方法,用于接收指定类型的做业参数,并执行。

通常来讲咱们不建议直接经过继承 IBackgroundJob<in TArgs> 来实现后台做业,而是继承自 BackgroundJob<TArgs> 抽象类。该抽象类内部也没有什么特别的实现,主要是注入了一些基础设施,好比说 UOW 与 本地化资源管理器,方便咱们开发使用。

后台做业自己是具体执行的对象,而 BackgroundJobInfo 则是存储了后台做业的 Type 类型和参数,方便在须要执行的时候经过反射的方式执行后台做业。

2.2.2 后台做业队列存储

IBackgroundJobStore 咱们就能够猜到以 Abp 框架的套路,他确定会有两种实现,第一种就是基于内存的 InMemoryBackgroundJobStore。而第二种呢,就是由 Abp.Zero 模块所提供的基于数据库的 BackgroundJobStore

IBackgroundJobStore 接口所定义的方法基本上就是增删改查,没有什么复杂的。

public interface IBackgroundJobStore
{
    // 经过 JobId 获取后台任务信息
    Task<BackgroundJobInfo> GetAsync(long jobId);

    // 插入一个新的后台任务信息
    Task InsertAsync(BackgroundJobInfo jobInfo);

    /// <summary>
    /// Gets waiting jobs. It should get jobs based on these:
    /// Conditions: !IsAbandoned And NextTryTime &lt;= Clock.Now.
    /// Order by: Priority DESC, TryCount ASC, NextTryTime ASC.
    /// Maximum result: <paramref name="maxResultCount"/>.
    /// </summary>
    /// <param name="maxResultCount">Maximum result count.</param>
    Task<List<BackgroundJobInfo>> GetWaitingJobsAsync(int maxResultCount);

    /// <summary>
    /// Deletes a job.
    /// </summary>
    /// <param name="jobInfo">Job information.</param>
    Task DeleteAsync(BackgroundJobInfo jobInfo);

    /// <summary>
    /// Updates a job.
    /// </summary>
    /// <param name="jobInfo">Job information.</param>
    Task UpdateAsync(BackgroundJobInfo jobInfo);
}

这里先从简单的内存 Store 提及,这个 InMemoryBackgroundJobStore 内部使用了一个并行字典来存储这些任务信息。

public class InMemoryBackgroundJobStore : IBackgroundJobStore
{
    private readonly ConcurrentDictionary<long, BackgroundJobInfo> _jobs;
    private long _lastId;
    
    public InMemoryBackgroundJobStore()
    {
        _jobs = new ConcurrentDictionary<long, BackgroundJobInfo>();
    }
}

至关简单,这几个接口方法基本上就是针对与这个并行字典操做的一层封装。

public Task<BackgroundJobInfo> GetAsync(long jobId)
{
    return Task.FromResult(_jobs[jobId]);
}

public Task InsertAsync(BackgroundJobInfo jobInfo)
{
    jobInfo.Id = Interlocked.Increment(ref _lastId);
    _jobs[jobInfo.Id] = jobInfo;

    return Task.FromResult(0);
}

public Task<List<BackgroundJobInfo>> GetWaitingJobsAsync(int maxResultCount)
{
    var waitingJobs = _jobs.Values
        // 首先筛选出再也不执行的后台任务
        .Where(t => !t.IsAbandoned && t.NextTryTime <= Clock.Now)
        // 第一次根据后台做业的优先级进行排序,高优先级优先执行
        .OrderByDescending(t => t.Priority)
        // 再根据执行次数排序,执行次数越少的,越靠前
        .ThenBy(t => t.TryCount)
        .ThenBy(t => t.NextTryTime)
        .Take(maxResultCount)
        .ToList();

    return Task.FromResult(waitingJobs);
}

public Task DeleteAsync(BackgroundJobInfo jobInfo)
{
    _jobs.TryRemove(jobInfo.Id, out _);

    return Task.FromResult(0);
}

public Task UpdateAsync(BackgroundJobInfo jobInfo)
{
    // 若是是再也不执行的任务,删除
    if (jobInfo.IsAbandoned)
    {
        return DeleteAsync(jobInfo);
    }

    return Task.FromResult(0);
}

至于持久化到数据库,无非是注入一个仓储,而后针对这个仓储进行增删查改的操做罢了,这里就不在赘述。

2.2.3 后台做业优先级

后台做业的优先级定义在 BackgroundJobPriority 枚举当中,一共有 5 个等级,分别是 LowBelowNormalNormalAboveNormalHigh ,他们从最低到最高排列。

3.点此跳转到总目录

相关文章
相关标签/搜索