this.jobClient = (InterTrackerProtocol) UserGroupInformation.getLoginUser().doAs( new PrivilegedExceptionAction<Object>() { public Object run() throws IOException { return RPC.waitForProxy(InterTrackerProtocol.class, InterTrackerProtocol.versionID, jobTrackAddr, fConf); } });
它是经过调用RPC类中的静态方法waitForProxy()方法而获得了InterTrackerProtocol的一个代理,借助于这个代理对象,TaskTracker就能够与JobTracker进行通讯了。java
VersionedProtocol proxy = (VersionedProtocol) Proxy.newProxyInstance( protocol.getClassLoader(), new Class[] { protocol }, new Invoker(protocol, addr, ticket, conf, factory, rpcTimeout));
跟踪Hadoop的源代码,咱们能够发现PRC.waitForProxy()最终是调用的Proxy.newProxyInstance()来建立一个代理对象,第一个参数是类加载器(代理类在运行的过程当中动态生成),第二个参数是要实现的代理类的接口,第三个参数是InvokercationHandler接口的子类,最终调用的也就是InvokercationHandler实现类的的invoker()方法。node
private static class Invoker implements InvocationHandler { private Client.ConnectionId remoteId; private Client client; ..... public Object invoke(Object proxy, Method method, Object[] args) throws Throwable { final boolean logDebug = LOG.isDebugEnabled(); long startTime = 0; if (logDebug) { startTime = System.currentTimeMillis(); } ObjectWritable value = (ObjectWritable) client.call(new Invocation(method, args), remoteId); if (logDebug) { long callTime = System.currentTimeMillis() - startTime; LOG.debug("Call: " + method.getName() + " " + callTime); } return value.get(); } .... }
咱们能够看到,InvocationHandler的实现类Invoker中主要包含两个成员变量即remoteId(惟一标识RPC的服务器端)、Client(经过工厂模式获得的客户端),invoke()方法中最重要的就是下面的语句:apache
ObjectWritable value = (ObjectWritable)client.call(new Invocation(method, args), remoteId);
其中call方法的第一个参数封装调用方法和参数并实现Writable接口的对象,以便于在分布式环境中传输,第二个参数勿需多言,它就用于惟一标识RPC Server,也就是与指定的Server进行通讯。call方法的核心代码以下:服务器
public Writable call(Writable param, ConnectionId remoteId) throws InterruptedException, IOException { Call call = new Call(param); Connection connection = getConnection(remoteId, call);//请看下面的说明 connection.sendParam(call); // 将参数封装成一个call对象发送给Server boolean interrupted = false; synchronized (call) { while (!call.done) { try { call.wait(); // 等待Server发送的内容 } catch (InterruptedException ie) { // save the fact that we were interrupted interrupted = true; } } ... return call.value; }
其中居然出现了一个Call对象,咱们看到此方法返回的结果是call对象的一个成员变量,也就是说Call封装了Client的请求以及Server的响应,synchronized的使用会同步Client的请求以及Server的响应。通Connection对象的sendParam方法能够将请求发送给Server,那么Connection又是什么呢?分布式
private Connection getConnection(ConnectionId remoteId,Call call) throws IOException, InterruptedException { do { synchronized (connections) { connection = connections.get(remoteId); if (connection == null) { connection = new Connection(remoteId); connections.put(remoteId, connection); } } } while (!connection.addCall(call)); ... connection.setupIOstreams(); return connection; }
其实Connection是扩展Thread而获得的一个线程,最终把全部的connection对象都放入到一个Hashtable中,同一个ConnectionId的Connection能够复用,下降了建立线程的开销。connection.setupIOstreams()用于在真正的创建链接,并将RPC的header写入到输出流中,经过start方法启动线程,其核心代码以下所示:ide
public void run() { while (waitForWork()) {//等到能够读响应时返回true receiveResponse();
}
receiveResponse方法主要是从输入流反序列化出value,并将其封装在call对象中,这样client端就获得了server的响应,核心代码以下:函数
private void receiveResponse() { try { int id = in.readInt(); // 读取链接id,以便从calls中取出相应的call对象 Call call = calls.get(id); int state = in.readInt(); // 读取输入流的状态 if (state == Status.SUCCESS.state) { Writable value = ReflectionUtils.newInstance(valueClass, conf); value.readFields(in); // read value call.setValue(value); calls.remove(id); } ... }
才疏学浅,错误之处在所不免,恳请各位予以指正。。 oop