Redis的高级特性使用

乐观锁和事务

  • 使用watch监控某一个key;
  • 开启事务进行操做;
  • 操做完成后提交事务;
  • 若在提交前,watch的key发生了变化,提交的事务将不起做用;
/**
 * 测试事务和乐观锁
 */
@RunWith(SpringRunner.class)
@SpringBootTest
public class RedisTransactionTest {

    private RedisTemplate<String, String> redisTemplate;

    @Autowired
    void setRedisTemplate(RedisTemplate<String, String> redisTemplate) {
        this.redisTemplate = redisTemplate;
    }

    /**
     * 测试事务。
     * 模拟场景是用户出售商品到商店。
     */
    @Test
    public void test() {
        Assert.assertEquals(listItem(), true);
    }

    boolean listItem() {
        // 乐观锁
        redisTemplate.watch("inventory:17");
        // 判断背包内是否有要出售的物品ItemN,若是没有就结束流程
        if (!redisTemplate.opsForSet().isMember("inventory:17", "ItemN")) {
            redisTemplate.unwatch();
            return false;
        }
        // 启用事务
        redisTemplate.setEnableTransactionSupport(true);
        redisTemplate.multi();
        // 添加到商店
        redisTemplate.opsForZSet().add("market:", "ItemN", 9);
        // 从背包中移除
        redisTemplate.opsForSet().remove("inventory:17", "ItemN");
        // 执行事务
        List<Object> result = redisTemplate.exec();
        // 判断事务是否为空
        if (result == null || result.size() == 0) return false;
        return true;
    }

}

过时时间

使用过时时间特性实现访问次数限制。java

test方法redis

为每一个IP建立一个Key,计数器为Value,过时时间为1分钟,每次请求先判断key是否存在,若是存在就自增计数器,当计数器次数大于指定次数则表明到达限制次数。该方法的缺点是在当前分钟的最后一秒和下一分钟的第一秒,能够连续访问。spring

test1方法ruby

为每个IP建立一个List,List中存放请求时间,每次请求先判断List中的长度是否小于指定次数,若是小于,则表明没有到达限制,将当前请求时间放到List中。ide

若是List中长度大于指定次数,则取出最后一次请求的时间,与当前时间比较,判断是否在指定时间内,若是在时间内,表明请求指定时间内的请求次数超标,若是不在指定时间内,则把当前时间放到队列中,将队列最先的时间弹出。函数

/**
 * 测试过时时间
 */
@RunWith(SpringRunner.class)
@SpringBootTest
public class RedisExpireTest {

    private RedisTemplate<String, Object> redisTemplate;

    @Autowired
    void setRedisTemplate(RedisTemplate<String, Object> redisTemplate) {
        this.redisTemplate = redisTemplate;
    }

    /**
     * 设置ip访问频率,每分钟只能访问2次
     */
    @Test
    public void test() {
        String key = "rate.limiting:localhost";
        List<String> keys = new ArrayList<>();
        keys.add(key);
        // 判断key是否存在
        if (redisTemplate.countExistingKeys(keys) > 0) {
            // 若是存在就将这个ip的访问计数器+1
            long times = redisTemplate.opsForValue().increment(key);
            // 若是次数大于2,则提示操做了访问频率
            if (times > 2) {
                System.out.println("超过了访问频率。");
            }
        } else {
            // 若是key不存在,则建立这个ip的计数器
            redisTemplate.opsForValue().increment(key);
            // 把计数器的超时时间设置成60秒
            // 这种作法,在当前分钟的最后一秒和下一分钟的第一秒,能够连续访问 2 + 2 = 4次
            redisTemplate.expire(key, 60, TimeUnit.SECONDS);
        }
    }

    @Test
    public void test1() {
        String key = "1rate.limiting:localhost";
        long len = redisTemplate.opsForList().size(key);
        // 一分钟内不可请求超过2次
        if (len < 2) {
            // 保存当前时间
            redisTemplate.opsForList().leftPush(key, System.currentTimeMillis());
        } else {
            // 取出上一次的时间
            long time = (long) redisTemplate.opsForList().index(key, -1);
            // 已经请求了2次,但与上次请求的时间的间隔不够1分钟
            if (System.currentTimeMillis() - time < 60000) {
                System.out.println("超过了访问频率。");
            } else {
                // 保存当前时间
                redisTemplate.opsForList().leftPush(key, System.currentTimeMillis());
                // 裁剪队列
                redisTemplate.opsForList().trim(key, 0, 1);
            }
        }
    }
}

Sort排序

  • sort:test方法中实现,能够对List类型,Set类型,ZSet类型进行排序;
  • by:test1方法中实现,能够经过by链接Hash或String类型作排序字段;
  • get:test2方法中实现,做用是返回get参数指定的值,能够是用于by的对象的值,若是想返回主key的值则能够使用#号;
/**
 * 测试Sort
 *
 * @author xuepeng
 */
@RunWith(SpringRunner.class)
@SpringBootTest()
public class RedisSortTest {

    private RedisTemplate<String, String> redisTemplate;

    @Autowired
    void setRedisTemplate(RedisTemplate<String, String> redisTemplate) {
        this.redisTemplate = redisTemplate;
    }

    /**
     * 调用Sort排序
     */
    @Test
    public void test() {
        SortQuery<String> query = SortQueryBuilder.sort("mylist").build();
        List<String> result = redisTemplate.sort(query);
        System.out.println(result);
    }

    /**
     * 调用Sort + By排序
     */
    @Test
    public void test1() {
        SortQuery<String> query = SortQueryBuilder.sort("sortbylist")
                .by("itemscore:*")
                .order(SortParameters.Order.DESC)
                .limit(0, -1).build();
        List<String> result = redisTemplate.sort(query);
        System.out.println(result);
    }

    /**
     * 调用Sort + By + Get排序
     */
    @Test
    public void test2() {
        SortQuery<String> query = SortQueryBuilder.sort("tag:ruby:posts")
                .by("post:*->time")
                .order(SortParameters.Order.DESC)
                .limit(0, -1)
                .get("post:*->title")
                .get("post:*->time")
                .get("#")
                .build();
        List<String> result = redisTemplate.sort(query);
        System.out.println(result);
    }


}

阻塞队列

经过brpop函数消费队列中的数据,若是超过了指定的阻塞时间尚未消费到数据,则取消消费。post

brpop能够同时消费多个队列,顺序是从左到右,根据此特性能够实现优先级队列。测试

/**
 * 阻塞队列
 *
 * @author xuepeng
 */
@RunWith(SpringRunner.class)
@SpringBootTest()
public class RedisQueueTest {

    private static Lock lock = new ReentrantLock();

    private RedisTemplate<String, String> redisTemplate;

    @Autowired
    void setRedisTemplate(RedisTemplate<String, String> redisTemplate) {
        this.redisTemplate = redisTemplate;
    }

    @Test
    public void test() throws InterruptedException {
        lock.lockInterruptibly();
        RedisConnectionFactory connectionFactory = redisTemplate.getConnectionFactory();
        RedisConnection connection = connectionFactory.getConnection();

        try {
            while (true) {
                List<byte[]> results = connection.bRPop(60, "queue".getBytes());
                if (!CollectionUtils.isEmpty(results)) {
                    for (byte[] bytes : results) {
                        System.out.println(new String(bytes));
                    }
                }
            }
        } finally {
            lock.unlock();
            RedisConnectionUtils.releaseConnection(connection, connectionFactory);
        }

    }

}

发布/订阅

加入监听,监听message这个key。ui

/**
     * 配置消费者,参数是实现了MessageListener的对象。
     */
    @Bean
    public MessageListenerAdapter adapter(RedisMessage message) {
        // onMessage 若是RedisMessage 中 没有实现接口,这个参数必须跟RedisMessage中的读取信息的方法名称同样
        return new MessageListenerAdapter(message, "message");
    }

    /**
     * 配置监听容器。
     */
    @Bean
    public RedisMessageListenerContainer container(LettuceConnectionFactory connectionFactory,
                                                   MessageListenerAdapter adapter) {
        RedisMessageListenerContainer container = new RedisMessageListenerContainer();
        container.setConnectionFactory(connectionFactory);
        //监听对应的channel
        container.addMessageListener(adapter, new PatternTopic("message"));
        return container;
    }

消息提供者this

/**
 * 测试发布订阅
 */
@RunWith(SpringRunner.class)
@SpringBootTest()
public class RedisMQTest {

    private RedisTemplate<String, String> redisTemplate;

    @Autowired
    void setRedisTemplate(RedisTemplate<String, String> redisTemplate) {
        this.redisTemplate = redisTemplate;
    }

    /**
     * 发送消息到Redis的MQ。
     */
    @Test
    public void test() {
        redisTemplate.convertAndSend("message", "hello world");
    }

}

消息消费者

/**
 * 消费Redis的MQ
 */
@Component
public class RedisMessage implements MessageListener {

    private RedisTemplate<String, String> redisTemplate;

    @Autowired
    void setRedisTemplate(RedisTemplate<String, String> redisTemplate) {
        this.redisTemplate = redisTemplate;
    }

    @Override
    public void onMessage(Message message, byte[] pattern) {
        RedisSerializer<String> serializer = redisTemplate.getStringSerializer();
        String msg = serializer.deserialize(message.getBody());
        System.out.println("接收到的消息是:" + msg);
    }

}

管道

若是有批量操做redis的场景,能够使用pipeline,减小了屡次链接redis的开销。

package cc.xuepeng.ch06;

import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.dao.DataAccessException;
import org.springframework.data.redis.connection.RedisConnection;
import org.springframework.data.redis.connection.StringRedisConnection;
import org.springframework.data.redis.core.RedisCallback;
import org.springframework.data.redis.core.RedisOperations;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.core.SessionCallback;
import org.springframework.test.context.junit4.SpringRunner;

import java.util.List;

/**
 * 测试管道发布
 */
@RunWith(SpringRunner.class)
@SpringBootTest()
public class RedisPipelineTest {

    private RedisTemplate<String, String> redisTemplate;

    @Autowired
    void setRedisTemplate(RedisTemplate<String, String> redisTemplate) {
        this.redisTemplate = redisTemplate;
    }

    /**
     * 发送消息到Redis的MQ。
     */
    @Test
    public void test() {
        List<Object> redisResult = redisTemplate.executePipelined(new RedisCallback<String>() {
            @Override
            public String doInRedis(RedisConnection redisConnection) throws DataAccessException {
                for (int i = 0; i < 1000; i++) {
                    redisConnection.incr("incr:test".getBytes());
                }
                return null;
            }
        });
        System.out.println(redisResult);
    }

}
相关文章
相关标签/搜索