[从源码学设计]蚂蚁金服SOFARegistry之消息总线

[从源码学设计]蚂蚁金服SOFARegistry之消息总线

0x00 摘要

SOFARegistry 是蚂蚁金服开源的一个生产级、高时效、高可用的服务注册中心。javascript

本系列文章重点在于分析设计和架构,即利用多篇文章,从多个角度反推总结 DataServer 或者 SOFARegistry 的实现机制和架构思路,让你们借以学习阿里如何设计。html

本文为第四篇,介绍SOFARegistry之消息总线。java

0x01 相关概念

1.1 事件驱动模型

事件驱动模型,也便是咱们一般说的观察者。基于发布-订阅模式的编程模型。spring

1.1.1 概念

定义对象间的一种一对多的依赖关系,当一个对象的状态发生变化时,全部依赖它的对象都获得通知并自动更新。编程

从程序设计的角度来看,事件驱动模型的核心构件一般包含如下几个:数组

  • 事件源:负责产生事件的对象。好比咱们常见的按钮,按钮就是一个事件源,可以产生“点击”这个事件
  • 事件监听器(事件处理器):负责处理事件的对象
  • 事件:或者称为事件对象,是事件源和事件监听器之间的信息桥梁。是整个事件模型驱动的核心

1.1.2 应用环境

当咱们面对以下的环境时,事件驱动模型一般是一个好的选择:网络

  • 程序中有许多任务;
  • 任务之间高度独立(所以它们不须要互相通讯,或者等待彼此);
  • 在等待事件到来时,某些任务会阻塞;

1.2 消息总线

总线(Bus)通常指计算机各类功能部件之间传送信息的公共通讯干线,而EventBus则是事件源(publisher)向订阅方(subscriber)发送订阅事件的总线,它解耦了观察者模式中订阅方和事件源之间的强依赖关系session

消息总线扮演着一种消息路由的角色,拥有一套完备的路由机制来决定消息传输方向。发送端只须要向消息总线发出消息而不用管消息被如何转发,为了避免消息丢失,部分消息总线提供了必定的持久化存储和灾备的机制架构

消息总线简单理解就是一个消息中心,众多微服务实例能够链接到总线上,实例能够往消息中心发送或接收信息(经过监听)。app

通常的应用的场景就是在用观察者模式的地方就能够用EventBus进行替代。

img

0x02 业务领域

2.1 业务范畴

DataServer 本质上是一个网络应用程序,因此有以下特色:

  • 须要处理各个方面发送来的消息;
  • 程序中任务繁多,任务之间独立,大多数任务不存在互斥通信等操做;在等待事件到来时,某些任务会阻塞;
  • 某一个消息每每有多个投递源;

所以自然适合用事件驱动机制来实现。

2.2 问题点

可以想到的问题点以下:

  • 由于一个事件每每会有多个投递源,如何解耦事件投递和事件处理之间的逻辑?
  • 怎样实现Listener一次注册,就可以知道Listener对那些事件感兴趣的,进而在有某类事件发生时通知到Listener的呢?
  • 如何使得一个Listener能够处理多个事件?
  • 如何使得一个事件被多个Listener处理?
  • 能否简化注册流程?
  • 是否须要维护消息顺序?
  • 处理消息方式是异步仍是同步?
  • 多个一样消息是否要归并?

具体咱们在后文会详述阿里的思路。

2.3 解决方案

DataServer 内部逻辑主要是经过事件驱动机制来实现的,下图列举了部分事件在事件中心的交互流程,从图中能够看到,一个事件每每会有多个投递源,很是适合用 EventCenter 来解耦事件投递和事件处理之间的逻辑;

0x03 EventCenter

业界消息总线有不少,好比 Android EventBus是一个发布/订阅事件总线框架,基于观察者模式,将事件的接收者和发送者分开,简化了组件之间的通讯。

而SOFARegistry EventCenter 的做用也相似:从逻辑上解耦,将事件的接收者和发送者分开,简化组件之间通讯。阿里的实现有本身的特色,开发者能够借鉴这里的使用技巧和思路。

3.1 目录结构

├── event
│   ├── AfterWorkingProcess.java
│   ├── DataServerChangeEvent.java
│   ├── Event.java
│   ├── EventCenter.java
│   ├── LocalDataServerChangeEvent.java
│   ├── MetaServerChangeEvent.java
│   ├── RemoteDataServerChangeEvent.java
│   ├── StartTaskEvent.java
│   ├── StartTaskTypeEnum.java
│   └── handler
│       ├── AbstractEventHandler.java
│       ├── AfterWorkingProcessHandler.java
│       ├── DataServerChangeEventHandler.java
│       ├── LocalDataServerChangeEventHandler.java
│       ├── MetaServerChangeEventHandler.java
│       └── StartTaskEventHandler.java

3.2 类定义

类定义以下:

public class EventCenter {

    private Multimap<Class<? extends Event>, AbstractEventHandler> MAP = ArrayListMultimap.create();

    /**
     * eventHandler register
     * @param handler
     */
    public void register(AbstractEventHandler handler) {
        List<Class<? extends Event>> interests = handler.interest();
        for (Class<? extends Event> interest : interests) {
            MAP.put(interest, handler);
        }
    }

    /**
     * event handler handle process
     * @param event
     */
    public void post(Event event) {
        Class clazz = event.getClass();
        if (MAP.containsKey(clazz)) {
            Collection<AbstractEventHandler> handlers = MAP.get(clazz);
            if (handlers != null) {
                for (AbstractEventHandler handler : handlers) {
                    handler.handle(event);
                }
            }
        } else {
            throw new RuntimeException("no suitable handler was found:" + clazz);
        }
    }
}

3.2.1 操做

普通 EventBus 大多有三个操做:

  • 注册 Listener--register (Object Listener);
  • 注销 Listener--unregister (Object Listener);
  • 发布 Event--post (Object event);

可是阿里的EventCenter并无注销操做,由于业务上不须要,因此只有以下接口。

  • register(AbstractEventHandler handler) 的工做就是找出这个Listener对哪些事件感兴趣,而后把这种事件类型和对应的Listener注册到 EventCenter;
  • post一个event时候,会遍历这个消息的处理函数列表,逐一调用处理函数,其实就是同步执行了,固然也许 EventHandler 内部本身实现了异步;由于是同步执行,因此不须要维持消息的有序性,不然须要使用queue来实现每一个线程post的Event是有序的;

具体使用举例以下:在MetaServerChangeEventHandler中有以下代码投放消息。

eventCenter.post(new StartTaskEvent(set));

eventCenter.post(new DataServerChangeEvent(result.getNodes(), versionMap,
        DataServerChangeEvent.FromType.REGISTER_META));

3.2.2 执行 & 解耦

handler中声明了本身支持什么种类的event,当register时候,会以event为key,把本身注册到eventCenter的map中,在 post 函数中,根据event的class,取出了handler,从而执行,也作到了解耦。

3.2.3 Listener列表

在观察者模式中,事件源中会维护一个Listener的列表,并且向这个事件源注册的Listener通常只会收到一类事件的通知,若是Listener对多个不一样类的事件感兴趣,则须要向多个事件源注册。

EventCenter 是怎样实现Listener一次注册,可以知道Listener对那些事件感兴趣的,进而在有某类事件发生时通知到Listener的呢

答案在ArrayListMultimap,其key是Event,其 Value 就是 AbstractEventHandler。这个 map 就是 Event 事件类型 和对其感兴趣的处理函数的列表,一个 Event 可能有多个处理函数。

3.2.4 ArrayListMultimap

顾名思义,com.google.common.collect.ArrayListMultimap 能够在key对应的value中设置一个ArrayList。这样就保证了一个事件能够有多个处理函数

具体能够见下例子。

import com.google.common.collect.ArrayListMultimap;
import com.google.common.collect.Multimap;

import java.util.Collection;

public class testArrayListMultimap {
     static void main() {
        Multimap<String, String> multimap = ArrayListMultimap.create();
        multimap.put("fruit", "banana");
        multimap.put("fruit", "apple");
        multimap.put("fruit", "apple");
        multimap.put("fruit", "peach");
        multimap.put("fish","crucian");
        multimap.put("fish","carp");

        System.err.println(multimap.size());//6
        Collection<String> fruits = multimap.get("fruit");
        System.err.println(fruits);//[bannana, apple, apple, peach]
    }
}

3.3 Listener

Listener 是由 AbstractEventHandler 的派生类实现的。

3.3.1 基类

EventHandler基类AbstractEventHandler定义具体以下:

public abstract class AbstractEventHandler<Event> implements InitializingBean {

    @Autowired
    private EventCenter         eventCenter;

    @Override
    public void afterPropertiesSet() throws Exception {
        eventCenter.register(this);
    }

    /**
     * event handle func
     * @param event
     */
    public void handle(Event event) {
            doHandle(event);
    }

    public abstract List<Class<? extends Event>> interest();

    public abstract void doHandle(Event event);
}

其主要做用为三点:

  • 派生类必须实现interest来声明本身想处理什么Event,并且Event是配置在一个数组中,这样就使得一个函数能够处理多个事件
@Override
public List<Class<? extends LocalDataServerChangeEvent>> interest() {
    return Lists.newArrayList(LocalDataServerChangeEvent.class);
}
  • 派生类实现doHandle来处理消息;

  • 由于afterPropertiesSet中作了设定,因此每个继承此类的Handler都会自动注册到EventCenter之中

3.3.2 派生类

以MetaServerChangeEventHandler为例,只要在interest函数中声明本身对哪些消息感兴趣,在doHandle函数中实现业务便可。

public class MetaServerChangeEventHandler extends AbstractEventHandler<MetaServerChangeEvent> {
  
    @Override
    public List<Class<? extends MetaServerChangeEvent>> interest() {
        return Lists.newArrayList(MetaServerChangeEvent.class);
    }
  
		@Override
    public void doHandle(MetaServerChangeEvent event) {
       ......
    }
}

3.3.2 自动注册

这里须要专门说一下自动注册,由于初接触者很容易疏漏从而感到奇怪。

自动注册使用的是Spring的afterPropertiesSet方法完成

afterPropertiesSet方法能够针对某个具体的bean进行配置,其将在Bean全部的属性被初始化后调用,可是会在init前调用。afterPropertiesSet 必须实现 InitializingBean接口。

package org.springframework.beans.factory;

public interface InitializingBean {
    void afterPropertiesSet() throws Exception;
}

基类AbstractEventHandler实现InitializingBean接口。

public abstract class AbstractEventHandler<Event> implements InitializingBean

而每个派生类就注册了派生类自己到eventCenter。

@Override
public void afterPropertiesSet() throws Exception {
    eventCenter.register(this);
}

3.4 核心消息

具体涉及到业务,EventCenter主要处理三种消息:

  • DataServerChangeEvent,是其余Data Server的节点变化消息;
  • MetaServerChangeEvent,是Meta Sever的变化消息;
  • StartTaskEvent:;

分别对应三个消息处理handler:

  • public class DataServerChangeEventHandler extends AbstractEventHandler

  • public class MetaServerChangeEventHandler extends AbstractEventHandler

  • public class StartTaskEventHandler extends AbstractEventHandler

咱们用 StartTaskEvent 举例,具体消息内容根据具体业务设置。

public class StartTaskEvent implements Event {
    private final Set<StartTaskTypeEnum> suitableTypes;

    public StartTaskEvent(Set<StartTaskTypeEnum> suitableTypes) {
        this.suitableTypes = suitableTypes;
    }

    public Set<StartTaskTypeEnum> getSuitableTypes() {
        return suitableTypes;
    }
}

3.5 主要逻辑

EventCenter主要逻辑以下图所示:

+------------------------------+
          | MetaServerChangeEventHandler |
          +-----------+------------------+
                      |
                      |  post(new StartTaskEvent)
                      |
                      |
                      |                                      +------------------------+
                      v                                      |  StartTaskEventHandler |
+---------------------+-----------------------+              |                        |
|                EventCenter                  |              | +--------------------+ |
|                                             |              | |                    | |
| +-----------------------------------------+ +---------------------> doHandle      | |
| |Multimap< <Event>, AbstractEventHandler> | |              | |                    | |
| +-----------------------------------------+ | <--------------+ afterPropertiesSet | |
|                                             |  register    | |                    | |
+---------------------------------------------+              | |      interest      | |
                                                             | |                    | |
                                                             | +--------------------+ |
                                                             +------------------------+

手机以下图:

0x04 总结

SOFARegistry EventCenter 的做用与业界大多总线相似:从逻辑上解耦,将事件的接收者和发送者分开,简化组件之间通讯。可是阿里的实现有本身的特色,开发者能够借鉴这里的使用技巧和思路。

针对咱们前面提出的问题,如今回答以下:

  • 由于一个事件每每会有多个投递源,如何解耦事件投递和事件处理之间的逻辑?
    • 答案:handler中声明了本身支持什么种类的event,当register时候会以event为key,把本身注册到eventCenter的map中;在 post 函数中,根据event的class,取出了handler从而执行,也作到了解耦。
  • 怎样实现Listener一次注册,就可以知道Listener对那些事件感兴趣的,进而在有某类事件发生时通知到Listener的呢?
    • 答案:派生类必须实现interest来声明本身想处理什么Event;
  • 如何使得一个Listener能够处理多个事件?
    • 答案:接上问题,Event是配置在一个数组中,这样就使得一个函数能够处理多个事件
  • 如何使得一个事件被多个Listener处理?
    • 答案:采用ArrayListMultimap实现listener列表;
  • 能否简化注册流程?
    • 答案:自动注册,派生类不须要操心。afterPropertiesSet中作了设定,因此每个继承此类的Handler都会自动注册到EventCenter之中
  • 是否须要维护消息顺序?
    • 答案:不须要,由于是同步处理;
  • 处理消息方式是异步仍是同步?
    • 答案:这里是同步;
  • 多个一样消息是否要归并?
    • 答案:这里不须要归并,没有业务需求;

0xFF 参考

Guava中EventBus分析

蚂蚁金服服务注册中心如何实现 DataServer 平滑扩缩容

蚂蚁金服服务注册中心 SOFARegistry 解析 | 服务发现优化之路

服务注册中心 Session 存储策略 | SOFARegistry 解析

海量数据下的注册中心 - SOFARegistry 架构介绍

相关文章
相关标签/搜索