一、文本文件test.txt
二、建立Scala项目AkkaScalaWordCount,实现词频统计
(1)建立AKKAUtils对象,提供获取Akka配置的函数
package net.hw.akka.wc
import java.util.HashMap
import java.util.ArrayList
/**
 * Helper for assembling the settings map used to configure an Akka actor system
 * with the Netty TCP remote transport.
 *
 * Created by howard on 2017/8/27.
 */
object AKKAUtils {
  /**
   * Builds a fresh settings map with the remote provider, the enabled transport list
   * and the given host/port.
   *
   * @param ip   value stored under `akka.remote.netty.tcp.hostname`
   * @param port value stored under `akka.remote.netty.tcp.port`
   * @return a new `HashMap` holding the four configuration entries
   */
  def getConf(ip: String, port: String): HashMap[String, Object] = {
    // The transport list is a Java list because it is stored as a value in the Java map.
    val transports = new ArrayList[String]()
    transports.add("akka.remote.netty.tcp")

    val settings = new HashMap[String, Object]()
    settings.put("akka.actor.provider", "akka.remote.RemoteActorRefProvider")
    settings.put("akka.remote.enabled-transports", transports)
    settings.put("akka.remote.netty.tcp.hostname", ip)
    settings.put("akka.remote.netty.tcp.port", port)
    settings
  }
}
(2)建立WcInfo1,封装从WcDriver发往WcMapper的数据
package net.hw.akka.wc
/**
 * Message carrying one line of text together with the map and reduce functions
 * to be applied to it.
 *
 * Created by howard on 2017/8/27.
 *
 * @param data       the raw text to process
 * @param mapFunc    turns the text into an array of (word, count) pairs
 * @param reduceFunc folds an array of (word, count) pairs into a word -> total map
 */
case class WcInfo1(
    data: String,
    mapFunc: String => Array[(String, Int)],
    reduceFunc: Array[(String, Int)] => Map[String, Int]
) {
  // The members below re-expose the constructor parameters under a second name.

  /** Alias of `data`. */
  val datax: String = data

  /** Alias of `mapFunc`. */
  val mapFuncx: String => Array[(String, Int)] = mapFunc

  /** Alias of `reduceFunc`. */
  val reduceFuncx: Array[(String, Int)] => Map[String, Int] = reduceFunc
}
WcMapper接收字符串data,调用mapFunc进行处理,返回的是元组数组arr: Array[(String, Int)],因此WcReducer接收的参数就是arr: Array[(String, Int)]。
(3)建立WcInfo2,封装从WcMapper发往WcReducer的数据
package net.hw.akka.wc
/**
 * Message carrying the (word, count) pairs produced by the map step together with
 * the reduce function that should aggregate them.
 *
 * Created by howard on 2017/8/27.
 *
 * @param arr        the (word, count) pairs to aggregate
 * @param reduceFunc folds the pairs into a word -> total map
 */
case class WcInfo2(
    arr: Array[(String, Int)],
    reduceFunc: Array[(String, Int)] => Map[String, Int]
) {
  // The members below re-expose the constructor parameters under a second name.

  /** Alias of `arr`. */
  val arrx: Array[(String, Int)] = arr

  /** Alias of `reduceFunc`. */
  val reduceFuncx: Array[(String, Int)] => Map[String, Int] = reduceFunc
}
(4)建立WcDriver
package net.hw.akka.wc
import java.util.Scanner
import akka.actor.ActorSystem
import com.typesafe.config.ConfigFactory
/**
 * Client side of the word-count demo: reads lines from standard input and sends each
 * one, together with the map and reduce functions, to the remote mapper actor.
 *
 * Created by howard on 2017/8/27.
 */
object WcDriver {
  /**
   * Starts the client actor system on 127.0.0.1:44444 and forwards every line read from
   * standard input to `mapperActor` on 127.0.0.1:44443 until the input is exhausted.
   *
   * @param args command-line arguments (unused)
   */
  def main(args: Array[String]): Unit = {
    val sys = ActorSystem("myAkkaClientSys", ConfigFactory.parseMap(AKKAUtils.getConf("127.0.0.1", "44444")))
    val mapper = sys.actorSelection("akka.tcp://myAkkaServerSys@127.0.0.1:44443/user/mapperActor")

    // Neither function depends on the line being read, so they are built once rather
    // than on every loop iteration.
    // Splitting on whitespace runs and dropping empty tokens keeps blank lines and
    // repeated spaces from being counted as an empty "word".
    val mapFunc: String => Array[(String, Int)] =
      line => line.split("\\s+").filter(_.nonEmpty).map((_, 1))

    // Strict per-word sum; a chained mapValues would hand back a lazy view instead of
    // a materialised Map.
    val reduceFunc: Array[(String, Int)] => Map[String, Int] =
      pairs => pairs.groupBy(_._1).map { case (word, hits) => word -> hits.map(_._2).sum }

    val scan = new Scanner(System.in)
    // hasNextLine ends the loop at end of input instead of letting nextLine throw
    // NoSuchElementException.
    while (scan.hasNextLine) {
      mapper ! WcInfo1(scan.nextLine(), mapFunc, reduceFunc)
    }
    // NOTE(review): the actor system is intentionally left running here; shutting it
    // down right after the last send could drop messages that are still in flight.
    // Confirm the desired shutdown behaviour before adding one.
  }
}
WcDriver创建自己的ActorSystem对象sys,监听本机的44444端口;它定义了两个函数(mapFunc与reduceFunc),并把从控制台读取的每一行数据连同这两个函数封装成WcInfo1对象,发送给本机44443端口上的mapperActor。
(5)建立WcMapper
package net.hw.akka.wc
import akka.actor.{Actor, ActorSystem, Props}
import com.typesafe.config.ConfigFactory
/**
 * Map stage of the word-count demo: hosts `mapperActor` on 127.0.0.1:44443.
 *
 * Created by howard on 2017/8/27.
 */
object WcMapper {
  /**
   * Applies the map function carried by each incoming [[WcInfo1]] to its text and
   * forwards the resulting pairs, plus the reduce function, to `reducerActor`.
   */
  class MapperActor extends Actor {
    def receive: Receive = {
      case WcInfo1(text, mapFunc, reduceFunc) =>
        // Run the map step first, then look up the reducer and hand the pairs over.
        val pairs = mapFunc(text)
        val reducer = context.actorSelection("akka.tcp://myAkkaServerSys@127.0.0.1:44442/user/reducerActor")
        reducer ! WcInfo2(pairs, reduceFunc)
    }
  }

  /**
   * Starts the actor system and registers the mapper actor under the name `mapperActor`.
   *
   * @param args command-line arguments (unused)
   */
  def main(args: Array[String]): Unit = {
    val conf = ConfigFactory.parseMap(AKKAUtils.getConf("127.0.0.1", "44443"))
    val system = ActorSystem("myAkkaServerSys", conf)
    system.actorOf(Props[MapperActor], "mapperActor")
  }
}
(6)建立WcReducer
package net.hw.akka.wc
import akka.actor.{Actor, ActorSystem, Props}
import com.typesafe.config.ConfigFactory
/**
 * Reduce stage of the word-count demo: hosts `reducerActor` on 127.0.0.1:44442.
 *
 * Created by howard on 2017/8/27.
 */
object WcReducer {
  /**
   * Applies the reduce function carried by each incoming [[WcInfo2]] to its pairs and
   * prints the resulting entries to standard output.
   */
  class ReducerActor extends Actor {
    def receive: Receive = {
      case WcInfo2(pairs, reduceFunc) =>
        val counts = reduceFunc(pairs)
        println("统计结果:")
        counts.foreach(entry => println(entry))
    }
  }

  /**
   * Starts the actor system and registers the reducer actor under the name `reducerActor`.
   *
   * @param args command-line arguments (unused)
   */
  def main(args: Array[String]): Unit = {
    val conf = ConfigFactory.parseMap(AKKAUtils.getConf("127.0.0.1", "44442"))
    val system = ActorSystem("myAkkaServerSys", conf)
    system.actorOf(Props[ReducerActor], "reducerActor")
  }
}
按顺序启动WcReducer、WcMapper、WcDriver,看WcReducer控制台的统计结果:
本文分享自 CSDN - howard2005。
若有侵权,请联系 support@oschina.cn 删除。
本文参与“OSC源创计划”,欢迎正在阅读的你也加入,一起分享。