观察者模式使用频率很高,用于创建一种对象之间的依赖关系,当一个对象发生改变时自动通知其余对象,其余对象将作出相应反应。在观察者模式中,发生改变的对象叫作观察目标,也叫被观察者,而被通知的对象叫作观察者。java
一个观察目标能够对应多个观察者,并且这些观察者之间没有任何相互关联,能够根据须要增长和删除观察者,使得系统便于扩展。编程
观察者模式:定义对象之间的一种一对多依赖关系,使得每个对象状态发生改变时,其相关依赖对象皆获得通知并自动更新。api
观察者模式是一种对象行为型模式。安全
Subejct
(抽象目标):又叫主题,指被观察的对象,也就是被观察者,在目标中定义了一个观察者集合,同时提供一系列方法来增长或者删除观察者对象,也定义了通知方法notify
ConcreteSubject
(具体目标):抽象目标的子类,一般包含有常常改变的数据,当状态发生改变时,向各个观察者发出通知,同时还实现了目标类中定义的抽象业务逻辑,若是无须扩展抽象目标类则能够省略具体目标类Observer
(抽象观察者):对观察目标做出响应,通常定义为接口ConcreteObserver
(具体观察者):具体观察者中维护一个指向具体目标的引用,存储具体观察者的有关状态,这些状态须要与具体目标的状态保持一致,同时实现了抽象观察者的update
方法notifyObserver
的通知观察者的抽象方法interface Observer { void update(String state); }
这里实现为一个接口,update
方法供抽象目标,也就是供被观察者调用。微信
class ConcreteObserver implements Observer { public String state; public ConcreteObserver(String state) { this.state = state; } @Override public void update(String state) { System.out.println("观察者状态更新为"+state); } }
实现其中的update
方法,这里只是简单将状态输出。异步
abstract class Subject { private List<Observer> list = new ArrayList<>(); public void attach(Observer observer) { list.add(observer); } public void detach(Observer observer) { list.remove(observer); } public void notifyObservers(String state) { list.forEach(t->t.update(state)); } public abstract void change(String newState); }
抽象目标类负责管理观察者集合,使用List
存储抽象观察者,包含添加/删除观察者方法。notifyObservers
中通知了全部的观察者,将状态做为具体参数进行传递。change
做为被观察者的状态改变函数,将新状态做为参数传入。ide
class ConcreteSubject extends Subject { private String state; public String getState() { return state; } @Override public void change(String newState) { state = newState; System.out.println("被观察者状态为:"+newState); notifyObservers(newState); } }
具体目标类负责实现抽象目标的change
方法,保存新状态后,经过抽象目标的notifyObservers
通知全部观察者。函数
public static void main(String[] args) { Observer observer1 = new ConcreteObserver("111"); Observer observer2 = new ConcreteObserver("111"); Observer observer3 = new ConcreteObserver("111"); Subject subject = new ConcreteSubject(); subject.attach(observer1); subject.attach(observer2); subject.attach(observer3); subject.change("2222"); }
客户端针对抽象观察者以及抽象目标进行编程,定义好各个观察者后,添加到抽象目标中进行管理,接着更新被观察者的状态。性能
输出以下:测试
一个多人联机游戏中,拥有战队机制,当基地受到攻击时,将通知该战队全部成员进入警惕状态,使用观察者模式进行设计。
设计以下:
Observer
Player
Subject
Base
抽象观察者:
interface Observer { void update(String state); }
包含一个供抽象目标调用的update()
方法。
接着是具体观察者:
class Player implements Observer { public String state; public String name; public Player(String name,String state) { this.name = name; this.state = state; } @Override public void update(String state) { System.out.println("战队成员"+name+"状态更新为"+state); } }
在update
中输出更新的状态。
抽象目标以下:
abstract class Subject { private List<Observer> list = new ArrayList<>(); public void attach(Observer observer) { list.add(observer); } public void detach(Observer observer) { list.remove(observer); } public void notifyObservers(String state) { System.out.println("基地通知全部战队成员"); list.forEach(t->t.update(state)); } public abstract void change(String newState); }
使用List
存储全部战队成员,在通知方法中通知全部的观察者,change
定义为抽象方法供子类实现。
具体目标(被观察者)以下:
class Base extends Subject { private String state; public String getState() { return state; } @Override public void change(String newState) { state = newState; System.out.println("基地状态更新为:"+newState); notifyObservers(newState); } }
实现抽象目标的change
方法,里面须要调用notifyObservers
方法通知全部观察者。
测试:
public static void main(String[] args) { Observer player1 = new Player("A","无警惕状态"); Observer player2 = new Player("B","无警惕状态"); Observer player3 = new Player("C","无警惕状态"); Subject subject = new Base(); subject.attach(player1); subject.attach(player2); subject.attach(player3); subject.change("警惕状态"); }
输出以下:
在观察者模式中,能够分为推模型以及拉模型。
推模型是被观察者向观察者推送观察目标的详细信息,无论观察者是否须要,推送的信息一般是被观察者对象的所有或部分数据。像上面的例子就是推模型,被观察者(基地)主动把状态数据推送给观察者(战队成员)。
拉模型当被观察者通知观察者时,只传递少许信息,若是观察者须要更加详细的信息,由观察者主动到观察目标中获取,至关于时观察者从主题对象中拉去数据。这种方式通常把被观察者自身经过update
传递给观察者,获取数据时时直接经过这个被观察者引用获取。
能够将上面的基地例子修改从推模型修改成拉模型,首先修改观察者中的update()
参数:
interface Observer { void update(Subject subject); }
接着修改具体观察者:
class Player implements Observer { public String state; public String name; public Player(String name,String state) { this.name = name; this.state = state; } @Override public void update(Subject subject) { System.out.println("战队成员"+name+"状态更新为"+subject.getState()); } }
主要的不一样是原来的推模型直接把状态做为参数传递,如今传递一个抽象目标对象,须要具体观察者从中主动获取数据。
而后是抽象目标:
abstract class Subject { private String state; private List<Observer> list = new ArrayList<>(); public void attach(Observer observer) { list.add(observer); } public void detach(Observer observer) { list.remove(observer); } public void notifyObservers() { System.out.println("基地通知全部战队成员"); list.forEach(t->t.update(this)); } public String getState() { return state; } public void setState(String state) { this.state = state; } public abstract void change(String newState); }
主要改变是多了一个state
成员,同时去掉notifyObservers()
中的参数。
最后是具体目标:
class Base extends Subject { @Override public void change(String newState) { setState(newState); System.out.println("基地状态更新为:"+newState); notifyObservers(); } }
客户端代码无须任何修改,测试输出结果一致:
update
方法Observer
与Observable
观察者模式在Java中很是重要,JDK的java.util
提供了Observer
以及Observable
接口做为对观察者模式的支持。
Observer
java.util.Observer
接口充当抽象观察者,只声明了一个方法:
void update(Observable o,Object arg);
当观察目标的状态发生变化时,该方法会被调用,在Observer
子类实现update
,不一样的具体观察者具备不一样的更新行为,当调用Observable
的notifyObservers()
时,将执行update
方法。
update
的接口两个参数中,一个表示被观察者,一个表示调用notifyObservers
的参数,换句话说,这样设计能同时支持推模型与拉模型:
notifyObervers()
中传入arg
参数,也就是update
中的arg
参数notifyObservers
中传入参数,可是须要在被观察者中声明获取状态或数据的方法,方便在update
中经过被观察者引用o
进行强制类型转换后调用Observable
java.util.Observable
充当抽象目标类,其中定义了一个Vector
存储观察者对象,包含的方法(OpenJDK11.0.2)以下:
public class Observable { private boolean changed = false; private Vector<Observer> obs; public Observable() { //构造函数,初始化 obs } public synchronized void addObserver(Observer o) { //注册观察者到obs中 } public synchronized void deleteObserver(Observer o) { //删除obs中的某个观察者 } public void notifyObservers() { //通知方法,内部调用每个观察者的update() } public void notifyObservers(Object arg) { //相似上面的通知方法,带参数调用update() } public synchronized void deleteObservers() { //删除全部观察者 } protected synchronized void setChanged() { //设置changed为true,表示观察目标的状态发生变化 } protected synchronized void clearChanged() { //清除changed的状态,表示观察目标状态再也不发生改变 //或者已经通知了全部的观察者 } public synchronized boolean hasChanged() { //返回changed,表示观察对象是否发生改变 } public synchronized int countObservers() { //返回观察者数量 } }
将上面基地的例子用Observable
以及Observer
实现以下:
public class Test { public static void main(String[] args) { Observer player1 = new Player("A","无警惕状态"); Observer player2 = new Player("B","无警惕状态"); Observer player3 = new Player("C","无警惕状态"); Base base = new Base(); base.addObserver(player1); base.addObserver(player2); base.addObserver(player3); base.change("警惕状态"); } } class Player implements Observer { private String name; private String state; public Player(String name,String state) { this.name = name; this.state = state; } @Override public void update(Observable o,Object arg) { System.out.println("战队成员"+name+"更新状态为"+arg); } } class Base extends Observable { public void change(String state) { setChanged(); notifyObservers(state); } }
具体观察者Player
实现Observer
接口,具体目标Base
(被观察者)继承Observable
,注意须要在notifyObservers
以前,使用Observable
的setChanged
表示被观察者状态改变,这样使用notifyObservers
才能生效,不然认为被观察者没有发生状态改变:
查看源码发现notifyObservers
中先对changed
内部布尔变量进行了判断,若是具体目标没有使用setChanged
方法,将致使没法通知观察者。
这里使用了推模型实现,具体目标在notifyObservers
中传递状态参数:
class Player implements Observer { //... @Override public void update(Observable o,Object arg) { System.out.println("战队成员"+name+"更新状态为"+arg); } } class Base extends Observable { public void change(String state) { setChanged(); notifyObservers(state); } }
使用拉模型修改以下:
class Player implements Observer { //... public void update(Observable o,Object arg) { System.out.println("战队成员"+name+"更新状态为"+((Base)o).getState()); } } class Base extends Observable { private String state; public String getState() { return state; } public void change(String state) { this.state = state; setChanged(); notifyObservers(); } }
具体观察者的update
中由原来的从arg
获取状态变为从Observable
中经过getter获取状态,同时具体目标增长了state
成员,在notifyObservers
中不需手动传入状态参数。
Flow API
虽然使用JDK的Observable
以及Observer
实现观察者模式很容易,不须要定义抽象目标以及抽象观察者,可是很遗憾的是从Java9开始标记为过期了(看着一条条横线也挺难受的):
查了一下缘由,标记为过期主要是由于:
Observable
没有实现序列化接口Flow API
为了克服原来的缺点,从JDK9开始出现了Flow API
,位于java.util.concurrent
下。
在讲Flow API
以前,先看一下响应式编程。
响应式编程能够理解为一种处理数据项的异步流,即在数据产生的时候,接收者就对其进行响应。在响应式编程中,会有一个数据发布者(Publisher
)以及数据订阅者(Subscriber
),后者用于异步接收发布者发布的数据。
在该模式中,还引入了一个更高级的特性:数据处理器(Processor
),用于将数据发布者发布的数据进行某些转换操做,而后再发布给数据订阅者。响应式编程是异步非阻塞编程,可以提高程序性能,能够解决传统编程遇到的困难,基于这个模型实现的有Java 9 Flow API
,RxJava
,Reactor
等。
Flow API
Flow
是一个final
类,里面定义了四个接口:
Publisher<T>
:数据发布者接口Subscriber<T>
:数据订阅者接口Subscription
:发布者和订阅者之间的订阅关系Processor<T,R>
:数据处理器public static int defaultBufferSize()
:返回缓冲区长度,默认256。当发布者发送速率高于接收速率时,数据接收者缓冲区将会被填满,当缓冲区填满后,发布者会中止发送数据,直到订阅者有空闲位置时,发布者才会继续发布数据Publisher<T>
Publisher
源码以下:
@FunctionalInterface public static interface Publisher<T> { public void subscribe(Subscriber<? super T> subscriber); }
这是一个函数式接口,只包含一个subscribe
方法,经过该方法将数据发布出去。
Subscriber<T>
Subscriber
源码以下:
public static interface Subscriber<T> { public void onSubscribe(Subscription subscription); public void onNext(T item); public void onError(Throwable throwable); public void onComplete(); }
方法解释以下:
onSubscribe
:订阅成功的回调方法,用于初始化Subscription
,代表能够开始接收订阅数据了onNext
:接收下一项订阅数据的回调方法onError
:在Publisher
或Subscriber
遇到不可恢复的错误时会调用该方法,Subscriber
再也不接收订阅信息onComplete
:接收完全部订阅数据,而且发布者已经关闭后会回调该方法Subscription
Subscription
源码以下:
public static interface Subscription { public void request(long n); public void cancel(); }
方法解释以下:
request
:用于向数据发布者请求n个数据项cancel
:取消消息订阅,订阅者再也不接收数据Processor<T,R>
Processor
源码以下:
public static interface Processor<T,R> extends Subscriber<T>, Publisher<R> {}
这是一个空接口,继承了Subscriber
以及Publisher
,它既能发布数据也能订阅数据,基于这个特性它能够充当数据转换的角色,先从数据发布者接收数据,通过处理后发布给数据订阅者。
public class Test { public static void main(String[] args) { //JDK9自带的数据发布者,实现了Publisher<T> SubmissionPublisher<String> publisher = new SubmissionPublisher<>(); //建立订阅者,用于接收发布者消息 Subscriber<String> subscriber = new Subscriber<>() { private Subscription subscription; @Override public void onSubscribe(Subscription subscription) { //经过Subscription和发布者保持订阅关系 //并用它来给发布者反馈 this.subscription = subscription; //请求一个数据 this.subscription.request(1); } @Override public void onNext(String item) { //接收发布者发布的信息 System.out.println("订阅者接收消息:"+item); //接收后再次请求一个数据 this.subscription.request(1); //若是不想接收直接调用cancel // this.subscription.cancel(); } @Override public void onError(Throwable throwable) { //异常回调 System.out.println("订阅者接收数据异常:"+throwable); throwable.printStackTrace(); this.subscription.cancel(); } @Override public void onComplete() { //发布者发送的数据都被接收了 //而且发布者关闭后就会回调该方法 System.out.println("订阅者接收数据完毕"); } }; //创建发布者与订阅者的关系 publisher.subscribe(subscriber); //发布数据 for(int i=0;i<10;++i) { String message = "flow api "+i; System.out.println("发布者发布消息:"+message); publisher.submit(message); } //发布结束后关闭发布者 publisher.close(); //main延迟关闭,不然订阅者没接收完消息线程就被关闭 try { Thread.currentThread().join(2000); } catch(Exception e) { e.printStackTrace(); } } }
步骤:
SubmissionPublisher<String>
做为消息发布者Subscriber<String>
做为消息订阅者publisher.subscribe(subsciber)
创建submit
发布数据close()
关闭发布者,同时会回调订阅者的onComplete
方法输出以下:
注意例子中最后须要延迟关闭main
线程,若是没有这个操做,订阅者就不能彻底接收全部信息:
能够从输出看到,订阅者接收到第8条消息后,线程就被关闭了。
前面说过Flow
中有一个静态方法返回缓冲区大小,下面进行模拟填满,在订阅者中的订阅方法中,加入延迟:
@Override public void onNext(String item) { //模拟接收数据缓慢填满缓冲池 try { TimeUnit.MILLISECONDS.sleep(300); } catch(InterruptedException e) { e.printStackTrace(); } System.out.println("订阅者接收消息:"+item); //接收后再次请求一个数据 this.subscription.request(1); }
由于默认的缓冲区大小为256,所以,发布256条信息后,能够看到再也不发送,直到等到订阅者处理才继续发布:
Processor
Processor
就是Publisher
+Subscriber
,一般是用做接收发布者发布的信息,进行相应处理后,再将数据发布,供消息者订阅接收,下面是一个简例:
public class Test { public static void main(String[] args) { //JDK9自带的数据发布者,实现了Publisher<T> SubmissionPublisher<String> publisher = new SubmissionPublisher<>(); //建立订阅者,用于接收发布者消息 TestProcessor processor = new TestProcessor(); Subscriber<String> subscriber = new Subscriber<>() { private Subscription subscription; @Override public void onSubscribe(Subscription subscription) { this.subscription = subscription; this.subscription.request(1); } @Override public void onNext(String item) { System.out.println("订阅者接收消息:"+item); this.subscription.request(1); } @Override public void onError(Throwable throwable) { System.out.println("订阅者接收异常"); throwable.printStackTrace(); this.subscription.cancel(); } @Override public void onComplete() { System.out.println("订阅者接收完毕"); } }; publisher.subscribe(processor); processor.subscribe(subscriber); //发布数据 for(int i=0;i<10;++i) { String message = "flow api "+i; System.out.println("发布者发布消息:"+message); publisher.submit(message); } //发布结束后关闭发布者 publisher.close(); //main延迟关闭,不然订阅者没接收完消息线程就被关闭 try { Thread.currentThread().join(2000); } catch(Exception e) { e.printStackTrace(); } } } class TestProcessor extends SubmissionPublisher<String> implements Processor<String,String> { private Subscription subscription; @Override public void onSubscribe(Subscription subscription) { //经过Subscription和发布者保持订阅关系 //并用它来给发布者反馈 this.subscription = subscription; //请求一个数据 this.subscription.request(1); } @Override public void onNext(String item) { //模拟接收数据缓慢填满缓冲池 System.out.println("处理器处理消息:"+item); item = "通过处理器处理的消息:"+item; //接收后再次请求一个数据 this.submit(item); this.subscription.request(1); } @Override public void onError(Throwable throwable) { //异常回调 System.out.println("处理器处理数据异常:"+throwable); throwable.printStackTrace(); this.subscription.cancel(); } @Override public void onComplete() { System.out.println("处理者处理数据完毕"); this.close(); } }
步骤:
SubmissionPublisher<String>
SubmissionPublisher<String>
并实现Processor<String,String>
的类,在其中的onNext
方法中对消息进行处理并调用submit
发布给订阅者,在其中的onComplete
调用close()
关闭处理器Subscriber<String>
输出:
Flow API
实现例子讲了这么多Flow API
的例子,下面来看看如何使用Flow API
实现基地的例子。
public class Test { public static void main(String[] args) { Base base = new Base(); Player player1 = new Player("A", "非戒备状态"); Player player2 = new Player("B", "非戒备状态"); Player player3 = new Player("C", "非戒备状态"); base.add(player1); base.add(player2); base.add(player3); base.changed("戒备状态"); base.close(); } } class Base { SubmissionPublisher<String> publisher = new SubmissionPublisher<>(); private List<Player> players = new ArrayList<>(); public void add(Player player) { publisher.subscribe(player); players.add(player); } public void remove(Player player) { player.cancel(); players.remove(player); } public void changed(String state) { System.out.println("基地正在遭受攻击"); publisher.submit(state); } public void close() { publisher.close(); try { Thread.currentThread().join(2000); } catch(Exception e) { e.printStackTrace(); } } } class Player implements Subscriber<String> { private Subscription subscription; private String name; private String state; public Player(String name,String state) { this.name = name; this.state = state; } @Override public void onSubscribe(Subscription subscription) { this.subscription = subscription; this.subscription.request(1); } @Override public void onNext(String item) { System.out.println("战队成员"+name+"更新状态:"+item); this.subscription.request(1); } @Override public void onError(Throwable throwable) { System.out.println("战队成员接收异常"); throwable.printStackTrace(); this.subscription.cancel(); } public void cancel() { this.subscription.cancel(); } @Override public void onComplete() { System.out.println("战队成员接收完毕"); } }
大部分代码都与上面的例子相同,就不解释了,贴一下输出:
若是以为文章好看,欢迎点赞。
同时欢迎关注微信公众号:氷泠之路。