.net core实践系列之短信服务-Sikiro.SMS.Job服务的实现

前言

本篇会继续讲解Sikiro.SMS.Job服务的实现,在我写第一篇的时候,我就发现我当时设计的架构里Sikiro.SMS.Job这个能够选择不须要,而使用MQ代替。可是为了说明调度任务使用实现也坚持写了下。后面会一篇针对架构、实现优化的讲解。html

源码地址:https://github.com/SkyChenSky/Sikiro.SMSgit

Quartz的简介

Quartz.NET是一款功能齐全的开源做业调度框架,小至的应用程序,大到企业系统均可以适用。Quartz是做者James House用JAVA语言编写的,而Quartz.NET是从Quartz移植过来的C#版本。github

Quartz.Net的做用

  • Quartz.Net是多线程的,容许多个JOB同时执行。
  • Quartz.Net能够进行持久化,结合管理后台能够进行可视化的监控
  • Quartz.Net提供API进行远程操控,结合管理后台能够进行运维管理

在通常企业,能够利用Quartz.Net框架作各类的定时任务,例如,数据迁移、跑报表等等。sql

Cron表达式

字段名 是否必填 值范围 特殊字符
Seconds YES 0-59 , - * /
Minutes YES 0-59 , - * /
Hours YES 0-23 , - * /
Day of month YES 1-31 , - * ? / L W
Month YES 1-12 or JAN-DEC , - * /
Day of week YES 1-7 or SUN-SAT , - * ? / L #
Year NO empty, 1970-2099 , - * /

缺点

Quartz.Net的缺点很明显,没有自带的管理后台,而同款的Hangfir调度任务框架则会有更加良好的易用性。可是在Github上有很多人开源了Quartz.Net的管理后台,对此做为了弥补。多线程

其余

其余Quartz.Net的信息能够看我以前记录的一篇文章《Quartz.NET的使用(附源码)架构

Quartz.Net DEMO:https://github.com/SkyChenSky/QuartzDotNetDemo.git并发

业务流程

从MongoDB持久化的数据,查询出状态为待处理而且定时时间小于当前时间的数据。经过Mongo驱动提供的FindOneAndUpdate对文档进行原子性操做(更新中间状态并查询出刚更新的文档)。若是有数据则发送到MQ,由Sikiro.SMS.Bus进行订阅发送,由于本次有数据,我认为可能还会有其余须要发送的数据,所以马上调用JOB自身方法,进行下一条须要处理的数据进行发送。若是这次JOB的执行并无数据,那么认为接下来一段时间没有须要处理的数据,此次调度结束。框架

TimeSendSms示例

public class TimeSendSms : BaseJob
    {
        private readonly SmsService _smsService;
        private readonly IBus _bus;

        public TimeSendSms(SmsService smsService, IBus bus)
        {
            _smsService = smsService;
            _bus = bus;
        }

        protected override void ExecuteBusiness()
        {
            _smsService.GetToBeSend();

            if (_smsService.Sms != null)
                _bus.Publish(_smsService.Sms.MapTo<SmsModel, SmsQueueModel>());

            _smsService.ContinueDo(ExecuteBusiness);
        }

        protected override void OnException()
        {
            _smsService.RollBack();
        }
    }

模板模式

Job的轮询处理流程基本类似,查询出须要执行数据-遍历业务处理-若是有异常则特殊处理,所以针对相似流程相同,可是实现有差别的程序,咱们可使用模板模式。运维

 public abstract class BaseJob : IJob
    {
        private void OnException(Action action)
        {
            try
            {
                action();
            }
            catch (Exception e)
            {
                e.WriteToFile();
                OnException();
            }
        }

        public Task Execute(IJobExecutionContext context)
        {
            OnException(ExecuteBusiness);

            return null;
        }

        protected virtual void OnException()
        {

        }

        protected abstract void ExecuteBusiness();
    }

 

Mongo的原子性

原子性

原子是物理概念,指的是指化学反应不可再分的基本微粒。而计算机领域的原子性强调的对象是操做(指令、事务)。咱们所说的指令组是原子操做,意思要么一块儿成功,要么一块儿失败。不容许2个指令里,一个成功一个失败的状况存在。ide

MongoDB 原子操做

MongoDB的原子操做就是要么这个文档完整的保存到Mongodb,要么没有保存到Mongodb,不会出现查询到的文档没有保存完整的状况。

MongoDB的文档的保存,修改,删除等操做都是原子性,除此以外还提供了FindOneAndDelete、FindOneAndUpdate、FindOneAndReplace等原子操做。

以FindOneAndUpdate为例,对某文档FindOneAndUpdate,能够文档B进行Update操做完成后返回出文档B的结果,根据参数返回结果是更新前仍是更新后(通常咱们须要更新后)。

而这FindOneAndUpdate的操做对于咱们更新到中间状态的很是实用:

  • 避免进行Update后没法良好的查询到刚Update的文档
  • 避免应用集群部署时批量更新后,没法良好分配任务
  • 批量更新多个文档须要isolated标识隔离,全局锁在大并发状况下性能并不乐观

虽然以上能够经过更新时标识版本号进行解决,这无疑增长实现难度。

MongoDB锁机制

Mongodb并发操做又读写锁来进行控制。

简单来讲

当进行读操做的时候会加读锁,这个时候其余读操做能够也得到读锁,可是不能加写锁,也就是说不能进行写操做。

当进行写操做的时候会加写锁,这个时候其余操做没法加任何锁,也就是说不能进行其余的读操做和写操做。

多个JOB的并发性

综上所述,落实到咱们应用场景,在部署多个调度任务服务,或者JOB多个线程去跑时,咱们可使用FindOneAndUpdate,每一个调度任务每次只处理一个文档,Update操做的时候会进行写锁阻塞其余进程(进程)的写操做。那么就能够保证每一个调度任务均可以只处理惟一一个有效的文档,避免重复处理。

下面是个人Sikiro.Nosql.Mongo的FindOneAndUpdate封装示例,由于Update字段的不友好,因此我封装了一下Lambda表达式,ReturnDocument = ReturnDocument.After标识响应数据是更新前仍是更新后的文档。

public T GetAndUpdate<T>(string database, string collection, Expression<Func<T, bool>> predicate, Expression<Func<T, T>> updateExpression)
        {
            var db = _mongoClient.GetDatabase(database);
            var col = db.GetCollection<T>(collection);

            var updateDefinitionList = MongoExpression<T>.GetUpdateDefinition(updateExpression);

            var updateDefinitionBuilder = new UpdateDefinitionBuilder<T>().Combine(updateDefinitionList);

            return col.FindOneAndUpdate(predicate, updateDefinitionBuilder, new FindOneAndUpdateOptions<T, T>
            {
                ReturnDocument = ReturnDocument.After
            });

SQL Server的UpdateSelect

SQL Server的操做也具备上述FindOneAndUpdate的功能,咱们公司成他为UpdateSelect,下面是示例代码:

UPDATE TOP ( 100 )
        SYS_USER WITH ( UPDLOCK, READPAST )
SET     USER_STATUS = 1
OUTPUT  INSERTED.[USER_NAME] ,
        INSERTED.SYS_USERID ,
        INSERTED.EMAIL
FROM    SYS_USER
WHERE   CREATE_DATETIME < '2018-09-13'
        AND USER_STATUS = 2;

结尾

本篇介绍了调度任务结合MongoDB原子操做的使用,使得调度任务服务能够具备良好的伸缩性。若是有任何建议与问题能够在下方评论反馈给我。

相关文章
相关标签/搜索