[从源码学设计]蚂蚁金服SOFARegistry 之 如何与Meta Server交互

[从源码学设计]蚂蚁金服SOFARegistry 之 如何与Meta Server交互

0x00 摘要

SOFARegistry 是蚂蚁金服开源的一个生产级、高时效、高可用的服务注册中心。javascript

本系列文章重点在于分析设计和架构,即利用多篇文章,从多个角度反推总结 DataServer 或者 SOFARegistry 的实现机制和架构思路,让你们借以学习阿里如何设计。java

本文为第十篇,主要是从业务角度进行梳理。看看DataServer如何与MetaServer交互。node

0x01 业务范畴

1.1 MetaServer的重要性

首先咱们要复习下MetaServer的重要性。spring

MetaServer元数据服务器集群。这个集群管辖的范围是 Session 服务器集群和 Data 服务器集群的服务器信息,其角色就至关于 SOFARegistry 架构内部的服务注册中心,只不过 SOFARegistry 做为服务注册中心是服务于广大应用服务层,而 Meta 集群是服务于 SOFARegistry 内部的 Session 集群和 Data 集群,Meta 层可以感知到 Session 节点和 Data 节点的变化,并通知集群的其它节点。bootstrap

因此,若是想获取节点的变化,DataServer就必须重点研究如何与MetaServer交互服务器

1.2 推拉模型

居于Bolt协议,DataServer在与Meta Server的交互中,使用了推拉模型。网络

1.3 分析策略

咱们在这里重点分析其设计策略以下:架构

  • 用什么来确保交互的有效性。
  • 用什么来解耦。
  • 用什么来确保网络交互的效率。

0x02 目录结构

此模块目录结构以下,大体能够推论,并发

  • DefaultMetaServiceImpl 是 Meta Server 相关模块主体;异步

  • MetaServerConnectionFactory是链接管理;

  • ConnectionRefreshMetaTask 是按期循环task;

  • handler目录下是三个响应函数;

  • provideData 目录下是配置相关功能;

具体目录结构以下:

│   ├── metaserver
│   │   ├── DefaultMetaServiceImpl.java
│   │   ├── IMetaServerService.java
│   │   ├── MetaServerConnectionFactory.java
│   │   ├── handler
│   │   │   ├── NotifyProvideDataChangeHandler.java
│   │   │   ├── ServerChangeHandler.java
│   │   │   └── StatusConfirmHandler.java
│   │   ├── provideData
│   │   │   ├── ProvideDataProcessor.java
│   │   │   ├── ProvideDataProcessorManager.java
│   │   │   └── processor
│   │   │       └── DatumExpireProvideDataProcessor.java
│   │   └── task
│   │       └── ConnectionRefreshMetaTask.java

0x03 Bean

MetaServer相关组件以下:

  • metaServerService,用来与MetaServer进行交互,基于raft和Bolt;
  • datumLeaseManager,用来维护具体数据;

0x04 Raft协议

这里有一个问题 :为何 DataServerBootstrap 之中还有 startRaftClient,按说DataServer只用Http和Bolt就能够了。

原来是用 raft 协议来获取MetaServer集群中leader的地址等信息raftClient.getLeader(); 好比 renewNodeTask 时候会用到。

Raft相关启动是在startRaftClient,此函数的做用是:

  • 启动Raft客户端,保证分布式一致性;
  • 向 EventCenter 投放MetaServerChangeEvent;

具体代码是:

private void startRaftClient() {
    metaServerService.startRaftClient();
    eventCenter.post(new MetaServerChangeEvent(metaServerService.getMetaServerMap()));
}

0x05 消息处理

前面提到了,当系统启动以后,会主动发送一个MetaServerChangeEvent,咱们就看看其内容。

5.1 MetaServerChangeEvent

public class MetaServerChangeEvent implements Event {

    private Map<String, Set<String>> ipMap;

    /**
     * constructor
     * @param ipMap
     */
    public MetaServerChangeEvent(Map<String, Set<String>> ipMap) {
        this.ipMap = ipMap;
    }

    public Map<String, Set<String>> getIpMap() {
        return ipMap;
    }
}

其运行状态以下:

event = {MetaServerChangeEvent@5991} 
 ipMap = {HashMap@5678}  size = 1
  "DefaultDataCenter" -> {ConcurrentHashMap$KeySetView@6007}  size = 1

5.2 消息来源

MetaServerChangeEvent有三种来源:启动主动获取,按期,推送。这三种具体以下:

  • 启动主动获取:这个主动查询而且拉取的过程,这个过程基本上相似一个同步过程,体现为客户端一次查询结果的同步返回。
  • 版本变动推送:为了肯定服务发布数据的变动,对于这个服务感兴趣的全部客户端订阅方都须要推送,进行推送。因为性能要求必须并发执行而且异步肯定推送成功。
  • 按期轮训:这样避免了某次变动通知没有通知到全部订阅方的状况。

咱们先简述来源:

5.2.1 启动

这就是上面提到的,启动时会从配置里面读取meta server配置,metaServerService.getMetaServerMap();据此构建MetaServerChangeEvent,投放到EventCenter之中。

当 DataServer 节点初始化成功后,会启动任务自动去链接 MetaServer。即,该任务会往事件中心 EventCenter 注册一个 DataServerChangeEvent 事件,该事件注册后会被触发,以后将对新增节点计算 Hash 值,同时进行纳管分片。

具体启动时,会从配置里面读取meta server配置,metaServerService.getMetaServerMap();据此构建MetaServerChangeEvent,投放到EventCenter之中。

private void startRaftClient() {
    metaServerService.startRaftClient();
    eventCenter.post(new MetaServerChangeEvent(metaServerService.getMetaServerMap()));
}

堆栈以下

register:44, MetaServerConnectionFactory (com.alipay.sofa.registry.server.data.remoting.metaserver)
registerMetaServer:129, MetaServerChangeEventHandler (com.alipay.sofa.registry.server.data.event.handler)
doHandle:92, MetaServerChangeEventHandler (com.alipay.sofa.registry.server.data.event.handler)
doHandle:55, MetaServerChangeEventHandler (com.alipay.sofa.registry.server.data.event.handler)
handle:51, AbstractEventHandler (com.alipay.sofa.registry.server.data.event.handler)
post:56, EventCenter (com.alipay.sofa.registry.server.data.event)
startRaftClient:197, DataServerBootstrap (com.alipay.sofa.registry.server.data.bootstrap)
start:131, DataServerBootstrap (com.alipay.sofa.registry.server.data.bootstrap)
start:47, DataServerInitializer (com.alipay.sofa.registry.server.data.bootstrap)
doStart:173, DefaultLifecycleProcessor (org.springframework.context.support)
access$200:50, DefaultLifecycleProcessor (org.springframework.context.support)
start:350, DefaultLifecycleProcessor$LifecycleGroup (org.springframework.context.support)
startBeans:149, DefaultLifecycleProcessor (org.springframework.context.support)
onRefresh:112, DefaultLifecycleProcessor (org.springframework.context.support)
finishRefresh:880, AbstractApplicationContext (org.springframework.context.support)
refresh:546, AbstractApplicationContext (org.springframework.context.support)
refresh:693, SpringApplication (org.springframework.boot)
refreshContext:360, SpringApplication (org.springframework.boot)
run:303, SpringApplication (org.springframework.boot)
run:1118, SpringApplication (org.springframework.boot)
run:1107, SpringApplication (org.springframework.boot)
main:41, DataApplication (com.alipay.sofa.registry.server.data)

5.2.2 定时

这部分是ConnectionRefreshMetaTask完成。ConnectionRefreshMetaTask 是按期 task,其在 Bean tasks 里面配置。

StartTaskEventHandler 会调用到 tasks,当收到 StartTaskEvent 以后,会启动 tasks里面的几个AbstractTask。

public class StartTaskEventHandler extends AbstractEventHandler<StartTaskEvent> {
    @Resource(name = "tasks")
    private List<AbstractTask>       tasks;

    private ScheduledExecutorService executor     = null;

    @Override
    public List<Class<? extends StartTaskEvent>> interest() {
        return Lists.newArrayList(StartTaskEvent.class);
    }

    @Override
    public void doHandle(StartTaskEvent event) {
        if (executor == null || executor.isShutdown()) {
            getExecutor();
        }

        for (AbstractTask task : tasks) {
            if (event.getSuitableTypes().contains(task.getStartTaskTypeEnum())) {
                executor.scheduleWithFixedDelay(task, task.getInitialDelay(), task.getDelay(),task.getTimeUnit());
            }
        }
    }

    private void getExecutor() {
        executor = ExecutorFactory.newScheduledThreadPool(tasks.size(), this.getClass()
            .getSimpleName());
    }
}

具体tasks以下:

@Bean(name = "tasks")
public List<AbstractTask> tasks() {
    List<AbstractTask> list = new ArrayList<>();
    list.add(connectionRefreshTask());
    list.add(connectionRefreshMetaTask());
    list.add(renewNodeTask());
    return list;
}

ConnectionRefreshMetaTask 是按期task,会按期向EventCenter投放一个 MetaServerChangeEvent。

执行时候调用 metaServerService.getMetaServerMap();返回一个MetaServerChangeEvent,而且添加到EventCenter之中。

public class ConnectionRefreshMetaTask extends AbstractTask {

    @Autowired
    private IMetaServerService metaServerService;

    @Autowired
    private EventCenter        eventCenter;

    @Override
    public void handle() {
        eventCenter.post(new MetaServerChangeEvent(metaServerService.getMetaServerMap()));
    }
}

5.2.3 推送

ServerChangeHandler 是 metaClientHandler 的一部分,是MetaNodeExchanger 的响应函数。

ServerChangeHandler 继承了AbstractClientHandler,在interest之中,配置了会响应NodeChangeResult。

若是Meta有推送,ServerChangeHandler这里就有响应,这个会是 Meta Server 主动通知

在ServerChangeHandler之中,拿到了NodeChangeResult以后,会判断变动节点类型,这里会根据 Note 类型不一样,决定产生 DataServerChangeEvent 仍是MetaServerChangeEvent。若是是NodeType.META,就发送消息给eventCenter,eventCenter.post(new MetaServerChangeEvent(map));,这就是MetaServerChangeEvent的来源之一。

public class ServerChangeHandler extends AbstractClientHandler<NodeChangeResult> {

    @Autowired
    private EventCenter         eventCenter;

    @Autowired
    private DataServerConfig    dataServerConfig;

    @Override
    public Object doHandle(Channel channel, NodeChangeResult request) {
        ExecutorFactory.getCommonExecutor().execute(() -> {
          
            if (request.getNodeType() == NodeType.DATA) {
              
                eventCenter.post(new DataServerChangeEvent(request.getNodes(),
                        request.getDataCenterListVersions(), FromType.META_NOTIFY));
              
            } else if (request.getNodeType() == NodeType.META) {
              
                Map<String, Map<String, MetaNode>> metaNodesMap = request.getNodes();
                if (metaNodesMap != null && !metaNodesMap.isEmpty()) {
                    Map<String, MetaNode> metaNodeMap = metaNodesMap.get(dataServerConfig.getLocalDataCenter());
                    if (metaNodeMap != null && !metaNodeMap.isEmpty()) {
                        HashMap<String, Set<String>> map = new HashMap<>();
                        map.put(dataServerConfig.getLocalDataCenter(), metaNodeMap.keySet());
                        eventCenter.post(new MetaServerChangeEvent(map));
                    }
                }
            }
        });
        return CommonResponse.buildSuccessResponse();
    }

    @Override
    public Class interest() {
        return NodeChangeResult.class;
    }

    @Override
    public HandlerType getType() {
        return HandlerType.PROCESSER;
    }

    @Override
    protected Node.NodeType getConnectNodeType() {
        return Node.NodeType.DATA;
    }
}

此时逻辑图以下,能够看到三种MetaServerChangeEvent消息来源,ServerChangeHandler也会提供DataServerChangeEvent:

+-------------------------------+
|[DataServerBootstrap]          |   MetaServerChangeEvent
|                               |
|                               +-------------------------+
|       startRaftClient         |                         |
|                               |                         |
|                               |                         |
+-------------------------------+                         |
+-------------------------------+                         |
| [Timer]                       |                         |
|                               |                         |      +-------------+
|  ConnectionRefreshMetaTask    +------------------------------> | EventCenter |
|                               | MetaServerChangeEvent   |      +-------+-----+
+-------------------------------+                         |              ^
+-------------------------------+                         |              |
| [Push<NodeChangeResult>]      |                         |              |
|                               |                         |              |
|                               +-------------------------+              |
|                               |  MetaServerChangeEvent                 |
|      ServerChangeHandler      |                                        |
|                               +----------------------------------------+
+-------------------------------+      DataServerChangeEvent

0x06 MetaServerChangeEventHandler

MetaServerChangeEventHandler 用来响应 MetaServerChangeEvent 消息。由于其继承了AbstractEventHandler,因此 MetaServerChangeEventHandler 已经注册到了EventCenter之上。

注意,这里有一个再次转换DataServerChangeEvent的过程,即这里又会主动和MetaServer交互,若是返回消息是NodeChangeResult,就转换为DataServerChangeEvent。

这是由于Meta Server的这个推送,也许是告诉data Server,"hi,目前data server也有变更,兄弟你再来拉取下"。

在处理时候,MetaServerChangeEventHandler会去与MetaServer交互,看看其有效性,若是有效,就注册。

逻辑以下:

  • 在MetaServerChangeEventHandler之中,会遍历MetaServerChangeEvent之中的 dataCenter, ip进行注册,registerMetaServer(dataCenter, ip); 在registerMetaServer之中:
    • 获取 meta server的 leader;
    • 使用 metaNodeExchanger.connect 对 IP,getMetaServerPort 进行链接;
    • 获得Channel以后,注册到 metaServerConnectionFactory 之中
    • 若是 ip不是meta leader,则再次调用metaNodeExchanger注册本身 DataNode(new URL(DataServerConfig.IP), dataServerConfig .getLocalDataCenter());
    • 注册成功以后,则给EventCenter发送 DataServerChangeEvent,内部继续处理 ;

具体代码以下:

public class MetaServerChangeEventHandler extends AbstractEventHandler<MetaServerChangeEvent> {

    @Autowired
    private DataServerConfig            dataServerConfig;

    @Autowired
    private IMetaServerService          metaServerService;

    @Autowired
    private MetaNodeExchanger           metaNodeExchanger;

    @Autowired
    private EventCenter                 eventCenter;

    @Autowired
    private MetaServerConnectionFactory metaServerConnectionFactory;

    @Override
    public List<Class<? extends MetaServerChangeEvent>> interest() {
        return Lists.newArrayList(MetaServerChangeEvent.class);
    }

    @Override
    public void doHandle(MetaServerChangeEvent event) {
        Map<String, Set<String>> ipMap = event.getIpMap();
        for (Entry<String, Set<String>> ipEntry : ipMap.entrySet()) {
            String dataCenter = ipEntry.getKey();
            Set<String> ips = ipEntry.getValue();
            if (!CollectionUtils.isEmpty(ips)) {
                for (String ip : ips) {
                    Connection connection = metaServerConnectionFactory.getConnection(dataCenter,
                        ip);
                    if (connection == null || !connection.isFine()) {
                        registerMetaServer(dataCenter, ip);
                    }
                }
                Set<String> ipSet = metaServerConnectionFactory.getIps(dataCenter);
                for (String ip : ipSet) {
                    if (!ips.contains(ip)) {
                        metaServerConnectionFactory.remove(dataCenter, ip);
                    }
                }
            } else {
                //remove connections of dataCenter if the connectionMap of the dataCenter in ipMap is empty
                removeDataCenter(dataCenter);
            }
        }
        //remove connections of dataCenter if the dataCenter not exist in ipMap
        Set<String> dataCenters = metaServerConnectionFactory.getAllDataCenters();
        for (String dataCenter : dataCenters) {
            if (!ipMap.containsKey(dataCenter)) {
                removeDataCenter(dataCenter);
            }
        }
    }

    private void registerMetaServer(String dataCenter, String ip) {

        PeerId leader = metaServerService.getLeader();

        for (int tryCount = 0; tryCount < TRY_COUNT; tryCount++) {
            try {
                Channel channel = metaNodeExchanger.connect(new URL(ip, dataServerConfig
                    .getMetaServerPort()));
                //connect all meta server
                if (channel != null && channel.isConnected()) {
                    metaServerConnectionFactory.register(dataCenter, ip,
                        ((BoltChannel) channel).getConnection());
                }
              
                //register leader meta node
                if (ip.equals(leader.getIp())) {
                    Object obj = null;
                    try {
                        obj = metaNodeExchanger.request(new Request() {
                            @Override
                            public Object getRequestBody() {
                                return new DataNode(new URL(DataServerConfig.IP), dataServerConfig
                                    .getLocalDataCenter());
                            }

                            @Override
                            public URL getRequestUrl() {
                                return new URL(ip, dataServerConfig.getMetaServerPort());
                            }
                        }).getResult();
                    } 
                  
                    if (obj instanceof NodeChangeResult) {
                        NodeChangeResult<DataNode> result = (NodeChangeResult<DataNode>) obj;
                        Map<String, Long> versionMap = result.getDataCenterListVersions();

                        //send renew after first register dataNode
                        Set<StartTaskTypeEnum> set = new HashSet<>();
                        set.add(StartTaskTypeEnum.RENEW);
                        eventCenter.post(new StartTaskEvent(set));

                        eventCenter.post(new DataServerChangeEvent(result.getNodes(), versionMap,DataServerChangeEvent.FromType.REGISTER_META));
                        break;
                    }
                }
        }
    }
}

此时逻辑图以下:

+-------------------------------+
|[DataServerBootstrap]          |   MetaServerChangeEvent
|                               |
|                               +-------------------------+
|       startRaftClient         |                         |
|                               |                         |              +---------------+
|                               |                         |              |               |
+-------------------------------+                         |              |               |
+-------------------------------+                         |              |               |
| [Timer]                       |                         |              v               |
|                               |                         |  1   +-------+-----+         |
|  ConnectionRefreshMetaTask    +------------------------------> | EventCenter +----+    |
|                               | MetaServerChangeEvent   |      +-------+-----+    |    |
+-------------------------------+                         |              ^          |    |
+-------------------------------+                         |              |          |    |
|                               |                         |              |          |    |
| [Push<NodeChangeResult>]      |                         |              |          |    |
|                               |                         |              |          |    |
|                               +-------------------------+              |          |    |
|                               |  MetaServerChangeEvent                 |          |    |
|      ServerChangeHandler      |                               2        |          |    |
|                               +----------------------------------------+          |    |
+-------------------------------+      DataServerChangeEvent                        |    |
                                                                                    |    |
                                                                                    |    |
                                 MetaServerChangeEvent                              |    |
                                                                   3                |    |
                               +----------------------------------------------------+    |
                               |                                                         |
                               v                                                         |
             +-----------------+--------------+         DataServerChangeEvent            |
             |                                |                                   4      |
             |  MetaServerChangeEventHandler  +------------------------------------------+
             |                                |
             +--------------------------------+

手机以下:

6.1 链接管理

下面咱们讲讲dataServer如何管理metaServer的链接。

咱们知道,一次 tcp 请求大体分为三个步骤:创建链接、通讯、关闭链接。每次创建新链接都会经历三次握手,中间包含三次网络传输,对于高并发的系统,这是一笔不小的负担;关闭链接一样如此。为了减小每次网络调用请求的开销,对链接进行管理、复用,能够极大的提升系统的性能。

为了提升通讯效率,咱们须要考虑复用链接,减小 TCP 三次握手的次数,所以须要有链接管理的机制。

关于链接管理,SOFARegistry有两个层次的链接管理,分别是 Connection 和 Node。

6.1.1 Connection管理

能够用socket(localIp,localPort, remoteIp,remotePort )表明一个链接,在Netty中用Channel来表示,在sofa-bolt使用Connection对象来抽象一个链接,一个链接在client跟server端各用一个connection对象表示。

有了Connection这个抽象以后,天然的须要提供接口来管理Connection, 这个接口就是ConnectionFactory。

6.1.2 ConnectionFactory

不管是服务端仍是客户端,其实本质都在作一件事情:建立 ConnectionEventHandler 实例并添加到 Netty 的 pipeline 中。
以后当有 ConnectionEvent 触发时(不管是 Netty 定义的事件被触发,仍是 SOFABolt 定义的事件被触发),ConnectionEventHandler 会经过异步线程执行器通知 ConnectionEventListener,ConnectionEventListener 将消息派发给具体的 ConnectionEventProcessor 实现类。

6.1.3 MetaServerConnectionFactory

metaServerConnectionFactory 是用来存储全部 meta Sever Connection,这是Bolt的机制应用,须要维持一个长链接。

MetaServerChangeEvent 内容是:dataCenter,以及其下面的Data Server ip 列表。对应MetaServerConnectionFactory 的 MAP 是:

Map< dataCenter : Map<ip, Connection> >

具体定义以下:

public class MetaServerConnectionFactory {

    private final Map<String, Map<String, Connection>> MAP = new ConcurrentHashMap<>();

    /**
     *
     * @param dataCenter
     * @param ip
     * @param connection
     */
    public void register(String dataCenter, String ip, Connection connection) {

        Map<String, Connection> connectionMap = MAP.get(dataCenter);
        if (connectionMap == null) {
            Map<String, Connection> newConnectionMap = new ConcurrentHashMap<>();
            connectionMap = MAP.putIfAbsent(dataCenter, newConnectionMap);
            if (connectionMap == null) {
                connectionMap = newConnectionMap;
            }
        }

        connectionMap.put(ip, connection);
    }

}

6.1.4 添加Connection

只是在 MetaServerChangeEventHandler . doHandle 函数中有添加操做,调用了metaServerConnectionFactory.register

因此在 doHandle 函数中,遍历Event全部的 meta Server IP,这里每个ip对应一个 data Center。对于每个ip作以下操做:

  • 重连registerMetaServer。
    • connect all meta server,就是把Connection放进MetaServerConnectionFactory
    • register leader meta node,就是从新向 leader meta node 发送一个 DataNode 请求;
    • 当收到请求结果时候,根据结果内容,往 EventCenter中插入DataServerChangeEvent,这个之后处理;
  • 若是MetaServerConnectionFactory中有在Event中不存在的 meta server ip,就从 MetaServerConnectionFactory 中移除。
  • 若是 MetaServerConnectionFactory 中有在Event中不存在的 data server ip,就removeDataCenter(dataCenter);

其中使用了metaNodeExchanger去链接metaServer。具体代码以下:

private void registerMetaServer(String dataCenter, String ip) {

    PeerId leader = metaServerService.getLeader();

    for (int tryCount = 0; tryCount < TRY_COUNT; tryCount++) {
        try {
            Channel channel = metaNodeExchanger.connect(new URL(ip, dataServerConfig
                .getMetaServerPort()));
            //connect all meta server
            if (channel != null && channel.isConnected()) {
                metaServerConnectionFactory.register(dataCenter, ip,
                    ((BoltChannel) channel).getConnection());
            }
            //其余操做
    }
}

MetaServerConnectionFactory在运行时以下:

metaServerConnectionFactory = {MetaServerConnectionFactory@5387} 
 MAP = {ConcurrentHashMap@6154}  size = 1
  "DefaultDataCenter" -> {ConcurrentHashMap@6167}  size = 1

0x07 MetaNodeExchanger

dataServer和metaServer之间是推拉模型交互

MetaNodeExchanger 是 bolt Exchange,把metaServer相关的网络操做集中在一块儿。不管是MetaServerChangeEventHandler仍是DefaultMetaServiceImpl,都基于此与Meta Server交互。其中

  • connect 设置了响应函数metaClientHandlers

  • 而 request 时候,若是失败了,则会 metaServerService.refreshLeader().getIp() 刷新地址,从新调用。

这里会测试MetaServer有效性 。

public class MetaNodeExchanger implements NodeExchanger {
    @Autowired
    private Exchange                          boltExchange;

    @Autowired
    private IMetaServerService                metaServerService;

    @Autowired
    private DataServerConfig                  dataServerConfig;

    @Resource(name = "metaClientHandlers")
    private Collection<AbstractClientHandler> metaClientHandlers;

    @Override
    public Response request(Request request) {
        Client client = boltExchange.getClient(Exchange.META_SERVER_TYPE);
        try {
            final Object result = client.sendSync(request.getRequestUrl(), request.getRequestBody(),
                    dataServerConfig.getRpcTimeout());
            return () -> result;
        } catch (Exception e) {
            //retry
            URL url = new URL(metaServerService.refreshLeader().getIp(),
                    dataServerConfig.getMetaServerPort());
            final Object result = client.sendSync(url, request.getRequestBody(),
                    request.getTimeout() != null ? request.getTimeout() : dataServerConfig.getRpcTimeout());
            return () -> result;
        }
    }

    public Channel connect(URL url) {
        Client client = boltExchange.getClient(Exchange.META_SERVER_TYPE);
        if (client == null) {
            synchronized (this) {
                client = boltExchange.getClient(Exchange.META_SERVER_TYPE);
                if (client == null) {
                    client = boltExchange.connect(Exchange.META_SERVER_TYPE, url,
                        metaClientHandlers.toArray(new ChannelHandler[metaClientHandlers.size()]));
                }
            }
        }
        //try to connect data
        Channel channel = client.getChannel(url);
        if (channel == null) {
            synchronized (this) {
                channel = client.getChannel(url);
                if (channel == null) {
                    channel = client.connect(url);
                }
            }
        }

        return channel;
    }
}

7.1 Client Handler

MetaNodeExchanger响应Handler以下,这部分是推模型,前面已经提到了,serverChangeHandler会响应推送。

@Bean(name = "metaClientHandlers")
public Collection<AbstractClientHandler> metaClientHandlers() {
    Collection<AbstractClientHandler> list = new ArrayList<>();
    list.add(serverChangeHandler());
    list.add(statusConfirmHandler());
    list.add(notifyProvideDataChangeHandler());
    return list;
}

0x08 核心服务

DefaultMetaServiceImpl是Meta Server相关服务的核心实现。

8.1 DefaultMetaServiceImpl

其中,raftClient是raft的入口,metaNodeExchanger 是bolt的入口。metaServerConnectionFactory 保存目前全部的 meta server bolt connection。

public class DefaultMetaServiceImpl implements IMetaServerService {
    @Autowired
    private DataServerConfig            dataServerConfig;

    @Autowired
    private MetaNodeExchanger           metaNodeExchanger;

    @Autowired
    private MetaServerConnectionFactory metaServerConnectionFactory;

    @Autowired
    private DataServerCache             dataServerCache;

    private RaftClient                  raftClient;
}

8.2 刷新

刷新是重要功能之一,用来获取raft leader。

@Override
public PeerId getLeader() {
    if (raftClient == null) {
        startRaftClient();
    }
    PeerId leader = raftClient.getLeader();
    if (leader == null) {
        throw new RuntimeException(
            "[DefaultMetaServiceImpl] register MetaServer get no leader!");
    }
    return leader;
}

@Override
public PeerId refreshLeader() {
    if (raftClient == null) {
        startRaftClient();
    }
    PeerId leader = raftClient.refreshLeader();
    if (leader == null) {
        throw new RuntimeException("[RaftClientManager] refresh MetaServer get no leader!");
    }
    return leader;
}

8.3 重连

另一个重要功能是重连。

getMetaServerMap完成了重连,getMetaServerMap 的做用:

  • 获取 Meta Server 的IP列表,放入set;
  • 获取 Meta Server 的 Connection列表,放入connectionMap;
  • 若是 connectionMap 是空,则对于 set 中的 ip列表,进行重连;
  • 若是 connectionMap 非空,则对于 connectionMap 中的 ip列表,进行重连;
  • 拿到上面的 Connection 以后,进行调用 GetNodesRequest(NodeType.META)
  • 根据 GetNodesRequest(NodeType.META) 的结果 NodeChangeResult,构建一个 MetaServerChangeEvent,放入EventCenter。eventCenter.post(new MetaServerChangeEvent(metaServerService.getMetaServerMap()));

具体代码以下:

@Override
public Map<String, Set<String>> getMetaServerMap() {
    HashMap<String, Set<String>> map = new HashMap<>();
    Set<String> set = dataServerConfig.getMetaServerIpAddresses();

    Map<String, Connection> connectionMap = metaServerConnectionFactory
        .getConnections(dataServerConfig.getLocalDataCenter());
    Connection connection = null;
    try {
        if (connectionMap.isEmpty()) {
            List<String> list = new ArrayList(set);
            Collections.shuffle(list);
            connection = ((BoltChannel) metaNodeExchanger.connect(new URL(list.iterator()
                .next(), dataServerConfig.getMetaServerPort()))).getConnection();
        } else {
            List<Connection> connections = new ArrayList<>(connectionMap.values());
            Collections.shuffle(connections);
            connection = connections.iterator().next();
            if (!connection.isFine()) {
                connection = ((BoltChannel) metaNodeExchanger.connect(new URL(connection
                    .getRemoteIP(), dataServerConfig.getMetaServerPort()))).getConnection();
            }
        }

        GetNodesRequest request = new GetNodesRequest(NodeType.META);
        final Connection finalConnection = connection;
        Object obj = metaNodeExchanger.request(new Request() {
            @Override
            public Object getRequestBody() {
                return request;
            }

            @Override
            public URL getRequestUrl() {
                return new URL(finalConnection.getRemoteIP(), finalConnection.getRemotePort());
            }
        }).getResult();
        if (obj instanceof NodeChangeResult) {
            NodeChangeResult<MetaNode> result = (NodeChangeResult<MetaNode>) obj;

            Map<String, Map<String, MetaNode>> metaNodesMap = result.getNodes();
            if (metaNodesMap != null && !metaNodesMap.isEmpty()) {
                Map<String, MetaNode> metaNodeMap = metaNodesMap.get(dataServerConfig
                    .getLocalDataCenter());
                if (metaNodeMap != null && !metaNodeMap.isEmpty()) {
                    map.put(dataServerConfig.getLocalDataCenter(), metaNodeMap.keySet());
                } 
            }
        }
    } 
    return map;
}

其中,具体获取MetaServer信息是在

@ConfigurationProperties(prefix = DataServerConfig.PRE_FIX)
public class DataServerConfig {
    /**
     * Getter method for property <tt>metaServerIpAddress</tt>.
     *
     * @return property value of metaServerIpAddress
     */
    public Set<String> getMetaServerIpAddresses() {
        if (metaIps != null && !metaIps.isEmpty()) {
            return metaIps;
        }
        metaIps = new HashSet<>();
        if (commonConfig != null) {
            Map<String, Collection<String>> metaMap = commonConfig.getMetaNode();
            if (metaMap != null && !metaMap.isEmpty()) {
                String localDataCenter = commonConfig.getLocalDataCenter();
                if (localDataCenter != null && !localDataCenter.isEmpty()) {
                    Collection<String> metas = metaMap.get(localDataCenter);
                    if (metas != null && !metas.isEmpty()) {
                        metaIps = metas.stream().map(NetUtil::getIPAddressFromDomain).collect(Collectors.toSet());
                    }
                }
            }
        }
        return metaIps;
    }  
}

0x09 后续

在文中咱们能够看到,MetaServerChangeEvent也会转化为 DataServerChangeEvent,投放到EventCenter。

前图的2,4两步。这是由于Meta Server的这个推送,也许是告诉data Server,"hi,目前data server也有变更"。因此下一期咱们介绍如何处理DataServerChangeEvent

前图

+-------------------------------+
|[DataServerBootstrap]          |   MetaServerChangeEvent
|                               |
|                               +-------------------------+
|       startRaftClient         |                         |
|                               |                         |              +---------------+
|                               |                         |              |               |
+-------------------------------+                         |              |               |
+-------------------------------+                         |              |               |
| [Timer]                       |                         |              v               |
|                               |                         |  1   +-------+-----+         |
|  ConnectionRefreshMetaTask    +------------------------------> | EventCenter +----+    |
|                               | MetaServerChangeEvent   |      +-------+-----+    |    |
+-------------------------------+                         |              ^          |    |
+-------------------------------+                         |              |          |    |
|                               |                         |              |          |    |
| [Push<NodeChangeResult>]      |                         |              |          |    |
|                               |                         |              |          |    |
|                               +-------------------------+              |          |    |
|                               |  MetaServerChangeEvent                 |          |    |
|      ServerChangeHandler      |                               2        |          |    |
|                               +----------------------------------------+          |    |
+-------------------------------+      DataServerChangeEvent                        |    |
                                                                                    |    |
                                                                                    |    |
                                 MetaServerChangeEvent                              |    |
                                                                   3                |    |
                               +----------------------------------------------------+    |
                               |                                                         |
                               v                                                         |
             +-----------------+--------------+         DataServerChangeEvent            |
             |                                |                                   4      |
             |  MetaServerChangeEventHandler  +------------------------------------------+
             |                                |
             +--------------------------------+

手机以下:

0xFF 参考

sofa-bolt学习

相关文章
相关标签/搜索