Scala的actor提供了一种基于事件的轻量级线程。只要使用scala.actors.Actor伴生对象的actor方法,就能够建立一个actor。它接受一个函数值/闭包作参数,一建立好就开始运行。用!()方法给actor发消息,用receive()方法从actor接收消息。receive()也能够闭包为参数,一般用模式匹配处理接收到的消息。react
Scala的actor提供了一种基于事件的轻量级线程。只要使用scala.actors.Actor伴生对象的actor方法,就能够建立一个actor。它接受一个函数值/闭包作参数,一建立好就开始运行。用!()方法给actor发消息,用receive()方法从actor接收消息。receive()也能够闭包为参数,一般用模式匹配处理接收到的消息。编程
咱们看个例子,假定咱们须要断定一个给定的数是不是彻底数(彻底数是一个正整数,其因子之和是该数的两倍):安全
非并发编程的实现:闭包
def sumOfFactors(number:Int) = { (0/:(1 to number)){(sum, i) => if(number%i == 0) sum+i else sum } } def isPerfect(candidate:Int) = 2*candidate == sumOfFactors(candidate) println("6 is perfect? " + isPerfect(6)) println("33550336 is perfect? " + isPerfect(33550336)) println("33550337 is perfect? " + isPerfect(33550337))
并发编程的实现,将从1到candidate数这个范围内的数划分红多个区间,把每一个区间内求和的任务分配给单独的进程。并发
import scala.actors.Actor._ class FasterPerfectNumberFinder { def sumOfFactorsInRange(lower:Int, upper:Int, number:Int) = { (0/:(lower to upper)){(sum, i) => if(number%i == 0) sum+i else sum } } def isPerfectConcurrent(candidate:Int) = { val RANGE = 1000000 val numberOfPartitions = (candidate.toDouble/RANGE).ceil.toInt val caller = self for(i<-0 until numberOfPartitions){ val lower = i*RANGE + 1 val upper = candidate min(i+1)*RANGE actor { caller ! sumOfFactorsInRange(lower,upper,candidate) } } val sum = (0 /: (0 until numberOfPartitions)){ (partialSum, i) => receive { case sumInRange:Int => partialSum + sumInRange } } 2 * candidate == sum } println("6 is perfect? " + isPerfectConcurrent(6)) println("33550336 is perfect? " + isPerfectConcurrent(33550336)) println("33550337 is perfect? " + isPerfectConcurrent(33550337)) } object FasterPerfectNumberFinder extends App{ new FasterPerfectNumberFinder() }
程序运行结果以下:less
6 is perfect? true 33550336 is perfect? true 33550337 is perfect? false
比较两种方法用时的程序以下:异步
import scala.actors.Actor._ class FindPerfectNumberOverRange { //普通实现 def sumOfFactors(number:Int) = { (0/:(1 to number)){(sum, i) => if(number%i == 0) sum+i else sum } } def isPerfect(candidate:Int) = 2*candidate == sumOfFactors(candidate) //并发实现 def sumOfFactorsInRange(lower:Int, upper:Int, number:Int) = { (0/:(lower to upper)){(sum, i) => if(number%i == 0) sum+i else sum } } def isPerfectConcurrent(candidate:Int) = { val RANGE = 1000000 val numberOfPartitions = (candidate.toDouble/RANGE).ceil.toInt val caller = self for(i<-0 until numberOfPartitions){ val lower = i*RANGE + 1 val upper = candidate min(i+1)*RANGE actor { caller ! sumOfFactorsInRange(lower,upper,candidate) } } val sum = (0 /: (0 until numberOfPartitions)){ (partialSum, i) => receive { case sumInRange:Int => partialSum + sumInRange } } 2 * candidate == sum } //比较时间花费 def countPerfectNumbersInRange(start:Int, end:Int, isPerfectFinder:Int => Boolean)={ val startTime = System.nanoTime() val numberOfPerfectNumbers = (0 /: (start to end)){(count, candidate) => if(isPerfectFinder(candidate)) count + 1 else count } val endTime = System.nanoTime() println("Found " + numberOfPerfectNumbers + " perfect numbers in given range, took " + (endTime-startTime)/1000000000.0 + " secs") } } object FindPerfectNumberOverRange extends App{ val fpn = new FindPerfectNumberOverRange() val startNumber = 33550300 val endNumber = 33550400 fpn.countPerfectNumbersInRange(startNumber, endNumber, fpn.isPerfect) fpn.countPerfectNumbersInRange(startNumber, endNumber, fpn.isPerfectConcurrent) }
程序运行结果以下:ide
Found 1 perfect numbers in given range, took 53.505288657 secs Found 1 perfect numbers in given range, took 35.739131734 secs
下面看一下消息是如何从一个actor传到另外一个actor。函数
import scala.actors.Actor._ class MessagePassing { var startTime : Long = 0 val caller = self val engrossedActor = actor { println("Number of messages received so far? " + mailboxSize) caller ! "send" Thread.sleep(3000) println("Number of messages received while I was busy? " + mailboxSize) receive { case msg => val receivedTime = System.currentTimeMillis() - startTime println("Received message " + msg + "after " + receivedTime + " ms") } caller ! "received" } receive { case _ =>} println("Sending Message ") startTime = System.currentTimeMillis() engrossedActor ! "hello buddy" val endTime = System.currentTimeMillis() - startTime printf("Took less than %dms to send message\n", endTime) receive { case _ => } } object MessagePassing extends App { new MessagePassing() }
程序运行结果以下:oop
Number of messages received so far? 0 Sending Message Took less than 0ms to send message Number of messages received while I was busy? 0 Received message hello buddyafter 2997 ms
从输出能够看出,发送不阻塞,接收不中断。在actor调用receive()方法接收以前,消息会一直等在那里。
异步地发送和接收消息是一项好的实践——能够最大限度的利用并发。不过,若是对同步的发送消息和接收响应有兴趣,能够用!?()方法。在接收发消息的目标actor给出响应以前,她会一直阻塞在那里。这会引发潜在的死锁。一个已经失败的actor会致使其余actor的失败,而后就轮到应用失败了。因此,即使要用这个方法,至少要用有超时参数的变体,像这样:
package com.cn.gao import scala.actors._ import Actor._ class AskFortune { val fortuneTeller = actor { for(i <- 1 to 4) { Thread.sleep(1000); receive { case _ => sender ! "your day will rock! "+ i //case _ => reply("your day will rock! " + i) // same as above } } } println(fortuneTeller !? (2000, "what's ahead")) println(fortuneTeller !? (500, "what's ahead")) val aPrinter = actor { receive { case msg => println("Ah, fortune message for you-"+ msg)} } fortuneTeller.send("What's up", aPrinter) fortuneTeller ! "How's my future?" Thread.sleep(3000) receive{ case msg : String => println("Received "+ msg)} println("Let's get that lost message") receive { case !(channel,msg) => println("Received belated message "+ msg)} } object AskFortune extends App{ new AskFortune() }
在超时以前,若是actor发送回消息,!?()方法就会返回结果。不然,它会返回None,因此,这个方法的返回类型是Option[Any]。在上面的代码中,sender所引用的是最近一个发送消息的actor。程序运行结果以下:
Some(your day will rock! 1) None Ah, fortune message for you-your day will rock! 3 Received your day will rock! 4 Let's get that lost message Received belated message your day will rock! 2
若是想在actor启动时进行显式控制,但愿在actor里存入更多信息,能够建立一个对象,混入Actor trait。这是对的——Scala的Actor只是个trait,能够在任何喜欢的地方混入它。下面是个例子:
AnsweringService.scala
package com.cn.gao import scala.actors._ import Actor._ class AnsweringService(val folks:String*) extends Actor { def act(){ while(true){ receive{ case(caller: Actor, name:String, msg:String) => caller ! ( if(folks.contains(name)) String.format("Hey it's %s got message %s", name, msg) else String.format("Hey there's no one with the name %s here",name) ) case "ping" => println("ping!") case "quit" => println("existing actor") exit } } } } object AnsweringService extends App{ val answeringService1 = new AnsweringService("Sara", "Kara", "John") answeringService1 ! (self, "Sara", "In town") answeringService1 ! (self, "Kara", "Go shopping?") answeringService1.start() answeringService1 ! (self, "John", "Bug fixed?") answeringService1 ! (self, "Bill", "What's up") for(i <- 1 to 4) { receive { case msg => println(msg)}} answeringService1 ! "ping" answeringService1 ! "quit" answeringService1 ! "ping" Thread.sleep(2000) println("The last ping was not processed") }
程序运行结果以下:
Hey it's Sara got message In town Hey it's Kara got message Go shopping? Hey it's John got message Bug fixed? Hey there's no one with the name Bill here ping! existing actor The last ping was not processed
开始,咱们给actor发送了一些元组消息。这些消息不会当即获得处理,由于actor尚未启动。它们会进入队列,等待后续处理。而后调用start()方法,再发送一些消息。只要调用了start()方法,就会有一个单独的线程调用actor的act()方法。这时,曾经发出去的全部消息都开始进行处理。而后,咱们循环接收对方发出的四条消息的应答。
调用exit()方法能够中止actor。不过这个方法只是抛出异常,试图终止当前线程的执行,因此,在act()方法里调用挺不错。
若是对显式启动actor并不真的那么关注,那么可使用actor()方法。在actor间传递数据,能够用!()和receive()方法。下面从一个使用actor方法的例子开始,而后重构,使其并发。
这个方法isPrime()告诉咱们给定的数是否是素数。为了达到说明的目的,我在方法里加了一些打印语句:
package com.cn.gao import scala.actors._ import Actor._ class PrimeTeller { def isPrime(number: Int) = { println("Going to find if " + number + " is prime") var result = true if(number == 2 || number == 3) result = true for(i <- 2 to Math.sqrt(number.toDouble).floor.toInt;if result){ if(number % i == 0) result = false } println("done finding if " + number + " is prime") result } }
调用上面这段代码的话,接收到应答以前,就会阻塞在那里。以下所示,这里把调用这个方法的职责委托给一个actor。这个actor会肯定一个数是不是素数,而后,用一个异步响应发回给调用者。
package com.cn.gao import scala.actors._ import Actor._ object PrimeTeller extends App { def isPrime(number: Int) = { println("Going to find if " + number + " is prime") var result = true if(number == 2 || number == 3) result = true for(i <- 2 to Math.sqrt(number.toDouble).floor.toInt;if result){ if(number % i == 0) result = false } println("done finding if " + number + " is prime") result } val primeTeller = actor{ var continue = true while(continue){ receive { case (caller: Actor, number:Int) => caller ! (number, isPrime(number)) case "quit" => continue = false } } } primeTeller ! (self, 2) primeTeller ! (self, 131) primeTeller ! (self, 132) for(i<- 1 to 3){ receive { case (number, result) => println(number + "is prime? " + result) } } primeTeller ! "quit" }
primeTeller是一个引用,它指向了用actor()方法建立的一个匿名actor。它会不断循环,直到接收到“quit”消息。除了退出消息,它还能接收一个包含caller和number的元组。收到这个消息时,它会判断给定的数是不是素数,而后,给caller发回一个消息。
程序运行结果以下:
Going to find if 2 is prime done finding if 2 is prime Going to find if 131 is prime 2is prime? true done finding if 131 is prime Going to find if 132 is prime 131is prime? true done finding if 132 is prime 132is prime? false
上面的代码处理了接收到的每一个数字;从输出能够看到这一点。在actor忙于判断一个数是不是素数时,若是又接收到多个请求,它们就会进入队列。所以,即使是将执行委托给了actor,它依然是顺序的。
让这个例子并行至关容易,在PrimeTeller actor的第6行,不要去调用isPrime(),而是把这个职责委托给另外一个actor,让它给调用者回复应答,程序以下:
package com.cn.gao import scala.actors._ import Actor._ object PrimeTeller extends App { def isPrime(number: Int) = { println("Going to find if " + number + " is prime") var result = true if(number == 2 || number == 3) result = true for(i <- 2 to Math.sqrt(number.toDouble).floor.toInt;if result){ if(number % i == 0) result = false } println("done finding if " + number + " is prime") result } val primeTeller = actor{ var continue = true while(continue){ receive { // case (caller: Actor, number:Int) => caller ! (number, // isPrime(number)) case (caller: Actor, number:Int) => actor {caller ! (number, isPrime(number))} case "quit" => continue = false } } } primeTeller ! (self, 2) primeTeller ! (self, 131) primeTeller ! (self, 132) for(i<- 1 to 3){ receive { case (number, result) => println(number + "is prime? " + result) } } primeTeller ! "quit" }
再次运行上面的代码,咱们会看到,多个请求并发地执行了,以下所示:
Going to find if 2 is prime Going to find if 131 is prime Going to find if 132 is prime done finding if 132 is prime done finding if 2 is prime done finding if 131 is prime 132is prime? false 131is prime? true 2is prime? true
receive()接收一个函数值/闭包,返回一个处理消息的应答。下面是个从receive()方法接收结果的例子:
package com.cn.gao import scala.actors.Actor._ object Receive extends App { val caller = self val accumulator = actor { var sum = 0 var continue = true while(continue) { sum += receive { case number:Int => number case "quit" => continue = false 0 } } caller ! sum } accumulator ! 1 accumulator ! 7 accumulator ! 8 accumulator ! "quit" receive{case result => println("Total is " + result)} }
accumulator接收数字,对传给它的数字求和。完成以后,它会发回一个消息,带有求和的结果。上面代码的输出以下:
Total is 16
调用receive()方法会形成程序阻塞,直到实际接收到应答为止。若是预期的actor应答一直没有发过来就麻烦了。这会让咱们一直等下去。用receiveWithin()方法修正这一点,它会接收一个timeout参数,以下:
package com.cn.gao import scala.actors._ import scala.actors.Actor._ object ReceiveWithin extends App { val caller = self val accumulator = actor { var sum = 0 var continue = true while(continue) { sum += receiveWithin(1000) { case number:Int => number case TIMEOUT => println("Time out! Will return result now") continue = false 0 } } caller ! sum } accumulator ! 1 accumulator ! 7 accumulator ! 8 receiveWithin(2000) { case result => println("Total is " + result) } }
在给定的超时期限内,若是什么都没有收到,receiveWithin()方法会收到一个TIMEOUT消息。若是不对其进行模式匹配,就会抛出异常。在上面的代码里,接收到TIMEOUT消息当作了完成值累加的信号。输出以下:
Time out! Will return result now Total is 16
咱们应该倾向于使用receiveWithin()方法而非receive()方法,避免产生活性等待问题。
recevie()和receiveWithin()方法把函数值看成偏应用函数,调用代码块以前,会检查它是否处理消息。因此,若是接收到一个非预期的消息,就会悄悄地忽略它。固然,若是想把忽略的消息显示出来,能够提供一个case_=>...语句。下面这个例子展现了忽略的无效消息:
package com.cn.gao import scala.actors._ import Actor._ object MessageIgnore extends App{ val expectStringOrInteger = actor { for(i <- 1 to 4) { receiveWithin(1000) { case str: String =>println("You said " + str) case num: Int => println("You gave " + num) case TIMEOUT => println("Time out!") } } } expectStringOrInteger ! "only constant is change" expectStringOrInteger ! 1024 expectStringOrInteger ! 22.22 expectStringOrInteger ! (self, 1024) receiveWithin(3000){case _ => } }
在代码最后,放了一个receiveWithin()的调用。由于主线程退出时,程序就退出了,这个语句保证程序还活动着,给actor一个应答的机会。从输出中能够看出,actor处理了前两个发送给它的消息,忽略了后两个,由于它们没有匹配上预期的消息模式。程序最终会超时,由于没有再接收到任何能够匹配的消息。输出结果以下:
You said only constant is change You gave 1024 Time out! Time out!
在每一个actor里,调用receive()的时候实际上会要求有一个单独的线程。这个线程会一直持有,直到这个actor结束。也就是说,即使是在等待消息到达,程序也会持有这些线程,每一个actor一个,这绝对是一种资源浪费。Scala不得不持有这些线程的缘由在于,控制流的执行过程当中有一些具体状态。若是在调用序列里没有须要保持和返回的状态,Scala几乎就能够从线程池里获取任意线程执行消息处理——这偏偏就是使用react()所作的事情。react()不一样于receive(),它并不返回任何结果。实际上,它并不从调用中返回。
若是处理了react()的当前消息后,还要处理更多的消息,就要在消息处理的末尾调用其余方法。Scala会把这个调用执行交给线程池里的任意线程。看一个这种行为的例子:
package com.cn.gao import scala.actors.Actor._ import scala.actors._ object React extends App { def info(msg:String) = println(msg + " received by " + Thread.currentThread()) def receiveMessage(id:Int) { for(i <- 1 to 2) { receiveWithin(20000) { case msg:String => info("receive: " + id + msg) case TIMEOUT => } } } def reactMessage(id:Int){ react { case msg:String => info("react: " + id + msg) reactMessage(id) } } val actors = Array ( actor {info("react: 1 actor created"); reactMessage(1)}, actor {info("react: 2 actor created"); reactMessage(2)}, actor {info("receive: 3 actor created"); receiveMessage(3)}, actor {info("receive: 4 actor created"); receiveMessage(4)} ) Thread.sleep(1000) for(i <- 0 to 3){actors(i) ! " hello"; Thread.sleep(2000)} Thread.sleep(2000) for(i <- 0 to 3){actors(i) ! " hello"; Thread.sleep(2000)} }
上面的代码输出结果以下:
react: 1 actor created received by Thread[ForkJoinPool-1-worker-5,5,main] react: 2 actor created received by Thread[ForkJoinPool-1-worker-3,5,main] receive: 3 actor created received by Thread[ForkJoinPool-1-worker-1,5,main] receive: 4 actor created received by Thread[ForkJoinPool-1-worker-7,5,main] react: 1 hello received by Thread[ForkJoinPool-1-worker-3,5,main] react: 2 hello received by Thread[ForkJoinPool-1-worker-3,5,main] receive: 3 hello received by Thread[ForkJoinPool-1-worker-1,5,main] receive: 4 hello received by Thread[ForkJoinPool-1-worker-7,5,main] react: 1 hello received by Thread[ForkJoinPool-1-worker-5,5,main] react: 2 hello received by Thread[ForkJoinPool-1-worker-5,5,main] receive: 3 hello received by Thread[ForkJoinPool-1-worker-1,5,main] receive: 4 hello received by Thread[ForkJoinPool-1-worker-7,5,main]
使用receiveWithin()方法的actor具备线程关联性(thread affinity);他们会持续的使用分配给他们的同一个线程。从上面的输出中就能够看出。
另外一方面,使用react()的actor能够自由的交换彼此的线程,能够由任何可用的线程处理。
换句话说,使用react()的actor不具备线程关联性,它们会放弃本身的线程,用一个新的线程(或许是同一个)进行后续的消息处理。这种作法对资源更为友善,特别是在消息处理至关快的状况下。因此,咱们鼓励使用react()来代替receive()。
相似于receiveWithin(),若是在超时时段里,没有接到任何消息,reactWithin()就会超时——在这种状况下,若是处理case TIMEOUT,能够采起任何想采起的行动,也能够从方法里退出。下面是一个使用reactWithin()的例子,尝试一下以前使用receiveWithin()实现累加器的例子,此次用reactWithin()方法:
package com.cn.gao import scala.actors._ import scala.actors.Actor._ object ReactWithin extends App { val caller = self def accumulate(sum:Int) { reactWithin(500){ case number:Int => accumulate(sum + number) case TIMEOUT => println("Timed out! Will send result now") caller ! sum } println("This will not be called...") } val accumulator = actor {accumulate(0)} accumulator ! 1 accumulator ! 7 accumulator ! 8 receiveWithin(10000) { case result => println("Total is " + result) } }
上面的代码输出以下:
Timed out! Will send result now Total is 16
同使用receiveWithin()的方案比起来,这个方案更加优雅,等待接收消息时,它并不持有任何线程。
关于react()和reactWithin(),最后要记住的一点是,由于这两个方法并非真的从调用里返回(记住,Scala内部经过让这些方法抛出异常来处理这个问题),放在这些方法后的任何代码都不会执行(好比在accumulate()方法末尾加上打印语句)。因此,在调用这两个方法以后,不要写任何东西。
有两件事情阻碍咱们充分使用react()和reactWithin()。第一个是递归调用。若是有多个case语句,典型状况下,要在每一个case里面重复调用。第二,彷佛没有什么好的方式跳出方法。第一个顾虑的答案是单例对象Actor的loop()方法。第二个的答案是loopWhile()方法。
相比于在reactWithin()里递归的调用方法,能够在loop()调用里放一个对reactWithin()的调用。执行loop()方法的线程遇到reactWithin()的调用时,会放弃控制。消息到达时,任意的线程均可以继续执行适当的case语句。case语句执行完毕,线程会继续回到loop()块的顶部。这会一直继续下去。loopWhile()方法是相似的,可是只有提供的参数是有效的,它才会继续循环下去。由于loopWhile()负责处理循环,因此,能够把局部状态放到循环以外,在reactWithin()方法里访问它。这样的话,就给了咱们一个一箭双鵰的选择,既能够像receiveWithin()那样处理状态,又能够像reactWithin()那样利用来自线程池的线程。下面看一个在loopWhile()里使用reactWithin()的例子。
package com.cn.gao import scala.actors._ import Actor._ object Loop extends App { val caller = self val accumulator = actor { var continue = true var sum = 0 loopWhile(continue){ reactWithin(500){ case number:Int => sum += number case TIMEOUT => continue = false caller ! sum } } } accumulator ! 1 accumulator ! 7 accumulator ! 8 receiveWithin(1000){case result => println("Total is " + result)} }
上面的代码没有任何递归调用——这是由loopWhile()处理的。在退出消息处理的地方,只需简单的设置标记,由它处理退出循环,进而退出actor执行。代码输出以下:
Total is 16
咱们已经见识到了,使用receive时,每一个actor是怎样运行在本身的线程里,react又如何让actor共享来自线程池的线程。不过,有时咱们会想要更强的控制力。好比,结束一个长期运行的任务以后,须要更新UI,这时须要在一个单独的线程里运行任务,而后,在主线程里更新UI。(由于UI组件时常不是线程安全的。)经过使用SingleThreadScheduler,可让Scala在主线程里运行actor。咱们用个例子看看如何作到这点:
package com.cn.gao import scala.actors._ import scala.actors.scheduler._ import Actor._ object InMainThread { def main(args:Array[String]){ if (args.length > 0 && args(0)== "Single") { println("Command-line argument Single found") Scheduler.impl = new SingleThreadedScheduler() } println("Main running in " + Thread.currentThread()) actor {println("Actor1 running in " + Thread.currentThread())} actor {println("Actor2 running in " + Thread.currentThread())} receiveWithin(3000){case _ => } } }
上面的代码里,建立了两个actor。若是不传任何命令行参数,两个actor的代码和主脚本的代码会运行在各自的线程里,输出以下:
Main running in Thread[main,5,main] Actor1 running in Thread[ForkJoinPool-1-worker-5,5,main] Actor2 running in Thread[ForkJoinPool-1-worker-5,5,main]
另外一方面,若是像scala InMainThread.scala Single 这样运行以前的代码,会获得不一样的结果:
Command-line argument Single found Main running in Thread[main,5,main] Actor1 running in Thread[main,5,main] Actor2 running in Thread[main,5,main]
不管actor什么时候启动,Scala都会让单例对象Scheduler去运行它。经过是设置Scheduler的impl,就能够控制整个应用的actor调度策略。
上面的方式影响深远,它让咱们能够控制全部的actor的调度。不过,也许咱们想要让一些线程运行在主线程中,而其它actor运行在各自线程里。经过继承Actor trait,改写scheduler()方法,就能够作到这一点。默认状况下,这个方法为要调度的actor返回单例对象Scheduler。改写这个方法就能够控制调度单独的actor的方式,以下所示:
package com.cn.gao import scala.actors._ import scala.actors.scheduler._ import Actor._ object InMainThreadSelective extends App { trait SingleThreadActor extends Actor { override protected def scheduler() = new SingleThreadedScheduler() } class MyActor1 extends Actor { def act() = println("Actor1 running in " + Thread.currentThread()) } class MyActor2 extends SingleThreadActor { def act() = println("Actor2 running in " + Thread.currentThread()) } println("Main running in " + Thread.currentThread()) new MyActor1().start() new MyActor2().start() actor{println("Actor 3 running in " + Thread.currentThread())} receiveWithin(5000){case _ => } }
上面的代码建立了三个actor,其中,两个继承自Actor trait,一个使用了更为常规的actor()方法。经过改写protected方法scheduler,就能够控制MyActor2的线程。运行上述代码时,使用actor()和MyActor1建立的actor运行于本身的线程。而使用MyActor2建立的actor则运行于主线程,以下所示:
Main running in Thread[main,5,main] Actor2 running in Thread[main,5,main] Actor1 running in Thread[ForkJoinPool-1-worker-5,5,main] Actor 3 running in Thread[ForkJoinPool-1-worker-3,5,main]