必备技术点:
1. 动态代理(参考 :http://weixiaolu.iteye.com/blog/1477774 )
2. Java NIO(参考 :http://weixiaolu.iteye.com/blog/1479656 )
3. Java网络编程
目录:
一.RPC协议
二.ipc.RPC源码分析
三.ipc.Client源码分析
四.ipc.Server源码分析
分析:
一.RPC协议
在分析协议以前,我以为咱们颇有必要先搞清楚协议是什么。下面我就谈一点本身的认识吧。若是你学过java的网络编程,你必定知道:当客户端发送一个字节给服务端时,服务端必须也要有一个读字节的方法在阻塞等待;反之亦然。 这种我把它称为底层的通讯协议。但是对于一个大型的网络通讯系统来讲,很显然这种说法的协议粒度过小,不方便咱们理解整个网络通讯的流程及架构,因此我造了个说法:架构层次的协议。通俗一点说,就是我把某些接口和接口中的方法称为协议,客户端和服务端只要实现这些接口中的方法就能够进行通讯了,从这个角度来讲,架构层次协议的说法就能够成立了(注:若是从架构层次的协议来分析系统,咱们就先不要太在乎方法的具体实现,呵呵,我相信你懂得~)。
Hadoop的RPC机制正是采用了这种“架构层次的协议”,有一整套做为协议的接口。如图:java
下面就几个重点的协议介绍一下吧:node
VersionedProtocol :它是全部RPC协议接口的父接口,其中只有一个方法:getProtocolVersion()
(1)HDFS相关
ClientDatanodeProtocol :一个客户端和datanode之间的协议接口,用于数据块恢复
ClientProtocol :client与Namenode交互的接口,全部控制流的请求均在这里,如:建立文件、删除文件等;
DatanodeProtocol : Datanode与Namenode交互的接口,如心跳、blockreport等;
NamenodeProtocol :SecondaryNode与Namenode交互的接口。
(2)Mapreduce相关
InterDatanodeProtocol :Datanode内部交互的接口,用来更新block的元数据;
InnerTrackerProtocol :TaskTracker与JobTracker交互的接口,功能与DatanodeProtocol类似;
JobSubmissionProtocol :JobClient与JobTracker交互的接口,用来提交Job、得到Job等与Job相关的操做;
TaskUmbilicalProtocol :Task中子进程与母进程交互的接口,子进程即map、reduce等操做,母进程即TaskTracker,该接口能够回报子进程的运行状态(词汇扫盲: umbilical 脐带的, 关系亲密的) 。编程
一会儿罗列了这么多的协议,有些人可能要问了,hadoop是怎么使用它们的呢?呵呵,不要着急哦,其实本篇博客所分析的是hadoop的RPC机制底层的具体实现,而这些协议倒是应用层上的东西,好比hadoop是怎么样保持“心跳”的啊。因此在个人下一篇博客:源码级分析hadoop的心跳机制中会详细说明以上协议是怎样被使用的。尽请期待哦~。如今就开始咱们的RPC源码之旅吧•••
二.ipc.RPC源码分析
ipc.RPC类中有一些内部类,为了你们对RPC类有个初步的印象,就先罗列几个咱们感兴趣的分析一下吧:缓存
Invocation :用于封装方法名和参数,做为数据传输层,至关于VO吧。
ClientCache :用于存储client对象,用socket factory做为hash key,存储结构为hashMap <SocketFactory, Client>。
Invoker :是动态代理中的调用实现类,继承了InvocationHandler.
Server :是ipc.Server的实现类。网络
从以上的分析能够知道,Invocation类仅做为VO,ClientCache类只是做为缓存,而Server类用于服务端的处理,他们都和客户端的数据流和业务逻辑没有关系。如今就只剩下Invoker类了。若是你对动态代理(参考:http://weixiaolu.iteye.com/blog/1477774 )比较了解的话,你一下就会想到,咱们接下来去研究的就是RPC.Invoker类中的invoke()方法了。代码以下:
代码一:架构
public Object invoke(Object proxy, Method method, Object[] args) tcp
throws Throwable { 函数
••• oop
ObjectWritable value = (ObjectWritable)
client.call(new Invocation(method, args), remoteId);
•••
return value.get();
}
呵呵,若是你发现这个invoke()方法实现的有些奇怪的话,那你就对了。通常咱们看到的动态代理的invoke()方法中总会有 method.invoke(ac, arg); 这句代码。而上面代码中却没有,这是为何呢?其实使用 method.invoke(ac, arg); 是在本地JVM中调用;而在hadoop中,是将数据发送给服务端,服务端将处理的结果再返回给客户端,因此这里的invoke()方法必然须要进行网络通讯。而网络通讯就是下面的这段代码实现的:
代码二:
ObjectWritable value = (ObjectWritable)
client.call(new Invocation(method, args), remoteId);
Invocation类在这里封装了方法名和参数,充当VO。其实这里网络通讯只是调用了Client类的call()方法。那咱们接下来分析一下ipc.Client源码吧。不过在分析ipc.Client源码以前,为了避免让咱们像盲目的苍蝇同样乱撞,我想先肯定一下咱们分析的目的是什么,我总结出了三点须要解决的问题:
1. 客户端和服务端的链接是怎样创建的?
2. 客户端是怎样给服务端发送数据的?
3. 客户端是怎样获取服务端的返回数据的?
基于以上三个问题,咱们开始吧!!!
三.ipc.Client源码分析
一样,为了对Client类有个初步的了解,咱们也先罗列几个咱们感兴趣的内部类:
Call :用于封装Invocation对象,做为VO,写到服务端,同时也用于存储从服务端返回的数据
Connection :用以处理远程链接对象。继承了Thread
ConnectionId :惟一肯定一个链接
问题1:客户端和服务端的链接是怎样创建的?
下面咱们来看看Client类中的cal()方法吧:
代码三:
public Writable call(Writable param, ConnectionId remoteId)
throws InterruptedException, IOException {
Call call = new Call(param); //将传入的数据封装成call对象
Connection connection = getConnection(remoteId, call); //得到一个链接
connection.sendParam(call); // 向服务端发送call对象
boolean interrupted = false;
synchronized (call) {
while (!call.done) {
try {
call.wait(); // 等待结果的返回,在Call类的callComplete()方法里有notify()方法用于唤醒线程
} catch (InterruptedException ie) {
// 因中断异常而终止,设置标志interrupted为true
interrupted = true;
}
}
if (interrupted) {
Thread.currentThread().interrupt();
}
if (call.error != null) {
if (call.error instanceof RemoteException) {
call.error.fillInStackTrace();
throw call.error;
} else { // 本地异常
throw wrapException(remoteId.getAddress(), call.error);
}
} else {
return call.value; //返回结果数据
}
}
}
具体代码的做用我已作了注释,因此这里再也不赘述。但到目前为止,你依然不知道RPC机制底层的网络链接是怎么创建的。呵呵,那咱们只好再去深究了,分析代码后,咱们会发现和网络通讯有关的代码只会是下面的两句了:
代码四:
Connection connection = getConnection(remoteId, call); //得到一个链接
connection.sendParam(call); // 向服务端发送call对象
先看看是怎么得到一个到服务端的链接吧,下面贴出ipc.Client类中的getConnection()方法。
代码五:
private Connection getConnection(ConnectionId remoteId,
Call call)
throws IOException, InterruptedException {
if (!running.get()) {
// 若是client关闭了
throw new IOException("The client is stopped");
}
Connection connection;
//若是connections链接池中有对应的链接对象,就不需从新建立了;若是没有就需从新建立一个链接对象。
//但请注意,该//链接对象只是存储了remoteId的信息,其实还并无和服务端创建链接。
do {
synchronized (connections) {
connection = connections.get(remoteId);
if (connection == null) {
connection = new Connection(remoteId);
connections.put(remoteId, connection);
}
}
} while (!connection.addCall(call)); //将call对象放入对应链接中的calls池,就不贴出源码了
//这句代码才是真正的完成了和服务端创建链接哦~
connection.setupIOstreams();
return connection;
}
若是你还有兴趣继续分析下去,那咱们就一探创建链接的过程吧,下面贴出Client.Connection类中的setupIOstreams()方法:
代码六:
private synchronized void setupIOstreams() throws InterruptedException {
•••
try {
•••
while (true) {
setupConnection(); //创建链接
InputStream inStream = NetUtils.getInputStream(socket); //得到输入流
OutputStream outStream = NetUtils.getOutputStream(socket); //得到输出流
writeRpcHeader(outStream);
•••
this.in = new DataInputStream(new BufferedInputStream
(new PingInputStream(inStream))); //将输入流装饰成DataInputStream
this.out = new DataOutputStream
(new BufferedOutputStream(outStream)); //将输出流装饰成DataOutputStream
writeHeader();
// 跟新活动时间
touch();
//当链接创建时,启动接受线程等待服务端传回数据,注意:Connection继承了Tread
start();
return;
}
} catch (IOException e) {
markClosed(e);
close();
}
}
再有一步咱们就知道客户端的链接是怎么创建的啦,下面贴出Client.Connection类中的setupConnection()方法:
代码七:
private synchronized void setupConnection() throws IOException {
short ioFailures = 0;
short timeoutFailures = 0;
while (true) {
try {
this.socket = socketFactory.createSocket(); //终于看到建立socket的方法了
this.socket.setTcpNoDelay(tcpNoDelay);
•••
// 设置链接超时为20s
NetUtils.connect(this.socket, remoteId.getAddress(), 20000);
this.socket.setSoTimeout(pingInterval);
return;
} catch (SocketTimeoutException toe) {
/* 设置最多链接重试为45次。
* 总共有20s*45 = 15 分钟的重试时间。
*/
handleConnectionFailure(timeoutFailures++, 45, toe);
} catch (IOException ie) {
handleConnectionFailure(ioFailures++, maxRetries, ie);
}
}
}
终于,咱们知道了客户端的链接是怎样创建的了,其实就是建立一个普通的socket进行通讯。呵呵,那服务端是否是也是建立一个ServerSocket进行通讯的呢?呵呵,先不要急,到这里咱们只解决了客户端的第一个问题,下面还有两个问题没有解决呢,咱们一个一个地来解决吧。
问题2:客户端是怎样给服务端发送数据的?
咱们回顾一下代码四吧。第一句为了完成链接的创建,咱们已经分析完毕;而第二句是为了发送数据,呵呵,分析下去,看能不能解决咱们的问题呢。下面贴出Client.Connection类的sendParam()方法吧:
代码八:
public void sendParam(Call call) {
if (shouldCloseConnection.get()) {
return;
}
DataOutputBuffer d=null;
try {
synchronized (this.out) {
if (LOG.isDebugEnabled())
LOG.debug(getName() + " sending #" + call.id);
//建立一个缓冲区
d = new DataOutputBuffer();
d.writeInt(call.id);
call.param.write(d);
byte[] data = d.getData();
int dataLength = d.getLength();
out.writeInt(dataLength); //首先写出数据的长度
out.write(data, 0, dataLength); //向服务端写数据
out.flush();
}
} catch(IOException e) {
markClosed(e);
} finally {
IOUtils.closeStream(d);
}
}
其实这就是java io的socket发送数据的通常过程哦,没有什么特别之处。到这里问题二也解决了,来看看问题三吧。
问题3:客户端是怎样获取服务端的返回数据的?
咱们再回顾一下代码六吧。代码六中,当链接创建时会启动一个线程用于处理服务端返回的数据,咱们看看这个处理线程是怎么实现的吧,下面贴出Client.Connection类和Client.Call类中的相关方法吧:
代码九:
方法一:
public void run() {
•••
while (waitForWork()) {
receiveResponse(); //具体的处理方法
}
close();
•••
}
方法二:
private void receiveResponse() {
if (shouldCloseConnection.get()) {
return;
}
touch();
try {
int id = in.readInt(); // 阻塞读取id
if (LOG.isDebugEnabled())
LOG.debug(getName() + " got value #" + id);
Call call = calls.get(id); //在calls池中找到发送时的那个对象
int state = in.readInt(); // 阻塞读取call对象的状态
if (state == Status.SUCCESS.state) {
Writable value = ReflectionUtils.newInstance(valueClass, conf);
value.readFields(in); // 读取数据
//将读取到的值赋给call对象,同时唤醒Client等待线程,贴出setValue()代码方法三
call.setValue(value);
calls.remove(id); //删除已处理的call
} else if (state == Status.ERROR.state) {
•••
} else if (state == Status.FATAL.state) {
•••
}
} catch (IOException e) {
markClosed(e);
}
}
方法三:
public synchronized void setValue(Writable value) {
this.value = value;
callComplete(); //具体实现
}
protected synchronized void callComplete() {
this.done = true;
notify(); // 唤醒client等待线程
}
代码九完成的功能主要是:启动一个处理线程,读取从服务端传来的call对象,将call对象读取完毕后,唤醒client处理线程。就这么简单,客户端就获取了服务端返回的数据了哦~。客户端的源码分析就到这里了哦,下面咱们来分析Server端的源码吧。
四.ipc.Server源码分析
一样,为了让你们对ipc.Server有个初步的了解,咱们先分析一下它的几个内部类吧:
Call :用于存储客户端发来的请求
Listener : 监听类,用于监听客户端发来的请求,同时Listener内部还有一个静态类,Listener.Reader,当监听器监听到用户请求,便让Reader读取用户请求。
Responder :响应RPC请求类,请求处理完毕,由Responder发送给请求客户端。
Connection :链接类,真正的客户端请求读取逻辑在这个类中。
Handler :请求处理类,会循环阻塞读取callQueue中的call对象,并对其进行操做。
若是你看过ipc.Server的源码,你会发现其实ipc.Server是一个abstract修饰的抽象类。那随之而来的问题就是:hadoop是怎样初始化RPC的Server端的呢?这个问题着实也让我想了好长时间。不事后来我想到Namenode初始化时必定初始化了RPC的Sever端,那咱们去看看Namenode的初始化源码吧:
1. 初始化Server
代码十:
private void initialize(Configuration conf) throws IOException {
•••
// 建立 rpc server
InetSocketAddress dnSocketAddr = getServiceRpcServerAddress(conf);
if (dnSocketAddr != null) {
int serviceHandlerCount =
conf.getInt(DFSConfigKeys.DFS_NAMENODE_SERVICE_HANDLER_COUNT_KEY,
DFSConfigKeys.DFS_NAMENODE_SERVICE_HANDLER_COUNT_DEFAULT);
//得到serviceRpcServer
this.serviceRpcServer = RPC.getServer(this, dnSocketAddr.getHostName(),
dnSocketAddr.getPort(), serviceHandlerCount,
false, conf, namesystem.getDelegationTokenSecretManager());
this.serviceRPCAddress = this.serviceRpcServer.getListenerAddress();
setRpcServiceServerAddress(conf);
}
//得到server
this.server = RPC.getServer(this, socAddr.getHostName(),
socAddr.getPort(), handlerCount, false, conf, namesystem
.getDelegationTokenSecretManager());
•••
this.server.start(); //启动 RPC server Clients只容许链接该server
if (serviceRpcServer != null) {
serviceRpcServer.start(); //启动 RPC serviceRpcServer 为HDFS服务的server
}
startTrashEmptier(conf);
}
查看Namenode初始化源码得知:RPC的server对象是经过ipc.RPC类的getServer()方法得到的。下面我们去看看ipc.RPC类中的getServer()源码吧:
代码十一:
public static Server getServer(final Object instance, final String bindAddress, final int port,
final int numHandlers,
final boolean verbose, Configuration conf,
SecretManager<? extends TokenIdentifier> secretManager)
throws IOException {
return new Server(instance, conf, bindAddress, port, numHandlers, verbose, secretManager);
}
这时咱们发现getServer()是一个建立Server对象的工厂方法,但建立的倒是RPC.Server类的对象。哈哈,如今你明白了我前面说的“RPC.Server是ipc.Server的实现类”了吧。不过RPC.Server的构造函数仍是调用了ipc.Server类的构造函数的,因篇幅所限,就不贴出相关源码了。
2. 运行Server
如代码十所示,初始化Server后,Server端就运行起来了,看看ipc.Server的start()源码吧:
代码十二:
/** 启动服务 */
public synchronized void start() {
responder.start(); //启动responder
listener.start(); //启动listener
handlers = new Handler[handlerCount];
for (int i = 0; i < handlerCount; i++) {
handlers[i] = new Handler(i);
handlers[i].start(); //逐个启动Handler
}
}
3. Server处理请求
1)创建链接
分析过ipc.Client源码后,咱们知道Client端的底层通讯直接采用了阻塞式IO编程,当时咱们曾作出猜想:Server端是否是也采用了阻塞式IO。如今咱们仔细地分析一下吧,若是Server端也采用阻塞式IO,当链接进来的Client端不少时,势必会影响Server端的性能。hadoop的实现者们考虑到了这点,因此他们采用了java NIO来实现Server端,java NIO可参考博客:http://weixiaolu.iteye.com/blog/1479656 。那Server端采用java NIO是怎么创建链接的呢?分析源码得知,Server端采用Listener监听客户端的链接,下面先分析一下Listener的构造函数吧:
代码十三:
public Listener() throws IOException {
address = new InetSocketAddress(bindAddress, port);
// 建立ServerSocketChannel,并设置成非阻塞式
acceptChannel = ServerSocketChannel.open();
acceptChannel.configureBlocking(false);
// 将server socket绑定到本地端口
bind(acceptChannel.socket(), address, backlogLength);
port = acceptChannel.socket().getLocalPort();
// 得到一个selector
selector= Selector.open();
readers = new Reader[readThreads];
readPool = Executors.newFixedThreadPool(readThreads);
//启动多个reader线程,为了防止请求多时服务端响应延时的问题
for (int i = 0; i < readThreads; i++) {
Selector readSelector = Selector.open();
Reader reader = new Reader(readSelector);
readers[i] = reader;
readPool.execute(reader);
}
// 注册链接事件
acceptChannel.register(selector, SelectionKey.OP_ACCEPT);
this.setName("IPC Server listener on " + port);
this.setDaemon(true);
}
在启动Listener线程时,服务端会一直等待客户端的链接,下面贴出Server.Listener类的run()方法:
代码十四:
public void run() {
•••
while (running) {
SelectionKey key = null;
try {
selector.select();
Iterator<SelectionKey> iter = selector.selectedKeys().iterator();
while (iter.hasNext()) {
key = iter.next();
iter.remove();
try {
if (key.isValid()) {
if (key.isAcceptable())
doAccept(key); //具体的链接方法
}
} catch (IOException e) {
}
key = null;
}
} catch (OutOfMemoryError e) {
•••
}
下面贴出Server.Listener类中doAccept ()方法中的关键源码吧:
代码十五:
void doAccept(SelectionKey key) throws IOException, OutOfMemoryError {
Connection c = null;
ServerSocketChannel server = (ServerSocketChannel) key.channel();
SocketChannel channel;
while ((channel = server.accept()) != null) { //创建链接
channel.configureBlocking(false);
channel.socket().setTcpNoDelay(tcpNoDelay);
Reader reader = getReader(); //从readers池中得到一个reader
try {
reader.startAdd(); // 激活readSelector,设置adding为true
SelectionKey readKey = reader.registerChannel(channel);//将读事件设置成兴趣事件
c = new Connection(readKey, channel, System.currentTimeMillis());//建立一个链接对象
readKey.attach(c); //将connection对象注入readKey
synchronized (connectionList) {
connectionList.add(numConnections, c);
numConnections++;
}
•••
} finally {
//设置adding为false,采用notify()唤醒一个reader,其实代码十三中启动的每一个reader都使
//用了wait()方法等待。因篇幅有限,就不贴出源码了。
reader.finishAdd();
}
}
}
当reader被唤醒,reader接着执行doRead()方法。
2)接收请求
下面贴出Server.Listener.Reader类中的doRead()方法和Server.Connection类中的readAndProcess()方法源码:
代码十六:
方法一:
void doRead(SelectionKey key) throws InterruptedException {
int count = 0;
Connection c = (Connection)key.attachment(); //得到connection对象
if (c == null) {
return;
}
c.setLastContact(System.currentTimeMillis());
try {
count = c.readAndProcess(); // 接受并处理请求
} catch (InterruptedException ieo) {
•••
}
•••
}
方法二:
public int readAndProcess() throws IOException, InterruptedException {
while (true) {
•••
if (!rpcHeaderRead) {
if (rpcHeaderBuffer == null) {
rpcHeaderBuffer = ByteBuffer.allocate(2);
}
//读取请求头
count = channelRead(channel, rpcHeaderBuffer);
if (count < 0 || rpcHeaderBuffer.remaining() > 0) {
return count;
}
// 读取请求版本号
int version = rpcHeaderBuffer.get(0);
byte[] method = new byte[] {rpcHeaderBuffer.get(1)};
•••
data = ByteBuffer.allocate(dataLength);
}
// 读取请求
count = channelRead(channel, data);
if (data.remaining() == 0) {
•••
if (useSasl) {
•••
} else {
processOneRpc(data.array());//处理请求
}
•••
}
}
return count;
}
}
3)得到call对象
下面贴出Server.Connection类中的processOneRpc()方法和processData()方法的源码。
代码十七:
方法一:
private void processOneRpc(byte[] buf) throws IOException,
InterruptedException {
if (headerRead) {
processData(buf);
} else {
processHeader(buf);
headerRead = true;
if (!authorizeConnection()) {
throw new AccessControlException("Connection from " + this
+ " for protocol " + header.getProtocol()
+ " is unauthorized for user " + user);
}
}
}
方法二:
private void processData(byte[] buf) throws IOException, InterruptedException {
DataInputStream dis =
new DataInputStream(new ByteArrayInputStream(buf));
int id = dis.readInt(); // 尝试读取id
Writable param = ReflectionUtils.newInstance(paramClass, conf);//读取参数
param.readFields(dis);
Call call = new Call(id, param, this); //封装成call
callQueue.put(call); // 将call存入callQueue
incRpcCount(); // 增长rpc请求的计数
}
4)处理call对象
你还记得Server类中还有个Handler内部类吗?呵呵,对call对象的处理就是它干的。下面贴出Server.Handler类中run()方法中的关键代码:
代码十八:
while (running) {
try {
final Call call = callQueue.take(); //弹出call,可能会阻塞
•••
//调用ipc.Server类中的call()方法,但该call()方法是抽象方法,具体实如今RPC.Server类中
value = call(call.connection.protocol, call.param, call.timestamp);
synchronized (call.connection.responseQueue) {
setupResponse(buf, call,
(error == null) ? Status.SUCCESS : Status.ERROR,
value, errorClass, error);
•••
//给客户端响应请求
responder.doRespond(call);
}
}
5)返回请求
下面贴出Server.Responder类中的doRespond()方法源码:
代码十九:
方法一:
void doRespond(Call call) throws IOException {
synchronized (call.connection.responseQueue) {
call.connection.responseQueue.addLast(call);
if (call.connection.responseQueue.size() == 1) {
// 返回响应结果,并激活writeSelector
processResponse(call.connection.responseQueue, true);
}
}
}
小结:
到这里,hadoop RPC机制的源码分析就结束了,请继续关注个人后续博客:hadoop心跳机制的源码分析。在这里须要感谢我所参考的iteye上相关博主的文章,因太多了,就不一一列举了,不过最感谢的是wikieno的博客,博客地址为:http://www.wikieno.com/2012/02/hadoop-ipc-server/ 。