首先作简单的引入。redis
MQ主要是用来:服务器
目前使用的较多的有ActiveMQ、RabbitMQ、ZeroMQ、Kafka、MetaMQ、RocketMQ等。
网上的资源对各类状况都有详细的解释,在此不作过多赘述。本文
仅介绍如何使用Redis实现轻量级MQ的过程。数据结构
在业务的实现过程当中,就算没有大量的流量,解耦和异步化几乎也是到处可用,此时MQ就显得尤其重要。但与此同时MQ也是一个蛮重的组件,例如咱们若是用RabbitMQ就必须为它搭建一个服务器,同时若是要考虑可用性,就要为服务端创建一个集群,并且在生产若是有问题也须要查找功能。在中小型业务的开发过程当中,可能业务的其余整个实现都没这个重。太重的组件服务会成倍增长工做量。
所幸的是,Redis提供的list数据结构很是适合作消息队列。
可是如何实现即时消费?如何实现ack机制?这些是实现的关键所在。app
网上所流传的方法是使用Redis中list的操做BLPOP或BRPOP,即列表的阻塞式(blocking)弹出。
让咱们来看看阻塞式弹出的使用方式:异步
BRPOP key [key ...] timeout
此命令的说明是:ide
一、当给定列表内没有任何元素可供弹出的时候,链接将被 BRPOP 命令阻塞,直到等待超时或发现可弹出元素为止。
二、当给定多个key参数时,按参数 key 的前后顺序依次检查各个列表,弹出第一个非空列表的尾部元素。
另外,BRPOP 除了弹出元素的位置和 BLPOP 不一样以外,其余表现一致。ui
以此来看,列表的阻塞式弹出有两个特色:spa
一、若是list中没有任务的时候,该链接将会被阻塞
二、链接的阻塞有一个超时时间,当超时时间设置为0时,便可无限等待,直到弹出消息
由此看来,此方式是可行的,但此为传统的观察者模式,业务简单则可以使用,如A的任务只由B去执行。但若是A和Z的任务,B和C都能执行,那使用这种方式就相形见肘。这个时候就应该使用订阅/发布模式,使业务系统更加清晰。
好在Redis也支持Pub/Sub(发布/订阅)。在消息A入队list的同时发布(PUBLISH)消息B到频道channel,此时已经订阅channel的worker就接收到了消息B,知道了list中有消息A进入,便可循环lpop或rpop来消费list中的消息。流程以下:线程
其中的worker能够是单独的线程,也能够是独立的服务,其充当了Consumer和业务处理者角色。下面作实例说明。code
示例场景为:worker要作同步文件功能,等到有文件生成时立马同步。
首先开启一个线程表明worker,来订阅频道channel:
@Service public class SubscribeService { @Resource private RedisService redisService; @Resource private SynListener synListener;//订阅者 @PostConstruct public void subscribe() { new Thread(new Runnable() { @Override public void run() { LogCvt.info("服务已订阅频道:{}", channel); redisService.subscribe(synListener, channel); } }).start(); } }
代码中的SynListener即为所声明的订阅者,channel为订阅的频道名称,具体的订阅逻辑以下:
@Service public class SynListener extends JedisPubSub { @Resource private DispatchMessageHandler dispatchMessageHandler; @Override public void onMessage(String channel, String message) { LogCvt.info("channel:{},receives message:{}",channel,message); try { //处理业务(同步文件) dispatchMessageHandler.synFile(); } catch (Exception e) { LogCvt.error(e.getMessage(),e); } } }
处理业务的时候,就去list中去消费消息:
@Service public class DispatchMessageHandler { @Resource private RedisService redisService; @Resource private MessageHandler messageHandler; public void synFile(){ while(true){ try { String message = redisService.lpop(RedisKeyUtil.syn_file_queue_key()); if (null == message){ break; } Thread.currentThread().setName(Tools.uuid()); // 队列数据处理 messageHandler.synfile(message); } catch (Exception e) { LogCvt.error(e.getMessage(),e); } } } }
这样咱们就达到了消息的实时消费的目的。
ack,即消息确认机制(Acknowledge)。
首先来看RabbitMQ的ack机制:
那么在咱们用Redis实现消息队列的ack机制的时候该怎么作呢?
须要注意两点:
上面第一点能够在业务中完成,即失败后执行回滚消息。
(该方案主要解决worker挂掉的状况)
Redis做为消息队列是有很大局限性的。由于其主要特性及用途决定它只能实现轻量级的消息队列。写在最后:没有绝对好的技术,只有对业务最友好的技术,谨此献给全部developer。