延迟消息处理

以前有这样一个需求,运营在后端配置一条系统消息或者营销活动等类型的消息等到了须要推送的时间之后会自动的将消息推送给用户APP端显示,一开始是采用的任务调度的方式(定时器),经过轮询扫表去作,由于具体何时推送消息没有固定的频率,固定的时间,所以须要每分钟扫表以免消息在指定时间内未及时推送给APP端内.因此每次都是1分钟扫描一次,太过于频繁。因此不太适合(定时器适合那种固定频率或时间段处理)。java

所以这里选取了几种延迟发送的方式:redis

        1.rabbitMQspring

        2.redis后端

        3.DelayedQueue(慎用)markdown

代码部分(发送端):分布式

    

/**
 * 提供了一个公有的方法
 */
public interface ISysMessageDelayProcessor {
    long FIVE_MINUTES = 5 * 60 * 1000;
    /**
     * 发送消息的处理
     * @param msg<按需自行封装处理>
     * @param pushDate<推送时间>
     */
    void sendMessage(Object msg, LocalDateTime pushDate);

}
/**
 * 基于RabbitMQ的实现方式(须要下载rabbitMQ插件)
 * 
 * 
 */
@Slf4j
@EnableBinding(SysMessageSink.class)
public class SysMessageRabbitMQDelayProcessorImpl implements ISysMessageDelayProcessor {

    @Autowired
    private BinderAwareChannelResolver resolver;

    @Override
    public void sendMessage(Object msg LocalDateTime pushDate) {
        resolver.resolveDestination(MQTopicConstant.SYS_MESSAGE_SEND_DELAY_TOPIC_PRODUCER)
                .send(MessageBuilder.withPayload(msg)
                        .setHeader("x-delay",
                                Duration.between(LocalDateTime.now(), pushDate)
                                        .toMillis())
                        .build());
    }

}
#配置系统消息的延迟发送
spring.cloud.stream.bindings.your-topic-producer.destination=your-topic
spring.cloud.stream.rabbit.bindings.your-topic-producer.producer.delayed-exchange=true
spring.cloud.stream.bindings.your-topic-consumer.destination=your-topic
spring.cloud.stream.rabbit.bindings.your-topic-consumer.consumer.delayed-exchange=true
spring.cloud.stream.bindings.your-topic-consumer.group=your-topic-group
/**
 * 
 * 基于redis的实现
 * 
 */
public class SysMessageRedisDelayProcessorImpl implements ISysMessageDelayProcessor {
    @Autowired
    private RedisTemplate redisTemplate;


    @Override
    public void sendMessage(Object msg, LocalDateTime pushDate) {
        redisTemplate.opsForZSet().add(MQTopicConstant.SYS_MESSAGE_QUERY_DELAY_TOPIC,msg,
                    Duration.between(LocalDateTime.now(), pushDate)
                            .toMillis());
    }


}
/**
 * 是一种补备用方案,当不知足redis,rabbitMQ的场景的时候使用
 * 是一种基于内存的方式,一旦宕机,或者重启那么内存中的数据就会丢失
 * 慎用!
 */
public class SysMessageDelayedQueueProcessorImpl implements ISysMessageDelayProcessor, Delayed {
    private LocalDateTime executeTime;
    private Object data;
    // send queue
    private static final DelayQueue<SysMessageDelayedQueueProcessorImpl> sendDelayQueue =
            new DelayQueue<>();
    // query queue
    private static final DelayQueue<SysMessageDelayedQueueProcessorImpl> queryDelayQueue =
            new DelayQueue<>();

    public SysMessageDelayedQueueProcessorImpl() {
        new Thread(new SysMessageDelayedQueueProcessorListener(sendDelayQueue, queryDelayQueue)).start();
    }

    public SysMessageDelayedQueueProcessorImpl(LocalDateTime executeTime, Object data) {
        this.executeTime = executeTime;
        this.data = data;
    }


    @Override
    public void sendMessage(Object msg, LocalDateTime pushDate) {
        sendDelayQueue.offer(new SysMessageDelayedQueueProcessorImpl(pushDate, msg));
    }


    @Override
    public long getDelay(TimeUnit unit) {
        return unit.convert(Duration.between(LocalDateTime.now(), executeTime).toMillis(),
                TimeUnit.MILLISECONDS);
    }

    @Override
    public int compareTo(Delayed o) {
        return (int) (this.getDelay(TimeUnit.MILLISECONDS) - o.getDelay(TimeUnit.MILLISECONDS));
    }
}

接收端:ide

/**
 * 监听
 */
public abstract class ISysMessageDelayedListener implements Runnable {

    protected static final LinkedBlockingQueue<SysMessageVO> SEND_QUEUE =
            new LinkedBlockingQueue(1000);

    @Override
    public void run() {
        sendProcessor();
    }

    /**
     * 监听发送方法
     */
    public abstract void sendProcessor();
}
/**
 * 只用来监听MQ延迟队列推送过来的数据包,及转发数据包
 * 不作其余业务处理
 *
 */
@Component
@EnableBinding(SysMessageSink.class)
@Slf4j
public class SysMessageRabbitMQDelayedProcessorListener extends ISysMessageDelayedListener {
    @Autowired
    private SysMessageQueryProcessor sysMessageQueryProcessor;

    private static final LinkedBlockingQueue<SysMessageVO> SEND_QUEUE =
            new LinkedBlockingQueue(1000);


    /**
     * 接受发送的数据
     *
     * @param 
     */
    @StreamListener(MQTopicConstant.SYS_MESSAGE_SEND_DELAY_TOPIC_CONSUMER)
    public void onSendHandle(Object msg) {
       
        try {
            // put
            SEND_QUEUE.put(sysMessage);
        } catch (InterruptedException e) {
            log.error("caught onSendHandle invoke fail,e:", e);
        }

    }

    @Override
    public void sendProcessor() {

    }
}
/**
 * redis监听处理
 */
@Slf4j
public class SysMessageRedisDelayedProcessorListener extends ISysMessageDelayedListener {
    private static final String setNX = "lock:sysmessage:delay";
    public static final int LOCK_EXPIRE = 300; // ms

    @Autowired
    private RedisTemplate redisTemplate;

    public SysMessageRedisDelayedProcessorListener() {
        new Thread(new SysMessagePushWork(SEND_QUEUE)).start();
    }

    /**
     * 监听是否有到期的数据
     */
    private void monitorSendQueue() {
        while (true) {
            if (lock()) {
                Set<ZSetOperations.TypedTuple<Object>> set =
                        redisTemplate.opsForZSet().rangeWithScores(MQTopicConstant.SYS_MESSAGE_SEND_DELAY_TOPIC, 0, 0);
                Iterator<ZSetOperations.TypedTuple<Object>> iterator = set.iterator();
                while (iterator.hasNext()) {
                    ZSetOperations.TypedTuple<Object> next = iterator.next();
                    consumer(MQTopicConstant.SYS_MESSAGE_SEND_DELAY_TOPIC, next);
                }
            }
        }
    }

    /**
     * 获取分布式琐
     *
     * @return
     */
    private boolean lock() {
        try {
            long expireAt = System.currentTimeMillis() + LOCK_EXPIRE + 1;
            return (Boolean) this.redisTemplate.execute((RedisCallback) connection -> {
                Boolean acquire = connection.setNX(setNX.getBytes(), String.valueOf(expireAt).getBytes());
                if (acquire) {
                    return true;
                }
                byte[] value = connection.get(setNX.getBytes());
                if (Objects.isNull(value) || value.length <= 0) {
                    return false;
                }
                long expireTime = Long.parseLong(new String(value));
                if (expireTime < System.currentTimeMillis()) {
                    // 若是锁已通过期
                    byte[] oldValue = connection.getSet(setNX.getBytes(),
                            String.valueOf(System.currentTimeMillis() + LOCK_EXPIRE + 1).getBytes());
                    // 防止死锁
                    return Long.parseLong(new String(oldValue)) < System.currentTimeMillis();
                }
                return true;
            });
        } catch (Exception e) {
            log.error("obtain lock option fail, caught exception:", e);
            return false;
        }
    }

    /**
     * 删除过时的数据
     *
     * @param value
     */
    private void removeDataByExpireTime(String key, Object value) {
        redisTemplate.opsForZSet().remove(key, value);
    }

    /**
     * 消费
     *
     * @param next
     */
    private void consumer(String key, ZSetOperations.TypedTuple<Object> next) {
        // processor and remove
        if (!ifExpire(next.getScore())) {
            return;
        }
       if (MQTopicConstant.SYS_MESSAGE_SEND_DELAY_TOPIC.equals(key)) {
            // in queue
            SEND_QUEUE.offer(sysMessage);
        }
        // remove
        removeDataByExpireTime(key, next.getValue());


    }

    /**
     * 过时判断
     *
     * @param expireTime
     * @return
     */
    private boolean ifExpire(Double expireTime) {
        return (expireTime.longValue() + 1000) <= LocalDateTime.now().toInstant(ZoneOffset.of("+8")).toEpochMilli();
    }

    @Override
    public void sendProcessor() {
        // 监听发送队列的变化
        monitorSendQueue();
    }
}
/**
 *
 */
@Slf4j
public class SysMessageDelayedQueueProcessorListener extends ISysMessageDelayedListener {
    private DelayQueue<SysMessageDelayedQueueProcessorImpl> sendDelayQueue;

    public SysMessageDelayedQueueProcessorListener(DelayQueue<SysMessageDelayedQueueProcessorImpl> sendDelayQueue) {
        this.sendDelayQueue = sendDelayQueue;
        new Thread(new SysMessagePushWork(SEND_QUEUE)).start();
    }

  

    @Override
    public void sendProcessor() {
        CompletableFuture.runAsync(() -> {
            while (true) {
                try {
                    // processor
                    SysMessageDelayedQueueProcessorImpl queue = sendDelayQueue.take();
                    if (Objects.isNull(queue)) {
                        continue;
                    }
                   
                    // execute
                    SEND_QUEUE.offer(queue.getData());
                } catch (InterruptedException e) {
                    // 
                }
            }
        });

    }
}
/**
 */
@Configuration
public class SysMessageConfiguration {

    /**
     * 基于rabbitMQ的延迟处理
     * @return
     */
    @Primary
    @ConditionalOnClass(name = "org.springframework.cloud.stream.binding.BinderAwareChannelResolver")
    @Bean
    public SysMessageRabbitMQDelayProcessorImpl createSysMessageRabbitMQDelayProcessor() {

        return new SysMessageRabbitMQDelayProcessorImpl();
    }

//    /**
//     * 基于redis的延迟处理
//     * @return
//     */
//    @ConditionalOnClass(RedisTemplate.class)
//    @ConditionalOnMissingClass("org.springframework.cloud.stream.binding.BinderAwareChannelResolver")
//    @Bean
//    public SysMessageRedisDelayProcessorImpl createSysMessageRedisDelayProcessor() {
//        return new SysMessageRedisDelayProcessorImpl();
//    }

    /**
     * 基于内存的延迟处理
     * @return
     */
    @ConditionalOnMissingClass({"org.springframework.cloud.stream.binding.BinderAwareChannelResolver",
            "org.springframework.data.redis.core.RedisTemplate"})
    @Bean
    public SysMessageDelayedQueueProcessorImpl createSysMessageDelayedQueueProcessor() {
        return new SysMessageDelayedQueueProcessorImpl();
    }
}
private ISysMessageDelayProcessor sysMessageDelayProcessor;

    @Autowired
    public xxxx(ISysMessageDelayProcessor sysMessageDelayProcessor) {
        this.sysMessageDelayProcessor = sysMessageDelayProcessor;
    }

 其余部分业务代码按需处理便可ui