本文是redis学习系列的第五篇,点击下面链接可回看系列文章
本文我们继续学习redis与spring的整合,整合之后就可以用redisStringTemplate的setNX()和delete()方法实现分布式锁了。
spring把专门的数据操作独立封装在spring-data系列中,spring-data-redis是对Redis的封装
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
|
<dependency>
<groupId>org.springframework.data</groupId>
<artifactId>spring-data-redis</artifactId>
<version>
1.4
.
2
.RELEASE</version>
</dependency>
<dependency>
<groupId>redis.clients</groupId>
<artifactId>jedis</artifactId>
<version>
2.6
.
2
</version>
</dependency>
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-pool2</artifactId>
<version>
2.4
.
2
</version>
</dependency>
|
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
|
<!--命令空间中加入下面这行-->
xmlns:p=
"http://www.springframework.org/schema/p"
<!-- redis连接池配置文件 -->
<context:property-placeholder location=
"classpath:redis.properties"
/>
<bean id=
"poolConfig"
class
=
"redis.clients.jedis.JedisPoolConfig"
>
<property name=
"maxIdle"
value=
"${redis.maxIdle}"
/>
<property name=
"maxTotal"
value=
"${redis.maxTotal}"
/>
<property name=
"MaxWaitMillis"
value=
"${redis.MaxWaitMillis}"
/>
<property name=
"testOnBorrow"
value=
"${redis.testOnBorrow}"
/>
</bean>
<bean id=
"connectionFactory"
class
=
"org.springframework.data. redis.connection.jedis.JedisConnectionFactory"
p:host-name=
"${redis.host}"
p:port=
"${redis.port}"
p:password=
"${redis.pass}"
p:pool-config-ref=
"poolConfig"
/>
<bean id=
"redisTemplate"
class
=
"org.springframework.data. redis.core.RedisTemplate"
>
<property name=
"connectionFactory"
ref=
"connectionFactory"
/>
</bean>
|
注意新版的maxTotal,MaxWaitMillis这两个字段与旧版的不同。
1
2
3
4
5
6
7
8
|
redis.host=
192.168
.
2.129
redis.port=
6379
redis.pass=redis129
redis.maxIdle=
300
redis.maxTotal=
600
redis.MaxWaitMillis=
1000
redis.testOnBorrow=
true
|
好了,配置完成,下面写上代码
1
2
3
4
5
6
7
8
9
|
@Entity
@Table
(name =
"t_user"
)
public
class
User {
//主键
private
String id;
//用户名
private
String userName;
//...省略get,set...
}
|
1
2
3
4
5
6
7
|
@Repository
public
abstract
class
BaseRedisDao<K,V> {
@Autowired
(required=
true
)
protected
RedisTemplate<K, V> redisTemplate;
}
|
1
2
3
4
5
6
7
8
9
10
11
|
public
interface
IUserDao {
public
boolean
save(User user);
public
boolean
update(User user);
public
boolean
delete(String userIds);
public
User find(String userId);
}
|
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
|
@Repository
public
class
UserDao
extends
BaseRedisDao<String, User>
implements
IUserDao {
@Override
public
boolean
save(
final
User user) {
boolean
res = redisTemplate.execute(
new
RedisCallback<Boolean>() {
public
Boolean doInRedis(RedisConnection connection)
throws
DataAccessException {
RedisSerializer<String> serializer = redisTemplate.getStringSerializer();
byte
[] key = serializer.serialize(user.getId());
byte
[] value = serializer.serialize(user.getUserName());
//set not exits
return
connection.setNX(key, value);
}
});
return
res;
}
@Override
public
boolean
update(
final
User user) {
boolean
result = redisTemplate.execute(
new
RedisCallback<Boolean>() {
public
Boolean doInRedis(RedisConnection connection)
throws
DataAccessException {
RedisSerializer<String> serializer = redisTemplate.getStringSerializer();
byte
[] key = serializer.serialize(user.getId());
byte
[] name = serializer.serialize(user.getUserName());
//set
connection.set(key, name);
return
true
;
}
});
return
result;
}
@Override
public
User find(
final
String userId) {
User result = redisTemplate.execute(
new
RedisCallback<User>() {
public
User doInRedis(RedisConnection connection)
throws
DataAccessException {
RedisSerializer<String> serializer = redisTemplate.getStringSerializer();
byte
[] key = serializer.serialize(userId);
//get
byte
[] value = connection.get(key);
if
(value ==
null
) {
return
null
;
}
String name = serializer.deserialize(value);
User resUser =
new
User();
resUser.setId(userId);
resUser.setUserName(name);
return
resUser;
}
});
return
result;
}
@Override
public
boolean
delete(
final
String userId) {
boolean
result = redisTemplate.execute(
new
RedisCallback<Boolean>() {
public
Boolean doInRedis(RedisConnection connection)
throws
DataAccessException {
RedisSerializer<String> serializer = redisTemplate.getStringSerializer();
byte
[] key = serializer.serialize(userId);
//delete
connection.del(key);
return
true
;
}
});
return
result;
}
}
|
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
|
@RunWith
(SpringJUnit4ClassRunner.
class
)
@ContextConfiguration
(locations = {
"classpath*:applicationContext.xml"
})
public
class
RedisTest
extends
AbstractJUnit4SpringContextTests {
@Autowired
private
IUserDao userDao;
@Test
public
void
testSaveUser() {
User user =
new
User();
user.setId(
"402891815170e8de015170f6520b0000"
);
user.setUserName(
"zhangsan"
);
boolean
res = userDao.save(user);
Assert.assertTrue(res);
}
@Test
public
void
testGetUser() {
User user =
new
User();
user = userDao.find(
"402891815170e8de015170f6520b0000"
);
System.out.println(user.getId() +
"-"
+ user.getUserName() );
}
@Test
public
void
testUpdateUser() {
User user =
new
User();
user.setId(
"402891815170e8de015170f6520b0000"
);
user.setUserName(
"lisi"
);
boolean
res = userDao.update(user);
Assert.assertTrue(res);
}
@Test
public
void
testDeleteUser() {
boolean
res = userDao.delete(
"402891815170e8de015170f6520b0000"
);
Assert.assertTrue(res);
}
}
|
String类型的增删该查已完成,Hash,List,Set数据类型的操作就不举例了,和使用命令的方式差不多。如下
1
2
3
4
5
6
7
8
9
10
11
12
13
|
connection.hSetNX(key, field, value);
connection.hDel(key, fields);
connection.hGet(key, field);
connection.lPop(key);
connection.lPush(key, value);
connection.rPop(key);
connection.rPush(key, values);
connection.sAdd(key, values);
connection.sMembers(key);
connection.sDiff(keys);
connection.sPop(key);
|
1
2
3
|
java.lang.NoSuchMethodError: org.springframework.core.serializer.support.DeserializingConverter.<init>(Ljava/lang/ClassLoader;)V
Caused by: java.lang.NoSuchMethodError: redis.clients.jedis.JedisShardInfo.setTimeout(I)V
|
类似找不到类,找不到方法的问题,当确定依赖的jar已经引入之后,此类问题多事spring-data-redis以及jedis版本问题,多换个版本试试,本文上面提到的版本可以使用。
1
|
No qualifying bean of type [org.springframework.data.redis.core.RedisTemplate] found
for
dependency
|
找不到bean,考虑applicationContext.xml中配置redisTemplate bean时实现类是否写错。例如,BaseRedisDao注入的是RedisTemplate类型的对象,applicationContext.xml中配置的实现类却是RedisTemplate的子类StringRedisTemplate,那肯定报错。整合好后,下面我们着重学习基于redis的分布式锁的实现。
我们知道,在多线程环境中,锁是实现共享资源互斥访问的重要机制,以保证任何时刻只有一个线程在访问共享资源。锁的基本原理是:用一个状态值表示锁,对锁的占用和释放通过状态值来标识,因此基于redis实现的分布式锁主要依赖redis的SETNX命令和DEL命令,SETNX相当于上锁,DEL相当于释放锁,当然,在下面的具体实现中会更复杂些。之所以称为分布式锁,是因为客户端可以在redis集群环境中向集群中任一个可用Master节点请求上锁(即SETNX命令存储key到redis缓存中是随机的)。
现在相信你已经对在基于redis实现的分布式锁的基本概念有了解,需要注意的是,这个和前面文章提到的使用WATCH 命令对key值进行锁操作没有直接的关系。java中synchronized和Lock对象都能对共享资源进行加锁,下面我们将学习用java实现的redis分布式锁。
在分析java实现的redis分布式锁之前,我们先来回顾下java中的锁技术,为了直观的展示,我们采用“多个线程共享输出设备”来举例。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
|
public
class
LockTest {
//不加锁
static
class
Outputer {
public
void
output(String name) {
for
(
int
i=
0
; i<name.length(); i++) {
System.out.print(name.charAt(i));
}
System.out.println();
}
}
public
static
void
main(String[] args) {
final
Outputer output =
new
Outputer();
//线程1打印zhangsan
new
Thread(
new
Runnable(){
@Override
public
void
run() {
while
(
true
) {
try
{
Thread.sleep(
1000
);
}
catch
(InterruptedException e) {
e.printStackTrace();
}
output.output(
"zhangsan"
);
}
}
}).start();
//线程2打印lingsi
new
Thread(
new
Runnable(){
@Override
public
void
run() {
while
(
true
) {
try
{
Thread.sleep(
1000
);
}
catch
(InterruptedException e) {
e.printStackTrace();
}
output.output(
"lingsi"
);
}
}
}).start();
//线程3打印wangwu
new
Thread(
new
Runnable(){
@Override
public
void
run() {
while
(
true
) {
try
{
Thread.sleep(
1000
);
}
catch
(InterruptedException e) {
e.printStackTrace();
}
output.output(
"huangwu"
);
}
}
}).start();
}
}
|
上面例子中,三个线程同时共享输出设备output,线程1需要打印zhangsan,线程2需要打印lingsi,线程3需要打印wangwu。在不加锁的情况,这三个线程会不会因为得不到输出设备output打架呢,我们来看看运行结果:
1
2
3
4
5
6
7
8
9
10
11
|
huangwu
zhangslingsi
an
huangwu
zlingsi
hangsan
huangwu
lzhangsan
ingsi
huangwu
lingsi
|
从运行结果可以看出,三个线程打架了,线程1没打印完zhangsan,线程2就来抢输出设备......可见,这不是我们想要的,我们想要的是线程之间能有序的工作,各个线程之间互斥的使用输出设备output。
现在我们对Outputer进行改进,给它加上锁,加锁之后每次只有一个线程能访问它。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
|
//使用java5中的锁
static
class
Outputer{
Lock lock =
new
ReentrantLock();
public
void
output(String name) {
//传统java加锁
//synchronized (Outputer.class){
lock.lock();
try
{
for
(
int
i=
0
; i<name.length(); i++) {
System.out.print(name.charAt(i));
}
System.out.println();
}
finally
{
//任何情况下都有释放锁
lock.unlock();
}
//}
}
}
|
看看加锁后的输出结果:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
|
zhangsan
lingsi
huangwu
zhangsan
lingsi
huangwu
zhangsan
lingsi
huangwu
zhangsan
lingsi
huangwu
zhangsan
lingsi
huangwu
......
|
从运行结果中可以看出,三个线程之间不打架了,线程之间的打印变得有序。有个这个基础,下面我们来学习基于Redis实现的分布式锁就更容易了。
从上面java锁的使用中可以看出,锁对象主要有lock与unlock方法,在lock与unlock方法之间的代码(临界区)能保证线程互斥访问。基于redis实现的Java分布式锁主要依赖redis的SETNX命令和DEL命令,SETNX相当于上锁(lock),DEL相当于释放锁(unlock)。我们只要实现Lock接口重写lock()和unlock()即可。但是这还不够,安全可靠的分布式锁应该满足满足下面三个条件:
l 互斥,不管任何时候,只有一个客户端能持有同一个锁。
l 不会死锁,最终一定会得到锁,即使持有锁的客户端对应的master节点宕掉。
l 容错,只要大多数Redis节点正常工作,客户端应该都能获取和释放锁。
那么什么情况下回不满足上面三个条件呢。多个线程(客户端)同时竞争锁可能会导致多个客户端同时拥有锁。比如,
(1)线程1在master节点拿到了锁(存入key)
(2)master节点在把线程1创建的key写入slave之前宕机了,此时集群中的节点已经没有锁(key)了,包括master节点的slaver节点
(3)slaver节点升级为master节点
(4)线程2向新的master节点发起锁(存入key)请求,很明显,能请求成功。
可见,线程1和线程2同时获得了锁。如果在更高并发的情况,可能会有更多线程(客户端)获取锁,这种情况就会导致上文所说的线程“打架”问题,线程之间的执行杂乱无章。
那什么情况下又会发生死锁的情况呢。如果拥有锁的线程(客户端)长时间的执行或者因为某种原因造成阻塞,就会导致锁无法释放(unlock没有调用),其它线程就不能获取锁而而产生无限期死锁的情况。其它线程在执行lock失败后即使粗暴的执行unlock删除key之后也不能正常释放锁,因为锁就只能由获得锁的线程释放,锁不能正常释放其它线程仍然获取不到锁。解决死锁的最好方式是设置锁的有效时间(redis的expire命令),不管是什么原因导致的死锁,有效时间过后,锁将会被自动释放。
为了保障容错功能,即只要有Redis节点正常工作,客户端应该都能获取和释放锁,我们必须用相同的key不断循环向Master节点请求锁,当请求时间超过设定的超时时间则放弃请求锁,这个可以防止一个客户端在某个宕掉的master节点上阻塞过长时间,如果一个master节点不可用了,应该尽快尝试下一个master节点。释放锁比较简单,因为只需要在所有节点都释放锁就行,不管之前有没有在该节点获取锁成功。
根据上面的分析,官方提出了一种用Redis实现分布式锁的算法,这个算法称为RedLock。RedLock算法的主要流程如下:
RedLock算法主要流程
结合上面的流程图,加上下面的代码解释,相信你一定能理解redis分布式锁的实现原理
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
|
public
class
RedisLock
implements
Lock{
protected
StringRedisTemplate redisStringTemplate;
// 存储到redis中的锁标志
private
static
final
String LOCKED =
"LOCKED"
;
// 请求锁的超时时间(ms)
private
static
final
long
TIME_OUT =
30000
;
// 锁的有效时间(s)
public
static
final
int
EXPIRE =
60
;
// 锁标志对应的key;
private
String key;
// state flag
private
volatile
boolean
isLocked =
false
;
public
RedisLock(String key) {
this
.key = key;
@SuppressWarnings
(
"resource"
)
ApplicationContext ctx =
new
ClassPathXmlApplicationContext(
"classpath*:applicationContext.xml"
);
redisStringTemplate = (StringRedisTemplate)ctx.getBean(
"redisStringTemplate"
);
}
@Override
public
void
lock() {
//系统当前时间,毫秒
long
nowTime = System.nanoTime();
//请求锁超时时间,毫秒
long
timeout = TIME_OUT*
1000000
;
final
Random r =
new
Random();
try
{
//不断循环向Master节点请求锁,当请求时间(System.nanoTime() - nano)超过设定的超时时间则放弃请求锁
//这个可以防止一个客户端在某个宕掉的master节点上阻塞过长时间
//如果一个master节点不可用了,应该尽快尝试下一个master节点
while
((System.nanoTime() - nowTime) < timeout) {
//将锁作为key存储到redis缓存中,存储成功则获得锁
if
(redisStringTemplate.getConnectionFactory().getConnection().setNX(key.getBytes(),
LOCKED.getBytes())) {
//设置锁的有效期,也是锁的自动释放时间,也是一个客户端在其他客户端能抢占锁之前可以执行任务的时间
//可以防止因异常情况无法释放锁而造成死锁情况的发生
redisStringTemplate.expire(key, EXPIRE, TimeUnit.SECONDS);
isLocked =
true
;
//上锁成功结束请求
break
;
}
//获取锁失败时,应该在随机延时后进行重试,避免不同客户端同时重试导致谁都无法拿到锁的情况出现
//睡眠3毫秒后继续请求锁
Thread.sleep(
3
, r.nextInt(
500
));
}
}
catch
(Exception e) {
e.printStackTrace();
}
}
@Override
public
void
unlock() {
//释放锁
//不管请求锁是否成功,只要已经上锁,客户端都会进行释放锁的操作
if
(isLocked) {
redisStringTemplate.delete(key);
}
}
@Override
public
void
lockInterruptibly()
throws
InterruptedException {
// TODO Auto-generated method stub
}
|