Hadoop源码学习笔记之NameNode启动场景流程四:rpc server初始化及启动

老规矩,仍是分三步走,分别为源码调用分析、伪代码核心梳理、调用关系图解。node

1、源码调用分析apache

  根据上篇的梳理,直接从initialize()方法着手。源码以下,部分代码的功能以及说明,已经在注释阐述了。安全

protected void initialize(Configuration conf) throws IOException { // 能够经过找到下面变量名的映射,在hdfs-default.xml中找到对应的配置
  if (conf.get(HADOOP_USER_GROUP_METRICS_PERCENTILES_INTERVALS) == null) { String intervals = conf.get(DFS_METRICS_PERCENTILES_INTERVALS_KEY); if (intervals != null) { conf.set(HADOOP_USER_GROUP_METRICS_PERCENTILES_INTERVALS, intervals); } } ...... // 核心代码:启动HttpServer
  if (NamenodeRole.NAMENODE == role) { startHttpServer(conf); } this.spanReceiverHost = SpanReceiverHost.getInstance(conf); // 核心代码:FSNamesystem初始化
 loadNamesystem(conf); // 核心代码:建立一个rpc server实例
  rpcServer = createRpcServer(conf); ...... // 核心代码:启动一些服务组件,包括rpc server等
 startCommonServices(conf); }

  这段代码涉及到rpc server初始化及启动的核心,有两处:ide

  第一处是rpcServer = createRpcServer(conf); 这个createRpcServer()的功能就是建立了一个rpc server的实例。以下:函数

protected NameNodeRpcServer createRpcServer(Configuration conf) throws IOException { return new NameNodeRpcServer(conf, this); }

  咱们继续进去NameNodeRpcServer类的构造方法中看一看,到底里面作了哪些事情:oop

public NameNodeRpcServer(Configuration conf, NameNode nn) throws IOException { this.nn = nn; this.namesystem = nn.getNamesystem(); this.metrics = NameNode.getNameNodeMetrics(); int handlerCount = conf.getInt(DFS_NAMENODE_HANDLER_COUNT_KEY, DFS_NAMENODE_HANDLER_COUNT_DEFAULT); RPC.setProtocolEngine(conf, ClientNamenodeProtocolPB.class, ProtobufRpcEngine.class); 
// ----------1------------
// ---- 下面一堆都是实例化各类协议和服务的对象,全部的服务都是BlockingService接口的实现
// client和namenode之间进行通讯须要调用的接口,包括:建立目录、管理block、设置权限等一些操做 ClientNamenodeProtocolServerSideTranslatorPB clientProtocolServerTranslator = new ClientNamenodeProtocolServerSideTranslatorPB(this); BlockingService clientNNPbService = ClientNamenodeProtocol. newReflectiveBlockingService(clientProtocolServerTranslator); // datanode和namenode之间进行通讯调用的接口,包括:datanode注册、heartbeatReport、blockReport等接口 DatanodeProtocolServerSideTranslatorPB dnProtoPbTranslator = new DatanodeProtocolServerSideTranslatorPB(this); BlockingService dnProtoPbService = DatanodeProtocolService .newReflectiveBlockingService(dnProtoPbTranslator); // 不一样的namenode之间进行通讯须要调用的接口 NamenodeProtocolServerSideTranslatorPB namenodeProtocolXlator = new NamenodeProtocolServerSideTranslatorPB(this); BlockingService NNPbService = NamenodeProtocolService .newReflectiveBlockingService(namenodeProtocolXlator); ......
// ---- 以上都是初始化rpc server关键的部分
// 确保供写数据的rpc服务引擎已经初始化,若是没有初始化,
// 则在此方法中调用registerProtocolEngine()从而将WritableRpcEngine引擎加入到内存中的引擎map WritableRpcEngine.ensureInitialized();

......
if (serviceRpcAddr != null) { ......
// -----------2----------
    // 实例化一个监听datanode请求的rpc server
this.serviceRpcServer = new RPC.Builder(conf) .setProtocol(org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolPB.class) .setInstance(clientNNPbService) .setBindAddress(bindHost) .setPort(serviceRpcAddr.getPort()).setNumHandlers(serviceHandlerCount) .setVerbose(false) .setSecretManager(namesystem.getDelegationTokenSecretManager()) .build();     // 将前面实例化的各类协议service添加到这个监听datanode请求的rpc server // Add all the RPC protocols that the namenode implements DFSUtil.addPBProtocol(conf, HAServiceProtocolPB.class, haPbService, serviceRpcServer); DFSUtil.addPBProtocol(conf, NamenodeProtocolPB.class, NNPbService, serviceRpcServer); ...... ......
} else { ...... } ...... // -----------3------------
// 实例化一个监听客户端请求的rpc server
this.clientRpcServer = new RPC.Builder(conf) .setProtocol( org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolPB.class) .setInstance(clientNNPbService).setBindAddress(bindHost) .setPort(rpcAddr.getPort()).setNumHandlers(handlerCount) .setVerbose(false) .setSecretManager(namesystem.getDelegationTokenSecretManager()).build();
   // 将前面实例化的各类协议service添加到这个监听客户端请求的RPC server
// Add all the RPC protocols that the namenode implements DFSUtil.addPBProtocol(conf, HAServiceProtocolPB.class, haPbService, clientRpcServer); DFSUtil.addPBProtocol(conf, NamenodeProtocolPB.class, NNPbService, clientRpcServer); ......
......
}

  进入到NameNodeRpcServer类的构造方法中,能够看到除了上面几行的一些初始化赋值以外,下面的代码好长啊。其实并不复杂,ui

  根据代码的逻辑划分,主要有三部分:this

    第一部分,实例化各类通讯协议和服务对象,好比:负责建立目录、管理block、设置权限等操做的客户端同NameNode通讯的spa

      协议服务——ClientNameNodeProtocolServerSideTranslatorPB、负责datanode的启动时向NameNode注册、发送心跳报告、线程

      block信息报告等操做的datanode同NameNode通讯的协议服务——DatanodeProtocolServerSideTranslatorPB。篇幅有限,

      后面还有不一样NameNode之间通讯协议服务、HA高可靠、用户权限管理等不在一一详细说明。

    第二部分,实例化了一个监听datanode请求的rpc server,而且将第一部分实例化的各类ProtocolService同此rpc server进行绑定,

      用于处理rpc server监听到的来自datanode的各类rpc请求。

    第三部分,实例化了一个监听客户端请求的rpc server,并将第一部分实例化的各类ProtocolService同此rpc server进行绑定,用于

      处理监听到的来自客户端的rpc请求。

  至此,rpc server启动以前相关的准备工做已经完毕,接下来就要开始启动rpc server了。继续回到本篇刚开始的initialize()方法中的最后一处核心,

  startCommonServices(); 内容以下:  

private void startCommonServices(Configuration conf) throws IOException {
// 核心代码: namesystem.startCommonServices(conf, haContext); registerNNSMXBean();
if (NamenodeRole.NAMENODE != role) { startHttpServer(conf); httpServer.setNameNodeAddress(getNameNodeAddress()); httpServer.setFSImage(getFSImage()); } // 启动rpcServer rpcServer.start(); ...... }

  在此方法中,直接就启动了rpcServer。

  可是在第一行的startCommonServices()方法也是核心之一,里面有很重要的一些服务,好比磁盘检查、安全模式判断等,后续篇幅会跟进。

2、伪代码调用流程梳理

  NameNode.main() // 入口函数
    |——createNameNode(); // 经过new NameNode()进行实例化
      |——initialize(); // 方法进行初始化操做
        |——startHttpServer(); // 启动HttpServer
        |——loadNamesystem(); // 加载元数据
        |——createRpcServer(); // 建立rpc server实例
          |——new NameNodeRpcServer();
            |——service1 // 各类通讯协议service
            |——service2 // 各类通讯协议service
            |——service.... // 各类通讯协议service
            |——serviceRpcServer = new RPC.builder(); // 实例化一个监听datanode请求的rpc server
            |——serviceRpcServer.add(service1...); // 将各类service添加到serviceRpcServer
            |——clientRpcServer = new RPC.builder(); // 实例化一个监听客户端请求的rpc server
            |——clientRpcServer.add(service2...); // 将各类service添加到clientRpcServer
        |——startCommonServices();
          |——namesystem.startCommonServices(); // 启动一些磁盘检查、安全模式等一些后台服务及线程
          |——rpcServer.start(); // 启动rpcServer
    |——join()

3、rpc server初始化及启动流程图解

相关文章
相关标签/搜索