Hadoop基于Protocol Buffer的RPC实现代码分析-Server端--转载

 原文地址:http://yanbohappy.sinaapp.com/?p=110 java

  最新版本的Hadoop代码中已经默认了Protocol buffer(如下简称PB,http://code.google.com/p/protobuf/)做为RPC的默认实现,原来的WritableRpcEngine已经被淘汰了。来自cloudera的Aaron T. Myers在邮件中这样说的“since PB can provide support for evolving protocols in a compatible fashion.”node

首先要明白PB是什么,PB是Google开源的一种轻便高效的结构化数据存储格式,能够用于结构化数据序列化/反序列化,很适合作数据存储或 RPC 数据交换格式。它可用于通信协议、数据存储等领域的语言无关、平台无关、可扩展的序列化结构数据格式。目前提供了 C++、Java、Python 三种语言的 API。简单理解就是某个进程把一些结构化数据经过网络通讯的形式传递给另一个进程(典型应用就是RPC);或者某个进程要把某些结构化数据持久化存储到磁盘上(这个有点相似于在Mongodb中的BSON格式)。对于存储的这个例子来讲,使用PB和XML,JSON相比的缺点就是存储在磁盘上的数据用户是没法理解的,除非用PB反序列化以后才行,这个有点相似于IDL。优势就是序列化/反序列化速度快,网络或者磁盘IO传输的数据少,这个在Data-Intensive Scalable Computing中是很是重要的。linux

Hadoop使用PB做为RPC实现的另一个缘由是PB的语言、平台无关性。在mailing list里据说过社区的人有这样的考虑:就是如今每一个MapReduce task都是在一个JVM虚拟机上运行的(即便是Streaming的模式,MR任务的数据流也是经过JVM与NN或者DN进行RPC交换的),JVM最严重的问题就是内存,例如OOM。我看社区里有人讨论说若是用PB这样的RPC实现,那么每一个MR task均可以直接与NN或者DN进行RPC交换了,这样就能够用C/C++来实现每个MR task了。百度作的HCE(https://issues.apache.org/jira/browse/MAPREDUCE-1270)和这种思路有点相似,可是因为当时的Hadoop RPC通讯仍是经过WritableRpcEngine来实现的,因此MR task仍是没有摆脱经过本地的JVM代理与NN或者DN通讯的束缚,由于Child JVM Process仍是存在的,仍是由它来设置运行时环境和RPC交互。apache

关于PB的原理和实现,请你们参考http://code.google.com/p/protobuf/或者http://www.ibm.com/developerworks/cn/linux/l-cn-gpb/?ca=drs-tp4608,本文再也不赘述。服务器

下面来看看Hadoop代码中的RPC是如何实现的。RPC就是一台机器上的某个进程要调用另一台机器上的某个进程的方法,中间通讯传输的就是相似于“方法名、参数一、参数2……”这样的信息,是结构化的。同时通讯除了这些RPC实体之外,还要有header等。网络

咱们要定义一种PB实现的RPC传输格式,首先要定义相应的.proto文件,在Hadoop common工程里,这些文件放在D:\Hadoop-trunk\hadoop-common-project\hadoop-common\src\main\proto目录下;在Hadoop HDFS工程里这些文件放在D:\Hadoop-trunk\hadoop-hdfs-project\hadoop-hdfs\src\main\proto目录下,以此类推。Hadoop编译脚本会调用相应的protoc二进制程序来编译这些以.proto结尾的文件,生成相应的.java文件。数据结构

以D:\Hadoop-trunk\hadoop-hdfs-project\hadoop-hdfs\src\main\proto目录下的ClientNamenodeProtocol.proto为例说明。文件最开始定义了一些参数:多线程

option java_package = "org.apache.hadoop.hdfs.protocol.proto";

option java_outer_classname = "ClientNamenodeProtocolProtos";

option java_generic_services = true;

option java_generate_equals_and_hash = true;

这个表示这个.proto文件通过protoc编译以后会生成org.apache.hadoop.hdfs.protocol.proto这个包下面的ClientNamenodeProtocolProtos.java类文件,那么在Hadoop源码里就能够调用这个类里的方法了。app

这个文件的主体主要是两种数据类型message和rpc,仔细看下这个文件就知道了,message就是这个ClientNamenodeProtocol协议中传输的结构体,rpc就是调用的方法。那么这两种类型在通过编译以后会生成什么呢?框架

编译以后,在Hadoop-trunk/hadoop-hdfs-project/hadoop-hdfs/target/generated-sources/java/org/apache/hadoop/hdfs/protocol/proto目录里生成了ClientNamenodeProtocolProtos.java文件,里面把message都包装成了类,而把rpc都包装成了方法。这个文件是由PB编译器自动生成的,因此不能修改。

有了这些java类以后,咱们就能够看看在Server端是怎么实现RPC的了。首先仍是NameNode初始化的流程,会调用到rpcServer = createRpcServer(conf)来建立RPC server。下面看看NameNodeRpcServer的构造函数里都作了哪些工做:

public NameNodeRpcServer(Configuration conf, NameNode nn)
      throws IOException {
    this.nn = nn;
    this.namesystem = nn.getNamesystem();
    this.metrics = NameNode.getNameNodeMetrics();

    int handlerCount =
      conf.getInt(DFS_NAMENODE_HANDLER_COUNT_KEY,
                  DFS_NAMENODE_HANDLER_COUNT_DEFAULT);
    InetSocketAddress socAddr = nn.getRpcServerAddress(conf);
    //设置ProtolEngine,目前只支持PB协议。表示接收到的RPC协议若是是ClientNamenodeProtocolPB,
    //那么处理这个RPC协议的引擎是ProtobufRpcEngine
    RPC.setProtocolEngine(conf,ClientNamenodeProtocolPB.class,ProtobufRpcEngine.class);
    //声明一个ClientNamenodeProtocolServerSideTranslatorPB,
    //这个类负责把Server接收到的PB格式对象的数据,拼装成NameNode内村中的数据类型,
    //调用NameNodeRpcServer类中相应的逻辑,而后再把执行结果拼装成PB格式。
    ClientNamenodeProtocolServerSideTranslatorPB
    clientProtocolServerTranslator =
         new ClientNamenodeProtocolServerSideTranslatorPB(this);
    BlockingService clientNNPbService = ClientNamenodeProtocol.
         newReflectiveBlockingService(clientProtocolServerTranslator);

    DatanodeProtocolServerSideTranslatorPB dnProtoPbTranslator =
        new DatanodeProtocolServerSideTranslatorPB(this);
    BlockingService dnProtoPbService = DatanodeProtocolService
        .newReflectiveBlockingService(dnProtoPbTranslator);

    NamenodeProtocolServerSideTranslatorPB namenodeProtocolXlator =
        new NamenodeProtocolServerSideTranslatorPB(this);
      BlockingService NNPbService = NamenodeProtocolService
          .newReflectiveBlockingService(namenodeProtocolXlator);

    RefreshAuthorizationPolicyProtocolServerSideTranslatorPB refreshAuthPolicyXlator =
        new RefreshAuthorizationPolicyProtocolServerSideTranslatorPB(this);
    BlockingService refreshAuthService = RefreshAuthorizationPolicyProtocolService
        .newReflectiveBlockingService(refreshAuthPolicyXlator);

    RefreshUserMappingsProtocolServerSideTranslatorPB refreshUserMappingXlator =
        new RefreshUserMappingsProtocolServerSideTranslatorPB(this);
    BlockingService refreshUserMappingService = RefreshUserMappingsProtocolService
        .newReflectiveBlockingService(refreshUserMappingXlator);

    GetUserMappingsProtocolServerSideTranslatorPB getUserMappingXlator =
        new GetUserMappingsProtocolServerSideTranslatorPB(this);
    BlockingService getUserMappingService = GetUserMappingsProtocolService
        .newReflectiveBlockingService(getUserMappingXlator);

    HAServiceProtocolServerSideTranslatorPB haServiceProtocolXlator =
        new HAServiceProtocolServerSideTranslatorPB(this);
    BlockingService haPbService = HAServiceProtocolService
        .newReflectiveBlockingService(haServiceProtocolXlator);

    WritableRpcEngine.ensureInitialized();

    InetSocketAddress dnSocketAddr = nn.getServiceRpcServerAddress(conf);
    if (dnSocketAddr != null) {
      int serviceHandlerCount =
        conf.getInt(DFS_NAMENODE_SERVICE_HANDLER_COUNT_KEY,
                    DFS_NAMENODE_SERVICE_HANDLER_COUNT_DEFAULT);
      // Add all the RPC protocols that the namenode implements
      this.serviceRpcServer =
          RPC.getServer(org.apache.hadoop.hdfs.protocolPB.
              ClientNamenodeProtocolPB.class, clientNNPbService,
          dnSocketAddr.getHostName(), dnSocketAddr.getPort(),
          serviceHandlerCount,
          false, conf, namesystem.getDelegationTokenSecretManager());
      DFSUtil.addPBProtocol(conf, HAServiceProtocolPB.class, haPbService,
          serviceRpcServer);
      DFSUtil.addPBProtocol(conf, NamenodeProtocolPB.class, NNPbService,
          serviceRpcServer);
      DFSUtil.addPBProtocol(conf, DatanodeProtocolPB.class, dnProtoPbService,
          serviceRpcServer);
      DFSUtil.addPBProtocol(conf, RefreshAuthorizationPolicyProtocolPB.class,
          refreshAuthService, serviceRpcServer);
      DFSUtil.addPBProtocol(conf, RefreshUserMappingsProtocolPB.class,
          refreshUserMappingService, serviceRpcServer);
      DFSUtil.addPBProtocol(conf, GetUserMappingsProtocolPB.class,
          getUserMappingService, serviceRpcServer);

      this.serviceRPCAddress = this.serviceRpcServer.getListenerAddress();
      nn.setRpcServiceServerAddress(conf, serviceRPCAddress);
    } else {
      serviceRpcServer = null;
      serviceRPCAddress = null;
    }
    // Add all the RPC protocols that the namenode implements
    this.clientRpcServer = RPC.getServer(
        org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolPB.class,
        clientNNPbService, socAddr.getHostName(),
            socAddr.getPort(), handlerCount, false, conf,
            namesystem.getDelegationTokenSecretManager());
    DFSUtil.addPBProtocol(conf, HAServiceProtocolPB.class, haPbService,
        clientRpcServer);
    DFSUtil.addPBProtocol(conf, NamenodeProtocolPB.class, NNPbService,
        clientRpcServer);
    DFSUtil.addPBProtocol(conf, DatanodeProtocolPB.class, dnProtoPbService,
        clientRpcServer);
    DFSUtil.addPBProtocol(conf, RefreshAuthorizationPolicyProtocolPB.class,
        refreshAuthService, clientRpcServer);
    DFSUtil.addPBProtocol(conf, RefreshUserMappingsProtocolPB.class,
        refreshUserMappingService, clientRpcServer);
    DFSUtil.addPBProtocol(conf, GetUserMappingsProtocolPB.class,
        getUserMappingService, clientRpcServer);

    // set service-level authorization security policy
    if (serviceAuthEnabled =
          conf.getBoolean(
            CommonConfigurationKeys.HADOOP_SECURITY_AUTHORIZATION, false)) {
      this.clientRpcServer.refreshServiceAcl(conf, new HDFSPolicyProvider());
      if (this.serviceRpcServer != null) {
        this.serviceRpcServer.refreshServiceAcl(conf, new HDFSPolicyProvider());
      }
    }

    // The rpc-server port can be ephemeral... ensure we have the correct info
    this.clientRpcAddress = this.clientRpcServer.getListenerAddress();
    nn.setRpcServerAddress(conf, clientRpcAddress);

    this.minimumDataNodeVersion = conf.get(
        DFSConfigKeys.DFS_NAMENODE_MIN_SUPPORTED_DATANODE_VERSION_KEY,
        DFSConfigKeys.DFS_NAMENODE_MIN_SUPPORTED_DATANODE_VERSION_DEFAULT);
  }

ClientNamenodeProtocol是protoc编译生成的ClientNamenodeProtocolProtos类中的inner class。

public static com.google.protobuf.BlockingService
       newReflectiveBlockingService(final BlockingInterface impl) {
	……
       }

这个方法也是由protoc编译器自动生成的。这个方法会返回一个com.google.protobuf.BlockingService类型的对象,这种类型的对象定义了RPC的各类服务,后面会讲。

this.clientRpcServer = RPC.getServer(
        org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolPB.class,
        clientNNPbService, socAddr.getHostName(),
        socAddr.getPort(), handlerCount, false, conf,
        namesystem.getDelegationTokenSecretManager());

这个RPC.getServer()函数生成一个Server对象,负责接收网络链接,读取数据,调用处理数据函数,返回结果。这个Server对象里有Listener, Handler, Responder内部类,分别开启多个线程负责监听、读取、处理和返回结果。前两个参数表示若是RPC发送过来的是ClientNamenodeProtocolPB协议,那么负责处理这个协议的服务(com.google.protobuf.BlockingService类型的对象)就是clientNNPbService。

这个RPC.getServer()会通过层层调用,由于如今默认的RPCEngine是ProtobufRpcEngine(ProtobufRpcEngine.java),就会调用到下面这个函数,在这生成了一个Server对象,就是用于接收client端RPC请求,处理,回复的Server。这个Server对象是一个纯粹的网络服务的Server,在RPC中起到基础网络IO服务的做用。

public RPC.Server getServer(Class<?> protocol, Object protocolImpl,
      String bindAddress, int port, int numHandlers, int numReaders,
      int queueSizePerHandler, boolean verbose, Configuration conf,
      SecretManager<? extends TokenIdentifier> secretManager,
      String portRangeConfig)
      throws IOException {
    return new Server(protocol, protocolImpl, conf, bindAddress, port,
        numHandlers, numReaders, queueSizePerHandler, verbose, secretManager,
        portRangeConfig);
  }

如今该用到的东西都生成好了,就要看看client端来了一个RPC请求以后,Server端是怎么处理的呢?

Server里的Reader线程也是基于Selector的异步IO模式,每次Select选出一个SelectionKey以后,会调用SelectionKey.attachment()把这个SelectionKey所attach的Connection对象获取,而后执行对应的readAndProcess()方法,把这个SelectionKey所对应的管道上的网络IO数据读入缓冲区。readAndProcess()方法会层层调用到Server.processData()方法,在这个方法内部,会把刚才从网络IO中读取的数据反序列化成对象rpcRequest对象。rpcRequest对象的类型是继承自Writable类型的子类的对象,也就是说能够序列化/反序列化的类。这里rpcRequest对象里包含的RPC请求的内容对象是由.proto文件中Message生成的类,也就是说PB框架自动编译出来的类,后面能够经过调用这个类的get方法获取RPC中真正传输的数据。以后把生成的rpcRequest对象放到一个Call对象里面,再把Call对象放到队列Server.callQueue里面。至此网络服务器的Reader线程作的工做就OK了。

下面看看Handler线程是怎么处理的。Handler线程默认有10个,因此处理逻辑是多线程的。每一个Handler线程会从刚才提到的callQueue中取一个Call对象,而后调用Server.call()方法执行这个Call对象中蕴含的RPC请求。Server.call()->RPC.Server.call()->Server.getRpcInvoker()->ProtobufRpcInvoker.call()在最后这个call()函数里面真正执行喽。。。。重点看这个函数,首先校验这个请求发过来的数据是否是合理的。而后就是获取实现这个协议的服务。实现协议的服务在初始化的时候已经注册过了,就是前面说的那个com.google.protobuf.BlockingService类型的对象,例如:

BlockingService clientNNPbService = ClientNamenodeProtocol.
         newReflectiveBlockingService(clientProtocolServerTranslator);

这个就是实现Client和NameNode之间的ClientNamenodeProtocol协议的服务。固然还有dnProtoPbService, NNPbService, refreshAuthService, refreshUserMappingService, haPbService等等这些不一样的服务。

这个Service获取了以后,经过调用这句代码

result = service.callBlockingMethod(methodDescriptor, null, param);

就会执行这个RPC请求的逻辑。

再往深刻执行就要涉及到google protocol buffer内部的东西了,这个service对象会把相应的方法调用转移到一个继承自BlockingInterface接口的实现类上。Service的真正实现类就是clientProtocolServerTranslator,是newReflectiveBlockingService()这个函数的参数。

BlockingService clientNNPbService = ClientNamenodeProtocol.
         newReflectiveBlockingService(clientProtocolServerTranslator);

这个初始化过程当中的参数,也就是service.callBlockingMethod()真正调用的是clientProtocolServerTranslator中对应的方法。这一点能够经过由protoc自动编译生成的代码中看出:

public static com.google.protobuf.BlockingService
        newReflectiveBlockingService(final BlockingInterface impl) {
      return new com.google.protobuf.BlockingService() {
        public final com.google.protobuf.Descriptors.ServiceDescriptor
            getDescriptorForType() {
          return getDescriptor();
        }

        public final com.google.protobuf.Message callBlockingMethod(
            com.google.protobuf.Descriptors.MethodDescriptor method,
            com.google.protobuf.RpcController controller,
            com.google.protobuf.Message request)
            throws com.google.protobuf.ServiceException {
          if (method.getService() != getDescriptor()) {
            throw new java.lang.IllegalArgumentException(
              "Service.callBlockingMethod() given method descriptor for " +
              "wrong service type.");
          }
          switch(method.getIndex()) {
            case 0:
              return impl.getBlockLocations(controller, (org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetBlockLocationsRequestProto)request);
            case 1:
              return impl.getServerDefaults(controller, (org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetServerDefaultsRequestProto)request);
            case 2:
              return impl.create(controller, (org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.CreateRequestProto)request);
            case 3:
              return impl.append(controller, (org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.AppendRequestProto)request);
            ……
}
……
}

上面就是proto编译生成的ClientNamenodeProtocolProtos.java文件,从中能够看出对callBlockingMethod()方法的调用都是转移到BlockingInterface impl上面了。

而后咱们看看clientProtocolServerTranslator是怎么进一步执行的。下面以getBlockLocations()函数为例说明:

public GetBlockLocationsResponseProto getBlockLocations(
      RpcController controller, GetBlockLocationsRequestProto req)
      throws ServiceException {
    try {
      //下面这个server是由NameNodeRpcServer类生成的对象,定义了HDFS元数据操做逻辑。
      LocatedBlocks b = server.getBlockLocations(req.getSrc(), req.getOffset(),
          req.getLength());
      //因为server返回的是NameNode内存中的数据结构,要把这个结果经过RPC传回client端,
      //那么咱们须要利用PB框架提供的对应Message的Builder类,把内存中的数据结构经过这个接口序列化。
      Builder builder = GetBlockLocationsResponseProto
          .newBuilder();
      if (b != null) {
        builder.setLocations(PBHelper.convert(b)).build();
      }
      return builder.build();
    } catch (IOException e) {
      throw new ServiceException(e);
    }
  }

至此,Hadoop的RPC流程Server端已经分析结束,不过这个是正确执行的流程。若是中间抛出了异常呢?仍是以上面这个getBlockLocations()函数为例,若是元数据操做逻辑NameNodeRpcServer里面抛出IOException,那么它都会把它封装成ServiceException,而后一路传递给client端。在client端,会经过ProtobufHelper.getRemoteException()把封装在ServiceException中的IOException获取出来。

相关文章
相关标签/搜索