通过《ZooKeeper入门》后,咱们学会了ZooKeeper的基本用法。java
实际上ZooKeeper的应用是很是普遍的,实现分布式锁只是其中一种。接下来咱们就ZooKeeper实现分布式锁解决秒杀超卖问题进行展开。mysql
秒杀活动应该都不陌生,不用过多解释。git
不难想象,在这种"秒杀"的场景中,实际上会出现多个用户争抢"资源"的状况,也就是多个线程同时并发,这种状况是很容易出现数据不许确,也就是超卖问题。程序员
下面使用程序演示,我使用了SpringBoot2.0、Mybatis、Mybatis-Plus、SpringMVC搭建了一个简单的项目,github地址:github
建立一个商品信息表:sql
CREATE TABLE `tb_commodity_info` (
`id` varchar(32) NOT NULL,
`commodity_name` varchar(512) DEFAULT NULL COMMENT '商品名称',
`commodity_price` varchar(36) DEFAULT '0' COMMENT '商品价格',
`number` int(10) DEFAULT '0' COMMENT '商品数量',
`description` varchar(2048) DEFAULT '' COMMENT '商品描述',
PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='商品信息表';
复制代码
添加一个商品[叉烧包]进去: 数据库
核心的代码逻辑是这样的:apache
@Override
public boolean purchaseCommodityInfo(String commodityId, Integer number) throws Exception {
//1.先查询数据库中商品的数量
TbCommodityInfo commodityInfo = commodityInfoMapper.selectById(commodityId);
//2.判断商品数量是否大于0,或者购买的数量大于库存
Integer count = commodityInfo.getNumber();
if (count <= 0 || number > count) {
//商品数量小于或者等于0,或者购买的数量大于库存,则返回false
return false;
}
//3.若是库存数量大于0,而且购买的数量小于或者等于库存。则更新商品数量
count -= number;
commodityInfo.setNumber(count);
boolean bool = commodityInfoMapper.updateById(commodityInfo) == 1;
if (bool) {
//若是更新成功,则打印购买商品成功
System.out.println("购买商品[ " + commodityInfo.getCommodityName() + " ]成功,数量为:" + number);
}
return bool;
}
复制代码
逻辑示意图以下:服务器
上面这个逻辑,若是单线程请求的话是没有问题的。
可是多线程的话就出现问题了。如今我就建立多个线程,经过HttpClient进行请求,看会发生什么:
public static void main(String[] args) throws Exception {
//请求地址
String url = "http://localhost:8080/mall/commodity/purchase";
//请求参数,商品ID,数量
Map<String, String> map = new HashMap<>();
map.put("commodityId", "4f863bb5266b9508e0c1f28c61ea8de1");
map.put("number", "1");
//建立10个线程经过HttpClient进行发送请求,测试
for (int i = 0; i < 10; i++) {
//这个线程的逻辑仅仅是发送请求
CommodityThread commodityThread = new CommodityThread(url, map);
commodityThread.start();
}
}
复制代码
说明一下,叉烧包的数量是100,这里有10个线程同时去购买,假设都购买成功的话,库存数量应该是90。
实际上,10个线程的确都购买成功了:
可是数据库的商品库存,却不许确:
上面的场景,大概流程以下所示:
能够看出问题的关键在于两个线程"同时"去查询剩余的库存,而后更新库存致使的。要解决这个问题,其实只要保证多个线程在这段逻辑是顺序执行便可,也就是加锁。
本地锁JDK提供有两种:synchronized和Lock锁。
两种方式均可以,我这里为了简便,使用synchronized:
//使用synchronized修饰方法
@Override
public synchronized boolean purchaseCommodityInfo(String commodityId, Integer number) throws Exception {
//省略...
}
复制代码
而后再测试刚刚多线程并发抢购的状况,看看结果:
问题获得解决!!!
你觉得事情就这样结束了吗,看了看进度条,发现事情并不简单。
咱们知道在实际项目中,每每不会只部署一台服务器,因此不妨咱们启动两台服务器,端口号分别是8080、8081,模拟实际项目的场景:
写一个交替请求的测试脚本,模拟多台服务器分别处理请求,用户秒杀抢购的场景:
public static void main(String[] args) throws Exception {
//请求地址
String url = "http://localhost:%s/mall/commodity/purchase";
//请求参数,商品ID,数量
Map<String, String> map = new HashMap<>();
map.put("commodityId", "4f863bb5266b9508e0c1f28c61ea8de1");
map.put("number", "1");
//建立10个线程经过HttpClient进行发送请求,测试
for (int i = 0; i < 10; i++) {
//8080、8081交替请求,每一个服务器处理5个请求
String port = "808" + (i % 2);
CommodityThread commodityThread = new CommodityThread(String.format(url, port), map);
commodityThread.start();
}
}
复制代码
首先看购买的状况,确定都是购买成功的:
关键是库存数量是否正确:
有10个请求购买成功,库存应该是90才对,这里库存是95。事实证实本地锁是不能解决多台服务器秒杀抢购出现超卖的问题。
为何会这样呢,请看示意图:
其实和多线程问题是差很少的缘由,多个服务器去查询数据库,获取到相同的库存,而后更新库存,致使数据不正确。要保证库存的数量正确,关键在于多台服务器要保证只能一台服务器在执行这段逻辑,也就是要加分布式锁。
这也体现出分布式锁的做用,就是要保证多台服务器只能有一台服务器执行。
分布式锁有三种实现方式,分别是redis、ZooKeeper、数据库(好比mysql)。
其实是利用ZooKeeper的临时顺序节点的特性实现分布式锁。怎么实现呢?
假设如今有一个客户端A,须要加锁,那么就在"/Lock"路径下建立一个临时顺序节点。而后获取"/Lock"下的节点列表,判断本身的序号是不是最小的,若是是最小的序号,则加锁成功!
如今又有另外一个客户端,客户端B须要加锁,那么也是在"/Lock"路径下建立临时顺序节点。依然获取"/Lock"下的节点列表,判断本身的节点序号是否最小的。发现不是最小的,加锁失败,接着对本身的上一个节点进行监听。
怎么释放锁呢,其实就是把临时节点删除。假设客户端A释放锁,把节点01删除了。那就会触发节点02的监听事件,客户端就再次获取节点列表,而后判断本身是不是最小的序号,若是是最小序号则加锁。
若是多个客户端其实也是同样,一上来就会建立一个临时节点,而后开始判断本身是不是最小的序号,若是不是就监听上一个节点,造成一种排队的机制。也就造成了锁的效果,保证了多台服务器只有一台执行。
假设其中有一个客户端宕机了,根据临时节点的特色,ZooKeeper会自动删除对应的临时节点,至关于自动释放了锁。
首先加入Maven依赖
<dependency>
<groupId>org.apache.zookeeper</groupId>
<artifactId>zookeeper</artifactId>
<version>3.4.6</version>
</dependency>
<dependency>
<groupId>com.101tec</groupId>
<artifactId>zkclient</artifactId>
<version>0.4</version>
</dependency>
复制代码
接着按照上面分析的思路敲代码,建立ZkLock类:
public class ZkLock implements Lock {
//计数器,用于加锁失败时,阻塞
private static CountDownLatch cdl = new CountDownLatch(1);
//ZooKeeper服务器的IP端口
private static final String IP_PORT = "127.0.0.1:2181";
//锁的根路径
private static final String ROOT_NODE = "/Lock";
//上一个节点的路径
private volatile String beforePath;
//当前上锁的节点路径
private volatile String currPath;
//建立ZooKeeper客户端
private ZkClient zkClient = new ZkClient(IP_PORT);
public ZkLock() {
//判断是否存在根节点
if (!zkClient.exists(ROOT_NODE)) {
//不存在则建立
zkClient.createPersistent(ROOT_NODE);
}
}
//加锁
public void lock() {
if (tryLock()) {
System.out.println("加锁成功!!");
} else {
// 尝试加锁失败,进入等待 监听
waitForLock();
// 再次尝试加锁
lock();
}
}
//尝试加锁
public synchronized boolean tryLock() {
// 第一次就进来建立本身的临时节点
if (StringUtils.isBlank(currPath)) {
currPath = zkClient.createEphemeralSequential(ROOT_NODE + "/", "lock");
}
// 对节点排序
List<String> children = zkClient.getChildren(ROOT_NODE);
Collections.sort(children);
// 当前的是最小节点就返回加锁成功
if (currPath.equals(ROOT_NODE + "/" + children.get(0))) {
return true;
} else {
// 不是最小节点 就找到本身的前一个 依次类推 释放也是同样
int beforePathIndex = Collections.binarySearch(children, currPath.substring(ROOT_NODE.length() + 1)) - 1;
beforePath = ROOT_NODE + "/" + children.get(beforePathIndex);
//返回加锁失败
return false;
}
}
//解锁
public void unlock() {
//删除节点并关闭客户端
zkClient.delete(currPath);
zkClient.close();
}
//等待上锁,加锁失败进入阻塞,监听上一个节点
private void waitForLock() {
IZkDataListener listener = new IZkDataListener() {
//监听节点更新事件
public void handleDataChange(String s, Object o) throws Exception {
}
//监听节点被删除事件
public void handleDataDeleted(String s) throws Exception {
//解除阻塞
cdl.countDown();
}
};
// 监听上一个节点
this.zkClient.subscribeDataChanges(beforePath, listener);
//判断上一个节点是否存在
if (zkClient.exists(beforePath)) {
//上一个节点存在
try {
System.out.println("加锁失败 等待");
//加锁失败,阻塞等待
cdl.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
// 释放监听
zkClient.unsubscribeDataChanges(beforePath, listener);
}
public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {
return false;
}
public void lockInterruptibly() throws InterruptedException {
}
public Condition newCondition() {
return null;
}
}
复制代码
在Controller层加上锁:
@PostMapping("/purchase")
public boolean purchaseCommodityInfo(@RequestParam(name = "commodityId") String commodityId, @RequestParam(name = "number") Integer number) throws Exception {
boolean bool;
//获取ZooKeeper分布式锁
ZkLock zkLock = new ZkLock();
try {
//上锁
zkLock.lock();
//调用秒杀抢购的service方法
bool = commodityInfoService.purchaseCommodityInfo(commodityId, number);
} catch (Exception e) {
e.printStackTrace();
bool = false;
} finally {
//解锁
zkLock.unlock();
}
return bool;
}
复制代码
测试,依然起两台服务器,8080、8081。而后跑测试脚本:
public static void main(String[] args) throws Exception {
//请求地址
String url = "http://localhost:%s/mall/commodity/purchase";
//请求参数,商品ID,数量
Map<String, String> map = new HashMap<>();
map.put("commodityId", "4f863bb5266b9508e0c1f28c61ea8de1");
map.put("number", "1");
//建立10个线程经过HttpClient进行发送请求,测试
for (int i = 0; i < 10; i++) {
//8080、8081交替请求
String port = "808" + (i % 2);
CommodityThread commodityThread = new CommodityThread(String.format(url, port), map);
commodityThread.start();
}
}
复制代码
结果正确:
Curator是Apache开源的一个操做ZooKeeper的框架。其中就有实现ZooKeeper分布式锁的功能。
固然分布式锁的实现只是这个框架的其中一个很小的部分,除此以外还有不少用途,你们能够到官网去学习。
首先添加Maven依赖:
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-framework</artifactId>
<version>4.3.0</version>
</dependency>
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-recipes</artifactId>
<version>4.3.0</version>
</dependency>
复制代码
仍是同样在须要加锁的地方进行加锁:
@PostMapping("/purchase")
public boolean purchaseCommodityInfo(@RequestParam(name = "commodityId") String commodityId, @RequestParam(name = "number") Integer number) throws Exception {
boolean bool = false;
//设置重试策略
RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
CuratorFramework client = CuratorFrameworkFactory.newClient("127.0.0.1:2181", retryPolicy);
// 启动客户端
client.start();
InterProcessMutex mutex = new InterProcessMutex(client, "/locks");
try {
//加锁
if (mutex.acquire(3, TimeUnit.SECONDS)) {
//调用抢购秒杀service方法
bool = commodityInfoService.purchaseCommodityInfo(commodityId, number);
}
} catch (Exception e) {
e.printStackTrace();
} finally {
//解锁
mutex.release();
client.close();
}
return bool;
}
复制代码
我尝试用原生的ZooKeeper写分布式锁,有点炸裂。遇到很多坑,最终放弃了,用zkclient的API。可能我太菜了不太会用。
下面我分享我遇到的一些问题,但愿大家在遇到同类型的异常时能迅速定位问题。
这个错误是使用原生ZooKeeper的API出现的错误。主要是我在进入debug模式进行调试出现的。
由于原生的ZooKeeper须要设定一个会话超时时间,通常debug模式咱们都会卡在一个地方去调试,确定就超出了设置的会话时间~
这个也是原生ZooKeeper的API的错误,怎么出现的呢?
主要是建立的ZooKeeper客户端链接服务器时是异步的,因为链接须要时间,还没链接成功,代码已经开始执行create()或者exists(),而后就报这个错误。
解决方法:使用CountDownLatch计数器阻塞,链接成功后再中止阻塞,而后执行create()或者exists()等操做。
这个错误真的太炸裂了~
一开始我是把分布式锁加在service层,而后觉得搞定了。接着启动8080、8081进行并发测试。10个线程都是购买成功,结果竟然是不正确!
第一反应以为本身实现的代码有问题,因而换成curator框架实现的分布式锁,开源框架应该没问题了吧。没想到仍是不行~
既然不是锁自己的问题,是否是事务问题。上一个事务更新库存的操做还没提交,而后下一个请求就进来查询。因而我就把加锁的范围放大一点,放在Controller层。竟然成功了!
你可能已经注意到,我在上面的例子就是把分布式锁加在Controller层,其实我不太喜欢在Controller层写太多代码。
也许有更加优雅的方式,惋惜本人能力不足,若是你有更好的实现方式,能够分享一下~
最后,咱们回顾总结一下吧:
但愿这篇文章对你有用
想第一时间看到我更新的文章,能够微信搜索公众号「java技术爱好者
」,拒绝作一条咸鱼,我是一个努力让你们记住的程序员。咱们下期再见!!!
能力有限,若是有什么错误或者不当之处,请你们批评指正,一块儿学习交流!