Java设计模式——观察者模式的灵活应用

灵感来源于一个猪队友给个人题目java

 

 看到这个,我抓住的关键字是:任何子任务失败,要通知全部子任务执行取消逻辑。dom

 这不就是消息广播吗?观察者模式!ide

干活

首先是收听者测试

package com.example.broadcast;

/**
 * 每一个节点便是广播者,也是收听者
 */
public interface Listener {

    /**
     * 设置调度中心
     */
    void setCenter(DispatchCenter center);

    /**
     * 主动通知其它收听者
     */
    void notice(String msg);

    /**
     * 本身收到通知的处理逻辑
     * @param msg
     */
    void whenReceived(String msg);

    /**
     * 收听者标志:惟一
     * @return
     */
    String identify();

}

而后是调度中心this

package com.example.broadcast;

/**
 * 调度中心
 */
public interface DispatchCenter {

    /**
     * 广播
     * @param own 广播的时候,要排除本身
     * @param msg 广播消息
     */
    void broadcast(String own, String msg);

    /**
     * 添加收听者
     * @param listener
     */
    void addListener(Listener listener);

}

调度中心实现spa

package com.example.broadcast;

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;


public class DispatchCenterImpl implements DispatchCenter {

    private static final Map<String, Listener> MAP = new ConcurrentHashMap<>();

    @Override
    public void broadcast(String own, String msg) {
        MAP.forEach((k,v) -> {
            // 不用给本身发通知
            if (!k.equals(own)){
                v.whenReceived(msg);
            }
        });
    }

    @Override
    public void addListener(Listener listener) {
        listener.setCenter(this);
        MAP.put(listener.identify(), listener);
    }
}

剩下三个收听者线程

package com.example.broadcast;

import java.util.UUID;

public class ListenerA implements Listener {

    private DispatchCenter center;
    private String identify;

    public ListenerA() {
        identify = UUID.randomUUID().toString();
    }

    @Override
    public void setCenter(DispatchCenter center) {
        this.center = center;
    }

    @Override
    public void notice(String msg) {
        center.broadcast(identify, msg);
    }

    @Override
    public void whenReceived(String msg) {
        System.out.println(this.getClass().getName() + "收到消息:" + msg);
    }

    @Override
    public String identify() {
        return identify;
    }
}

B和C除了类名不同,其余都同样,再也不赘述。目录以下3d

 

测试

package com.example.broadcast;


import java.util.Random;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class Main {


    public static void main(String[] args) {
        DispatchCenter center = new DispatchCenterImpl();
        ListenerA listenerA = new ListenerA();
        ListenerB listenerB = new ListenerB();
        ListenerC listenerC = new ListenerC();
        center.addListener(listenerA);
        center.addListener(listenerB);
        center.addListener(listenerC);

        ExecutorService executorService = Executors.newFixedThreadPool(3);

        // A触发1条事件
        executorService.submit(() -> {
            int i = 1;
            while (i > 0){
                listenerA.notice(listenerA.getClass().getName() + "说:我有" + new Random().nextInt(1000000) + "元");
                i--;
            }
        });
        // B触发2条事件
        executorService.submit(() -> {
            int i = 2;
            while (i > 0){
                listenerB.notice(listenerB.getClass().getName() + "说:我有" + new Random().nextInt(1000000) + "元");
                i--;
            }
        });
        // C触发3条事件
        executorService.submit(() -> {
            int i = 3;
            while (i > 0){
                listenerC.notice(listenerC.getClass().getName() + "说:我有" + new Random().nextInt(1000000) + "元");
                i--;
            }
        });

        executorService.shutdown();

    }


}

输出:code

 

 流程图

 

 当其中的B节点,发生了错误,除了把本身处理好以外blog

1. 向调度中心发送广播请求,并携带须要的消息

2. 调度中心遍历收听者,挨个通知(执行)每个收听者接受消息的逻辑

 

关于中止任务

由于题目要求,【快速取消】全部子任务

关于线程中止的方法也有不少:

1. 优雅退出run方法

2. 暴力stop

3. run方法抛出异常

 

若是说要求,A异常了,B和C收到消息以后,线程当即中止,不能有一点迟疑,说实话我还没想到该怎么作。由于你要知道,实际上的任务的run方法内部,不太多是个while循环,人家可能就是个顺序执行,因此中止标志位的方式,并不适用。

而其它的方法,我也没想到很好的。我只能写个按照标志位中止的“玩具”

修改三个收听者代码和测试类

package com.example.broadcast;

import lombok.SneakyThrows;

import java.util.Random;
import java.util.UUID;

public class ListenerA implements Listener,Runnable {

    private DispatchCenter center;
    private String identify;

    public ListenerA() {
        identify = UUID.randomUUID().toString();
    }

    @Override
    public void setCenter(DispatchCenter center) {
        this.center = center;
    }

    @Override
    public void notice(String msg) {
        center.broadcast(identify, msg);
    }

    @Override
    public void whenReceived(String msg) {
        System.out.println(this.getClass().getName() + "收到消息:" + msg);
    }

    @Override
    public String identify() {
        return identify;
    }

    @SneakyThrows
    @Override
    public void run() {
        // 5秒以后,模拟发生异常
        Thread.sleep(5000);
        notice(this.getClass().getName() + "说:我有" + new Random().nextInt(1000000) + "元");
        System.out.println(this.getClass().getName() + "程序异常,并已经传播了消息...");
    }
}
package com.example.broadcast;

import lombok.SneakyThrows;

import java.util.UUID;

public class ListenerB implements Listener,Runnable {

    private DispatchCenter center;
    private String identify;
    private volatile Boolean stopFlag = false;

    public ListenerB() {
        identify = UUID.randomUUID().toString();
    }

    @Override
    public void setCenter(DispatchCenter center) {
        this.center = center;
    }

    @Override
    public void notice(String msg) {
        center.broadcast(identify, msg);
    }

    @Override
    public void whenReceived(String msg) {
        System.out.println(this.getClass().getName() + "_" + Thread.currentThread().getName() + "收到消息:" + msg);
        // 中止当前线程
        stopFlag = true;
    }

    @Override
    public String identify() {
        return identify;
    }

    @SneakyThrows
    @Override
    public void run() {
        while (!stopFlag){
            Thread.sleep(1000);
            System.out.println(this.getClass().getName() + "_" + Thread.currentThread().getName() + "__B在执行任务");
        }
        System.out.println(this.getClass().getName() + "_" + Thread.currentThread().getName() + "__B Dead");
    }
}
package com.example.broadcast;

import lombok.SneakyThrows;

import java.util.UUID;

public class ListenerC implements Listener,Runnable {

    private DispatchCenter center;
    private String identify;
    private volatile Boolean stopFlag = false;

    public ListenerC() {
        identify = UUID.randomUUID().toString();
    }

    @Override
    public void setCenter(DispatchCenter center) {
        this.center = center;
    }

    @Override
    public void notice(String msg) {
        center.broadcast(identify, msg);
    }

    @Override
    public void whenReceived(String msg) {
        System.out.println(this.getClass().getName() + "_" + Thread.currentThread().getName() + "收到消息:" + msg);
        // 中止当前线程
        stopFlag = true;
    }

    @Override
    public String identify() {
        return identify;
    }

    @SneakyThrows
    @Override
    public void run() {
        while (!stopFlag){
            Thread.sleep(1000);
            System.out.println(this.getClass().getName() + "_" + Thread.currentThread().getName() + "__C在执行任务");
        }
        System.out.println(this.getClass().getName() + "_" + Thread.currentThread().getName() + "__C Dead");
    }
}

测试

package com.example.broadcast;


import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class Main {


    public static void main(String[] args) {
        DispatchCenter center = new DispatchCenterImpl();
        ListenerA listenerA = new ListenerA();
        ListenerB listenerB = new ListenerB();
        ListenerC listenerC = new ListenerC();
        center.addListener(listenerA);
        center.addListener(listenerB);
        center.addListener(listenerC);

        ExecutorService executorService = Executors.newFixedThreadPool(3);

        // A
        executorService.submit(listenerA);
        // B
        executorService.submit(listenerB);
        // C
        executorService.submit(listenerC);

        executorService.shutdown();

    }


}

 

 

这个是这么多年第一个发到首页的,就是想问下你们怎样解决这种状况下的线程中止问题

相关文章
相关标签/搜索