在使用Redis的过程当中,咱们对Redis作的每个操做,下发的每个命令, 均可以认为是事件的存在。所谓事件监听,就是Redis Server会对客户端下发命令进行一个监控, 一但有人对Redis Server作操做, Redis Server都能知道,并经过某种方式将监听到的事件转发到对应的订阅者。 html
一个电商商家后台,商家能够设置多个商品的价格并指订价格的生效时间。后台服务须要生效时间到时对全部已经上架的商品进行价格修改。并在价格修改为功后通知全部关注该商品的买家客户。java
注意: 假设该平台拥有1w商家,平均每一个商家设置了100个商品,也就是你要保证200w件商品价格修改的实时通知性。nginx
解决方案一: 每一个商品都有一份表去记录全部的新价格和生效时间,由定时任务job去轮询表中的数据,若是符合当前时间则取出并执行接下来的业务逻辑。web
解决方案二: 每一个商品都有一份表去记录全部的新价格和生效时间,由多个分布式job去轮询表中的数据,为了减轻job服务实例的压力,设置每2秒执行一次(定时任务不建议设置每秒)。在这基础上其实还有优化的空间,能够在设置分布式job分片处理逻辑。对于每个job实例,还能够在其内部开启异步线程并行处理。redis
从上述的描述中咱们能够发现,用户量仍是比较大,其实实时性要求比较高,因此若是咱们把数据落库,而后每次定时的时候从数据库里面去取而后作逻辑的判断,这样确定是没法达到实时性的要求的,因此有一种方案是采用redis来管理这批数据。可是也有两个个问题spring
要解决这个功能就须要使用到redis的一个高级的功能:redis 键空间通知(供Keyspace Notifications功能)其容许客户Publish / Subscribe ,以便以某种方式接收影响Redis数据集的事件。数据库
一样是电商平台,商家能够设置商品的预售时间, 当预售时间到达时,修改商品状态,并上架商品。该需求和需求一相似,都是以时间或者秒做为计算依据,每一个商品都是独立的,它们的时间属性都不会同样,因此是没有规律性的。编程
订单超时30分钟自动关闭。(无论多少订单,都是固定的时间间隔30分钟,有规律)
这个问题解决的方案就有多种了,咱们能够经过MQ来进行,如今大多的MQ都带有死信队列的机制,咱们能够经过这个机制来完成,其次也能够经过quartz的轮询方式的完成,选择合适解决方案应对当前的需求便可。固然本次主要是解决第一个需求,因此只谈如何使用redis来解决。ruby
因为Keyspace Notifications是在Redis 2.8.0以后的版本才提供的功能,因此咱们的Redis版本须要再2.8.0之上,不然没法使用Redis时间监听,在笔者写这篇文章之时,Redis的最新正式版本已经为5.0了 服务器
修改Redis配置,开启Keyspace Notifications的两种方式
CONFIG set notify-keyspace-events AKEx
- 配置文件修改
修改配置文件redis.conf,notify-keyspace-events AKEx
,从新启动Redis
参数说明
1)notify-keyspace-events选项的参数为空字符串时,表示功能关闭,当参数不是空字符串时,表示功能开启
2)notify-keyspace-events默功能是关闭的
3)若是要使用此功能,必须字符串包含 K 或者 E,不然收不到任何事件消息
4)若是参数为“AKE”,意味着接收全部事件消息
notify-keyspace-events 的参数能够是如下字符的任意组合, 它指定了服务器该发送哪些类型的通知:
字符 发送的通知 K 键空间通知,全部通知以 keyspace@ 为前缀 E 键事件通知,全部通知以 keyevent@ 为前缀 g DEL 、 EXPIRE 、 RENAME 等类型无关的通用命令的通知 字符串命令的通知 l 列表命令的通知 s 集合命令的通知 h 哈希命令的通知 z 有序集合命令的通知 x 过时事件:每当有过时键被删除时发送 e 驱逐(evict)事件:每当有键由于 maxmemory 政策而被删除时发送 A 参数 g lshzxe 的别名
同时监听 set、get、del 、 expire 操做
注意:get 操做监听不到消息,set
,del
,expire
若是操做成功能够监听到消息,若是操做失败也监听不到消息.
更多命令参考
# 以keyevent订阅库0上的set、get、del、expire多个事件
subscribe __keyevent@0__:set __keyevent@0__:get __keyevent@0__:del __keyevent@0__:expire
# 以keyspace订阅库0上关于key为mykey的全部事件
subscribe __keyspace@0__:mykey
复制代码
模式匹配则使用psubscribe
# 以keyspace订阅库0上关于key为mykey:*的全部事件
psubscribe __keyspace@0__:mykey:*
# 以keyevent、keyspace订阅全部库上的全部事件
psubscribe __key*@*__:*
复制代码
使用技术Spring Boot + RedisTemplate
import org.springframework.data.redis.connection.Message;
import org.springframework.data.redis.connection.MessageListener;
import org.springframework.stereotype.Component;
@Component
public class RedisExpiredListener implements MessageListener {
public final static String LISTENER_PATTERN = "__key*@*__:*";
@Override
public void onMessage(Message message, byte[] bytes) {
// 建议使用: valueSerializer
String body = new String(message.getBody());
String channel = new String(message.getChannel());
System.out.println("onMessage >> " + String.format("channel: %s, body: %s, bytes: %s"
, channel, body, new String(bytes)));
if (body.startsWith("product:")) {
final String productId = body.replace("product:", "");
System.out.println("获得产品id:" + productId);
}
}
}
复制代码
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.CommandLineRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Primary;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.listener.PatternTopic;
import org.springframework.data.redis.listener.RedisMessageListenerContainer;
import org.springframework.data.redis.listener.Topic;
import org.springframework.data.redis.serializer.RedisSerializer;
import org.springframework.data.redis.serializer.StringRedisSerializer;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import java.util.concurrent.Executor;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
@SpringBootApplication
public class RedisExpiredApplication implements CommandLineRunner {
@Autowired
private RedisTemplate redisTemplate;
public static void main(String[] args) {
SpringApplication.run(RedisExpiredApplication.class, args);
}
@Bean
@Primary
public RedisTemplate redisTemplate() {
RedisSerializer<String> stringSerializer = new StringRedisSerializer();
redisTemplate.setKeySerializer(stringSerializer);
redisTemplate.setValueSerializer(stringSerializer);
redisTemplate.setHashKeySerializer(stringSerializer);
redisTemplate.setHashValueSerializer(stringSerializer);
return redisTemplate;
}
@Bean
public RedisMessageListenerContainer listenerContainer(RedisConnectionFactory redisConnection, Executor executor) {
RedisMessageListenerContainer container = new RedisMessageListenerContainer();
// 设置Redis的链接工厂
container.setConnectionFactory(redisConnection);
// 设置监听使用的线程池
container.setTaskExecutor(executor);
// 设置监听的Topic: PatternTopic/ChannelTopic
Topic topic = new PatternTopic(RedisExpiredListener.LISTENER_PATTERN);
// 设置监听器
container.addMessageListener(new RedisExpiredListener(), topic);
return container;
}
@Bean
public Executor executor() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setCorePoolSize(10);
executor.setMaxPoolSize(20);
executor.setQueueCapacity(100);
executor.setKeepAliveSeconds(60);
executor.setThreadNamePrefix("V-Thread");
// rejection-policy:当pool已经达到max size的时候,如何处理新任务
// CALLER_RUNS:不在新线程中执行任务,而是由调用者所在的线程来执行
executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
executor.initialize();
return executor;
}
@Override
public void run(String... strings) throws Exception {
redisTemplate.opsForValue().set("orderId:123", "过时了是取不到的", 5, TimeUnit.SECONDS);
System.out.println("初始化设置 key 过时时间 5s");
System.out.println("main 线程休眠10秒");
Thread.sleep(10 * 1000);
System.out.println("main 线程休眠结束:获取key orderId结果为:" + redisTemplate.opsForValue().get("orderId:123"));
}
复制代码
spring.redis.database=0
spring.redis.host=192.168.104.102
spring.redis.port=6378
spring.redis.pool.max-idle=8
spring.redis.pool.min-idle=0
spring.redis.pool.max-active=8
spring.redis.pool.max-wait=-1
复制代码
效果展现:
由于redis key 过时以后,其中的value是没法获取到的, 因此在设计key的时候就包含了业务主键id在其中,以此来解决value消失没法处理业务逻辑的状况。到这里,就能够根据具体到期时间执行具体逻辑了。
Redis过时命令设置
# Redis Expire 命令用于设置 key 的过时时间。key 过时后将再也不可用。
Expire KEY_NAME TIME_IN_SECONDS
# Redis Expireat 命令用于以 UNIX 时间戳(unix timestamp)格式设置 key 的过时时间。key 过时后将再也不可用。
Expireat KEY_NAME TIME_IN_UNIX_TIMESTAMP
# Redis PEXPIREAT 命令用于设置 key 的过时时间,已毫秒计。key 过时后将再也不可用。
PEXPIREAT KEY_NAME TIME_IN_MILLISECONDS_IN_UNIX_TIMESTAMP
复制代码
由于 Redis 目前的订阅与发布功能采起的是 发送即忘(fire and forget) 策略, 因此若是你的程序须要可靠事件通知(reliable notification of events), 那么目前的键空间通知可能并不适合你:当订阅事件的客户端(服务实例)断线时, 它会丢失全部在断线期间分发给它的事件。并不能确保消息送达。将来有计划容许更可靠的事件传递,但可能这将在更基础的层面上解决,或者为Pub / Sub自己带来可靠性,或者容许Lua脚本拦截Pub / Sub消息来执行诸如推送将事件列入清单。
文章每周持续更新,能够微信搜索「 十分钟学编程 」第一时间阅读和催更,若是这个文章写得还不错,以为有点东西的话 ~求点赞👍 求关注❤️ 求分享❤️
各位的支持和承认,就是我创做的最大动力,咱们下篇文章见!