在Spark中不少地方都涉及网络通讯,好比Spark各个组件间的消息互通、用户文件与Jar包的上传、节点间的Shuffle过程、Block数据的复制与备份等。在Spark 0.x.x与Spark 1.x.x版本中,组件间的消息通讯主要借助于Akka[1],使用Akka能够轻松的构建强有力的高并发与分布式应用。可是Akka在Spark 2.0.0版本中被移除了,Spark官网文档对此的描述为:“Akka的依赖被移除了,所以用户可使用任何版本的Akka来编程了。”Spark团队的决策者或许认为对于Akka具体版本的依赖,限制了用户对于Akka不一样版本的使用。尽管如此,笔者依然认为Akka是一款很是优秀的开源分布式系统,我参与的一些Java Application或者Java Web就利用Akka的丰富特性实现了分布式一致性、最终一致性以及分布式事务等分布式环境面对的问题。在Spark 1.x.x版本中,用户文件与Jar包的上传采用了由Jetty[2]实现的HttpFileServer,但在Spark 2.0.0版本中也被废弃了,如今使用的是基于Spark内置RPC框架的NettyStreamManager。节点间的Shuffle过程和Block数据的复制与备份这两个部分在Spark 2.0.0版本中依然沿用了Netty[3],经过对接口和程序进行从新设计将各个组件间的消息互通、用户文件与Jar包的上传等内容统一归入到Spark的RPC框架体系中。html
咱们先来看看RPC框架的基本架构,如图1所示。java
图1 Spark内置RPC框架的基本架构数据库
TransportContext内部包含传输上下文的配置信息TransportConf和对客户端请求消息进行处理的RpcHandler。TransportConf在建立TransportClientFactory和TransportServer时都是必须的,而RpcHandler只用于建立TransportServer。TransportClientFactory是RPC客户端的工厂类。TransportServer是RPC服务端的实现。图中记号的含义以下:编程
记号①:表示经过调用TransportContext的createClientFactory方法建立传输客户端工厂TransportClientFactory的实例。在构造TransportClientFactory的实例时,还会传递客户端引导程序TransportClientBootstrap的列表。此外,TransportClientFactory内部还存在针对每一个Socket地址的链接池ClientPool,这个链接池缓存的定义以下:bootstrap
private final ConcurrentHashMap<SocketAddress, ClientPool> connectionPool;
ClientPool的类型定义以下:数组
private static class ClientPool { TransportClient[] clients; Object[] locks; ClientPool(int size) { clients = new TransportClient[size]; locks = new Object[size]; for (int i = 0; i < size; i++) { locks[i] = new Object(); } } }
因而可知,ClientPool实际是由TransportClient的数组构成,而locks数组中的Object与clients数组中的TransportClient按照数组索引一一对应,经过对每一个TransportClient分别采用不一样的锁,下降并发状况下线程间对锁的争用,进而减小阻塞,提升并发度。缓存
记号②:表示经过调用TransportContext的createServer方法建立传输服务端TransportServer的实例。在构造TransportServer的实例时,须要传递TransportContext、host、port、RpcHandler以及服务端引导程序TransportServerBootstrap的列表。安全
有了对Spark内置RPC框架的基本架构的了解,如今正式介绍Spark的RPC框架所包含的各个组件:服务器
拓展知识:为何须要MessageEncoder和MessageDecoder?由于在基于流的传输里(好比TCP/IP),接收到的数据首先会被存储到一个socket接收缓冲里。不幸的是,基于流的传输并非一个数据包队列,而是一个字节队列。即便你发送了2个独立的数据包,操做系统也不会做为2个消息处理而仅仅认为是一连串的字节。所以不能保证远程写入的数据会被准确地读取。举个例子,让咱们假设操做系统的TCP/TP协议栈已经接收了3个数据包:ABC、DEF、GHI。因为基于流传输的协议的这种统一的性质,在你的应用程序在读取数据的时候有很高的可能性被分红下面的片断:AB、CDEFG、H、I。所以,接收方无论是客户端仍是服务端,都应该把接收到的数据整理成一个或者多个更有意义而且让程序的逻辑更好理解的数据。网络
[1] Akka是基于Actor并发编程模型实现的并发的分布式的框架。Akka是用Scala语言编写的,它提供了Java和Scala两种语言的API,减小开发人员对并发的细节处理,并保证分布式调用的最终一致性。在附录B中有关于Akka的进一步介绍,感兴趣的读者不妨一读。
[2] Jetty 是一个开源的Servlet容器,它为基于Java的Web容器,例如JSP和Servlet提供运行环境。Jetty是使用Java语言编写的,它的API以一组JAR包的形式发布。开发人员能够将Jetty容器实例化成一个对象,能够迅速为一些独立运行的Java应用提供网络和Web链接。在附录C中有对Jetty的简单介绍,感兴趣的读者能够选择阅读。
[3] Netty是由Jboss提供的一个基于NIO的客户、服务器端编程框架,使用Netty 能够确保你快速、简单的开发出一个网络应用,例如实现了某种协议的客户,服务端应用。附录G中有对Netty的简单介绍,感兴趣的读者能够一读。
上文提到TransportContext中的TransportConf给Spark的RPC框架提供配置信息,它有两个成员属性——配置提供者conf和配置的模块名称module。这两个属性的定义以下:
private final ConfigProvider conf; private final String module;
其中conf是真正的配置提供者,其类型ConfigProvider是一个抽象类,见代码清单1。
代码清单1 ConfigProvider的实现
public abstract class ConfigProvider { public abstract String get(String name); public String get(String name, String defaultValue) { try { return get(name); } catch (NoSuchElementException e) { return defaultValue; } } public int getInt(String name, int defaultValue) { return Integer.parseInt(get(name, Integer.toString(defaultValue))); } public long getLong(String name, long defaultValue) { return Long.parseLong(get(name, Long.toString(defaultValue))); } public double getDouble(String name, double defaultValue) { return Double.parseDouble(get(name, Double.toString(defaultValue))); } public boolean getBoolean(String name, boolean defaultValue) { return Boolean.parseBoolean(get(name, Boolean.toString(defaultValue))); } }
从代码清单1,能够看到ConfigProvider中包括get、getInt、getLong、getDouble、getBoolean等方法,这些方法都是基于抽象方法get获取值,通过一次类型转换而实现。这个抽象的get方法将须要子类去实现。
Spark一般使用SparkTransportConf建立TransportConf,其实现见代码清单2。
代码清单2 SparkTransportConf的实现
object SparkTransportConf { private val MAX_DEFAULT_NETTY_THREADS = 8 def fromSparkConf(_conf: SparkConf, module: String, numUsableCores: Int = 0): TransportConf = { val conf = _conf.clone val numThreads = defaultNumThreads(numUsableCores) conf.setIfMissing(s"spark.$module.io.serverThreads", numThreads.toString) conf.setIfMissing(s"spark.$module.io.clientThreads", numThreads.toString) new TransportConf(module, new ConfigProvider { override def get(name: String): String = conf.get(name) }) } private def defaultNumThreads(numUsableCores: Int): Int = { val availableCores = if (numUsableCores > 0) numUsableCores else Runtime.getRuntime.availableProcessors() math.min(availableCores, MAX_DEFAULT_NETTY_THREADS) } }
从代码清单2看到,可使用SparkTransportConf的fromSparkConf方法来构造TransportConf。传递的三个参数分别为SparkConf、模块名module及可用的内核数numUsableCores。若是numUsableCores小于等于0,那么线程数是系统可用处理器的数量,不过系统的内核数不可能所有用于网络传输使用,因此这里还将分配给网络传输的内核数量最多限制在8个。最终肯定的线程数将被用于设置客户端传输线程数(spark.$module.io.clientThreads属性)和服务端传输线程数(spark.$module.io.serverThreads属性)。fromSparkConf最终构造TransportConf对象时传递的ConfigProvider为实现了get方法的匿名的内部类,get的实现实际是代理了SparkConf的get方法。
TransportClientFactory是建立传输客户端(TransportClient)的工厂类。在说明图3-1中的记号①时提到过TransportContext的createClientFactory方法能够建立TransportClientFactory的实例,其实现见代码清单3。
代码清单3 建立客户端工厂
public TransportClientFactory createClientFactory(List<TransportClientBootstrap> bootstraps) { return new TransportClientFactory(this, bootstraps); } public TransportClientFactory createClientFactory() { return createClientFactory(Lists.<TransportClientBootstrap>newArrayList()); }
能够看到TransportContext中有两个重载的createClientFactory方法,它们最终在构造TransportClientFactory时都会传递两个参数:TransportContext和TransportClientBootstrap列表。TransportClientFactory构造器的实现见代码清单4。
代码清单4 TransportClientFactory的构造器
public TransportClientFactory( TransportContext context, List<TransportClientBootstrap> clientBootstraps) { this.context = Preconditions.checkNotNull(context); this.conf = context.getConf(); this.clientBootstraps = Lists.newArrayList(Preconditions.checkNotNull(clientBootstraps)); this.connectionPool = new ConcurrentHashMap<>(); this.numConnectionsPerPeer = conf.numConnectionsPerPeer(); this.rand = new Random(); IOMode ioMode = IOMode.valueOf(conf.ioMode()); this.socketChannelClass = NettyUtils.getClientChannelClass(ioMode); this.workerGroup = NettyUtils.createEventLoop( ioMode, conf.clientThreads(), conf.getModuleName() + "-client"); this.pooledAllocator = NettyUtils.createPooledByteBufAllocator( conf.preferDirectBufs(), false /* allowCache */, conf.clientThreads()); }
TransportClientFactory构造器中的各个变量分别为:
图2 TransportClientFactory的connectionPool
TransportClientFactory里大量使用了NettyUtils,关于NettyUtils的具体实现,请看附录G。[1]
提示:NIO是指Java中New IO的简称,其特色包括:为全部的原始类型提供(Buffer)缓冲支持;字符集编码解码解决方案;提供一个新的原始I/O 抽象Channel,支持锁和内存映射文件的文件访问接口;提供多路非阻塞式(non-bloking)的高伸缩性网络I/O 。其具体使用属于Java语言的范畴,本文不过多介绍。
[1] Spark将对Netty框架的使用细节都封装在NettyUtils工具类中,因为Netty的API使用不属于本书主要阐述的内容,故此放入附录G中,对Netty的使用感兴趣的读者能够选择阅读。
TransportClientFactory的clientBootstraps属性是TransportClientBootstrap的列表。TransportClientBootstrap是在TransportClient上执行的客户端引导程序,主要对链接创建时进行一些初始化的准备(例如验证、加密)。TransportClientBootstrap所做的操做每每是昂贵的,好在创建的链接能够重用。TransportClientBootstrap的接口定义见代码清单5。
代码清单5 TransportClientBootstrap的定义
public interface TransportClientBootstrap { void doBootstrap(TransportClient client, Channel channel) throws RuntimeException; }
TransportClientBootstrap有两个实现类:EncryptionDisablerBootstrap和SaslClientBootstrap。为了对TransportClientBootstrap的做用能有更深的了解,这里以EncryptionDisablerBootstrap为例,EncryptionDisablerBootstrap的实现见代码清单6。
代码清单6 EncryptionDisablerBootstrap的实现
private static class EncryptionDisablerBootstrap implements TransportClientBootstrap { @Override public void doBootstrap(TransportClient client, Channel channel) { channel.pipeline().remove(SaslEncryption.ENCRYPTION_HANDLER_NAME); } }
根据代码清单6,能够看到EncryptionDisablerBootstrap的做用是移除客户端管道中的SASL加密。
有了TransportClientFactory,Spark的各个模块就可使用它建立RPC客户端TransportClient了。每一个TransportClient实例只能和一个远端的RPC服务通讯,因此Spark中的组件若是想要和多个RPC服务通讯,就须要持有多个TransportClient实例。建立TransportClient的方法见代码清单7(实际为从缓存中获取TransportClient)。
代码清单7 从缓存获取TransportClient
public TransportClient createClient(String remoteHost, int remotePort) throws IOException, InterruptedException { // 建立InetSocketAddress final InetSocketAddress unresolvedAddress = InetSocketAddress.createUnresolved(remoteHost, remotePort); ClientPool clientPool = connectionPool.get(unresolvedAddress); if (clientPool == null) { connectionPool.putIfAbsent(unresolvedAddress, new ClientPool(numConnectionsPerPeer)); clientPool = connectionPool.get(unresolvedAddress); } int clientIndex = rand.nextInt(numConnectionsPerPeer); // 随机选择一个TransportClient TransportClient cachedClient = clientPool.clients[clientIndex]; if (cachedClient != null && cachedClient.isActive()) {// 获取并返回激活的TransportClient TransportChannelHandler handler = cachedClient.getChannel().pipeline() .get(TransportChannelHandler.class); synchronized (handler) { handler.getResponseHandler().updateTimeOfLastRequest(); } if (cachedClient.isActive()) { logger.trace("Returning cached connection to {}: {}", cachedClient.getSocketAddress(), cachedClient); return cachedClient; } } final long preResolveHost = System.nanoTime(); final InetSocketAddress resolvedAddress = new InetSocketAddress(remoteHost, remotePort); final long hostResolveTimeMs = (System.nanoTime() - preResolveHost) / 1000000; if (hostResolveTimeMs > 2000) { logger.warn("DNS resolution for {} took {} ms", resolvedAddress, hostResolveTimeMs); } else { logger.trace("DNS resolution for {} took {} ms", resolvedAddress, hostResolveTimeMs); } // 建立并返回TransportClient对象 synchronized (clientPool.locks[clientIndex]) { cachedClient = clientPool.clients[clientIndex]; if (cachedClient != null) { if (cachedClient.isActive()) { logger.trace("Returning cached connection to {}: {}", resolvedAddress, cachedClient); return cachedClient; } else { logger.info("Found inactive connection to {}, creating a new one.", resolvedAddress); } } clientPool.clients[clientIndex] = createClient(resolvedAddress); return clientPool.clients[clientIndex]; } }
从代码清单7得知,建立TransportClient的步骤以下:
代码清单7的整个执行过程实际解决了TransportClient缓存的使用以及createClient方法的线程安全问题,并无涉及建立TransportClient的实现。TransportClient的建立过程在重载的createClient方法(见代码清单8)中实现。
代码清单8 建立TransportClient
private TransportClient createClient(InetSocketAddress address) throws IOException, InterruptedException { logger.debug("Creating new connection to {}", address); // 构建根引导器Bootstrap并对其进行配置 Bootstrap bootstrap = new Bootstrap(); bootstrap.group(workerGroup) .channel(socketChannelClass) .option(ChannelOption.TCP_NODELAY, true) .option(ChannelOption.SO_KEEPALIVE, true) .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, conf.connectionTimeoutMs()) .option(ChannelOption.ALLOCATOR, pooledAllocator); final AtomicReference<TransportClient> clientRef = new AtomicReference<>(); final AtomicReference<Channel> channelRef = new AtomicReference<>(); // 为根引导程序设置管道初始化回调函数 bootstrap.handler(new ChannelInitializer<SocketChannel>() { @Override public void initChannel(SocketChannel ch) { TransportChannelHandler clientHandler = context.initializePipeline(ch); clientRef.set(clientHandler.getClient()); channelRef.set(ch); } }); long preConnect = System.nanoTime(); ChannelFuture cf = bootstrap.connect(address);// 使用根引导程序链接远程服务器 if (!cf.await(conf.connectionTimeoutMs())) { throw new IOException( String.format("Connecting to %s timed out (%s ms)", address, conf.connectionTimeoutMs())); } else if (cf.cause() != null) { throw new IOException(String.format("Failed to connect to %s", address), cf.cause()); } TransportClient client = clientRef.get(); Channel channel = channelRef.get(); assert client != null : "Channel future completed successfully with null client"; // Execute any client bootstraps synchronously before marking the Client as successful. long preBootstrap = System.nanoTime(); logger.debug("Connection to {} successful, running bootstraps...", address); try { for (TransportClientBootstrap clientBootstrap : clientBootstraps) { clientBootstrap.doBootstrap(client, channel);// 给TransportClient设置客户端引导程序 } } catch (Exception e) { // catch non-RuntimeExceptions too as bootstrap may be written in Scala long bootstrapTimeMs = (System.nanoTime() - preBootstrap) / 1000000; logger.error("Exception while bootstrapping client after " + bootstrapTimeMs + " ms", e); client.close(); throw Throwables.propagate(e); } long postBootstrap = System.nanoTime(); logger.info("Successfully created connection to {} after {} ms ({} ms spent in bootstraps)", address, (postBootstrap - preConnect) / 1000000, (postBootstrap - preBootstrap) / 1000000); return client; }
从代码清单8得知,真正建立TransportClient的步骤以下:
TransportServer是RPC框架的服务端,可提供高效的、低级别的流服务。在说明图1中的记号②时提到过TransportContext的createServer方法用于建立TransportServer,其实现见代码清单9。
代码清单9 建立RPC服务端
public TransportServer createServer(int port, List<TransportServerBootstrap> bootstraps) { return new TransportServer(this, null, port, rpcHandler, bootstraps); } public TransportServer createServer( String host, int port, List<TransportServerBootstrap> bootstraps) { return new TransportServer(this, host, port, rpcHandler, bootstraps); } public TransportServer createServer(List<TransportServerBootstrap> bootstraps) { return createServer(0, bootstraps); } public TransportServer createServer() { return createServer(0, Lists.<TransportServerBootstrap>newArrayList()); }
代码清单9中列出了四个名为createServer的重载方法,可是它们最终调用了TransportServer的构造器(见代码清单10)来建立TransportServer实例。
代码清单10 TransportServer的构造器
public TransportServer( TransportContext context, String hostToBind, int portToBind, RpcHandler appRpcHandler, List<TransportServerBootstrap> bootstraps) { this.context = context; this.conf = context.getConf(); this.appRpcHandler = appRpcHandler; this.bootstraps = Lists.newArrayList(Preconditions.checkNotNull(bootstraps)); try { init(hostToBind, portToBind); } catch (RuntimeException e) { JavaUtils.closeQuietly(this); throw e; } }
TransportServer的构造器中的各个变量分别为:
TransportServer的构造器(见代码清单10)中调用了init方法, init方法用于对TransportServer进行初始化,见代码清单11。
代码清单11 TransportServer的初始化
private void init(String hostToBind, int portToBind) { // 根据Netty的API文档,Netty服务端需同时建立bossGroup和workerGroup IOMode ioMode = IOMode.valueOf(conf.ioMode()); EventLoopGroup bossGroup = NettyUtils.createEventLoop(ioMode, conf.serverThreads(), conf.getModuleName() + "-server"); EventLoopGroup workerGroup = bossGroup; // 建立一个聚集ByteBuf但对本地线程缓存禁用的分配器 PooledByteBufAllocator allocator = NettyUtils.createPooledByteBufAllocator( conf.preferDirectBufs(), true /* allowCache */, conf.serverThreads()); // 建立Netty的服务端根引导程序并对其进行配置 bootstrap = new ServerBootstrap() .group(bossGroup, workerGroup) .channel(NettyUtils.getServerChannelClass(ioMode)) .option(ChannelOption.ALLOCATOR, allocator) .childOption(ChannelOption.ALLOCATOR, allocator); if (conf.backLog() > 0) { bootstrap.option(ChannelOption.SO_BACKLOG, conf.backLog()); } if (conf.receiveBuf() > 0) { bootstrap.childOption(ChannelOption.SO_RCVBUF, conf.receiveBuf()); } if (conf.sendBuf() > 0) { bootstrap.childOption(ChannelOption.SO_SNDBUF, conf.sendBuf()); } // 为根引导程序设置管道初始化回调函数 bootstrap.childHandler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel ch) throws Exception { RpcHandler rpcHandler = appRpcHandler; for (TransportServerBootstrap bootstrap : bootstraps) { rpcHandler = bootstrap.doBootstrap(ch, rpcHandler); } context.initializePipeline(ch, rpcHandler); } }); // 给根引导程序绑定Socket的监听端口 InetSocketAddress address = hostToBind == null ? new InetSocketAddress(portToBind): new InetSocketAddress(hostToBind, portToBind); channelFuture = bootstrap.bind(address); channelFuture.syncUninterruptibly(); port = ((InetSocketAddress) channelFuture.channel().localAddress()).getPort(); logger.debug("Shuffle server started on port: {}", port); }
代码清单11中TransportServer初始化的步骤以下:
小贴士:根据Netty的API文档,Netty服务端需同时建立bossGroup和workerGroup。
提示:代码清单11中使用了NettyUtils工具类的不少方法,在附录G中有对它们的详细介绍。EventLoopGroup、PooledByteBufAllocator、ServerBootstrap都是Netty提供的API,对于它们的更多介绍,请访问http://netty.io/。
在代码清单8建立TransportClient和代码清单11对TransportServer初始化的实现中都在管道初始化回调函数中调用了TransportContext的initializePipeline方法,initializePipeline方法(见代码清单12)将调用Netty的API对管道初始化。
代码清单12 管道初始化
public TransportChannelHandler initializePipeline( SocketChannel channel, RpcHandler channelRpcHandler) { try { TransportChannelHandler channelHandler = createChannelHandler(channel, channelRpcHandler); channel.pipeline() .addLast("encoder", ENCODER) .addLast(TransportFrameDecoder.HANDLER_NAME, NettyUtils.createFrameDecoder()) .addLast("decoder", DECODER) .addLast("idleStateHandler", new IdleStateHandler(0, 0, conf.connectionTimeoutMs() / 1000)) .addLast("handler", channelHandler); return channelHandler; } catch (RuntimeException e) { logger.error("Error while initializing Netty pipeline", e); throw e; } }
根据代码清单12,initializePipeline方法的执行步骤以下:
对管道进行设置,这里的ENCODER(即MessageEncoder)派生自Netty的ChannelOutboundHandler接口;DECODER(即MessageDecoder)、TransportChannelHandler以及TransportFrameDecoder(由工具类NettyUtils的静态方法createFrameDecoder建立)派生自Netty的ChannelInboundHandler接口;IdleStateHandler同时实现了ChannelOutboundHandler和ChannelInboundHandler接口。根据Netty的API行为,经过addLast方法注册多个handler时,ChannelInboundHandler按照注册的前后顺序执行;ChannelOutboundHandler按照注册的前后顺序逆序执行,所以在管道两端(不管是服务端仍是客户端)处理请求和响应的流程如图3所示。
代码清单13 建立TransportChannelHandler
private TransportChannelHandler createChannelHandler(Channel channel, RpcHandler rpcHandler) { TransportResponseHandler responseHandler = new TransportResponseHandler(channel); TransportClient client = new TransportClient(channel, responseHandler); TransportRequestHandler requestHandler = new TransportRequestHandler(channel, client, rpcHandler); return new TransportChannelHandler(client, responseHandler, requestHandler, conf.connectionTimeoutMs(), closeIdleConnections); }
图3 管道处理请求和响应的流程图
TransportChannelHandler实现了Netty的ChannelInboundHandler[1],以便对Netty管道中的消息进行处理。图3中的这些Handler(除了MessageEncoder)因为都实现了ChannelInboundHandler接口,做为自定义的ChannelInboundHandler,于是都要重写channelRead方法。Netty框架使用工做链模式来对每一个ChannelInboundHandler的实现类的channelRead方法进行链式调用。TransportChannelHandler实现的channelRead方法见代码清单14。
代码清单14 TransportChannelHandler的channelRead实现
@Override public void channelRead(ChannelHandlerContext ctx, Object request) throws Exception { if (request instanceof RequestMessage) { requestHandler.handle((RequestMessage) request); } else if (request instanceof ResponseMessage) { responseHandler.handle((ResponseMessage) request); } else { ctx.fireChannelRead(request); } }
从代码清单14看到,当TransportChannelHandler读取到的request是RequestMessage类型时,则将此消息的处理进一步交给TransportRequestHandler,当request是ResponseMessage时,则将此消息的处理进一步交给TransportResponseHandler。
TransportRequestHandler与TransportResponseHandler都继承自抽象类MessageHandler,MessageHandler定义了子类的规范,详细定义见代码清单15。
代码清单15 MessageHandler规范
public abstract class MessageHandler<T extends Message> { public abstract void handle(T message) throws Exception; public abstract void channelActive(); public abstract void exceptionCaught(Throwable cause); public abstract void channelInactive(); }
MessageHandler中定义的各个方法的做用分别为:
Spark中MessageHandler类的继承体系如图4所示。
图4 MessageHandler类的继承体系
根据代码清单15,咱们知道MessageHandler同时也是一个Java泛型类,其子类能处理的消息都派生自接口Message。Message的定义见代码清单16。
代码清单16 Message的定义
public interface Message extends Encodable { Type type(); ManagedBuffer body(); boolean isBodyInFrame();
Message中定义的三个接口方法的做用分别为:
Message接口继承了Encodable接口,Encodable的定义见代码清单17。
代码清单17 Encodable的定义
public interface Encodable { int encodedLength(); void encode(ByteBuf buf); }
实现Encodable接口的类将能够转换到一个ByteBuf中,多个对象将被存储到预先分配的单个ByteBuf,因此这里的encodedLength用于返回转换的对象数量。下面一块儿来看看Message的类继承体系,如图5所示。
图5 Message的类继承体系
从图5看到最终的消息实现类都直接或间接的实现了RequestMessage或ResponseMessage接口,其中RequestMessage的具体实现有四种,分别是:
因为OneWayMessage 不须要响应,因此ResponseMessage的对于成功或失败状态的实现各有三种,分别是:
回头再看看代码清单16中对body接口的定义,能够看到其返回内容体的类型为ManagedBuffer。ManagedBuffer提供了由字节构成数据的不可变视图(也就是说ManagedBuffer并不存储数据,也不是数据的实际来源,这同关系型数据库的视图相似)。咱们先来看看抽象类ManagedBuffer中对行为的定义,见代码清单18。
代码清单18 ManagedBuffer的定义
public abstract class ManagedBuffer { public abstract long size(); public abstract ByteBuffer nioByteBuffer() throws IOException; public abstract InputStream createInputStream() throws IOException; public abstract ManagedBuffer retain(); public abstract ManagedBuffer release(); public abstract Object convertToNetty() throws IOException; }
ManagedBuffer中定义了六个方法,分别为:
ManagedBuffer的具体实现有不少,咱们能够经过图6来了解。
图6 ManagedBuffer的继承体系
图6中列出了ManagedBuffer的五个实现类,其中TestManagedBuffer和RecordingManagedBuffer用于测试。NettyManagedBuffer中的缓冲为io.netty.buffer.ByteBuf,NioManagedBuffer中的缓冲为java.nio.ByteBuffer。NettyManagedBuffer和NioManagedBuffer的实现都很是简单,留给读者自行阅读。本节挑选FileSegmentManagedBuffer做为ManagedBuffer具体实现的例子进行介绍。
FileSegmentManagedBuffer的做用为获取一个文件中的一段,它一共有四个由final修饰的属性,所有都经过FileSegmentManagedBuffer的构造器传入属性值,这四个属性为:
下面将逐个介绍FileSegmentManagedBuffer对于ManagedBuffer的实现。
代码清单19 nioByteBuffer方法的实现
@Override public ByteBuffer nioByteBuffer() throws IOException { FileChannel channel = null; try { channel = new RandomAccessFile(file, "r").getChannel(); if (length < conf.memoryMapBytes()) { ByteBuffer buf = ByteBuffer.allocate((int) length); channel.position(offset); while (buf.remaining() != 0) { if (channel.read(buf) == -1) { throw new IOException(String.format("Reached EOF before filling buffer\n" + "offset=%s\nfile=%s\nbuf.remaining=%s", offset, file.getAbsoluteFile(), buf.remaining())); } } buf.flip(); return buf; } else { return channel.map(FileChannel.MapMode.READ_ONLY, offset, length); } } catch (IOException e) { try { if (channel != null) { long size = channel.size(); throw new IOException("Error in reading " + this + " (actual file length " + size + ")", e); } } catch (IOException ignored) { // ignore } throw new IOException("Error in opening " + this, e); } finally { JavaUtils.closeQuietly(channel); } }
nioByteBuffer的实现仍是很简单的,主要利用RandomAccessFile获取FileChannel,而后使用java.nio.ByteBuffer和FileChannel的API将数据写入缓冲区java.nio.ByteBuffer中。
代码清单20 createInputStream的实现
@Override public InputStream createInputStream() throws IOException { FileInputStream is = null; try { is = new FileInputStream(file); ByteStreams.skipFully(is, offset); return new LimitedInputStream(is, length); } catch (IOException e) { try { if (is != null) { long size = file.length(); throw new IOException("Error in reading " + this + " (actual file length " + size + ")", e); } } catch (IOException ignored) { // ignore } finally { JavaUtils.closeQuietly(is); } throw new IOException("Error in opening " + this, e); } catch (RuntimeException e) { JavaUtils.closeQuietly(is); throw e; } }
createInputStream的实现仍是很简单的,这里很少做介绍。
代码清单21 convertToNetty的实现
@Override public Object convertToNetty() throws IOException { if (conf.lazyFileDescriptor()) { return new DefaultFileRegion(file, offset, length); } else { FileChannel fileChannel = new FileInputStream(file).getChannel(); return new DefaultFileRegion(fileChannel, offset, length); } }
[1] ChannelInboundHandler接口的实现及原理不属于本书要分析的内容,感兴趣的同窗能够阅读Netty的官方文档或者研究Netty的源码。
因为TransportRequestHandler实际是把请求消息交给RpcHandler进一步处理的,因此这里对RpcHandler首先作个介绍。RpcHandler是一个抽象类,定义了一些RPC处理器的规范,其主要实现见代码清单22。
代码清单22 RpcHandler的实现
public abstract class RpcHandler { private static final RpcResponseCallback ONE_WAY_CALLBACK = new OneWayRpcCallback(); public abstract void receive( TransportClient client, ByteBuffer message, RpcResponseCallback callback); public abstract StreamManager getStreamManager(); public void receive(TransportClient client, ByteBuffer message) { receive(client, message, ONE_WAY_CALLBACK); } public void channelActive(TransportClient client) { } public void channelInactive(TransportClient client) { } public void exceptionCaught(Throwable cause, TransportClient client) { } private static class OneWayRpcCallback implements RpcResponseCallback { private static final Logger logger = LoggerFactory.getLogger(OneWayRpcCallback.class); @Override public void onSuccess(ByteBuffer response) { logger.warn("Response provided for one-way RPC."); } @Override public void onFailure(Throwable e) { logger.error("Error response provided for one-way RPC.", e); } } }
代码清单22中RpcHandler的各个方法的做用以下:
public interface RpcResponseCallback { void onSuccess(ByteBuffer response); void onFailure(Throwable e); }
介绍完RpcHandler,如今回到TransportRequestHandler的处理过程。TransportRequestHandler处理以上四种RequestMessage的实现见代码清单23。
代码清单23 TransportRequestHandler的handle方法
@Override public void handle(RequestMessage request) { if (request instanceof ChunkFetchRequest) { processFetchRequest((ChunkFetchRequest) request); } else if (request instanceof RpcRequest) { processRpcRequest((RpcRequest) request); } else if (request instanceof OneWayMessage) { processOneWayMessage((OneWayMessage) request); } else if (request instanceof StreamRequest) { processStreamRequest((StreamRequest) request); } else { throw new IllegalArgumentException("Unknown request type: " + request); } }
结合代码清单23,下面逐一详细分析这四种类型请求的处理过程。
processFetchRequest方法用于处理ChunkFetchRequest类型的消息,其实现见代码清单24。
代码清单24 processFetchRequest的实现
private void processFetchRequest(final ChunkFetchRequest req) { if (logger.isTraceEnabled()) { logger.trace("Received req from {} to fetch block {}", getRemoteAddress(channel), req.streamChunkId); } ManagedBuffer buf; try { streamManager.checkAuthorization(reverseClient, req.streamChunkId.streamId); streamManager.registerChannel(channel, req.streamChunkId.streamId); buf = streamManager.getChunk(req.streamChunkId.streamId, req.streamChunkId.chunkIndex); } catch (Exception e) { logger.error(String.format("Error opening block %s for request from %s", req.streamChunkId, getRemoteAddress(channel)), e); respond(new ChunkFetchFailure(req.streamChunkId, Throwables.getStackTraceAsString(e))); return; } respond(new ChunkFetchSuccess(req.streamChunkId, buf)); }
代码清单24中的streamManager是经过调用RpcHandler的getStreamManager方法获取的StreamManager。processFetchRequest的处理都依托于RpcHandler的StreamManager,其处理步骤以下:
有关StreamManager的具体实现,读者能够参考《Spark内核设计的艺术》一书5.3.5节介绍的NettyStreamManager和《Spark内核设计的艺术》一书6.9.2节介绍的NettyBlockRpcServer中的OneForOneStreamManager。
processRpcRequest方法用于处理RpcRequest类型的消息,其实现见代码清单25。
代码清单25 processRpcRequest的实现
private void processRpcRequest(final RpcRequest req) { try { rpcHandler.receive(reverseClient, req.body().nioByteBuffer(), new RpcResponseCallback() { @Override public void onSuccess(ByteBuffer response) { respond(new RpcResponse(req.requestId, new NioManagedBuffer(response))); } @Override public void onFailure(Throwable e) { respond(new RpcFailure(req.requestId, Throwables.getStackTraceAsString(e))); } }); } catch (Exception e) { logger.error("Error while invoking RpcHandler#receive() on RPC id " + req.requestId, e); respond(new RpcFailure(req.requestId, Throwables.getStackTraceAsString(e))); } finally { req.body().release(); } }
代码清单25中将RpcRequest消息的内容体、发送消息的客户端以及一个RpcResponseCallback类型的匿名内部类做为参数传递给了RpcHandler的receive方法。这就是说真正用于处理RpcRequest消息的是RpcHandler,而非TransportRequestHandler。因为RpcHandler是抽象类(见代码清单22),其receive方法也是抽象方法,因此具体的操做将由RpcHandler的实现了receive方法的子类来完成。全部继承RpcHandler的子类都须要在其receive方法的具体实现中回调RpcResponseCallback的onSuccess(处理成功时)或者onFailure(处理失败时)方法。从RpcResponseCallback的实现来看,不管处理结果成功仍是失败,都将调用respond方法对客户端进行响应。
processStreamRequest方法用于处理StreamRequest类型的消息,其实现见代码清单26。
代码清单26 processStreamRequest的实现
private void processStreamRequest(final StreamRequest req) { ManagedBuffer buf; try { buf = streamManager.openStream(req.streamId);// 将获取到的流数据封装为ManagedBuffer } catch (Exception e) { logger.error(String.format( "Error opening stream %s for request from %s", req.streamId, getRemoteAddress(channel)), e); respond(new StreamFailure(req.streamId, Throwables.getStackTraceAsString(e))); return; } if (buf != null) { respond(new StreamResponse(req.streamId, buf.size(), buf)); } else { respond(new StreamFailure(req.streamId, String.format( "Stream '%s' was not found.", req.streamId))); } }
代码清单26中也使用了RpcHandler的StreamManager,其处理步骤以下:
processOneWayMessage方法用于处理StreamRequest类型的消息,其实现见代码清单27。
代码清单27 processOneWayMessage的实现
private void processOneWayMessage(OneWayMessage req) { try { rpcHandler.receive(reverseClient, req.body().nioByteBuffer()); } catch (Exception e) { logger.error("Error while invoking RpcHandler#receive() for one-way message.", e); } finally { req.body().release(); } }
processOneWayMessage方法的实现processRpcRequest很是类似,区别在于processOneWayMessage调用了代码清单22中ONE_WAY_CALLBACK的receive方法,于是processOneWayMessage在处理完RPC请求后不会对客户端做出响应。
从以上四种处理的分析能够看出最终的处理都由RpcHandler及其内部组件完成。除了OneWayMessage的消息外,其他三种消息都是最终调用respond方法响应客户端,其实现见代码清单28。
代码清单28 respond的实现
private void respond(final Encodable result) { final SocketAddress remoteAddress = channel.remoteAddress(); channel.writeAndFlush(result).addListener( new ChannelFutureListener() { @Override public void operationComplete(ChannelFuture future) throws Exception { if (future.isSuccess()) { logger.trace("Sent result {} to client {}", result, remoteAddress); } else { logger.error(String.format("Error sending result %s to %s; closing connection", result, remoteAddress), future.cause()); channel.close(); } } } ); }
能够看到respond方法中实际调用了Channel的writeAndFlush方法[1]来响应客户端。
TransportServer的构造器(见代码清单10)中的bootstraps是TransportServerBootstrap的列表。接口TransportServerBootstrap定义了服务端引导程序的规范,服务端引导程序旨在当客户端与服务端创建链接以后,在服务端持有的客户端管道上执行的引导程序。TransportServerBootstrap的定义见代码清单29。
代码清单29 TransportServerBootstrap的定义
public interface TransportServerBootstrap { RpcHandler doBootstrap(Channel channel, RpcHandler rpcHandler); }
TransportServerBootstrap的doBootstrap方法将对服务端的RpcHandler进行代理,接收客户端的请求。TransportServerBootstrap有SaslServerBootstrap和EncryptionCheckerBootstrap两个实现类。为了更清楚的说明TransportServerBootstrap的意义,咱们以SaslServerBootstrap为例,来说解其实现(见代码清单30)。
代码清单30 SaslServerBootstrap的doBootstrap实现
public RpcHandler doBootstrap(Channel channel, RpcHandler rpcHandler) { return new SaslRpcHandler(conf, channel, rpcHandler, secretKeyHolder); }
根据代码清单30,咱们知道SaslServerBootstrap的doBootstrap方法实际建立了SaslRpcHandler,SaslRpcHandler负责对管道进行SASL(Simple Authentication and Security Layer)加密。SaslRpcHandler自己也继承了RpcHandler,因此咱们重点来看其receive方法的实现,见代码清单31。
代码清单31 SaslRpcHandler的receive方法
@Override public void receive(TransportClient client, ByteBuffer message, RpcResponseCallback callback) { if (isComplete) { // 将消息传递给SaslRpcHandler所代理的下游RpcHandler并返回 delegate.receive(client, message, callback); return; } ByteBuf nettyBuf = Unpooled.wrappedBuffer(message); SaslMessage saslMessage; try { saslMessage = SaslMessage.decode(nettyBuf);// 对客户端发送的消息进行SASL解密 } finally { nettyBuf.release(); } if (saslServer == null) { // 若是saslServer还未建立,则须要建立SparkSaslServer client.setClientId(saslMessage.appId); saslServer = new SparkSaslServer(saslMessage.appId, secretKeyHolder, conf.saslServerAlwaysEncrypt()); } byte[] response; try { response = saslServer.response(JavaUtils.bufferToArray(// 使用saslServer处理已解密的消息 saslMessage.body().nioByteBuffer())); } catch (IOException ioe) { throw new RuntimeException(ioe); } callback.onSuccess(ByteBuffer.wrap(response)); if (saslServer.isComplete()) { logger.debug("SASL authentication successful for channel {}", client); isComplete = true;// SASL认证交换已经完成 if (SparkSaslServer.QOP_AUTH_CONF.equals(saslServer.getNegotiatedProperty(Sasl.QOP))) { logger.debug("Enabling encryption for channel {}", client); // 对管道进行SASL加密 SaslEncryption.addToChannel(channel, saslServer, conf.maxSaslEncryptedBlockSize()); saslServer = null; } else { saslServer.dispose(); saslServer = null; } } }
根据代码清单31,SaslRpcHandler处理客户端消息的步骤以下:
SaslServerBootstrap是经过SaslRpcHandler对下游RpcHandler进行代理的一种TransportServerBootstrap。EncryptionCheckerBootstrap是另外一种TransportServerBootstrap的实现,它经过将自身加入Netty的管道中实现引导,EncryptionCheckerBootstrap的doBootstrap方法的实现见代码清单32。
代码清单32 EncryptionCheckerBootstrap的doBootstrap实现
@Override public RpcHandler doBootstrap(Channel channel, RpcHandler rpcHandler) { channel.pipeline().addFirst("encryptionChecker", this); return rpcHandler; }
在详细介绍了TransportChannelHandler以后咱们就能够对图3-3进行扩展,把TransportRequestHandler、TransportServerBootstrap及RpcHandler的处理流程增长进来,如图7所示。
图7 RPC框架服务端处理请求、响应流程图
有读者可能会问,图7中并未见TransportServerBootstrap的身影。根据对TransportServerBootstrap的两种实现的举例,咱们知道TransportServerBootstrap将可能存在于图中任何两个组件的箭头连线中间,起到引导、包装、代理的做用。
在介绍完服务端RpcHandler对请求消息的处理以后,如今来看看客户端发送RPC请求的原理。咱们在分析代码清单13中的createChannelHandler方法时,看到调用了TransportClient的构造器(见代码清单33),其中TransportResponseHandler的引用将赋给handler属性。
代码清单33 TransportClient的构造器
public TransportClient(Channel channel, TransportResponseHandler handler) { this.channel = Preconditions.checkNotNull(channel); this.handler = Preconditions.checkNotNull(handler); this.timedOut = false; }
TransportClient一共有五个方法用于发送请求,分别为:
本节只选择最经常使用的sendRpc和fetchChunk进行分析,其他实现均可以举一反三。
sendRpc方法的实现见代码清单34。
代码清单34 sendRpc的实现
public long sendRpc(ByteBuffer message, final RpcResponseCallback callback) { final long startTime = System.currentTimeMillis(); if (logger.isTraceEnabled()) { logger.trace("Sending RPC to {}", getRemoteAddress(channel)); } // 使用UUID生成请求主键requestId final long requestId = Math.abs(UUID.randomUUID().getLeastSignificantBits()); handler.addRpcRequest(requestId, callback);// 添加requestId与RpcResponseCallback的引用之间的关系 // 发送RPC请求 channel.writeAndFlush(new RpcRequest(requestId, new NioManagedBuffer(message))).addListener( new ChannelFutureListener() { @Override public void operationComplete(ChannelFuture future) throws Exception { if (future.isSuccess()) { long timeTaken = System.currentTimeMillis() - startTime; if (logger.isTraceEnabled()) { logger.trace("Sending request {} to {} took {} ms", requestId, getRemoteAddress(channel), timeTaken); } } else { String errorMsg = String.format("Failed to send RPC %s to %s: %s", requestId, getRemoteAddress(channel), future.cause()); logger.error(errorMsg, future.cause()); handler.removeRpcRequest(requestId); channel.close(); try { callback.onFailure(new IOException(errorMsg, future.cause())); } catch (Exception e) { logger.error("Uncaught exception in RPC response callback handler!", e); } } } }); return requestId; }
结合代码清单34,咱们知道sendRpc方法的实现步骤以下:
1) 使用UUID生成请求主键requestId;
2) 调用addRpcRequest向handler(特别提醒下读者这里的handler不是RpcHandler,而是经过TransportClient构造器传入的TransportResponseHandler)添加requestId与回调类RpcResponseCallback的引用之间的关系。TransportResponseHandler的addRpcRequest方法(见代码清单35)将更新最后一次请求的时间为当前系统时间,而后将requestId与RpcResponseCallback之间的映射加入到outstandingRpcs缓存中。outstandingRpcs专门用于缓存发出的RPC请求信息。
代码清单35 添加RPC请求到缓存
public void addRpcRequest(long requestId, RpcResponseCallback callback) { updateTimeOfLastRequest(); outstandingRpcs.put(requestId, callback); }
3) 调用Channel的writeAndFlush方法将RPC请求发送出去,这和在代码清单28中服务端调用的respond方法响应客户端的同样,都是使用Channel的writeAndFlush方法。当发送成功或者失败时会回调ChannelFutureListener的operationComplete方法。若是发送成功,那么只会打印requestId、远端地址及花费时间的日志,若是发送失败,除了打印错误日志外,还要调用TransportResponseHandler的removeRpcRequest方法(见代码清单36)将这次请求从outstandingRpcs缓存中移除。
代码清单36 从缓存中删除RPC请求
public void removeRpcRequest(long requestId) { outstandingRpcs.remove(requestId); }
请求发送成功后,客户端将等待接收服务端的响应。根据图3,返回的消息也会传递给TransportChannelHandler的channelRead方法(见代码清单14),根据以前的分析,消息的分析将最后交给TransportResponseHandler的handle方法来处理。TransportResponseHandler的handle方法分别对图5中的六种ResponseMessage进行处理,因为服务端使用processRpcRequest方法(见代码清单25)处理RpcRequest类型的消息后返回给客户端的消息为RpcResponse或RpcFailure,因此咱们来看看客户端的TransportResponseHandler的handle方法是如何处理RpcResponse和RpcFailure,见代码清单37。
代码清单37 RpcResponse和RpcFailure消息的处理
} else if (message instanceof RpcResponse) { RpcResponse resp = (RpcResponse) message; RpcResponseCallback listener = outstandingRpcs.get(resp.requestId);// 获取RpcResponseCallback if (listener == null) { logger.warn("Ignoring response for RPC {} from {} ({} bytes) since it is not outstanding", resp.requestId, getRemoteAddress(channel), resp.body().size()); } else { outstandingRpcs.remove(resp.requestId); try { listener.onSuccess(resp.body().nioByteBuffer()); } finally { resp.body().release(); } } } else if (message instanceof RpcFailure) { RpcFailure resp = (RpcFailure) message; RpcResponseCallback listener = outstandingRpcs.get(resp.requestId); // 获取RpcResponseCallback if (listener == null) { logger.warn("Ignoring response for RPC {} from {} ({}) since it is not outstanding", resp.requestId, getRemoteAddress(channel), resp.errorString); } else { outstandingRpcs.remove(resp.requestId); listener.onFailure(new RuntimeException(resp.errorString)); }
从代码清单37看到,处理RpcResponse的逻辑为:
处理RpcFailure的逻辑为:
fetchChunk的实现见代码清单38。
代码清单38 fetchChunk的实现
public void fetchChunk( long streamId, final int chunkIndex, final ChunkReceivedCallback callback) { final long startTime = System.currentTimeMillis(); if (logger.isDebugEnabled()) { logger.debug("Sending fetch chunk request {} to {}", chunkIndex, getRemoteAddress(channel)); } final StreamChunkId streamChunkId = new StreamChunkId(streamId, chunkIndex);// 建立StreamChunkId // 添加StreamChunkId与ChunkReceivedCallback之间的对应关系 handler.addFetchRequest(streamChunkId, callback); // 发送块请求 channel.writeAndFlush(new ChunkFetchRequest(streamChunkId)).addListener( new ChannelFutureListener() { @Override public void operationComplete(ChannelFuture future) throws Exception { if (future.isSuccess()) { long timeTaken = System.currentTimeMillis() - startTime; if (logger.isTraceEnabled()) { logger.trace("Sending request {} to {} took {} ms", streamChunkId, getRemoteAddress(channel), timeTaken); } } else { String errorMsg = String.format("Failed to send request %s to %s: %s", streamChunkId, getRemoteAddress(channel), future.cause()); logger.error(errorMsg, future.cause()); handler.removeFetchRequest(streamChunkId); channel.close(); try { callback.onFailure(chunkIndex, new IOException(errorMsg, future.cause())); } catch (Exception e) { logger.error("Uncaught exception in RPC response callback handler!", e); } } } }); }
结合代码清单38,咱们知道fetchChunk方法的实现步骤以下:
1) 使用流的标记streamId和块的索引chunkIndex建立StreamChunkId;
2) 调用addFetchRequest向handler(特别提醒下读者这里的handler不是RpcHandler,而是经过TransportClient构造器传入的TransportResponseHandler)添加StreamChunkId与回调类ChunkReceivedCallback的引用之间的关系。TransportResponseHandler的addFetchRequest方法(见代码清单39)将更新最后一次请求的时间为当前系统时间,而后将StreamChunkId与ChunkReceivedCallback之间的映射加入到outstandingFetches缓存中。outstandingFetches专门用于缓存发出的块请求信息。
代码清单39 添加块请求到缓存
public void addFetchRequest(StreamChunkId streamChunkId, ChunkReceivedCallback callback) { updateTimeOfLastRequest(); outstandingFetches.put(streamChunkId, callback); }
3) 调用Channel的writeAndFlush方法将块请求发送出去,这和在代码清单28中服务端调用的respond方法响应客户端的同样,都是使用Channel的writeAndFlush方法。当发送成功或者失败时会回调ChannelFutureListener的operationComplete方法。若是发送成功,那么只会打印StreamChunkId、远端地址及花费时间的日志,若是发送失败,除了打印错误日志外,还要调用TransportResponseHandler的removeFetchRequest方法(见代码清单40)将这次请求从outstandingFetches缓存中移除。
代码清单40 从缓存中删除RPC请求
public void removeRpcRequest(long requestId) { outstandingRpcs.remove(requestId); }
请求发送成功后,客户端将等待接收服务端的响应。根据图3,返回的消息也会传递给TransportChannelHandler的channelRead方法(见代码清单14),根据以前的分析,消息的分析将最后交给TransportResponseHandler的handle方法来处理。TransportResponseHandler的handle方法分别对图5中的六种处理结果进行处理,因为服务端使用processFetchRequest方法(见代码清单24)处理ChunkFetchRequest类型的消息后返回给客户端的消息为ChunkFetchSuccess或ChunkFetchFailure,因此咱们来看看客户端的TransportResponseHandler的handle方法是如何处理ChunkFetchSuccess和ChunkFetchFailure,见代码清单41。
代码清单41 ChunkFetchSuccess和ChunkFetchFailure消息的处理
if (message instanceof ChunkFetchSuccess) { ChunkFetchSuccess resp = (ChunkFetchSuccess) message; ChunkReceivedCallback listener = outstandingFetches.get(resp.streamChunkId); if (listener == null) { logger.warn("Ignoring response for block {} from {} since it is not outstanding", resp.streamChunkId, getRemoteAddress(channel)); resp.body().release(); } else { outstandingFetches.remove(resp.streamChunkId); listener.onSuccess(resp.streamChunkId.chunkIndex, resp.body()); resp.body().release(); } } else if (message instanceof ChunkFetchFailure) { ChunkFetchFailure resp = (ChunkFetchFailure) message; ChunkReceivedCallback listener = outstandingFetches.get(resp.streamChunkId); if (listener == null) { logger.warn("Ignoring response for block {} from {} ({}) since it is not outstanding", resp.streamChunkId, getRemoteAddress(channel), resp.errorString); } else { outstandingFetches.remove(resp.streamChunkId); listener.onFailure(resp.streamChunkId.chunkIndex, new ChunkFetchFailureException( "Failure while fetching " + resp.streamChunkId + ": " + resp.errorString)); } }
从代码清单41看到,处理ChunkFetchSuccess的逻辑为:
处理ChunkFetchFailure的逻辑为:
在详细介绍了TransportClient和TransportResponseHandler以后,对于客户端咱们就能够扩展图3,把TransportResponseHandler及TransportClient的处理流程增长进来,如图8所示。
图8 客户端请求、响应流程图
图8中的序号①表示调用TransportResponseHandler的addRpcRequest方法(或addFetchRequest方法)将更新最后一次请求的时间为当前系统时间,而后将requestId与RpcResponseCallback之间的映射加入到outstandingRpcs缓存中(或将StreamChunkId与ChunkReceivedCallback之间的映射加入到outstandingFetches缓存中)。②表示调用Channel的writeAndFlush方法将RPC请求发送出去。图中的虚线表示当TransportResponseHandler处理RpcResponse和RpcFailure时将从outstandingRpcs缓存中获取此请求对应的RpcResponseCallback(或处理ChunkFetchSuccess和ChunkFetchFailure时将从outstandingFetches缓存中获取StreamChunkId对应的ChunkReceivedCallback),并执行回调。此外,TransportClientBootstrap将可能存在于图8中任何两个组件的箭头连线中间。
通过近一年的准备,基于Spark2.1.0版本的《Spark内核设计的艺术 架构设计与实现》一书现已出版发行,图书如图:
纸质版售卖连接以下: