客户端和服务器经过 TCP 链接来进行数据交互, 服务器默认的端口号为 6379 。
客户端和服务器发送的命令或数据一概以 \r\n (CRLF 回车+换行)结尾。html
若是使用 wireshark 对 jedis 抓包:
环境:Jedis 链接到虚拟机 202,运行 main,对 VMnet8 抓包。
过滤条件:ip.dst==192.168.8.202 and tcp.port in {6379}
set qingshan 抓包:java
能够看到实际发出的数据包是:node
*3\r\n$3\r\nSET\r\n$8\r\nqingshan\r\n$4\r\n2673\r\n
get qingshan 抓包:python
*2\r\n$3\r\nGET\r\n$8\r\nqingshan\r\n
客户端跟 Redis 之间 使用一种特殊的编码格式(在 AOF 文件里面咱们看到了),叫作 Redis Serialization Protocol (Redis 序列化协议)。特色:容易实现、解析快、可读性强。客户端发给服务端的消息须要通过编码,服务端收到以后会按约定进行解码,反之亦然。mysql
基于此,咱们能够本身实现一个 Redis 客户端。
参考:myclient.MyClient.java
一、创建 Socket 链接
二、OutputStream 写入数据(发送到服务端)
三、InputStream 读取数据(从服务端接口)
基于这种协议,咱们能够用 Java 实现全部的 Redis 操做命令。固然,咱们不须要这么作,由于已经有不少比较成熟的 Java 客户端,实现了完整的功能和高级特性,而且提供了良好的性能。react
https://redis.io/clients#java
官网推荐的 Java 客户端有 3 个 Jedis,Redisson 和 Luttuce。git
客户端 | 描述 |
---|---|
Jedis | A blazingly small and sane redis java client |
lettuce | Advanced Redis client for thread-safe sync, async, and reactive usage. Supports Cluster, Sentinel,Pipelining, and codecs |
Redisson | distributed and scalable Java data structures on top of Redis server |
Spring 链接 Redis 用的是什么?RedisConnectionFactory 接口支持多种实现,例如 : JedisConnectionFactory 、 JredisConnectionFactory 、LettuceConnectionFactory、SrpConnectionFactory。github
https://github.com/xetorthio/jedis面试
Jedis 是咱们最熟悉和最经常使用的客户端。轻量,简洁,便于集成和改造。redis
public static void main(String[] args) { Jedis jedis = new Jedis("127.0.0.1", 6379); jedis.set("qingshan", "2673"); System.out.println(jedis.get("qingshan")); jedis.close(); }
Jedis 多个线程使用一个链接的时候线程不安全。可使用链接池,为每一个请求建立不一样的链接,基于 Apache common pool 实现。跟数据库同样,能够设置最大链接数等参数。Jedis 中有多种链接池的子类。
例如:
public class ShardingTest { public static void main(String[] args) { JedisPoolConfig poolConfig = new JedisPoolConfig(); // Redis服务器 JedisShardInfo shardInfo1 = new JedisShardInfo("127.0.0.1", 6379); JedisShardInfo shardInfo2 = new JedisShardInfo("192.168.8.205", 6379); // 链接池 List<JedisShardInfo> infoList = Arrays.asList(shardInfo1, shardInfo2); ShardedJedisPool jedisPool = new ShardedJedisPool(poolConfig, infoList); ShardedJedis jedis = null; try{ jedis = jedisPool.getResource(); for(int i=0; i<100; i++){ jedis.set("k"+i, ""+i); } for(int i=0; i<100; i++){ Client client = jedis.getShard("k"+i).getClient(); System.out.println("取到值:"+jedis.get("k"+i)+","+"当前key位于:" + client.getHost() + ":" + client.getPort()); } }finally{ if(jedis!=null) { jedis.close(); } } } }
Jedis 有 4 种工做模式:单节点、分片、哨兵、集群。
3 种请求模式:Client、Pipeline、事务。Client 模式就是客户端发送一个命令,阻塞等待服务端执行,而后读取 返回结果。Pipeline 模式是一次性发送多个命令,最后一次取回全部的返回结果,这种模式经过减小网络的往返时间和 io 读写次数,大幅度提升通讯性能。第三种是事务模式。Transaction 模式即开启 Redis 的事务管理,事务模式开启后,全部的命令(除了 exec,discard,multi 和 watch)到达服务端之后不会当即执行,会进入一个等待队列。
问题:Jedis 链接 Sentinel 的时候,咱们配置的是所有哨兵的地址。Sentinel 是如何返回可用的 master 地址的呢?
在构造方法中:
pool = new JedisSentinelPool(masterName, sentinels);
调用了:
HostAndPort master = initSentinels(sentinels, masterName);
查看:
private HostAndPort initSentinels(Set<String> sentinels, final String masterName) { HostAndPort master = null; boolean sentinelAvailable = false; log.info("Trying to find master from available Sentinels..."); // 有多个 sentinels,遍历这些个 sentinels for (String sentinel : sentinels) { // host:port 表示的 sentinel 地址转化为一个 HostAndPort 对象。 final HostAndPort hap = HostAndPort.parseString(sentinel); log.fine("Connecting to Sentinel " + hap); Jedis jedis = null; try { // 链接到 sentinel jedis = new Jedis(hap.getHost(), hap.getPort()); // 根据 masterName 获得 master 的地址,返回一个 list,host= list[0], port =// list[1] List<String> masterAddr = jedis.sentinelGetMasterAddrByName(masterName); // connected to sentinel... sentinelAvailable = true; if (masterAddr == null || masterAddr.size() != 2) { log.warning("Can not get master addr, master name: " + masterName + ". Sentinel: " + hap + "."); continue; } // 若是在任何一个 sentinel 中找到了 master,再也不遍历 sentinels master = toHostAndPort(masterAddr); log.fine("Found Redis master at " + master); break; } catch (JedisException e) { // resolves #1036, it should handle JedisException there's another chance // of raising JedisDataException log.warning("Cannot get master address from sentinel running @ " + hap + ". Reason: " + e + ". Trying next one."); } finally { if (jedis != null) { jedis.close(); } } } // 到这里,若是 master 为 null,则说明有两种状况,一种是全部的 sentinels 节点都 down 掉了,一种是 master节点没有被存活的 sentinels 监控到 if (master == null) { if (sentinelAvailable) { // can connect to sentinel, but master name seems to not // monitored throw new JedisException("Can connect to sentinel, but " + masterName + " seems to be not monitored..."); } else { throw new JedisConnectionException("All sentinels down, cannot determine where is " + masterName + " master is running..."); } } // 若是走到这里,说明找到了 master 的地址 log.info("Redis master running at " + master + ", starting Sentinel listeners..."); // 启动对每一个 sentinels 的监听为每一个 sentinel 都启动了一个监听者 MasterListener。MasterListener 自己是一个线程,它会去订阅 sentinel 上关于 master 节点地址改变的消息。 for (String sentinel : sentinels) { final HostAndPort hap = HostAndPort.parseString(sentinel); MasterListener masterListener = new MasterListener(masterName, hap.getHost(), hap.getPort()); // whether MasterListener threads are alive or not, process can be stopped masterListener.setDaemon(true); masterListeners.add(masterListener); masterListener.start(); } return master; }
问题:使用 Jedis 链接 Cluster 的时候,咱们只须要链接到任意一个或者多个 redisgroup 中的实例地址,那咱们是怎么获取到须要操做的 Redis Master 实例的?
关键问题:在于如何存储 slot 和 Redis 链接池的关系。
一、程序启动初始化集群环境,读取配置文件中的节点配置,不管是主从,不管多少个,只拿第一个,获取 redis 链接实例(后面有个 break)。
// redis.clients.jedis.JedisClusterConnectionHandler#initializeSlotsCache private void initializeSlotsCache(Set<HostAndPort> startNodes, GenericObjectPoolConfig poolConfig, String password) { for (HostAndPort hostAndPort : startNodes) { // 获取一个 Jedis 实例 Jedis jedis = new Jedis(hostAndPort.getHost(), hostAndPort.getPort()); if (password != null) { jedis.auth(password); } try { // 获取 Redis 节点和 Slot 虚拟槽 cache.discoverClusterNodesAndSlots(jedis); // 直接跳出循环 break; } catch (JedisConnectionException e) { // try next nodes } finally { if (jedis != null) { jedis.close(); } } }
二、用获取的 redis 链接实例执行 clusterSlots ()方法,实际执行 redis 服务端 clusterslots 命令,获取虚拟槽信息。
该集合的基本信息为[long, long, List, List], 第一,二个元素是该节点负责槽点的起始位置,第三个元素是主节点信息,第四个元素为主节点对应的从节点信息。该 list 的基本信息为[string,int,string],第一个为 host 信息,第二个为 port 信息,第三个为惟一id。
三、获取有关节点的槽点信息后,调用 getAssignedSlotArray(slotinfo)来获取全部的槽点值。
四、再获取主节点的地址信息,调用 generateHostAndPort(hostInfo)方法,生成一个 ostAndPort 对象。
五、再根据节点地址信息来设置节点对应的 JedisPool,即设置 Map<String,JedisPool> nodes 的值。
接下来判断若此时节点信息为主节点信息时,则调用 assignSlotsToNodes 方法,设置每一个槽点值对应的链接池,即设置 Map<Integer, JedisPool> slots 的值。
public void discoverClusterNodesAndSlots(Jedis jedis) { w.lock(); try { reset(); // 获取节点集合 List<Object> slots = jedis.clusterSlots(); // 遍历 3 个 master 节点 for (Object slotInfoObj : slots) { // slotInfo 槽开始,槽结束,主,从 // {[0,5460,7291,7294],[5461,10922,7292,7295],[10923,16383,7293,7296]} List<Object> slotInfo = (List<Object>) slotInfoObj; // 若是<=2,表明没有分配 slot if (slotInfo.size() <= MASTER_NODE_INDEX) { continue; } // 获取分配到当前 master 节点的数据槽,例如 7291 节点的{0,1,2,3……5460} List<Integer> slotNums = getAssignedSlotArray(slotInfo); // hostInfos int size = slotInfo.size(); // size 是 4,槽最小最大,主,从 // 第 3 位和第 4 位是主从端口的信息 for (int i = MASTER_NODE_INDEX; i < size; i++) { List<Object> hostInfos = (List<Object>) slotInfo.get(i); if (hostInfos.size() <= 0) { continue; } // 根据 IP 端口生成 HostAndPort 实例 HostAndPort targetNode = generateHostAndPort(hostInfos); // 据HostAndPort解析出ip:port的key值,再根据key从缓存中查询对应的jedisPool实例。若是没有jedisPool实例,就建立 JedisPool 实例,最后放入缓存中。nodeKey 和 nodePool 的关系 setupNodeIfNotExist(targetNode); // 把 slot 和 jedisPool 缓存起来(16384 个),key 是 slot 下标,value 是链接池 if (i == MASTER_NODE_INDEX) { assignSlotsToNode(slotNums, targetNode); } } } finally { w.unlock(); } }
从集群环境存取值:
一、把 key 做为参数,执行 CRC16 算法,获取 key 对应的 slot 值。
二、经过该 slot 值,去 slots 的 map 集合中获取 jedisPool 实例。
三、经过 jedisPool 实例获取 jedis 实例,最终完成 redis 数据存取工做。
咱们看到 set 2 万个 key 用了好几分钟,这个速度太慢了,彻底没有把 Redis 10万的 QPS 利用起来。可是单个命令的执行到底慢在哪里?
Redis 使用的是客户端/服务器(C/S)模型和请求/响应协议的 TCP 服务器。这意味着一般状况下一个请求会遵循如下步骤:
客户端向服务端发送一个查询请求,并监听 Socket 返回,一般是以阻塞模式,等待服务端响应。
服务端处理命令,并将结果返回给客户端。
Redis 客户端与 Redis 服务器之间使用 TCP 协议进行链接,一个客户端能够经过一个 socket 链接发起多个请求命令。每一个请求命令发出后 client 一般会阻塞并等待 redis服务器处理,redis 处理完请求命令后会将结果经过响应报文返回给 client,所以当执行多条命令的时候都须要等待上一条命令执行完毕才能执行。执行过程如图:
Redis 自己提供了一些批量操做命令,好比 mget,mset,能够减小通讯的时间,可是大部分命令是不支持 multi 操做的,例如 hash 就没有.
因为通讯会有网络延迟,假如 client 和 server 之间的包传输时间须要 10 毫秒,一次交互就是 20 毫秒(RTT:Round Trip Time)。这样的话,client 1 秒钟也只能也只能发送 50 个命令。这显然没有充分利用 Redis 的处理能力。另一个,Redis 服务端执行 I/O 的次数过多。
https://redis.io/topics/pipelining
那咱们能不能像数据库的 batch 操做同样,把一组命令组装在一块儿发送给 Redis 服务端执行,而后一次性得到返回结果呢?这个就是 Pipeline 的做用。Pipeline 经过一个队列把全部的命令缓存起来,而后把多个命令在一次链接中发送给服务器。
先来看一下效果(先 flushall):
PipelineSet.java,PipelineGet.java
要实现 Pipeline,既要服务端的支持,也要客户端的支持。对于服务端来讲,须要可以处理客户端经过一个 TCP 链接发来的多个命令,而且逐个地执行命令一块儿返回 。
对于客户端来讲,要把多个命令缓存起来,达到必定的条件就发送出去,最后才处理 Redis 的应答(这里也要注意对客户端内存的消耗)。
jedis-pipeline 的 client-buffer 限制:8192bytes,客户端堆积的命令超过 8192bytes 时,会发送给服务端。
源码:redis.clients.util.RedisOutputStream.java
public RedisOutputStream(final OutputStream out) { this(out, 8192); }
pipeline 对于命令条数没有限制,可是命令可能会受限于 TCP 包大小。
若是 Jedis 发送了一组命令,而发送请求尚未结束,Redis 响应的结果会放在接缓冲区。若是接收缓冲区满了,jedis 会通知 redis win=0,此时 redis 不会再发送结果给 jedis 端,转而把响应结果保存在 Redis 服务端的输出缓冲区中。
输出缓冲区的配置:redis.conf
client-output-buffer-limit
client-output-buffer-limit normal 0 0 0 client-output-buffer-limit replica 256mb 64mb 60 client-output-buffer-limit pubsub 32mb 8mb 60
配置 | 做用 |
---|---|
class | 客户端类型,分为三种。a)normal:普通客户端;b)slave:slave 客户端,用于复制;c)pubsub:发布订阅客户端 |
hard limit | 若是客户端使用的输出缓冲区大于
|
soft limit soft seconds |
若是客户端使用的输出缓冲区超过了
|
每一个客户端使用的输出缓冲区的大小能够用 client list 命令查看
redis> client list
id=5 addr=192.168.8.1:10859 fd=8 name= age=5 idle=0 flags=N db=0 sub=0 psub=0 multi=-1 qbuf=5 qbuf-free=32763 obl=16380 oll=227 omem=4654408 events=rw cmd=set
Pipeline 适用于什么场景呢?
若是某些操做须要立刻获得 Redis 操做是否成功的结果,这种场景就不适合。
有些场景,例如批量写入数据,对于结果的实时性和成功性要求不高,就能够用Pipeline
原文地址:https://redis.io/topics/distlock
中文地址:http://redis.cn/topics/distlock.html
分布式锁的基本特性或者要求:
一、互斥性:只有一个客户端可以持有锁。
二、不会产生死锁:即便持有锁的客户端崩溃,也能保证后续其余客户端能够获取锁。
三、只有持有这把锁的客户端才能解锁。
distlock.DistLock.java
/** * 尝试获取分布式锁 * @param jedis Redis客户端 * @param lockKey 锁 * @param requestId 请求标识 * @param expireTime 超期时间 * @return 是否获取成功 */ public static boolean tryGetDistributedLock(Jedis jedis, String lockKey, String requestId, int expireTime) { // set支持多个参数 NX(not exist) XX(exist) EX(seconds) PX(million seconds) String result = jedis.set(lockKey, requestId, SET_IF_NOT_EXIST, SET_WITH_EXPIRE_TIME, expireTime); if (LOCK_SUCCESS.equals(result)) { return true; } return false; }
参数解读:
一、lockKey 是 Redis key 的名称,也就是谁添加成功这个 key 表明谁获取锁成功。
二、requestId 是客户端的 ID(设置成 value),若是咱们要保证只有加锁的客户端才能释放锁,就必须得到客户端的 ID(保证第 3 点)。
三、SET_IF_NOT_EXIST 是咱们的命令里面加上 NX(保证第 1 点)。
四、SET_WITH_EXPIRE_TIME,PX 表明以毫秒为单位设置 key 的过时时间(保证第 2 点)。expireTime 是自动释放锁的时间,好比 5000 表明 5 秒。
释放锁,直接删除 key 来释放锁能够吗?就像这样:
public static void wrongReleaseLock1(Jedis jedis, String lockKey) { jedis.del(lockKey); }
没有对客户端 requestId 进行判断,可能会释放其余客户端持有的锁。
先判断后删除呢?
public static void wrongReleaseLock2(Jedis jedis, String lockKey, String requestId) { // 判断加锁与解锁是否是同一个客户端 if (requestId.equals(jedis.get(lockKey))) { // 若在此时,这把锁忽然不是这个客户端的,则会误解锁 jedis.del(lockKey); } }
若是在释放锁的时候,这把锁已经不属于这个客户端(例如已通过期,而且被别的客户端获取锁成功了),那就会出现释放了其余客户端的锁的状况。
因此咱们把判断客户端是否相等和删除 key 的操做放在 Lua 脚本里面执行。
/** * 释放分布式锁 * @param jedis Redis客户端 * @param lockKey 锁 * @param requestId 请求标识 * @return 是否释放成功 */ public static boolean releaseDistributedLock(Jedis jedis, String lockKey, String requestId) { String script = "if redis.call('get', KEYS[1]) == ARGV[1] then return redis.call('del', KEYS[1]) else return 0 end"; Object result = jedis.eval(script, Collections.singletonList(lockKey), Collections.singletonList(requestId)); if (RELEASE_SUCCESS.equals(result)) { return true; } return false; }
这个是 Jedis 里面分布式锁的实现。
https://lettuce.io/
与 Jedis 相比,Lettuce 则彻底克服了其线程不安全的缺点:Lettuce 是一个可伸缩的线程安全的 Redis 客户端,支持同步、异步和响应式模式(Reactive)。多个线程能够共享一个链接实例,而没必要担忧多线程并发问题。
同步调用:
public class LettuceSyncTest { public static void main(String[] args) { // 建立客户端 RedisClient client = RedisClient.create("redis://127.0.0.1:6379"); // 线程安全的长链接,链接丢失时会自动重连 StatefulRedisConnection<String, String> connection = client.connect(); // 获取同步执行命令,默认超时时间为 60s RedisCommands<String, String> sync = connection.sync(); // 发送get请求,获取值 sync.set("gupao:sync","lettuce-sync-666" ); String value = sync.get("gupao:sync"); System.out.println("------"+value); //关闭链接 connection.close(); //关掉客户端 client.shutdown(); } }
异步的结果使用 RedisFuture 包装,提供了大量回调的方法。
异步调用:
import io.lettuce.core.RedisClient; import io.lettuce.core.RedisFuture; import io.lettuce.core.api.StatefulRedisConnection; import io.lettuce.core.api.async.RedisAsyncCommands; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; public class LettuceASyncTest { public static void main(String[] args) { RedisClient client = RedisClient.create("redis://127.0.0.1:6379"); // 线程安全的长链接,链接丢失时会自动重连 StatefulRedisConnection<String, String> connection = client.connect(); // 获取异步执行命令api RedisAsyncCommands<String, String> commands = connection.async(); // 获取RedisFuture<T> commands.set("gupao:async","lettuce-async-666"); RedisFuture<String> future = commands.get("gupao:async"); try { String value = future.get(60, TimeUnit.SECONDS); System.out.println("------"+value); } catch (InterruptedException | ExecutionException | TimeoutException e) { e.printStackTrace(); } } }
它基于 Netty 框架构建,支持 Redis 的高级功能,如 Pipeline、发布订阅,事务、Sentinel,集群,支持链接池。
Lettuce 是 Spring Boot 2.x 默认的客户端,替换了 Jedis。集成以后咱们不须要单独使用它,直接调用 Spring 的 RedisTemplate 操做,链接和建立和关闭也不须要咱们操心。
<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-data-redis</artifactId> </dependency>
https://redisson.org/
https://github.com/redisson/redisson/wiki/目录
Redisson 是一个在 Redis 的基础上实现的 Java 驻内存数据网格(In-Memory
Data Grid),提供了分布式和可扩展的 Java 数据结构。
基于 Netty 实现,采用非阻塞 IO,性能高
支持异步请求
支持链接池、pipeline、LUA Scripting、Redis Sentinel、Redis Cluster
不支持事务,官方建议以 LUA Scripting 代替事务
主从、哨兵、集群都支持。Spring 也能够配置和注入 RedissonClient。
在 Redisson 里面提供了更加简单的分布式锁的实现。
public static void main(String[] args) throws InterruptedException { RLock rLock=redissonClient.getLock("updateAccount"); // 最多等待 100 秒、上锁 10s 之后自动解锁 if(rLock.tryLock(100,10, TimeUnit.SECONDS)){ System.out.println("获取锁成功"); } // do something rLock.unlock(); }
在得到 RLock 以后,只须要一个 tryLock 方法,里面有 3 个参数:
一、watiTime:获取锁的最大等待时间,超过这个时间再也不尝试获取锁
二、leaseTime:若是没有调用 unlock,超过了这个时间会自动释放锁
三、TimeUnit:释放时间的单位
Redisson 的分布式锁是怎么实现的呢?
在加锁的时候,在 Redis 写入了一个 HASH,key 是锁名称,field 是线程名称,value是 1(表示锁的重入次数)
源码:
tryLock()——tryAcquire()——tryAcquireAsync()——tryLockInnerAsync()
最终也是调用了一段 Lua 脚本。里面有一个参数,两个参数的值
占位 | 填充 | 含义 | 实际值 |
---|---|---|---|
KEYS[1] | getName() | 锁的名称(key) | updateAccount |
ARGV[1] | internalLockLeaseTime | 锁释放时间(毫秒) | 10000 |
ARGV[2] | getLockName(threadId) | 线程名称 | b60a9c8c-92f8-4bfe-b0e7-308967346336:1 |
// KEYS[1] 锁名称 updateAccount // ARGV[1] key 过时时间 10000ms // ARGV[2] 线程名称 // 锁名称不存在 if (redis.call('exists', KEYS[1]) == 0) then // 建立一个 hash,key=锁名称,field=线程名,value=1 redis.call('hset', KEYS[1], ARGV[2], 1); // 设置 hash 的过时时间 redis.call('pexpire', KEYS[1], ARGV[1]); return nil; end; // 锁名称存在,判断是否当前线程持有的锁 if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then // 若是是,value+1,表明重入次数+1 redis.call('hincrby', KEYS[1], ARGV[2], 1); // 从新得到锁,须要从新设置 Key 的过时时间 redis.call('pexpire', KEYS[1], ARGV[1]); return nil; end; // 锁存在,可是不是当前线程持有,返回过时时间(毫秒) return redis.call('pttl', KEYS[1])
释放锁,源码:
unlock——unlockInnerAsync
占位 | 填充 | 含义 | 实际值 |
---|---|---|---|
KEYS[1] | getName() | 锁名称 | updateAccount |
KEYS[2] | getChannelName() | 频道名称 | redisson_lock__channel:{updateAccount} |
KEYS[3] | LockPubSub.unlockMessage | 解锁时的消息 | 0 |
KEYS[4] | internalLockLeaseTime | 释放锁的时间 | 10000 |
KEYS[5] | getLockName(threadId) | 线程名称 | b60a9c8c-92f8-4bfe-b0e7-308967346336:1 |
// KEYS[1] 锁的名称 updateAccount // KEYS[2] 频道名称 redisson_lock__channel:{updateAccount} // ARGV[1] 释放锁的消息 0 // ARGV[2] 锁释放时间 10000 // ARGV[3] 线程名称 // 锁不存在(过时或者已经释放了) if (redis.call('exists', KEYS[1]) == 0) then // 发布锁已经释放的消息 redis.call('publish', KEYS[2], ARGV[1]); return 1; end; // 锁存在,可是不是当前线程加的锁 if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then return nil; end; // 锁存在,是当前线程加的锁 // 重入次数-1 local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1); // -1 后大于 0,说明这个线程持有这把锁还有其余的任务须要执行 if (counter > 0) then // 从新设置锁的过时时间 redis.call('pexpire', KEYS[1], ARGV[2]); return 0; else // -1 以后等于 0,如今能够删除锁了 redis.call('del', KEYS[1]); // 删除以后发布释放锁的消息 redis.call('publish', KEYS[2], ARGV[1]); return 1; end; // 其余状况返回 nil return nil;
这个是 Redisson 里面分布式锁的实现,咱们在调用的时候很是简单。
Redisson 跟 Jedis 定位不一样,它不是一个单纯的 Redis 客户端,而是基于 Redis 实现的分布式的服务,若是有须要用到一些分布式的数据结构,好比咱们还能够基于Redisson 的分布式队列实现分布式事务,就能够引入 Redisson 的依赖实现。
针对读多写少的高并发场景,咱们可使用缓存来提高查询速度。
当咱们使用 Redis 做为缓存的时候,通常流程是这样的:
一、若是数据在 Redis 存在,应用就能够直接从 Redis 拿到数据,不用访问数据库。
二、若是 Redis 里面没有,先到数据库查询,而后写入到 Redis,再返回给应用
由于这些数据是不多修改的,因此在绝大部分的状况下能够命中缓存。可是,一旦被缓存的数据发生变化的时候,咱们既要操做数据库的数据,也要操做 Redis 的数据,因此问题来了。如今咱们有两种选择:
一、先操做 Redis 的数据再操做数据库的数据
二、先操做数据库的数据再操做 Redis 的数据
到底选哪种?
首先须要明确的是,无论选择哪种方案, 咱们确定是但愿两个操做要么都成功,要么都一个都不成功。否则就会发生 Redis 跟数据库的数据不一致的问题。
可是,Redis 的数据和数据库的数据是不可能经过事务达到统一的,咱们只能根据相应的场景和所须要付出的代价来采起一些措施下降数据不一致的问题出现的几率,在数据一致性和性能之间取得一个权衡。
对于数据库的实时性一致性要求不是特别高的场合,好比 T+1 的报表,能够采用定时任务查询数据库数据同步到 Redis 的方案。
因为咱们是以数据库的数据为准的,因此给缓存设置一个过时时间,是保证最终一致性的解决方案。
这里咱们先要补充一点,当存储的数据发生变化,Redis 的数据也要更新的时候,咱们有两种方案,一种就是直接更新,调用 set;还有一种是直接删除缓存,让应用在下次查询的时候从新写入。
这两种方案怎么选择呢?这里咱们主要考虑更新缓存的代价。
更新缓存以前,是否是要通过其余表的查询、接口调用、计算才能获得最新的数据,而不是直接从数据库拿到的值。若是是的话,建议直接删除缓存,这种方案更加简单,并且避免了数据库的数据和缓存不一致的状况。在通常状况下,咱们也推荐使用删除的方案。
这一点明确以后,如今咱们就剩一个问题:
一、究竟是先更新数据库,再删除缓存
二、仍是先删除缓存,再更新数据库
咱们先看第一种方案。
正常状况:
更新数据库,成功。
删除缓存,成功。
异常状况:
一、更新数据库失败,程序捕获异常,不会走到下一步,因此数据不会出现不一致。
二、更新数据库成功,删除缓存失败。数据库是新数据,缓存是旧数据,发生了不一致的状况。
这种问题怎么解决呢?咱们能够提供一个重试的机制。
好比:若是删除缓存失败,咱们捕获这个异常,把须要删除的 key 发送到消息队列。让后本身建立一个消费者消费,尝试再次删除这个 key。
这种方式有个缺点,会对业务代码形成入侵。
因此咱们又有了第二种方案(异步更新缓存):
由于更新数据库时会往 binlog 写入日志,因此咱们能够经过一个服务来监听 binlog的变化(好比阿里的 canal),而后在客户端完成删除 key 的操做。若是删除失败的话,再发送到消息队列。
总之,对于后删除缓存失败的状况,咱们的作法是不断地重试删除,直到成功。
不管是重试仍是异步删除,都是最终一致性的思想。
正常状况:
删除缓存,成功。
更新数据库,成功。
异常状况:
一、删除缓存,程序捕获异常,不会走到下一步,因此数据不会出现不一致。
二、删除缓存成功,更新数据库失败。 由于以数据库的数据为准,因此不存在数据不一致的状况。
看起来好像没问题,可是若是有程序并发操做的状况下:
1)线程 A 须要更新数据,首先删除了 Redis 缓存
2)线程 B 查询数据,发现缓存不存在,到数据库查询旧值,写入 Redis,返回
3)线程 A 更新了数据库
这个时候,Redis 是旧的值,数据库是新的值,发生了数据不一致的状况。
那问题就变成了:能不能让对同一条数据的访问串行化呢?代码确定保证不了,由于有多个线程,即便作了任务队列也可能有多个服务实例。数据库也保证不了,由于会有多个数据库的链接。只有一个数据库只提供一个链接的状况下,才能保证读写的操做是串行的,或者咱们把全部的读写请求放到同一个内存队列当中,可是这种状况吞吐量过低了。
因此咱们有一种延时双删的策略,在写入数据以后,再删除一次缓存。
A 线程:
1)删除缓存
2)更新数据库
3)休眠 500ms(这个时间,依据读取数据的耗时而定)
4)再次删除缓存
伪代码:
public void write(String key,Object data){ redis.delKey(key); db.updateData(data); Thread.sleep(500); redis.delKey(key); }
在 Redis 存储的全部数据中,有一部分是被频繁访问的。有两种状况可能会致使热点问题的产生,一个是用户集中访问的数据,好比抢购的商品,明星结婚和明星出轨的微博。还有一种就是在数据进行分片的状况下,负载不均衡,超过了单个服务器的承受能力。热点问题可能引发缓存服务的不可用,最终形成压力堆积到数据库。
出于存储和流量优化的角度,咱们必需要找到这些热点数据。
除了自动的缓存淘汰机制以外,怎么找出那些访问频率高的 key 呢?或者说,咱们能够在哪里记录 key 被访问的状况呢?
第一个固然是在客户端了,好比咱们可不能够在全部调用了 get、set 方法的地方,加上 key 的计数。可是这样的话,每个地方都要修改,重复的代码也多。若是咱们用的是 Jedis 的客户端,咱们能够在 Jedis 的 Connection 类的 sendCommand()里面,用一个 HashMap 进行 key 的计数。
可是这种方式有几个问题:
一、不知道要存多少个 key,可能会发生内存泄露的问题。
二、会对客户端的代码形成入侵。
三、只能统计当前客户端的热点 key。
第二种方式就是在代理端实现,好比 TwemProxy 或者 Codis,可是不是全部的项目都使用了代理的架构。
第三种就是在服务端统计,Redis 有一个 monitor 的命令,能够监控到全部 Redis执行的命令。
代码:
jedis.monitor(new JedisMonitor() { @Override public void onCommand(String command) { System.out.println("#monitor: " + command); } });
Facebook 的 开 源 项 目 redis-faina(https://github.com/facebookarchive/redis-faina.git)就是基于这个原理实现的。它是一个 python 脚本,能够分析 monitor 的数据。
redis-cli -p 6379 monitor | head -n 100000 | ./redis-faina.py
这种方法也会有两个问题:1)monitor 命令在高并发的场景下,会影响性能,因此不适合长时间使用。
只能统计一个 Redis 节点的热点 key。
还有一种方法就是机器层面的,经过对 TCP 协议进行抓包,也有一些开源的方案,
好比 ELK 的 packetbeat 插件。
当咱们发现了热点 key 以后,咱们来看下热点数据在高并发的场景下可能会出现的
问题,以及怎么去解决。
缓存雪崩就是 Redis 的大量热点数据同时过时(失效),由于设置了相同的过时时间,恰好这个时候 Redis 请求的并发量又很大,就会致使全部的请求落到数据库。
1)加互斥锁或者使用队列,针对同一个 key 只容许一个线程到数据库查询
2)缓存定时预先更新,避免同时失效
3)经过加随机数,使 key 在不一样的时间过时
4)缓存永不过时
咱们已经知道了 Redis 使用的场景了。在缓存存在和缓存不存在的状况下的什么状况咱们都了解了。
还有一种状况,数据在数据库和 Redis 里面都不存在,多是一次条件错误的查询。在这种状况下,由于数据库值不存在,因此确定不会写入 Redis,那么下一次查询相同的key 的时候,确定仍是会再到数据库查一次。那么这种循环查询数据库中不存在的值,而且每次使用的是相同的 key 的状况,咱们有没有什么办法避免应用到数据库查询呢?
(1)缓存空数据 (2)缓存特殊字符串,好比&&
咱们能够在数据库缓存一个空字符串,或者缓存一个特殊的字符串,那么在应用里面拿到这个特殊字符串的时候,就知道数据库没有值了,也没有必要再到数据库查询了。可是这里须要设置一个过时时间,否则的话数据库已经新增了这一条记录,应用也仍是拿不到值。
这个是应用重复查询同一个不存在的值的状况,若是应用每一次查询的不存在的值是不同的呢?即便你每次都缓存特殊字符串也没用,由于它的值不同,好比咱们的用户系统登陆的场景,若是是恶意的请求,它每次都生成了一个符合 ID 规则的帐号,可是这个帐号在咱们的数据库是不存在的,那 Redis 就彻底失去了做用
这种由于每次查询的值都不存在致使的 Redis 失效的状况,咱们就把它叫作缓存穿透。这个问题咱们应该怎么去解决呢?
其实它也是一个通用的问题,关键就在于咱们怎么知道请求的 key 在咱们的数据库里面是否存在,若是数据量特别大的话,咱们怎么去快速判断。
这也是一个很是经典的面试题:
如何在海量元素中(例如 10 亿无序、不定长、不重复)快速判断一个元素是否存在?
若是是缓存穿透的这个问题,咱们要避免到数据库查询不存的数据,确定要把这 10亿放在别的地方。这些数据在 Redis 里面也是没有的,为了加快检索速度,咱们要把数据放到内存里面来判断,问题来了:
若是咱们直接把这些元素的值放到基本的数据结构(List、Map、Tree)里面,好比一个元素 1 字节的字段,10 亿的数据大概须要 900G 的内存空间,这个对于普通的服务器来讲是承受不了的。
因此,咱们存储这几十亿个元素,不能直接存值,咱们应该找到一种最简单的最节省空间的数据结构,用来标记这个元素有没有出现。
这个东西咱们就把它叫作位图,他是一个有序的数组,只有两个值,0 和 1。0 表明不存在,1 表明存在。
那咱们怎么用这个数组里面的有序的位置来标记这10亿个元素是否存在呢?咱们是否是必需要有一个映射方法,把元素映射到一个下标位置上?
对于这个映射方法,咱们有几个基本的要求:
1)由于咱们的值长度是不固定的,我但愿不一样长度的输入,能够获得固定长度的输出。
2)转换成下标的时候,我但愿他在个人这个有序数组里面是分布均匀的,否则的话所有挤到一对去了,我也无法判断到底哪一个元素存了,哪一个元素没存。
这个就是哈希函数,好比 MD五、SHA-1 等等这些都是常见的哈希算法。
好比,这 6 个元素,咱们通过哈希函数和位运算,获得了相应的下标。
这个时候,Tom 和 Mic 通过计算获得的哈希值是同样的,那么再通过位运算获得的下标确定是同样的,咱们把这种状况叫作哈希冲突或者哈希碰撞。
若是发生了哈希碰撞,这个时候对于咱们的容器存值确定是有影响的,咱们能够经过哪些方式去下降哈希碰撞的几率呢?
第一种就是扩大维数组的长度或者说位图容量。由于咱们的函数是分布均匀的,因此,位图容量越大,在同一个位置发生哈希碰撞的几率就越小。
是否是位图容量越大越好呢?无论存多少个元素,都建立一个几万亿大小的位图,能够吗?固然不行,由于越大的位图容量,意味着越多的内存消耗,因此咱们要建立一个合适大小的位图容量。
除了扩大位图容量,咱们还有什么下降哈希碰撞几率的方法呢?
若是两个元素通过一次哈希计算,获得的相同下标的几率比较高,我能够不能够计算屡次呢? 原来我只用一个哈希函数,如今我对于每个要存储的元素都用多个哈希函数计算,这样每次计算出来的下标都相同的几率就小得多了。
一样的,咱们能不能引入不少个哈希函数呢?好比都计算 100 次,均可以吗?固然也会有问题,第一个就是它会填满位图的更多空间,第二个是计算是须要消耗时间的。
因此总的来讲,咱们既要节省空间,又要很高的计算效率,就必须在位图容量和函数个数之间找到一个最佳的平衡。
好比说:咱们存放 100 万个元素,到底须要多大的位图容量,须要多少个哈希函数呢?
固然,这个事情早就有人研究过了,在 1970 年的时候,有一个叫作布隆的前辈对于判断海量元素中元素是否存在的问题进行了研究,也就是到底须要多大的位图容量和多少个哈希函数,它发表了一篇论文,提出的这个容器就叫作布隆过滤器。
咱们来看一下布隆过滤器的工做原理。
首先,布隆过滤器的本质就是咱们刚才分析的,一个位数组,和若干个哈希函数。
集合里面有 3 个元素,要把它存到布隆过滤器里面去,应该怎么作?首先是 a 元素,这里咱们用 3 次计算。b、c 元素也同样。
元素已经存进去以后,如今我要来判断一个元素在这个容器里面是否存在,就要使用一样的三个函数进行计算。
好比 d 元素,我用第一个函数 f1 计算,发现这个位置上是 1,没问题。第二个位置也是 1,第三个位置也是 1 。
若是通过三次计算获得的下标位置值都是 1,这种状况下,能不能肯定 d 元素必定在这个容器里面呢? 其实是不能的。好比这张图里面,这三个位置分别是把 a,b,c 存进去的时候置成 1 的,因此即便 d 元素以前没有存进去,也会获得三个 1,判断返回 true。
因此,这个是布隆过滤器的一个很重要的特性,由于哈希碰撞不可避免,因此它会存在必定的误判率。这种把原本不存在布隆过滤器中的元素误判为存在的状况,咱们把它叫作假阳性(False Positive Probability,FPP)。
咱们再来看另外一个元素,e 元素。咱们要判断它在容器里面是否存在,同样地要用这三个函数去计算。第一个位置是 1,第二个位置是 1,第三个位置是 0。
e 元素是否是必定不在这个容器里面呢? 能够肯定必定不存在。若是说当时已经把e 元素存到布隆过滤器里面去了,那么这三个位置确定都是 1,不可能出现 0。
总结:布隆过滤器的特色:
从容器的角度来讲:
一、若是布隆过滤器判断元素在集合中存在,不必定存在
二、若是布隆过滤器判断不存在,必定不存在从元素的角度来讲:
三、若是元素实际存在,布隆过滤器必定判断存在
四、若是元素实际不存在,布隆过滤器可能判断存在利用,第二个特性,咱们是否是就能解决持续从数据库查询不存在的值的问题?
谷歌的 Guava 里面就提供了一个现成的布隆过滤器。
<dependency> <groupId>com.google.guava</groupId> <artifactId>guava</artifactId> <version>21.0</version> </dependency>
建立布隆过滤器:
BloomFilter<String> bf = BloomFilter.create(Funnels.stringFunnel(Charsets.UTF_8), insertions);
布隆过滤器提供的存放元素的方法是 put()。
布隆过滤器提供的判断元素是否存在的方法是 mightContain()。
if (bf.mightContain(data)) { if (sets.contains(data)) { // 判断存在实际存在的时候,命中 right++; continue; } // 判断存在却不存在的时候,错误 wrong++; }
布隆过滤器把误判率默认设置为 0.03,也能够在建立的时候指定。
public static <T> BloomFilter<T> create(Funnel<? super T> funnel, long expectedInsertions) { return create(funnel, expectedInsertions, 0.03D); }
位图的容量是基于元素个数和误判率计算出来的。
long numBits = optimalNumOfBits(expectedInsertions, fpp);
根据位数组的大小,咱们进一步计算出了哈希函数的个数。
int numHashFunctions = optimalNumOfHashFunctions(expectedInsertions, numBits);
存储 100 万个元素只占用了 0.87M 的内存,生成了 5 个哈希函数。
https://hur.st/bloomfilter/?n=1000000&p=0.03&m=&k=
布隆过滤器的工做位置:
由于要判断数据库的值是否存在,因此第一步是加载数据库全部的数据。在去 Redis查询以前,先在布隆过滤器查询,若是 bf 说没有,那数据库确定没有,也不用去查了。若是 bf 说有,才走以前的流程。
import com.google.common.base.Charsets; import com.google.common.hash.BloomFilter; import com.google.common.hash.Funnels; import com.gupaoedu.entity.User; import com.gupaoedu.service.UserService; import org.junit.Test; import org.junit.runner.RunWith; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.autoconfigure.EnableAutoConfiguration; import org.springframework.boot.test.context.SpringBootTest; import org.springframework.data.redis.core.RedisTemplate; import org.springframework.data.redis.core.ValueOperations; import org.springframework.test.context.junit4.SpringJUnit4ClassRunner; import javax.annotation.PostConstruct; import javax.annotation.Resource; import java.text.SimpleDateFormat; import java.util.Date; import java.util.List; import java.util.UUID; import java.util.concurrent.BrokenBarrierException; import java.util.concurrent.CyclicBarrier; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; @RunWith(SpringJUnit4ClassRunner.class) @SpringBootTest @EnableAutoConfiguration public class BloomTestsConcurrency { @Resource private RedisTemplate redisTemplate; @Autowired private UserService userService; private static final int THREAD_NUM = 1000; // 并发线程数量,Windows机器不要设置过大 static BloomFilter<String> bf; static List<User> allUsers; @PostConstruct public void init() { // 从数据库获取数据,加载到布隆过滤器 long start = System.currentTimeMillis(); allUsers = userService.getAllUser(); if (allUsers == null || allUsers.size() == 0) { return; } // 建立布隆过滤器,默认误判率0.03,即3% bf = BloomFilter.create(Funnels.stringFunnel(Charsets.UTF_8), allUsers.size()); // 误判率越低,数组长度越长,须要的哈希函数越多 // bf = BloomFilter.create(Funnels.stringFunnel(Charsets.UTF_8), allUsers.size(), 0.0001); // 将数据存入布隆过滤器 for (User user : allUsers) { bf.put(user.getAccount()); } long end = System.currentTimeMillis(); System.out.println("查询并加载"+allUsers.size()+"条数据到布隆过滤器完毕,总耗时:"+(end -start ) +"毫秒"); } @Test public void cacheBreakDownTest() { long start = System.currentTimeMillis(); allUsers = userService.getAllUser(); CyclicBarrier cyclicBarrier = new CyclicBarrier(THREAD_NUM); ExecutorService executorService = Executors.newFixedThreadPool(THREAD_NUM); for (int i = 0; i < THREAD_NUM; i++){ executorService.execute(new BloomTestsConcurrency().new MyThread(cyclicBarrier, redisTemplate, userService)); } executorService.shutdown(); //判断是否全部的线程已经运行完 while (!executorService.isTerminated()) { } long end = System.currentTimeMillis(); System.out.println("并发数:"+THREAD_NUM + ",新建线程以及过滤总耗时:"+(end -start ) +"毫秒,演示结束"); } public class MyThread implements Runnable { private CyclicBarrier cyclicBarrier; private RedisTemplate redisTemplate; private UserService userService; public MyThread(CyclicBarrier cyclicBarrier, RedisTemplate redisTemplate, UserService userService) { this.cyclicBarrier = cyclicBarrier; this.redisTemplate = redisTemplate; this.userService = userService; } @Override public void run() { //全部子线程等待,当子线程所有建立完成再一块儿并发执行后面的代码 try { cyclicBarrier.await(); } catch (InterruptedException e) { e.printStackTrace(); } catch (BrokenBarrierException e) { e.printStackTrace(); } // 1.1 (测试:布隆过滤器判断不存在,拦截——若是没有布隆过滤器,将形成缓存穿透) // 随机产生一个字符串,在布隆过滤器中不存在 String randomUser = UUID.randomUUID().toString(); // 1.2 (测试:布隆过滤器判断存在,从Redis缓存取值,若是Redis为空则查询数据库并写入Redis) // 从List中获取一个存在的用户 // String randomUser = allUsers.get(new Random().nextInt(allUsers.size())).getAccount(); String key = "Key:" + randomUser; Date date1 = new Date(); SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); // 若是布隆过滤器中不存在这个用户直接返回,将流量挡掉 /* if (!bf.mightContain(randomUser)) { System.out.println(sdf.format(date1)+" 布隆过滤器中不存在,非法请求"); return; }*/ // 查询缓存,若是缓存中存在直接返回缓存数据 ValueOperations<String, String> operation = (ValueOperations<String, String>) redisTemplate.opsForValue(); Object cacheUser = operation.get(key); if (cacheUser != null) { Date date2 = new Date(); System.out.println(sdf.format(date2)+" 命中redis缓存"); return; } // TODO 防止并发重复写缓存,加锁 synchronized (randomUser) { // 若是缓存不存在查询数据库 List<User> user = userService.getUserByAccount(randomUser); if (user == null || user.size() == 0) { // 很容易发生链接池不够用的状况 HikariPool-1 - Connection is not available, request timed out after System.out.println(" Redis缓存不存在,查询数据库也不存在,发生缓存穿透!!!"); return; } // 将mysql数据库查询到的数据写入到redis中 Date date3 = new Date(); System.out.println(sdf.format(date3)+" 从数据库查询并写入Reids"); operation.set("Key:" + user.get(0).getAccount(), user.get(0).getAccount()); } } } }
布隆过滤器解决的问题是什么?如何在海量元素中快速判断一个元素是否存在。因此除了解决缓存穿透的问题以外,咱们还有不少其余的用途。
好比爬数据的爬虫,爬过的 url 咱们不须要重复爬,那么在几十亿的 url 里面,怎么判断一个 url 是否是已经爬过了?
还有咱们的邮箱服务器,发送垃圾邮件的帐号咱们把它们叫作 spamer,在这么多的邮箱帐号里面,怎么判断一个帐号是否是 spamer 等等一些场景,咱们均可以用到布隆过滤器。
若是你们想要实时关注我更新的文章以及分享的干货的话,能够关注个人公众号。