Dubbo线程模型和调度策略

1、服务调用

首先服务消费者经过代理对象 Proxy 发起远程调用,接着经过网络客户端 Client 将编码后的请求发送给服务提供方的网络层上,也就是 Server。Server 在收到请求后,首先要作的事情是对数据包进行解码。而后将解码后的请求发送至分发器 Dispatcher,再由分发器将请求派发到指定的线程池上,最后由线程池调用具体的服务。这就是一个远程调用请求的发送与接收过程。html

那么在dubbo中请求是如何派发的?以及线程模型是什么样的那?算法

2、I/O线程和业务线程分离

  • 若是事件处理的逻辑能迅速完成,而且不会发起新的 IO请求,好比只是在内存中记个标识,则直接在 IO线程上处理更快,由于减小了线程池调度。数据库

  • 但若是事件处理逻辑较慢,或者须要发起新的 IO 请求,好比须要查询数据库,则必须派发到线程池,不然 IO 线程阻塞,将致使不能接收其它请求。apache

  • 若是用 IO 线程处理事件,又在事件处理过程当中发起新的 IO 请求,好比在链接事件中发起登陆请求,会报“可能引起死锁”异常,但不会真死锁。bootstrap

因此在真实的业务场景中是须要将业务线程和I/O线程进行分离处理的。dubbo做为一个服务治理框架,底层的采用Netty做为网络通讯的组件,在请求派发的时候支持不一样的派发策略。缓存

参考文章:www.cnblogs.com/my_life/art…bash

3、请求派发策略

链接创建

派发策略

从官方描述来看,duboo支持五种派发策略,下面看下是如何实现的。以Ntty4.x为例:网络

  1. NettyServer
    public NettyServer(URL url, ChannelHandler handler) throws RemotingException {
         super(url, ChannelHandlers.wrap(handler, ExecutorUtil.setThreadName(url, SERVER_THREAD_POOL_NAME)));
     }
    复制代码
  2. ChannelHandlers#wrapInternal
    protected ChannelHandler wrapInternal(ChannelHandler handler, URL url) {
         // 选择调度策略 默认是all
         return new MultiMessageHandler(new HeartbeatHandler(ExtensionLoader.getExtensionLoader(Dispatcher.class)
                 .getAdaptiveExtension().dispatch(handler, url))); 
     }
    复制代码
    在NettyServer的构造方法中经过ChannelHandlers#wrap方法设置MultiMessageHandlerHeartbeatHandler并经过SPI扩展选择调度策略。
  3. NettyServer#doOpen
protected void doOpen() throws Throwable {
      bootstrap = new ServerBootstrap();
      // 多线程模型
      // boss线程池,负责和消费者创建新的链接
      bossGroup = new NioEventLoopGroup(1, new DefaultThreadFactory("NettyServerBoss", true));
      // worker线程池,负责链接的数据交换
      workerGroup = new NioEventLoopGroup(getUrl().getPositiveParameter(Constants.IO_THREADS_KEY, Constants.DEFAULT_IO_THREADS),
              new DefaultThreadFactory("NettyServerWorker", true));

      final NettyServerHandler nettyServerHandler = new NettyServerHandler(getUrl(), this);
      channels = nettyServerHandler.getChannels();

      bootstrap.group(bossGroup, workerGroup)
              .channel(NioServerSocketChannel.class)
              .childOption(ChannelOption.TCP_NODELAY, Boolean.TRUE) // nagele 算法
              .childOption(ChannelOption.SO_REUSEADDR, Boolean.TRUE)// TIME_WAIT
              .childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT) //内存池
              .childHandler(new ChannelInitializer<NioSocketChannel>() {
                  @Override
                  protected void initChannel(NioSocketChannel ch) throws Exception {
                      NettyCodecAdapter adapter = new NettyCodecAdapter(getCodec(), getUrl(), NettyServer.this);
                      ch.pipeline()//.addLast("logging",new LoggingHandler(LogLevel.INFO))//for debug
                              .addLast("decoder", adapter.getDecoder()) //设置编解码器
                              .addLast("encoder", adapter.getEncoder())
                              .addLast("handler", nettyServerHandler);
                  }
              });
      // bind 端口
      ChannelFuture channelFuture = bootstrap.bind(getBindAddress());
      channelFuture.syncUninterruptibly();
      channel = channelFuture.channel();

  }
复制代码

设置Netty的boss线程池数量为1,worker线程池(也就是I/O线程)为cpu核心数+1和向Netty中注测Handler用于消息的编解码和处理。多线程

若是咱们在一个JVM进程只暴露一个Dubbo服务端口,那么一个JVM进程只会有一个NettyServer实例,也会只有一个NettyHandler实例。而且设置了三个handler,用来处理编解码、链接的建立、消息读写等。在dubbo内部定义了一个ChannelHandler用来和Netty的Channel关联起来,经过上述的代码会发现NettyServer自己也是一个ChannelHandler。经过NettyServer#doOpen暴露服务端口后,客户端就能和服务端创建链接了,而提供者在初始化链接后会调用NettyHandler#channelActive方法来建立一个NettyChannel并发

  1. NettyChannel
public void channelActive(ChannelHandlerContext ctx) throws Exception {
      logger.debug("channelActive <" + NetUtils.toAddressString((InetSocketAddress) ctx.channel().remoteAddress()) + ">" + " channle <" + ctx.channel());
      //获取或者建立一个NettyChannel
      NettyChannel channel = NettyChannel.getOrAddChannel(ctx.channel(), url, handler);
      try {
          if (channel != null) {
              // <ip:port, channel>
              channels.put(NetUtils.toAddressString((InetSocketAddress) ctx.channel().remoteAddress()), channel);
          }
          // 这里的 handler就是NettyServer
          handler.connected(channel);
      } finally {
          NettyChannel.removeChannelIfDisconnected(ctx.channel());
      }
  }
复制代码

与Netty和Dubbo都有本身的ChannelHandler同样,Netty和Dubbo也有着本身的Channel。该方法最后会调用NettyServer#connected方法来检查新添加channel后是否会超出提供者配置的accepts配置,若是超出,则直接打印错误日志并关闭该Channel,这样的话消费者端天然会收到链接中断的异常信息,详细能够见AbstractServer#connected方法。

  1. AbstractServer#connected
public void connected(Channel ch) throws RemotingException {
     // If the server has entered the shutdown process, reject any new connection
     if (this.isClosing() || this.isClosed()) {
         logger.warn("Close new channel " + ch + ", cause: server is closing or has been closed. For example, receive a new connect request while in shutdown process.");
         ch.close();
         return;
     }

     Collection<Channel> channels = getChannels();
     //大于accepts的tcp链接直接关闭
     if (accepts > 0 && channels.size() > accepts) { 
         logger.error("Close channel " + ch + ", cause: The server " + ch.getLocalAddress() + " connections greater than max config " + accepts);
         ch.close();
         return;
     }
     super.connected(ch);
 }
复制代码
  • 在dubbo中消费者和提供者默认只创建一个TCP长链接(详细代码请参考官网源码导读,服务引用一节),为了增长消费者调用服务提供者的吞吐量,能够在消费方增长以下配置:
<dubbo:reference id="demoService" check="false" interface="org.apache.dubbo.demo.DemoService" connections="20"/>
复制代码
  • 提供者可使用accepts控制长链接的数量防止链接数量过多,配置以下:
<dubbo:protocol name="dubbo" port="20880" accepts="10"/>
复制代码

请求接收

当链接创建完成后,消费者就能够请求提供者的服务了,当请求到来,提供者这边会依次通过以下Handler的处理:

--->NettyServerHandler#channelRead:接收请求消息。

--->AbstractPeer#received:若是服务已经关闭,则返回,不然调用下一个Handler来处理。

--->MultiMessageHandler#received:若是是批量请求,则依次对请求调用下一个Handler来处理。

--->HeartbeatHandler#received: 处理心跳消息。

--->AllChannelHandler#received:该Dubbo的Handler很是重要,由于从这里是IO线程池和业务线程池的隔离。

--->DecodeHandler#received: 消息解码。

--->HeaderExchangeHandler#received:消息处理。

--->DubboProtocol : 调用服务。

  1. AllChannelHandler#received
public void received(Channel channel, Object message) throws RemotingException {
       // 获取业务线程池
       ExecutorService cexecutor = getExecutorService();
       try {
           // 使用线程池处理消息
           cexecutor.execute(new ChannelEventRunnable(channel, handler, ChannelState.RECEIVED, message));
       } catch (Throwable t) {
        throw new ExecutionException(message, channel, getClass() + " error when process received event .", t);
       }
   }
复制代码

这里对execute进行了异常捕获,这是由于I/O线程池是无界的,但业务线程池多是有界的,因此进行execute提交可能会遇到RejectedExecutionException异常 。

那么这里是如何获取到业务线程池的那?实际上WrappedChannelHandlerxxxChannelHandlerd的装饰类,根据dubbo spi能够知道,获取AllChannelHandler会首先实例化WrappedChannelHandler

  1. WrappedChannelHandler
public WrappedChannelHandler(ChannelHandler handler, URL url) {
        this.handler = handler;
        this.url = url;
        // 获取业务线程池
        executor = (ExecutorService) ExtensionLoader.getExtensionLoader(ThreadPool.class).getAdaptiveExtension().getExecutor(url);

        String componentKey = Constants.EXECUTOR_SERVICE_COMPONENT_KEY;
        if (Constants.CONSUMER_SIDE.equalsIgnoreCase(url.getParameter(Constants.SIDE_KEY))) {
            componentKey = Constants.CONSUMER_SIDE;
        }
        DataStore dataStore = ExtensionLoader.getExtensionLoader(DataStore.class).getDefaultExtension();
        dataStore.put(componentKey, Integer.toString(url.getPort()), executor);
    }

复制代码

线程模型

  1. FixedThreadPool
public class FixedThreadPool implements ThreadPool {

  @Override
  public Executor getExecutor(URL url) {
      // 线程池名称DubboServerHanler-server:port
      String name = url.getParameter(Constants.THREAD_NAME_KEY, Constants.DEFAULT_THREAD_NAME);
      // 缺省线程数量200
      int threads = url.getParameter(Constants.THREADS_KEY, Constants.DEFAULT_THREADS);
      // 任务队列类型
      int queues = url.getParameter(Constants.QUEUES_KEY, Constants.DEFAULT_QUEUES);

      return new ThreadPoolExecutor(threads, threads, 0, TimeUnit.MILLISECONDS,
              queues == 0 ? new SynchronousQueue<Runnable>() :
                      (queues < 0 ? new LinkedBlockingQueue<Runnable>()
                              : new LinkedBlockingQueue<Runnable>(queues)),
              new NamedInternalThreadFactory(name, true), new AbortPolicyWithReport(name, url));
  }

}
复制代码

缺省状况下使用200个线程和SynchronousQueue这意味着若是若是线程池全部线程都在工做再有新任务会直接拒绝。

  1. CachedThreadPool
public class CachedThreadPool implements ThreadPool {

    @Override
    public Executor getExecutor(URL url) {
        String name = url.getParameter(Constants.THREAD_NAME_KEY, Constants.DEFAULT_THREAD_NAME);
        // 核心线程数量 缺省为0
        int cores = url.getParameter(Constants.CORE_THREADS_KEY, Constants.DEFAULT_CORE_THREADS);
        // 最大线程数量 缺省为Integer.MAX_VALUE
        int threads = url.getParameter(Constants.THREADS_KEY, Integer.MAX_VALUE);
        // queue 缺省为0
        int queues = url.getParameter(Constants.QUEUES_KEY, Constants.DEFAULT_QUEUES);
        // 空闲线程存活时间
        int alive = url.getParameter(Constants.ALIVE_KEY, Constants.DEFAULT_ALIVE);
        return new ThreadPoolExecutor(cores, threads, alive, TimeUnit.MILLISECONDS,
                queues == 0 ? new SynchronousQueue<Runnable>() :
                        (queues < 0 ? new LinkedBlockingQueue<Runnable>()
                                : new LinkedBlockingQueue<Runnable>(queues)),
                new NamedInternalThreadFactory(name, true), new AbortPolicyWithReport(name, url));
    }
}
复制代码

缓存线程池,能够看出若是提交任务的速度大于maxThreads将会不断建立线程,极端条件下将会耗尽CPU和内存资源。在突发大流量进入时不适合使用。

  1. LimitedThreadPool
public class  LimitedThreadPool implements ThreadPool {

    @Override
    public Executor getExecutor(URL url) {
        String name = url.getParameter(Constants.THREAD_NAME_KEY, Constants.DEFAULT_THREAD_NAME);
        // 缺省核心线程数量为0
        int cores = url.getParameter(Constants.CORE_THREADS_KEY, Constants.DEFAULT_CORE_THREADS);
        // 缺省最大线程数量200
        int threads = url.getParameter(Constants.THREADS_KEY, Constants.DEFAULT_THREADS);
        // 任务队列缺省0
        int queues = url.getParameter(Constants.QUEUES_KEY, Constants.DEFAULT_QUEUES);
        return new ThreadPoolExecutor(cores, threads, Long.MAX_VALUE, TimeUnit.MILLISECONDS,
                queues == 0 ? new SynchronousQueue<Runnable>() :
                        (queues < 0 ? new LinkedBlockingQueue<Runnable>()
                                : new LinkedBlockingQueue<Runnable>(queues)),
                new NamedInternalThreadFactory(name, true), new AbortPolicyWithReport(name, url));
    }

}
复制代码

不配置的话和FixedThreadPool没有区别。

  1. EagerThreadPool
public class EagerThreadPool implements ThreadPool {

   @Override
   public Executor getExecutor(URL url) {
       String name = url.getParameter(Constants.THREAD_NAME_KEY, Constants.DEFAULT_THREAD_NAME);
       // 0
       int cores = url.getParameter(Constants.CORE_THREADS_KEY, Constants.DEFAULT_CORE_THREADS);
       // Integer.MAX_VALUE
       int threads = url.getParameter(Constants.THREADS_KEY, Integer.MAX_VALUE);
       // 0
       int queues = url.getParameter(Constants.QUEUES_KEY, Constants.DEFAULT_QUEUES);
       // 60s
       int alive = url.getParameter(Constants.ALIVE_KEY, Constants.DEFAULT_ALIVE);

       // init queue and executor
       // 初始任务队列为1
       TaskQueue<Runnable> taskQueue = new TaskQueue<Runnable>(queues <= 0 ? 1 : queues);
       EagerThreadPoolExecutor executor = new EagerThreadPoolExecutor(cores,
               threads,
               alive,
               TimeUnit.MILLISECONDS,
               taskQueue,
               new NamedInternalThreadFactory(name, true),
               new AbortPolicyWithReport(name, url));
       taskQueue.setExecutor(executor);
       return executor;
   }
}
复制代码

EagerThreadPoolExecutor

public void execute(Runnable command) {
       if (command == null) {
           throw new NullPointerException();
       }
       // do not increment in method beforeExecute!
       //已提交任务数量
       submittedTaskCount.incrementAndGet();
       try {
           super.execute(command);
       } catch (RejectedExecutionException rx) { //大于最大线程数被拒绝任务 从新添加到任务队列
           // retry to offer the task into queue.
           final TaskQueue queue = (TaskQueue) super.getQueue();
           try {
               if (!queue.retryOffer(command, 0, TimeUnit.MILLISECONDS)) {
                   submittedTaskCount.decrementAndGet();
                   throw new RejectedExecutionException("Queue capacity is full.", rx);
               }
           } catch (InterruptedException x) {
               submittedTaskCount.decrementAndGet();
               throw new RejectedExecutionException(x);
           }
       } catch (Throwable t) {
           // decrease any way
           submittedTaskCount.decrementAndGet();
           throw t;
       }
   }
复制代码

TaskQueue

public boolean offer(Runnable runnable) {
       if (executor == null) {
           throw new RejectedExecutionException("The task queue does not have executor!");
       }
       // 获取当前线程池中的线程数量
       int currentPoolThreadSize = executor.getPoolSize();
       // have free worker. put task into queue to let the worker deal with task.
       // 若是已经提交的任务数量小于当前线程池中线程数量(不是很理解这里的操做)
       if (executor.getSubmittedTaskCount() < currentPoolThreadSize) {
           return super.offer(runnable);
       }

       // return false to let executor create new worker.
       //当前线程数小于最大线程程数直接建立新worker
       if (currentPoolThreadSize < executor.getMaximumPoolSize()) {
           return false;
       }

       // currentPoolThreadSize >= max
       return super.offer(runnable);
   }
复制代码

优先建立Worker线程池。在任务数量大于corePoolSize可是小于maximumPoolSize时,优先建立Worker来处理任务。当任务数量大于maximumPoolSize时,将任务放入阻塞队列中。阻塞队列充满时抛出RejectedExecutionException。(相比于cached:cached在任务数量超过maximumPoolSize时直接抛出异常而不是将任务放入阻塞队列)。

根据以上的代码分析,若是消费者的请求过快颇有可能致使服务提供者业务线程池抛出RejectedExecutionException异常。这个异常是duboo的采用的线程拒绝策略AbortPolicyWithReport#rejectedExecution抛出的,而且会被反馈到消费端,此时简单的解决办法就是将提供者的服务调用线程池数目调大点,例如以下配置:

<dubbo:provider threads="500"/>
或
<dubbo:protocol name="dubbo" port="20882" accepts="10" threads="500"/>
复制代码

为了保证模块内的主要服务有线程可用(防止次要服务抢占过多服务调用线程),能够对次要服务进行并发限制,例如:

<dubbo:service interface="org.apache.dubbo.demo.DemoService" ref="demoService" executes="100"/>
复制代码

dubbo的dispatcher 策略默认是all,实际上比较好的处理方式是I/O线程和业务线程分离,因此采起message是比较好得配置。而且若是采用all若是使用的dubo版本比较低颇有可能会触发dubbo的bug。一旦业务线程池满了,将抛出执行拒绝异常,将进入caught方法来处理,而该方法使用的仍然是业务线程池,因此颇有可能这时业务线程池仍是满的,致使下游的一个HeaderExchangeHandler没机会调用,而异常处理后的应答消息正是HeaderExchangeHandler#caught来完成的,因此最后NettyHandler#writeRequested没有被调用,Consumer只能死等到超时,没法收到Provider的线程池打满异常(2.6.x已经修复该问题)。

  • 推荐配置
<dubbo:protocol name="dubbo" port="8888" threads="500" dispatcher="message" />
复制代码

参考文章:manzhizhen.iteye.com/blog/239117…

相关文章
相关标签/搜索