说下hangfire吧

最近因工做须要开发计划任务模块(严格来讲应该是修改bug吧,其余同事负责的)接触到了Hangfire。早前听同事说hangfire有点坑,怀着好奇,趁这两天bug改的差很少了,在github上面down了hangfire源码,下面分享一下,本身读hangfire源码的一些理解,和工做中须要注意的地方。介绍大概分为如下几个部分吧。1.准备工做,2.简单使用,3.源码分析,4.避坑。须要说明一下接触hangfire源码的时间不长,也就几天时间理解不到位,或者说错了的,但愿在评论指正。
准备工做:hangfire源代码的代码量很少,github地址: https://github.com/HangfireIO/Hangfire,有兴趣的朋友能够本身下载瞅瞅源码。功能上大概能够分为客户端模式和服务端模式。用到的技术大概有Multi Thread、Expression、Dapper、Cron等。能够这么说,它的定时任务彻底就是基于多线程协做实现的。由于是多线程环境,因此我的以为看起来有点费力。
简单使用:.Net&.Net Core环境均可以使用,下面就以.Net Core的使用为例。
1.客户端和服务端独立部署
client端
 1 public IServiceProvider ConfigureServices(IServiceCollection services)  2  {  3             // 其余代码
 4              services.AddHangfire(config =>
 5  {  6  config.UseSqlServerStorage(...);  7  });  8  }  9  
10 public void Configure(IApplicationBuilder app, IWebHostEnvironment env) 11  { 12             // 其余代码... 13             // 启用Dashboard看板
14  app.UseHangfireDashboard(); 15         }

 

server端
 1 public void Configuration(IAppBuilder app)  2  {  3  GlobalConfiguration.Configuration  4                  .UseSqlServerStorage("链接字符串", new SqlServerStorageOptions  5  {  6                     // options
 7  });  8             app.UseHangfireServer(new BackgroundJobServerOptions  9  { 10  }); 11  } 12  
13
或者
1 services.AddHangfireServer(options =>
2  { 3                         // 基于IHostedService接口实现
4                     });
PS:server端还有一种实现方式,实现IHostedService接口 其实跟上面的使用方法同样的,注入到服务就ok,在程序启动阶段会自动执行IHostedService接口的两个方法,能够简单看下IHostedService接口的定义。
1   public interface IHostedService 2  { 3  Task StartAsync(CancellationToken cancellationToken); 4  Task StopAsync(CancellationToken cancellationToken); 5   }
接口就定义了两个方法,start是在程序启动的时候执行,固然stop就是在程序中止的时候执行。咱们用一张图简单描绘一下它的执行时机,图是盗的。
以上就是hangfire的client端和server端分开部署的一个简单应用,下面咱们看下第二种,client&server部署在同一台机器上。
2.客户端和服务端统一部署
1 public void Configuration(IAppBuilder app) 2 { 3     GlobalConfiguration.Configuration.UseSqlServerStorage(); // 配置数据库链接
4     
5     app.UseHangfireServer(); // 启用server
6     app.UseHangfireDashboard(); // 启用看板
7 }

 

简单的几行代码,固然我也只会简单的用法。以上服务注入并执行,接下来就是往hangfire里面添加任务。
1 BackgroundJob.Enqueue(() => Console.WriteLine("Simple!")); // 当即执行
2 BackgroundJob.Schedule(() => Console.WriteLine("Reliable!"), TimeSpan.FromDays(7)); // 延后执行
3 RecurringJob.AddOrUpdate(() => Console.WriteLine("Transparent!"), Cron.Daily); // 循环执行,支持cron表达式
简单使用就到这吧,咱们继续大纲的第三部分,源码分析。
 
源码分析
客户端模式就不用说了,说白了就是往hangfire数据库里面写任务,咱们主要是看看服务端的执行原理。咱们先找到入口,也能够看作是NetCore里面的一个中间件吧。看代码
1 app.UseHangfireServer(); // 启用server
UseHangfireServer实现
 1 public static IAppBuilder UseHangfireServer(  2             [NotNull] this IAppBuilder builder,  3  [NotNull] JobStorage storage,  4  [NotNull] BackgroundJobServerOptions options,  5             [NotNull] params IBackgroundProcess[] additionalProcesses)  6  {  7             // 其余代码...
 8             var server = new BackgroundJobServer(options, storage, additionalProcesses);  9             
10             return builder; 11         }

 

UseHangfireServer扩展方法实现里面,比较重要的一行代码就是建立BackgroundJobServer,BackgroundJobServer实现了IBackgroundProcessingServer接口,server的启动也就是间接在它的构造器里面完成的。咱们不妨先瞅瞅IBackgroundProcessingServer接口和BackgroundJobServer类的定义。
 1 // IBackgroundProcessingServer
 2 public interface IBackgroundProcessingServer : IDisposable  3  {  4         void SendStop();  5         bool WaitForShutdown(TimeSpan timeout);  6  Task WaitForShutdownAsync(CancellationToken cancellationToken);  7  }  8  
 9 // BackgroundJobServer
10 public class BackgroundJobServer : IBackgroundProcessingServer 11  { 12         // 其余成员...
13         public BackgroundJobServer( 14  [NotNull] BackgroundJobServerOptions options, 15  [NotNull] JobStorage storage, 16             [NotNull] IEnumerable<IBackgroundProcess> additionalProcesses, 17  [CanBeNull] IJobFilterProvider filterProvider, 18  [CanBeNull] JobActivator activator, 19  [CanBeNull] IBackgroundJobFactory factory, 20  [CanBeNull] IBackgroundJobPerformer performer, 21  [CanBeNull] IBackgroundJobStateChanger stateChanger) 22  { 23             // 其余代码
24             var processes = new List<IBackgroundProcessDispatcherBuilder>(); 25  processes.AddRange(GetRequiredProcesses(filterProvider, activator, factory, performer, stateChanger)); 26             processes.AddRange(additionalProcesses.Select(x =>  x.UseBackgroundPool(1))); 27             var properties = new Dictionary<string, object>
28  { 29                 { "Queues", options.Queues }, 30                 { "WorkerCount", options.WorkerCount } 31  }; 32             
33             _processingServer = new BackgroundProcessingServer( 34  storage, 35  processes, 36  properties, 37  GetProcessingServerOptions()); 38  } 39         public void SendStop() 40  { 41  } 42         public void Dispose() 43  { 44  } 45         [Obsolete("This method is a stub. There is no need to call the `Start` method. Will be removed in version 2.0.0.")] 46         public void Start() 47  { 48  } 49         [Obsolete("Please call the `Shutdown` method instead. Will be removed in version 2.0.0.")] 50         public void Stop() 51  { 52  } 53         [Obsolete("Please call the `Shutdown` method instead. Will be removed in version 2.0.0.")] 54         public void Stop(bool force) 55  { 56  } 57         public bool WaitForShutdown(TimeSpan timeout) 58  { 59  } 60         public Task WaitForShutdownAsync(CancellationToken cancellationToken) 61  { 62         }

 

IBackgroundProcessingServer接口里面的这几个方法都是跟停用server,取消任务清理资源相关的。BackgroundJobServer类里面真正完成接口的实现是由BackgroundProcessingServer类型的同名函数实现,这个对象是在构造函数里面初始化的,在初始化BackgroundProcessingServer类型的同时,建立了若干IBackgroundProcessDispatcherBuilder实现类BackgroundProcessDispatcherBuilder的实例,hangfire默认实现了7种dispatcher,咱们任务、日志、心跳等等独立线程都是由它的Create方法完成,这个地方不算server启动主线,会在后面细说。咱们继续看看BackgroundProcessingServer这个类型。这里须要注意的是里面有几个方法好像是被停用了,start、stop等方法,官方也注释了,被删除了。start方法被停用了,难道咱们的server启动是在BackgroundProcessingServer类型里面?继续看BackgroundProcessingServer的定义。
 1 public sealed class BackgroundProcessingServer : IBackgroundProcessingServer  2  {  3         // 其余成员
 4         internal BackgroundProcessingServer(  5  [NotNull] BackgroundServerProcess process,  6  [NotNull] BackgroundProcessingServerOptions options)  7  {  8             _process = process ?? throw new ArgumentNullException(nameof(process));  9             _options = options ?? throw new ArgumentNullException(nameof(options)); 10             _dispatcher = CreateDispatcher(); 11 #if !NETSTANDARD1_3
12             AppDomain.CurrentDomain.DomainUnload += OnCurrentDomainUnload; 13             AppDomain.CurrentDomain.ProcessExit += OnCurrentDomainUnload; 14 #endif
15  } 16         public void SendStop() 17  { 18  } 19         public bool WaitForShutdown(TimeSpan timeout) 20  { 21  } 22         public async Task WaitForShutdownAsync(CancellationToken cancellationToken) 23  { 24  } 25         public void Dispose() 26  { 27             
28  } 29         private void OnCurrentDomainUnload(object sender, EventArgs args) 30  { 31             
32  } 33         private IBackgroundDispatcher CreateDispatcher() 34  { 35             var execution = new BackgroundExecution( 36  _stoppingCts.Token, 37                 new BackgroundExecutionOptions 38  { 39                     Name = nameof(BackgroundServerProcess), 40                     ErrorThreshold = TimeSpan.Zero, 41                     StillErrorThreshold = TimeSpan.Zero, 42                     RetryDelay = retry => _options.RestartDelay 43  }); 44             return new BackgroundDispatcher( 45  execution, 46  RunServer, 47  execution, 48  ThreadFactory); 49  } 50         private void RunServer(Guid executionId, object state) 51  { 52  _process.Execute(executionId, (BackgroundExecution)state, _stoppingCts.Token, _stoppedCts.Token, _shutdownCts.Token); 53  } 54         private static IEnumerable<Thread> ThreadFactory(ThreadStart threadStart) 55  { 56             yield return new Thread(threadStart) 57  { 58                 IsBackground = true, 59                 Name = $"{nameof(BackgroundServerProcess)} #{Interlocked.Increment(ref _lastThreadId)}", 60  }; 61  } 62     }

 

果不其然,server的启动快要揭开神秘的面纱了,RunServer?翻译过来应该是启动服务吧,咱们暂且不去管他,先记一下这个有个runserver,咱们继续跟踪。在构造函数里面调用了一个CreateDispatcher()的方法,咱们看下它的实现
 1 private IBackgroundDispatcher CreateDispatcher()  2  {  3             var execution = new BackgroundExecution(  4  _stoppingCts.Token,  5                 new BackgroundExecutionOptions  6  {  7                     Name = nameof(BackgroundServerProcess),  8                     ErrorThreshold = TimeSpan.Zero,  9                     StillErrorThreshold = TimeSpan.Zero, 10                     RetryDelay = retry => _options.RestartDelay 11  }); 12             return new BackgroundDispatcher( 13  execution, 14  RunServer, 15  execution, 16  ThreadFactory); 17         }

 

在CreateDispatcher方法里面返回了一个BackgroundDispatcher,字面意思好像是后台分发器,而且指定了回调runserver,BackgroundDispatcher实现了IBackgroundDispatcher接口,咱们先看下它们的定义。
 1 // IBackgroundDispatcher
 2 public interface IBackgroundDispatcher : IDisposable  3  {  4         bool Wait(TimeSpan timeout);  5  Task WaitAsync(TimeSpan timeout, CancellationToken cancellationToken);  6  }  7  
 8 // BackgroundDispatcher
 9 internal sealed class BackgroundDispatcher : IBackgroundDispatcher 10  { 11         // 其余成员
12         public BackgroundDispatcher( 13  [NotNull] IBackgroundExecution execution, 14             [NotNull] Action<Guid, object> action, 15             [CanBeNull] object state, 16             [NotNull] Func<ThreadStart, IEnumerable<Thread>> threadFactory) 17  { 18             if (threadFactory == null) throw new ArgumentNullException(nameof(threadFactory)); 19             _execution = execution ?? throw new ArgumentNullException(nameof(execution)); 20             _action = action ?? throw new ArgumentNullException(nameof(action)); 21             _state = state; 22 #if !NETSTANDARD1_3
23  AppDomainUnloadMonitor.EnsureInitialized(); 24 #endif
25             var threads = threadFactory(DispatchLoop)?.ToArray(); 26             if (threads == null || threads.Length == 0) 27  { 28                 throw new ArgumentException("At least one unstarted thread should be created.", nameof(threadFactory)); 29  } 30             if (threads.Any(thread => thread == null || (thread.ThreadState &  ThreadState.Unstarted) == 0)) 31  { 32                 throw new ArgumentException("All the threads should be non-null and in the ThreadState.Unstarted state.", nameof(threadFactory)); 33  } 34             _stopped = new CountdownEvent(threads.Length); 35             foreach (var thread in threads) 36  { 37  thread.Start(); 38  } 39  } 40         public bool Wait(TimeSpan timeout) 41  { 42             return _stopped.WaitHandle.WaitOne(timeout); 43  } 44         public async Task WaitAsync(TimeSpan timeout, CancellationToken cancellationToken) 45  { 46             await _stopped.WaitHandle.WaitOneAsync(timeout,  cancellationToken).ConfigureAwait(false); 47  } 48         public void Dispose() 49  { 50  } 51         public override string ToString() 52  { 53  } 54         private void DispatchLoop() 55  { 56             try
57  { 58  _execution.Run(_action, _state); 59  } 60             catch (Exception ex) 61  { 62  
63  } 64             finally
65  { 66                 try
67  { 68  _stopped.Signal(); 69  } 70                 catch (ObjectDisposedException) 71  { 72  
73  } 74  } 75  } 76     }

 

从IBackgroundDispatcher接口的定义来看,分发器应该是负责协调资源处理,咱们具体看看BackgroundDispatcher的实现。以上代码就是server的启动执行核心代码而且我以加粗,其实就是启动线程Loop执行。在DispatchLoop方法里面间接调用了我上面说的runserver方法。在runserver方法里面实现了整个server端的初始化工做。咱们接着看DispatchLoop方法的实现 ,在这个方法里面调用了IBackgroundExecution接口的run方法,继续IBackgroundExecution接口的定义。
1 public interface IBackgroundExecution : IDisposable 2  { 3         void Run([NotNull] Action<Guid, object> callback, [CanBeNull] object state); 4         Task RunAsync([NotNull] Func<Guid, object, Task> callback, [CanBeNull]  object state); 5     }

 

就两方法,run包含同步和异步,看看它的惟一实现类BackgroundExecution。
 1   internal sealed class BackgroundExecution : IBackgroundExecution  2  {  3                 // 其余成员
 4         public void Run(Action<Guid, object> callback, object state)  5  {  6             if (callback == null) throw new ArgumentNullException(nameof(callback));  7             var executionId = Guid.NewGuid();  8            
 9  { 10 #if !NETSTANDARD1_3
11                 try
12 #endif
13  { 14                     HandleStarted(executionId, out var nextDelay); 15                     while (true) 16  { 17                         // Don't place anything here.
18                         try
19  { 20                            
21                             if (StopRequested) break; 22                             if (nextDelay > TimeSpan.Zero) 23  { 24  HandleDelay(executionId, nextDelay); 25  } 26  callback(executionId, state); 27                             HandleSuccess(out nextDelay); 28  } 29 #if !NETSTANDARD1_3
30                         catch (ThreadAbortException) when (AppDomainUnloadMonitor.IsUnloading) 31  { 32                             // Our thread is aborted due to AppDomain unload. It's better to give up to 33                             // not to cause the host to be more aggressive.
34                             throw; 35  } 36 #endif
37                         catch (OperationCanceledException) when (StopRequested) 38  { 39                             break; 40  } 41                         catch (Exception ex) 42  { 43                             HandleException(executionId, ex, out nextDelay); 44  } 45  } 46  HandleStop(executionId); 47  } 48 #if !NETSTANDARD1_3
49                 catch (ThreadAbortException ex) 50  { 51  HandleThreadAbort(executionId, ex); 52  } 53 #endif
54  } 55  } 56 }

 

hangfire里面全部的独立线程都是经过run方法执行,而后回调到本身的实现类Execute方法,自此每一个独立的功能线程就循环干着本身独立的工做(这个后面会单独分析RecurringJobScheduler)。继续咱们的主线,server启动,咱们以run的同步方法为例,第一个线程(咱们就叫它主线程吧)启动了一个while循环,在循环里面而且callback调用了咱们的runserver方法。
    
1 private void RunServer(Guid executionId, object state) 2  { 3  _process.Execute(executionId, (BackgroundExecution)state, _stoppingCts.Token, _stoppedCts.Token, _shutdownCts.Token); 4         }

 

在runserver方法里面的实现很简单,直接调用了_process的execute方法,咱们简单看下_process类型IBackgroundServerProcess的定义。
1 internal interface IBackgroundServerProcess 2  { 3         void Execute( 4  Guid executionId, 5  BackgroundExecution execution, 6  CancellationToken stoppingToken, 7  CancellationToken stoppedToken, 8  CancellationToken shutdownToken); 9     }

 

IBackgroundServerProcess的定义就一个execute方法,这个接口的工做其实就是初始化server服务端,咱们看看它的惟一实现类BackgroundServerProcess。
 1 internal sealed class BackgroundServerProcess : IBackgroundServerProcess  2  {  3         
 4         // 其余成员
 5         public BackgroundServerProcess(  6  [NotNull] JobStorage storage,  7             [NotNull] IEnumerable<IBackgroundProcessDispatcherBuilder> dispatcherBuilders,  8  [NotNull] BackgroundProcessingServerOptions options,  9             [NotNull] IDictionary<string, object> properties)  10  {  11             if (dispatcherBuilders == null) throw new ArgumentNullException(nameof(dispatcherBuilders));  12  
 13  
 14             _storage = storage ?? throw new ArgumentNullException(nameof(storage));  15             _options = options ?? throw new ArgumentNullException(nameof(options));  16             _properties = properties ?? throw new ArgumentNullException(nameof(properties));  17  
 18  
 19             var builders = new List<IBackgroundProcessDispatcherBuilder>();  20             builders.AddRange(GetRequiredProcesses()); // 添加默认的工做dispatcher也就是独立线程
 21  builders.AddRange(GetStorageComponents());  22  builders.AddRange(dispatcherBuilders);  23  
 24  
 25             _dispatcherBuilders = builders.ToArray();  26  }  27  
 28  
 29         public void Execute(Guid executionId, BackgroundExecution execution, CancellationToken stoppingToken,  30             CancellationToken stoppedToken, CancellationToken shutdownToken)  // server初始化
 31  {  32             var serverId = GetServerId();  33             Stopwatch stoppedAt = null;  34  
 35  
 36             void HandleRestartSignal()  37  {  38                 if (!stoppingToken.IsCancellationRequested)  39  {  40                     _logger.Info($"{GetServerTemplate(serverId)} caught restart signal...");  41  }  42  }  43             using (var restartCts = new CancellationTokenSource())  44             using (var restartStoppingCts = CancellationTokenSource.CreateLinkedTokenSource(stoppingToken, restartCts.Token))  45             using (var restartStoppedCts = CancellationTokenSource.CreateLinkedTokenSource(stoppedToken, restartCts.Token))  46             using (var restartShutdownCts = CancellationTokenSource.CreateLinkedTokenSource(shutdownToken, restartCts.Token))  47             using (restartStoppingCts.Token.Register(HandleStopRestartSignal))  48             using (stoppingToken.Register(HandleStoppingSignal))  49             using (stoppedToken.Register(HandleStoppedSignal))  50             using (shutdownToken.Register(HandleShutdownSignal))  51             using (restartCts.Token.Register(HandleRestartSignal))  52  {  53                 var context = new BackgroundServerContext(  54  serverId,  55  _storage,  56  _properties,  57  restartStoppingCts.Token,  58  restartStoppedCts.Token,  59  restartShutdownCts.Token);  60                 var dispatchers = new List<IBackgroundDispatcher>();  61  CreateServer(context);  62                 try
 63  {  64                     // ReSharper disable once AccessToDisposedClosure
 65                     using (var heartbeat = CreateHeartbeatProcess(context, () => restartCts.Cancel())) // 建立守护线程
 66  {  67                         StartDispatchers(context, dispatchers); // 启动hangfire默认初始化的全部独立任务线程
 68  execution.NotifySucceeded();  69  WaitForDispatchers(context, dispatchers);  70  
 71  
 72  restartCts.Cancel();  73  
 74  heartbeat.WaitAsync(Timeout.InfiniteTimeSpan, shutdownToken).GetAwaiter().GetResult();  75  }  76  }  77                 finally
 78  {  79  DisposeDispatchers(dispatchers);  80  ServerDelete(context, stoppedAt);  81  }  82  }  83  }  84  
 85  
 86         private IBackgroundDispatcher CreateHeartbeatProcess(BackgroundServerContext context, Action requestRestart) // 建立守护线程
 87  {  88             return new ServerHeartbeatProcess(_options.HeartbeatInterval, _options.ServerTimeout, requestRestart)  89                 .UseBackgroundPool(threadCount: 1)  90  .Create(context, _options);  91  }  92  
 93  
 94         private IEnumerable<IBackgroundProcessDispatcherBuilder> GetRequiredProcesses() // 初始化日志和任务监控线程
 95  {  96             yield return new ServerWatchdog(_options.ServerCheckInterval, _options.ServerTimeout).UseBackgroundPool(threadCount: 1);  97             yield return new ServerJobCancellationWatcher(_options.CancellationCheckInterval).UseBackgroundPool(threadCount: 1);  98  }  99         private string GetServerId() // 获取serverid
100  { 101             var serverName = _options.ServerName 102                  ?? Environment.GetEnvironmentVariable("COMPUTERNAME") 103                  ?? Environment.GetEnvironmentVariable("HOSTNAME"); 104             var guid = Guid.NewGuid().ToString(); 105  
106             return !String.IsNullOrWhiteSpace(serverName) ? $"{serverName.ToLowerInvariant()}:{guid}" : guid; 107  } 108  
109         
110         private void CreateServer(BackgroundServerContext context) // 建立server,写入Server数据表
111  { 112             var stopwatch = Stopwatch.StartNew(); 113             using (var connection = _storage.GetConnection()) 114  { 115  connection.AnnounceServer(context.ServerId, GetServerContext(_properties)); 116  } 117  stopwatch.Stop(); 118  
119  
120  ServerJobCancellationToken.AddServer(context.ServerId); 121             _logger.Info($"{GetServerTemplate(context.ServerId)} successfully announced in {stopwatch.Elapsed.TotalMilliseconds} ms"); 122  } 123  
124  
125         private void StartDispatchers(BackgroundServerContext context, ICollection<IBackgroundDispatcher> dispatchers) // 启动全部独立的任务线程,包括咱们的队列计划、循环计划、日志、守护等等线程
126  { 127  
128             foreach (var dispatcherBuilder in _dispatcherBuilders) 129  { 130  dispatchers.Add(dispatcherBuilder.Create(context, _options)); 131  } 132  } 133  
134     }

 

以上代码我有作精简处理,不要纠结里面的实现,代码注释也比较详细。下面我作一个简单的总结吧,第一个线程(暂时叫主线程吧)从startup里面调用usehangfireserver扩展方法-》启动一个新的worker线程用于初始化&启动server-》主程返回-》启动hangfire全部任务线程-》建立的第一个worker线程挂起(用于处理全部任务线程的资源释放)。server的初始化工做大概就是这些,下面详细看看hangfire的任务线程的执行原理,这里咱们以RecurringJobScheduler循环任务为例。
 
RecurringJobScheduler实现机制
还记得上面提到的7个dispatcher任务线程的建立吗?这7个默认的任务线程初始化就发生在上面加粗的代码里面StartDispatchers方法,咱们看代码。
1 private void StartDispatchers(BackgroundServerContext context,  ICollection<IBackgroundDispatcher> dispatchers) 2  { 3                // 其余代码...
4             foreach (var dispatcherBuilder in _dispatcherBuilders) 5  { 6                 dispatchers.Add(dispatcherBuilder.Create(context, _options)); // 初始化独立任务线程
7  } 8         }

 

遍历_dispatcherBuilders数组,7种任务类型,分别调用它们的Create方法。继续看create方法。
   
 1  public IBackgroundDispatcher Create(BackgroundServerContext context,  BackgroundProcessingServerOptions options) // 第一步
 2  {  3             // 其余代码
 4             var execution = new BackgroundExecution(  5  context.StoppingToken,  6                 new BackgroundExecutionOptions  7  {  8                     Name = _process.GetType().Name,  9                     RetryDelay = options.RetryDelay 10                 }); // 定义本身的execution
11             return new BackgroundDispatcher( // 建立BackgroundDispatcher 
12  execution, 13                 ExecuteProcess, // 指定回调
14                 Tuple.Create(_process, context, execution), // 建立三元组上下文,注意一下1元组这个对象
15  _threadFactory); 16  } 17  
18 public BackgroundDispatcher(  // 第二步
19  [NotNull] IBackgroundExecution execution, 20             [NotNull] Action<Guid, object> action, 21             [CanBeNull] object state, 22             [NotNull] Func<ThreadStart, IEnumerable<Thread>> threadFactory) 23  { 24    
25             _state = state; 26  
27             var threads = threadFactory(DispatchLoop)?.ToArray(); 28            
29             foreach (var thread in threads) 30  { 31                 thread.Start(); // 执行线程
32  } 33  } 34  
35 private void DispatchLoop() // 第三步
36  { 37             try
38  { 39                 _execution.Run(_action, _state);  // 在run里面回调_action
40  } 41             catch (Exception ex) 42  { 43  } 44             finally
45  { 46                 try
47  { 48  _stopped.Signal(); 49  } 50                 catch (ObjectDisposedException) 51  { 52  } 53  } 54  } 55  
56 private static void ExecuteProcess(Guid executionId, object state) // 第四步 回调方法,对应上面的指定回调
57  { 58             var tuple = (Tuple<IBackgroundProcess, BackgroundServerContext,  BackgroundExecution>)state; 59             var serverContext = tuple.Item2; 60             var context = new BackgroundProcessContext( // 建立公共上下文
61  serverContext.ServerId, 62  serverContext.Storage, 63                 serverContext.Properties.ToDictionary(x => x.Key, x => x.Value), 64  executionId, 65  serverContext.StoppingToken, 66  serverContext.StoppedToken, 67  serverContext.ShutdownToken); 68             while (!context.IsStopping) 69  { 70                 tuple.Item1.Execute(context); // 执行本身元组对应的实例
71  tuple.Item3.NotifySucceeded(); 72  } 73         }

 

上面有点乱啊,我大概简单串起来讲一下。第一步在create方法里面建立了BackgroundDispatcher并指定了元组参数-》第二步绑定线程的执行函数Loop而且执行-》第三步执行Loop而且回调_action委托-》第四步_action参数对应的函数地址就是ExecuteProcess,最后在ExecuteProcess里面经过元组参数调用对应的任务类型,自此7种任务类型启动并开始工做。以上代码还有个细节须要说明一下,Tuple.Create(_process, context, execution)。元组的第一个参数,其类型为IBackgroundProcess,看下定义。
1 public interface IBackgroundProcess : IServerProcess 2  { 3         void Execute([NotNull] BackgroundProcessContext context); 4     }

 

接口就定义了一个方法,没什么特别的,可是它的几个实现类就是咱们单独的任务类,咱们下面要说的RecurringJobScheduler循环任务类也实现了这个接口。到此咱们的RecurringJobScheduler循环定时任务线程就算开始执行了。
RecurringJobScheduler循环定时任务机制
照旧看下这个类型的定义
 1 public class RecurringJobScheduler : IBackgroundProcess  2  {  3         // 其余代码
 4         public RecurringJobScheduler(  5  [NotNull] IBackgroundJobFactory factory,  6  TimeSpan pollingDelay,  7  [NotNull] ITimeZoneResolver timeZoneResolver,  8             [NotNull] Func<DateTime> nowFactory)  9  { 10             if (factory == null) throw new ArgumentNullException(nameof(factory)); 11             if (nowFactory == null) throw new ArgumentNullException(nameof(nowFactory)); 12             if (timeZoneResolver == null) throw new ArgumentNullException(nameof(timeZoneResolver)); 13  
14  
15             _factory = factory; 16             _nowFactory = nowFactory; 17             _timeZoneResolver = timeZoneResolver; 18             _pollingDelay = pollingDelay; 19             _profiler = new SlowLogProfiler(_logger); 20  } 21  
22  
23         /// <inheritdoc />
24         public void Execute(BackgroundProcessContext context) // 实现方法
25  { 26             if (context == null) throw new ArgumentNullException(nameof(context)); 27  
28  
29             var jobsEnqueued = 0; 30  
31  
32             while (EnqueueNextRecurringJobs(context)) // 从数据库获取定时任务
33  { 34                 jobsEnqueued++; 35  
36  
37                 if (context.IsStopping) 38  { 39                     break; 40  } 41  } 42  
43  
44             if (jobsEnqueued != 0) 45  { 46                 _logger.Debug($"{jobsEnqueued} recurring job(s) enqueued."); 47  } 48  
49  
50             if (_pollingDelay > TimeSpan.Zero) 51  { 52  context.Wait(_pollingDelay); 53  } 54             else
55  { 56                 var now = _nowFactory(); 57                 context.Wait(now.AddMilliseconds(-now.Millisecond).AddSeconds(-now.Second).AddMinutes(1) - now); 58  } 59  } 60     }

 

承上,调用元组的第一个参数的execute方法,RecurringJobScheduler的execute方法得以执行,该方法就干一件事,每隔15秒从数据库获取待执行的计划,每次1000条数据。经过EnqueueNextRecurringJobs方法获取任务。
 1 private bool EnqueueNextRecurringJobs(BackgroundProcessContext context)  2  {  3             return UseConnectionDistributedLock(context.Storage, connection => 
 4  {  5                 var result = false;  6                 if (IsBatchingAvailable(connection))  7  {  8                     var now = _nowFactory();  9                     var timestamp = JobHelper.ToTimestamp(now); 10                     var recurringJobIds =  ((JobStorageConnection)connection).GetFirstByLowestScoreFromSet("recurring-jobs", 0,  timestamp, BatchSize); // 从数据库里面查询
11                     if (recurringJobIds == null || recurringJobIds.Count == 0) return  false; 12                     foreach (var recurringJobId in recurringJobIds) 13  { 14                         if (context.IsStopping) return false; 15                         if (TryEnqueueBackgroundJob(context, connection, recurringJobId,  now))// 排队执行
16  { 17                             result = true; 18  } 19  } 20  } 21                 else
22  { 23                     for (var i = 0; i < BatchSize; i++) 24  { 25                         if (context.IsStopping) return false; 26                         var now = _nowFactory(); 27                         var timestamp = JobHelper.ToTimestamp(now); 28                         var recurringJobId =  connection.GetFirstByLowestScoreFromSet("recurring-jobs", 0, timestamp); 29                         if (recurringJobId == null) return false; 30                         if (!TryEnqueueBackgroundJob(context, connection, recurringJobId, now)) 31  { 32                             return false; 33  } 34  } 35  } 36                 return result; 37  }); 38         }

 

GetFirstByLowestScoreFromSet方法从数据库Set表里面查询top1000数据,条件是key为recurring-jobs字符串(表示定时任务)而且 时间范围是0到当前时间。随后遍历这些jobids,排队执行,往下看TryEnqueueBackgroundJob方法的实现。
 1 private bool EnqueueBackgroundJob(  2  BackgroundProcessContext context,  3  IStorageConnection connection,  4             string recurringJobId,  5  DateTime now)  6  {  7             // 其余代码
 8             using (connection.AcquireDistributedRecurringJobLock(recurringJobId, LockTimeout))  9  { 10                 try
11  { 12                     var recurringJob = connection.GetRecurringJob(recurringJobId, _timeZoneResolver, now); 13                     if (recurringJob == null) 14  { 15                         using (var transaction = connection.CreateWriteTransaction()) 16  { 17                             transaction.RemoveFromSet("recurring-jobs", recurringJobId); 18  transaction.Commit(); 19  } 20                         return false; 21  } 22           
23                     BackgroundJob backgroundJob = null; 24                     IReadOnlyDictionary<string, string> changedFields; 25                     if (recurringJob.TrySchedule(out var nextExecution, out var error)) 26  { 27                         if (nextExecution.HasValue && nextExecution <= now) 28  { 29                             backgroundJob = _factory.TriggerRecurringJob(context.Storage, connection, _profiler, recurringJob, now); 30                             if (String.IsNullOrEmpty(backgroundJob?.Id)) 31  { 32                                 _logger.Debug($"Recurring job '{recurringJobId}' execution at '{nextExecution}' has been canceled."); 33  } 34  } 35                         recurringJob.IsChanged(out changedFields, out nextExecution); 36  } 37                     else if (recurringJob.RetryAttempt < MaxRetryAttemptCount) 38  { 39                         var delay = _pollingDelay > TimeSpan.Zero ? _pollingDelay :  TimeSpan.FromMinutes(1); 40                         
41                         _logger.WarnException($"Recurring job '{recurringJobId}' can't be scheduled due to an error and will be retried in {delay}.", error); 42                         recurringJob.ScheduleRetry(delay, out changedFields, out nextExecution); 43  } 44                     else
45  { 46                         _logger.ErrorException($"Recurring job '{recurringJobId}' can't be scheduled due to an error and will be disabled.", error); 47                         recurringJob.Disable(error, out changedFields, out nextExecution); 48  } 49               
50                     using (var transaction = connection.CreateWriteTransaction()) 51  { 52                         if (backgroundJob != null) 53  { 54  _factory.StateMachine.EnqueueBackgroundJob( 55  context.Storage, 56  connection, 57  transaction, 58  recurringJob, 59  backgroundJob, 60                                 "Triggered by recurring job scheduler", 61  _profiler); 62  } 63  transaction.UpdateRecurringJob(recurringJob, changedFields, nextExecution, _logger); 64  transaction.Commit(); 65                         return true; 66  } 67  } 68                 catch (TimeZoneNotFoundException ex) 69  { 70                 catch (Exception ex) 71  { 72    
73  } 74                 return false; 75  } 76         }

 

须要注意的地方我都有加粗,该方法大概流程是:1.GetRecurringJob根据jobid从Hash表里面查询一条完整的定时任务,2.TrySchedule获取该任务的下次执行时间,若是下次执行时间小于当前,执行这条任务(并不是真正执行定时任务,只是往job表里面写数据,真正执行任务由worker完成),3.获取下次执行时间&全部任务字段,4.状态机修改任务状态。定时任务就这样周而复始的重复执行以上流程。这里简单说下worker的执行机制,其实际就是轮询检索job表里面的数据执行任务表达式树,worker在hangfire里面默认开启了20个线程。第三部分就到这吧。
 
避坑
简单说下我的在改bug期间遇到的一些问题啊。
1.时区问题,在添加定时任务时若是不指定时区信息,默认使用的是utc时间,咱们中国是东8区,也就是说解析出来的执行时间会晚8个小时执行。解决办法有几种能够经过全局指定options的ITimeZoneResolver属性指定,也能够经过AddorUpdate方法指定,若是是指定时区信息,须要注意看板上面的异常信息,若是有异常会致使任务不执行,时区信息它是从系统里面检索出来的,没有就抛异常。就这样吧。
相关文章
相关标签/搜索