第十五章 客户信息管理系统15.1 项目的开发流程15.2 项目的需求分析15.3 项目的界面15.4 项目的设计-程序框架图15.5 项目的功能实现15.5.1 完成 Customer 类15.5.2 完成显示主菜单和退出软件功能15.5.3 完成显示客户列表的功能15.5.4 完成添加客户的功能15.5.5 完成删除客户的功能15.5.6 完善退出确认功能15.5.7 完善删除确认功能15.5.8 完成修改客户的功能第十六章 并发编程模型 Akka16.1 Akka 的介绍16.2 Actor 模型用于解决什么问题16.3 Akka 中 Actor 模型详解16.4 Actor 模型工做机制说明16.5 Actor 模型应用实例16.5.1 Actor 自我通信16.5.2 Actor 之间通信16.7 Akka 网络编程16.7.1 Akka 网络编程基本介绍16.7.2 协议(tcp/ip)16.7.3 OSI 与 Tcp/ip 参考模型16.7.4 ip 地址16.7.5 端口(port)16.8 Akka 网络编程-小黄鸡客服案例16.8.1 需求分析 + 界面设计16.8.2 程序框架图16.8.3 功能实现16.9 Akka 网络编程-Spark Master Worker 进程通信项目16.9.1 项目意义16.9.2 项目需求分析16.9.3 项目界面设计16.9.4 实现功能 1-Worker 完成注册16.9.5 实现功能 2-Worker 定时发送心跳16.9.6 实现功能 3-Master 启动定时任务,定时检测注册的 Worker16.9.7 实现功能 4-Master,Worker 的启动参数运行时指定16.9.8 Master Worker 进行分布式部署css
模拟实现基于文本界面的《客户信息管理软件》。
该软件 scala 可以实现对客户对象的插入、修改、删除、显示、查询(用 ArrayBuffer 或者 ListBuffer 实现),并可以打印客户明细表。前端
主界面java
程序框架图:设计系统有多少个文件,以及文件之间的调用关系,能够帮助程序员实现模块的设计(清晰),便于程序员之间对项目交流分析。【业务优化,设计方案】程序员
根据需求文档或者页面,写出 Customer 类
Customer.scalaapache
package com.atguigu.chapter15.customercrm.bean
class Customer {
// 属性
var id: Int = _
var name: String = _
var gender: Char = _
var age: Short = _
var tel: String = _
var email: String = _
// 辅助构造器
def this(id: Int, name: String, gender: Char, age: Short, tel: String, email: String) {
this
this.id = id
this.name = name
this.gender = gender
this.age = age
this.tel = tel
this.email = email
}
}
CustomerView.scala 功能分析:
1. 将主菜单的显示放入到 while
2. 用户能够根据输入,选择本身的操做
3. 若是输入5退出
CustomerView.scala编程
package com.atguigu.chapter15.customercrm.view
import scala.io.StdIn
class CustomerView {
// 定义一个循环变量,控制是否退出
var loop = true
// 定义一个 key 用于接收用户输入的选项
var key = ' '
def mainMenu(): Unit = {
do {
println("-----------------客户信息管理软件-----------------")
println(" 1 添 加 客 户 ")
println(" 2 修 改 客 户 ")
println(" 3 删 除 客 户 ")
println(" 4 客 户 列 表 ")
println(" 5 退 出 ")
println(" 请选择(1-5): ")
key = StdIn.readChar()
key match {
case '1' => println("添 加 客 户")
case '2' => println("修 改 客 户")
case '3' => println("删 除 客 户")
case '4' => println("客 户 列 表")
case '5' => this.loop = false
}
} while (loop)
println("你退出了系统...")
}
}
示例代码以下:服务器
package com.atguigu.chapter15.customercrm.app
import com.atguigu.chapter15.customercrm.view.CustomerView
object CustomerCrm {
def main(args: Array[String]): Unit = {
new CustomerView().mainMenu()
}
}
CustomerView.scala 功能分析:
1. 接收4,显示客户列表
2. 调用 CustomerService 的方法 list
3. 须要一个 CustomerService 对象(属性)
CustomerService.sacla 功能分析:
1. 编写一个方法 list,返回当前系统有哪些客户
2. 客户放在哪?--> 内存 --> 可变集合 --> ArrayBuffer
一、在 Customer.sacla 中重写 toString 方法网络
override def toString: String = {
this.id + "\t\t" + this.name + "\t\t" + this.gender + "\t\t" + this.age + "\t\t" + this.tel + "\t\t" + this.email
}
二、在 CustomerService.scala 中编写一个方法 list,返回当前系统有哪些客户并发
class CustomerService {
// customers 是存放客户用的,为了方便测试,咱们先进行初始化
val customers = ArrayBuffer(new Customer(1, "tom", '男', 20, "110", "tom@sohu.com"))
// 查询客户列表的方法
def list(): ArrayBuffer[Customer] = {
this.customers
}
}
三、在 CustomerView.scala 中 调用 CustomerService 的方法 listapp
val customerService = new CustomerService()
/*
---------------------------客户列表---------------------------
编号 姓名 性别 年龄 电话 邮箱
1 张三 男 30 010-56253825 abc@email.com
2 李四 女 23 010-56253825 lisi@ibm.com
3 王芳 女 26 010-56253825 wang@163.com
-------------------------客户列表完成-------------------------
*/
def list(): Unit = {
println()
println("---------------------------客户列表---------------------------")
println("编号\t\t姓名\t\t性别\t\t年龄\t\t电话\t\t邮箱")
// 遍历
// 调用 CustomerService 的方法 list
val customers = customerService.list()
for (customer <- customers) {
// 方式一:输出
// println(customer.id + "\t\t" + ...)
// 方式二:重写 Customer 的 toString 方法,返回信息,而且格式化
println(customer)
}
println("-------------------------客户列表完成-------------------------")
}
CustomerView.scala 功能分析:
1. 接收客户的信息,并封装成对应的 Customer 对象
2. 调用 CustomerService 的方法 add
CustomerService.sacla 功能分析:
1. 编写一个方法 add,接收一个 Customer 对象
2. 加入到 ArrayBuffer 中
3. 规定:以添加客户是第几个做为它的 id
一、在 Customer.sacla 中添加一个新的 辅助构造器(没有id属性)
// 辅助构造器(没有id属性)
def this(name: String, gender: Char, age: Short, tel: String, email: String) {
this
this.name = name
this.gender = gender
this.age = age
this.tel = tel
this.email = email
}
二、在 CustomerService.scala 中编写一个方法 add,接收一个 Customer 对象,并设置 id 后再加入到 ArrayBuffer 中
// 用于设置用户 id
var customerNum = 1
// 添加客户的方法
def add(customer: Customer): Boolean = {
// 设置 id
customerNum += 1
customer.id = customerNum
// 加入到 ArrayBuffer 中
customers.append(customer)
true
}
三、在 CustomerView.scala 中 调用 CustomerService 的方法 add
/*
---------------------添加客户---------------------
姓名:张三
性别:男
年龄:30
电话:010-56253825
邮箱:zhang@abc.com
---------------------添加完成---------------------
*/
def add(): Unit = {
println()
println("---------------------添加客户---------------------")
println("姓名:")
val name = StdIn.readLine()
println("性别:")
val gender = StdIn.readChar()
println("年龄:")
val age = StdIn.readShort()
println("电话:")
val tel = StdIn.readLine()
println("邮箱:")
val email = StdIn.readLine()
// 封装对象
val customer = new Customer(name, gender, age, tel, email)
// 调用 CustomerService 的方法 add
customerService.add(customer)
println("---------------------添加完成---------------------")
}
CustomerView.scala 功能分析:
1. 接收客户 id,准备删除
2. 调用 CustomerService 的 del(id)
CustomerService.sacla 功能分析:
1. 编写一个方法 del,接收一个 id,先去调用另外一个方法 findIndexById,判断
2. 编写一个方法 findIndexById(由于咱们的 ArrayBuffer 索引和 id 并非对应的)
3. 若是发现有,则删除,若是没有就返回 false
一、在 CustomerService.scala 中编写一个方法 del,接收一个 id,先去调用另外一个方法 findIndexById,判断
// 先根据 id 查找 用户的 index
def findIndexById(id: Int): Int = {
// 先假定一个索引,默认 -1,若是找到就改为对应的,若是没有找到就返回 -1
var index = -1
// 遍历 ArrayBuffer
breakable {
for (i <- 0 until customers.length) {
if (customers(i).id == id) {
index = i
break()
}
}
}
index
}
// 再根据 id 删除用户
def del(id: Int): Boolean = {
val index = findIndexById(id)
if (index != -1) {
customers.remove(index)
true
} else {
false
}
}
二、在 CustomerView.scala 中接收客户 id,调用 CustomerService 的 del(id)
/*
---------------------删除客户---------------------
请选择待删除客户编号(-1退出):1
确认是否删除(Y/N):y
---------------------删除完成---------------------
*/
def del(): Unit = {
println()
println("---------------------删除客户---------------------")
println("请选择待删除客户编号(-1退出):")
val id = StdIn.readInt()
if (id == -1) {
println("---------------------删除没有完成---------------------")
return
}
println("确认是否删除(Y/N):")
val choice = StdIn.readChar().toLower
if (choice == 'y') {
if (customerService.del(id)) {
println("---------------------删除完成---------------------")
return
}
}
println("---------------------删除没有完成---------------------")
}
功能说明:
要求用户在退出时提示 "确认是否退出(Y/N):",用户必须输入y/n,不然循环提示。且输入为y时,退出系统;输入为n时,不退出系统。
一、在 CustomerView.scala 中定义一个方法 isOut,并修改 key 所对应的函数。
// 要求用户在退出时提示"确认是否退出(Y/N):",用户必须输入y/n,不然循环提示。且输入为y时,退出系统;输入为n时,不退出系统。
def isOut(): Unit = {
println()
println("确认是否退出(Y/N):")
key = StdIn.readChar().toLower
key match {
case 'y' => this.loop = false
case 'n' => this.loop = true
case _ => isOut()
}
}
功能说明:
要求用户在删除确认时提示 "确认是否删除(Y/N):",用户必须输入y/n,不然循环提示。
一、在 CustomerView.scala 中,修改 del() 方法便可
/*
---------------------删除客户---------------------
请选择待删除客户编号(-1退出):1
确认是否删除(Y/N):y
---------------------删除完成---------------------
*/
def del(): Unit = {
println()
println("---------------------删除客户---------------------")
println("请选择待删除客户编号(-1退出):")
val id = StdIn.readInt()
if (id == -1) {
println("---------------------删除没有完成---------------------")
return
}
println("确认是否删除(Y/N):")
var choice = ' '
// 要求用户在删除确认时提示 "确认是否删除(Y/N):",用户必须输入y/n,不然循环提示。
breakable {
do {
choice = StdIn.readChar().toLower
if (choice == 'y' || choice == 'n') {
break()
}
println("确认是否删除(Y/N):")
} while (true)
}
if (choice == 'y') {
if (customerService.del(id)) {
println("---------------------删除完成---------------------")
return
}
}
println("---------------------删除没有完成---------------------")
}
一、在 CustomerService.scala 中定义一个方法根据 id 修改用户(更新用户)的方法 和 // 根据 id 查找用户信息 的方法
// 根据 id 查找用户信息
def findCustomerById(id: Int): Customer = {
val index = findIndexById(id)
if (index != -1) {
customers(index)
} else {
null
}
}
// 根据 id 修改用户(更新用户)
def update(id: Int, customer: Customer): Boolean = {
val index = findIndexById(id)
customers.update(index, customer)
true
}
二、在 CustomerView.scala 中定义一个方法 update
/*
---------------------修改客户---------------------
请选择待修改客户编号(-1退出):1
姓名(张三):<直接回车表示不修改>
性别(男):
年龄(30):
电话(010-56253825):
邮箱(zhang@abc.com):zsan@abc.com
---------------------修改完成---------------------
*/
def update(): Unit = {
println()
println("---------------------修改客户---------------------")
println("请选择待修改客户编号(-1退出):")
var id = StdIn.readInt()
if (id == -1) {
println("---------------------修改没有完成---------------------")
return
}
val customer = customerService.findCustomerById(id)
if (customer == null) {
println("---------------------修改没有完成---------------------")
return
}
var name = customer.name
print(s"姓名(${name}):")
name = StdIn.readLine()
if (name.length == 0) name = customer.name
var gender = customer.gender
print(s"性别(${gender}):")
gender = StdIn.readChar()
var age = customer.age
print(s"年龄(${age}):")
age = StdIn.readShort()
var tel = customer.tel
print(s"电话(${tel}):")
tel = StdIn.readLine()
if (tel.length == 0) tel = customer.tel
var email = customer.email
print(s"邮箱(${email}):")
email = StdIn.readLine()
if (email.length == 0) email = customer.email
// 封装对象
val newCustomer = new Customer(id, name, gender, age, tel, email)
// 调用 CustomerService 的方法 update
customerService.update(id, newCustomer)
println("---------------------修改完成---------------------")
}
Actor 模型及其说明
应用实例需求
代码实现
SayHelloActor 项目步骤:
1) 建立项目 Mew -> New Project -> 选择 Maven
2) 给项目命名
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.atguigu.akka</groupId>
<artifactId>SayHelloActor</artifactId>
<version>1.0-SNAPSHOT</version>
<!-- 定义一下常量 -->
<properties>
<encoding>UTF-8</encoding>
<scala.version>2.11.8</scala.version>
<scala.compat.version>2.11</scala.compat.version>
<akka.version>2.4.17</akka.version>
</properties>
<dependencies>
<!-- 添加scala的依赖 -->
<dependency>
<groupId>org.scala-lang</groupId>
<artifactId>scala-library</artifactId>
<version>${scala.version}</version>
</dependency>
<!-- 添加akka的actor依赖 -->
<dependency>
<groupId>com.typesafe.akka</groupId>
<artifactId>akka-actor_${scala.compat.version}</artifactId>
<version>${akka.version}</version>
</dependency>
<!-- 多进程之间的Actor通讯 -->
<dependency>
<groupId>com.typesafe.akka</groupId>
<artifactId>akka-remote_${scala.compat.version}</artifactId>
<version>${akka.version}</version>
</dependency>
</dependencies>
<!-- 指定插件-->
<build>
<!-- 指定源码包和测试包的位置 -->
<sourceDirectory>src/main/scala</sourceDirectory>
<testSourceDirectory>src/test/scala</testSourceDirectory>
<plugins>
<!-- 指定编译scala的插件 -->
<plugin>
<groupId>net.alchim31.maven</groupId>
<artifactId>scala-maven-plugin</artifactId>
<version>3.2.2</version>
<executions>
<execution>
<goals>
<goal>compile</goal>
<goal>testCompile</goal>
</goals>
<configuration>
<args>
<arg>-dependencyfile</arg>
<arg>${project.build.directory}/.scala_dependencies</arg>
</args>
</configuration>
</execution>
</executions>
</plugin>
<!-- maven打包的插件 -->
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
<version>2.4.3</version>
<executions>
<execution>
<phase>package</phase>
<goals>
<goal>shade</goal>
</goals>
<configuration>
<filters>
<filter>
<artifact>*:*</artifact>
<excludes>
<exclude>META-INF/*.SF</exclude>
<exclude>META-INF/*.DSA</exclude>
<exclude>META-INF/*.RSA</exclude>
</excludes>
</filter>
</filters>
<transformers>
<transformer
implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer">
<resource>reference.conf</resource>
</transformer>
<!-- 指定main方法 -->
<transformer
implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
<mainClass>xxx</mainClass>
</transformer>
</transformers>
</configuration>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project>
6) 由于按照配置模板的内容 "指定源码包和测试包的位置" 的部分:
<sourceDirectory>src/main/scala</sourceDirectory>
<testSourceDirectory>src/test/scala</testSourceDirectory>
咱们须要建立对应的 scala 目录,并 mark 为 Sources Root
7) 当修改后,第一次速度比较慢,由于 maven 须要 resolve 包的依赖,要下载相关的包。注意
:须要如图勾选,update snapshots,并且不须要联网,若是使用 maven 解决依赖后,仍然 pom.xml 有误,则只须要重启下 idea, 或者动一下 pom.xml 文件(不用改),从新保存便可。
package com.atguigu.akka.actor
import akka.actor.{Actor, ActorRef, ActorSystem, Props}
// 1. 当咱们继承 Actor 后,就是一个 Actor,须要重写该 Actor 的核心方法 receive
class SayHelloActor extends Actor {
// 循环的接收消息
// 1. receive方法,会被该 Actor 的 MailBox(实现了 Runnable 接口)调用
// 2. 当该 Actor 的 MailBox 接收到消息,就会调用 receive 方法
// 3. Receive 的底层:type Receive = PartialFunction[Any, Unit]
override def receive: Receive = {
// 接受消息并处理,若是接收到 exit,就退出
case "hello" => println("发送:hello\t\t回应:hello too:)")
case "ok" => println("发送:ok\t\t\t回应:ok too:)")
case "exit" => {
println("接收到exit~指令,退出系统...")
context.stop(self) // 中止本身的 ActorRef
context.system.terminate() // 关闭 ActorSystem
}
}
}
object SayHelloActor {
// 1. 先建立一个 ActorSystem,专门用于建立 Actor
private val actoryFactory = ActorSystem("actoryFactory")
// 2. 建立一个 Actor 的同时,返回 Actor 的 ActorRef
private val sayHelloActorRef: ActorRef = actoryFactory.actorOf(Props[SayHelloActor], "sayHelloActor")
// (1) Props[SayHelloActor] 建立了一个 SayHelloActor 实例,这里使用到了反射
// (2) "sayHelloActor" 是 Actor 的名字
// (3) sayHelloActorRef: ActorRef =>是 Props[SayHelloActor] 的引用
// (4) 建立的 SayHelloActor 实例被 ActorSystme 接管
def main(args: Array[String]): Unit = {
// 给 SayHelloActor 发消息(邮箱)
sayHelloActorRef ! "hello"
sayHelloActorRef ! "ok"
sayHelloActorRef ! "ok~"
// 研究异步如何退出 ActorSystem
sayHelloActorRef ! "exit"
}
}
输出结果以下:
发送:hello 回应:hello too:)
发送:ok 回应:ok too:)
接收到exit~指令,退出系统...
9) 运行的效果
代码的示意图和小结
应用实例需求
两个 Actor 的通信机制原理图
代码实现
AActor.scala
package com.atguigu.akka.actors
import akka.actor.{Actor, ActorRef}
class AActor(bActorRef: ActorRef) extends Actor {
var count = 0
override def receive: Receive = {
case "start" => {
println("AActor 出招了,start ok")
bActorRef ! "我打"
}
case "我打" => {
count += 1
// 给 BActor 发出消息
// 这里须要持有 BActor 的引用(BActorRef)才能够
println(s"AActor(黄飞鸿) 厉害!看我佛山无影脚 第${count}脚")
Thread.sleep(1000)
bActorRef ! "我打" // 给 BActor 发出消息
}
}
}
BActor.scala
package com.atguigu.akka.actors
import akka.actor.Actor
class BActor extends Actor {
var count = 0
override def receive: Receive = {
case "我打" => {
count += 1
println(s"BActor(乔峰) 挺猛 看我降龙十八掌 第${count}掌")
Thread.sleep(1000)
// 经过 sender() 方法,能够获取到发送消息的 Actor 的 ActorRef
sender() ! "我打"
}
}
}
ActorApp.scala
package com.atguigu.akka.actors
import akka.actor.{ActorRef, ActorSystem, Props}
// 100招后,就退出
object ActorApp extends App {
// 建立 ActorSystem
val actorfactory = ActorSystem("actorfactory")
// 先建立 BActor 的引用/代理
val bActorRef: ActorRef = actorfactory.actorOf(Props[BActor], "bActor")
// 建立 AActor 的引用时须要持有 BActor 的引用
val aActorRef: ActorRef = actorfactory.actorOf(Props(new AActor(bActorRef)), "aActor")
// aActor 先出招
aActorRef ! "start"
}
输出结果以下:
AActor 出招了,start ok
BActor(乔峰) 挺猛 看我降龙十八掌 第1掌
AActor(黄飞鸿) 厉害!看我佛山无影脚 第1脚
BActor(乔峰) 挺猛 看我降龙十八掌 第2掌
AActor(黄飞鸿) 厉害!看我佛山无影脚 第2脚
BActor(乔峰) 挺猛 看我降龙十八掌 第3掌
AActor(黄飞鸿) 厉害!看我佛山无影脚 第3脚
BActor(乔峰) 挺猛 看我降龙十八掌 第4掌
AActor(黄飞鸿) 厉害!看我佛山无影脚 第4脚
BActor(乔峰) 挺猛 看我降龙十八掌 第5掌
AActor(黄飞鸿) 厉害!看我佛山无影脚 第5脚
BActor(乔峰) 挺猛 看我降龙十八掌 第6掌
AActor(黄飞鸿) 厉害!看我佛山无影脚 第6脚
......
代码的小结
如何理解 Actor 的 receive 方法被调用?
看两个实际应用(socket/tcp/ip)
QQ、迅雷、百度网盘客户端、新浪网站、京东商城、淘宝
TCP/IP(Transmission Control Protocol/Internet Protocol)的简写,中文译名为传输控制协议/因特网互联协议,又叫网络通信协议,这个协议是Internet 最基本的协议、是 Internet 国际互联网络的基础,简单地说,就是由网络层的IP协议和传输层的TCP协议组成的。
TCP/IP 3本圣经级别书籍:xxx
概述:每一个 internet 上的主机和路由器都有一个 ip 地址,它包括网络号和主机号,ip 地址有 ipv4(32位) 或者 ipv6(128位),能够经过 ipconfig(ifconfig) 来查看。
一个小技巧:网络不通时,如何肯定是哪个路由(ip地址)出现问题?答:使用 tracert 指令。演示以下:
咱们这里所指的端口不是指物理意义上的端口,而是特指TCP/IP协议中的端口,是逻辑意义上的端口。若是把 IP 地址比做一间房子,端口就是出入这间房子的门。真正的房子只有几个门,可是一个 IP 地址的端口 能够有65535(即:256×256-1)个之多!端口是经过端口号来标记的。
端口(port)-分类
需求分析
一、服务端进行监听(9999)
二、客户端能够经过键盘输入,发送咨询问题给小黄鸡客服(服务端)
三、小黄鸡(服务端)回答客户的问题
界面设计
服务端:
代码结构:
package com.atguigu.akka.yellowchicken.server
import akka.actor.{Actor, ActorRef, ActorSystem, Props}
import com.atguigu.akka.yellowchicken.common.{ClientMessage, ServerMessage}
import com.typesafe.config.ConfigFactory
class YellowChickenServer extends Actor {
override def receive: Receive = {
case "start" => println("start 小黄鸡客服开始工做了...")
// 若是接收到了服务端的发来的消息,即 ClientMessage
case ClientMessage(mes) => {
println("客户咨询的问题是:" + mes)
mes match {
// 使用 match case 匹配(模糊匹配)
case "大数据学费" => sender() ! ServerMessage("20000 RMB")
case "学校地址" => sender() ! ServerMessage("北京市朝阳区青年路大悦城")
case "学习什么技术" => sender() ! ServerMessage("大数据 前端 Python")
case _ => sender() ! ServerMessage("你说的啥子:)")
}
}
}
}
// 主程序入口
object YellowChickenServerApp extends App {
val host = "127.0.0.1" // 服务端ip地址
val port = 9999 // 端口
// 建立 config 对象,指定协议类型、监听的ip和端口
val config = ConfigFactory.parseString(
s"""
|akka.actor.provider="akka.remote.RemoteActorRefProvider"
|akka.remote.netty.tcp.hostname=$host
|akka.remote.netty.tcp.port=$port
""".stripMargin)
// 建立 ActorSystem
val serverActorSystem = ActorSystem("Server", config)
// 建立 YellowChickenServer 的 Actor 和 ActorRef
val yellowChickenServerActorRef: ActorRef = serverActorSystem.actorOf(Props[YellowChickenServer], "YellowChickenServer-01")
// 启动服务端
yellowChickenServerActorRef ! "start"
}
CustomerActor.scala
package com.atguigu.akka.yellowchicken.client
import akka.actor.{Actor, ActorRef, ActorSelection, ActorSystem, Props}
import com.atguigu.akka.yellowchicken.common.{ClientMessage, ServerMessage}
import com.typesafe.config.ConfigFactory
import scala.io.StdIn
class CustomerActor(serverHost: String, serverPort: Int) extends Actor {
// 定义一个 YellowChickenServerRef
var serverActorRef: ActorSelection = _
// 在 Actor 中有一个方法 preStart 方法,它会在 Actor 运行前执行
// 在 Akka 开发中,一般将初始化的工做,放在 preStart 方法中
override def preStart(): Unit = {
this.serverActorRef = context.actorSelection(s"akka.tcp://Server@${serverHost}:${serverPort}/user/YellowChickenServer-01")
println("this.serverActorRefer=" + this.serverActorRef)
}
override def receive: Receive = {
case "start" => println("start 客户端运行,能够咨询问题")
case mes: String => {
// 发给服务端
// serverActorRef ! mes // 不该该发送字符串,应该包装一把,应该发送一个(样例)对象(即协议)
serverActorRef ! ClientMessage(mes) // 此时发送的是一个对象,该样例类默认实现了序列化 和 apply 方法
}
// 若是接受到了服务器端的消息
case ServerMessage(mes) => {
println(s"收到小黄鸡客服(Server)消息:$mes")
}
}
}
// 主程序入口
object CustomerActorApp extends App {
val (host, port, serverHost, serverPort) = ("127.0.0.1", 9990, "127.0.0.1", 9999)
val config = ConfigFactory.parseString(
s"""
|akka.actor.provider="akka.remote.RemoteActorRefProvider"
|akka.remote.netty.tcp.hostname=$host
|akka.remote.netty.tcp.port=$port
""".stripMargin)
// 建立 ActorSystem
val clientActorSystem = ActorSystem("Client", config)
// 建立 CustomerActor 的 Actor 和 ActorRef
val clientActorRef: ActorRef = clientActorSystem.actorOf(Props(new CustomerActor(serverHost, serverPort)), "CustomerActor-01")
// 启动客户端
clientActorRef ! "start"
// 客户端发送消息
while (true) {
val mes = StdIn.readLine()
clientActorRef ! mes
}
}
MessageProtocol.scala
package com.atguigu.akka.yellowchicken.common
// 使用样例类来构建协议
// 一、客户端发送服务端的协议(序列化对象)
case class ClientMessage(mes: String) // 回顾:样例类的构造器中的每个参数都默认为 val ,即只可读。
// 二、服务器端发送给客户端的协议
case class ServerMessage(mes: String)
一、深刻理解 Spark 的 Master 和 Worker 的通信机制。
二、为了方便同窗们看 Spark 的底层源码,命名的方式和源码保持一致(如:通信消息类命名就是同样的)。
三、加深对主从服务心跳检测机制(HeartBeat)
的理解,方便之后 spark 源码二次开发。
咱们主要是经过应用实例,来剖析 Spark 的 Master 和 Worker 的通信机制,所以功能比较简洁,设计的界面以下。看后面演示便可。
功能要求: Worker 注册到 Master,Master 完成注册,并回复 Worker 注册成功。
package com.atguigu.akka.sparkmasterworker.master
import akka.actor.{Actor, ActorSystem, Props}
import com.atguigu.akka.sparkmasterworker.common.{RegisterWorkerInfo, RegisteredWorkerInfo, WorkerInfo}
import com.typesafe.config.ConfigFactory
import scala.collection.mutable
class MasterActor extends Actor {
// 定义一个 mutable.HashMap 属性,用于管理 Worker
val workers = mutable.HashMap[String, WorkerInfo]()
override def receive: Receive = {
case "start" => println("Master服务器启动了...")
// 接收到 Worker 客户端注册的信息,保存进 HashMap
case RegisterWorkerInfo(id, cpu, ram) => {
if (!workers.contains(id)) {
// 建立 WorkerInfo
val workerInfo = new WorkerInfo(id, cpu, ram)
// 加入到 HashMap
workers += (id -> workerInfo)
println("服务器的Workers= " + workers)
// 回复客户端注册成功
sender() ! RegisteredWorkerInfo
}
}
}
}
object MasterActorApp {
def main(args: Array[String]): Unit = {
val host = "127.0.0.1" // 服务端ip地址
val port = 10005 // 端口
// 建立 config 对象,指定协议类型、监听的ip和端口
val config = ConfigFactory.parseString(
s"""
|akka.actor.provider="akka.remote.RemoteActorRefProvider"
|akka.remote.netty.tcp.hostname=$host
|akka.remote.netty.tcp.port=$port
""".stripMargin)
// 先建立 ActorSystem
val masterActorSystem = ActorSystem("Master", config)
// 再建立 Master 的 Actor 和 ActorRef
val masterActorRef = masterActorSystem.actorOf(Props[MasterActor], "MasterActor-01")
// 启动 Master
masterActorRef ! "start"
}
}
WorkerActor.scala
package com.atguigu.akka.sparkmasterworker.worker
import akka.actor.{Actor, ActorRef, ActorSelection, ActorSystem, Props}
import com.atguigu.akka.sparkmasterworker.common.{RegisterWorkerInfo, RegisteredWorkerInfo}
import com.typesafe.config.ConfigFactory
class WorkerActor(serverHost: String, serverPort: Int) extends Actor {
// 定义一个 MasterActorRef
var masterActorProxy: ActorSelection = _
// 定义 Worker 的编号
var id = java.util.UUID.randomUUID().toString
// 在 Actor 中有一个方法 preStart 方法,它会在 Actor 运行前执行
// 在 Akka 开发中,一般将初始化的工做,放在 preStart 方法中
override def preStart(): Unit = {
this.masterActorProxy = context.actorSelection(s"akka.tcp://Master@${serverHost}:${serverPort}/user/MasterActor-01")
println("this.masterActorProxy=" + this.masterActorProxy)
}
override def receive = {
case "start" => {
println("Worker客户端启动运行")
// 给服务器发送一个注册信息
masterActorProxy ! RegisterWorkerInfo(id, 16, 16 * 1024)
}
case RegisteredWorkerInfo => {
println("WorkedId= " + id + " 注册成功!")
}
}
}
object WorkerActorApp {
def main(args: Array[String]): Unit = {
val (host, port, serverHost, serverPort) = ("127.0.0.1", 10001, "127.0.0.1", 10005)
val config = ConfigFactory.parseString(
s"""
|akka.actor.provider="akka.remote.RemoteActorRefProvider"
|akka.remote.netty.tcp.hostname=$host
|akka.remote.netty.tcp.port=$port
""".stripMargin)
// 建立 ActorSystem
val workerActorSystem = ActorSystem("Worker", config)
// 建立 WorkerActor 的 Actor 和 ActorRef
val workerActorRef: ActorRef = workerActorSystem.actorOf(Props(new WorkerActor(serverHost, serverPort)), "WorkerActor-01")
// 启动客户端
workerActorRef ! "start"
}
}
MessageProtocol.scala
package com.atguigu.akka.sparkmasterworker.common
// 使用样例类来构建协议
// Worker 注册信息
case class RegisterWorkerInfo(id: String, cpu: Int, ram: Int)
// 这个是 WorkerInfo,是保存在 Master 的 HashMap 中的,该 HashMap 用于管理 Worker
// 未来这个 WorkerInfo 会扩展,好比 增长 Worker 上一次的心跳时间
class WorkerInfo(val id: String, val cpu: Int, val ram: Int)
// 当 Worker 注册成功,服务器返回一个 RegisteredWorkerInfo 对象
case object RegisteredWorkerInfo
功能要求:Worker 定时发送心跳给 Master,Master 可以接收到,并更新 Worker 上一次心跳时间。
package com.atguigu.akka.sparkmasterworker.common
// 使用样例类来构建协议
// Worker 注册信息
case class RegisterWorkerInfo(id: String, cpu: Int, ram: Int)
// 这个是 WorkerInfo,是保存在 Master 的 HashMap 中的,该 HashMap 用于管理 Worker
// 未来这个 WorkerInfo 会扩展,好比 增长 Worker 上一次的心跳时间
class WorkerInfo(val id: String, val cpu: Int, val ram: Int) {
// 新增属性:心跳时间
var lastHeartBeatTime: Long = _
}
// 当 Worker 注册成功,服务器返回一个 RegisteredWorkerInfo 对象
case object RegisteredWorkerInfo
// 每隔必定时间定时器发送给 Master 一个心跳
case class HeartBeat(id: String)
// Worker 每隔必定时间定时器发送给 本身 一个消息
case object SendHeartBeat
MasterActor.scala 中增长代码
case HeartBeat(id) => {
// 更新对应的 Worker 的心跳时间
// 一、先从 Worker 中取出 WorkerInfo
val workerInfo = workers(id)
workerInfo.lastHeartBeatTime = System.currentTimeMillis()
println("Master更新了 " + id + " 的心跳时间 ")
}
WorkerActor.scala 中增长代码
// 当客户端注册成功后,就定义一个定时器,每隔必定时间,发送 SendHeartBeat 给本身
import context.dispatcher
context.system.scheduler.schedule(0 millis, 3000 millis, self, SendHeartBeat)
case SendHeartBeat => {
println("WorkedId= " + id + " 给Master发送心跳")
masterActorProxy ! HeartBeat(id)
}
功能要求:Master 启动定时任务,定时检测注册的 Worker 有哪些没有更新心跳,已经超时的 Worker,将其从 HashMap 中删除掉。
// Master 给本身发送一个触发检查超时 Worker 的信息
case object StartTimeOutWorker
// Master 给本身发消息,检测 Worker,对于心跳超时的
case object RemoveTimeOutWorker
MasterActor.scala 中增长代码
case "start" => {
println("Master服务器启动了...")
// Master 启动定时任务,定时检测注册的 Worker 有哪些没有更新心跳,已经超时的 Worker,将其从 HashMap 中删除掉。
self ! StartTimeOutWorker
}
// 开启定时器,每隔必定时间检测是否有 Worker 的心跳超时
case StartTimeOutWorker => {
println("开启了定时检测Worker心跳的任务")
import context.dispatcher // 使用调度器时候必须导入dispatcher
context.system.scheduler.schedule(0 millis, 9000 millis, self, RemoveTimeOutWorker)
}
// 判断哪些 Worker 心跳超时(nowTime - lastHeartBeatTime),对已经超时的 Worker,将其从 HashMap 中删除掉。
case RemoveTimeOutWorker => {
// 首先获取全部 Workers 的全部 WorkerInfo
val workerInfos = workers.values
val nowTime = System.currentTimeMillis()
// 过滤出全部超时的 workerInfo 并删除便可
workerInfos.filter(workerInfo => (nowTime - workerInfo.lastHeartBeatTime) > 6000)
.foreach(workerInfo => workers.remove(workerInfo.id))
println("当前有 " + workers.size + " 个Worker存活")
}
功能要求:Master,Worker 的启动参数运行时指定,而不是固定写在程序中的。
if (args.length != 3) {
println("请输入参数 host port MasterActor的名字")
sys.exit()
}
val host = args(0) // 服务端ip地址
val port = args(1) // 端口
val masterName = args(2) // MasterActor的名字
......
// 再建立 Master 的 Actor 和 ActorRef
val masterActorRef = masterActorSystem.actorOf(Props[MasterActor], s"${masterName}")
WorkerActor.scala 中增修改代码
if (args != 6) {
println("请输入参数 host port WorkerActor的名字 serverHost serverPort MasterActor的名字")
}
val host = args(0)
val port = args(1)
val workerName = args(2)
val serverHost = args(3)
val serverPort = args(4)
val masterName = args(5)
......
// 建立 WorkerActor 的 Actor 和 ActorRef
val workerActorRef: ActorRef = workerActorSystem.actorOf(Props(new WorkerActor(serverHost, serverPort.toInt, masterName)), s"${workerName}")
Master 配置参数截图:
Master Worker 进行分布式部署:Linux 系统 -> 如何给 maven 项目打包 -> 上传Linux
步骤以下:
步骤一:先给 MasterActor 打包,修改 pom.xml 文件的 <mainClass>xxx</mainClass>
节点,指定咱们程序 MasterActor 的主类
即修改为以下:<mainClass>com.atguigu.akka.sparkmasterworker.master.MasterActorApp</mainClass>
步骤二:给 MasterActor 打包
步骤三:将打好的 jar 包拷贝至某个目录里,并修改 jar 包名称为 MasterActor.jar,等待上传至 Linux 系统
步骤四:再给 WorkerActor 打包,修改 pom.xml 文件的 <mainClass>xxx</mainClass>
节点,指定咱们程序 WorkerActor 的主类
即修改为以下:<mainClass>com.atguigu.akka.sparkmasterworker.worker.WorkerActorApp</mainClass>
步骤五:给 WorkerActor 打包,操做同 步骤二,注意:打包前咱们须要先 clean 下
步骤六:将打好的 jar 包拷贝至某个目录里,并修改 jar 包名称为 WorkerActor.jar,等待上传至 Linux 系统
步骤七:将两个 jar 包上传至 Linux
步骤八:测试运行,,命令以下:
java -jar MasterActor.jar 127.0.0.1 10005 MasterActor-01
java -jar WorkerActor.jar 127.0.0.1 10001 WorkerActor-01 127.0.0.1 10005 MasterActor-01
java -jar WorkerActor.jar 127.0.0.1 10002 WorkerActor-02 127.0.0.1 10005 MasterActor-01