码云:https://gitee.com/iicode/KMQueue github:https://github.com/fnpac/KMQueuejava
该框架是基于redis实现的分布式队列,简单灵活。git
下面简单介绍下该队列的一些设计,若是还有其余不懂得地方能够参考源码和注释,代码中我加入了详尽的注释。github
还有其余问题能够提issue。redis
KMQueue队列分为两种模式:spring
default
- 简单队列safe
- 安全队列其中默认为default
。数据库
能够以queueName:queueMode
格式设置队列的模式。安全
queueName 队列名称服务器
default 为默认队列,能够不指定,默认值。 特性:队列任务可能会丢失,队列任务没有超时限制。框架
queueMode 队列模式,可选值有:default、safe。分布式
safe 为安全队列,任务有重试策略,达到重试次数依旧失败或者任务存活超时(这里说的超时是指AliveTimeout)(这二者都称为最终失败),Monitor会发出通知, 这样能够根据业务作一些处理,推荐将这些失败的任务持久化到数据库做为日志记录。固然或许你还有更好的处理方式。
注意:须要开启备份队列监听程序BackupQueueMonitor,不然安全队列中最终失败的任务只会存储在备份队列中,而没有消费者去消费处理,这是很危险的行为
new KMQueueManager.Builder("127.0.0.1", 6379, "worker1_queue", "worker2_queue:safe") ...
worker1_queue
为简单队列,worker2_queue
为安全队列。
注意:为了更好的支持业务(将已存在的某个队列的
DEFAULT
改成SAFE
,并重启服务的状况),作以下处理: 当new KMQueueManager.Builder
的队列名称参数中,只要有一个队列指定了SAFE
模式,就会建立备份队列(用于队列任务监控,设置任务超时、失败任务重试等), 而且该备份队列的名称基于传入的全部队列名称生成(不管其队列是不是SAFE
模式)。
上面的例子中,备份队列的生成策略为:
base64(md5("worker1_queue" + "worker2_queue"))
构造方法声明以下:
public Task(String queue, String uid, boolean isUnique, String type, String data, Task.TaskStatus status)
uid:若是业务须要区分队列任务的惟一性,请自行生成uid参数, 不然队列默认使用uuid生成策略,这会致使即便data数据彻底相同的任务也会被看成两个不一样的任务处理。
是不是惟一任务,即队列中同一时刻只存在一个该任务。
type:用于业务逻辑的处理,你能够根据不一样的type任务类型,调用不一样的handler去处理,能够不传。
有三种方式获取Redis链接,详情查看KMQueueManager.Builder
构造方法的三种重载形式。 若是你使用spring,建议获取spring中配置的redis链接池对象,并经过以下构造方法建立队列管理器:
public Builder(Pool<Jedis> pool, String... queues)
aliveTimeout
);由于初始化备份队列时设置了循环标记; 因此Monitor这里采用定时Job策略,使用brpoplpush backupQueue backupQueue
循环遍历备份队列,遇到循环标记结束循环遍历。 对执行超时(对应的是大于protectedTimeout
)或者存活时间超时(对应的是大于aliveTimeout
)的任务作处理。
分为两种状况:
Pipeline
,决定这些任务的一些额外处理,好比持久化到数据库作日志记录。 // 任务完全失败后的处理,须要实现Pipeline接口,自行实现处理逻辑 TaskPipeline taskPipeline = new TaskPipeline(); BackupQueueMonitor backupQueueMonitor = new BackupQueueMonitor.Builder("127.0.0.1", 6379, backUpQueueName) ... .setPipeline(taskPipeline).build();
RetryTimes
的任务从新放回任务队列执行,同时更新任务状态:
@Test public void pushTaskTest() { KMQueueManager kmQueueManager = new KMQueueManager.Builder("127.0.0.1", 6379, "worker1_queue", "worker2_queue:safe") .setMaxWaitMillis(-1L) .setMaxTotal(600) .setMaxIdle(300) .setAliveTimeout(Constant.ALIVE_TIMEOUT) .build(); // 初始化队列 kmQueueManager.init(); // 1.获取队列 TaskQueue taskQueue = kmQueueManager.getTaskQueue("worker2_queue"); // 2.建立任务 JSONObject ob = new JSONObject(); ob.put("data", "mail proxy task"); String data = JSON.toJSONString(ob); // 参数 uid:若是业务须要区分队列任务的惟一性,请自行生成uid参数, // 不然队列默认使用uuid生成策略,这会致使即便data数据彻底相同的任务也会被看成两个不一样的任务处理。 // 参数 type:用于业务逻辑的处理,你能够根据不一样的type任务类型,调用不一样的handler去处理,能够不传。 Task task = new Task(taskQueue.getName(), "", true, "", data, new Task.TaskStatus()); // 3.将任务加入队列 taskQueue.pushTask(task); }
@Test public void popTaskTest() { KMQueueManager kmQueueManager = new KMQueueManager.Builder("127.0.0.1", 6379, "worker1_queue", "worker2_queue:safe") .setMaxWaitMillis(-1L) .setMaxTotal(600) .setMaxIdle(300) .setAliveTimeout(Constant.ALIVE_TIMEOUT) .build(); // 初始化队列 kmQueueManager.init(); // 1.获取队列 TaskQueue taskQueue = kmQueueManager.getTaskQueue("worker2_queue"); // 2.获取任务 Task task = taskQueue.popTask(); // 业务处理放到TaskConsumersHandler里 if (task != null) { task.doTask(kmQueueManager, TaskConsumersHandler.class); } }
你能够自行实现TaskHandler
接口,建立适合你本身业务逻辑的任务处理类,并经过下面代码执行任务处理。
task.doTask(kmQueueManager, TaskHandler.class)
_若是业务处理抛出异常,队列也将其看成任务执行完成处理,
并经过taskQueue.finishTask(this)
完成任务。
public void doTask(KMQueueManager kmQueueManager, Class clazz) { // 获取任务所属队列 TaskQueue taskQueue = kmQueueManager.getTaskQueue(this.getQueue()); String queueMode = taskQueue.getMode(); if (KMQueueManager.SAFE.equals(queueMode)) {// 安全队列 try { handleTask(clazz); } catch (Throwable e) { e.printStackTrace(); } // 任务执行完成,删除备份队列的相应任务 taskQueue.finishTask(this); } else {// 普通队列 handleTask(clazz); } }
不会再进行任务重试操做。
这点可能不太容易理解,为何任务抛出异常失败了,队列不会执行重试呢?
由于任务执行抛出异常是业务级的错误,队列不作干预。
队列的重试只是针对消费任务的线程被kill掉或者服务器宕机等状况,此时该任务还没执行完,任务的消费者还没告诉队列任务执行完成了。 此时备份队列监控会执行任务的重试。
若是你想在任务抛出异常失败时执行任务重试,能够不使用task.doTask
,当任务抛出异常时,不执行任务的taskQueue.finishTask(this)
操做。 这样备份队列监控会在下一个job对该任务进行检查处理。
taskQueue.finishTask(this)
是一个很是方便的工具方法。
@Test public void monitorTaskTest() { // 任务完全失败后的处理,须要实现Pipeline接口,自行实现处理逻辑 TaskPipeline taskPipeline = new TaskPipeline(); // 根据任务队列的名称构造备份队列的名称,注意:这里的任务队列参数必定要和KMQueueManager构造时传入的一一对应。 String backUpQueueName = KMQUtils.genBackUpQueueName("worker1_queue", "worker2_queue:safe"); // 构造Monitor监听器 BackupQueueMonitor backupQueueMonitor = new BackupQueueMonitor.Builder("127.0.0.1", 6379, backUpQueueName) .setMaxWaitMillis(-1L) .setMaxTotal(600) .setMaxIdle(300) .setAliveTimeout(Constant.ALIVE_TIMEOUT) .setProtectedTimeout(Constant.PROTECTED_TIMEOUT) .setRetryTimes(Constant.RETRY_TIMES) .setPipeline(taskPipeline).build(); // 执行监听 backupQueueMonitor.monitor(); }
重要的事情说三遍:
若是指定了队列的模式为安全队列,必定要开启备份队列监控!!!
若是指定了队列的模式为安全队列,必定要开启备份队列监控!!!
若是指定了队列的模式为安全队列,必定要开启备份队列监控!!!