redis 支持对 list,set 和 zset 元素的排序,排序的时间复杂度是 O(N+M*log(M))。(N 是集合大小,M 为返回元素的数量)java
sort key [BY pattern] [LIMIT offset count] [GET pattern [GET pattern ...]] [ASC|DESC] [ALPHA] [STORE destination]
假设如今有用户数据以下:
redis
127.0.0.1:6379> sadd uid 1 2 3 4 127.0.0.1:6379> mset user_name_1 admin user_level_1 9999 127.0.0.1:6379> mset user_name_2 jack user_level_2 10 127.0.0.1:6379> mset user_name_3 peter user_level_3 25 127.0.0.1:6379> mset user_name_4 mary user_level_4 70
首先,直接利用集合内的元素作排序操做:spring
127.0.0.1:6379> sort uid 1) "1" 2) "2" 3) "3" 4) "4"
接着,咱们来试试 [BY pattern] 和 [GET pattern [GET pattern ...]] 操做:ide
127.0.0.1:6379> sort uid by user_name_* get # get user_name_* get user_level_* alpha 1) "1" 2) "admin" 3) "9999" 4) "2" 5) "jack" 6) "10" 7) "4" 8) "mary" 9) "70" 10) "3" 11) "peter" 12) "25"
这个语句有点晦涩,试着这么理解 “by user_name_* ”, user_name_* 是一个占用符,它先取出 uid 中的值,而后再用这个值拼接成外部键,而真正进行排序的正是这些外部键值;“get # get user_name_* get user_level_* ” 的含义也能够这么理解,get # 表示返回本身元素,[get pattern] 表示返回外部键值。工具
redis 的事务机制主要是由下面的几个指令来完成:性能
当 redis 接受到 multi 指令时,这个链接会进入一个事务上下文,该链接后续的命令并非当即执行,而是先放到一个队列中;当从链接受到 exec 命令后,redis 会顺序的执行队列中的全部命令。并将全部命令的运行结果打包到一块儿返回给 client。而后此链接就结束事务上下文。ui
redis 将是否有 watch 命令分为普通类型事务和 CAS(Check And Set)类型事务,无 watch 命令的为普通类型事务,有 watch 命令的为 CAS类型事务。code
事务失败的缘由能够分为静态错误(如不存在的命令)和运行时错误(如 CAS 错误、对 string 用 lpop 操做等)。静态错误会在提交 exec 时返回错误信息,使事务不能执行;而除 CAS 之外的运行时错误不会阻止事务继续执行。所以,Redis 的事务机制并不具备原子性。server
127.0.0.1:6379> multi OK 127.0.0.1:6379> lpush list java QUEUED 127.0.0.1:6379> scard list QUEUED 127.0.0.1:6379> exec 1) (integer) 1 2) (error) WRONGTYPE Operation against a key holding the wrong kind of value 127.0.0.1:6379> lrange list 0 -1 1) "java"
能够看到,即便命令错误,事务依然没有被回滚。所以,redis 的事务机制过于原始,不建议使用。blog
须要注意的是,若是咱们使用 AOF 的方式持久化,可能存在事务被部分写入的状况(事务执行过程当中 redis 挂掉等)从而致使 redis 启动失败退出,可使用 redis-check-aof 工具进行修复。
在事务中 redis 提供了队列,能够批量执行任务,这样性能就比较高,但使用 multi…exec 事务命令是有系统开销的,由于它会检测对应的锁和序列化命令。有时咱们但愿在没有任何附加条件的状况下使用队列批量执行一系列命令,这时可使用 redis的流水线(pipeline)技术。
看看如何在 spring-data-redis 中使用 pipeline 功能,须要注意的是 RedisTemplate 的序列化须要使用 StringRedisSerializer,不能使用 JdkSerializationRedisSerializer,不然会致使序列化失败。
@Test public void pipeline() { // 1.executePipelined 重写 入参 RedisCallback 的doInRedis方法 List<Object> resultList = redisTemplate.executePipelined(new RedisCallback<String>() { @Override public String doInRedis(RedisConnection redisConnection) throws DataAccessException { // 2.redisConnection 给本次管道内添加 要一次性执行的多条命令 // 2.1 一个set操做 redisConnection.set("hello".getBytes(), "world".getBytes()); // 2.2一个批量mset操做 Map<byte[], byte[]> tuple = new HashMap(); tuple.put("m_hello_1".getBytes(), "m_world_1".getBytes()); tuple.put("m_hello_2".getBytes(), "m_world_2".getBytes()); tuple.put("m_hello_3".getBytes(), "m_world_3".getBytes()); redisConnection.mSet(tuple); // 2.3一个get操做 redisConnection.get("m_hello_1".getBytes()); // 3 这里必定要返回null,最终pipeline的执行结果,才会返回给最外层 return null; } }); // 4. 最后对redis pipeline管道操做返回结果进行判断和业务补偿 for (Object str : resultList) { System.out.println(str); } }
继 RxJava、Reactor、CompletableFuture 以及 Spring 的事件驱动模型后,咱们又要接触一种观察者模式啦!redis 做为一个pub/sub server,在订阅者和发布者之间起到了消息路由的功能。订阅者能够经过 subscribe 和 psubscribe 命令向 redis server 订阅本身感兴趣的channel ;发布者经过 publish 命令向 redis server 的 channel 发送消息,订阅该 channel 的所有 client 都会收到此消息。一个 client 能够订阅多个 channel,也能够向多个 channel 发送消息。
订阅 channel:
127.0.0.1:6379> subscribe channel Reading messages... (press Ctrl-C to quit) 1) "subscribe" 2) "channel" 3) (integer) 1 1) "message" 2) "channel" 3) "Hello,World"
发布 channel 消息:
127.0.0.1:6379> publish channel Hello,World (integer) 1
接下来咱们来看看在 spring-data-redis 中如何实现发布订阅功能。首先咱们须要一个消息监听器,只要让它实现 MessageListener 接口便可:
public class ChannelListener implements MessageListener { @Override public void onMessage(Message message, byte[] pattern) { System.out.println("channel is:" + new String(message.getChannel())); System.out.println("channel content:" + new String(message.getBody())); } }
那么,下面怎么作呢?固然是把消息监听器和 channel 绑定在一块儿,让消息监听器知道处理哪一个 channel 的消息:
/** * redis 消息监听器容器, 绑定消息监听器和 channel * * @param connectionFactory * @return */ @Bean public RedisMessageListenerContainer container(RedisConnectionFactory connectionFactory) { RedisMessageListenerContainer container = new RedisMessageListenerContainer(); container.setConnectionFactory(connectionFactory); //订阅了一个叫 channel 的通道 container.addMessageListener(channelAdapter(), new PatternTopic("channel")); //这个 container 能够添加多个 messageListener return container; } /** * 消息监听器适配器 */ @Bean public MessageListenerAdapter channelAdapter() { return new MessageListenerAdapter(new ChannelListener()); }
接下来,让咱们试着往这个 channel 发布一个消息吧!
@Test public void publish() { redisTemplate.convertAndSend("channel", "Hello,World"); }