This blog series distills and shares cases drawn from real production environments, together with Spark source-code walkthroughs and guidance for commercial practice; please stay tuned. Copyright notice: this series of Spark source-code analyses and commercial-practice posts belongs to the author (Qin Kaixin). Reproduction is prohibited; studying it is welcome.
TransportContext holds the methods that create the TransportClient and the TransportServer, yet it sits at the very bottom of the RPC communication stack. Why?
Because its RpcHandler member is abstract and carries no concrete message-handling logic, and because TransportContext's job is limited to creating the TransportClient and the TransportServer. The class Javadoc explains:
Contains the context to create a {@link TransportServer}, {@link TransportClientFactory}, and to
setup Netty Channel pipelines with a
{@link org.apache.spark.network.server.TransportChannelHandler}.
So TransportContext can only serve as the lowest-level communication foundation. Above it sits the higher-level NettyRpcEnv wrapper, which holds a TransportContext reference and passes a NettyRpcHandler into it, so that Netty's communication callbacks are handled. A TransportContext code fragment:
/* The TransportServer and TransportClientFactory both create a TransportChannelHandler for each
* channel. As each TransportChannelHandler contains a TransportClient, this enables server
* processes to send messages back to the client on an existing channel.
*/
public class TransportContext {
  private final Logger logger = LoggerFactory.getLogger(TransportContext.class);

  private final TransportConf conf;
  private final RpcHandler rpcHandler;
  private final boolean closeIdleConnections;

  private final MessageEncoder encoder;
  private final MessageDecoder decoder;

  public TransportContext(TransportConf conf, RpcHandler rpcHandler) {
    this(conf, rpcHandler, false);
  }
When configuring the handlers of the Netty pipeline, both TransportClient and TransportServer install a TransportChannelHandler to do unified receive-side message handling. Why? To give messages a single entry point: TransportChannelHandler inspects the message type and routes it to the matching handler, as in this fragment:
public void channelRead(ChannelHandlerContext ctx, Object request) throws Exception {
  if (request instanceof RequestMessage) {
    requestHandler.handle((RequestMessage) request);
  } else if (request instanceof ResponseMessage) {
    responseHandler.handle((ResponseMessage) request);
  } else {
    ctx.fireChannelRead(request);
  }
}
The TransportContext fragment that initializes the pipeline:
public TransportChannelHandler initializePipeline(
    SocketChannel channel,
    RpcHandler channelRpcHandler) {
  try {
    TransportChannelHandler channelHandler = createChannelHandler(channel, channelRpcHandler);
    channel.pipeline()
      .addLast("encoder", ENCODER)
      .addLast(TransportFrameDecoder.HANDLER_NAME, NettyUtils.createFrameDecoder())
      .addLast("decoder", DECODER)
      .addLast("idleStateHandler",
        new IdleStateHandler(0, 0, conf.connectionTimeoutMs() / 1000))
      .addLast("handler", channelHandler);
    return channelHandler;
  } catch (RuntimeException e) {
    logger.error("Error while initializing Netty pipeline", e);
    throw e;
  }
}
The unified receive-side handler shared by client and server, TransportChannelHandler, is produced by createChannelHandler(channel, channelRpcHandler); this is the single place where Netty message reception is wired up. Code fragment:
/**
* Creates the server- and client-side handler which is used to handle both RequestMessages and
* ResponseMessages. The channel is expected to have been successfully created, though certain
* properties (such as the remoteAddress()) may not be available yet.
*/
private TransportChannelHandler createChannelHandler(Channel channel, RpcHandler rpcHandler) {
  TransportResponseHandler responseHandler = new TransportResponseHandler(channel);
  TransportClient client = new TransportClient(channel, responseHandler);
  TransportRequestHandler requestHandler = new TransportRequestHandler(channel, client,
    rpcHandler, conf.maxChunksBeingTransferred());
  return new TransportChannelHandler(client, responseHandler, requestHandler,
    conf.connectionTimeoutMs(), closeIdleConnections);
}
Note that the TransportClient pairs with the TransportResponseHandler, while the TransportServer pairs with the TransportRequestHandler. When a message arrives, TransportChannelHandler first selects the handler that matches the message type, and each handler then manages its side of the Netty message life cycle:

- Once the client sends a message (always a Request message), the callbacks are cached in
  private final Map<Long, RpcResponseCallback> outstandingRpcs;
  private final Map<StreamChunkId, ChunkReceivedCallback> outstandingFetches;
  so the matching response can be routed back later (see the sketch after this list).
- Message types the server receives (all Request messages).
- Message types the server responds with (all Response messages).
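A minimal sketch of that client-side callback flow, assuming Spark 2.x. Here transportContext is an already-built TransportContext (NettyRpcEnv builds one with a NettyRpcHandler), and the host, port, and payload are invented for illustration. sendRpc caches the callback in outstandingRpcs keyed by a request id; when the response arrives, TransportResponseHandler looks the id up and fires the callback:

import java.nio.ByteBuffer
import org.apache.spark.network.client.{RpcResponseCallback, TransportClient}

// `transportContext` is assumed to exist already (built from a TransportConf
// and a concrete RpcHandler); the calls below are the real Spark 2.x API.
val clientFactory = transportContext.createClientFactory()
val client: TransportClient = clientFactory.createClient("remote-host", 7077)

client.sendRpc(ByteBuffer.wrap("ping".getBytes("UTF-8")), new RpcResponseCallback {
  // Invoked by TransportResponseHandler after it matches the response's
  // request id against the entry cached in outstandingRpcs.
  override def onSuccess(response: ByteBuffer): Unit = println("got a reply")
  override def onFailure(e: Throwable): Unit = e.printStackTrace()
})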
The superstructure is NettyRpcEnv: it holds the TransportContext reference and passes a NettyRpcHandler into it to implement the Netty callback handler.
The heart of Spark's Endpoint design is the Inbox and the Outbox. Key points about the Inbox:
Note: an RpcEnv has a single Dispatcher; each endpoint gets exactly one Inbox, while there are multiple Outboxes (one per remote address). You can see that the inbox lives inside the Dispatcher, within EndpointData:
/**
 * A message dispatcher, responsible for routing RPC messages to the appropriate endpoint(s).
 */
private[netty] class Dispatcher(nettyEnv: NettyRpcEnv) extends Logging {

  private class EndpointData(
      val name: String,
      val endpoint: RpcEndpoint,
      val ref: NettyRpcEndpointRef) {
    val inbox = new Inbox(ref, endpoint)
  }

  private val endpoints = new ConcurrentHashMap[String, EndpointData]
  private val endpointRefs = new ConcurrentHashMap[RpcEndpoint, RpcEndpointRef]

  // Track the receivers whose inboxes may contain messages.
  private val receivers = new LinkedBlockingQueue[EndpointData]
Note: likewise, the Outboxes (one per remote RpcAddress) live inside NettyRpcEnv:
private[netty] class NettyRpcEnv(
    val conf: SparkConf,
    javaSerializerInstance: JavaSerializerInstance,
    host: String,
    securityManager: SecurityManager) extends RpcEnv(conf) with Logging {

  private val dispatcher: Dispatcher = new Dispatcher(this)

  private val streamManager = new NettyStreamManager(this)

  private val transportContext = new TransportContext(transportConf,
    new NettyRpcHandler(dispatcher, this, streamManager))

  /**
   * A map for [[RpcAddress]] and [[Outbox]]. When we are connecting to a remote [[RpcAddress]],
   * we just put messages to its [[Outbox]] to implement a non-blocking `send` method.
   */
  private val outboxes = new ConcurrentHashMap[RpcAddress, Outbox]()
The Dispatcher fragment below contains the core message-posting logic: posting a message to an endpoint puts that endpoint's EndpointData into the Dispatcher's receivers queue and, at the same time, puts the message into the endpoint's inbox messages queue. High-level endpoints such as Master and Worker all send messages through this Dispatcher inside NettyRpcEnv. The Dispatcher also runs threads called MessageLoop that process the queued messages promptly.
/**
* Posts a message to a specific endpoint.
*
* @param endpointName name of the endpoint.
* @param message the message to post
* @param callbackIfStopped callback function if the endpoint is stopped.
*/
private def postMessage(
    endpointName: String,
    message: InboxMessage,
    callbackIfStopped: (Exception) => Unit): Unit = {
  val error = synchronized {
    val data = endpoints.get(endpointName)
    if (stopped) {
      Some(new RpcEnvStoppedException())
    } else if (data == null) {
      Some(new SparkException(s"Could not find $endpointName."))
    } else {
      data.inbox.post(message)
      receivers.offer(data)
      None
    }
  }
  // We don't need to call `onStop` in the `synchronized` block
  error.foreach(callbackIfStopped)
}
Note: why is the first message always OnStart? Look here, at the new EndpointData(name, endpoint, endpointRef) below:
def registerRpcEndpoint(name: String, endpoint: RpcEndpoint): NettyRpcEndpointRef = {
  val addr = RpcEndpointAddress(nettyEnv.address, name)
  val endpointRef = new NettyRpcEndpointRef(nettyEnv.conf, addr, nettyEnv)
  synchronized {
    if (stopped) {
      throw new IllegalStateException("RpcEnv has been stopped")
    }
    if (endpoints.putIfAbsent(name, new EndpointData(name, endpoint, endpointRef)) != null) {
      throw new IllegalArgumentException(s"There is already an RpcEndpoint called $name")
    }
    val data = endpoints.get(name)
    endpointRefs.put(data.endpoint, data.ref)
    receivers.offer(data) // for the OnStart message
  }
  endpointRef
}
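For context, this method is what the public API lands on; in Spark 2.x, NettyRpcEnv.setupEndpoint is a one-line delegation to it:

override def setupEndpoint(name: String, endpoint: RpcEndpoint): RpcEndpointRef = {
  dispatcher.registerRpcEndpoint(name, endpoint)
}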
Note that EndpointData contains the inbox, so the OnStart message is enqueued the moment the Inbox is constructed:
private class EndpointData(
    val name: String,
    val endpoint: RpcEndpoint,
    val ref: NettyRpcEndpointRef) {
  val inbox = new Inbox(ref, endpoint)
}
OnStart appears during Inbox construction, as the fragment below shows. Note again that every endpoint, the Master for example, has exactly one inbox.
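The relevant constructor fragment, quoted roughly from the Spark 2.x Inbox:

private[netty] class Inbox(
    val endpointRef: NettyRpcEndpointRef,
    val endpoint: RpcEndpoint)
  extends Logging {

  inbox => // Give this an alias so we can use it more clearly in closures.

  @GuardedBy("this")
  protected val messages = new java.util.LinkedList[InboxMessage]()

  // OnStart should be the first message to process
  inbox.synchronized {
    messages.add(OnStart)
  }
}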
- endpoint(Master) -> NettyRpcEnv -> Dispatcher -> postMessage -> MessageLoop(Dispatcher) -> inbox -> process -> endpoint.receiveAndReply
In other words: an endpoint posts a message into its own Inbox through its own RpcEnv, the Dispatcher then dispatches it, and ultimately the endpoint's own receiveAndReply method handles it.
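To make that flow concrete, here is a minimal, hypothetical endpoint, assuming Spark 2.x. Note that org.apache.spark.rpc is private[spark], so a sketch like this only compiles inside the org.apache.spark namespace; the endpoint name, message type, and port are invented for illustration:

package org.apache.spark // required: the rpc package is private[spark]

import org.apache.spark.rpc.{RpcCallContext, RpcEndpoint, RpcEnv}

case class Echo(text: String) // hypothetical message type

class EchoEndpoint(override val rpcEnv: RpcEnv) extends RpcEndpoint {
  // Delivered first: the OnStart that was enqueued when the Inbox was constructed.
  override def onStart(): Unit = println("OnStart processed")

  // Invoked from Inbox.process for RpcMessage (i.e. ask) messages.
  override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = {
    case Echo(text) => context.reply(s"echo: $text")
  }
}

object EchoDemo {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
    val rpcEnv = RpcEnv.create("demo", "localhost", 52345, conf, new SecurityManager(conf))
    val ref = rpcEnv.setupEndpoint("echo", new EchoEndpoint(rpcEnv)) // registerRpcEndpoint
    println(ref.askSync[String](Echo("hi"))) // postMessage -> inbox -> process
    rpcEnv.shutdown()
  }
}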
A point worth stressing: when does MessageLoop start? See the Dispatcher fragment below. Since threadpool is a member value, the loops start as soon as the Dispatcher is constructed:
private val threadpool: ThreadPoolExecutor = {
  val numThreads = nettyEnv.conf.getInt("spark.rpc.netty.dispatcher.numThreads",
    math.max(2, Runtime.getRuntime.availableProcessors()))
  val pool = ThreadUtils.newDaemonFixedThreadPool(numThreads, "dispatcher-event-loop")
  for (i <- 0 until numThreads) {
    pool.execute(new MessageLoop)
  }
  pool
}
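And the MessageLoop body itself, quoted roughly from the Spark 2.x Dispatcher, simply takes EndpointData entries off receivers and hands each inbox to process:

private class MessageLoop extends Runnable {
  override def run(): Unit = {
    try {
      while (true) {
        try {
          val data = receivers.take()
          if (data == PoisonPill) {
            // Put PoisonPill back so that other MessageLoops can see it.
            receivers.offer(PoisonPill)
            return
          }
          data.inbox.process(Dispatcher.this)
        } catch {
          case NonFatal(e) => logError(e.getMessage, e)
        }
      }
    } catch {
      case ie: InterruptedException => // exit
    }
  }
}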
Next question: when is NettyRpcEnv initialized, and when is the Dispatcher initialized?
When the Master initializes its RpcEnv, it calls NettyRpcEnvFactory().create(config) to build the NettyRpcEnv; that constructs its Dispatcher member, whose threadpool member then starts the MessageLoops, which begin processing messages. One link in the chain after another. Below is how the Master endpoint initializes its RpcEnv.
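Roughly, from the Spark 2.x Master (startRpcEnvAndEndpoint); RpcEnv.create is what delegates to NettyRpcEnvFactory().create(config):

def startRpcEnvAndEndpoint(
    host: String,
    port: Int,
    webUiPort: Int,
    conf: SparkConf): (RpcEnv, Int, Option[Int]) = {
  val securityMgr = new SecurityManager(conf)
  val rpcEnv = RpcEnv.create(SYSTEM_NAME, host, port, conf, securityMgr)
  val masterEndpoint = rpcEnv.setupEndpoint(ENDPOINT_NAME,
    new Master(rpcEnv, rpcEnv.address, webUiPort, securityMgr, conf))
  val portsResponse = masterEndpoint.askSync[BoundPortsResponse](BoundPortsRequest)
  (rpcEnv, portsResponse.webUIPort, portsResponse.restPort)
}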
Inside it, nettyRpcEnv.startServer (fragment below) calls down to transportContext.createServer to create the server and initialize the Netty pipeline:
server = transportContext.createServer(host, port, bootstraps)
dispatcher.registerRpcEndpoint(
  RpcEndpointVerifier.NAME, new RpcEndpointVerifier(this, dispatcher))
From then on, the endpoint keeps posting messages into its own Inbox through the postMessage method shown earlier.
- endpointRef(Worker) -> TransportChannelHandler -> channelRead0 -> TransportRequestHandler -> handle -> processRpcRequest -> NettyRpcHandler (inside NettyRpcEnv) -> receive -> internalReceive -> dispatcher.postToAll(RemoteProcessConnected(remoteEnvAddress)) (connection notification) -> dispatcher.postRemoteMessage(messageToDispatch, callback) (puts the message from the remote side into the inbox) -> postMessage -> inbox -> process
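The NettyRpcHandler.receive hop in that chain is short; quoted roughly from the Spark 2.x source:

override def receive(
    client: TransportClient,
    message: ByteBuffer,
    callback: RpcResponseCallback): Unit = {
  val messageToDispatch = internalReceive(client, message)
  dispatcher.postRemoteMessage(messageToDispatch, callback)
}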
The original post includes a diagram of the full flow from message reception into the inbox. The following fragment shows TransportChannelHandler receiving a message:
@Override
public void channelRead0(ChannelHandlerContext ctx, Message request) throws Exception {
  if (request instanceof RequestMessage) {
    requestHandler.handle((RequestMessage) request);
  } else {
    responseHandler.handle((ResponseMessage) request);
  }
}
TransportRequestHandler then matches on the concrete request type and processes it.
The message is finally handed to the inbox's process method, where the endpoint's own endpoint.receiveAndReply(context) actually handles it:
/**
 * Process stored messages.
 */
def process(dispatcher: Dispatcher): Unit = {
  var message: InboxMessage = null
  inbox.synchronized {
    if (!enableConcurrent && numActiveThreads != 0) {
      return
    }
    message = messages.poll()
    if (message != null) {
      numActiveThreads += 1
    } else {
      return
    }
  }
  while (true) {
    safelyCall(endpoint) {
      message match {
        case RpcMessage(_sender, content, context) =>
          try {
            endpoint.receiveAndReply(context).applyOrElse[Any, Unit](content, { msg =>
              throw new SparkException(s"Unsupported message $message from ${_sender}")
            })
          } catch {
            case NonFatal(e) =>
              context.sendFailure(e)
              // Throw the exception -- this exception will be caught by the safelyCall function.
              // The endpoint's onError function will be called.
              throw e
          }

        case OneWayMessage(_sender, content) =>
          endpoint.receive.applyOrElse[Any, Unit](content, { msg =>
            throw new SparkException(s"Unsupported message $message from ${_sender}")
          })

        case OnStart =>
          endpoint.onStart()
          if (!endpoint.isInstanceOf[ThreadSafeRpcEndpoint]) {
            inbox.synchronized {
              if (!stopped) {
                enableConcurrent = true
              }
            }
          }

        case OnStop =>
          val activeThreads = inbox.synchronized { inbox.numActiveThreads }
          assert(activeThreads == 1,
            s"There should be only a single active thread but found $activeThreads threads.")
          dispatcher.removeRpcEndpointRef(endpoint)
          endpoint.onStop()
          assert(isEmpty, "OnStop should be the last message")

        case RemoteProcessConnected(remoteAddress) =>
          endpoint.onConnected(remoteAddress)

        case RemoteProcessDisconnected(remoteAddress) =>
          endpoint.onDisconnected(remoteAddress)

        case RemoteProcessConnectionError(cause, remoteAddress) =>
          endpoint.onNetworkError(cause, remoteAddress)
      }
    }

    inbox.synchronized {
      // "enableConcurrent" will be set to false after `onStop` is called, so we should check it
      // every time.
      if (!enableConcurrent && numActiveThreads != 1) {
        // If we are not the only one worker, exit
        numActiveThreads -= 1
        return
      }
      message = messages.poll()
      if (message == null) {
        numActiveThreads -= 1
        return
      }
    }
  }
}
Dissecting how Spark's RPC machinery works took nearly two full days. It was not easy. The key question is: did it all make sense to you? Comments are welcome.

Qin Kaixin