GitHub 9.4k Star 的Java工程师成神之路 ,不来了解一下吗?html
GitHub 9.4k Star 的Java工程师成神之路 ,真的不来了解一下吗?java
GitHub 9.4k Star 的Java工程师成神之路 ,真的肯定不来了解一下吗?git
众所周知,redis是一个高性能的key-value数据库,在NoSQL数据库市场上,redis本身就占据了将近半壁江山,足以见到其强大之处。同时,因为redis的单线程特性,咱们能够将其用做为一个消息队列。本篇文章就来说讲如何将redis整合到spring boot中,并用做消息队列的……github
“消息队列”是在消息的传输过程当中保存消息的容器。——《百度百科》web
消息咱们能够理解为在计算机中或在整个计算机网络中传递的数据。redis
队列是咱们在学习数据结构的时候学习的基本数据结构之一,它具备先进先出的特性。spring
因此,消息队列就是一个保存消息的容器,它具备先进先出的特性。数据库
下面一张图咱们来简单了解一下消息队列apache
由上图能够看到,消息队列充当了一个中间人的角色,咱们能够经过操做这个消息队列来保证咱们的系统稳定。json
Java环境:jdk1.8
spring boot版本:2.2.1.RELEASE
redis-server版本:3.2.100
这里只展现与redis相关的依赖,
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-redis</artifactId>
</dependency>
复制代码
这里解释一下这两个依赖:
这里只展现与redis相关的配置
# redis所在的的地址
spring.redis.host=localhost
# redis数据库索引,从0开始,能够从redis的可视化客户端查看
spring.redis.database=1
# redis的端口,默认为6379
spring.redis.port=6379
# redis的密码
spring.redis.password=
# 链接redis的超时时间(ms),默认是2000
spring.redis.timeout=5000
# 链接池最大链接数
spring.redis.jedis.pool.max-active=16
# 链接池最小空闲链接
spring.redis.jedis.pool.min-idle=0
# 链接池最大空闲链接
spring.redis.jedis.pool.max-idle=16
# 链接池最大阻塞等待时间(负数表示没有限制)
spring.redis.jedis.pool.max-wait=-1
# 链接redis的客户端名
spring.redis.client-name=mall
复制代码
redis用做消息队列,其在spring boot中的主要表现为一个RedisTemplate.convertAndSend()
方法和一个MessageListener
接口。因此咱们要在IOC容器中注入一个RedisTemplate
和一个实现了MessageListener
接口的类。话很少说,先看代码
配置RedisTemplate的主要目的是配置序列化方式以解决乱码问题,同时合理配置序列化方式还能下降一点性能开销。
/**
* 配置RedisTemplate,解决乱码问题
*/
@Bean
public RedisTemplate<String, Object> redisTemplate(RedisConnectionFactory factory) {
LOGGER.debug("redis序列化配置开始");
RedisTemplate<String, Object> template = new RedisTemplate<>();
template.setConnectionFactory(factory);
// string序列化方式
RedisSerializer serializer = new GenericJackson2JsonRedisSerializer();
// 设置默认序列化方式
template.setDefaultSerializer(serializer);
template.setKeySerializer(new StringRedisSerializer());
template.setHashValueSerializer(serializer);
LOGGER.debug("redis序列化配置结束");
return template;
}
复制代码
代码第12行,咱们配置默认的序列化方式为GenericJackson2JsonRedisSerializer
代码第13行,咱们配置键的序列化方式为StringRedisSerializer
代码第14行,咱们配置哈希表的值的序列化方式为GenericJackson2JsonRedisSerializer
序列化方式 | 介绍 |
---|---|
StringRedisSerializer |
将对象序列化为字符串,可是经测试,没法序列化对象,通常用在key上 |
OxmSerializer |
将对象序列化为xml性质,本质上是字符串 |
ByteArrayRedisSerializer |
默认序列化方式,将对象序列化为二进制字节,可是须要对象实现Serializable接口 |
GenericFastJsonRedisSerializer |
json序列化,使用fastjson序列化方式序列化对象 |
GenericJackson2JsonRedisSerializer |
json序列化,使用jackson序列化方式序列化对象 |
上面说了,与redis队列监听器相关的类为一个名为MessageListener
的接口,下面是该接口的源码
public interface MessageListener {
void onMessage(Message message, @Nullable byte[] pattern);
}
复制代码
能够看到,该接口仅有一个onMessage(Message message, @Nullable byte[] pattern)
方法,该方法即是监听到队列中消息后的回调方法。下面解释一下这两个参数:
byte[] getBody()
以二进制形式获取消息体byte[] getChannel()
以二进制形式获取消息通道message.getChannel()
返回值相同介绍完接口,咱们来实现一个简单的redis队列监听器
@Component
public class RedisListener implement MessageListener{
private static final Logger LOGGER = LoggerFactory.getLogger(RedisListener.class);
@Override
public void onMessage(Message message,byte[] pattern){
LOGGER.debug("从消息通道={}监听到消息",new String(pattern));
LOGGER.debug("从消息通道={}监听到消息",new String(message.getChannel()));
LOGGER.debug("元消息={}",new String(message.getBody()));
// 新建一个用于反序列化的对象,注意这里的对象要和前面配置的同样
// 由于我前面设置的默认序列化方式为GenericJackson2JsonRedisSerializer
// 因此这里的实现方式为GenericJackson2JsonRedisSerializer
RedisSerializer serializer=new GenericJackson2JsonRedisSerializer();
LOGGER.debug("反序列化后的消息={}",serializer.deserialize(message.getBody()));
}
}
复制代码
代码很简单,就是输出参数中包含的关键信息。须要注意的是,RedisSerializer
的实现要与上面配置的序列化方式一致。
队列监听器实现完之后,咱们还须要将这个监听器添加到redis队列监听器容器中,代码以下:
@Bean
public public RedisMessageListenerContainer container(RedisConnectionFactory factory) {
RedisMessageListenerContainer container = new RedisMessageListenerContainer();
container.setConnectionFactory(factory);
container.addMessageListener(redisListener, new PatternTopic("demo-channel"));
return container;
}
复制代码
这几行代码大概意思就是新建一个Redis消息监听器容器,而后将监听器和管道名想绑定,最后返回这个容器。
这里要注意的是,这个管道名和下面将要说的推送消息时的管道名要一致,否则监听器监听不到消息。
上面咱们配置了RedisTemplate将要在这里使用到。
代码以下:
@Service
public class Publisher{
@Autowrite
private RedisTemplate redis;
public void publish(Object msg){
redis.convertAndSend("demo-channel",msg);
}
}
复制代码
关键代码为第7行,redis.convertAndSend()
这个方法的做用为,向某个通道(参数1)推送一条消息(第二个参数)。
这里仍是要注意上面所说的,生产者和消费者的通道名要相同。
至此,消息队列的生产者和消费者已经所有编写完成。
在我添加了spring-boot-starter-log4j2
依赖并在spring-boot-starter-web
中排除了spring-boot-starter-logging
后,运行项目,仍是会提示下面的错误:
SLF4J: Class path contains multiple SLF4J bindings.
SLF4J: Found binding in [jar:file:.....m2/repository/ch/qos/logback/logback-classic/1.2.3/logback-classic-1.2.3.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:.....m2/repository/org/apache/logging/log4j/log4j-slf4j-impl/2.12.1/log4j-slf4j-impl-2.12.1.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
SLF4J: Actual binding is of type [ch.qos.logback.classic.util.ContextSelectorStaticBinder]
复制代码
这个错误就是maven中有多个日志框架致使的。后来经过依赖分析,发如今spring-boot-starter-data-redis
中,也依赖了spring-boot-starter-logging
,解决办法也很简单,下面贴出详细代码
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-redis</artifactId>
<exclusions>
<exclusion>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-logging</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
<exclusions>
<exclusion>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-logging</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-log4j2</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-redis</artifactId>
</dependency>
复制代码
redis队列监听器的监听机制是:使用一个线程监听队列,队列有未消费的消息则取出消息并生成一个新的线程来消费消息。若是你还记得,我开头说的是因为redis单线程特性,所以咱们用它来作消息队列,可是若是监听器每次接受一个消息就生成新的线程来消费信息的话,这样就彻底没有使用到redis的单线程特性,同时还会产生线程安全问题。
最简单的办法莫过于为onMessage()
方法加锁,这样简单粗暴却颇有用,不过这种方式没法控制队列监听的速率,且无限制的创造线程最终会致使系统资源被占光。
那如何解决这种状况呢?线程池。
在将监听器添加到容器的配置的时候,RedisMessageListenerContainer
类中有一个方法setTaskExecutor(Executor taskExecutor)
能够为监听容器配置线程池。配置线程池之后,全部的线程都会由该线程池产生,由此,咱们能够经过调节线程池来控制队列监听的速率。
单一消费者的问题相比于多个消费者来讲仍是较为简单,由于Java内置的锁都是只能控制本身程序的运行,不能干扰其余的程序的运行;然而如今不少时候咱们都是在分布式环境下进行开发,这时处理多个消费者的状况就颇有意义了。
那么这种问题如何解决呢?分布式锁。
下面来简要科普一下什么是分布式锁:
分布式锁是指在分布式环境下,同一时间只有一个客户端可以从某个共享环境中(例如redis)获取到锁,只有获取到锁的客户端才能执行程序。
而后分布式锁通常要知足:排他性(即同一时间只有一个客户端可以获取到锁)、避免死锁(即超时后自动释放)、高可用(即获取或释放锁的机制必须高可用且性能佳)
上面讲依赖的时候,咱们导入了一个spring-integration-redis
依赖,这个依赖里面包含了不少实用的工具类,而咱们接下来要讲的分布式锁就是这个依赖下面的一个工具包RedisLockRegistry
。
首先讲一下如何使用,导入了依赖之后,首先配置一个Bean
@Bean
public RedisLockRegistry redisLockRegistry(RedisConnectionFactory factory) {
return new RedisLockRegistry(factory, "demo-lock",60);
}
复制代码
RedisLockRegistry
的构造函数,第一个参数是redis链接池,第二个参数是锁的前缀,即取出的锁,键名为“demo-lock:KEY_NAME”,第三个参数为锁的过时时间(秒),默认为60秒,当持有锁超过该时间后自动过时。
使用锁的方法,下面是对监听器的修改
@Component
public class RedisListener implement MessageListener{
@Autowrite
private RedisLockRegistry redisLockRegistry;
private static final Logger LOGGER = LoggerFactory.getLogger(RedisListener.class);
@Override
public void onMessage(Message message,byte[] pattern){
Lock lock=redisLockRegistry.obtain("lock");
try{
lock.lock(); //上锁
LOGGER.debug("从消息通道={}监听到消息",new String(pattern));
LOGGER.debug("从消息通道={}监听到消息",new String(message.getChannel()));
LOGGER.debug("元消息={}",new String(message.getBody()));
// 新建一个用于反序列化的对象,注意这里的对象要和前面配置的同样
// 由于我前面设置的默认序列化方式为GenericJackson2JsonRedisSerializer
// 因此这里的实现方式为GenericJackson2JsonRedisSerializer
RedisSerializer serializer=new GenericJackson2JsonRedisSerializer();
LOGGER.debug("反序列化后的消息={}",serializer.deserialize(message.getBody()));
} catch (Exception e) {
e.printStackTrace();
} finally {
lock.unlock(); //解锁
}
}
}
复制代码
上面代码的代码比起前面的监听器代码,只是多了一个注入的RedisLockRegistry
,一个经过redisLockRegistry.obtain()
方法获取锁,一个加锁一个解锁,而后这就完成了分布式锁的使用。
注意这个获取锁的方法redisLockRegistry.obtain()
,其返回的是一个名为RedisLock的锁,这是一个私有内部类,它实现了Lock接口,所以咱们不能从代码外部建立一个他的实例,只能经过obtian()方法来获取这个锁。