以前有这样一个需求:运营在后端配置一条系统消息或营销活动等类型的消息,到了需要推送的时间后,系统自动将消息推送到用户APP端显示。一开始采用的是任务调度(定时器)的方式,通过轮询扫表来做。由于具体何时推送消息没有固定的频率和固定的时间,所以需要每分钟扫一次表,以免消息到了指定时间未能及时推送到APP端。但每分钟扫描一次过于频繁,因此不太适合(定时器更适合固定频率或固定时间段的处理)。
所以这里选取了几种延迟发送的方式:
1. RabbitMQ
2. Redis
3. DelayQueue(慎用)
代码部分(发送端):
/**
 * Common contract for scheduling a system message for delayed delivery.
 */
public interface ISysMessageDelayProcessor {

    /** Five minutes expressed in milliseconds. */
    long FIVE_MINUTES = 5 * 60 * 1000;

    /**
     * Schedules a message for delivery.
     *
     * @param msg      the message payload; wrap it as required by the caller
     * @param pushDate the time at which the message should be pushed
     */
    void sendMessage(Object msg, LocalDateTime pushDate);
}

/**
 * RabbitMQ-based implementation (a RabbitMQ plugin has to be installed for it to work).
 */
@Slf4j
@EnableBinding(SysMessageSink.class)
public class SysMessageRabbitMQDelayProcessorImpl implements ISysMessageDelayProcessor {

    @Autowired
    private BinderAwareChannelResolver resolver;

    /**
     * Sends the payload to the delay topic with an {@code x-delay} header holding the
     * number of milliseconds between now and {@code pushDate}.
     *
     * @param msg      the message payload
     * @param pushDate the time at which the message should be pushed
     */
    @Override
    public void sendMessage(Object msg, LocalDateTime pushDate) {
        long delayMillis = Duration.between(LocalDateTime.now(), pushDate).toMillis();
        resolver.resolveDestination(MQTopicConstant.SYS_MESSAGE_SEND_DELAY_TOPIC_PRODUCER)
                .send(MessageBuilder.withPayload(msg)
                        .setHeader("x-delay", delayMillis)
                        .build());
    }
}

# Delayed-delivery configuration for system messages
spring.cloud.stream.bindings.your-topic-producer.destination=your-topic
spring.cloud.stream.rabbit.bindings.your-topic-producer.producer.delayed-exchange=true
spring.cloud.stream.bindings.your-topic-consumer.destination=your-topic
spring.cloud.stream.rabbit.bindings.your-topic-consumer.consumer.delayed-exchange=true
spring.cloud.stream.bindings.your-topic-consumer.group=your-topic-group

/**
 * Redis-based implementation backed by a sorted set.
 */
public class SysMessageRedisDelayProcessorImpl implements ISysMessageDelayProcessor {

    @Autowired
    private RedisTemplate redisTemplate;

    /**
     * Adds the payload to the delayed sorted set, scored by its due time.
     *
     * <p>The score is the absolute due time in epoch milliseconds, computed with a fixed
     * {@code +8} offset. This is the same conversion the Redis listener's expiry check
     * applies to the current time, so the two values are directly comparable. A relative
     * delay must not be used as the score: compared against an epoch timestamp it would
     * always look expired.
     *
     * @param msg      the message payload
     * @param pushDate the time at which the message should be pushed
     */
    @Override
    public void sendMessage(Object msg, LocalDateTime pushDate) {
        long dueAtEpochMillis = pushDate.toInstant(java.time.ZoneOffset.of("+8")).toEpochMilli();
        // Write to the key the Redis listener actually polls.
        redisTemplate.opsForZSet().add(MQTopicConstant.SYS_MESSAGE_SEND_DELAY_TOPIC, msg, dueAtEpochMillis);
    }
}

/**
 * Fallback implementation for scenarios where neither Redis nor RabbitMQ fits.
 *
 * <p>It is purely in-memory: after a crash or a restart the queued data is lost.
 * Use with caution!
 */
public class SysMessageDelayedQueueProcessorImpl implements ISysMessageDelayProcessor, Delayed {

    private LocalDateTime executeTime;
    private Object data;

    // send queue
    private static final DelayQueue<SysMessageDelayedQueueProcessorImpl> sendDelayQueue = new DelayQueue<>();

    // query queue (not consumed by the listener shown in this article)
    private static final DelayQueue<SysMessageDelayedQueueProcessorImpl> queryDelayQueue = new DelayQueue<>();

    /**
     * Creates the processor and starts a listener thread on the send queue.
     */
    public SysMessageDelayedQueueProcessorImpl() {
        // The listener declares a single-argument constructor taking the send queue.
        new Thread(new SysMessageDelayedQueueProcessorListener(sendDelayQueue)).start();
    }

    /**
     * Creates a queue element.
     *
     * @param executeTime the time at which the element becomes available
     * @param data        the message payload
     */
    public SysMessageDelayedQueueProcessorImpl(LocalDateTime executeTime, Object data) {
        this.executeTime = executeTime;
        this.data = data;
    }

    /**
     * Returns the payload carried by this queue element.
     *
     * @return the message payload
     */
    public Object getData() {
        return data;
    }

    @Override
    public void sendMessage(Object msg, LocalDateTime pushDate) {
        sendDelayQueue.offer(new SysMessageDelayedQueueProcessorImpl(pushDate, msg));
    }

    @Override
    public long getDelay(TimeUnit unit) {
        long remainingMillis = Duration.between(LocalDateTime.now(), executeTime).toMillis();
        return unit.convert(remainingMillis, TimeUnit.MILLISECONDS);
    }

    @Override
    public int compareTo(Delayed o) {
        // Long.compare avoids the sign flip that casting a large long difference to int can cause.
        return Long.compare(this.getDelay(TimeUnit.MILLISECONDS), o.getDelay(TimeUnit.MILLISECONDS));
    }
}

接收端:
/** * 监听 */ public abstract class ISysMessageDelayedListener implements Runnable { protected static final LinkedBlockingQueue<SysMessageVO> SEND_QUEUE = new LinkedBlockingQueue(1000); @Override public void run() { sendProcessor(); } /** * 监听发送方法 */ public abstract void sendProcessor(); }/** * 只用来监听MQ延迟队列推送过来的数据包,及转发数据包 * 不作其余业务处理 * */ @Component @EnableBinding(SysMessageSink.class) @Slf4j public class SysMessageRabbitMQDelayedProcessorListener extends ISysMessageDelayedListener { @Autowired private SysMessageQueryProcessor sysMessageQueryProcessor; private static final LinkedBlockingQueue<SysMessageVO> SEND_QUEUE = new LinkedBlockingQueue(1000); /** * 接受发送的数据 * * @param */ @StreamListener(MQTopicConstant.SYS_MESSAGE_SEND_DELAY_TOPIC_CONSUMER) public void onSendHandle(Object msg) { try { // put SEND_QUEUE.put(sysMessage); } catch (InterruptedException e) { log.error("caught onSendHandle invoke fail,e:", e); } } @Override public void sendProcessor() { } }/** * redis监听处理 */ @Slf4j public class SysMessageRedisDelayedProcessorListener extends ISysMessageDelayedListener { private static final String setNX = "lock:sysmessage:delay"; public static final int LOCK_EXPIRE = 300; // ms @Autowired private RedisTemplate redisTemplate; public SysMessageRedisDelayedProcessorListener() { new Thread(new SysMessagePushWork(SEND_QUEUE)).start(); } /** * 监听是否有到期的数据 */ private void monitorSendQueue() { while (true) { if (lock()) { Set<ZSetOperations.TypedTuple<Object>> set = redisTemplate.opsForZSet().rangeWithScores(MQTopicConstant.SYS_MESSAGE_SEND_DELAY_TOPIC, 0, 0); Iterator<ZSetOperations.TypedTuple<Object>> iterator = set.iterator(); while (iterator.hasNext()) { ZSetOperations.TypedTuple<Object> next = iterator.next(); consumer(MQTopicConstant.SYS_MESSAGE_SEND_DELAY_TOPIC, next); } } } } /** * 获取分布式琐 * * @return */ private boolean lock() { try { long expireAt = System.currentTimeMillis() + LOCK_EXPIRE + 1; return (Boolean) this.redisTemplate.execute((RedisCallback) connection -> { 
Boolean acquire = connection.setNX(setNX.getBytes(), String.valueOf(expireAt).getBytes()); if (acquire) { return true; } byte[] value = connection.get(setNX.getBytes()); if (Objects.isNull(value) || value.length <= 0) { return false; } long expireTime = Long.parseLong(new String(value)); if (expireTime < System.currentTimeMillis()) { // 若是锁已通过期 byte[] oldValue = connection.getSet(setNX.getBytes(), String.valueOf(System.currentTimeMillis() + LOCK_EXPIRE + 1).getBytes()); // 防止死锁 return Long.parseLong(new String(oldValue)) < System.currentTimeMillis(); } return true; }); } catch (Exception e) { log.error("obtain lock option fail, caught exception:", e); return false; } } /** * 删除过时的数据 * * @param value */ private void removeDataByExpireTime(String key, Object value) { redisTemplate.opsForZSet().remove(key, value); } /** * 消费 * * @param next */ private void consumer(String key, ZSetOperations.TypedTuple<Object> next) { // processor and remove if (!ifExpire(next.getScore())) { return; } if (MQTopicConstant.SYS_MESSAGE_SEND_DELAY_TOPIC.equals(key)) { // in queue SEND_QUEUE.offer(sysMessage); } // remove removeDataByExpireTime(key, next.getValue()); } /** * 过时判断 * * @param expireTime * @return */ private boolean ifExpire(Double expireTime) { return (expireTime.longValue() + 1000) <= LocalDateTime.now().toInstant(ZoneOffset.of("+8")).toEpochMilli(); } @Override public void sendProcessor() { // 监听发送队列的变化 monitorSendQueue(); } }/** * */ @Slf4j public class SysMessageDelayedQueueProcessorListener extends ISysMessageDelayedListener { private DelayQueue<SysMessageDelayedQueueProcessorImpl> sendDelayQueue; public SysMessageDelayedQueueProcessorListener(DelayQueue<SysMessageDelayedQueueProcessorImpl> sendDelayQueue) { this.sendDelayQueue = sendDelayQueue; new Thread(new SysMessagePushWork(SEND_QUEUE)).start(); } @Override public void sendProcessor() { CompletableFuture.runAsync(() -> { while (true) { try { // processor SysMessageDelayedQueueProcessorImpl queue = 
sendDelayQueue.take(); if (Objects.isNull(queue)) { continue; } // execute SEND_QUEUE.offer(queue.getData()); } catch (InterruptedException e) { // } } }); } }/** */ @Configuration public class SysMessageConfiguration { /** * 基于rabbitMQ的延迟处理 * @return */ @Primary @ConditionalOnClass(name = "org.springframework.cloud.stream.binding.BinderAwareChannelResolver") @Bean public SysMessageRabbitMQDelayProcessorImpl createSysMessageRabbitMQDelayProcessor() { return new SysMessageRabbitMQDelayProcessorImpl(); } // /** // * 基于redis的延迟处理 // * @return // */ // @ConditionalOnClass(RedisTemplate.class) // @ConditionalOnMissingClass("org.springframework.cloud.stream.binding.BinderAwareChannelResolver") // @Bean // public SysMessageRedisDelayProcessorImpl createSysMessageRedisDelayProcessor() { // return new SysMessageRedisDelayProcessorImpl(); // } /** * 基于内存的延迟处理 * @return */ @ConditionalOnMissingClass({"org.springframework.cloud.stream.binding.BinderAwareChannelResolver", "org.springframework.data.redis.core.RedisTemplate"}) @Bean public SysMessageDelayedQueueProcessorImpl createSysMessageDelayedQueueProcessor() { return new SysMessageDelayedQueueProcessorImpl(); } }private ISysMessageDelayProcessor sysMessageDelayProcessor; @Autowired public xxxx(ISysMessageDelayProcessor sysMessageDelayProcessor) { this.sysMessageDelayProcessor = sysMessageDelayProcessor; }其余部分业务代码按需处理便可ui