在学校期间你们都写过很多程序,好比写个hello world服务类,而后本地调用下,以下所示。这些程序的特色是服务消费方和服务提供方是本地调用关系。html
1
2
3
4
5
6
|
public
class
Test {
public
static
void
main(String[] args) {
HelloWorldService helloWorldService =
new
HelloWorldServiceImpl();
helloWorldService.sayHello(
"test"
);
}
}
|
而一旦踏入公司尤为是大型互联网公司就会发现,公司的系统都由成千上万大大小小的服务组成,各服务部署在不一样的机器上,由不一样的团队负责。java
这时就会遇到两个问题:node
因为各服务部署在不一样机器,服务间的调用免不了网络通讯过程,服务消费方每调用一个服务都要写一坨网络通讯相关的代码,不只复杂并且极易出错。web
若是有一种方式能让咱们像调用本地服务同样调用远程服务,而让调用者对网络通讯这些细节透明,那么将大大提升生产力,好比服务消费方在执行helloWorldService.sayHello(“test”)时,实质上调用的是远端的服务。这种方式其实就是RPC(Remote Procedure Call Protocol),在各大互联网公司中被普遍使用,如阿里巴巴的hsf、dubbo(开源)、Facebook的thrift(开源)、Google grpc(开源)、Twitter的finagle(开源)等。编程
要让网络通讯细节对使用者透明,咱们须要对通讯细节进行封装,咱们先看下一个RPC调用的流程涉及到哪些通讯细节:网络
RPC的目标就是要2~8这些步骤都封装起来,让用户对这些细节透明。数据结构
怎么封装通讯细节才能让用户像以本地调用方式调用远程服务呢?对java来讲就是使用代理!java代理有两种方式:架构
尽管字节码生成方式实现的代理更为强大和高效,但代码维护不易,大部分公司实现RPC框架时仍是选择动态代理方式。并发
下面简单介绍下动态代理怎么实现咱们的需求。咱们须要实现RPCProxyClient代理类,代理类的invoke方法中封装了与远端服务通讯的细节,消费方首先从RPCProxyClient得到服务提供方的接口,当执行helloWorldService.sayHello(“test”)方法时就会调用invoke方法。负载均衡
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
|
public
class
RPCProxyClient
implements
java.lang.reflect.InvocationHandler{
private
Object obj;
public
RPCProxyClient(Object obj){
this
.obj=obj;
}
/**
* 获得被代理对象;
*/
public
static
Object getProxy(Object obj){
return
java.lang.reflect.Proxy.newProxyInstance(obj.getClass().getClassLoader(),
obj.getClass().getInterfaces(),
new
RPCProxyClient(obj));
}
/**
* 调用此方法执行
*/
public
Object invoke(Object proxy, Method method, Object[] args)
throws
Throwable {
//结果参数;
Object result =
new
Object();
// ...执行通讯相关逻辑
// ...
return
result;
}
}
|
1
2
3
4
5
6
|
public
class
Test {
public
static
void
main(String[] args) {
HelloWorldService helloWorldService = (HelloWorldService)RPCProxyClient.getProxy(HelloWorldService.
class
);
helloWorldService.sayHello(
"test"
);
}
}
|
上节讲了invoke里须要封装通讯细节(通讯细节再后面几章详细探讨),而通讯的第一步就是要肯定客户端和服务端相互通讯的消息结构。客户端的请求消息结构通常须要包括如下内容:
1)接口名称
在咱们的例子里接口名是“HelloWorldService”,若是不传,服务端就不知道调用哪一个接口了;
2)方法名
一个接口内可能有不少方法,若是不传方法名服务端也就不知道调用哪一个方法;
3)参数类型&参数值
参数类型有不少,好比有bool、int、long、double、string、map、list,甚至如struct(class);以及相应的参数值;
4)超时时间
5)requestID,标识惟一请求id,在下面一节会详细描述requestID的用处。
同理服务端返回的消息结构通常包括如下内容。
1)返回值
2)状态code
3)requestID
一旦肯定了消息的数据结构后,下一步就是要考虑序列化与反序列化了。
什么是序列化?序列化就是将数据结构或对象转换成二进制串的过程,也就是编码的过程。
什么是反序列化?将在序列化过程当中所生成的二进制串转换成数据结构或者对象的过程。
为何须要序列化?转换为二进制串后才好进行网络传输嘛!
为何须要反序列化?将二进制转换为对象才好进行后续处理!
现现在序列化的方案愈来愈多,每种序列化方案都有优势和缺点,它们在设计之初有本身独特的应用场景,那到底选择哪一种呢?从RPC的角度上看,主要看三点:
目前互联网公司普遍使用Protobuf、Thrift、Avro等成熟的序列化解决方案来搭建RPC框架,这些都是久经考验的解决方案。
消息数据结构被序列化为二进制串后,下一步就要进行网络通讯了。目前有两种经常使用IO通讯模型:1)BIO;2)NIO。通常RPC框架须要支持这两种IO模型。
如何实现RPC的IO通讯框架呢?
若是使用netty的话,通常会用channel.writeAndFlush()方法来发送消息二进制串,这个方法调用后对于整个远程调用(从发出请求到接收到结果)来讲是一个异步的,即对于当前线程来讲,将请求发送出来后,线程就能够日后执行了,至于服务端的结果,是服务端处理完成后,再以消息的形式发送给客户端的。因而这里出现如下两个问题:
以下图所示,线程A和线程B同时向client socket发送请求requestA和requestB,socket前后将requestB和requestA发送至server,而server可能将responseA先返回,尽管requestA请求到达时间更晚。咱们须要一种机制保证responseA丢给ThreadA,responseB丢给ThreadB。
怎么解决呢?
1
2
3
4
5
6
7
|
public
Object get() {
synchronized
(
this
) {
// 旋锁
while
(!isDone) {
// 是否有结果了
wait();
//没结果是释放锁,让当前线程处于等待状态
}
}
}
|
1
2
3
4
5
6
7
|
private
void
setDone(Response res) {
this
.res = res;
isDone =
true
;
synchronized
(
this
) {
//获取锁,由于前面wait()已经释放了callback的锁了
notifyAll();
// 唤醒处于等待的线程
}
}
|
如何让别人使用咱们的服务呢?有同窗说很简单嘛,告诉使用者服务的IP以及端口就能够了啊。确实是这样,这里问题的关键在因而自动告知仍是人肉告知。
人肉告知的方式:若是你发现你的服务一台机器不够,要再添加一台,这个时候就要告诉调用者我如今有两个ip了,大家要轮询调用来实现负载均衡;调用者咬咬牙改了,结果某天一台机器挂了,调用者发现服务有一半不可用,他又只能手动修改代码来删除挂掉那台机器的ip。现实生产环境固然不会使用人肉方式。
有没有一种方法能实现自动告知,即机器的增添、剔除对调用方透明,调用者再也不须要写死服务提供方地址?固然能够,现现在zookeeper被普遍用于实现服务自动注册与发现功能!
简单来说,zookeeper能够充当一个服务注册表
(Service Registry),让多个服务提供者
造成一个集群,让服务消费者
经过服务注册表获取具体的服务访问地址(ip+端口)去访问具体的服务提供者。以下图所示:
具体来讲,zookeeper就是个分布式文件系统,每当一个服务提供者部署后都要将本身的服务注册到zookeeper的某一路径上: /{service}/{version}/{ip:port}, 好比咱们的HelloWorldService部署到两台机器,那么zookeeper上就会建立两条目录:分别为/HelloWorldService/1.0.0/100.19.20.01:16888 /HelloWorldService/1.0.0/100.19.20.02:16888。
zookeeper提供了“心跳检测”功能,它会定时向各个服务提供者发送一个请求(实际上创建的是一个 Socket 长链接),若是长期没有响应,服务中心就认为该服务提供者已经“挂了”,并将其剔除,好比100.19.20.02这台机器若是宕机了,那么zookeeper上的路径就会只剩/HelloWorldService/1.0.0/100.19.20.01:16888。
服务消费者会去监听相应路径(/HelloWorldService/1.0.0),一旦路径上的数据有任务变化(增长或减小),zookeeper都会通知服务消费方服务提供者地址列表已经发生改变,从而进行更新。
更为重要的是zookeeper与生俱来的容错容灾能力(好比leader选举),能够确保服务注册表的高可用性。
ipc.RPC类中有一些内部类,为了你们对RPC类有个初步的印象,就先罗列几个咱们感兴趣的分析一下吧:
Invocation :用于封装方法名和参数,做为数据传输层。
ClientCache :用于存储client对象,用socket factory做为hash key,存储结构为hashMap <SocketFactory, Client>。
Invoker :是动态代理中的调用实现类,继承了InvocationHandler.
Server :是ipc.Server的实现类。
1
2
3
4
5
6
7
8
|
public
Object invoke(Object proxy, Method method, Object[] args)
throws
Throwable {
•••
ObjectWritable value = (ObjectWritable)
client.call(
new
Invocation(method, args), remoteId);
•••
return
value.get();
}
|
若是你发现这个invoke()方法实现的有些奇怪的话,那你就对了。通常咱们看到的动态代理的invoke()方法中总会有 method.invoke(ac, arg); 这句代码。而上面代码中却没有,这是为何呢?其实使用 method.invoke(ac, arg); 是在本地JVM中调用;而在hadoop中,是将数据发送给服务端,服务端将处理的结果再返回给客户端,因此这里的invoke()方法必然须要进行网络通讯。而网络通讯就是下面的这段代码实现的:
1
2
|
ObjectWritable value = (ObjectWritable)
client.call(
new
Invocation(method, args), remoteId);
|
Invocation类在这里封装了方法名和参数。其实这里网络通讯只是调用了Client类的call()方法。那咱们接下来分析一下ipc.Client源码吧。和第一章同样,一样是3个问题
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
|
public
Writable call(Writable param, ConnectionId remoteId)
throws
InterruptedException, IOException {
Call call =
new
Call(param);
//将传入的数据封装成call对象
Connection connection = getConnection(remoteId, call);
//得到一个链接
connection.sendParam(call);
// 向服务端发送call对象
boolean
interrupted =
false
;
synchronized
(call) {
while
(!call.done) {
try
{
call.wait();
// 等待结果的返回,在Call类的callComplete()方法里有notify()方法用于唤醒线程
}
catch
(InterruptedException ie) {
// 因中断异常而终止,设置标志interrupted为true
interrupted =
true
;
}
}
if
(interrupted) {
Thread.currentThread().interrupt();
}
if
(call.error !=
null
) {
if
(call.error
instanceof
RemoteException) {
call.error.fillInStackTrace();
throw
call.error;
}
else
{
// 本地异常
throw
wrapException(remoteId.getAddress(), call.error);
}
}
else
{
return
call.value;
//返回结果数据
}
}
}
|
具体代码的做用我已作了注释,因此这里再也不赘述。但到目前为止,你依然不知道RPC机制底层的网络链接是怎么创建的。分析代码后,咱们会发现和网络通讯有关的代码只会是下面的两句了:
1
2
|
Connection connection = getConnection(remoteId, call);
//得到一个链接
connection.sendParam(call);
// 向服务端发送call对象
|
先看看是怎么得到一个到服务端的链接吧,下面贴出ipc.Client类中的getConnection()方法。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
|
private
Connection getConnection(ConnectionId remoteId,
Call call)
throws
IOException, InterruptedException {
if
(!running.get()) {
// 若是client关闭了
throw
new
IOException(
"The client is stopped"
);
}
Connection connection;
//若是connections链接池中有对应的链接对象,就不需从新建立了;若是没有就需从新建立一个链接对象。
//但请注意,该//链接对象只是存储了remoteId的信息,其实还并无和服务端创建链接。
do
{
synchronized
(connections) {
connection = connections.get(remoteId);
if
(connection ==
null
) {
connection =
new
Connection(remoteId);
connections.put(remoteId, connection);
}
}
}
while
(!connection.addCall(call));
//将call对象放入对应链接中的calls池,就不贴出源码了
//这句代码才是真正的完成了和服务端创建链接哦~
connection.setupIOstreams();
return
connection;
}
|
下面贴出Client.Connection类中的setupIOstreams()方法:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
|
private
synchronized
void
setupIOstreams()
throws
InterruptedException {
•••
try
{
•••
while
(
true
) {
setupConnection();
//创建链接
InputStream inStream = NetUtils.getInputStream(socket);
//得到输入流
OutputStream outStream = NetUtils.getOutputStream(socket);
//得到输出流
writeRpcHeader(outStream);
•••
this
.in =
new
DataInputStream(
new
BufferedInputStream
(
new
PingInputStream(inStream)));
//将输入流装饰成DataInputStream
this
.out =
new
DataOutputStream
(
new
BufferedOutputStream(outStream));
//将输出流装饰成DataOutputStream
writeHeader();
// 跟新活动时间
touch();
//当链接创建时,启动接受线程等待服务端传回数据,注意:Connection继承了Tread
start();
return
;
}
}
catch
(IOException e) {
markClosed(e);
close();
}
}
|
再有一步咱们就知道客户端的链接是怎么创建的啦,下面贴出Client.Connection类中的setupConnection()方法:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
|
private
synchronized
void
setupConnection()
throws
IOException {
short
ioFailures =
0
;
short
timeoutFailures =
0
;
while
(
true
) {
try
{
this
.socket = socketFactory.createSocket();
//终于看到建立socket的方法了
this
.socket.setTcpNoDelay(tcpNoDelay);
•••
// 设置链接超时为20s
NetUtils.connect(
this
.socket, remoteId.getAddress(),
20000
);
this
.socket.setSoTimeout(pingInterval);
return
;
}
catch
(SocketTimeoutException toe) {
/* 设置最多链接重试为45次。
* 总共有20s*45 = 15 分钟的重试时间。
*/
handleConnectionFailure(timeoutFailures++,
45
, toe);
}
catch
(IOException ie) {
handleConnectionFailure(ioFailures++, maxRetries, ie);
}
}
}
|
终于,咱们知道了客户端的链接是怎样创建的了,其实就是建立一个普通的socket进行通讯。
下面贴出Client.Connection类的sendParam()方法吧:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
|
public
void
sendParam(Call call) {
if
(shouldCloseConnection.get()) {
return
;
}
DataOutputBuffer d=
null
;
try
{
synchronized
(
this
.out) {
if
(LOG.isDebugEnabled())
LOG.debug(getName() +
" sending #"
+ call.id);
//建立一个缓冲区
d =
new
DataOutputBuffer();
d.writeInt(call.id);
call.param.write(d);
byte
[] data = d.getData();
int
dataLength = d.getLength();
out.writeInt(dataLength);
//首先写出数据的长度
out.write(data,
0
, dataLength);
//向服务端写数据
out.flush();
}
}
catch
(IOException e) {
markClosed(e);
}
finally
{
IOUtils.closeStream(d);
}
}
|
下面贴出Client.Connection类和Client.Call类中的相关方法:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
|
方法一:
public
void
run() {
•••
while
(waitForWork()) {
receiveResponse();
//具体的处理方法
}
close();
•••
}
方法二:
private
void
receiveResponse() {
if
(shouldCloseConnection.get()) {
return
;
}
touch();
try
{
int
id = in.readInt();
// 阻塞读取id
if
(LOG.isDebugEnabled())
LOG.debug(getName() +
" got value #"
+ id);
Call call = calls.get(id);
//在calls池中找到发送时的那个对象
int
state = in.readInt();
// 阻塞读取call对象的状态
if
(state == Status.SUCCESS.state) {
Writable value = ReflectionUtils.newInstance(valueClass, conf);
value.readFields(in);
// 读取数据
//将读取到的值赋给call对象,同时唤醒Client等待线程,贴出setValue()代码方法三
call.setValue(value);
calls.remove(id);
//删除已处理的call
}
else
if
(state == Status.ERROR.state) {
•••
}
else
if
(state == Status.FATAL.state) {
•••
}
}
catch
(IOException e) {
markClosed(e);
}
}
方法三:
public
synchronized
void
setValue(Writable value) {
this
.value = value;
callComplete();
//具体实现
}
protected
synchronized
void
callComplete() {
this
.done =
true
;
notify();
// 唤醒client等待线程
}
|
完成的功能主要是:启动一个处理线程,读取从服务端传来的call对象,将call对象读取完毕后,唤醒client处理线程。就这么简单,客户端就获取了服务端返回的数据了哦~。客户端的源码分析就到这里了哦,下面咱们来分析Server端的源码吧。
为了让你们对ipc.Server有个初步的了解,咱们先分析一下它的几个内部类吧:
Call :用于存储客户端发来的请求
Listener : 监听类,用于监听客户端发来的请求,同时Listener内部还有一个静态类,Listener.Reader,当监听器监听到用户请求,便让Reader读取用户请求。
Responder :响应RPC请求类,请求处理完毕,由Responder发送给请求客户端。
Connection :链接类,真正的客户端请求读取逻辑在这个类中。
Handler :请求处理类,会循环阻塞读取callQueue中的call对象,并对其进行操做。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
|
private
void
initialize(Configuration conf)
throws
IOException {
•••
// 建立 rpc server
InetSocketAddress dnSocketAddr = getServiceRpcServerAddress(conf);
if
(dnSocketAddr !=
null
) {
int
serviceHandlerCount =
conf.getInt(DFSConfigKeys.DFS_NAMENODE_SERVICE_HANDLER_COUNT_KEY,
DFSConfigKeys.DFS_NAMENODE_SERVICE_HANDLER_COUNT_DEFAULT);
//得到serviceRpcServer
this
.serviceRpcServer = RPC.getServer(
this
, dnSocketAddr.getHostName(),
dnSocketAddr.getPort(), serviceHandlerCount,
false
, conf, namesystem.getDelegationTokenSecretManager());
this
.serviceRPCAddress =
this
.serviceRpcServer.getListenerAddress();
setRpcServiceServerAddress(conf);
}
//得到server
this
.server = RPC.getServer(
this
, socAddr.getHostName(),
socAddr.getPort(), handlerCount,
false
, conf, namesystem
.getDelegationTokenSecretManager());
•••
this
.server.start();
//启动 RPC server Clients只容许链接该server
if
(serviceRpcServer !=
null
) {
serviceRpcServer.start();
//启动 RPC serviceRpcServer 为HDFS服务的server
}
startTrashEmptier(conf);
}
|
查看Namenode初始化源码得知:RPC的server对象是经过ipc.RPC类的getServer()方法得到的。下面我们去看看ipc.RPC类中的getServer()源码吧:
1
2
3
4
5
6
7
|
public
static
Server getServer(
final
Object instance,
final
String bindAddress,
final
int
port,
final
int
numHandlers,
final
boolean
verbose, Configuration conf,
SecretManager<?
extends
TokenIdentifier> secretManager)
throws
IOException {
return
new
Server(instance, conf, bindAddress, port, numHandlers, verbose, secretManager);
}
|
这时咱们发现getServer()是一个建立Server对象的工厂方法,但建立的倒是RPC.Server类的对象。哈哈,如今你明白了我前面说的“RPC.Server是ipc.Server的实现类”了吧。不过RPC.Server的构造函数仍是调用了ipc.Server类的构造函数的,因篇幅所限,就不贴出相关源码了。
初始化Server后,Server端就运行起来了,看看ipc.Server的start()源码吧:
1
2
3
4
5
6
7
8
9
10
11
|
/** 启动服务 */
public
synchronized
void
start() {
responder.start();
//启动responder
listener.start();
//启动listener
handlers =
new
Handler[handlerCount];
for
(
int
i =
0
; i < handlerCount; i++) {
handlers[i] =
new
Handler(i);
handlers[i].start();
//逐个启动Handler
}
}
|
分析过ipc.Client源码后,咱们知道Client端的底层通讯直接采用了阻塞式IO编程,当时咱们曾作出猜想:Server端是否是也采用了阻塞式IO。如今咱们仔细地分析一下吧,若是Server端也采用阻塞式IO,当链接进来的Client端不少时,势必会影响Server端的性能。hadoop的实现者们考虑到了这点,因此他们采用了java NIO来实现Server端,那Server端采用java NIO是怎么创建链接的呢?分析源码得知,Server端采用Listener监听客户端的链接,下面先分析一下Listener的构造函数吧:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
|
public
Listener()
throws
IOException {
address =
new
InetSocketAddress(bindAddress, port);
// 建立ServerSocketChannel,并设置成非阻塞式
acceptChannel = ServerSocketChannel.open();
acceptChannel.configureBlocking(
false
);
// 将server socket绑定到本地端口
bind(acceptChannel.socket(), address, backlogLength);
port = acceptChannel.socket().getLocalPort();
// 得到一个selector
selector= Selector.open();
readers =
new
Reader[readThreads];
readPool = Executors.newFixedThreadPool(readThreads);
//启动多个reader线程,为了防止请求多时服务端响应延时的问题
for
(
int
i =
0
; i < readThreads; i++) {
Selector readSelector = Selector.open();
Reader reader =
new
Reader(readSelector);
readers[i] = reader;
readPool.execute(reader);
}
// 注册链接事件
acceptChannel.register(selector, SelectionKey.OP_ACCEPT);
this
.setName(
"IPC Server listener on "
+ port);
this
.setDaemon(
true
);
}
|
在启动Listener线程时,服务端会一直等待客户端的链接,下面贴出Server.Listener类的run()方法:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
|
public
void
run() {
•••
while
(running) {
SelectionKey key =
null
;
try
{
selector.select();
Iterator<SelectionKey> iter = selector.selectedKeys().iterator();
while
(iter.hasNext()) {
key = iter.next();
iter.remove();
try
{
if
(key.isValid()) {
if
(key.isAcceptable())
doAccept(key);
//具体的链接方法
}
}
catch
(IOException e) {
}
key =
null
;
}
}
catch
(OutOfMemoryError e) {
•••
}
|
下面贴出Server.Listener类中doAccept()方法中的关键源码吧:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
|
void
doAccept(SelectionKey key)
throws
IOException, OutOfMemoryError {
Connection c =
null
;
ServerSocketChannel server = (ServerSocketChannel) key.channel();
SocketChannel channel;
while
((channel = server.accept()) !=
null
) {
//创建链接
channel.configureBlocking(
false
);
channel.socket().setTcpNoDelay(tcpNoDelay);
Reader reader = getReader();
//从readers池中得到一个reader
try
{
reader.startAdd();
// 激活readSelector,设置adding为true
SelectionKey readKey = reader.registerChannel(channel);
//将读事件设置成兴趣事件
c =
new
Connection(readKey, channel, System.currentTimeMillis());
//建立一个链接对象
readKey.attach(c);
//将connection对象注入readKey
synchronized
(connectionList) {
connectionList.add(numConnections, c);
numConnections++;
}
•••
}
finally
{
//设置adding为false,采用notify()唤醒一个reader,其实代码十三中启动的每一个reader都使
//用了wait()方法等待。因篇幅有限,就不贴出源码了。
reader.finishAdd();
}
}
}
|
当reader被唤醒,reader接着执行doRead()方法。
下面贴出Server.Listener.Reader类中的doRead()方法和Server.Connection类中的readAndProcess()方法源码:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
|
方法一:
void
doRead(SelectionKey key)
throws
InterruptedException {
int
count =
0
;
Connection c = (Connection)key.attachment();
//得到connection对象
if
(c ==
null
) {
return
;
}
c.setLastContact(System.currentTimeMillis());
try
{
count = c.readAndProcess();
// 接受并处理请求
}
catch
(InterruptedException ieo) {
•••
}
•••
}
方法二:
public
int
readAndProcess()
throws
IOException, InterruptedException {
while
(
true
) {
•••
if
(!rpcHeaderRead) {
if
(rpcHeaderBuffer ==
null
) {
rpcHeaderBuffer = ByteBuffer.allocate(
2
);
}
//读取请求头
count = channelRead(channel, rpcHeaderBuffer);
if
(count <
0
|| rpcHeaderBuffer.remaining() >
0
) {
return
count;
}
// 读取请求版本号
int
version = rpcHeaderBuffer.get(
0
);
byte
[] method =
new
byte
[] {rpcHeaderBuffer.get(
1
)};
•••
data = ByteBuffer.allocate(dataLength);
}
// 读取请求
count = channelRead(channel, data);
if
(data.remaining() ==
0
) {
•••
if
(useSasl) {
•••
}
else
{
processOneRpc(data.array());
//处理请求
}
•••
}
}
return
count;
}
}
|
下面贴出Server.Connection类中的processOneRpc()方法和processData()方法的源码。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
|
方法一:
private
void
processOneRpc(
byte
[] buf)
throws
IOException,
InterruptedException {
if
(headerRead) {
processData(buf);
}
else
{
processHeader(buf);
headerRead =
true
;
if
(!authorizeConnection()) {
throw
new
AccessControlException(
"Connection from "
+
this
+
" for protocol "
+ header.getProtocol()
+
" is unauthorized for user "
+ user);
}
}
}
方法二:
private
void
processData(
byte
[] buf)
throws
IOException, InterruptedException {
DataInputStream dis =
new
DataInputStream(
new
ByteArrayInputStream(buf));
int
id = dis.readInt();
// 尝试读取id
Writable param = ReflectionUtils.newInstance(paramClass, conf);
//读取参数
param.readFields(dis);
Call call =
new
Call(id, param,
this
);
//封装成call
callQueue.put(call);
// 将call存入callQueue
incRpcCount();
// 增长rpc请求的计数
}
|
RPC:
Web service
web service接口就是RPC中的stub组件,规定了server可以提供的服务(web service),这在server和client上是一致的,可是也是跨语言跨平台的。同时,因为web service规范中的WSDL文件的存在,如今各平台的web service框架,均可以基于WSDL文件,自动生成web service接口 。
其实二者差很少,只是传输的协议不一样。
1. http://www.cnblogs.com/LBSer/p/4853234.html
2. http://weixiaolu.iteye.com/blog/1504898
3. http://kyfxbl.iteye.com/blog/1745550
在应用的迭代演进过程当中,随着系统访问量提升,业务复杂度提升,代码复杂度提升,应用逐渐从单体式架构向面向服务的分布式架构转变。RPC(Remote Procedure Call Protocol远程过程调用)是分布式架构的核心,按响应方式分以下两种:
同步调用:客户端调用服务方方法,等待直到服务方返回结果或者超时,再继续本身的操做
异步调用:客户端把消息发送给中间件,再也不等待服务端返回,直接继续本身的操做。
同步调用的实现方式有WebService和RMI。Web Service提供的服务是基于web容器的,底层使用http协议,于是适合不一样语言异构系统间的调用。RMI其实是Java语言的RPC实现,容许方法返回 Java 对象以及基本数据类型,适合用于JAVA语言构建的不一样系统间的调用。
异步调用的JAVA实现版就是JMS(Java Message Service),目前开源的的JMS中间件有Apache社区的ActiveMQ和Kafka,另外有阿里的RocketMQ,昨天(2016年11月28日)看到的新闻阿里已经将此组件捐献给Apache社区基金组织。
下面重点对RPC同步调用的原理进行探讨。简单来讲一个RPC架构里包含以下4个组件:
一、 客户端(Client):服务调用方
二、 客户端存根(Client Stub):存放服务端地址信息,将客户端的请求参数打包成网络消息,再经过网络发送给服务方
三、 服务端存根(Server Stub):接受客户端发送过来的消息并解包,再调用本地服务
四、 服务端(Server):真正的服务提供者。
这4个组件调用时序图以下:
一、 服务调用方(client)调用以本地调用方式调用服务;
二、 client stub接收到调用后负责将方法、参数等组装成可以进行网络传输的消息体;在Java里就是序列化的过程
三、 client stub找到服务地址,并将消息经过网络发送到服务端;
四、 server stub收到消息后进行解码,在Java里就是反序列化的过程;
五、 server stub根据解码结果调用本地的服务;
六、 本地服务执行处理逻辑;
七、 本地服务将结果返回给server stub;
八、 server stub将返回结果打包成消息,Java里的序列化;
九、 server stub将打包后的消息经过网络并发送至消费方
十、 client stub接收到消息,并进行解码, Java里的反序列化;
十一、 服务调用方(client)获得最终结果。
RPC框架的目标就是把2-10步封装起来,把调用、编码/解码的过程封装起来,让用户像调用本地服务同样的调用远程服务。要作到对客户端(调用方)透明化服务, RPC框架须要考虑解决以下问题:
一、 服务端提供的服务如何发布,客户端如何发现服务;
二、 如何对请求对象和返回结果进行序列化和反序列化;
三、 如何更高效进行网络通讯。
以上问题在一些开源的RPC框架里都有比较好的解决,如阿里的Dubbo,Facebook的Thrift。有兴趣的同窗能够对这两个框架进行深刻学习研究。
RPC是每一个分布式应用的必用之术,本文只是进行了一个粗略的描述,但愿能对你们全部帮助,抛砖引玉,引发更多人对底层技术实现的兴趣。