.NET Core下开源任务调度框架Hangfire的Api任务拓展(支持秒级任务)

HangFire的拓展和使用

看了不少博客,小白第一次写博客。html

最近因为以前的任务调度框架总出现问题,所以想寻找一个替代品,以前使用的是Quartz.Net,这个框架方便之处就是支持cron表达式适合复杂日期场景使用,以及秒级任务。可是配置比较复杂,并且管理不方便,本身开发了个web管理页面,不过这个须要额外的单独线程去统一管理工做状态,很容易出现问题。mysql

有考虑过 “FluentScheduler” ,使用简单,可是管理配置也很麻烦,我但愿能作到配置简单,管理方便,高性能。最后想到了之前听过的hangfire,它的好处就是自带控制面板,在园子里看了不少相关资料,偶然发现了有人拓展过hangfire经过调用api接口来执行任务,这种方式能够避免依赖本地代码,方便部署,在此基础上,我用空闲时间拓展了一下如今已经基本能够知足需求。linux

 

所拓展的功能所有属于外部拓展,所以hangfire版本能够一直更新,如今已经更新最新版,支持秒级任务git

gitHub地址github

 

因为更新到最新版hangfire 1.7支持秒级任务,使用的在线表达式生成部分表达式有问题,注掉了秒级任务表达式生成,有时间须要详细测试更改,能够参考(hangfire官方提供的表达式)web

如今已经实现的功能有:

1,部署及调试:只须要配置数据库链接,而后编译便可运行,无需建表,支持(redis,mysql, sqlserver)其余数据库暂时用不到没测试。推荐使用redis集群。项目中直接添加了redis的存储包,已经更新StackExchange.Redis到最新版本方便拓展,调试时能够直接调试。部署,只须要发布项目,运行建立windows服务的bat命令,命令已经包含在项目中,或者发布至Linux。redis

 

2,周期任务:支持在控制面板页面上添加周期任务,编辑周期任务,删除周期任务,手动触发周期任务,暂停和继续周期任务(暂停实现的原理是经过set中添加属性,在job执行前,过滤掉,直接跳过执行,由于hangfire中job一旦建立就失去了控制权,只能经过过滤器去拦截),任务暂停后会查询状态并渲染面板列表为红色字体方便查找哪一个任务被暂停。sql

3,计划任务在做业选项卡中,计划做业中能够实现添加计划任务,计划任务可使任务在指定的分钟后执行,只执行一次。数据库

 

4,只读面板经过配置的用户名密码,使用户只具备读取面板的权限,这样能够防止误操做json

 

 

 1  //只读面板,只能读取不能操做
 2             app.UseHangfireDashboard("/job-read", new DashboardOptions  3  {  4                 AppPath = "#",//返回时跳转的地址
 5                 DisplayStorageConnectionString = false,//是否显示数据库链接信息
 6                 IsReadOnlyFunc = Context =>
 7  {  8                     return true;  9  }, 10                 Authorization = new[] { new BasicAuthAuthorizationFilter(new BasicAuthAuthorizationFilterOptions 11  { 12                     RequireSsl = false,//是否启用ssl验证,即https
13                     SslRedirect = false, 14                     LoginCaseSensitive = true, 15                     Users = new [] 16  { 17                         new BasicAuthAuthorizationUser 18  { 19                             Login = "read", 20                             PasswordClear = "only"
21  }, 22                         new BasicAuthAuthorizationUser 23  { 24                             Login = "test", 25                             PasswordClear = "123456"
26  }, 27                         new BasicAuthAuthorizationUser 28  { 29                             Login = "guest", 30                             PasswordClear = "123@123"
31  } 32  } 33  }) 34  } 35             });
View Code

 

 

 5,邮件推送:目前使用的方式是,任务错误重试达到指定次数后,发送邮件通知,使用的MailKit

 1   catch (Exception ex)  2  {  3                 //获取重试次数
 4                 var count = context.GetJobParameter<string>("RetryCount");  5  context.SetTextColor(ConsoleTextColor.Red);  6                 //signalR推送  7                 //SendRequest(ConfigSettings.Instance.URL+"/api/Publish/EveryOne", "测试");
 8                 if (count == "3")//重试达到三次的时候发邮件通知
 9  { 10  SendEmail(item.JobName, item.Url, ex.ToString()); 11  } 12                 logger.Error(ex, "HttpJob.Excute"); 13                 context.WriteLine($"执行出错:{ex.Message}"); 14                 throw;//不抛异常不会执行重试操做
15             }
View Code
 1 /// <summary>
 2         /// 邮件模板  3         /// </summary>
 4         /// <param name="jobname"></param>
 5         /// <param name="url"></param>
 6         /// <param name="exception"></param>
 7         /// <returns></returns>
 8         private static string SethtmlBody(string jobname, string url, string exception)  9  { 10             var htmlbody = $@"<h3 align='center'>{HangfireHttpJobOptions.SMTPSubject}</h3> 11  <h3>执行时间:</h3> 12  <p> 13  {DateTime.Now} 14  </p> 15  <h3> 16  任务名称:<span> {jobname} </span><br/> 17  </h3> 18  <h3> 19  请求路径:{url} 20  </h3> 21  <h3><span></span> 22  执行结果:<br/> 23  </h3> 24  <p> 25  {exception} 26  </p> "; 27             return htmlbody; 28         }
邮件模板
 1  //使用redis
 2                         config.UseRedisStorage(Redis, new Hangfire.Redis.RedisStorageOptions()  3  {  4                             FetchTimeout=TimeSpan.FromMinutes(5),  5                             Prefix = "{hangfire}:",  6                             //活动服务器超时时间
 7                             InvisibilityTimeout = TimeSpan.FromHours(1),  8                             //任务过时检查频率
 9                             ExpiryCheckInterval = TimeSpan.FromHours(1), 10                             DeletedListSize = 10000, 11                             SucceededListSize = 10000
12  }) 13                         .UseHangfireHttpJob(new HangfireHttpJobOptions() 14  { 15                             SendToMailList = HangfireSettings.Instance.SendMailList, 16                             SendMailAddress = HangfireSettings.Instance.SendMailAddress, 17                             SMTPServerAddress = HangfireSettings.Instance.SMTPServerAddress, 18                             SMTPPort = HangfireSettings.Instance.SMTPPort, 19                             SMTPPwd = HangfireSettings.Instance.SMTPPwd, 20                             SMTPSubject = HangfireSettings.Instance.SMTPSubject 21                         })
配置邮件参数

 

6,signalR 推送:宿主程序使用的weapi,所以能够经过webapi推送,这样作的好处是能够将服务看成推送服务使用,第三方接口也能够利用此来推送,

 

 1  /// <summary>
 2        ///用户加入组处理  3        /// </summary>
 4        /// <param name="userid">用户惟一标识</param>
 5        /// <param name="GroupName">组名称</param>
 6        /// <returns></returns>
 7         public Task InitUsers(string userid,string GroupName)  8  {  9             Console.WriteLine($"{userid}加入用户组"); 10  Groups.AddToGroupAsync(Context.ConnectionId, GroupName); 11             SignalrGroups.UserGroups.Add(new SignalrGroups() 12  { 13                 ConnectionId = Context.ConnectionId, 14                 GroupName = GroupName, 15                 UserId = userid 16  }); 17             return Clients.All.SendAsync("UserJoin", "用户组数据更新,新增id为:" + Context.ConnectionId + " pid:" + userid); 18  } 19         /// <summary>
20         /// 断线的时候处理 21         /// </summary>
22         /// <param name="exception"></param>
23         /// <returns></returns>
24         public override Task OnDisconnectedAsync(Exception exception) 25  { 26             //掉线移除用户,不给其推送
27             var user = SignalrGroups.UserGroups.FirstOrDefault(c => c.ConnectionId == Context.ConnectionId); 28 
29             if (user != null) 30  { 31                 Console.WriteLine($"用户:{user.UserId}已离线"); 32  SignalrGroups.UserGroups.Remove(user); 33  Groups.RemoveFromGroupAsync(Context.ConnectionId, user.GroupName); 34  } 35             return base.OnDisconnectedAsync(exception); 36         }
Hub定义
 1  /// <summary>
 2         /// 单个connectionid推送  3         /// </summary>
 4         /// <param name="groups"></param>
 5         /// <returns></returns>
 6         [HttpPost, Route("AnyOne")]  7         public IActionResult AnyOne([FromBody]IEnumerable<SignalrGroups> groups)  8  {  9             if (groups != null && groups.Any()) 10  { 11                 var ids = groups.Select(c => c.UserId); 12                 var list = SignalrGroups.UserGroups.Where(c => ids.Contains(c.UserId)); 13                 foreach (var item in list) 14                     hubContext.Clients.Client(item.ConnectionId).SendAsync("AnyOne", $"{item.ConnectionId}: {item.Content}"); 15  } 16             return Ok(); 17  } 18 
19         /// <summary>
20         /// 所有推送 21         /// </summary>
22         /// <param name="message"></param>
23         /// <returns></returns>
24         [HttpPost, Route("EveryOne")] 25         public IActionResult EveryOne([FromBody] MSG body) 26  { 27             var data = HttpContext.Response.Body; 28             hubContext.Clients.All.SendAsync("EveryOne", $"{body.message}"); 29             return Ok(); 30  } 31 
32         /// <summary>
33         /// 单个组推送 34         /// </summary>
35         /// <param name="group"></param>
36         /// <returns></returns>
37         [HttpPost, Route("AnyGroups")] 38         public IActionResult AnyGroups([FromBody]SignalrGroups group) 39  { 40             if (group != null) 41  { 42                 hubContext.Clients.Group(group.GroupName).SendAsync("AnyGroups", $"{group.Content}"); 43  } 44             return Ok(); 45         }
推送接口定义

 

7,接口健康检查:由于主要用来调用api接口,所以集成接口健康检查仍是颇有必要的,目前使用的方式是配置文件中添加须要检查的地址

 1 /*健康检查配置项*/
 2   "HealthChecks-UI": {  3     /*检查地址,能够配置当前程序和外部程序*/
 4     "HealthChecks": [  5  {  6         "Name": "Hangfire Api 健康检查",  7         "Uri": "http://localhost:9006/healthz"
 8  }  9  ], 10     /*须要检查的Api地址*/
11     "CheckUrls": [ 12  { 13         "Uri": "http://localhost:17600/CityService.svc/HealthyCheck", 14         "httpMethod": "Get"
15  }, 16  { 17         "Uri": "http://localhost:9098/CheckHelath", 18         "httpMethod": "Post"
19  }, 20  { 21         "Uri": "http://localhost:9067/GrtHelathCheck", 22         "httpMethod": "Get"
23  }, 24  { 25         "Uri": "http://localhost:9043/GrtHelathCheck", 26         "httpMethod": "Get"
27  } 28  ], 29     "Webhooks": [], //钩子配置
30     "EvaluationTimeOnSeconds": 10, //检测频率
31     "MinimumSecondsBetweenFailureNotifications": 60, //推送间隔时间
32     "HealthCheckDatabaseConnectionString": "Data Source=\\healthchecksdb" //-> sqlite库存储检查配置及日志信息
33   }
健康检查相关配置

后台会根据配置的指定间隔去检查服务接口是否能够正常访问,(这个中间件能够实现不少检查功能,包括网络,数据库,mq等,支持webhook推送等丰富功能,系统用不到所以没有添加)

健康检查的配置

1  //添加健康检查地址
2             HangfireSettings.Instance.HostServers.ForEach(s =>
3  { 4                 services.AddHealthChecks().AddUrlGroup(new Uri(s.Uri), s.httpMethod.ToLower() == "post" ? HttpMethod.Post : HttpMethod.Get, $"{s.Uri}"); 5             });
健康检查地址添加
 1  app.UseHealthChecks("/healthz", new HealthCheckOptions()  2  {  3                 Predicate = _ => true,  4                 ResponseWriter = UIResponseWriter.WriteHealthCheckUIResponse  5  });  6             app.UseHealthChecks("/health", options);//获取自定义格式的json数据
 7             app.UseHealthChecksUI(setup =>
 8  {  9                 setup.UIPath = "/hc"; // 健康检查的UI面板地址
10                 setup.ApiPath = "/hc-api"; // 用于api获取json的检查数据
11             });
健康检查中间件配置

其中,ui配置路径是在面板中展现检查结果须要使用的地址

api地址,能够经过接口的方式来调用检查结果,方便在第三方系统中使用,其数据格式能够自定义

 经过接口调用

 1 [{  2     "id": 1,  3     "status": "Unhealthy",  4     "onStateFrom": "2019-04-07T18:00:09.6996751+08:00",  5     "lastExecuted": "2019-04-07T18:05:03.4761739+08:00",  6     "uri": "http://localhost:53583/healthz",  7     "name": "Hangfire Api 健康检查",  8     "discoveryService": null,  9     "entries": [{ 10         "id": 1, 11         "name": "http://localhost:17600/CityService.svc/HealthyCheck", 12         "status": "Unhealthy", 13         "description": "An error occurred while sending the request.", 14         "duration": "00:00:04.3907375"
15  }, { 16         "id": 2, 17         "name": "http://localhost:9098/CheckHelath", 18         "status": "Unhealthy", 19         "description": "An error occurred while sending the request.", 20         "duration": "00:00:04.4140310"
21  }, { 22         "id": 3, 23         "name": "http://localhost:9067/GrtHelathCheck", 24         "status": "Unhealthy", 25         "description": "An error occurred while sending the request.", 26         "duration": "00:00:04.3847367"
27  }, { 28         "id": 4, 29         "name": "http://localhost:9043/GrtHelathCheck", 30         "status": "Unhealthy", 31         "description": "An error occurred while sending the request.", 32         "duration": "00:00:04.4499007"
33  }], 34     "history": [] 35 }]
接口返回数据原始格式
 1 {  2     "status": "Unhealthy",  3     "errors": [{  4         "key": "http://localhost:17600/CityService.svc/HealthyCheck",  5         "value": "Unhealthy"
 6  }, {  7         "key": "http://localhost:9098/CheckHelath",  8         "value": "Unhealthy"
 9  }, { 10         "key": "http://localhost:9067/GrtHelathCheck", 11         "value": "Unhealthy"
12  }, { 13         "key": "http://localhost:9043/GrtHelathCheck", 14         "value": "Unhealthy"
15  }] 16 }
接口返回数据处理后格式
 1  //重写json报告数据,可用于远程调用获取健康检查结果
 2             var options = new HealthCheckOptions  3  {  4                 ResponseWriter = async (c, r) =>
 5  {  6                     c.Response.ContentType = "application/json";  7 
 8                     var result = JsonConvert.SerializeObject(new
 9  { 10                         status = r.Status.ToString(), 11                         errors = r.Entries.Select(e => new { key = e.Key, value = e.Value.Status.ToString() }) 12  }); 13                     await c.Response.WriteAsync(result); 14  } 15             };
处理方式

 

8,经过接口添加任务:添加编辑周期任务,添加计划任务,触发周期任务,删除周期任务,多个任务连续一次执行的任务

 1  /// <summary>
 2         /// 添加一个队列任务当即被执行  3         /// </summary>
 4         /// <param name="httpJob"></param>
 5         /// <returns></returns>
 6         [HttpPost, Route("AddBackGroundJob")]  7         public JsonResult AddBackGroundJob([FromBody] Hangfire.HttpJob.Server.HttpJobItem httpJob)  8  {  9             var addreslut = string.Empty;  10             try
 11  {  12                 addreslut = BackgroundJob.Enqueue(() => Hangfire.HttpJob.Server.HttpJob.Excute(httpJob, httpJob.JobName, null));  13  }  14             catch (Exception ec)  15  {  16                 return Json(new Message() { Code = false, ErrorMessage = ec.ToString() });  17  }  18             return Json(new Message() { Code = true, ErrorMessage = "" });  19  }  20 
 21         /// <summary>
 22         /// 添加一个周期任务  23         /// </summary>
 24         /// <param name="httpJob"></param>
 25         /// <returns></returns>
 26         [HttpPost, Route("AddOrUpdateRecurringJob")]  27         public JsonResult AddOrUpdateRecurringJob([FromBody] Hangfire.HttpJob.Server.HttpJobItem httpJob)  28  {  29             try
 30  {  31                 RecurringJob.AddOrUpdate(httpJob.JobName, () => Hangfire.HttpJob.Server.HttpJob.Excute(httpJob, httpJob.JobName, null), httpJob.Corn, TimeZoneInfo.Local);  32  }  33             catch (Exception ec)  34  {  35                 return Json(new Message() { Code = false, ErrorMessage = ec.ToString() });  36  }  37             return Json(new Message() { Code = true, ErrorMessage = "" });  38  }  39 
 40         /// <summary>
 41         /// 删除一个周期任务  42         /// </summary>
 43         /// <param name="jobname"></param>
 44         /// <returns></returns>
 45         [HttpGet,Route("DeleteJob")]  46         public JsonResult DeleteJob(string jobname)  47  {  48             try
 49  {  50  RecurringJob.RemoveIfExists(jobname);  51  }  52             catch (Exception ec)  53  {  54                 return Json(new Message() { Code = false, ErrorMessage = ec.ToString() });  55  }  56             return Json(new Message() { Code = true, ErrorMessage = "" });  57  }  58         /// <summary>
 59         /// 手动触发一个任务  60         /// </summary>
 61         /// <param name="jobname"></param>
 62         /// <returns></returns>
 63         [HttpGet, Route("TriggerRecurringJob")]  64         public JsonResult TriggerRecurringJob(string jobname)  65  {  66             try
 67  {  68  RecurringJob.Trigger(jobname);  69  }  70             catch (Exception ec)  71  {  72                 return Json(new Message() { Code = false, ErrorMessage = ec.ToString() });  73  }  74             return Json(new Message() { Code = true, ErrorMessage = "" });  75  }  76         /// <summary>
 77         /// 添加一个延迟任务  78         /// </summary>
 79         /// <param name="httpJob">httpJob.DelayFromMinutes(延迟多少分钟执行)</param>
 80         /// <returns></returns>
 81         [HttpPost, Route("AddScheduleJob")]  82         public JsonResult AddScheduleJob([FromBody] Hangfire.HttpJob.Server.HttpJobItem httpJob)  83  {  84             var reslut = string.Empty;  85             try
 86  {  87                 reslut = BackgroundJob.Schedule(() => Hangfire.HttpJob.Server.HttpJob.Excute(httpJob, httpJob.JobName, null), TimeSpan.FromMinutes(httpJob.DelayFromMinutes));  88  }  89             catch (Exception ec)  90  {  91                 return Json(new Message() { Code = false, ErrorMessage = ec.ToString() });  92  }  93             return Json(new Message() { Code = true, ErrorMessage = "" });  94  }  95         /// <summary>
 96         /// 添加连续任务,多个任务依次执行,只执行一次  97         /// </summary>
 98         /// <param name="httpJob"></param>
 99         /// <returns></returns>
100         [HttpPost, Route("AddContinueJob")] 101         public JsonResult AddContinueJob([FromBody] List<Hangfire.HttpJob.Server.HttpJobItem> httpJobItems) 102  { 103             var reslut = string.Empty; 104             var jobid = string.Empty; 105             try
106  { 107                 httpJobItems.ForEach(k =>
108  { 109                     if (!string.IsNullOrEmpty(jobid)) 110  { 111                         jobid = BackgroundJob.ContinueJobWith(jobid, () => RunContinueJob(k)); 112  } 113                     else
114  { 115                         jobid = BackgroundJob.Enqueue(() => Hangfire.HttpJob.Server.HttpJob.Excute(k, k.JobName, null)); 116  } 117  }); 118                 reslut = "true"; 119  } 120             catch (Exception ec) 121  { 122                 return Json(new Message() { Code = false, ErrorMessage = ec.ToString() }); 123  } 124             return Json(new Message() { Code = true, ErrorMessage = "" }); 125         }
经过接口添加任务

这样作的好处是有效利用了宿主的webapi,并且无需登陆控制面板操做就能实现任务管理,方便集成管理到其余系统中

 

防止多个实例的任务并行执行,即一个任务未执行完成,另外一个相同的任务开始执行,可使用分布式锁来解决

 经过特性来添加任务重试时间间隔(hangfire 1.7 新增,单位/秒),重试次数,队列名称,任务名称,以及分布式锁超时时间

 1 /// <summary>
 2         /// 执行任务,DelaysInSeconds(重试时间间隔/单位秒)  3         /// </summary>
 4         /// <param name="item"></param>
 5         /// <param name="jobName"></param>
 6         /// <param name="context"></param>
 7         [AutomaticRetry(Attempts = 3, DelaysInSeconds = new[] { 30, 60, 90 }, LogEvents = true, OnAttemptsExceeded = AttemptsExceededAction.Fail)]  8         [DisplayName("Api任务:{1}")]  9         [Queue("apis")] 10         [JobFilter(timeoutInSeconds: 3600)]
配置分布式锁超时时间

 

 1 //设置分布式锁,分布式锁会阻止两个相同的任务并发执行,用任务名称和方法名称做为锁
 2             var jobresource = $"{filterContext.BackgroundJob.Job.Args[1]}.{filterContext.BackgroundJob.Job.Method.Name}";  3             var locktimeout = TimeSpan.FromSeconds(_timeoutInSeconds);  4             try
 5  {  6                 //判断任务是否被暂停
 7                 using (var connection = JobStorage.Current.GetConnection())  8  {  9                     var conts = connection.GetAllItemsFromSet($"JobPauseOf:{filterContext.BackgroundJob.Job.Args[1]}"); 10                     if (conts.Contains("true")) 11  { 12                         filterContext.Canceled = true;//任务被暂停不执行直接跳过
13                         return; 14  } 15  } 16                 //申请分布式锁
17                 var distributedLock = filterContext.Connection.AcquireDistributedLock(jobresource, locktimeout); 18                 filterContext.Items["DistributedLock"] = distributedLock; 19  } 20             catch (Exception ec) 21  { 22                 //获取锁超时,取消任务,任务会默认置为成功
23                 filterContext.Canceled = true; 24                 logger.Info($"任务{filterContext.BackgroundJob.Job.Args[1]}超时,任务id{filterContext.BackgroundJob.Id}"); 25             }
过滤器添加分布式锁

 

1  if (!filterContext.Items.ContainsKey("DistributedLock")) 2  { 3                 throw new InvalidOperationException("找不到分布式锁,没有为该任务申请分布式锁."); 4  } 5             //释放分布式锁
6             var distributedLock = (IDisposable)filterContext.Items["DistributedLock"]; 7             distributedLock.Dispose();
释放分布式锁

 

经过过滤器来设置任务过时时间,过时后自动在数据库删除历史记录

 

1 public void OnStateApplied(ApplyStateContext context, IWriteOnlyTransaction transaction) 2  { 3             //设置过时时间,任务将在三天后过时,过时的任务会自动被扫描并删除
4             context.JobExpirationTimeout = TimeSpan.FromDays(3); 5         }
设置任务过时时间

 

redis集群下,测试秒级任务

集群为windws环境下,一个主节点四个从节点,(使用时须要在redis链接中配置所有集群链接,主节点和从节点),目前用不到linux环境,没有进行测试。

相关文章
相关标签/搜索