package com.dcx.scala.actor import akka.actor.{Actor, ActorRef, ActorSystem, Props} import scala.collection.mutable.HashMap import scala.collection.mutable.ListBuffer import scala.io.Source /** * 思路: * 要有个Server * 要有个Client去通讯,client统计文本后把(qy,3)输出给Server;Server再把全部的qy聚合,放到ListBuffer中 */ object AkkaWordCount { // 可变长List val list = new ListBuffer[HashMap[String,Int]] def main(args: Array[String]): Unit = { // 输入数据文本 val files: Array[String] = Array("D:\\tmp\\input\\word3.txt", "D:\\tmp\\input\\word3.txt", "D:\\tmp\\input\\word3.txt") //存放接收到的每一个actor处理的结果数据 //存放有actor返回结果的Future数据 //拿ActorSystem是一个静态工厂 val weChatApp = ActorSystem("WeChatApp") //拿到两个Actor的通讯地址 val akkaServerRef: ActorRef = weChatApp.actorOf(Props[AkkaServer],"jianjian1") val clientRef: ActorRef = weChatApp.actorOf(Props(new Client(akkaServerRef)),"jianjian") for (file <- files) { clientRef ! file } // 让该线程先睡一下,过早进入死循环会致使list没有3个,一直循环不出来 Thread.sleep(1000) // 若是list把三个文件都放满了,就退出循环 while(true){ if(list.size == 3){ // 输出list println(list(list.size -1)) return } } } } //把每次聚合后的值都发送给AkkaServer class Client(val serverRef:ActorRef) extends Actor { override def receive: Receive = { { // 偏函数 经常使用做模式匹配 // case filePath: String => { //// map阶段 // val list: List[String] = Source.fromFile(filePath).getLines().toList // val words: List[String] = list.flatMap(_.split(" ")) // val res: Map[String, Int] = words.map((_, 1)).groupBy(_._1).mapValues(_.size) // //异步发送结果数据 res发送到Server,去模式匹配 // serverRef ! res // } case filePath:String => { val list: List[String] = Source.fromFile(filePath).getLines().toList val words: List[String] = list.flatMap(_.split(" ")) // 得出: (qy,3) 格式 val res: Map[String, Int] = words.map((_, 1)).groupBy(_._1).mapValues(_.size) serverRef ! res } } } } import scala.collection.mutable.HashMap class AkkaServer extends Actor { private var hashMap: HashMap[String, Int] = new HashMap[String, Int] override def receive: Receive = { case context: Map[String, Int] =>{ // (qy,3) context.map( (map:(String,Int)) => { // 聚合 val value: Any = hashMap.getOrElse(map._1,None) if(value != None){ hashMap(map._1) = value.asInstanceOf[Int] + map._2 }else{ hashMap(map._1) = map._2 } } ) AkkaWordCount.list += hashMap } } }