SpringDataRedis事务 专题

 

5.10.1. @Transactional Support
Transaction Support is disabled by default and has to be explicitly enabled for each RedisTemplate in use by setting setEnableTransactionSupport(true).
This will force binding the RedisConnection in use to the current Thread triggering MULTI.
If the transaction finishes without errors, EXEC is called, otherwise DISCARD.
Once in MULTI, RedisConnection would queue write operations, all readonly operations, such as KEYS are piped to a fresh (non thread bound) RedisConnection.css

/** Sample Configuration **/
@Configuration
public class RedisTxContextConfiguration {
  @Bean
  public StringRedisTemplate redisTemplate() {
    StringRedisTemplate template = new StringRedisTemplate(redisConnectionFactory());
    // explicitly enable transaction support
    template.setEnableTransactionSupport(true);
    return template;
  }

  @Bean
  public PlatformTransactionManager transactionManager() throws SQLException {
    return new DataSourceTransactionManager(dataSource());
  }

  @Bean
  public RedisConnectionFactory redisConnectionFactory( // jedis, lettuce, srp,... );

  @Bean
  public DataSource dataSource() throws SQLException { // ... }
}
/** Usage Constrainsts **/

// executed on thread bound connection
template.opsForValue().set("foo", "bar");

// read operation executed on a free (not tx-aware)
connection template.keys("*");

// returns null as values set within transaction are not visible
template.opsForValue().get("foo");

 

 

    public Long leftPush(V value) {
        return this.ops.leftPush(this.getKey(), value);
    }
public Long leftPush(K key, V value) {
        final byte[] rawKey = this.rawKey(key);
        final byte[] rawValue = this.rawValue(value);
        return (Long)this.execute(new RedisCallback() { public Long doInRedis(RedisConnection connection) {
                return connection.lPush(rawKey, new byte[][]{rawValue});
            }
        }, true);
    }
    <T> T execute(RedisCallback<T> callback, boolean b) {
        return this.template.execute(callback, b);
    }
    public <T> T execute(RedisCallback<T> action, boolean exposeConnection) {
        return this.execute(action, exposeConnection, false);
    }
 public <T> T execute(RedisCallback<T> action, boolean exposeConnection, boolean pipeline) {
        Assert.isTrue(this.initialized, "template not initialized; call afterPropertiesSet() before using it");
        Assert.notNull(action, "Callback object must not be null");
        RedisConnectionFactory factory = this.getConnectionFactory();
        RedisConnection conn = null;

        Object var11;
        try {
            if(this.enableTransactionSupport) {
                conn = RedisConnectionUtils.bindConnection(factory, this.enableTransactionSupport);
            } else {
                conn = RedisConnectionUtils.getConnection(factory);
            }

            boolean existingConnection = TransactionSynchronizationManager.hasResource(factory);
            RedisConnection connToUse = this.preProcessConnection(conn, existingConnection);
            boolean pipelineStatus = connToUse.isPipelined();
            if(pipeline && !pipelineStatus) {
                connToUse.openPipeline();
            }

            RedisConnection connToExpose = exposeConnection?connToUse:this.createRedisConnectionProxy(connToUse);
            Object result = action.doInRedis(connToExpose);
            if(pipeline && !pipelineStatus) {
                connToUse.closePipeline();
            }

            var11 = this.postProcessResult(result, connToUse, existingConnection);
        } finally {
            if(!this.enableTransactionSupport) {
                RedisConnectionUtils.releaseConnection(conn, factory);
            }

        }

        return var11;
    }

本文主要讲述如何在java里头使用redis进行cas操做。其实呢,redis不像memcached那样显示地支持cas操做,不过它有事务的概念。html

准备

redis的乐观锁支持

Redis经过使用WATCH, MULTI, and EXEC组成的事务来实现乐观锁(注意没有用DISCARD),Redis事务没有回滚操做。在SpringDataRedis当中经过RedisTemplate的SessionCallback中来支持(不然事务不生效)。discard的话不须要本身代码处理,callback返回null,成的话,返回非null,依据这个来判断事务是否成功(没有抛异常)。github

实例

   @Test
    public void cas() throws InterruptedException, ExecutionException {
        String key = "test-cas-1";
        ValueOperations<String, String> strOps = redisTemplate.opsForValue();
        strOps.set(key, "hello");
        ExecutorService pool  = Executors.newCachedThreadPool();
        List<Callable<Object>> tasks = new ArrayList<>();
        for(int i=0;i<5;i++){
            final int idx = i;
            tasks.add(new Callable() {
                @Override
                public Object call() throws Exception {
                    return redisTemplate.execute(new SessionCallback() {
                        @Override
                        public Object execute(RedisOperations operations) throws DataAccessException {
                            operations.watch(key);
                            String origin = (String) operations.opsForValue().get(key);
                            operations.multi();
                            operations.opsForValue().set(key, origin + idx);
                            Object rs = operations.exec();
                            System.out.println("set:"+origin+idx+" rs:"+rs);
                            return rs;
                        }
                    });
                }
            });
        }
        List<Future<Object>> futures = pool.invokeAll(tasks);
        for(Future<Object> f:futures){
            System.out.println(f.get());
        }
        pool.shutdown();
        pool.awaitTermination(1000, TimeUnit.MILLISECONDS);
    }

 

 

输出web

set:hello2 rs:null set:hello3 rs:[] set:hello1 rs:null set:hello4 rs:null set:hello0 rs:null

查看该值redis

127.0.0.1:6379> get test-cas-1 "\"hello3\""

SessionCallback

没有在SessionCallback里头执行watch、multi、exec,而是本身单独写算法

与数据库事务的混淆

template.setEnableTransactionSupport(true);
这个应该是支持数据库的事务成功才执行的意思。由于Spring默认的事务,都是基于DB事务的
   /**
     * Gets a Redis connection. Is aware of and will return any existing corresponding connections bound to the current
     * thread, for example when using a transaction manager. Will create a new Connection otherwise, if
     * {@code allowCreate} is <tt>true</tt>.
     * 
     * @param factory connection factory for creating the connection
     * @param allowCreate whether a new (unbound) connection should be created when no connection can be found for the
     *          current thread
     * @param bind binds the connection to the thread, in case one was created
     * @param enableTransactionSupport
     * @return an active Redis connection
     */
    public static RedisConnection doGetConnection(RedisConnectionFactory factory, boolean allowCreate, boolean bind,
            boolean enableTransactionSupport) {

        Assert.notNull(factory, "No RedisConnectionFactory specified");

        RedisConnectionHolder connHolder = (RedisConnectionHolder) TransactionSynchronizationManager.getResource(factory);

        if (connHolder != null) {
            if (enableTransactionSupport) {
                potentiallyRegisterTransactionSynchronisation(connHolder, factory);
            }
            return connHolder.getConnection();
        }

        if (!allowCreate) {
            throw new IllegalArgumentException("No connection found and allowCreate = false");
        }

        if (log.isDebugEnabled()) {
            log.debug("Opening RedisConnection");
        }

        RedisConnection conn = factory.getConnection();

        if (bind) {

            RedisConnection connectionToBind = conn;
            if (enableTransactionSupport && isActualNonReadonlyTransactionActive()) {
                connectionToBind = createConnectionProxy(conn, factory);
            }

            connHolder = new RedisConnectionHolder(connectionToBind);

            TransactionSynchronizationManager.bindResource(factory, connHolder);
            if (enableTransactionSupport) {
                potentiallyRegisterTransactionSynchronisation(connHolder, factory);
            }

            return connHolder.getConnection();
        }

        return conn;
    }

 

不要跟本文的乐观锁说的事务混淆在一块儿。spring

参考

https://segmentfault.com/a/1190000004393573

org.springframework.data.redis.core.RedisTemplate

 public <T> T execute(RedisCallback<T> action, boolean exposeConnection, boolean pipeline) {
        Assert.isTrue(this.initialized, "template not initialized; call afterPropertiesSet() before using it");
        Assert.notNull(action, "Callback object must not be null");
        RedisConnectionFactory factory = this.getConnectionFactory();
        RedisConnection conn = null;

        Object var11;
        try {
            if(this.enableTransactionSupport) {
 conn = RedisConnectionUtils.bindConnection(factory, this.enableTransactionSupport);
//这个是否支持的开关能够在@Configuration中配置:
}
else { conn = RedisConnectionUtils.getConnection(factory); } boolean existingConnection = TransactionSynchronizationManager.hasResource(factory); RedisConnection connToUse = this.preProcessConnection(conn, existingConnection); boolean pipelineStatus = connToUse.isPipelined(); if(pipeline && !pipelineStatus) { connToUse.openPipeline(); } RedisConnection connToExpose = exposeConnection?connToUse:this.createRedisConnectionProxy(connToUse); Object result = action.doInRedis(connToExpose); if(pipeline && !pipelineStatus) { connToUse.closePipeline(); } var11 = this.postProcessResult(result, connToUse, existingConnection); } finally { if(!this.enableTransactionSupport) { RedisConnectionUtils.releaseConnection(conn, factory); } } return var11; }

 

    @Bean
    public RedisTemplate redisTemplate(){
        RedisTemplate<StringRedisSerializer, Serializable> rt = new RedisTemplate<>();
        rt.setConnectionFactory(jedisConnectionFactory());
        StringRedisSerializer stringSerializer = new StringRedisSerializer();
        JdkSerializationRedisSerializer jdkSerializationRedisSerializer = new JdkSerializationRedisSerializer();
        RedisHashKeySerializer redisHashKeySerializer = new RedisHashKeySerializer();
        rt.setKeySerializer(stringSerializer);
        rt.setValueSerializer(jdkSerializationRedisSerializer);
        rt.setHashKeySerializer(redisHashKeySerializer);
        rt.setHashValueSerializer(jdkSerializationRedisSerializer);
        rt.afterPropertiesSet();
 rt.setEnableTransactionSupport(true); return rt;
    }

 

 

org.springframework.data.redis.core.RedisConnectionUtils

 

    public static RedisConnection doGetConnection(RedisConnectionFactory factory, boolean allowCreate, boolean bind, boolean enableTransactionSupport) {
        Assert.notNull(factory, "No RedisConnectionFactory specified");
        RedisConnectionUtils.RedisConnectionHolder connHolder = (RedisConnectionUtils.RedisConnectionHolder)TransactionSynchronizationManager.getResource(factory);
        if(connHolder != null) {
            if(enableTransactionSupport) {
                potentiallyRegisterTransactionSynchronisation(connHolder, factory);
            }

            return connHolder.getConnection();
        } else if(!allowCreate) {
            throw new IllegalArgumentException("No connection found and allowCreate = false");
        } else {
            if(log.isDebugEnabled()) {
                log.debug("Opening RedisConnection");
            }

            RedisConnection conn = factory.getConnection();
            if(bind) {
                RedisConnection connectionToBind = conn;
                if(enableTransactionSupport && isActualNonReadonlyTransactionActive()) {
                    connectionToBind = createConnectionProxy(conn, factory);
                }

                connHolder = new RedisConnectionUtils.RedisConnectionHolder(connectionToBind);
                TransactionSynchronizationManager.bindResource(factory, connHolder);
                if(enableTransactionSupport) {
                    potentiallyRegisterTransactionSynchronisation(connHolder, factory);
                }

                return connHolder.getConnection();
            } else {
                return conn;
            }
        }
    }

 

 

private static void potentiallyRegisterTransactionSynchronisation(RedisConnectionUtils.RedisConnectionHolder connHolder, RedisConnectionFactory factory) {
        if(isActualNonReadonlyTransactionActive() && !connHolder.isTransactionSyncronisationActive()) {
            connHolder.setTransactionSyncronisationActive(true);
            RedisConnection conn = connHolder.getConnection();
            conn.multi();
            TransactionSynchronizationManager.registerSynchronization(new RedisConnectionUtils.RedisTransactionSynchronizer(connHolder, conn, factory));
        }

    }

 

 

RedisTemplate  api详解

1. RedisTemplate的事务

    private boolean enableTransactionSupport = false;
    private boolean exposeConnection = false;
    private boolean initialized = false;
    private boolean enableDefaultSerializer = true;
    private RedisSerializer<?> defaultSerializer = new JdkSerializationRedisSerializer();

    private RedisSerializer keySerializer = null;
    private RedisSerializer valueSerializer = null;
    private RedisSerializer hashKeySerializer = null;
    private RedisSerializer hashValueSerializer = null;
    private RedisSerializer<String> stringSerializer = new StringRedisSerializer();

    private ScriptExecutor<K> scriptExecutor;

    // cache singleton objects (where possible)
    private ValueOperations<K, V> valueOps;
    private ListOperations<K, V> listOps;
    private SetOperations<K, V> setOps;
    private ZSetOperations<K, V> zSetOps;

enableTransactionSupport:是否启用事务支持。
在代码中搜索下用到这个变量的地方,会看到,在调用RedisCallback以前,有一行代码是若是启用事务支持,那么conn = RedisConnectionUtils.bindConnection(factory, enableTransactionSupport),
也就是说,系统自动帮咱们拿到了事务中绑定的链接
能够在一个方法的屡次对Redis增删该查中,始终使用同一个链接。可是,即便使用了一样的链接,没有进行connection.multi()和connection.exec(),依然是没法启用事务的。

我没有仔细的查阅代码,可是能够知道的是,Spring已经对这个,给了咱们一个更好的支持:@Transactional 

在调用RedisTempalte中的execute()方法的地方,加入这个注解(是spring包下面提供的,不要引用成rt包下的注解),能让这个方法中的全部execute,自动加入multi()以及异常的回滚或者是正常运行时候的提交!

 

2. RedisTempalte的Serializer

用过jedis操做的都知道,全部connection的操做方法,都是传入字节数组。那么,将一个对象和字节相互转换,就须要经过序列化和反序列化。

模版方法中,Spring提供了默认的StringSerializer和JdkSerializer,第一个很简单,就是经过String.getBytes()来实现的。并且在Redis中,全部存储的值都是字符串类型的。因此这种方法保存后,经过Redis-cli控制台,是能够清楚的查看到咱们保存了什么key,value是什么。可是对于JdkSerializationRedisSerializer来讲,这个序列化方法就是Jdk提供的了。首先要求咱们要被序列化的类继承自Serializeable接口,而后经过,而后经过Jdk对象序列化的方法保存。(注:这个序列化保存的对象,即便是个String类型的,在redis控制台,也是看不出来的,由于它保存了一些对象的类型什么的额外信息,)

 

这么一长串,其实就是一个int类型的123。

 

keySerializer:这个是对key的默认序列化器。默认值是StringSerializer。

valueSerializer:这个是对value的默认序列化器,默认值是取自DefaultSerializer的JdkSerializationRedisSerializer。

hashKeySerializer:对hash结构数据的hashkey序列化器,默认值是取自DefaultSerializer的JdkSerializationRedisSerializer。

hashValueSerializer:对hash结构数据的hashvalue序列化器,默认值是取自DefaultSerializer的JdkSerializationRedisSerializer。

 

除此以外,咱们在该类中,还发现了valueOps和hashOps等操做类,这是spring给咱们提供的能够直接使用来操做Redis的类,很是方便。下一篇咱们将讲解这些类。

http://www.cnblogs.com/luochengqiuse/p/4640932.html?utm_source=tuicool&utm_medium=referral

 

把 Redis 看成数据库的用例
如今咱们来看看在服务器端 Java 企业版系统中把 Redis 看成数据库的各类用法吧。不管用例的简繁,Redis 都能帮助用户优化性能、处理能力和延迟,让常规 Java 企业版技术栈望而却步。

1. 全局惟一增量计数器
咱们先从一个相对简单的用例开始吧:一个增量计数器,可显示某网站受到多少次点击。Spring Data Redis 有两个适用于这一实用程序的类:RedisAtomicInteger 和 RedisAtomicLong。和 Java 并发包中的 AtomicInteger 和 AtomicLong 不一样的是,这些 Spring 类能在多个 JVM 中发挥做用。

列表 1:全局惟一增量计数器

RedisAtomicLong counter = new RedisAtomicLong("UNIQUE_COUNTER_NAME", redisTemplate.getConnectionFactory()); 
Long myCounter = counter.incrementAndGet();// return the incremented value

请注意整型溢出并谨记,在这两个类上进行操做须要付出相对较高的代价。

2. 全局悲观锁
时不时的,用户就得应对服务器集群的争用。假设你从一个服务器集群运行一个预约做业。在没有全局锁的状况下,集群中的节点会发起冗余做业实例。假设某个聊天室分区可容纳 50 人。若是聊天室已满,就须要建立新的聊天室实例来容纳另外 50 人。

若是检测到聊天室已满但没有全局锁,集群中的各个节点就会建立自有的聊天室实例,为整个系统带来不可预知的因素。列表 2 介绍了应当如何充分利用 SETNX(SET if **N**ot e**X**ists:若是不存在,则设置)这一 Redis 命令来执行全局悲观锁。

列表2:全局悲观锁

import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.BoundValueOperations;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.stereotype.Service;

import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;


@Service
public class DistributedLockRedis {

    public static final Logger LOGGER = LoggerFactory.getLogger(DistributedLockRedis.class);

    public static final String DISTRIBUTED_LOCK_REDIS_KEY = "distributed:lock:redis:";

    @Autowired
    private RedisTemplate<String, Long> redisTemplate;

    /**
     * 因为Redis是单线程模型,命令操做原子性,因此利用这个特性能够很容易的实现分布式锁。
     * 得到一个锁
     *
     * @param bizName     业务名
     * @param lockTimeout 线程占用锁的时间
     * @param unit        单位
     * @throws InterruptedException 锁能够被中断
     */
    public void lock(String bizName, int lockTimeout, TimeUnit unit) throws InterruptedException {
//        redisTemplate.getConnectionFactory().getConnection().setNX()
        String redisKey;
        if (StringUtils.isBlank(bizName)) {
            LOGGER.warn("is not recommended!");
            redisKey = DISTRIBUTED_LOCK_REDIS_KEY;
        } else {
            redisKey = DISTRIBUTED_LOCK_REDIS_KEY + bizName.trim();
        }
        BoundValueOperations<String, Long> valueOps = redisTemplate.boundValueOps(redisKey);
        while (true) {
            // https://redis.io/commands/setnx
            long currentTimeMillis = System.currentTimeMillis();
            long releaseLockTime = currentTimeMillis + unit.toMillis(lockTimeout) + 1;
            //这两个if else不能混写,由于多个相同类型的线程竞争锁时,在锁超时时,设置的超时时间是同样的
            if (valueOps.setIfAbsent(releaseLockTime)) {//第一次获取锁
                redisTemplate.expire(redisKey, lockTimeout, unit);
                return;
            } else if (currentTimeMillis > valueOps.get()) {//锁已经超时
                //若是其它线程占用锁,再从新设置的时间和原来时间的时间差,能够忽略
                Long lockCurrentValue = valueOps.getAndSet(releaseLockTime);
                //若是当前时间小于LockKey存放的时间,说明已经有其它线程加锁
                if (currentTimeMillis > lockCurrentValue) {
                    redisTemplate.expire(redisKey, lockTimeout, unit);
                    return;
                }
            } else {
                TimeUnit.MILLISECONDS.sleep(ThreadLocalRandom.current().nextInt(100, 1000));
            }

        }

    }


}

 

Set key to hold string value if key does not exist. In that case, it is equal to SET. When key already holds a value, no operation is performed. SETNX is short for "SET if Not eXists".
Return value
Integer reply, specifically:
1 if the key was set
0 if the key was not set
Examples
redis> SETNX mykey "Hello"
(integer) 1
redis> SETNX mykey "World"
(integer) 0
redis> GET mykey
"Hello"
redis>

 

 

How to do set nx option using RedisTemplate?

The method name is setIfAbsent
setIfAbsent
Boolean setIfAbsent(V value)
Set the bound key to hold the string value if the bound key is absent.
Parameters:
value -
See Also:
Redis Documentation: SETNX

http://docs.spring.io/spring-data/redis/docs/current/api/org/springframework/data/redis/core/BoundValueOperations.html#setIfAbsent-V-

 

用如下Python代码来实现上述的使用 SETNX 命令做分布式锁的算法。

LOCK_TIMEOUT = 3
lock = 0
lock_timeout = 0
lock_key = 'lock.foo'

# 获取锁
while lock != 1:
    now = int(time.time())
    lock_timeout = now + LOCK_TIMEOUT + 1
    lock = redis_client.setnx(lock_key, lock_timeout)
    if lock == 1 or (now > int(redis_client.get(lock_key))) and now > int(redis_client.getset(lock_key, lock_timeout)):
        break
    else:
        time.sleep(0.001)

# 已得到锁
do_job()

# 释放锁
now = int(time.time())
if now < lock_timeout:
    redis_client.delete(lock_key)

http://blog.csdn.net/lihao21/article/details/49104695

 

 

若是使用关系数据库,一旦最早生成锁的程序意外退出,锁就可能永远得不到释放。Redis 的 EXPIRE 设置可确保在任何状况下释放锁。

3. 位屏蔽(Bit Mask)
假设 web 客户端须要轮询一台 web 服务器,针对某个数据库中的多个表查询客户指定更新内容。若是盲目地查询全部相应的表以寻找潜在更新,成本较高。为了不这一作法,能够尝试在 Redis 中给每一个客户端保存一个整型做为脏指标,整型的每一个数位表示一个表。该表中存在客户所需更新时,设置数位。轮询期间,不会触发对表的查询,除非设置了相应数位。就获取并将这样的位屏蔽设置为 STRING 而言,Redis 很是高效。

4. 排行榜(Leaderboard)
Redis 的 ZSET 数据结构为游戏玩家排行榜提供了简洁的解决方案。ZSET 的工做方式有些相似于 Java 中的 PriorityQueue,各个对象均为通过排序的数据结构,层次分明。能够按照分数排出游戏玩家在排行榜上的位置。Redis 的 ZSET 定义了一分内容丰富的命令列表,支持灵活有效的查询。例如,ZRANGE(包括 ZREVRANGE)可返回有序集内的指定范围要素。

你可使用这一命令列出排行榜前 100 名玩家。ZRANGEBYSCORE 返回指定分数范围内的要素(例如列出得分为 1000 至 2000 之间的玩家),ZRNK 则返回有序集内的要素的排名,诸如此类。

5. 布隆(Bloom)过滤器
布隆过滤器 (Bloom filter) 是一种空间利用率较高的几率数据结构,用来测试某元素是否某个集的一员。可能会出现误报匹配,但不会漏报。查询可返回“可能在集内”或“确定不在集内”。

就在线服务和离线服务包括大数据分析等方面,布隆过滤器数据结构都能派上不少用场。Facebook 利用布隆过滤器进行输入提示搜索,为用户输入的查询提取朋友和朋友的朋友。Apache HBase 则利用布隆过滤器过滤掉不包含特殊行或列的 HFile 块磁盘读取,使读取速度获得明显提高。Bitly 用布隆过滤器来避免将用户重定向到恶意网站,而 Quara 则在订阅后端执行了一个切分的布隆过滤器,用来过滤掉以前查看过的内容。在我本身的项目里,我用布隆过滤器追踪用户对各个主题的投票状况。

借助出色的速度和处理能力,Redis 极好地融合了布隆过滤器。搜索 GitHub,就能发现不少 Redis 布隆过滤器项目,其中一些还支持可调谐精度。

6. 高效的全局通知:发布/订阅渠道
Redis 发布/订阅渠道的工做方式相似于一个扇出消息传递系统,或 JMS 语义中的一个主题。JMS 主题和 Redis 发布/订阅渠道的一个区别是,经过 Redis 发布的消息并不持久。消息被推送给全部相连的客户端后,Redis 上就会删除这一消息。换句话说,订阅者必须一直在线才能接收新消息。Redis 发布/订阅渠道的典型用例包括实时配置分布、简单的聊天服务器等。

在 web 服务器集群中,每一个节点均可以是 Redis 发布/订阅渠道的一个订阅者。发布到渠道上的消息也会被即时推送到全部相连节点。这一消息能够是某种配置更改,也能够是针对全部在线用户的全局通知。和恒定轮询相比,这种推送沟通模式显然极为高效。

Redis 性能优化
Redis 很是强大,但也能够从总体上和根据特定编程场景作出进一步优化。能够考虑如下技巧。

存活时间
全部 Redis 数据结构都具有存活时间 (TTL) 属性。当你设置这一属性时,数据结构会在过时后自动删除。充分利用这一功能,可让 Redis 保持较低的内存损耗。

管道技术
在一条请求中向 Redis 发送多个命令,这种方法叫作管道技术。这一技术节省了网络往返的成本,这一点很是重要,由于网络延迟可能比 Redis 延迟要高上好几个量级。但这里存在一个陷阱:管道中的 Redis 命令列表必须预先肯定,而且应当彼此独立。若是一个命令的参数是由先前命令的结果计算得出,管道技术就不起做用。列表 3 给出了 Redis 管道技术的一个示例。

列表 3:管道技术

@Override
public List<LeaderboardEntry> fetchLeaderboard(String key, String... playerIds) {    
   final List<LeaderboardEntry> entries = new ArrayList<>();
    redisTemplate.executePipelined(new RedisCallback<Object>() {    // enable Redis Pipeline        
    @Override 
        public Object doInRedis(RedisConnection connection) throws DataAccessException { 
            for(String playerId : playerIds) {
                Long rank = connection.zRevRank(key.getBytes(), playerId.getBytes());
                Double score = connection.zScore(key.getBytes(), playerId.getBytes());
                LeaderboardEntry entry = new LeaderboardEntry(playerId, 
                score!=null?score.intValue():-1, rank!=null?rank.intValue():-1);
                entries.add(entry);
            }        
            return null; 
        }
    }); 
    return entries; 
}

副本集和切分
Redis 支持主从副本配置。和 MongoDB 同样,副本集也是不对称的,由于从节点是只读的,以便共享读取工做量。
我在文章开头提到过,也能够执行切分来横向扩展 Redis 的处理能力和存储容量。
事实上,Redis 很是强大,据亚马逊公司的内部基准显示,类型 r3.4xlarge 的一个 EC2 实例每秒可轻松处理 100000 次请求。传说还有把每秒 700000 次请求做为基准的。
对于中小型应用程序,一般无需考虑 Redis 切分。
(请参见这篇很是出色的文章《运行中的 Redis》https://www.manning.com/books/redis-in-action,进一步了解 Redis 的性能优化和切分http://www.oneapm.com/brand/apm.html。)

Redis 中的事务

Redis 并不像关系数据库管理系统那样能支持全面的 ACID 事务,但其自有的事务也很是有效。从本质上来讲,Redis 事务是管道、乐观锁、肯定提交和回滚的结合。其思想是执行一个管道中的一个命令列表,而后观察某一关键记录的潜在更新(乐观锁)。根据所观察的记录是否会被另外一个进程更新,该命令列表或总体肯定提交,或彻底回滚。

下面以某个拍卖网站上的卖方库存为例。买方试图从卖方处购买某件商品时,你负责观察 Redis 事务内的卖方库存变化。同时,你要从同一个库存中删除此商品。事务关闭前,若是库存被一个以上进程触及(例如,若是两个买方同时购买了同一件商品),事务将回滚,不然事务会肯定提交。回滚后可开始重试。

Spring Data Redis 中的事务陷阱

我在 Spring 的 RedisTemplate 类 redisTemplate.setEnableTransactionSupport(true); 中启用 Redis 事务时获得一个惨痛的教训:Redis 会在运行几天后开始返回垃圾数据,致使数据严重损坏。StackOverflow上也报道了相似状况。

在运行一个 monitor 命令后,个人团队发现,在进行 Redis 操做或 RedisCallback 后,Spring 并无自动关闭 Redis 链接,而事实上它是应该关闭的。若是再次使用未关闭的链接,可能会从意想不到的 Redis 密钥返回垃圾数据。有意思的是,若是在 RedisTemplate 中把事务支持设为 false,这一问题就不会出现了。

咱们发现,咱们能够先在 Spring 语境里配置一个 PlatformTransactionManager(例如 DataSourceTransactionManager),而后再用 @Transactional 注释来声明 Redis 事务的范围,让 Spring 自动关闭 Redis 链接。

根据这一经验,咱们相信,在 Spring 语境里配置两个单独的 RedisTemplate 是很好的作法:其中一个 RedisTemplates 的事务设为 false,用于大多数 Redis 操做,另外一个 RedisTemplates 的事务已激活,仅用于 Redis 事务。固然必需要声明 PlatformTransactionManager 和 @Transactional,以防返回垃圾数值。

另外,咱们还发现了 Redis 事务和关系数据库事务(在本例中,即 JDBC)相结合的不利之处。混合型事务的表现和预想的不太同样。

I learned a hard lesson when enabling Redis transactions in the Spring RedisTemplate class redisTemplate.setEnableTransactionSupport(true);: 
Redis started returning junk data after running for a few days, causing serious data corruption. A similar case was reported on StackOverflow.

By running a monitor command, my team discovered that after a Redis operation or RedisCallback, Spring doesn't close the Redis connection automatically, as it should do. 
Reusing an unclosed connection may return junk data from an unexpected key in Redis. Interestingly, this issue doesn't show up when transaction support is set to false in RedisTemplate.

We discovered that we could make Spring close Redis connections automatically by configuring a PlatformTransactionManager (such as DataSourceTransactionManager) in the Spring context, 
then using the @Transactional annotation to declare the scope of Redis transactions.

Based on this experience, we believe it's good practice to configure two separate RedisTemplates in the Spring context: 
One with transaction set to false is used on most Redis operations; the other with transaction enabled is only applied to Redis transactions. 
Of course PlatformTransactionManager and @Transactional must be declared to prevent junk values from being returned.

Moreover, we learned the downside of mixing a Redis transaction with a relational database transaction, in this case JDBC. 
Mixed transactions do not behave as you would expect.

 

结论
我但愿经过这篇文章向其余 Java 企业开发师介绍 Redis 的强大之处,尤为是将 Redis 用做远程数据缓存和用于易挥发数据时。
在这里我介绍了 Redis 的六个有效用例,分享了一些性能优化技巧,还说明了个人 Glu Mobile 团队怎样解决了 Spring Data Redis 事务配置不当形成的垃圾数据问题。
我但愿这篇文章可以激发你对 Redis NoSQL 的好奇心,让你可以受到启发,在本身的 Java 企业版系统里创造出一番天地。

http://blog.oneapm.com/apm-tech/778.html

http://www.javaworld.com/article/3062899/big-data/lightning-fast-nosql-with-spring-data-redis.html?page=2

https://docs.spring.io/spring-data/redis/docs/1.8.6.RELEASE/reference/html/#tx.spring 

 

 

若是是使用编程的方式(一般是基于 Spring Boot 项目)配置 RedisTemplate 的话只需在你的配置类(被@Configuration注解修饰的类)中显式建立 RedisTemplate Bean,设置 Serializer 便可。

@Bean
public RedisTemplate redisTemplate(RedisConnectionFactory redisConnectionFactory) {
    RedisTemplate redisTemplate = new RedisTemplate();
    redisTemplate.setConnectionFactory(redisConnectionFactory);

    GenericFastJsonRedisSerializer fastJsonRedisSerializer = new GenericFastJsonRedisSerializer();
    redisTemplate.setDefaultSerializer(fastJsonRedisSerializer);//设置默认的Serialize,包含 keySerializer & valueSerializer

    //redisTemplate.setKeySerializer(fastJsonRedisSerializer);//单独设置keySerializer
    //redisTemplate.setValueSerializer(fastJsonRedisSerializer);//单独设置valueSerializer
    return redisTemplate;
}

https://github.com/alibaba/fastjson/wiki/%E5%9C%A8-Spring-%E4%B8%AD%E9%9B%86%E6%88%90-Fastjson

相关文章
相关标签/搜索