dubbo源码分析8 -- DubboProtocol 之提供端发布服务export

在前面提到,在RegistryProtocol发布服务时,首先会dubbo对外提供接口java

根据url的地址,协议是dubbo,调用protocol.export(…), 可是根据ExtensionLoader.getExtensionLoader获取的到的protocol, 这个protocol是个装饰者(一个是启动lisenter,一个是建立单性列表filter).最后是原生的dubboProtocol执行单性列表filter,最后一个Invoker是Wrapper的invokeMethod..web

如下是DubboProtocol部分源码服务器

  • 根据url获取到ip:port做为key,若是已经建立过了,就直接reset
  • 建立服务 ,在服务外面包装了一个HeaderExchangeServer,主要是提供心跳机制。定时往dubbo服务器发送数据,超时:若是是客户端的话就重连不然关闭服务.
  • 服务是NettyServer,
  • -
public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException {
   //dubbo://..
     URL url = invoker.getUrl();
     // com.test.ITestService:1.0.0:20890
     String key = serviceKey(url);
     DubboExporter<T> exporter = new DubboExporter<T>(invoker, 
      key, exporterMap);
     exporterMap.put(key, exporter);
     //开启服务
     openServer(url);

     // modified by lishen
     optimizeSerialization(url);

     return exporter;
 }
  /**开启服务 当10.118.14.204:20890已经建立过服务,那么就reset,不然建立服务 **/
  private void openServer(URL url) {
     // 10.118.14.204:20890
     String key = url.getAddress();
     //client 也能够暴露一个只有server能够调用的服务。
     boolean isServer = url.getParameter(Constants.IS_SERVER_KEY,true);
     if (isServer) {
        ExchangeServer server = serverMap.get(key);
        //没有的话就是建立服务
        if (server == null) {
            serverMap.put(key, createServer(url));
        } else {
            //server支持reset,配合override功能使用
            server.reset(url);
        }
     }
 }

 /**建立服务 开启心跳检测,默认使用netty。组装url **/
 private ExchangeServer createServer(URL url) {
   //默认开启server关闭时发送readonly事件
    url = url.addParameterIfAbsent("channel.readonly.sent", 
        Boolean.TRUE.toString());
    //默认开启heartbeat 
    url = url.addParameterIfAbsent("heartbeat", String.valueOf(60 * 1000));
    //netty 
    String str = url.getParameter("server", "netty");

    url = url.addParameter("codec", 
        Version.isCompatibleVersion() ? COMPATIBLE_CODEC_NAME : 
        DubboCodec.NAME);

    ExchangeServer server = Exchangers.bind(url, requestHandler);
    //..
    return server;
}

ExchangeServer
dubbo://10.118.14.204:20890/com.test.ITestService?anyhost=true&application=testService..side=providerapp

public static ExchangeServer bind(URL url, 
 ExchangeHandler handler) throws RemotingException {
     url = url.addParameterIfAbsent(Constants.CODEC_KEY, "exchange");
     return getExchanger(url).bind(url, handler);
 }
 public static Exchanger getExchanger(URL url) {
     String type = url.getParameter("exchanger", "header");
     return ExtensionLoader.getExtensionLoader(Exchanger.class).
        getExtension(type);
 }

HeaderExchanger

由于已经指定header,提供服务ide

public class HeaderExchanger implements Exchanger {
    public static final String NAME = "header";
    //服务端绑定
    public ExchangeServer bind(URL url, ExchangeHandler handler) 
    throws RemotingException {
        return new HeaderExchangeServer(Transporters.bind(
         url, new DecodeHandler(new HeaderExchangeHandler(handler))));
    }
}

HeaderExchangeServer服务端svg

主要就是提供了心跳机制.this

  • 启动心跳机制
public class HeaderExchangeServer implements ExchangeServer {
  private final ScheduledExecutorService scheduled = Executors.
   newScheduledThreadPool(1,new NamedThreadFactory(
   "dubbo-remoting-server-heartbeat", true));
  // 心跳定时器
  private ScheduledFuture<?> heatbeatTimer;
  // 心跳超时,毫秒。缺省0,不会执行心跳。
  private int heartbeat;
  private int heartbeatTimeout;
  private final Server server;
  private volatile boolean closed = false;

  public HeaderExchangeServer(Server server) {
    //..属性赋值
    //心跳
    startHeatbeatTimer();
  }

    private void startHeatbeatTimer() {
      //关闭心跳定时
      stopHeartbeatTimer();
      if (heartbeat > 0) {
         //每隔heartbeat时间执行一次
          heatbeatTimer = scheduled.scheduleWithFixedDelay(
                  new HeartBeatTask( new HeartBeatTask.ChannelProvider() {
                      //获取channels
                      public Collection<Channel> getChannels() {
                          return Collections.unmodifiableCollection(
                                  HeaderExchangeServer.this.getChannels() );
                      }
                  }, heartbeat, heartbeatTimeout),
                  heartbeat, heartbeat,TimeUnit.MILLISECONDS);
      }
      }
      //关闭心跳定时
      private void stopHeartbeatTimer() {
          try {
              ScheduledFuture<?> timer = heatbeatTimer;
              if (timer != null && ! timer.isCancelled()) {
                  timer.cancel(true);
              }
          } catch (Throwable t) {
              logger.warn(t.getMessage(), t);
          } finally {
              heatbeatTimer =null;
          }
      }

心跳线程HeartBeatTaskurl

  • 在超时时间以内,发送数据
  • 在超时时间在外,是客户端的话,重连;是服务端,那么关闭
final class HeartBeatTask implements Runnable {
    public void run() {
      long now = System.currentTimeMillis();
      for ( Channel channel : channelProvider.getChannels() ) {
       //若是通道已经关闭了,跳过
          if (channel.isClosed()) {
              continue;
          }
         //获取最后的读取时间
         Long lastRead = ( Long ) channel.getAttribute(
                 HeaderExchangeHandler.KEY_READ_TIMESTAMP );
         //获取最后的写时间
         Long lastWrite = ( Long ) channel.getAttribute(
                 HeaderExchangeHandler.KEY_WRITE_TIMESTAMP );
         //判断时机
         if ( ( lastRead != null && now - lastRead > heartbeat )  ||
           ( lastWrite != null && now - lastWrite > heartbeat ) ) {
             Request req = new Request();
             req.setVersion( "2.0.0" );
             req.setTwoWay( true );
             req.setEvent( Request.HEARTBEAT_EVENT );
             //向服务端发送数据
             channel.send( req );
         }
         //超时了.
         if ( lastRead != null && now - lastRead > heartbeatTimeout )
             if (channel instanceof Client) {
              //是客户端的话,就重连
                ((Client)channel).reconnect();
             } else {
             //关闭
                channel.close();
             }
         }
       }     
     }
}