使用JDK的观察者接口进行消息推送

观察者模式就是对对象内部的变化进行观察,当发生改变时作出相应的响应。代码样例见 设计模式整理 !java

由于观察者模式较为重要,使用频率较高,JDK早已经提供了内置的观察者接口以及被观察者父类。设计模式

JDK中的观察者接口源码以下数组

public interface Observer {
    /**
     * 当被观察者发生变化时,执行的方法
     *
     * @param   o     被观察者父类,这里通常要强制转化成被观察者子类
     * @param   arg   an argument passed to the <code>notifyObservers</code>
     *                 method.
     */
    void update(Observable o, Object arg);
}

被观察者父类源码,咱们能够看到它使用了Vector的List列表来保存观察者接口对象,Vector自己是线程安全的,虽然如今已经用的并很少。缓存

public class Observable {
    //被观察者是否发生了改变
    private boolean changed = false;
    //保存观察者接口对象的列表
    private Vector<Observer> obs;

    /** Construct an Observable with zero Observers. */

    public Observable() {
        obs = new Vector<>();
    }

    /**
     * 注册观察者接口对象,加了显示锁来保证线程安全
     *
     * @param   o   an observer to be added.
     * @throws NullPointerException   if the parameter o is null.
     */
    public synchronized void addObserver(Observer o) {
        if (o == null)
            throw new NullPointerException();
        if (!obs.contains(o)) {
            obs.addElement(o);
        }
    }

    /**
     * 移除观察者接口对象
     * @param   o   the observer to be deleted.
     */
    public synchronized void deleteObserver(Observer o) {
        obs.removeElement(o);
    }

    /**
     * 当被观察者发生改变时,通知观察者
     *
     * @see     java.util.Observable#clearChanged()
     * @see     java.util.Observable#hasChanged()
     * @see     java.util.Observer#update(java.util.Observable, java.lang.Object)
     */
    public void notifyObservers() {
        notifyObservers(null);
    }

    /**
     * 调用观察者对象完成响应操做
     *
     * @param   arg   any object.
     * @see     java.util.Observable#clearChanged()
     * @see     java.util.Observable#hasChanged()
     * @see     java.util.Observer#update(java.util.Observable, java.lang.Object)
     */
    public void notifyObservers(Object arg) {
        /*
         * a temporary array buffer, used as a snapshot of the state of
         * current Observers.
         */
        Object[] arrLocal;

        synchronized (this) {
            if (!changed)
                return;
            //将观察者列表转化成数组
            arrLocal = obs.toArray();
            clearChanged();
        }
        //调用逐个观察者进行响应
        for (int i = arrLocal.length-1; i>=0; i--)
            ((Observer)arrLocal[i]).update(this, arg);
    }

    /**
     * 清除全部的观察者接口对象
     */
    public synchronized void deleteObservers() {
        obs.removeAllElements();
    }

    /**
     * 告知被观察者已经发生了改变,这是一个线程安全的
     */
    protected synchronized void setChanged() {
        changed = true;
    }

    /**
     * 观察者已经作出反应后恢复被观察者的改变状态为未改变
     *
     * @see     java.util.Observable#notifyObservers()
     * @see     java.util.Observable#notifyObservers(java.lang.Object)
     */
    protected synchronized void clearChanged() {
        changed = false;
    }

    /**
     * 测试被观察者是否发生了改变
     *
     * @return  <code>true</code> if and only if the <code>setChanged</code>
     *          method has been called more recently than the
     *          <code>clearChanged</code> method on this object;
     *          <code>false</code> otherwise.
     * @see     java.util.Observable#clearChanged()
     * @see     java.util.Observable#setChanged()
     */
    public synchronized boolean hasChanged() {
        return changed;
    }

    /**
     * 获取观察者的数量
     *
     * @return  the number of observers of this object.
     */
    public synchronized int countObservers() {
        return obs.size();
    }
}

咱们先用一个简单的样例来讲明如何使用JDK的这一套观察者模式安全

首先咱们须要一个实际的观察者来实现观察者接口app

public class Subscribe implements Observer {
    /**
     * 构造函数,让被观察者注册本身,便于本身对被观察者进行观察
     * @param o
     */
    public Subscribe(Observable o) {
        o.addObserver(this);
    }

    /**
     * 被观察者发生变化时,作出响应
     * @param o
     * @param arg
     */
    @Override
    public void update(Observable o, Object arg) {
        System.out.println("收到通知" + ((Publish)o).getData());
    }
}

被观察者ide

/**
 * 被观察者子类
 */
public class Publish extends Observable {
    @Getter
    private String data = "";

    /**
     * 当data属性发生变化时,通知观察者作出响应
     * @param data
     */
    public void setData(String data) {
        if (this.data !=null && !this.data.equals(data)) {
            this.data = data;
            setChanged();
        }
        notifyObservers();
    }
}

main方法函数

public class ObserverMain {
    public static void main(String[] args) {
        //建立被观察者子类对象
        Publish publish = new Publish();
        //建立观察者对象,并注册进被观察者子类中
        new Subscribe(publish);
        //被观察者发生变化
        publish.setData("开始");
    }
}

运行结果测试

收到通知开始ui

这是一个相对简单的样例,通常咱们会使用观察者模式来进行MQ消息队列的发送。

以RabbitMQ为例,有一个门店的队列,交换机

/**
 * rabbitmq配置
 *
 */
@Configuration
public class RabbitmqConfig {
   public static final String STORE_QUEUE = "store";
   @Bean
   public Queue storeQueue() {
      return new Queue(STORE_QUEUE);
   }


   @Bean
   public TopicExchange providerExchange() {
      return new TopicExchange(ServiceProviderCenterMq.MQ_EXCHANGE_PROVIDER);
   }

   @Bean
   public Binding bingingStoreToProvider(){
      return BindingBuilder.bind(storeQueue()).to(providerExchange())
            .with(ServiceProviderCenterMq.ROUTING_KEY_ROLE_ADD);
   }
}

重写消息生产者

@Slf4j
@Component
public class MessageSender implements RabbitTemplate.ConfirmCallback,RabbitTemplate.ReturnCallback {
    @Autowired
    private RabbitTemplate rabbitTemplate;

    public void send(String exchange,String routingKey,Object content) {
        log.info("send content=" + content);
        this.rabbitTemplate.setMandatory(true);
        this.rabbitTemplate.setConfirmCallback(this);
        this.rabbitTemplate.setReturnCallback(this);
        this.rabbitTemplate.convertAndSend(exchange,routingKey,serialize(content));
    }

    /**
     * 确认后回调:
     * @param correlationData
     * @param ack
     * @param cause
     */
    @Override
    public void confirm(CorrelationData correlationData, boolean ack, String cause) {
        if (!ack) {
            log.info("send ack fail, cause = " + cause);
        } else {
            log.info("send ack success");
        }
    }

    /**
     * 失败后return回调:
     *
     * @param message
     * @param replyCode
     * @param replyText
     * @param exchange
     * @param routingKey
     */
    @Override
    public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
        log.info("send fail return-message = " + new String(message.getBody()) + ", replyCode: " + replyCode + ", replyText: " + replyText + ", exchange: " + exchange + ", routingKey: " + routingKey);
    }

    /**
     * 对消息对象进行二进制序列化
     * @param o
     * @return
     */
    private byte[] serialize(Object o) {
        Kryo kryo = new Kryo();
        ByteArrayOutputStream stream = new ByteArrayOutputStream();
        Output output = new Output(stream);
        kryo.writeObject(output, o);
        output.close();
        return stream.toByteArray();
    }
}

咱们的观察者类,对门店服务的服务集合新增服务对象进行观察

/**
 * 服务新增观察者
 */
public class ServiceObserver implements Observer {
    private MessageSender sender = SpringBootUtil.getBean(MessageSender.class);
    public ServiceObserver(Observable o) {
        o.addObserver(this);
    }
    @Override
    public void update(Observable o, Object arg) {
        CompletableFuture.runAsync(() ->
            this.sender.send(ServiceProviderCenterMq.MQ_EXCHANGE_PROVIDER,
                ServiceProviderCenterMq.ROUTING_KEY_ROLE_ADD,
                ((ProviderServiceLevel) o).getServiceProviders().poll())
        );
    }
}

具体的被观察者类为门店服务的分类

public class ProviderServiceLevel extends Observable implements Provider

它有一个服务的队列

@Getter
private Queue<Provider> serviceProviders = new ConcurrentLinkedQueue<>();

也有一个服务的列表

@Getter
private List<Provider> serviceListProviders = new CopyOnWriteArrayList<>();

服务分类添加服务对象的方法,你们能够思考一下为何使用队列,而不是直接使用列表在观察者中取出服务对象

@Override
public boolean addProvider(Provider provider) {
    ServiceDao serviceDao = SpringBootUtil.getBean(ServiceDao.class);
    //若是添加的是服务分类
    if (provider instanceof ProviderServiceLevel) {
        ParamLevel paramLevel = new ParamLevel(this.id, ((ProviderServiceLevel) provider).getId());
        ((ProviderServiceLevel) provider).setLevel(2);
        serviceDao.addLevelToLevel(paramLevel);
    //若是添加的是服务    
    }else if (provider instanceof ProviderService) {
        ParamLevel paramLevel = new ParamLevel(this.id, ((ProviderService) provider).getService().getId());
        serviceDao.addServiceToLevel(paramLevel);
        this.serviceListProviders.add(provider);
        //将添加的服务入队列
        this.serviceProviders.add(provider);
        setChanged();
        //通知观察者取出队列服务对象,进行MQ的发送
        notifyObservers();
        return true;
    }
    return this.serviceListProviders.add(provider);
}

最后在访问的controller中会放三个缓存

//服务分类缓存
private Map<Long,Provider> cacheLevelProvider = new ConcurrentHashMap<>();
//门店缓存
private Map<Long,Provider> cacheStoreProvider = new ConcurrentHashMap<>();
//观察者缓存
private Map<Long,ServiceObserver> observerMap = new ConcurrentHashMap<>();

具体的添加服务方法以下

/**
 * 新增商家服务
 * @param providerService
 * @return
 */
@Transactional
@SuppressWarnings("uncheck")
@PostMapping("/serviceprovider-anon/newproviderservice")
public Result<String> newProviderService(@RequestParam("storeid") Long storeId,
                                         @RequestParam("servicelevelid") Long serviceLevelId,
                                         @RequestBody ProviderService providerService) {
    Provider serviceLevelProvider;
    Provider service = ProviderFactory.createProviderService(providerService, true);
    if (cacheLevelProvider.containsKey(serviceLevelId)) {
        serviceLevelProvider = this.cacheLevelProvider.get(serviceLevelId);
    }else {
        serviceLevelProvider = this.levelProvider.findProvider(serviceLevelId);
        this.cacheLevelProvider.put(serviceLevelId,serviceLevelProvider);
    }
    //此处若是不将观察者对象加入缓存,就会不断建立观察者,其实咱们只须要一个观察者
    if (!observerMap.containsKey(serviceLevelId)) {
        observerMap.put(serviceLevelId,new ServiceObserver((ProviderServiceLevel) serviceLevelProvider));
    }
    //服务分类添加了服务对象,就会发生被观察者状态的改变,使观察者作出响应
    serviceLevelProvider.addProvider(service);
    Provider storeProvider;
    if (cacheStoreProvider.containsKey(storeId)) {
        storeProvider = cacheStoreProvider.get(storeId);
    }else {
        storeProvider = this.storeProvider.findProvider(storeId);
        cacheStoreProvider.put(storeId,storeProvider);
    }
    storeProvider.addProvider(service);
    return Result.success("添加商家服务成功");
}