/** * 测试事务和乐观锁 */ @RunWith(SpringRunner.class) @SpringBootTest public class RedisTransactionTest { private RedisTemplate<String, String> redisTemplate; @Autowired void setRedisTemplate(RedisTemplate<String, String> redisTemplate) { this.redisTemplate = redisTemplate; } /** * 测试事务。 * 模拟场景是用户出售商品到商店。 */ @Test public void test() { Assert.assertEquals(listItem(), true); } boolean listItem() { // 乐观锁 redisTemplate.watch("inventory:17"); // 判断背包内是否有要出售的物品ItemN,若是没有就结束流程 if (!redisTemplate.opsForSet().isMember("inventory:17", "ItemN")) { redisTemplate.unwatch(); return false; } // 启用事务 redisTemplate.setEnableTransactionSupport(true); redisTemplate.multi(); // 添加到商店 redisTemplate.opsForZSet().add("market:", "ItemN", 9); // 从背包中移除 redisTemplate.opsForSet().remove("inventory:17", "ItemN"); // 执行事务 List<Object> result = redisTemplate.exec(); // 判断事务是否为空 if (result == null || result.size() == 0) return false; return true; } }
使用过时时间特性实现访问次数限制。java
test方法redis
为每一个IP建立一个Key,计数器为Value,过时时间为1分钟,每次请求先判断key是否存在,若是存在就自增计数器,当计数器次数大于指定次数则表明到达限制次数。该方法的缺点是在当前分钟的最后一秒和下一分钟的第一秒,能够连续访问。spring
test1方法ruby
为每个IP建立一个List,List中存放请求时间,每次请求先判断List中的长度是否小于指定次数,若是小于,则表明没有到达限制,将当前请求时间放到List中。ide
若是List中长度大于指定次数,则取出最后一次请求的时间,与当前时间比较,判断是否在指定时间内,若是在时间内,表明请求指定时间内的请求次数超标,若是不在指定时间内,则把当前时间放到队列中,将队列最先的时间弹出。函数
/** * 测试过时时间 */ @RunWith(SpringRunner.class) @SpringBootTest public class RedisExpireTest { private RedisTemplate<String, Object> redisTemplate; @Autowired void setRedisTemplate(RedisTemplate<String, Object> redisTemplate) { this.redisTemplate = redisTemplate; } /** * 设置ip访问频率,每分钟只能访问2次 */ @Test public void test() { String key = "rate.limiting:localhost"; List<String> keys = new ArrayList<>(); keys.add(key); // 判断key是否存在 if (redisTemplate.countExistingKeys(keys) > 0) { // 若是存在就将这个ip的访问计数器+1 long times = redisTemplate.opsForValue().increment(key); // 若是次数大于2,则提示操做了访问频率 if (times > 2) { System.out.println("超过了访问频率。"); } } else { // 若是key不存在,则建立这个ip的计数器 redisTemplate.opsForValue().increment(key); // 把计数器的超时时间设置成60秒 // 这种作法,在当前分钟的最后一秒和下一分钟的第一秒,能够连续访问 2 + 2 = 4次 redisTemplate.expire(key, 60, TimeUnit.SECONDS); } } @Test public void test1() { String key = "1rate.limiting:localhost"; long len = redisTemplate.opsForList().size(key); // 一分钟内不可请求超过2次 if (len < 2) { // 保存当前时间 redisTemplate.opsForList().leftPush(key, System.currentTimeMillis()); } else { // 取出上一次的时间 long time = (long) redisTemplate.opsForList().index(key, -1); // 已经请求了2次,但与上次请求的时间的间隔不够1分钟 if (System.currentTimeMillis() - time < 60000) { System.out.println("超过了访问频率。"); } else { // 保存当前时间 redisTemplate.opsForList().leftPush(key, System.currentTimeMillis()); // 裁剪队列 redisTemplate.opsForList().trim(key, 0, 1); } } } }
/** * 测试Sort * * @author xuepeng */ @RunWith(SpringRunner.class) @SpringBootTest() public class RedisSortTest { private RedisTemplate<String, String> redisTemplate; @Autowired void setRedisTemplate(RedisTemplate<String, String> redisTemplate) { this.redisTemplate = redisTemplate; } /** * 调用Sort排序 */ @Test public void test() { SortQuery<String> query = SortQueryBuilder.sort("mylist").build(); List<String> result = redisTemplate.sort(query); System.out.println(result); } /** * 调用Sort + By排序 */ @Test public void test1() { SortQuery<String> query = SortQueryBuilder.sort("sortbylist") .by("itemscore:*") .order(SortParameters.Order.DESC) .limit(0, -1).build(); List<String> result = redisTemplate.sort(query); System.out.println(result); } /** * 调用Sort + By + Get排序 */ @Test public void test2() { SortQuery<String> query = SortQueryBuilder.sort("tag:ruby:posts") .by("post:*->time") .order(SortParameters.Order.DESC) .limit(0, -1) .get("post:*->title") .get("post:*->time") .get("#") .build(); List<String> result = redisTemplate.sort(query); System.out.println(result); } }
经过brpop函数消费队列中的数据,若是超过了指定的阻塞时间尚未消费到数据,则取消消费。post
brpop能够同时消费多个队列,顺序是从左到右,根据此特性能够实现优先级队列。测试
/** * 阻塞队列 * * @author xuepeng */ @RunWith(SpringRunner.class) @SpringBootTest() public class RedisQueueTest { private static Lock lock = new ReentrantLock(); private RedisTemplate<String, String> redisTemplate; @Autowired void setRedisTemplate(RedisTemplate<String, String> redisTemplate) { this.redisTemplate = redisTemplate; } @Test public void test() throws InterruptedException { lock.lockInterruptibly(); RedisConnectionFactory connectionFactory = redisTemplate.getConnectionFactory(); RedisConnection connection = connectionFactory.getConnection(); try { while (true) { List<byte[]> results = connection.bRPop(60, "queue".getBytes()); if (!CollectionUtils.isEmpty(results)) { for (byte[] bytes : results) { System.out.println(new String(bytes)); } } } } finally { lock.unlock(); RedisConnectionUtils.releaseConnection(connection, connectionFactory); } } }
加入监听,监听message这个key。ui
/** * 配置消费者,参数是实现了MessageListener的对象。 */ @Bean public MessageListenerAdapter adapter(RedisMessage message) { // onMessage 若是RedisMessage 中 没有实现接口,这个参数必须跟RedisMessage中的读取信息的方法名称同样 return new MessageListenerAdapter(message, "message"); } /** * 配置监听容器。 */ @Bean public RedisMessageListenerContainer container(LettuceConnectionFactory connectionFactory, MessageListenerAdapter adapter) { RedisMessageListenerContainer container = new RedisMessageListenerContainer(); container.setConnectionFactory(connectionFactory); //监听对应的channel container.addMessageListener(adapter, new PatternTopic("message")); return container; }
消息提供者this
/** * 测试发布订阅 */ @RunWith(SpringRunner.class) @SpringBootTest() public class RedisMQTest { private RedisTemplate<String, String> redisTemplate; @Autowired void setRedisTemplate(RedisTemplate<String, String> redisTemplate) { this.redisTemplate = redisTemplate; } /** * 发送消息到Redis的MQ。 */ @Test public void test() { redisTemplate.convertAndSend("message", "hello world"); } }
消息消费者
/** * 消费Redis的MQ */ @Component public class RedisMessage implements MessageListener { private RedisTemplate<String, String> redisTemplate; @Autowired void setRedisTemplate(RedisTemplate<String, String> redisTemplate) { this.redisTemplate = redisTemplate; } @Override public void onMessage(Message message, byte[] pattern) { RedisSerializer<String> serializer = redisTemplate.getStringSerializer(); String msg = serializer.deserialize(message.getBody()); System.out.println("接收到的消息是:" + msg); } }
若是有批量操做redis的场景,能够使用pipeline,减小了屡次链接redis的开销。
package cc.xuepeng.ch06; import org.junit.Test; import org.junit.runner.RunWith; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.test.context.SpringBootTest; import org.springframework.dao.DataAccessException; import org.springframework.data.redis.connection.RedisConnection; import org.springframework.data.redis.connection.StringRedisConnection; import org.springframework.data.redis.core.RedisCallback; import org.springframework.data.redis.core.RedisOperations; import org.springframework.data.redis.core.RedisTemplate; import org.springframework.data.redis.core.SessionCallback; import org.springframework.test.context.junit4.SpringRunner; import java.util.List; /** * 测试管道发布 */ @RunWith(SpringRunner.class) @SpringBootTest() public class RedisPipelineTest { private RedisTemplate<String, String> redisTemplate; @Autowired void setRedisTemplate(RedisTemplate<String, String> redisTemplate) { this.redisTemplate = redisTemplate; } /** * 发送消息到Redis的MQ。 */ @Test public void test() { List<Object> redisResult = redisTemplate.executePipelined(new RedisCallback<String>() { @Override public String doInRedis(RedisConnection redisConnection) throws DataAccessException { for (int i = 0; i < 1000; i++) { redisConnection.incr("incr:test".getBytes()); } return null; } }); System.out.println(redisResult); } }