dubbo相关说明(官方):bootstrap
在RPC中,Protocol是核心层,也就是只要有Protocol + Invoker + Exporter就能够完成非透明的RPC调用,而后在Invoker的主过程上Filter拦截点。api
图中的Consumer和Provider是抽象概念,只是想让看图者更直观的了解哪些类分属于客户端与服务器端,不用Client和Server的缘由是Dubbo在不少场景下都使用Provider, Consumer, Registry, Monitor划分逻辑拓普节点,保持统一律念。服务器
而Cluster是外围概念,因此Cluster的目的是将多个Invoker假装成一个Invoker,这样其它人只要关注Protocol层Invoker便可,加上Cluster或者去掉Cluster对其它层都不会形成影响,由于只有一个提供者时,是不须要Cluster的。app
Proxy层封装了全部接口的透明化代理,而在其它层都以Invoker为中心,只有到了暴露给用户使用时,才用Proxy将Invoker转成接口,或将接口实现转成Invoker,也就是去掉Proxy层RPC是能够Run的,只是不那么透明,不那么看起来像调本地服务同样调远程服务。socket
而Remoting实现是Dubbo协议的实现,若是你选择RMI协议,整个Remoting都不会用上,Remoting内部再划为Transport传输层和Exchange信息交换层,Transport层只负责单向消息传输,是对Mina,Netty,Grizzly的抽象,它也能够扩展UDP传输,而Exchange层是在传输层之上封装了Request-Response语义。tcp
dubbo远程调用详细过程,官方中有个大概的流程,根据本身的理解跟踪画出下面的调用链(默认使用dubbo协议):ide
调用链经过一系列的Invoker和filter,最终经过netty实现远程通讯。ui
其中:this
经过LazyConnectExchangeClient.request()中调用initClient()对NettyClient进行初始化;编码
private void initClient() throws RemotingException {
if (client != null )
return;
if (logger.isInfoEnabled()) {
logger.info("Lazy connect to " + url);
}
connectLock.lock();
try {
if (client != null)
return;
//建立链接
this.client = Exchangers.connect(url, requestHandler);
} finally {
connectLock.unlock();
}
}
经过NettyClient建立链接:
protected void doOpen() throws Throwable {
NettyHelper.setNettyLoggerFactory();
bootstrap = new ClientBootstrap(channelFactory);
// config
// @see org.jboss.netty.channel.socket.SocketChannelConfig
bootstrap.setOption("keepAlive", true);
bootstrap.setOption("tcpNoDelay", true);
bootstrap.setOption("connectTimeoutMillis", getTimeout());
final NettyHandler nettyHandler = new NettyHandler(getUrl(), this);
bootstrap.setPipelineFactory(new ChannelPipelineFactory() {
public ChannelPipeline getPipeline() {
NettyCodecAdapter adapter = new NettyCodecAdapter(getCodec(), getUrl(), NettyClient.this);
ChannelPipeline pipeline = Channels.pipeline();
//添加编解码器,对消息进行编码、解码
pipeline.addLast("decoder", adapter.getDecoder());
pipeline.addLast("encoder", adapter.getEncoder());
//添加消息接收、发送处理
pipeline.addLast("handler", nettyHandler);
return pipeline;
}
});
}
adapter.getEncoder():InternalEncoder:
private class InternalEncoder extends OneToOneEncoder { @Override protected Object encode(ChannelHandlerContext ctx, Channel ch, Object msg) throws Exception { com.alibaba.dubbo.remoting.buffer.ChannelBuffer buffer = com.alibaba.dubbo.remoting.buffer.ChannelBuffers.dynamicBuffer(1024); NettyChannel channel = NettyChannel.getOrAddChannel(ch, url, handler); try { //编码处理 codec.encode(channel, buffer, msg); } finally { NettyChannel.removeChannelIfDisconnected(ch); } return ChannelBuffers.wrappedBuffer(buffer.toByteBuffer()); } }
ExchangeCodec:
public void encode(Channel channel, ChannelBuffer buffer, Object msg) throws IOException {
//对请求参数编码
if (msg instanceof Request) {
encodeRequest(channel, buffer, (Request) msg);
} else if (msg instanceof Response) {
encodeResponse(channel, buffer, (Response) msg);
} else {
super.encode(channel, buffer, msg);
}
}
encodeRequest():
protected void encodeRequest(Channel channel, ChannelBuffer buffer, Request req) throws IOException { Serialization serialization = getSerialization(channel); // header. 协议头 byte[] header = new byte[HEADER_LENGTH]; // set magic number. 添加2字节的魔数 Bytes.short2bytes(MAGIC, header); //添加序1个字节的列化标识 // set request and serialization flag. header[2] = (byte) (FLAG_REQUEST | serialization.getContentTypeId()); if (req.isTwoWay()) header[2] |= FLAG_TWOWAY; if (req.isEvent()) header[2] |= FLAG_EVENT; // set request id. 添加8字节的请求惟一id Bytes.long2bytes(req.getId(), header, 4); // encode request data. 对请求数据进行encode int savedWriteIndex = buffer.writerIndex(); buffer.writerIndex(savedWriteIndex + HEADER_LENGTH); ChannelBufferOutputStream bos = new ChannelBufferOutputStream(buffer); ObjectOutput out = serialization.serialize(channel.getUrl(), bos); if (req.isEvent()) { encodeEventData(channel, out, req.getData()); } else { //经过子类DubboCodec编码请求 encodeRequestData(channel, out, req.getData()); } out.flushBuffer(); bos.flush(); bos.close(); int len = bos.writtenBytes(); checkPayload(channel, len); Bytes.int2bytes(len, header, 12); // write 写入buffer中(请求起始地址、结束地址以及header) buffer.writerIndex(savedWriteIndex); buffer.writeBytes(header); // write header. buffer.writerIndex(savedWriteIndex + HEADER_LENGTH + len); }
DubboCodec.encodeRequestData()发送消息:
protected void encodeRequestData(Channel channel, ObjectOutput out, Object data) throws IOException { RpcInvocation inv = (RpcInvocation) data; //请求消息中包含dubbo版本号、接口、参数等信息 out.writeUTF(inv.getAttachment(Constants.DUBBO_VERSION_KEY, DUBBO_VERSION)); out.writeUTF(inv.getAttachment(Constants.PATH_KEY)); out.writeUTF(inv.getAttachment(Constants.VERSION_KEY)); out.writeUTF(inv.getMethodName()); out.writeUTF(ReflectUtils.getDesc(inv.getParameterTypes())); Object[] args = inv.getArguments(); if (args != null) for (int i = 0; i < args.length; i++){ out.writeObject(encodeInvocationArgument(channel, inv, i)); } out.writeObject(inv.getAttachments()); }
消息数据包含dubbo版本号、接口名称、、方法名称、参数类等信息,将它们序列化后写入到类型到buffer中。
经过接收到的请求数据,进行decode,解码完成后经过调用连进入NettyHandler.messageReceived()进行处理接收到的消息=>HeaderExchangeHandler.received()
public void received(Channel channel, Object message) throws RemotingException {
channel.setAttribute(KEY_READ_TIMESTAMP, System.currentTimeMillis());
ExchangeChannel exchangeChannel = HeaderExchangeChannel.getOrAddChannel(channel);
try {
//请求消息
if (message instanceof Request) {
// handle request.
Request request = (Request) message;
if (request.isEvent()) {
handlerEvent(channel, request);
} else {
if (request.isTwoWay()) {
//处理请求
Response response = handleRequest(exchangeChannel, request);
channel.send(response);
} else {
handler.received(exchangeChannel, request.getData());
}
}
} else if (message instanceof Response) {
//......
}
} finally {
HeaderExchangeChannel.removeChannelIfDisconnected(channel);
}
}
handleRequest()方法:
Response handleRequest(ExchangeChannel channel, Request req) throws RemotingException {
Response res = new Response(req.getId(), req.getVersion());
//处理失败的消息
if (req.isBroken()) {
//.........
return res;
}
// find handler by message class.
Object msg = req.getData();
try {
// handle data. 服务端处理接收到的消息,handler为DubboProtocol的内部实现
Object result = handler.reply(channel, msg);
res.setStatus(Response.OK);
res.setResult(result);
} catch (Throwable e) {
res.setStatus(Response.SERVICE_ERROR);
res.setErrorMessage(StringUtils.toString(e));
}
return res;
}
DubboProtocol.requestHandler:
public Object reply(ExchangeChannel channel, Object message) throws RemotingException {
if (message instanceof Invocation) {
Invocation inv = (Invocation) message;
//获取invoker
Invoker<?> invoker = getInvoker(channel, inv);
//若是是callback 须要处理高版本调用低版本的问题
if (Boolean.TRUE.toString().equals(inv.getAttachments().get(IS_CALLBACK_SERVICE_INVOKE))){
String methodsStr = invoker.getUrl().getParameters().get("methods");
boolean hasMethod = false;
if (methodsStr == null || methodsStr.indexOf(",") == -1){
hasMethod = inv.getMethodName().equals(methodsStr);
} else {
String[] methods = methodsStr.split(",");
for (String method : methods){
if (inv.getMethodName().equals(method)){
hasMethod = true;
break;
}
}
}
if (!hasMethod){
logger.warn(new IllegalStateException("The methodName "+inv.getMethodName()+" not found in callback service interface ,invoke will be ignored. please update the api interface. url is:" + invoker.getUrl()) +" ,invocation is :"+inv );
return null;
}
}
RpcContext.getContext().setRemoteAddress(channel.getRemoteAddress());
//经过invoke调用目标方法
return invoker.invoke(inv);
}
throw new RemotingException(channel, "Unsupported request: " + message == null ? null : (message.getClass().getName() + ": " + message) + ", channel: consumer: " + channel.getRemoteAddress() + " --> provider: " + channel.getLocalAddress());
}
经过invoke调用目标方法,并最终执行接口相关逻辑;并把结果封装为response返回给客户端(详见HeaderExchangeHandler.received()中channel.send(response)方法)。
同理:返回给客户端的reponse对象也会通过编码encode:包括魔数、序列化协议、响应码、requestId等。
服务端给客户端返回数据以后,客户端会收到IO事件,NettyClient对响应数据进行解码(即解析requestId、响应码、序列化协议、响应数据等);解码完成后经过client中绑定的NettyHandler调用其received()方法处理(相似服务端解析逻辑)。
NettyHandler:
public void received(Channel channel, Object message) throws RemotingException {
channel.setAttribute(KEY_READ_TIMESTAMP, System.currentTimeMillis());
ExchangeChannel exchangeChannel = HeaderExchangeChannel.getOrAddChannel(channel);
try {
if (message instanceof Request) {
//........处理请求request——服务端
} else if (message instanceof Response) {
//响应response——服务消费端
handleResponse(channel, (Response) message);
} else if (message instanceof String) {
if (isClientSide(channel)) {
Exception e = new Exception("Dubbo client can not supported string message: " + message + " in channel: " + channel + ", url: " + channel.getUrl());
logger.error(e.getMessage(), e);
} else {
String echo = handler.telnet(channel, (String) message);
if (echo != null && echo.length() > 0) {
channel.send(echo);
}
}
} else {
handler.received(exchangeChannel, message);
}
} finally {
HeaderExchangeChannel.removeChannelIfDisconnected(channel);
}
}
handleResponse(channel, (Response) message):
static void handleResponse(Channel channel, Response response) throws RemotingException { //响应内容不为空而且不是心跳检测的请求响应 if (response != null && !response.isHeartbeat()) { DefaultFuture.received(channel, response); } }
DefaultFuture:
public static void received(Channel channel, Response response) {
try {
//移除当前future
DefaultFuture future = FUTURES.remove(response.getId());
if (future != null) {
//唤醒等待响应结果的线程
future.doReceived(response);
} else {
//.....
}
} finally {
CHANNELS.remove(response.getId());
}
}
private void doReceived(Response res) {
lock.lock();
try {
response = res;
if (done != null) {
//唤醒等待的线程
done.signal();
}
} finally {
lock.unlock();
}
if (callback != null) {
invokeCallback(callback);
}
}
调用netty发送数据后,该请求线程一直DefaultFuture.await()等待响应。
注:经过绑定的NettyServer:NettyHandler.messageReceived()——>HeaderExchangeHandler唤醒DefaultFuture后续处理