[Abp vNext 源码分析] - 12. 后台做业与后台工做者

1、简要说明

文章信息:html

基于的 ABP vNext 版本:1.0.0数据库

创做日期:2019 年 10 月 24 日晚缓存

更新日期:暂无安全

ABP vNext 提供了后台工做者和后台做业的支持,基本实现与原来的 ABP 框架相似,而且 ABP vNext 还提供了对 HangFire 和 RabbitMQ 的后台做业集成。开发人员在使用这些第三方库的时候,基本就是开箱即用,不须要作其余复杂的配置。多线程

后台做业在系统开发的过程中,是比较经常使用的功能。由于老是有一些长耗时的任务,而这些任务咱们不是当即响应的,例如 Excel 文档导入、批量发送短信通知等。app

后台工做者 的话,ABP vNext 的实现就是在 CLR 的 Timer 之上封装了一层,周期性地执行用户逻辑。ABP vNext 默认提供的 后台任务管理器,就是在后台工做者基础之上进行的封装。框架

涉及到后台任务、后台工做者的模块一共有 6 个,它们分别是:async

  • Volo.Abp.Threading :提供了一些经常使用的线程组件,其中 AbpTimer 就是在里面实现的。
  • Volo.Abp.BackgroundWorkers :后台工做者的定义和实现。分布式

  • Volo.Abp.BackgroundJobs.Abstractions :后台任务的一些共有定义。
  • Volo.Abp.BackgroundJobs :默认的后台任务管理器实现。
  • Volo.Abp.BackgroundJobs.HangFire :基于 Hangfire 库实现的后台任务管理器。
  • Volo.Abp.BackgroundJobs.RabbitMQ : 基于 RabbitMQ 实现的后台任务管理器。ide

2、源码分析

线程组件

健壮的计时器

CLR 为咱们提供了多种计时器,咱们通常使用的是 System.Threading.Timer ,它是基于 CLR 线程池的一个周期计时器,会根据咱们配置的 Period (周期) 定时执行。在 CLR 线程池中,全部的 Timer 只有 1 个线程为其服务。这个线程直到下一个计时器的触发时间,当下一个 Timer 对象到期时,这个线程就会将 Timer 的回调方法经过 ThreadPool.QueueUserWorkItem() 扔到线程池去执行。

不过这带来了一个问题,即你的回调方法执行时间超过了计时器的周期,那么就会形成上一个任务还没执行完成又开始执行新的任务。

解决这个方法其实很简单,即启动以后,将周期设置为 Timeout.Infinite ,这样只会执行一次。当回调方法执行完成以后,就设置 dueTime 参数说明下次执行要等待多久,而且周期仍是 Timeout.Infinite

ABP vNext 已经为咱们提供了健壮的计时器,该类型的定义是 AbpTimer ,在内部用到了 volatile 关键字和 Monitor 实现 条件变量模式 解决多线程环境下的问题。

public class AbpTimer : ITransientDependency
{
    // 回调事件。
    public event EventHandler Elapsed;

    // 执行周期。
    public int Period { get; set; }

    // 定时器启动以后就开始运行,默认为 Fasle。
    public bool RunOnStart { get; set; }

    // 日志记录器。
    public ILogger<AbpTimer> Logger { get; set; }

    private readonly Timer _taskTimer;
    // 定时器是否在执行任务,默认为 false。
    private volatile bool _performingTasks;
    // 定时器的运行状态,默认为 false。
    private volatile bool _isRunning;

    public AbpTimer()
    {
        Logger = NullLogger<AbpTimer>.Instance;

        // 回调函数是 TimerCallBack,执行周期为永不执行。
        _taskTimer = new Timer(TimerCallBack, null, Timeout.Infinite, Timeout.Infinite);
    }

    public void Start(CancellationToken cancellationToken = default)
    {
        // 若是传递的周期小于等于 0 ,则抛出异常。
        if (Period <= 0)
        {
            throw new AbpException("Period should be set before starting the timer!");
        }

        // 使用互斥锁,保证线程安全。
        lock (_taskTimer)
        {
            // 若是启动以后就须要立刻执行,则设置为 0,立刻执行任务,不然会等待 Period 毫秒以后再执行(1 个周期)。
            _taskTimer.Change(RunOnStart ? 0 : Period, Timeout.Infinite);
            // 定时器成功运行了。
            _isRunning = true;
        }
        // 释放 _taskTimer 的互斥锁。
    }

    public void Stop(CancellationToken cancellationToken = default)
    {
        // 使用互斥锁。
        lock (_taskTimer)
        {
            // 将内部定时器设置为永不执行的状态。
            _taskTimer.Change(Timeout.Infinite, Timeout.Infinite);

            // 检测当前是否还有正在执行的任务,若是有则等待任务执行完成。
            while (_performingTasks)
            {
                // 临时释放锁,阻塞当前线程。可是其余线程能够获取 _timer 的互斥锁。
                Monitor.Wait(_taskTimer);
            }

            // 须要表示中止状态,因此标记状态为 false。
            _isRunning = false;
        }
    }

    private void TimerCallBack(object state)
    {
        lock (_taskTimer)
        {
            // 若是有任务正在运行,或者内部定时器已经中止了,则不作任何事情。
            if (!_isRunning || _performingTasks)
            {
                return;
            }

            // 临时中止内部定时器。
            _taskTimer.Change(Timeout.Infinite, Timeout.Infinite);
            // 代表立刻须要执行任务了。
            _performingTasks = true;
        }

        try
        {
            // 调用绑定的事件。
            Elapsed.InvokeSafely(this, new EventArgs());
        }
        catch
        {
            // 注意,这里将会吞噬异常。
        }
        finally
        {
            lock (_taskTimer)
            {
                // 任务执行完成,更改状态。
                _performingTasks = false;

                // 若是定时器还在运行,没有被中止,则启动下一个 Period 周期。
                if (_isRunning)
                {
                    _taskTimer.Change(Period, Timeout.Infinite);
                }

                // 解除由于释放锁而阻塞的线程。
                // 若是已经调用了 Stop,则会唤醒那个由于 Wait 阻塞的线程,就会使 _isRunning 置为 false。
                Monitor.Pulse(_taskTimer);
            }
        }
    }
}

这里对 _performingTasks_isRunning 字段设置为 volatile 防止指令重排和寄存器缓存。这是由于在 Stop 方法内部使用到的 _performingTasks 可能会被优化,因此将该字段设置为了易失的。

IRunnable 接口

ABP vNext 为任务的启动和中止,抽象了一个 IRunnable 接口。虽然描述说的是对线程的行为进行抽象,但千万千万不要手动调用 Thread.Abort() 。关于 Thread.Abort() 的坏处,这里再也不多加赘述,能够参考 这篇文章 的描述,或者搜索其余的相关文章。

public interface IRunnable
{
    // 启动这个服务。
    Task StartAsync(CancellationToken cancellationToken = default);

    /// <summary>
    /// 中止这个服务。
    /// </summary>
    Task StopAsync(CancellationToken cancellationToken = default);
}

后台工做者

模块的构造

后台工做者的模块行为比较简单,它定义了在应用程序初始化和销毁时的行为。在初始化时,后台工做者管理器 得到全部 后台工做者,并开始启动它们。在销毁时,后台工做者管理器得到全部后台工做者,并开始中止他们,这样才可以作到优雅退出。

[DependsOn(
    typeof(AbpThreadingModule)
    )]
public class AbpBackgroundWorkersModule : AbpModule
{
    public override void OnApplicationInitialization(ApplicationInitializationContext context)
    {
        var options = context.ServiceProvider.GetRequiredService<IOptions<AbpBackgroundWorkerOptions>>().Value;
        // 若是启用了后台工做者,那么得到后台工做者管理器的实例,并调用 StartAsync 启动全部后台工做者。
        if (options.IsEnabled)
        {
            AsyncHelper.RunSync(
                () => context.ServiceProvider
                    .GetRequiredService<IBackgroundWorkerManager>()
                    .StartAsync()
            );
        }
    }

    public override void OnApplicationShutdown(ApplicationShutdownContext context)
    {
        var options = context.ServiceProvider.GetRequiredService<IOptions<AbpBackgroundWorkerOptions>>().Value;
        // 若是启用了后台工做者,那么得到后台工做者管理器的实例,并调用 StopAsync 中止全部后台工做者。
        if (options.IsEnabled)
        {
            AsyncHelper.RunSync(
                () => context.ServiceProvider
                    .GetRequiredService<IBackgroundWorkerManager>()
                    .StopAsync()
            );
        }
    }
}

后台工做者的定义

首先看看 IBackgroundWorker 接口的定义,是空的。不过继承了 ISingletonDependency 接口,说明咱们的每一个后台工做者都是 单例 的。

/// <summary>
/// 在后台运行,执行某些任务的工做程序(线程)的接口定义。
/// </summary>
public interface IBackgroundWorker : IRunnable, ISingletonDependency
{

}

ABP vNext 为咱们定义了一个抽象的后台工做者类型 BackgroundWorkerBase,这个基类的设计目的是提供一些经常使用组件(和 ApplicationService 同样)。

public abstract class BackgroundWorkerBase : IBackgroundWorker
{
    //TODO: Add UOW, Localization and other useful properties..?
    //TODO: 是否应该提供工做单元、本地化以及其余经常使用的属性?

    public ILogger<BackgroundWorkerBase> Logger { protected get; set; }

    protected BackgroundWorkerBase()
    {
        Logger = NullLogger<BackgroundWorkerBase>.Instance;
    }
    
    public virtual Task StartAsync(CancellationToken cancellationToken = default)
    {
        Logger.LogDebug("Started background worker: " + ToString());
        return Task.CompletedTask;
    }

    public virtual Task StopAsync(CancellationToken cancellationToken = default)
    {
        Logger.LogDebug("Stopped background worker: " + ToString());
        return Task.CompletedTask;
    }

    public override string ToString()
    {
        return GetType().FullName;
    }
}

ABP vNext 内部只有一个默认的后台工做者实现 PeriodicBackgroundWorkerBase。从名字上来看,意思是就是周期执行的后台工做者,内部就是用的 AbpTimer 来实现,ABP vNext 将其包装起来是为了实现统一的模式(后台工做者)。

public abstract class PeriodicBackgroundWorkerBase : BackgroundWorkerBase
{
    protected readonly AbpTimer Timer;

    // 也就意味着子类必须在其构造函数,指定 timer 的执行周期。
    protected PeriodicBackgroundWorkerBase(AbpTimer timer)
    {
        Timer = timer;
        Timer.Elapsed += Timer_Elapsed;
    }

    // 启动后台工做者。
    public override async Task StartAsync(CancellationToken cancellationToken = default)
    {
        await base.StartAsync(cancellationToken);
        Timer.Start(cancellationToken);
    }

    // 中止后台工做者。
    public override async Task StopAsync(CancellationToken cancellationToken = default)
    {
        Timer.Stop(cancellationToken);
        await base.StopAsync(cancellationToken);
    }
    
    // Timer 关联的周期事件,之因此不直接挂载 DoWork,是为了捕获异常。
    private void Timer_Elapsed(object sender, System.EventArgs e)
    {
        try
        {
            DoWork();
        }
        catch (Exception ex)
        {
            Logger.LogException(ex);
        }
    }

    // 你要周期执行的任务。
    protected abstract void DoWork();
}

咱们若是要实现本身的后台工做者,只须要继承该类,实现 DoWork() 方法便可。

public class TestWorker : PeriodicBackgroundWorkerBase
{
    public TestWorker(AbpTimer timer) : base(timer)
    {
        // 每五分钟执行一次。
        timer.Period = (int)TimeSpan.FromMinutes(5).TotalMilliseconds;
    }

    protected override void DoWork()
    {
        Console.WriteLine("后台工做者被执行了。");
    }
}

而后在咱们本身模块的 OnPreApplicationInitialization() 方法内解析出后台做业管理器(IBackgroundWorkerManager),调用它的 Add() 方法,将咱们定义的 TestWorker 添加到管理器当中便可。

后台工做者管理器

全部的后台工做者都是经过 IBackgroundWorkerManager 进行管理的,它提供了 StartAsync()StopAsync()Add() 方法。前面两个方法就是 IRunnable 接口定义的,后台工做者管理器直接集成了该接口,后面的 Add() 方法就是用来动态添加咱们的后台工做者。

后台工做者管理器的默认实现是 BackgroundWorkerManager 类型,它内部作的事情很简单,就是维护一个后台工做者集合。每当调用 StartAsync()StopAsync() 方法的时候,都从这个集合遍历后台工做者,执行他们的启动和中止方法。

这里值得注意的一点是,当咱们调用 Add() 方法添加了一个后台工做者以后,后台工做者管理器就会启动这个后台工做者。

public class BackgroundWorkerManager : IBackgroundWorkerManager, ISingletonDependency, IDisposable
{
    protected bool IsRunning { get; private set; }

    private bool _isDisposed;

    private readonly List<IBackgroundWorker> _backgroundWorkers;

    public BackgroundWorkerManager()
    {
        _backgroundWorkers = new List<IBackgroundWorker>();
    }

    public virtual void Add(IBackgroundWorker worker)
    {
        _backgroundWorkers.Add(worker);

        // 若是当先后台工做者管理器还处于运行状态,则调用工做者的 StartAsync() 方法启动。
        if (IsRunning)
        {
            AsyncHelper.RunSync(
                () => worker.StartAsync()
            );
        }
    }

    public virtual void Dispose()
    {
        if (_isDisposed)
        {
            return;
        }

        _isDisposed = true;

        //TODO: ???
    }

    // 启动,则遍历集合启动。
    public virtual async Task StartAsync(CancellationToken cancellationToken = default)
    {
        IsRunning = true;

        foreach (var worker in _backgroundWorkers)
        {
            await worker.StartAsync(cancellationToken);
        }
    }

    // 中止, 则遍历集合中止。
    public virtual async Task StopAsync(CancellationToken cancellationToken = default)
    {
        IsRunning = false;

        foreach (var worker in _backgroundWorkers)
        {
            await worker.StopAsync(cancellationToken);
        }
    }
}

上述代码其实存在一个问题,即后台工做者被释放之后,是否还能执行 Add() 操做。参考我 以前的文章 ,其实当对象被释放以后,就应该抛出 ObjectDisposedException 异常。

后台做业

比起后台工做者,咱们执行一次性任务的时候,通常会使用后台做业进行处理。比起只能设置固定周期的 PeriodicBackgroundWorkerBase ,集成了 Hangfire 的后台做业管理器,可以让咱们使用 Cron 表达式,更加灵活地设置任务的执行周期。

模块的构造

关于后台做业的模块,咱们须要说道两处。第一处是位于 Volo.Abp.BackgroundJobs.Abstractions 项目的 AbpBackgroundJobsAbstractionsModule ,第二出则是位于 Volo.Abp.BackgroundJobs 项目的 AbpBackgroundJobsModule

AbpBackgroundJobsAbstractionsModule 的主要行为是将符合条件的后台做业,添加到 AbpBackgroundJobOptions 配置当中,以便后续进行使用。

public override void PreConfigureServices(ServiceConfigurationContext context)
{
    RegisterJobs(context.Services);
}

private static void RegisterJobs(IServiceCollection services)
{
    var jobTypes = new List<Type>();

    // 若是注册的类型符合 IBackgroundJob<> 泛型,则添加到集合当中。
    services.OnRegistred(context =>
    {
        if (ReflectionHelper.IsAssignableToGenericType(context.ImplementationType, typeof(IBackgroundJob<>)))
        {
            jobTypes.Add(context.ImplementationType);
        }
    });

    services.Configure<AbpBackgroundJobOptions>(options =>
    {
        // 将数据赋值给配置类。
        foreach (var jobType in jobTypes)
        {
            options.AddJob(jobType);
        }
    });
}

Volo.Abp.BackgroundJobs 内部是 ABP vNext 为咱们提供的 默认后台做业管理器,这个后台做业管理器 本质上是一个后台工做者

这个后台工做者会周期性(取决于 AbpBackgroundJobWorkerOptions.JobPollPeriod 值,默认为 5 秒种)地从 IBackgroundJobStore 捞出一堆后台任务,而且在后台执行。至于每次执行多少个后台任务,这也取决于 AbpBackgroundJobWorkerOptions.MaxJobFetchCount 的值,默认值是 1000 个。

注意:

这里的 Options 类是 AbpBackgroundJobWorkerOptions,别和 AbpBackgroundWorkerOptions 混淆了。

因此在 AbpBackgroundJobsModule 模块里面,只作了一件事情,就是将负责后台做业的后台工做者,添加到后台工做者管理器种,并开始周期性地执行。

public override void OnApplicationInitialization(ApplicationInitializationContext context)
{
    var options = context.ServiceProvider.GetRequiredService<IOptions<AbpBackgroundJobOptions>>().Value;
    if (options.IsJobExecutionEnabled)
    {
        // 得到后台工做者管理器,并将负责后台做业的工做者添加进去。
        context.ServiceProvider
            .GetRequiredService<IBackgroundWorkerManager>()
            .Add(context.ServiceProvider.GetRequiredService<IBackgroundJobWorker>()
            );
    }
}

后台做业的定义

在上一节里面看到,只要是实现 IBackgroundJob<TArgs> 类型的都视为一个后台做业。这个后台做业接口,只定义了一个行为,那就是执行(Execute(TArgs))。这里的 TArgs 泛型做为执行后台做业时,须要传递的参数类型。

// 由于是传入的参数,因此泛型参数是逆变的。
public interface IBackgroundJob<in TArgs>
{
    void Execute(TArgs args);
}

检查源码,发现 ABP vNext 的邮箱模块定义了一个邮件发送任务 BackgroundEmailSendingJob,它的实现大概以下。

public class BackgroundEmailSendingJob : BackgroundJob<BackgroundEmailSendingJobArgs>, ITransientDependency
{
    // ...
    
    public override void Execute(BackgroundEmailSendingJobArgs args)
    {
        AsyncHelper.RunSync(() => EmailSender.SendAsync(args.To, args.Subject, args.Body, args.IsBodyHtml));
    }
}

后台做业管理器

后台做业都是经过一个后台做业管理器(IBackgroundJobManager)进行管理的,这个接口定义了一个入队方法(EnqueueAsync()),注意,咱们的后台做业在入队后,不是立刻执行的。

说一下这个入队处理逻辑:

  1. 首先咱们会经过参数的类型,获取到任务的名称。(假设任务上面没有标注 BackgroundJobNameAttribute 特性,那么任务的名称就是参数类型的 FullName 。)
  2. 构造一个 BackgroundJobInfo 对象。
  3. 经过 IBackgroundJobStore 持久化任务信息。
public virtual async Task<string> EnqueueAsync<TArgs>(TArgs args, BackgroundJobPriority priority = BackgroundJobPriority.Normal, TimeSpan? delay = null)
{
    // 获取任务名称。
    var jobName = BackgroundJobNameAttribute.GetName<TArgs>();
    var jobId = await EnqueueAsync(jobName, args, priority, delay);
    return jobId.ToString();
}

protected virtual async Task<Guid> EnqueueAsync(string jobName, object args, BackgroundJobPriority priority = BackgroundJobPriority.Normal, TimeSpan? delay = null)
{
    var jobInfo = new BackgroundJobInfo
    {
        Id = GuidGenerator.Create(),
        JobName = jobName,
        // 经过序列化器,序列化参数值,方便存储。这里内部其实使用的是 JSON.NET 进行序列化。
        JobArgs = Serializer.Serialize(args),
        Priority = priority,
        CreationTime = Clock.Now,
        NextTryTime = Clock.Now
    };

    // 若是任务有执行延迟,则任务的初始执行时间要加上这个延迟。
    if (delay.HasValue)
    {
        jobInfo.NextTryTime = Clock.Now.Add(delay.Value);
    }

    // 持久化任务信息,方便后面执行后台做业的工做者可以取到。
    await Store.InsertAsync(jobInfo);

    return jobInfo.Id;
}

BackgroundJobNameAttribute 相关的方法:

public static string GetName<TJobArgs>()
{
    return GetName(typeof(TJobArgs));
}

public static string GetName([NotNull] Type jobArgsType)
{
    Check.NotNull(jobArgsType, nameof(jobArgsType));

    // 判断参数类型上面是否标注了特性,而且特性实现了 IBackgroundJobNameProvider 接口。
    return jobArgsType
                .GetCustomAttributes(true)
                .OfType<IBackgroundJobNameProvider>()
                .FirstOrDefault()
                ?.Name
        // 拿不到名字,则使用类型的 FullName。
            ?? jobArgsType.FullName;
}

后台做业的存储

后台做业的存储默认是放在内存的,这点能够从 InMemoryBackgroundJobStore 类型实现看出来。在它的内部使用了一个并行字典,经过做业的 Guid 与做业进行关联绑定。

除了内存实现,在 Volo.Abp.BackgroundJobs.Domain 模块还有一个 BackgroundJobStore 实现,基本套路与 SettingStore 同样,都是存储到数据库里面。

public class BackgroundJobStore : IBackgroundJobStore, ITransientDependency
{
    protected IBackgroundJobRepository BackgroundJobRepository { get; }

    // ... 
    
    public BackgroundJobInfo Find(Guid jobId)
    {
        return ObjectMapper.Map<BackgroundJobRecord, BackgroundJobInfo>(
            BackgroundJobRepository.Find(jobId)
        );
    }
    
    // ...

    public void Insert(BackgroundJobInfo jobInfo)
    {
        BackgroundJobRepository.Insert(
            ObjectMapper.Map<BackgroundJobInfo, BackgroundJobRecord>(jobInfo)
        );
    }

    // ...
}

后台做业的执行

默认的后台做业管理器是经过一个后台工做者来执行后台任务的,这个实现叫作 BackgroundJobWorker,这个后台工做者的生命周期也是单例的。后台做业的具体执行逻辑里面,涉及到了如下几个类型的交互。

类型 做用
AbpBackgroundJobOptions 提供每一个后台任务的配置信息,包括任务的类型、参数类型、任务名称数据。
AbpBackgroundJobWorkerOptions 提供后台做业工做者的配置信息,例如每一个周期 最大执行的做业数量、后台
工做者的 执行周期、做业执行 超时时间 等。
BackgroundJobConfiguration 后台任务的配置信息,做用是将持久化存储的做业信息与运行时类型进行绑定
和实例化,以便 ABP vNext 来执行具体的任务。
IBackgroundJobExecuter 后台做业的执行器,当咱们从持久化存储获取到后台做业信息时,将会经过
这个执行器来执行具体的后台做业。
IBackgroundJobSerializer 后台做业序列化器,用于后台做业持久化时进行序列化的工具,默认采用的
是 JSON.NET 进行实现。
JobExecutionContext 执行器在执行后台做业时,是经过这个上下文参数进行执行的,在这个上下
文内部,包含了后台做业的具体类型、后台做业的参数值。
IBackgroundJobStore 前面已经讲过了,这个是用于后台做业的持久化存储,默认实现是存储在内存。
BackgroundJobPriority 后台做业的执行优先级定义,ABP vNext 在执行后台任务时,会根据任务的优
先级进行排序,以便在后面执行的时候优先级高的任务先执行。

咱们来按照逻辑顺序走一遍它的实现,首前后台做业的执行工做者会从持久化存储内,获取 MaxJobFetchCount 个任务用于执行。从持久化存储获取后台做业信息(BackgroundJobInfo),是由 IBackgroundJobStore 提供的。

var store = scope.ServiceProvider.GetRequiredService<IBackgroundJobStore>();

var waitingJobs = store.GetWaitingJobs(WorkerOptions.MaxJobFetchCount);

// 不存在任何后台做业,则直接结束本次调用。
if (!waitingJobs.Any())
{
    return;
}

InMemoryBackgroundJobStore 的相关实现:

public List<BackgroundJobInfo> GetWaitingJobs(int maxResultCount)
{
    return _jobs.Values
        .Where(t => !t.IsAbandoned && t.NextTryTime <= Clock.Now)
        .OrderByDescending(t => t.Priority)
        .ThenBy(t => t.TryCount)
        .ThenBy(t => t.NextTryTime)
        .Take(maxResultCount)
        .ToList();
}

上面的代码能够看出来,首先排除 被放弃的任务 ,包含达到执行时间的任务,而后根据任务的优先级从高到低进行排序。重试次数少的优先执行,预计执行时间越早的越先执行。最后从这些数据中,筛选出 maxResultCount 结果并返回。

说到这里,咱们来看一下这个 NextTryTime 是如何被计算出来的?回想起最开始的后台做业管理器,咱们在添加一个后台任务的时候,就会设置这个后台任务的 预计执行时间。第一个任务被添加到执行队列中时,它的值通常是 Clock.Now ,也就是它被添加到队列的时间。

不过 ABP vNext 为了让那些常常执行失败的任务,有比较低的优先级再执行,就在每次任务执行失败以后,会将 NextTryTime 的值指数级进行增长。这块代码能够在 CalculateNextTryTime 里面看到,也就是说某个任务的执行 失败次数越高,那么它下一次的预期执行时间就会越远。

protected virtual DateTime? CalculateNextTryTime(BackgroundJobInfo jobInfo, IClock clock)
{
    // 通常来讲,这个 DefaultWaitFactor 因子的值是 2.0 。
    var nextWaitDuration = WorkerOptions.DefaultFirstWaitDuration * (Math.Pow(WorkerOptions.DefaultWaitFactor, jobInfo.TryCount - 1)); // 同执行失败的次数进行挂钩。
    var nextTryDate = jobInfo.LastTryTime?.AddSeconds(nextWaitDuration) ??
                        clock.Now.AddSeconds(nextWaitDuration);

    if (nextTryDate.Subtract(jobInfo.CreationTime).TotalSeconds > WorkerOptions.DefaultTimeout)
    {
        return null;
    }

    return nextTryDate;
}

当预期的执行时间都超过 DefaultTimeout 的超时时间时(默认为 2 天),说明这个任务确实没救了,就不要再执行了。

咱们以前说到,从 IBackgroundJobStore 拿到了须要执行的后台任务信息集合,接下来咱们就要开始执行后台任务了。

foreach (var jobInfo in waitingJobs)
{
    jobInfo.TryCount++;
    jobInfo.LastTryTime = clock.Now;

    try
    {
        // 根据任务名称获取任务的配置参数。
        var jobConfiguration = JobOptions.GetJob(jobInfo.JobName);
        // 根据配置里面存储的任务类型,将参数值进行反序列化。
        var jobArgs = serializer.Deserialize(jobInfo.JobArgs, jobConfiguration.ArgsType);
        // 构造一个新的执行上下文,让执行器执行任务。
        var context = new JobExecutionContext(scope.ServiceProvider, jobConfiguration.JobType, jobArgs);

        try
        {
            jobExecuter.Execute(context);

            // 若是任务执行成功则删除该任务。            
            store.Delete(jobInfo.Id);
        }
        catch (BackgroundJobExecutionException)
        {
            // 发生任务执行失败异常时,根据指定的公式计算下一次的执行时间。
            var nextTryTime = CalculateNextTryTime(jobInfo, clock);

            if (nextTryTime.HasValue)
            {
                jobInfo.NextTryTime = nextTryTime.Value;
            }
            else
            {
                // 超过超时时间的时候,公式计算函数返回 null,该任务置为废弃任务。
                jobInfo.IsAbandoned = true;
            }

            TryUpdate(store, jobInfo);
        }
    }
    catch (Exception ex)
    {
        // 执行过程当中,产生了未知异常,设置为废弃任务,并打印日志。
        Logger.LogException(ex);
        jobInfo.IsAbandoned = true;
        TryUpdate(store, jobInfo);
    }
}

执行后台任务的时候基本分为 5 步,它们分别是:

  1. 得到任务关联的配置参数,默认不用提供,由于在以前模块初始化的时候就已经配置了(你也能够显式指定)。
  2. 经过以前存储的配置参数,将参数值反序列化出来,构造具体实例。
  3. 构造一个执行上下文。
  4. 后台任务执行器执行具体的后台任务。
  5. 成功则删除任务,失败则更新任务下次的执行状态。

至于执行器里面的真正执行操做,你都拿到了参数值和任务类型了。就能够经过类型用 IoC 获取后台任务对象的实例,而后经过反射匹配方法签名,在实例上调用这个方法传入参数便可。

public virtual void Execute(JobExecutionContext context)
{
    // 构造具体的后台做业实例对象。
    var job = context.ServiceProvider.GetService(context.JobType);
    if (job == null)
    {
        throw new AbpException("The job type is not registered to DI: " + context.JobType);
    }

    // 得到须要执行的方法签名。
    var jobExecuteMethod = context.JobType.GetMethod(nameof(IBackgroundJob<object>.Execute));
    if (jobExecuteMethod == null)
    {
        throw new AbpException($"Given job type does not implement {typeof(IBackgroundJob<>).Name}. The job type was: " + context.JobType);
    }

    try
    {
        // 直接经过 MethodInfo 的 Invoke 方法调用,传入具体的实例对象和参数值便可。
        jobExecuteMethod.Invoke(job, new[] { context.JobArgs });
    }
    catch (Exception ex)
    {
        Logger.LogException(ex);

        // 若是是执行方法内的异常,则包装进行处理,而后抛出。
        throw new BackgroundJobExecutionException("A background job execution is failed. See inner exception for details.", ex)
        {
            JobType = context.JobType.AssemblyQualifiedName,
            JobArgs = context.JobArgs
        };
    }
}

集成 Hangfire

ABP vNext 对于 Hangfire 的集成代码分布在 Volo.Abp.HangFireVolo.Abp.BackgroundJobs.HangFire 模块内部,前者是在模块配置里面,调用 Hangfire 库的相关方法,注入组件到 IoC 容器当中。后者则是对后台做业进行了适配处理,替换了默认的 IBackgroundJobManager 实现。

AbpHangfireModule 模块内部,经过工厂建立出来一个 BackgroudJobServer 实例,并将它的生命周期与应用程序的生命周期进行绑定,以便进行销毁处理。

public class AbpHangfireModule : AbpModule
{
    private BackgroundJobServer _backgroundJobServer;

    public override void ConfigureServices(ServiceConfigurationContext context)
    {
        context.Services.AddHangfire(configuration =>
        {
            context.Services.ExecutePreConfiguredActions(configuration);
        });
    }

    public override void OnApplicationInitialization(ApplicationInitializationContext context)
    {
        var options = context.ServiceProvider.GetRequiredService<IOptions<AbpHangfireOptions>>().Value;
        _backgroundJobServer = options.BackgroundJobServerFactory.Invoke(context.ServiceProvider);
    }

    public override void OnApplicationShutdown(ApplicationShutdownContext context)
    {
        //TODO: ABP may provide two methods for application shutdown: OnPreApplicationShutdown & OnApplicationShutdown
        _backgroundJobServer.SendStop();
        _backgroundJobServer.Dispose();
    }
}

咱们直奔主题,看一下基于 Hangfire 的后台做业管理器是怎么实现的。

public class HangfireBackgroundJobManager : IBackgroundJobManager, ITransientDependency
{
    public Task<string> EnqueueAsync<TArgs>(TArgs args, BackgroundJobPriority priority = BackgroundJobPriority.Normal,
        TimeSpan? delay = null)
    {
        // 若是没有延迟参数,则直接经过 Enqueue() 方法扔进执行对了。
        if (!delay.HasValue)
        {
            return Task.FromResult(
                BackgroundJob.Enqueue<HangfireJobExecutionAdapter<TArgs>>(
                    adapter => adapter.Execute(args)
                )
            );
        }
        else
        {
            return Task.FromResult(
                BackgroundJob.Schedule<HangfireJobExecutionAdapter<TArgs>>(
                    adapter => adapter.Execute(args),
                    delay.Value
                )
            );
        }
    }

上述代码中使用 HangfireJobExecutionAdapter 进行了一个适配操做,由于 Hangfire 要将一个后台任务扔进队列执行,不是用 TArgs 就能解决的。

转到这个适配器定义,提供了一个 Execute(TArgs) 方法,当被添加到 Hangfire 队列执行的时候。实际 Hangfire 会调用适配器的 Excetue(TArgs) 方法,而后内部仍是使用的 IBackgroundJobExecuter 来执行具体定义的任务。

public class HangfireJobExecutionAdapter<TArgs>
{
    protected AbpBackgroundJobOptions Options { get; }
    protected IServiceScopeFactory ServiceScopeFactory { get; }
    protected IBackgroundJobExecuter JobExecuter { get; }

    public HangfireJobExecutionAdapter(
        IOptions<AbpBackgroundJobOptions> options, 
        IBackgroundJobExecuter jobExecuter, 
        IServiceScopeFactory serviceScopeFactory)
    {
        JobExecuter = jobExecuter;
        ServiceScopeFactory = serviceScopeFactory;
        Options = options.Value;
    }

    public void Execute(TArgs args)
    {
        using (var scope = ServiceScopeFactory.CreateScope())
        {
            var jobType = Options.GetJob(typeof(TArgs)).JobType;
            var context = new JobExecutionContext(scope.ServiceProvider, jobType, args);
            JobExecuter.Execute(context);
        }
    }
}

集成 RabbitMQ

基于 RabbitMQ 的后台做业实现,我想放在分布式事件总线里面,对其一块儿进行讲解。

3、总结

ABP vNext 为咱们提供了多种后台做业管理器的实现,你能够根据本身的需求选用不一样的后台做业管理器,又或者是本身动手造轮子。

须要看其余的 ABP vNext 相关文章?点击我 便可跳转到总目录。

相关文章
相关标签/搜索