本文已参与好文召集令活动,点击查看:后端、大前端双赛道投稿,2万元奖池等你挑战!前端
面试的时候常常会被问及多线程同步的问题,例如:java
“ 现有 Task一、Task2 等多个并行任务,如何等待所有执行完成后,执行 Task3。”web
在 Kotlin 中咱们有多种实现方式,本文将全部这些方式作了整理,建议收藏。面试
1. Thread.join
2. Synchronized
3. ReentrantLock
4. BlockingQueue
5. CountDownLatch
6. CyclicBarrier
7. CAS
8. Future
9. CompletableFuture
10. Rxjava
11. Coroutine
12. Flow编程
咱们先定义三个Task,模拟上述场景, Task3 基于 Task一、Task2 返回的结果拼接字符串,每一个 Task 经过 sleep 模拟耗时: 后端
val task1: () -> String = {
sleep(2000)
"Hello".also { println("task1 finished: $it") }
}
val task2: () -> String = {
sleep(2000)
"World".also { println("task2 finished: $it") }
}
val task3: (String, String) -> String = { p1, p2 ->
sleep(2000)
"$p1 $p2".also { println("task3 finished: $it") }
}
复制代码
Kotlin 兼容 Java,Java 的全部线程工具默认均可以使用。其中最简单的线程同步方式就是使用 Thread
的 join()
:api
@Test
fun test_join() {
lateinit var s1: String
lateinit var s2: String
val t1 = Thread { s1 = task1() }
val t2 = Thread { s2 = task2() }
t1.start()
t2.start()
t1.join()
t2.join()
task3(s1, s2)
}
复制代码
使用 synchronized
锁进行同步安全
@Test
fun test_synchrnoized() {
lateinit var s1: String
lateinit var s2: String
Thread {
synchronized(Unit) {
s1 = task1()
}
}.start()
s2 = task2()
synchronized(Unit) {
task3(s1, s2)
}
}
复制代码
可是若是超过三个任务,使用 synchrnoized
这种写法就比较别扭了,为了同步多个并行任务的结果须要声明n个锁,并嵌套n个 synchronized
。markdown
ReentrantLock
是 JUC 提供的线程锁,能够替换 synchronized 的使用多线程
@Test
fun test_ReentrantLock() {
lateinit var s1: String
lateinit var s2: String
val lock = ReentrantLock()
Thread {
lock.lock()
s1 = task1()
lock.unlock()
}.start()
s2 = task2()
lock.lock()
task3(s1, s2)
lock.unlock()
}
复制代码
ReentrantLock 的好处是,当有多个并行任务时是不会出现嵌套 synchrnoized
的问题,但仍然须要建立多个 lock 管理不一样的任务,
阻塞队列内部也是经过 Lock 实现的,因此也能够达到同步锁的效果
@Test
fun test_blockingQueue() {
lateinit var s1: String
lateinit var s2: String
val queue = SynchronousQueue<Unit>()
Thread {
s1 = task1()
queue.put(Unit)
}.start()
s2 = task2()
queue.take()
task3(s1, s2)
}
复制代码
固然,阻塞队列更可能是使用在生产/消费场景中的同步。
JUC 中的锁大都基于 AQS
实现的,能够分为独享锁和共享锁。ReentrantLock
就是一种独享锁。相比之下,共享锁更适合本场景。 例如 CountDownLatch
,它可让一个线程一直处于阻塞状态,直到其余线程的执行所有完成:
@Test
fun test_countdownlatch() {
lateinit var s1: String
lateinit var s2: String
val cd = CountDownLatch(2)
Thread() {
s1 = task1()
cd.countDown()
}.start()
Thread() {
s2 = task2()
cd.countDown()
}.start()
cd.await()
task3(s1, s2)
}
复制代码
共享锁的好处是没必要为了每一个任务都建立单独的锁,即便再多并行任务写起来也很轻松
CyclicBarrier
是 JUC 提供的另外一种共享锁机制,它可让一组线程到达一个同步点后再一块儿继续运行,其中任意一个线程未达到同步点,其余已到达的线程均会被阻塞。
与 CountDownLatch
的区别在于 CountDownLatch
是一次性的,而 CyclicBarrier
能够被重置后重复使用,这也正是 Cyclic
的命名由来,能够循环使用
@Test
fun test_CyclicBarrier() {
lateinit var s1: String
lateinit var s2: String
val cb = CyclicBarrier(3)
Thread {
s1 = task1()
cb.await()
}.start()
Thread() {
s2 = task1()
cb.await()
}.start()
cb.await()
task3(s1, s2)
}
复制代码
AQS 内部经过自旋锁实现同步,自旋锁的本质是利用 CompareAndSwap
避免线程阻塞的开销。 所以,咱们可使用基于 CAS 的原子类计数,达到实现无锁操做的目的。
@Test
fun test_cas() {
lateinit var s1: String
lateinit var s2: String
val cas = AtomicInteger(2)
Thread {
s1 = task1()
cas.getAndDecrement()
}.start()
Thread {
s2 = task2()
cas.getAndDecrement()
}.start()
while (cas.get() != 0) {}
task3(s1, s2)
}
复制代码
while
循环空转看起来有些浪费资源,可是自旋锁的本质就是这样,因此 CAS 仅仅适用于一些cpu密集型的短任务同步。
看到 CAS 的无锁实现,也许不少人会想到 volatile
, 是否也能实现无锁的线程安全?
@Test
fun test_Volatile() {
lateinit var s1: String
lateinit var s2: String
Thread {
s1 = task1()
cnt--
}.start()
Thread {
s2 = task2()
cnt--
}.start()
while (cnt != 0) {
}
task3(s1, s2)
}
复制代码
注意,这种写法是错误的 volatile
能保证可见性,可是不能保证原子性,cnt--
并不是线程安全,须要加锁操做
上面不管有锁操做仍是无锁操做,都须要定义两个变量s1
、s2
记录结果很是不方便。 Java 1.5 开始,提供了 Callable
和 Future
,能够在任务执行结束时返回结果。
@Test
fun test_future() {
val future1 = FutureTask(Callable(task1))
val future2 = FutureTask(Callable(task2))
Executors.newCachedThreadPool().execute(future1)
Executors.newCachedThreadPool().execute(future2)
task3(future1.get(), future2.get())
}
复制代码
经过 future.get()
,能够同步等待结果返回,写起来很是方便
future.get()
虽然方便,可是会阻塞线程。 Java 8 中引入了 CompletableFuture
,他实现了 Future 接口的同时实现了 CompletionStage
接口。 CompletableFuture
能够针对多个 CompletionStage
进行逻辑组合、实现复杂的异步编程。 这些逻辑组合的方法以回调的形式避免了线程阻塞:
@Test
fun test_CompletableFuture() {
CompletableFuture.supplyAsync(task1)
.thenCombine(CompletableFuture.supplyAsync(task2)) { p1, p2 ->
task3(p1, p2)
}.join()
}
复制代码
RxJava
提供的各类操做符以及线程切换能力一样能够帮助咱们实现需求: zip
操做符能够组合两个 Observable
的结果;subscribeOn
用来启动异步任务
@Test
fun test_Rxjava() {
Observable.zip(
Observable.fromCallable(Callable(task1))
.subscribeOn(Schedulers.newThread()),
Observable.fromCallable(Callable(task2))
.subscribeOn(Schedulers.newThread()),
BiFunction(task3)
).test().awaitTerminalEvent()
}
复制代码
前面讲了那么多,其实都是 Java 的工具。 Coroutine
终于算得上是 Kotlin 特有的工具了:
@Test
fun test_coroutine() {
runBlocking {
val c1 = async(Dispatchers.IO) {
task1()
}
val c2 = async(Dispatchers.IO) {
task2()
}
task3(c1.await(), c2.await())
}
}
复制代码
写起来特别舒服,能够说是集前面各种工具的优势于一身。
Flow
就是 Coroutine 版的 RxJava,具有不少 RxJava 的操做符,例如 zip
:
@Test
fun test_flow() {
val flow1 = flow<String> { emit(task1()) }
val flow2 = flow<String> { emit(task2()) }
runBlocking {
flow1.zip(flow2) { t1, t2 ->
task3(t1, t2)
}.flowOn(Dispatchers.IO)
.collect()
}
}
复制代码
flowOn
使得 Task 在异步计算并发射结果。
上面这么多方式,就像茴香豆的“茴”字的四种写法,不必都掌握。做为结论,在 Kotlin 上最好用的线程同步方案首推协程!