Hadoop之RPC

       Hadoop的RPC主要是经过Java的动态代理(Dynamic Proxy)与反射(Reflect)实现,代理类是由java.lang.reflect.Proxy类在运行期时根据接口,采用Java反射功能动态生成的,而且结合java.lang.reflect.InvocationHandler来处理客户端的请求,当用户调用这个动态生成的实现类时,其实是调用了InvocationHandler实现类的invoke方法。RPC源代码在org.apache.hadoop.ipc下,有如下几个主要类: 
    Client: 客户端,链接服务器、传递函数名和相应的参数、等待结果;
    Server:服务器端,主要接受Client的请求、执行相应的函数、返回结果;
    VersionedProtocol:通讯双方所遵循契约的父接口;
    RPC:RPC通讯机制,主要是为通讯的服务方提供代理。

  1.通讯双方遵循的契约

    要经过RPC服务进行通讯,服务的提供方必须实现某个接口,而这个便可是VersionedProtocol的子类,诸如:
InterTrackerProtocol,它是TaskTracker与JobTracker进行通讯所遵循的契约,JobTracker是一个Server,它必须实现这个接口;
JobSubmissionProtocol,它是JobTracker与JobClient通信所遵循的契约,JobClient利用契约中的方法能够提交做业去执行, 而且获得当前系统的状态;
DatanodeProtocol,利用此契约,DataNode能够向NameNode汇报本身的块状态以及负载状况。
InterDatanodeProtocol,DataNode之间利用此契约能够更新数据块。
其它的接口在此再也不一一赘述。

    2.Hadoop中RPC通讯原理 

  咱们经过TaskTracker与JobTracker的通讯来剖析其通讯过程,JobTracker的代理是经过下面的方法获得的,
 this.jobClient = (InterTrackerProtocol) 
    UserGroupInformation.getLoginUser().doAs(
        new PrivilegedExceptionAction<Object>() {
      public Object run() throws IOException {
        return RPC.waitForProxy(InterTrackerProtocol.class,
            InterTrackerProtocol.versionID,
            jobTrackAddr, fConf);
      }
    });
View Code

  它是经过调用RPC类中的静态方法waitForProxy()方法而获得了InterTrackerProtocol的一个代理,借助于这个代理对象,TaskTracker就能够与JobTracker进行通讯了。java

  VersionedProtocol proxy =
        (VersionedProtocol) Proxy.newProxyInstance(
            protocol.getClassLoader(), new Class[] { protocol },
            new Invoker(protocol, addr, ticket, conf, factory, rpcTimeout));
View Code

  跟踪Hadoop的源代码,咱们能够发现PRC.waitForProxy()最终是调用的Proxy.newProxyInstance()来建立一个代理对象,第一个参数是类加载器(代理类在运行的过程当中动态生成),第二个参数是要实现的代理类的接口,第三个参数是InvokercationHandler接口的子类,最终调用的也就是InvokercationHandler实现类的的invoker()方法。node

  private static class Invoker implements InvocationHandler {
    private Client.ConnectionId remoteId;
    private Client client;
.....

    public Object invoke(Object proxy, Method method, Object[] args)
      throws Throwable {
      final boolean logDebug = LOG.isDebugEnabled();
      long startTime = 0;
      if (logDebug) {
        startTime = System.currentTimeMillis();
      }

      ObjectWritable value = (ObjectWritable)
        client.call(new Invocation(method, args), remoteId);
      if (logDebug) {
        long callTime = System.currentTimeMillis() - startTime;
        LOG.debug("Call: " + method.getName() + " " + callTime);
      }
      return value.get();
    }
    
....
  }
View Code

  咱们能够看到,InvocationHandler的实现类Invoker中主要包含两个成员变量即remoteId(惟一标识RPC的服务器端)、Client(经过工厂模式获得的客户端),invoke()方法中最重要的就是下面的语句:apache

ObjectWritable value = (ObjectWritable)client.call(new Invocation(method, args), remoteId);

  其中call方法的第一个参数封装调用方法和参数并实现Writable接口的对象,以便于在分布式环境中传输,第二个参数勿需多言,它就用于惟一标识RPC Server,也就是与指定的Server进行通讯。call方法的核心代码以下:服务器

  public Writable call(Writable param, ConnectionId remoteId)  throws InterruptedException, IOException {
    Call call = new Call(param);
    Connection connection = getConnection(remoteId, call);//请看下面的说明
    connection.sendParam(call);                 // 将参数封装成一个call对象发送给Server
    boolean interrupted = false;
    synchronized (call) {
      while (!call.done) {
        try {
          call.wait();                           // 等待Server发送的内容
        } catch (InterruptedException ie) {
          // save the fact that we were interrupted
          interrupted = true;
        }
      }
...
        return call.value;
  }
View Code

  其中居然出现了一个Call对象,咱们看到此方法返回的结果是call对象的一个成员变量,也就是说Call封装了Client的请求以及Server的响应,synchronized的使用会同步Client的请求以及Server的响应。通Connection对象的sendParam方法能够将请求发送给Server,那么Connection又是什么呢?分布式

   private Connection getConnection(ConnectionId remoteId,Call call) throws IOException, InterruptedException {
    do {
      synchronized (connections) {
        connection = connections.get(remoteId);
        if (connection == null) {
          connection = new Connection(remoteId);
          connections.put(remoteId, connection);
        }
      }
    } while (!connection.addCall(call));
    
...
    connection.setupIOstreams();
    return connection;
  }
View Code

  其实Connection是扩展Thread而获得的一个线程,最终把全部的connection对象都放入到一个Hashtable中,同一个ConnectionId的Connection能够复用,下降了建立线程的开销。connection.setupIOstreams()用于在真正的创建链接,并将RPC的header写入到输出流中,经过start方法启动线程,其核心代码以下所示:ide

  public void run() {
      while (waitForWork()) {//等到能够读响应时返回true
        receiveResponse();
}   

  receiveResponse方法主要是从输入流反序列化出value,并将其封装在call对象中,这样client端就获得了server的响应,核心代码以下:函数

private void receiveResponse() {
          try {
        int id = in.readInt();                  // 读取链接id,以便从calls中取出相应的call对象
        Call call = calls.get(id);
        int state = in.readInt();     // 读取输入流的状态
        if (state == Status.SUCCESS.state) {
          Writable value = ReflectionUtils.newInstance(valueClass, conf);
          value.readFields(in);                 // read value
          call.setValue(value);
          calls.remove(id);
        } 
...
    }
View Code

才疏学浅,错误之处在所不免,恳请各位予以指正。。 oop

相关文章
相关标签/搜索