Redis系列(二)--消息队列

1.概念node

Redis 的 list(列表) 数据结构经常使用来做为异步消息队列使用,使用rpush/lpush操做入队列,使用lpop 和 rpop来出队列。Redis 的消息队列不是专业的消息队列,它没有很是多的高级特性,没有 ack 保证,若是对消息的可靠性有着极致的追求,那么它就不适合使用。命令方式:web

生产数据:redis

rpush test:aaron:queue apple banana pear

 

消费数据:spring

服务器

lpop test:aaron:queue

数据结构

rpop test:aaron:queue

注意:注意执行命令的时候,须要执行三次,才能所有获取到数据。app

 

思考1:队列空了怎么办?异步

 

客户端是经过队列的 pop 操做来获取消息,而后进行处理。处理完了再接着获取消息,再进行处理。如此循环往复,这即是做为队列消费者的客户端的生命周期。ide

 

但是若是队列空了,客户端就会陷入 pop 的死循环,不停地 pop,没有数据,接着再 pop,又没有数据。这就是浪费生命的空轮询。空轮询不但拉高了客户端的 CPU,redis 的 QPS 也会被拉高,若是这样空轮询的客户端有几十来个,Redis 的慢查询可能会显著增多。函数

 

一般咱们使用 sleep 来解决这个问题,让线程睡一会,睡个 1s 钟就能够了。不但客户端的 CPU 能降下来,Redis 的 QPS 也降下来了。

 

解决方式:

 

有没有什么办法能显著下降延迟呢?你固然能够很快想到:那就把睡觉的时间缩短点。这种方式固然能够,不过有没有更好的解决方案呢?固然也有,那就是 blpop/brpop

 

这两个指令的前缀字符b表明的是blocking,也就是阻塞读。

 

阻塞读在队列没有数据的时候,会当即进入休眠状态,一旦数据到来,则马上醒过来。消息的延迟几乎为零。用blpop/brpop替代前面的lpop/rpop,就完美解决了上面的问题。

 

思考2:空闲链接自动断开

 

你觉得上面的方案真的很完美么?先别急着开心,其实他还有个问题须要解决。

 

什么问题?—— 空闲链接的问题。

 

若是线程一直阻塞在哪里,Redis 的客户端链接就成了闲置链接,闲置太久,服务器通常会主动断开链接,减小闲置资源占用。这个时候blpop/brpop会抛出异常来。

 

因此编写客户端消费者的时候要当心,注意捕获异常,还要重试。

2.代码

  1. py代码




import redis#redis 链接pool = redis.ConnectionPool(host='127.0.0.1', port=6379)r = redis.Redis(connection_pool=pool)#消息队列-def input_mq():#if 条件语句if r :count = 0while( count < 3):r.rpush("notify:queue","apple" + str(count))count = count + 1print(r.llen("notify:queue"))#消息队列-延迟队列def out_mq():#if 条件语句if r :count = 0;while( count < 3):print(r.brpop("notify:queue"))count = count + 1print(count)#主函数,执行行数if __name__ == '__main__':    input_mq()    out_mq()

    2 Java代码

Java代码

    2.1 目录结构

 

    2.2 pom文件和配置

<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-data-redis</artifactId></dependency>
spring:redis:database: 0 #索引(默认为0)host: localhost #地址port: 6379 #端口号#password:  #链接密码(默认空)pool:max-idle: 8 #链接池中的最大空闲链接min-idle: 0 #链接池中的最小空闲链接max-active: 8 #链接池最大链接数(使用负值表示没有限制)max-wait: -1 #链接池最大阻塞等待时间(使用负值表示没有限制)#sentinel:#master: mymaster # 哨兵监听的Redis server的名称#nodes:#127.0.0.1:26379,127.0.0.1:26479,127.0.0.1:26579 #哨兵的配置列表timeout: 5000 #链接超时时间(毫秒)server:port: 8000

        2.3生产者




package com.example.redis.zfr.demoredis.mq;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.data.redis.core.RedisTemplate;import org.springframework.stereotype.Controller;import org.springframework.web.bind.annotation.PathVariable;import org.springframework.web.bind.annotation.RequestMapping;@Controllerpublic class RedisController {@Autowiredprivate RedisTemplate<String, String> redisTemplate;/*** 发布消息* @param id* @return*/@RequestMapping("/sendMessage/{id}")public String sendMessage(@PathVariable String id) {redisTemplate.convertAndSend("msg1","哈哈哈,mq 繁荣Aaron 你好"+id);        //主要为了测试多个主题的发送redisTemplate.convertAndSend("msg","哈哈哈,mq 繁荣Aaron 你好"+id);return "";}}

        2.4 消费者




package com.example.redis.zfr.demoredis.mq;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.data.redis.connection.Message;import org.springframework.data.redis.connection.MessageListener;import org.springframework.data.redis.core.RedisTemplate;import org.springframework.data.redis.serializer.RedisSerializer;import org.springframework.stereotype.Component;@Componentpublic class RedisMessage implements MessageListener{@Autowiredprivate RedisTemplate<Object, Object> redisTemplate;@Overridepublic void onMessage(Message message, byte[] pattern) {RedisSerializer<String> serializer = redisTemplate.getStringSerializer();String msg = serializer.deserialize(message.getBody());System.out.println("接收到的消息是:" + msg);}}

        2.5 配置







package com.example.redis.zfr.demoredis.mq;import org.springframework.context.annotation.Bean;import org.springframework.context.annotation.Configuration;import org.springframework.data.redis.connection.RedisConnectionFactory;import org.springframework.data.redis.listener.PatternTopic;import org.springframework.data.redis.listener.RedisMessageListenerContainer;import org.springframework.data.redis.listener.adapter.MessageListenerAdapter;@Configurationpublic class RedisSubConfig {/*** 建立链接工厂** @param connectionFactory* @param adapter* @return*/@Beanpublic RedisMessageListenerContainer container(RedisConnectionFactory connectionFactory,MessageListenerAdapter adapter,MessageListenerAdapter adapter1) {RedisMessageListenerContainer container = new RedisMessageListenerContainer();container.setConnectionFactory(connectionFactory);container.addMessageListener(adapter, new PatternTopic("msg"));        //主要为了测试多个主题的发送container.addMessageListener(adapter1,new PatternTopic("msg1"));return container;}/*** @param message* @return*/@Beanpublic MessageListenerAdapter adapter(RedisMessage message){// onMessage 若是RedisMessage 中 没有实现接口,这个参数必须跟RedisMessage中的读取信息的方法名称同样return new MessageListenerAdapter(message, "onMessage");}    /**主要为了测试多个主题的发送* @param message* @return*/@Beanpublic MessageListenerAdapter adapter1(RedisMessage1 message){// onMessage 若是RedisMessage 中 没有实现接口,这个参数必须跟RedisMessage中的读取信息的方法名称同样return new MessageListenerAdapter(message, "onMessage");}}

3.思考

思考问题

Redis 做为消息队列为何不能保证 100% 的可靠性?

解决

pop出消息后,list 中就没这个消息了,若是处理消息的程序拿到消息还未处理就挂掉了,那消息就丢失了,因此是不可靠队列。https://redis.io/commands/rpoplpush 这个能够实现可靠队列。

相关文章
相关标签/搜索