1.redis的发布订阅功能,很简单。
消息发布者和消息订阅者互相不认得,也不关心对方有谁。
消息发布者,将消息发送给频道(channel)。
而后是由 频道(channel)将消息发送给对本身感兴趣的 消息订阅者们,进行消费。redis
2.redis的发布订阅和专业的MQ相比较spring
1>redis的发布订阅只是最基本的功能,不支持持久化,消息发布者将消息发送给频道。若是没有订阅者消费,消息就丢失了。
2>在消息发布过程当中,若是客户端和服务器链接超时,MQ会有重试机制,事务回滚等。可是Redis没有提供消息传输保障。
3>简单的发布订阅可使用redis,根据业务需求选择。apache
<!-- redis -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>
<!--spring2.0集成redis所需common-pool2-->
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-pool2</artifactId>
<version>2.4.2</version>
</dependency>
<!-- 使用redis的LUA脚本 须要序列化操做的jar-->
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-core</artifactId>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-annotations</artifactId>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
</dependency>
为redis添加消息适配器,绑定消息处理器服务器
消息适配器 能够添加多个 app
package com.sxd.swapping.config; import com.sxd.swapping.redisReceiver.RedisReceiver; import com.sxd.swapping.redisReceiver.RedisReceiver2; import org.springframework.boot.autoconfigure.AutoConfigureAfter; import org.springframework.boot.autoconfigure.data.redis.RedisAutoConfiguration; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.data.redis.connection.RedisConnectionFactory; import org.springframework.data.redis.core.RedisTemplate; import org.springframework.data.redis.core.StringRedisTemplate; import org.springframework.data.redis.listener.PatternTopic; import org.springframework.data.redis.listener.RedisMessageListenerContainer; import org.springframework.data.redis.listener.adapter.MessageListenerAdapter; import org.springframework.data.redis.serializer.RedisSerializer; import org.springframework.data.redis.serializer.StringRedisSerializer; /** * @author sxd * @date 2019/5/27 16:13 */ @Configuration @AutoConfigureAfter(RedisAutoConfiguration.class) public class RedisConfig { /** * redis消息监听器容器 * 能够添加多个监听不一样话题的redis监听器,只须要把消息监听器和相应的消息订阅处理器绑定,该消息监听器 * 经过反射技术调用消息订阅处理器的相关方法进行一些业务处理 * @param connectionFactory * @param listenerAdapter * @return
*/ @Bean RedisMessageListenerContainer container(RedisConnectionFactory connectionFactory, MessageListenerAdapter listenerAdapter, MessageListenerAdapter listenerAdapter2) { RedisMessageListenerContainer container = new RedisMessageListenerContainer(); container.setConnectionFactory(connectionFactory); //能够添加多个 messageListener //能够对 messageListener对应的适配器listenerAdapter 指定本适配器 适配的消息类型 是什么 //在发布的地方 对应发布的redisTemplate.convertAndSend("user",msg); 那这边的就对应的能够消费到指定类型的 订阅消息
container.addMessageListener(listenerAdapter, new PatternTopic("user")); container.addMessageListener(listenerAdapter2, new PatternTopic("goods")); return container; } /** * 消息监听器适配器,绑定消息处理器,利用反射技术调用消息处理器的业务方法 * * receiveMessage 是默认监听方法 通常不变 * @param redisReceiver redis消息处理器,自定义的 * @return
*/ @Bean MessageListenerAdapter listenerAdapter(RedisReceiver redisReceiver) { System.out.println("消息适配器1进来了"); return new MessageListenerAdapter(redisReceiver, "receiveMessage"); } /** * 消息监听器适配器,绑定消息处理器,利用反射技术调用消息处理器的业务方法 * * receiveMessage 是默认监听方法 通常不变 * @param redisReceiver2 redis消息处理器,自定义的 * @return
*/ @Bean MessageListenerAdapter listenerAdapter2(RedisReceiver2 redisReceiver2) { System.out.println("消息适配器2进来了"); return new MessageListenerAdapter(redisReceiver2, "receiveMessage"); } //使用默认的工厂初始化redis操做模板
@Bean StringRedisTemplate template(RedisConnectionFactory connectionFactory) { return new StringRedisTemplate(connectionFactory); } @Bean public RedisTemplate<String, String> redisTemplate(RedisConnectionFactory factory) { RedisTemplate redisTemplate = new RedisTemplate(); redisTemplate.setConnectionFactory(factory); RedisSerializer keySerializer = new StringRedisSerializer(); // RedisSerializer valueSerializer = new GenericJackson2JsonRedisSerializer(); //key采用字符串反序列化对象
redisTemplate.setKeySerializer(keySerializer); //value也采用字符串反序列化对象 //缘由:管道操做,是对redis命令的批量操做,各个命令返回结果可能类型不一样 //多是 Boolean类型 多是String类型 多是byte[]类型 所以统一将结果按照String处理
redisTemplate.setValueSerializer(keySerializer); return redisTemplate; } }
package com.sxd.swapping.redisReceiver; import org.springframework.stereotype.Service; /** * * redis 订阅发布 消息接收器/处理器 * @author sxd * @date 2019/5/30 17:12 */ @Service public class RedisReceiver { public void receiveMessage(String message) { System.out.println("消息处理器1>我处理用户信息:"+message); //这里是收到通道的消息以后执行的方法 //此处执行接收到消息后的 业务逻辑
} }
package com.sxd.swapping.redisReceiver; import org.springframework.stereotype.Service; /** * redis 订阅发布 消息接收器/处理器2 * @author sxd * @date 2019/5/30 17:15 */ @Service public class RedisReceiver2 { public void receiveMessage(String message) { System.out.println("消息处理器2>我处理商品信息:"+message); //这里是收到通道的消息以后执行的方法 //此处执行接收到消息后的 业务逻辑
} }
@Autowired RedisTemplate redisTemplate; /** * redis 发布订阅pubsub */ @RequestMapping(value = "/redisPubSub") public void redisPubSub(String msg){ if (msg.contains("用户")){ redisTemplate.convertAndSend("user",msg); }else { redisTemplate.convertAndSend("goods",msg); } }
测试:ide
发送请求:http://localhost:9666/redistest/redisPubSub?msg=用户---德玛西亚的用户spring-boot
结果:测试
消息处理器1>我处理用户信息:用户---德玛西亚的用户spa
发送请求:http://localhost:9666/redistest/redisPubSub?msg=goods---德玛西亚的用商品3d
结果:
消息处理器2>我处理商品信息:goods---德玛西亚的用商品