责任链模式是经典的GoF 23种设计模式之一,也许你已经了解这种模式。无论你是否熟悉,建议读者在阅读本文以前,不妨先思考下面三个问题:react
(1) 如何用多种风格迥异的编程范式来实现责任链模式? (2) 可否让责任链上的结点多任务并发执行? (3) 可否把责任链部署到分布式环境下,分布在世界各地的多台计算机,经过某种方式构成一条责任链,协同工做,可否作到呢?
引言程序员
责任链铁索连环 无模式的实现方式面向过程风格 面向对象风格OOP 1 解法一用模板方法模式实现责任链模式 2 解法二用策略模式实现责任链模式 函数式风格Functional Programming 1 解法三用一等公民函数替换策略模式实现责任链模式 2 解法四用偏应用函数实现责任链模式 21 替换Node类 22 为函数升阶函数的Curry化为更高阶的函数 23 为函数降阶偏应用函数 3 解法五用偏函数实现责任链模式 31 偏函数我不完美但咱们很完美 32 用偏函数实现责任链模式 响应式风格Reactive Programming 1 响应式思惟别拉进来推出去 2 解法六用Actor模型实现责任链模式 21 Actor模型简介 22 Akka一个支持高并发分布式计算消息驱动的Actor库 23 为责任链上的处理者结点定义Actor类 24 建立actor对象 25 连环计建立基于actor的责任链 26 如何礼貌地向别人提问题 27 对将来值的聪明响应我不理你但当你有结果的时候必定要告诉我 28 并发世界也有秩序 3 解法七用RXReactive eXtension响应式扩展实现责任链模式 4 两种响应式编程方式的比较 回顾与总结
本文对责任链模式给出7种不一样实现方式,其中:编程
OOP(面向对象编程):有2种采用面向对象编程范式,即:
(1)用模板方法模式实现责任链模式(第3.1节)
(2)用策略模式实现责任链模式(第3.2节)设计模式
一种采用混合OOP和FP的编程范式,即:
(3)用一等公民函数替换策略模式实现责任链模式(第4.1节)数组
FP(函数式编程):有2种采用纯函数式编程范式,即:
(4)用偏应用函数实现责任链模式(第4.2节)
(5)用偏函数实现责任链模式(第4.3节)安全
RP(响应式编程):最后2种采用响应式编程范式,即:
(6)用Actor模型实现责任链模式(第5.2节)
(7)用RX响应式扩展实现责任链模式(第5.3节)性能优化
某用户购买了一款软件,用了几天就crash,重要在数据丢失。用户很生气,后果很严重。因而打电话给软件公司客服投诉。客服若是不能解决,会问经理,若经理也不能解决或者认为不规他解决,会转给工程师解决,这样客服、经理、工程师就构成了一条责任链。bash
记用户反映在问题为Input,每一个问题Input都用各自不一样的业务值value(可由一个字符表示),用Scala代码描述为:网络
case class Input(value: Char)
记解答为输出Output,带字符串信息value。多线程
case class Output(value: String)
舒适提示:Scala中的case类经常使用于定义数据类和消息事件。别小看上面这一行代码,它定义了不少东西:
(1)定义了类Output,实现序列化接口 (2)定义类里有个不可变成员value,类型为String (3)提供带参数的构造函数,参数为value,类型为String,实现方式是为不可变成员value赋值 (4)提供不可变成员value的get方法,方法名为value。 (5)覆盖toString方法并提供可读信息 (6)覆盖hashCode方法并实现 (7)覆盖equals方法并实现 (8)还有其余方法,如copy, apply, unapply…
若用Java写,要写很长的代码,以下:
public class Output implements Serializable { private final String value; public Output(String value) { this.value = value; } public String value() { return value; } @Override public String toString() { ... } @Override public int hashCode() { ... } @Override public boolean equals(Output that) { ... } 还有其余方法,如copy, apply, unapply... }
假设客服只负责处理业务值为a, w;处理问题在方式是返回信息:”Customer Service handles ” + 业务值。即:
def canHandleByCustomService(request: Input) = "aw" contains request.value
def handleByCustomService(request: Input) = Output("Customer Service handles " + request.value)
相似的,经理只负责处理业务值为b, w, z;工程师只负责处理业务值为c, z。即:
def canHandleByManager(request: Input) = "bwz" contains request.value def handleByManager(request: Input) = Output("Manager handles " + request.value) def canHandleByEngineer(request: Input) = "cz" contains request.value def handleByEngineer(request: Input) = Output("Engineer handles " + request.value)
根据前面描述,处理过程可由一系列条件分支来描述:
object NoDp { def handle(request: Input): Option[Output] = if (canHandleByCustomService(request)) { Some(handleByCustomService(request)) } else if (canHandleByManager(request)) { Some(handleByManager(request)) } else if (canHandleByEngineer(request)) { Some(handleByEngineer(request)) } else { None } }
返回值为Option类型,当有解答时为其子类Some并给出结果,不然是None。
这种实现方式虽然简单,但存在如下缺点:
扩展性差,扩展须要增长else if语句,违反开闭原则。
灵活性差,不能随意动态调整调用顺序。
给用户代码暴露太多细节,用户须要知道哪些人能处理哪些事,以及处理的顺序。
下面给出7种不一样责任链模式在实现,都能解决以上问题。
3.1 解法一:用模板方法模式实现责任链模式
定义责任链上在结点类:
abstract class HandlerNode
存在可变成员变量指向后续结点
var next: HandlerNode = _
处理过程就是能处理则处理,不然递归向后传:
@tailrec final def handle(request: Input): Option[Output] = { if (canHandle(request)) Some(doHandle(request)) else if (next == null) None else next handle request }
舒适提示:@tailrec代表该函数使用了尾递归优化。虽然不上加@tailrec,Scala编译器也会对任何能够尾递归优化的代码进行尾递归优化,可是加上@tailrec以后,若是这个函数不能作尾递归优化,那是编译不过的。有了@tailrec,妈妈不再用担忧这个函数能不能作了尾递归优化了。
上面在处理过程是个模板(用到模板方式模式),用到两个抽象方法canHandle和doHandle:
protected def canHandle(request: Input): Boolean protected def doHandle(request: Input): Output
客服,经理,工程师,属于不一样处理结点,也就是继承HandlerNode类,并提供各自不一样的实现:
class CustomerService extends HandlerNode { override protected def canHandle(request: Input): Boolean = canHandleByCustomService(request) override protected def doHandle(request: Input): Output = handleByCustomService(request) } class Manager extends HandlerNode { override protected def canHandle(request: Input): Boolean = canHandleByManager(request) override protected def doHandle(request: Input): Output = handleByManager(request) } class Engineer extends HandlerNode { override protected def canHandle(request: Input): Boolean = canHandleByEngineer(request) override protected def doHandle(request: Input): Output = handleByEngineer(request) }
类图以下:
建立上面3个处理结点:
val customerService = new CustomerService val manager = new Manager val engineer = new Engineer
由多个结点的不一样顺序构成不一样的责任链:
customerService.next = manager
manager.next = engineer
如今用户只需投诉给客服就行,无需知道处理细节:
def handle(request: Input): Option[Output] = {
customerService handle request
}
假如连续收到一连串投诉码”cafebabe wenzhe”,对于每一个投诉,打印处理结果,若是没人处理的业务则不打印:
val cafebabe = "cafebabe wenzhe" println("----- Oop1: inherit -----") for (ch <- cafebabe) { Oop1 handle Input(ch) map (_.value) foreach println }
输出正确结果:
Engineer handles c Customer Service handles a Manager handles b Customer Service handles a Manager handles b Customer Service handles w Manager handles z
3.2 解法二:用策略模式实现责任链模式
上面的实现中,用到模板方法模式,用到继承,处理者与结点是同一个类层次中;如今换一种方式:用策略模式,用聚合。
将处理者与结点分开。
咱们首先定义处理者接口:
trait Handler {
def canHandle(request: Input): Boolean
def handle(request: Input): Output
}
客服,经理,工程师分别给出不一样的处理者实现:
class CustomerService extends Handler { def canHandle(request: Input): Boolean = canHandleByCustomService(request) def handle(request: Input): Output = handleByCustomService(request) } class Manager extends Handler { def canHandle(request: Input): Boolean = canHandleByManager(request) def handle(request: Input): Output = handleByManager(request) } class Engineer extends Handler { def canHandle(request: Input): Boolean = canHandleByEngineer(request) def handle(request: Input): Output = handleByEngineer(request) }
结点除了带有可变在nextNode外,还聚合了一个处理者(不可变,构造时传入):
class Node(handler: Handler) { var nextNode: Node = _
处理过程与上相似:
@tailrec final def handle(request: Input): Option[Output] = { if (handler canHandle request) Some(handler handle request) else if (nextNode == null) None else nextNode handle request } }
类图以下:
建立3个处理结点: 客服,经理,工程师
val customerService = new Node(new CustomerService) val manager = new Node(new Manager) val engineer = new Node(new Engineer)
构成责任链:
customerService.nextNode = manager
manager.nextNode = engineer
如今用户只需投诉给A(客服)就行,无需知道处理细节:
def handle(request: Input): Option[Output] = {
customerService handle request
}
假如连续收到一连串投诉码”cafebabe wenzhe”,对于每一个投诉,打印处理结果,若是没人处理的业务则不打印:
val cafebabe = "cafebabe wenzhe" println("----- Oop2: aggregation -----") for (ch <- cafebabe) { Oop2 handle Input(ch) map (_.value) foreach println }
输出正确结果:
Engineer handles c Customer Service handles a Manager handles b Customer Service handles a Manager handles b Customer Service handles w Manager handles z
大部分程序员都熟悉面对对象,而不熟悉函数式编程,为了让面向对象程序员容易理解,咱们先从上面3.2节在OOP实现方式出发,一步一步地重构为FP。
4.1 解法三:用一等公民函数替换策略模式实现责任链模式
OOP经过抽取接口,构建具体子类,利用多态实现各类设计模式。从FP在角度,接口Handler的做用也可看做是提供两个函数的东西。那么,能够将3.2节代码中接口Handler及其子类去掉,把接口中的两个方法做为类Node的不可变成员变量(构造函数传入),那么类Node变为:
class Node(canHandle: Input => Boolean, doHandle: Input => Output) { var nextNode: Node = _ @tailrec final def handle(request: Input): Option[Output] = { if (canHandle(request)) Some(doHandle(request)) else if (nextNode == null) None else nextNode handle request } }
类图以下,只有一个Node类,简单不少吧:
相应的,修改构造责任链的代码:
val customerService = new Node(canHandleByCustomService, handleByCustomService) val manager = new Node(canHandleByManager, handleByManager) val engineer = new Node(canHandleByEngineer, handleByEngineer)
其余代码与3.2节同样,这里不在提供。比较3.2节的纯OOP方式,这里的实现不只变短了,并且扩展起来更方便,不须要为新的不一样的处理策略再定义类,只需传如不一样的函数便可。
4.2 解法四:用偏应用函数实现责任链模式
4.2.1 替换Node类
上一小节咱们已经成功地用“一等公民”函数替换Handler接口及其子类。如今就只剩下Node类了,可否也用函数替换呢?
分析一下,Node类有3个成员变量:
(1)canHandle: Input => Boolean,它是一个函数变量 (2)doHandle: Input => Output,它也是一个函数变量 (3)nextNode: Node,指向下一结点的变量。
另外Node类在主要方法是handle,它有一个参数request:Input,返回输出结果Option[Output],表明可能有结果,可能没结果。
从FP角度,Node类没有存在的必要,能够写一个函数代替它,这个函数具备下面的特色:
(1)函数的内容就是Node类handle方法的内容,须要的外部环境所有由参数提供 (2)返回值是Option[Output] (3)有4个输入参数(canHandle, doHandle, nextNode, request)
咱们能够写成下面的函数:
def handleByChainNode(canHandle: Input => Boolean, handle: Input => Output, nextHandle: Input => Option[Output], request: Input): Option[Output] = { if (canHandle(request)) Some(handle(request)) else if (nextHandle == null) None else nextHandle(request) }
4.2.2 为函数升阶:函数的Curry化为更高阶的函数
上面的函数有太多参数(4个),是一种坏味道(Code Smell),由于这样的函数很差使用。每一个处理者(客服、经理、工程师),咱们只须要赋予canHandle,handle两个参数;把处理者链起来的连接顺序,只须要传递nextHandle参数;处理业务请求,只须要传入request参数。那么,如今这么多参数,咱们很难一次性提供,那么咱们应该怎么用它来解决问题呢?
解决方法首先就是要减小参数个数,每减小一个参数,函数的返回值是另外一个函数(做为返回值的函数是一等公民),那个被减小的参数做为返回值函数的参数。好比把request参数从handleByChainNode的参数中去掉,那么handleByChainNode须要返回另外一个函数(参数就是request),代码以下:
def handleByChainNode(canHandle: Input => Boolean, handle: Input => Output, nextHandle: Input => Option[Output]): Input => Option[Output] = { (request: Input) => { if (canHandle(request)) Some(handle(request)) else if (nextHandle == null) None else nextHandle(request) } }
这是一个返回函数的函数,咱们叫作高阶函数。这样写的代码是否是有的绕?为了方便阅读,scala提供语法糖,上面的代码也能够写出这样:
def handleByChainNode(canHandle: Input => Boolean, handle: Input => Output, nextHandle: Input => Option[Output]) (request: Input): Option[Output] = { if (canHandle(request)) Some(handle(request)) else if (nextHandle == null) None else nextHandle(request) }
这是一个二阶函数,拥有两个参数列表,第一个参数列表包含三个参数(canHandle,handle,nextHandle),另外一个参数列表只有一个参数(request)。这里虽然返回值不是函数,而是原来的Option[Output],但从普通函数(一阶函数)的角度看它的返回值就是一个函数。
如今,第一个参数列表的参数仍是太多,怎么办?一样办法,咱们能够继续增长参数列表,减小每一个参数列表中参数个数,最终咱们获得下面的4阶函数(有4个参数列表):
def handleByChainNode(canHandle: Input => Boolean) (handle: Input => Output) (nextHandle: Input => Option[Output]) (request: Input): Option[Output] = { if (canHandle(request)) Some(handle(request)) else if (nextHandle == null) None else nextHandle(request) }
上面这个经过增长函数阶数的办法来减小参数个数,直到每一个参数列表只有一个参数的过程,叫作函数的Curry化。那么,这又有什么用呢?接下来你就知道!
4.2.3 为函数降阶:偏应用函数
有了上面的Curry化后的高阶函数,建立每一个处理结点就很容易了,就是调用上面“半”个handleByChainNode方法:
val customerService = handleByChainNode(canHandleByCustomService)(handleByCustomService) _ val manager = handleByChainNode(canHandleByManager)(handleByManager) _ val engineer = handleByChainNode(canHandleByEngineer)(handleByEngineer) _
之因此说“半”个方法,是由于还没给handler函数后面两个参数列表赋值(函数调用后面的下划线表示没提供数据),即没有给参数nextHandle赋值,也没有给request参数赋值。这在函数式编程中叫作“偏应用函数”,方便把一个高阶的函数降阶为更低阶的函数。这个低阶函数虽然也是函数,当实际上已经包含了一些不可变状态(就是调用高阶函数时那些已经赋值的参数,这里是canHandleByXX, handleByXX)。
如今customerService,manager,engineer都是2阶函数,其第一个参数列表是(nextHandle: Input => Option[Output]),第二个参数列表是(request: Input)。咱们能够对nextHandle参数赋值从而把这些结点链起来:
val handleChain = customerService(manager(engineer(null)))
若要调整顺序就很简单了,只要换一下调用顺序便可。如今handleChain是一个一阶函数,也就是咱们熟悉的普通函数,参数为request,返回Option[Output],处理用户业务请求也就是调用handleChain而已:
def handle(request: Input): Option[Output] = {
handleChain(request)
}
测试下, 假如连续收到一连串投诉码”cafebabe wenzhe”,对于每一个投诉,打印处理结果,若是没人处理的业务则不打印:
val cafebabe = "cafebabe wenzhe" println("----- Fp1: partial apply function -----") for (ch <- cafebabe) { Fp1 handle Input(ch) map (_.value) foreach println }
输出正确结果:
Engineer handles c Customer Service handles a Manager handles b Customer Service handles a Manager handles b Customer Service handles w Manager handles z
如今责任链的实现彻底没有类了,也不存在可变状态(OOP中可变成员变量nextHandler在多线程环境下不是线程安全的,而如今彻底不存在可变状态),这是本文给出的第一个纯函数式实现。从代码来看,更短更简洁!
4.3 解法五:用偏函数实现责任链模式
4.3.1 偏函数:我不完美,但咱们很完美!
若一个函数只能处理其参数的某些取值范围,而对其它取值范围在处理没有定义,这样的函数不是一个完整的函数,而像是一部分函数,咱们称之为“部分实现函数”,或者叫“偏函数”(Partial Function)。
有点抽象吧,那我举个例子,定义这样一个函数:输入参数类型是整数类型,对偶数有定义,行为是对偶数平方,但没为奇数定义行为,这显然不是个完整的函数,而是部分实现的函数,即偏函数。
舒适提示:上一节中“偏应用函数”与本节的“偏函数”,尽管名字很像,都是不完整的函数,但倒是彻底不一样的概念,不一样之处在于缺乏的部分不同。函数能够有多个参数列表,若缺乏了某个参数列表,就是“偏应用函数”,侧重参数列表的不完整性;于此不一样的是,若一个函数有且只有一个的参数,而且没有实现对该参数有部分取值范围的处理,就是“偏函数”,侧重实现的不完整性。“偏应用函数”的好处在于将大函数(高阶函数)分解为一系列可重用的小函数(低阶函数)并得到某些不可变状态,侧重于“分”;而“偏函数”的好处在于灵活组合一系列功能简单的小函数构成功能强大的大函数,侧重于“合”。
val squareEven = new PartialFunction[Int, Int] { def apply(x: Int) = x * x def isDefinedAt(x: Int) = x % 2 == 0 }
更方便的是用模式匹配来描述:
val squareEven: PartialFunction[Int, Int] = { case x: Int if x % 2 == 0 => x * x }
case语句很容易读,即:”当整数x对2取模为0时,返回x的平方”。
偏函数比普通函数更小,所以代码的可重用粒度也就更细。经过定义一些列“不完美”的偏函数,经过不一样的组合方式,能够产生各类不一样的“更完美”的偏函数。下面介绍偏函数如何组合实现责任链模式。
4.3.2 用偏函数实现责任链模式
因为客服,经理,工程师每一个人都只负责处理某些业务,他们都是部分业务的处理者,能够用偏函数来描述:
def handler(canHandle: Input => Boolean, handle: Input => Output): PartialFunction[Input, Output] = { case request: Input if canHandle(request) => handle(request) }
将客服,经理,工程师各自不一样的责任范围和处理方法传入,从而产生责任链上的结点:
val customService = handler(canHandleByCustomService, handleByCustomService) val manager = handler(canHandleByManager, handleByManager) val engineer = handler(canHandleByEngineer, handleByEngineer)
如今customService,manager,engineer都是偏函数。偏函数虽然不完整,但能够组合,多个不完整的偏函数能够组合成一个更完整一点的偏函数。责任链的构造能够由一系列偏函数组合而成:
val handleChain = customService orElse manager orElse engineer
能够看到咱们很容易调整执行顺序。handleChain也是偏函数,可是处理能力更强了。向handleChain输入业务请求,直接调用这个函数:
def handle(request: Input): Output = { handleChain(request) // throw MatchError exception when no handler can handle the request }
这样调用的话可能会抛出异常:当request取值在偏函数handleChain定义范围以外(即责任链上全部结点都不能处理)时,会抛出MatchError异常。为了跟前面其余实现的调用方法一致,咱们但愿返回一个Option[Output],若是可以处理,就返回结果Some[Output];若不能处理,返回None。
有两种方法能够实现:
(1)使用Try类封装异常(无异常返回Success[Output],抛出异常返回Failure),而后再转成Option,代码以下:
def handle(request: Input): Option[Output] = {
Try(handleChain(request)) toOption
}
这是一种通用的解决方式。
(2)对于Scala的偏函数,还有更专用的方式,能够经过偏函数的lift方法把处理结果转成Option,
def handle(request: Input): Option[Output] = {
handleChain lift request
}
测试下, 假如连续收到一连串投诉码”cafebabe wenzhe”,对于每一个投诉,打印处理结果,若是没人处理的业务则不打印:
val cafebabe = "cafebabe wenzhe" println("----- Fp2: partial function -----") for (ch <- cafebabe) { Fp2 handle Input(ch) map (_.value) foreach println }
输出正确结果:
Engineer handles c Customer Service handles a Manager handles b Customer Service handles a Manager handles b Customer Service handles w Manager handles z
利用偏函数间的组合(orElse)方式,取代了nextHandler状态。比起前面4种实现方式,偏函数的实现方式更加简洁,优雅。
到目前为止的5种责任链模式的实现方式,都是单线程的。那么,请读者考虑下面两个问题:
(1) 可否让责任链上的结点多任务并发执行? (2) 可否把责任链部署到分布式环境下,分布在世界各地的多台计算机,经过某种方式构成一条责任链,协同工做,可否作到呢?
5.1 响应式思惟:别拉进来,推出去!
面向对象设计模式通常经过接口(或抽象类)对代码进行隔离,减小代码间的耦合度(见第3节);函数式风格实现的设计模式也是经过“接口”隔离,只不过这个“接口”更加通用,其实就是“一等公民”函数(4.1节)或者偏函数(4.2节)。不管哪一种方式,说到底都是对象间直接的方法调用,都是“拉”式的。用户向客服投诉,客服处理,其实就是用户代码调用了客服类(AHandler)的handle方法(OOP,见第3节),或者调用客服处理函数(FP,见第4节),这些都是“拉”的思惟方式。用户搞不定,因而把客服“拉”进来;客服也搞不定时,把经理“拉”进来;经理也搞不定时,把工程师“拉”进来。他们经过直接调用对方的服务接口实现交互。
与“拉”不一样的方式是“推”,用户搞不定,“推送”消息给客服反映问题;客服对此事件做出响应,若能搞定就把解决方案“推送”回给用户,不然就找帮手,把消息forward给经理;一样的,经理作出响应后若搞不定则再forward给工程师,工程师再对消息事件作出响应。每一个个体之间都是独立的、事件消息驱动的、响应式的,他们之间经过把事件消息“推”出去来实现交互,经过响应不一样的事件消息来作各类具体不一样的事。这种思惟方式就是响应式思惟,更加符合天然。在天然界中,人与人的交互都是事件消息驱动、响应式处理的,人与人都是独立个体,都能并行处理,都是分布式的。基于这种思想的响应式编程(RP)更容易处理并发问题,更适合分布式计算。对象之间能够彻底独立解耦,甚至能够分布在不一样的机器上,就好像咱们能够经过手机给地球另外一端的人交流消息同样。
5.2 解法六:用Actor模型实现责任链模式
5.2.1 Actor模型简介
一个Actor能够比作一我的,或者比作一台单核单任务计算机,或者比作一个企业里的某个工做角色,虽然它只是一个很轻量级的对象而言。具备以下特性:
(1) 灵活部署:多个Actor能够在同一进程内(这时候是并发),也能够跨进程,还能够跨机器跨网络,分布式应用。 (2) Actor内代码执行方式:同一时间一个Actor内永远只有一条线程在执行,所以保证了Actor内部线程安全。Actor就比如一台单核单任务计算机。 (3) Actor间的交互方式:Actor与Actor之间的交流,就像人与人之间的交流,发送事件消息(无类型限制),响应事件并处理。不能直接调用Actor的方法,由于不一样Actor极可能不在同一台机器上。 (4) 有序的树状组织结构:多个Actor能够构成一个社会,每一个Actor就像一个公司里不一样的职位,有做为老板的Actor管理多个部门经理Actor,经理Actor管理多个员工Actor。Actor间有监管机制,如父Actor监管子Actor。 (5) 出错处理:不怕,让他挂!(Let it crash!)当某一Actor挂了,其监控者(另外一Actor)会收到消息,响应方式能够是恢复那个Actor,重启Actor,中止Actor,也能够把本身也挂起,或者也能够继续向它的上级Actor汇报,等等,Actor具备很高的容错性,怎么处理,彻底取决于你! (6) Actor很是轻量级,一个应用程序能够建立几百万个Actor,就像你的计算机瞬间就变成几百万台计算机同样。 (7) Actor很是适合描述现实世界中的对象,相似OOP,只是每一个对象都是Actor并具备Actor的一切优势:线程安全、并发、分布式、高容错性。
5.2.2 Akka,一个支持高并发、分布式计算、消息驱动的Actor库
Akka Actor是使用scala实现的Actor模型,目前已经成为scala的标准Actor模型。Scala之因此在并发编程方面有强大的优点,Akka Actor是其重要缘由。Akka Actor是一个分布式计算库,著名的大数据框架Spark底层就是用它来实现分布式计算的。能够在build.sbt增长依赖(相似Maven,SBT编译时会从Maven中心仓库递归下载依赖):
libraryDependencies += "com.typesafe.akka" %% "akka-actor" % "2.4.17"
5.2.3 为责任链上的处理者结点定义Actor类
从面向对象的角度,责任链上的每一个结点,即客服,经理,工程师,都是描述现实世界的对象。在Actor响应式编程中,这种描述现实世界的对象很适合用Actor来定义。所以,咱们为这些结点定义一个Actor的子类:HandlerActor(准确讲是混入Actor特质的类,这里说成子类是便于理解)。不一样的结点,除了如下3点不一样,其余都是相同的:
判断有责任处理输入请求的函数:canHandle,类型为:Input => Boolean
处理输入请求的函数:handle,类型为: Input => Output
对下一个处理者结点的引用:nextHandler,类型为:ActorRef,由于每一个结点都是Actor。
咱们能够把这些不一样点做为HandlerActor类的成员变量,对于不一样的结点它们有不一样的值。其中,canHandle和handle两个成员变量能够定义为不可变的,做为构造函数的输入参数,结点构造时须要外部指定;而nextHandler,为了方便在运行时更换结点顺序,设计为可变的。
class HandlerActor(canHandle: Input => Boolean, handle: Input => Output) extends Actor with ActorLogging { private var nextHandler: ActorRef = _
这个类很像第4.1节中的Node类,只不过它继承Actor(后面混入ActorLogging特质是为了方便打log)。
自定义Actor类中惟一一个必须override的方法,是receive方法,它负责接收事件消息而且作出响应。
def receive = { case SetNextHandler(nextHandler) => this.nextHandler = nextHandler case handleEvent @ Handle(request) => { log debug s"${request.value}" if (canHandle(request)) sender ! Result(Some(handle(request))) else if (nextHandler == null) sender ! Result(None) else nextHandler forward handleEvent } }
方法receive返回一个偏函数(见第4.3节),一般咱们用模式匹配来描述偏函数。上面代码中匹配了两个case,代表收到这两种消息事件(无类型限制)以及各自的响应逻辑,下面对这两种事件进行解释:
(1) 第一种事件:SetNextHandler,顾名思义,它请求HandlerActor去设置下一个处理者结点,事件中附带着指望设置为下一处理者的引用。SetNextHandler事件由下面的case类来定义:
case class SetNextHandler(nextHandler: ActorRef)
对这类事件的响应,就是把可变成员变量nextHandler设置为事件要求的值:
case SetNextHandler(nextHandler) => this.nextHandler = nextHandler
舒适提示:能够这样理解模式匹配,即把每一个case语句都当成一个方法(实际不是),会容易理解得多。好比上面的代码,能够想象为:方法名为SetNextHandler,参数为nextHandler(类型为ActorRef,可从case类SetNextHandler中推断出来),方法体为this.nextHandler = nextHandler,返回值可从方法体推断(面向表达式编程,把方法体当成表达式)。对比一下,通常面向对象的思路会定义方法:
def setNextHandler(nextHandler: ActorRef) = this.nextHandler = nextHandler
对比一下,是否是很类似呢?这样对比,可让模式匹配很是容易理解。复杂一点,你能够试试用这种办法理解后面的case Handle(request) => { … }
(2) 第二种事件:Handle,顾名思义,它请求HandlerActor去处理输入,事件中附带着输入请求request。Handle事件可下面的case类来定义:
case class Handle(request: Input)
对该类事件的响应,就是先判断可否处理输入请求,能够的话,就处理该请求,并将处理结果加个信封,做为表示结果(Result)的消息事件,做为回复,告知(tell)原信息的发送者(sender)。
if (canHandle(request)) sender tell Result(Some(handle(request)))
上面的结果Result,对处理结果进行包装,它有下面的case类定义:
case class Result(value: Option[Output])
方法tell是actor中一个很是经常使用的方法(我估计其经常使用度排名第一),它告诉其余actor一个消息(也能够告诉本身,那样能够实现状态模式),也就是向其余actor发送消息。多是它太经常使用了,Akka Actor专门为它定义操做符方法:!,读做tell,表示发送消息。上面的代码,更多时候是这样写的:
if (canHandle(request)) sender ! Result(Some(handle(request)))
若是该结点不能处理,则判断下一结点是否存在,若不存在,就向原消息的发送者回复告知没人能处理该请求。
else if (nextHandler == null) sender ! Result(None)
不然,把请求处理的消息事件forward给下一处理者:
else nextHandler forward handleEvent
5.2.4 建立actor对象
在建立具体Actor对象以前,须要先建立Actor系统(当不再用的时候要关闭它,不然程序不会结束,也不能放在Shutdown hook中关闭),它为咱们提供Actor模型所需的上下文。
val system = ActorSystem("ActorSystem")
接下来,咱们能够为每一个处理者建立actor对象:
val customService = system actorOf (Props(new HandlerActor(canHandleByCustomService, handleByCustomService)), "customService") val manager = system actorOf (Props(new HandlerActor(canHandleByManager, handleByManager)), "manager") val engineer = system actorOf (Props(new HandlerActor(canHandleByEngineer, handleByEngineer)), "engineer")
注意这里customService,manager,engineer的类型不是HandlerActor,而是ActorRef,表明对actor的引用。在Actor编程中,咱们不提倡直接引用Actor类的对象,由于这样很容易直接调用Actor类的方法,而响应式思惟是经过发消息来通知Actor使其作出响应。ActorRef是Akka Actor为咱们提供的抽象,它所引用的actor能够是本地的,也能够是远程的,经过ActorRef的抽象让咱们没必要关注这些底层通讯细节,咱们只要专一于所要处理的业务就行。
虽然这里代码中把customService,manager,engineer这三个actor都在同一个进程里建立了,实际上,它们也能够在不一样的进程、不一样的机器上建立,在代码中咱们能够经过其逻辑路径找到其余机器上(网络上)的actor,持有它的引用(ActorRef),使用起来的代码更本地建立的代码是没有区别的。
5.2.5 连环计,建立基于actor的责任链
接着是把这3个处理者连接起来,构造责任链。不一样于OOP,咱们不能直接调用HandlerActor的setNextHandler方法(固然咱们也没有提供这个方法,即便有也不推荐直接调用),而应该向这些结点发消息,好比向customService发送消息,告诉他若是搞不定能够找manager帮忙:
customService ! SetNextHandler(manager)
一样的,向manager发送消息,告诉他若是连他也搞不定的话能够找engineer帮忙:
manager ! SetNextHandler(engineer)
这样,责任链就造成了。
舒适提示:要更好地理解发送消息的符号“!”以及actor间基于消息传递的交互方式,能够与面向对象中直接方法调用的“.”符号作对比,即把“!”想象成“.”。好比上面的代码,如果直接方法调用,则为:
manager.setNextHandler(engineer)
表示直接调用bHandler的setNextHandler方法,传递参数cHandler。而
manager ! SetNextHandler(engineer)
表示向manager发送消息事件SetNextHandler,附带参数engineer。
其实目的都是同样的,很是类似吧,只是把符号”.”换成符号”!”,把普通对象换成actor对象,把方法名换成事件名,其余都同样,就把方法直接调用变成异步的消息事件发送的了。
5.2.6 如何礼貌地向别人提问题
因为消息处理是异步的,咱们定义的处理函数应该返回一个表明将来值的Future,而不能等待结果处理完才返回。
def handle(request: Input): Future[Option[Output]] = {
如今用户询问客服,但愿客服在5秒以内进行回复:
val future = customService ask (Handle(request), Timeout(5 seconds))
这里ask与前面的tell都是发消息,不一样之处在于tell是说完就忘,不期待别人回复,而ask是期待别人回复的。可是别人可能不会马上就回复你,也可能永远都不给你回复,而你也不会一直傻傻地等着他回复,谁都不能阻塞你,不过你心理有一个超时时间,超过这个时间你就认为他再也不回复了。
咱们已经知道,操做符!与tell是经过意思,使用起来就好像函数直接调用。相似的,ask也有一个同义的操做符,你猜猜看是哪一个?相信你能够猜到,就是问号操做符“?”。
另外,若是每次向人家问问题时老是加上这么一句:“给你5秒钟回答个人问题”,显得很不礼貌,是否是?所以咱们把这个超时时间记在心理就行,干吗非得说出来呢?咱们能够定义把它成隐式变量:
implicit val timeout = Timeout(5 seconds)
接着,问问题就礼貌不少吧:
val future = customService ? Handle(request)
当调用问号方法操做符,它会在上下文查下有没有隐式的Timeout。没有的话,休想编译过!
这里返回值future是Future[Any]类型,表明将来值,这样不至于人家不回答你而让你白白等上一段时间,这样你才不会被阻塞。
惋惜返回值future的泛型是Any,而不是咱们指望的输出结果类型Option[Output]。Any类型能够类比地理解为Java里面的Object(其实Any更强,由于它还包括基本类型,而Java的Object是不行的,因此Scala是一门彻底面向对象的语言,而Java不是)。那么咱们须要把future转化为咱们但愿的类型:Future[Option[Output]]。根据前面代码中HandleActor类中对Handle事件的处理,咱们知道HandleActor会把处理结果Option[Output]封装在信封(Result类)里,所以这里的Any实际上就是Result,转成Result就能够拿到它的value,也就是咱们指望的输出结果了。
future.mapTo[Result] map (_.value)
如今就能够获得充满期待的将来值Future[Option[Output]]了。
5.2.7 对将来值的聪明响应:“我不理你,但当你有结果的时候必定要告诉我!”
测试下, 假如连续收到一连串投诉码”cafebabe wenzhe”,对于每一个投诉,打印处理结果,若是没人处理的业务则不打印:
val cafebabe = "cafebabe wenzhe" println("----- 6. Rp1: akka actor Reactive Programming -----") val futures = cafebabe map (Rp1 handle Input(_))
问了一连串的问题,会获得一连串的答复。不一样于前面5种实现方式,如今的状况是说有的消息处理都是并发的、异步的,handle方法只给你返回不是最终结果,而是表明结果的将来值,就好像有人告诉你“之后你能赚到一个亿”同样,是否是有点忽悠人?那么何时能拿到结果?拿到怎样的结果呢?真的“赚到一个亿”?
这里的futures是一个可索引的序列,类型是:IndexedSeq[Future[Option[Output]]],其中每一个元素是表明每一个输入请求处理结果的将来值。
有两种方式拿到结果:一种是傻傻地等待;另外一种是聪明的响应式思惟:“我不理你,但当你有结果的时候必定要告诉我!”。
具体的,用聪明的响应式思惟,就是对于每一个将来值,当它的任务完成的时候,若是成功,就打印处理结果,若是没人处理的业务则不打印;若是一直等不到回复,超时了,或者其余缘由的问题,就会收到失败的信息,并附带异常(超时的异常为AskTimeoutException),那么咱们就打印异常堆栈信息。
futures foreach (_.onComplete { case Success(output) => output map (_.value) foreach println case Failure(exception) => exception printStackTrace })
运行结果以下:(每次运行顺序都不同)
Customer Service handles w Customer Service handles a Manager handles b Customer Service handles a Engineer handles c Manager handles b Manager handles z
处理结果不是顺序的了,并且每次运行都不同,代表事件响应的过程是并发执行的。
5.2.8 并发世界也有秩序
若是咱们想让运行结果有序,怎么办?这有何难,把序列futures经过Future类的sequence方法合并成一个将来值:
val mergedFuture = Future sequence futures
这个合并的将来值mergedFuture,类型是Future[IndexedSeq[Option[Output]]],即它只是一个Future,将来值的结果是一个可索引的序列。这个将来值mergedFuture,只有当全部的输入请求所有处理完并拿到全部输出结果时,把结果按照输入请求的顺序,可索引序列做为结果输出。咱们只要拿到这个结果就是有序的了。
前面第5.2.7节已经介绍过聪明的办法,这里不重复了,恰恰就用很傻很天真的办法,傻傻地等待,阻塞当前线程,直到别人把全部结果都告诉你为止。(固然也不会傻到等上一成天,其实等上5秒就足够傻了,^_^)
val outputs = Await result (mergedFuture, 5 seconds)
output是咱们想要的有序结果,类型是IndexedSeq[Option[Output]],如今对于每一个投诉,有序地打印处理结果,若是没人处理的业务则不打印:
outputs.flatten map (_.value) foreach println
输出与输入一样顺序的结果:
Engineer handles c Customer Service handles a Manager handles b Customer Service handles a Manager handles b Customer Service handles w Manager handles z
5.3 解法七:用RX(Reactive eXtension,响应式扩展)实现责任链模式
RX是基于事件流处理的响应式编程开源库,目前已经有多种语言的实现,好比RxJava,RxScala(RxScala实际上是在RxJava基础上增长了一层adapter,使API更友好)。关于RxJava能够参考我另外几篇文章:
(1)实验驱动开发与响应式编程 —- File Watcher的技术实现
(2)性能优化:RxJava异步响应式编程提高响应速度
(3)基于RxJava实现事件总线
本文使用RxScala,须要在build.sbt增长依赖(相似Maven,SBT编译时会从Maven中心仓库递归下载依赖):
libraryDependencies += "io.reactivex" %% "rxscala" % "0.26.5"
对RX进行响应式编程,主要是对事件流(Observable)进行一连串响应,包括过滤,转换,处理,等等操做,使其流向指望的目的地。咱们能够把事件流(Observable)比做FP中的高阶函数(见4.2节)。
相似FP,对于客服,经理,工程师,他们响应输入请求的过程是:为输入请求request构造事件流,而后过滤使得只有可以处理(canHandle)的事件经过,而后处理(handle)请求并返回带有结果的事件。这个过程须要外部提供如下3个参数:输入请求request,判断是否可以处理的函数(canHandle),具体处理的函数(handle)。因而咱们能够定义下面的三阶函数(3个参数列表)来表示每一个结点的响应输入请求的处理过程:
def handler(canHandle: Input => Boolean)(handle: Input => Output)(request: Input) = {
Observable just request filter canHandle map handle
}
handler函数是这样一个3阶函数,它接受输入request,构造出以Input为消息的事件流Observable,事件流通过filter过滤,只让那些canHandle事件日后流,接着事件流到map,经过handle把输入消息Input转化为Output,而后把事件流做为返回值流向函数的调用者,以便后续控制事件流的流向。
所以咱们能够构造出每一个结点:客服,经理,工程师,他们的不一样方式在于判断是否可以处理的函数(canHandle),具体处理的函数(handle),咱们把这些不一样的地方传入上面的3阶函数handler,获得表明每一个处理结点的偏应用函数(见4.2节):
val customService = handler(canHandleByCustomService)(handleByCustomService) _ val manager = handler(canHandleByManager)(handleByManager) _ val engineer = handler(canHandleByEngineer)(handleByEngineer) _
上面customService,manager,engineer,已降为一阶的普通函数了,类型是:Input => Observable[Output],即输入参数是输入请求Input,输出是带有输出信息Output的事件流。
将结点链起来,就能够处理用户投诉事件了。当客服不能处理request事件,就switch给经理,经理不能处理就switch给工程师。
def handle(request: Input): Observable[Output] = {
customService(request) switchIfEmpty manager(request) switchIfEmpty engineer(request)
}
输入一连串业务,好比cafebabe = “cafebabe wenzhe”,处理逻辑能够用下面一条事件流描述,事件流最后流到print,把输出值打印出来。
println("----- Rp2: RX Reactive Programming (single thread)-----")
Observable from cafebabe map (Input(_)) flatMap (Rp2 handle _) map (_.value) foreach println
咱们没有对事件流进行异步处理,所以上面的处理过程是单线程的,输出有序的结果:
----- Rp2: RX Reactive Programming (single thread)----- Engineer handles c Customer Service handles a Manager handles b Customer Service handles a Manager handles b Customer Service handles w Manager handles z
咱们很容易把事件流的响应过程异步化,好比让经理处理的串行事件流manager(request)流过subscribeOn操做符,能够转化为异步事件流asyncHandleByManager:
val asyncHandleByManager = manager(request) subscribeOn ComputationScheduler()
subscribeOn方法后面接受线程池调度器,这里用的ComputationScheduler使用的线程池里的线程个数与计算机CPU的核数相同,你也能够把它替换成你想要的。
相似的办法能够建立经理和工程师的异步事件流,再用switchIfEmpty操做符将它们连接起来构成异步的责任链:
def asyncHandle(request: Input): Observable[Output] = { val asyncHandleByCustomService = customService(request) subscribeOn ComputationScheduler() val asyncHandleByManager = manager(request) subscribeOn ComputationScheduler() val asyncHandleByEngineer = engineer(request) subscribeOn ComputationScheduler() asyncHandleByCustomService switchIfEmpty asyncHandleByManager switchIfEmpty asyncHandleByEngineer }
输入一样一连串业务cafebabe = “cafebabe wenzhe”,让事件流流入能并发处理的asyncHandleByABC:
println("----- Rp2: RX Reactive Programming (multiple thread)-----")
Observable from cafebabe map (Input(_)) flatMap (Rp2 asyncHandle _) map (_.value) foreach println
运行结果以下:(每次运行顺序都不同)
----- Rp2: RX Reactive Programming (multiple thread)----- Customer Service handles a Engineer handles c Manager handles b Manager handles b Customer Service handles w Customer Service handles a Manager handles z
处理结果再也不是顺序的了,并且每次运行都不同,代表事件响应的过程是并发执行的。
5.4 两种响应式编程方式的比较
Actor处理可以在进程内使用,还能够跨进程、跨机器、跨网络,可以适用于分布式计算;RX只能在进程内使用。
若是要串行(单线程)执行,或者单线程多线程切换,RX要比Actor更加方便。
从编程风格看,Actor更像面向对象OOP,须要定义一个Actor类,一个无需考虑线程安全问题的类;而RX更像是函数式编程,使用高阶函数,经过流式处理,代码可读性更好。
因为Scala语言简洁易懂,读起来类似天然语言,开发效率和运行效率都很高,并且支持多种编程范式(便是彻底面向对象语言,又是函数式编程语言),再加上响应式的Akka Actor、RxScala),所以本文采用Scala做为例子代码show给读做,也顺带介绍了一些Scala语言特性,读者若是熟悉Java 8,会很容易理解,由于Java 8不少特性都借鉴自scala。
本文描述了责任链模式的应用场景,而后给出7中不一样风格的实现方式。第一种是传统面向对象的基于继承的实现方式;接着以聚合代替继承,给出了第二种面向对象实现方式;接下来从OOP逐步过渡到FP,第三种实现就是混合了OOP和FP两种范式的实现方式;接下来的第四种和第五种都是纯FP实现,分别使用了偏应用函数和偏函数;从第一种到第五种风格的演化过程当中,代码越来短,当可扩展能力和灵活性却愈来愈好;接着介绍响应式思惟,以及两种不一样的实现,第六种实现是基于Actor模型,而第七种实现是基于事件流响应的流式处理,最后比较了这两种响应式风格。在介绍这7种实现风格的过程当中,考虑到大多数程序员是面向对象出身,本文对函数式编程、响应式编程的概念进行了比较细致的介绍。原文:https://blog.csdn.net/liuwenzhe2008/article/details/70199520