这是why技术的第 74 篇原创文章前端
深夜怼文的我mysql
分布式事务你应该是知道的。可是这个多线程事务......git
没事,我慢慢给你说。github
如图所示,有个小伙伴想要实现多线程事务。spring
这个需求其实我在不一样的地方看到过不少次,因此我才说:这个问题又出现了。sql
那么有解决方案吗?数据库
在此以前,个人回答都是很是的确定:毋庸置疑,确定是没有的。编程
为何呢?安全
咱们先从理论上去推理一下。多线程
来,首先我问你,事务的特性是什么?
这个不难吧?八股文必背内容之一,ACID 必须张口就来:
那么问题又来了,你以为若是有多线程事务,那么咱们破坏了哪一个特性?
多线程事务你也别想的多深奥,你就想,两个不一样的用户各自发起了一个下单请求,这个请求对应的后台实现逻辑中是有事务存在的。
这不就是多线程事务吗?
这种场景下你没有想过怎么分别去控制两个用户的事务操做吧?
由于这两个操做之间就是彻底隔离的,各自拿着各自的连接玩儿。
因此多个事务之间的最基本的原则是什么?
隔离性。两个事务操做之间不该该相互干扰。
而多线程事务想要实现的是 A 线程异常了。A,B 线程的事务一块儿回滚。
事务的特性里面就卡的死死的。因此,多线程事务从理论上就是行不通的。
经过理论指导实践,那么多线程事务的代码也就是写不出来的。
前面说到隔离性。那么请问,Spring 的源码里面,对于事务的隔离性是如何保证的呢?
答案就是 ThreadLocal。
在事务开启的时候,把当前的连接保存在了 ThreadLocal 里面,从而保证了多线程之间的隔离性:
能够看到,这个 resource 对象是一个 ThreadLocal 对象。
在下面这个方法中进行了赋值操做:
org.springframework.jdbc.datasource.DataSourceTransactionManager#doBegin
其中的 bindResource 方法中,就是把当前连接绑定到当前线程中,其中的 resource 就是咱们刚刚说的 ThreadLocal:
就是每一个线程里面都各自玩本身的,咱们不可能打破 ThreadLocal 的使用规则,让各个线程共享同一个 ThreadLocal 吧?
铁子,你要是这样去作的话,那岂不是走远了?
因此,不管从理论上,仍是代码实现上,我都认为这个需求是不能实现的。
至少我以前是这样想的。
可是事情,稍稍的发生了一点点的变化。
任何脱离场景讨论技术实现的行为都是耍流氓。
因此,咱们先看一下场景是什么。
假设咱们有一个大数据系统,天天指定时间,咱们就须要从大数据系统中拉取 50w 条数据,对数据进行一个清洗操做,而后把数据保存到咱们业务系统的数据库中。
对于业务系统而言,这 50w 条数据,必须所有落库,差一条都不行。要么就是一条都不插入。
在这个过程当中,不会去调用其余的外部接口,也不会有其余的流程去操做这个表的数据。
既然说到一条不差了,那么对于你们直观而言,想到的确定是两个解决方案:
对于这种需求,开启事务,而后在 for 循环中一条条的插入能够说是很是 low 的解决方案了。
效率很是的低下,给你们演示一下。
好比,咱们有一个 Student 表,表结构很是简单,以下:
`CREATE TABLE
student` (
id
bigint(63) NOT NULL AUTO_INCREMENT,
name
varchar(32) DEFAULT NULL,
home
varchar(64) DEFAULT NULL,
PRIMARY KEY (id
)
) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8;
``
在咱们的项目中,咱们经过 for 循环插入数据,同时该方法上有 @Transactional 注解:
num 参数是咱们经过前端请求传递过来的数据,表明要插入 num 条数据:
这种状况下,咱们能够经过下面的连接,模拟插入指定数量的数据:
http://127.0.0.1:8081/insertOneByOne?num=xxx
我尝试了把 num 设置为 50w,让它慢慢的跑着,可是我仍是太年轻了,等了很是长的时间都没有等到结果。
因而我把 num 改成了 5000,运行结果以下:
insertOneByOne执行耗时:133449ms,num=5000
一条条的插入 5000 条数据,耗时 133.5 s 的样子。
按照这个速度,插入 50w 条数据得 13350s,大概也是这么多小时:
这谁顶得住啊。
因此,这方案拥有巨大的优化空间。
好比咱们优化为这样批量插入:
其对应的 sql 语句是这样的:
insert into table ([列名],[列名]) VALUES ([列值],[列值]), ([列值],[列值]);
咱们仍是经过前端接口调用:
当咱们的 num 设置为 5000 的时候,我页面刷新了 10 次,你看耗时基本上在 200ms 毫秒之内:
从 133.5s 到 200ms,朋友们,这是什么东西?
这是质的飞跃啊。性能提高了近 667 倍的样子。
为何批量插入能有这么大的飞跃呢?
你想啊,以前 for 循环插入,虽然 SpringBoot 2.0 默认使用了 HikariPool,链接池里面默认给你搞 10 个链接。
可是你只须要一个链接,开启一次事务。这个不耗时。
耗时的地方是你 5000 次 IO 呀。
因此,耗时长是必然的。
而批量插入只是一条 sql 语句,因此只须要一个链接,还不须要开启事务。
为啥不用开启事务?
你一条 sql 开启事务有锤子用啊?
那么,若是咱们一口气插入 50w 条数据,会是怎么样的呢?
来,搞一波,试一下:
http://127.0.0.1:8081/insertBatch?num=500000
能够看到抛出了一个异常。并且错误信息很是的清晰:
`Packet for query is too large (42777840 > 1048576). You can change this value on the server by setting the max_allowed_packet' variable.; nested exception is com.mysql.jdbc.PacketTooBigException: Packet for query is too large (42777840 > 1048576).You can change this value on the server by setting the max_allowed_packet' variable.
`
说你这个包太大了。能够经过设置 max_allowed_packet 来改变包大小。
咱们能够经过下面的语句查询当前的配置大小:
select @@max_allowed_packet;
能够看到是 1048576,即 1024*1024,1M 大小。
而咱们须要传输的包大小是 42777840 字节,大概是 41M 的样子。
因此咱们须要修改配置大小。
这个地方也给你们提了个醒:若是你的 sql 语句很是大,里面有大字段,记得调整一下 mysql 的这个参数。
能够经过修改配置文件或者直接执行 sql 语句的方式进行修改。
我这里就使用 sql 语句修改成 64M:
set global max_allowed_packet = 1024*1024*64;
而后再次执行,能够看到插入成功了:
50w 的数据,74s 的样子。
数据要么所有提交,要么一条也没有,需求也实现了。
时间上呢,是有点长,可是好像也想不到什么好的提高方案。
那么咱们怎么还能再缩短点时间呢?
我能想到的,只能是祭出多线程了。
50w 数据。咱们开五个线程,一个线程处理 10w 数据,没有异常就保存入库,出现问题就回滚。
这个需求很好实现。分分钟就能写出来。
可是再加上一个需求:这 5 个线程的数据,若是有一个线程出现问题了,须要所有回滚。
顺着思路慢慢撸,咱们发现这个时候就是所谓的多线程事务了。
我以前说彻底不可能实现是由于提到事务我就想到了 @Transactional 注解去实现了。
咱们只须要正确使用它,而后关系业务逻辑便可,不须要也根本插手不了事务的开启和提交或者回滚。
这种代码的写法咱们叫作声明式事务。
和声明式事务对应的就是编程式事务了。
经过编程式事务,咱们就能彻底掌控事务的开启和提交或者回滚操做。
能想到编程式事务,这事基本上就成了一半了。
你想,首先咱们有一个全局变量为 Boolean 类型,默认为能够提交。
在子线程里面,咱们能够先经过编程式事务开启事务,而后插入 10w 条数据后,可是不提交。同时告诉主线程,我这边准备好了,进入等待。
若是子线程里面出现了异常,那么我就告诉主线程,我这边出问题了,而后本身进行回滚。
最后主线程收集到了 5 个子线程的状态。
若是有一个线程出现了问题,那么设置全局变量为不可提交。
而后唤醒全部等待的子线程,进行回滚。
根据上面的流程,写出模拟代码就是这样的,你们能够直接复制出来运行:
`public class MainTest {
//是否能够提交
public static volatile boolean IS_OK = true;
public static void main(String[] args) {
//子线程等待主线程通知
CountDownLatch mainMonitor = new CountDownLatch(1);
int threadCount = 5;
CountDownLatch childMonitor = new CountDownLatch(threadCount);
//子线程运行结果
List<Boolean> childResponse = new ArrayList<Boolean>();
ExecutorService executor = Executors.newCachedThreadPool();
for (int i = 0; i < threadCount; i++) {
int finalI = i;
executor.execute(() -> {
try {
System.out.println(Thread.currentThread().getName() + ":开始执行");
// if (finalI == 4) {
// throw new Exception("出现异常");
// }
TimeUnit.MILLISECONDS.sleep(ThreadLocalRandom.current().nextInt(1000));
childResponse.add(Boolean.TRUE);
childMonitor.countDown();
System.out.println(Thread.currentThread().getName() + ":准备就绪,等待其余线程结果,判断是否事务提交");
mainMonitor.await();
if (IS_OK) {
System.out.println(Thread.currentThread().getName() + ":事务提交");
} else {
System.out.println(Thread.currentThread().getName() + ":事务回滚");
}
} catch (Exception e) {
childResponse.add(Boolean.FALSE);
childMonitor.countDown();
System.out.println(Thread.currentThread().getName() + ":出现异常,开始事务回滚");
}
});
}
//主线程等待全部子线程执行response
try {
childMonitor.await();
for (Boolean resp : childResponse) {
if (!resp) {
//若是有一个子线程执行失败了,则改变mainResult,让全部子线程回滚
System.out.println(Thread.currentThread().getName()+":有线程执行失败,标志位设置为false");
IS_OK = false;
break;
}
}
//主线程获取结果成功,让子线程开始根据主线程的结果执行(提交或回滚)
mainMonitor.countDown();
//为了让主线程阻塞,让子线程执行。
Thread.currentThread().join();
} catch (Exception e) {
e.printStackTrace();
}
}
}
`
在全部子线程都正常的状况下,输出结果是这样的:
从结果看,是符合咱们的预期的。
假设有子线程出现了异常,那么运行结果是这样的:
一个线程出现异常,所有线程都进行回滚,这样看来也是符合预期的。
若是你根据前面的需求写出了这样的代码,那么恭喜你,一不留神实现了一个相似于两阶段提交(2PC)的一致性协议。
我前面说的能想到编程式事务,这事基本上就成了一半了。
而另一半,就是两阶段提交(2PC)。
有了前面的瓢,你照着画个葫芦不是很简单的事情吗?
就不大段上代码了,示例代码能够点击这里获取到,因此我这里截个图吧:
上面的代码应该是很是好理解的,开启五个线程,每一个线程插入 10w 条数据。
这个不用说,用脚趾头想也能知道,确定是比一次性批量插入 50w 条数据快的。
至于快多少,不废话了,直接看执行效果吧。
因为咱们的 controller 是这样的:
因此调用连接:
http://127.0.0.1:8081/batchHandle
输出结果以下:
还记得咱们批量插入的耗时吗?
73791ms。
从 73791ms 到 15719ms。快了 58s 的样子。
已经很是不错了。
那么若是是某个线程抛出了异常呢?好比这样:
咱们看看日志输出:
经过日志分析,看起来也是符合要求的。
而从读者反馈的实际测试效果来看,也是很是显著的:
符合要求,只是看起来而已。
经验老道的读者朋友们确定早就看到问题所在了。已经把手举得高高的:老师,这题我知道。
我以前说了,这个实现方式实际上就是编程式事务配合二阶段提交(2PC)使用。
破绽就出在 2PC 上。
就像我和读者讨论这样的:
不能再日后扯了,再日后就是 3PC,TCC,Seata 这一套分布式事务的东西了。
这套东西写下来,就得上万字了。因此我从海神那边转了一篇文章,放在第二条推送里面了。若是你们有兴趣的能够去看一下。干货满满。
其实当咱们把一个个子线程理解为微服务中的一个个子系统的时候,这就是一个分布式事务的场景了。
而咱们拿出来的解决方案,并非一个完美的解决方案。
虽然,从某种角度上,咱们绕开了事务的隔离性,可是有必定几率出现数据一致性问题,虽然几率比较小。
因此我称之为这种方案叫作:基于运气编程,用运气换时间。
关于上面的代码,其实还有几个须要注意的地方。
给你们提个醒。
第一个:启用多少线程进行分配数据插入,这个参数是能够进行调整的。
好比我修改成 10 个线程,每一个线程插入 5w 条数据。那么执行时间又快了 2s:
可是必定记得不是越大越好,同时记得调整数据库链接池的最大链接数。否则白搭。
第二个:正是由于启动多少线程是能够进行调整的,甚至是能够每次进行计算的。
那么必需要注意的一个问题是不能让任何一个任务进入队列里面。一旦进入队列,程序立马就凉。
你想,若是咱们须要开启 5 个子线程,可是核心线程数只有 4 个,有一个任务进入队列了。
那么这 4 个核心线程会一直阻塞住,等待主线程唤醒。
而主线程这个时候在干什么?
在等 5 个线程的运行结果,可是它只能收集到 4 个结果。
因此它会一直等下去。
第三个:这里是多个线程开启了事务在往表里插入数据,谨防数据库死锁。
第四个:注意程序里面的代码,countDown 安装标准写法上是要放到 finally 代码块里面的,我这里为了截图的美观度,省去了这个步骤:
你若是真的要用,得注意一下。并且这个finally你得想清楚了写,不是随便写的。
第五个:我这里只是提供一个思路,并且它也根本不是什么多线程事务。
也再次证实了,多线程事务就是一个伪命题。
因此我给出一个基于运气的伪一致性的回答也不过度吧。
第六个:多线程事务换个角度想,能够理解为分布式事务。,能够借助这个案例去了解分布式事务。可是解决分布式事务的最好的方法就是:不要有分布式事务!
而解决分布式事务的绝大部分落地方案都是:最终一致性。
性价比高,大多数业务上也能接受。
第七个:这个解决方案你要拿到生产用的话,记得先和业务同事沟通好,能不能接受这种状况。速度和安全之间的两难抉择。
同时本身留好人工修数的接口:
才疏学浅,不免会有纰漏,若是你发现了错误的地方,能够在留言区提出来,我对其加以修改。 感谢您的阅读,我坚持原创,十分欢迎并感谢您的关注。
我是 why,一个被代码耽误的文学创做者,不是大佬,可是喜欢分享,是一个又暖又有料的四川好男人。
还有,欢迎关注我呀。