sofa-rpc 服务端源码流程走读

sofa-rpc是阿里开源的一款高性能的rpc框架,这篇文章主要是对sofa-rpc provider启动服务流程的一个代码走读,下面是我简单绘制的一个基本的关系流程图bootstrap

下面咱们根据sofa-rpc代码,对流程进行一个跟踪与走读。咱们以BoltServer的为例缓存

public static void main(String[] args) { ApplicationConfig application = new ApplicationConfig().setAppName("test-server"); ServerConfig serverConfig = new ServerConfig() .setPort(22000) .setDaemon(false); ProviderConfig<HelloService> providerConfig = new ProviderConfig<HelloService>() .setInterfaceId(HelloService.class.getName()) .setApplication(application) .setRef(new HelloServiceImpl()) .setServer(serverConfig) .setRegister(false); ProviderConfig<EchoService> providerConfig2 = new ProviderConfig<EchoService>() .setInterfaceId(EchoService.class.getName()) .setApplication(application) .setRef(new EchoServiceImpl()) .setServer(serverConfig) .setRegister(false); providerConfig.export(); providerConfig2.export(); LOGGER.warn("started at pid {}", RpcRuntimeContext.PID); }

能够看到sofa-rpc经过ProviderConfig类对服务提供方Provider进行了配置信息的初始化,同时也提供了export作为服务启动的入口。微信

public synchronized void export() { if (providerBootstrap == null) { providerBootstrap = Bootstraps.from(this); } providerBootstrap.export(); }
根据ProviderConfig中setBootstrap()配置的Bootstrap类型,咱们经过Bootstaps.from(this)能够获取到不一样的Bootstrap引导服务,分别是DefaultProviderBootstrap与 DubboProviderBootstrap 
/** * 发布一个服务 * * @param providerConfig 服务发布者配置 * @param <T> 接口类型 * @return 发布启动类 */
    public static <T> ProviderBootstrap<T> from(ProviderConfig<T> providerConfig) { String bootstrap = providerConfig.getBootstrap(); if (StringUtils.isEmpty(bootstrap)) { // Use default provider bootstrap 无的话就返回默认DefaultProviderBootstrap
            bootstrap = RpcConfigs.getStringValue(RpcOptions.DEFAULT_PROVIDER_BOOTSTRAP); providerConfig.setBootstrap(bootstrap); } ProviderBootstrap providerBootstrap = ExtensionLoaderFactory.getExtensionLoader(ProviderBootstrap.class) .getExtension(bootstrap, new Class[] { ProviderConfig.class }, new Object[] { providerConfig }); return (ProviderBootstrap<T>) providerBootstrap; }
 
 

DefaultProviderBootstrap与 DubboProviderBootstrap 都继承自ProviderBootstrap。app

DefaultProviderBootstrap又被BoltProviderBootstrap、Http2ClearTextProviderBootstrap、RestProviderBootstrap三个类所继承,这其实对应了sofa-rpc中的三种server服务方式。框架

咱们看下DefaultProviderBootstrap服务启动源码socket

 @Override public void export() { if (providerConfig.getDelay() > 0) { // 延迟加载,单位毫秒
            Thread thread = factory.newThread(new Runnable() { @Override public void run() { try { Thread.sleep(providerConfig.getDelay()); } catch (Throwable ignore) { // NOPMD
 } doExport(); } }); thread.start(); } else { doExport(); } } private void doExport() { if (exported) { return; } // 检查参数
 checkParameters(); String appName = providerConfig.getAppName(); //key is the protocol of server,for concurrent safe
        Map<String, Boolean> hasExportedInCurrent = new ConcurrentHashMap<String, Boolean>(); // 将处理器注册到server
        List<ServerConfig> serverConfigs = providerConfig.getServer(); for (ServerConfig serverConfig : serverConfigs) { String protocol = serverConfig.getProtocol(); String key = providerConfig.buildKey() + ":" + protocol; if (LOGGER.isInfoEnabled(appName)) { LOGGER.infoWithApp(appName, "Export provider config : {} with bean id {}", key, providerConfig.getId()); } // 注意同一interface,同一uniqleId,不一样server状况
            AtomicInteger cnt = EXPORTED_KEYS.get(key); // 计数器
            if (cnt == null) { // 没有发布过
                cnt = CommonUtils.putToConcurrentMap(EXPORTED_KEYS, key, new AtomicInteger(0)); } int c = cnt.incrementAndGet(); hasExportedInCurrent.put(serverConfig.getProtocol(), true); int maxProxyCount = providerConfig.getRepeatedExportLimit(); if (maxProxyCount > 0) { if (c > maxProxyCount) { decrementCounter(hasExportedInCurrent); // 超过最大数量,直接抛出异常
                    throw new SofaRpcRuntimeException("Duplicate provider config with key " + key + " has been exported more than " + maxProxyCount + " times!"
                        + " Maybe it's wrong config, please check it."
                        + " Ignore this if you did that on purpose!"); } else if (c > 1) { if (LOGGER.isInfoEnabled(appName)) { LOGGER.infoWithApp(appName, "Duplicate provider config with key {} has been exported!"
                            + " Maybe it's wrong config, please check it."
                            + " Ignore this if you did that on purpose!", key); } } } } try { // 构造请求调用器
            providerProxyInvoker = new ProviderProxyInvoker(providerConfig); // 初始化注册中心
            if (providerConfig.isRegister()) { List<RegistryConfig> registryConfigs = providerConfig.getRegistry(); if (CommonUtils.isNotEmpty(registryConfigs)) { for (RegistryConfig registryConfig : registryConfigs) { RegistryFactory.getRegistry(registryConfig); // 提早初始化Registry
 } } } // 将处理器注册到server
            for (ServerConfig serverConfig : serverConfigs) { try { //构建Server
                    Server server = serverConfig.buildIfAbsent(); // 注册序列化接口
 server.registerProcessor(providerConfig, providerProxyInvoker); if (serverConfig.isAutoStart()) { //启动服务
 server.start(); } } catch (SofaRpcRuntimeException e) { throw e; } catch (Exception e) { LOGGER.errorWithApp(appName, "Catch exception when register processor to server: "
                        + serverConfig.getId(), e); } } // 注册到注册中心
            providerConfig.setConfigListener(new ProviderAttributeListener()); register(); } catch (Exception e) { decrementCounter(hasExportedInCurrent); if (e instanceof SofaRpcRuntimeException) { throw (SofaRpcRuntimeException) e; } else { throw new SofaRpcRuntimeException("Build provider proxy error!", e); } } // 记录一些缓存数据
        RpcRuntimeContext.cacheProviderConfig(this); exported = true; }

代码中经过serverConfig.buildIfAbsent()构建Server服务对象,而在buildIfAbsent()函数中咱们能够看到,sever是经过SeverFactory工厂获取到的,在SeverFactory的getSever()中根据SeverConfig的配置获取Sever的具体实例,并执行Init()进行初始化。ide

/** * 启动服务 * * @return the server */
    public synchronized Server buildIfAbsent() { if (server != null) { return server; } // 提早检查协议+序列化方式 // ConfigValueHelper.check(ProtocolType.valueOf(getProtocol()), // SerializationType.valueOf(getSerialization())); //在sever工厂中拿到sever实例
        server = ServerFactory.getServer(this); return server; }
/** * 初始化Server实例 * * @param serverConfig 服务端配置 * @return Server */
    public synchronized static Server getServer(ServerConfig serverConfig) { try { Server server = SERVER_MAP.get(Integer.toString(serverConfig.getPort())); if (server == null) { // 算下网卡和端口
 resolveServerConfig(serverConfig); ExtensionClass<Server> ext = ExtensionLoaderFactory.getExtensionLoader(Server.class) .getExtensionClass(serverConfig.getProtocol()); if (ext == null) { throw ExceptionUtils.buildRuntime("server.protocol", serverConfig.getProtocol(), "Unsupported protocol of server!"); } server = ext.getExtInstance(); //服务初始化
 server.init(serverConfig); SERVER_MAP.put(serverConfig.getPort() + "", server); } return server; } catch (SofaRpcRuntimeException e) { throw e; } catch (Throwable e) { throw new SofaRpcRuntimeException(e.getMessage(), e); } }

sofa-rpc提供了三种server类型 BoltServer,RestServer与AbstractHttpServer函数

BoltServer中通信底层经过RemotingServer实现的,RemotingServer是基于阿里sofa-bolt通讯框架开发的。oop

/** * Bolt服务端 */
    protected RemotingServer remotingServer; @Override public void start() { if (started) { return; } synchronized (this) { if (started) { return; } // 生成阿里基于netty的bolt服务Server对象 
            remotingServer = initRemotingServer(); try { if (remotingServer.start(serverConfig.getBoundHost())) { if (LOGGER.isInfoEnabled()) { LOGGER.info("Bolt server has been bind to {}:{}", serverConfig.getBoundHost(), serverConfig.getPort()); } } else { throw new SofaRpcRuntimeException("Failed to start bolt server, see more detail from bolt log."); } started = true; if (EventBus.isEnable(ServerStartedEvent.class)) { EventBus.post(new ServerStartedEvent(serverConfig, bizThreadPool)); } } catch (SofaRpcRuntimeException e) { throw e; } catch (Exception e) { throw new SofaRpcRuntimeException("Failed to start bolt server!", e); } } }

AbstractHttpServer 提供http服务,底层通讯经过ServerTransport类实现的post

/** * 服务端通信层 */
    private ServerTransport serverTransport; @Override public void init(ServerConfig serverConfig) { this.serverConfig = serverConfig; this.serverTransportConfig = convertConfig(serverConfig); // 启动线程池
        this.bizThreadPool = initThreadPool(serverConfig); // 服务端处理器
        this.serverHandler = new HttpServerHandler(); // set default transport config
        this.serverTransportConfig.setContainer(container); this.serverTransportConfig.setServerHandler(serverHandler); } @Override public void start() { if (started) { return; } synchronized (this) { if (started) { return; } try { // 启动线程池
                this.bizThreadPool = initThreadPool(serverConfig); this.serverHandler.setBizThreadPool(bizThreadPool); //实例化服务,具体代码见
                serverTransport = ServerTransportFactory.getServerTransport(serverTransportConfig); started = serverTransport.start(); if (started) { if (EventBus.isEnable(ServerStartedEvent.class)) { EventBus.post(new ServerStartedEvent(serverConfig, bizThreadPool)); } } } catch (SofaRpcRuntimeException e) { throw e; } catch (Exception e) { throw new SofaRpcRuntimeException("Failed to start HTTP/2 server!", e); } } }

ServerTransport是个抽象类,具体实现为transport包下AbstractHttp2ServerTransport

/** * 构造函数 * * @param transportConfig 服务端配置 */
    protected AbstractHttp2ServerTransport(ServerTransportConfig transportConfig) { super(transportConfig); } @Override public boolean start() { if (serverBootstrap != null) { return true; } synchronized (this) { if (serverBootstrap != null) { return true; } boolean flag = false; SslContext sslCtx = SslContextBuilder.build(); // Configure the server.
            EventLoopGroup bossGroup = NettyHelper.getServerBossEventLoopGroup(transportConfig); //能够看到然是基于Netty
            HttpServerHandler httpServerHandler = (HttpServerHandler) transportConfig.getServerHandler(); bizGroup = NettyHelper.getServerBizEventLoopGroup(transportConfig, httpServerHandler.getBizThreadPool()); serverBootstrap = new ServerBootstrap(); serverBootstrap.group(bossGroup, bizGroup) .channel(transportConfig.isUseEpoll() ? EpollServerSocketChannel.class : NioServerSocketChannel.class) .option(ChannelOption.SO_BACKLOG, transportConfig.getBacklog()) .option(ChannelOption.SO_REUSEADDR, transportConfig.isReuseAddr()) .option(ChannelOption.RCVBUF_ALLOCATOR, NettyHelper.getRecvByteBufAllocator()) .option(ChannelOption.ALLOCATOR, NettyHelper.getByteBufAllocator()) .childOption(ChannelOption.SO_KEEPALIVE, transportConfig.isKeepAlive()) .childOption(ChannelOption.TCP_NODELAY, transportConfig.isTcpNoDelay()) .childOption(ChannelOption.SO_RCVBUF, 8192 * 128) .childOption(ChannelOption.SO_SNDBUF, 8192 * 128) .handler(new LoggingHandler(LogLevel.DEBUG)) .childOption(ChannelOption.ALLOCATOR, NettyHelper.getByteBufAllocator()) .childOption(ChannelOption.WRITE_BUFFER_WATER_MARK, new WriteBufferWaterMark( transportConfig.getBufferMin(), transportConfig.getBufferMax())) .childHandler(new Http2ServerChannelInitializer(bizGroup, sslCtx, httpServerHandler, transportConfig.getPayload())); // 绑定到所有网卡 或者 指定网卡
            ChannelFuture future = serverBootstrap.bind( new InetSocketAddress(transportConfig.getHost(), transportConfig.getPort())); ChannelFuture channelFuture = future.addListener(new ChannelFutureListener() { @Override public void operationComplete(ChannelFuture future) throws Exception { if (future.isSuccess()) { if (LOGGER.isInfoEnabled()) { LOGGER.info("HTTP/2 Server bind to {}:{} success!", transportConfig.getHost(), transportConfig.getPort()); } } else { LOGGER.error("HTTP/2 Server bind to {}:{} failed!", transportConfig.getHost(), transportConfig.getPort()); stop(); } } }); try { channelFuture.await(); if (channelFuture.isSuccess()) { flag = Boolean.TRUE; } else { throw new SofaRpcRuntimeException("Server start fail!", future.cause()); } } catch (InterruptedException e) { LOGGER.error(e.getMessage(), e); } return flag; } }

RestServer 提供Rest服务,底层通讯实现具体可见SofaNettyJaxrsServer。

/** * Rest服务端 */
    protected SofaNettyJaxrsServer httpServer; @Override public void init(ServerConfig serverConfig) { this.serverConfig = serverConfig; httpServer = buildServer(); }

SofaNettyJaxrsServer中服务启动的具体代码

 @Override public void start() { // CHANGE: 增长线程名字
        boolean daemon = serverConfig.isDaemon(); boolean isEpoll = serverConfig.isEpoll(); NamedThreadFactory ioFactory = new NamedThreadFactory("SEV-REST-IO-" + port, daemon); NamedThreadFactory bizFactory = new NamedThreadFactory("SEV-REST-BIZ-" + port, daemon); eventLoopGroup = isEpoll ? new EpollEventLoopGroup(ioWorkerCount, ioFactory) : new NioEventLoopGroup(ioWorkerCount, ioFactory); eventExecutor = isEpoll ? new EpollEventLoopGroup(executorThreadCount, bizFactory) : new NioEventLoopGroup(executorThreadCount, bizFactory); // Configure the server.
        bootstrap = new ServerBootstrap() .group(eventLoopGroup) .channel(isEpoll ? EpollServerSocketChannel.class : NioServerSocketChannel.class) .childHandler(createChannelInitializer()) .option(ChannelOption.SO_BACKLOG, backlog) .childOption(ChannelOption.SO_KEEPALIVE, serverConfig.isKeepAlive()); // CHANGE: setKeepAlive

        for (Map.Entry<ChannelOption, Object> entry : channelOptions.entrySet()) { bootstrap.option(entry.getKey(), entry.getValue()); } for (Map.Entry<ChannelOption, Object> entry : childChannelOptions.entrySet()) { bootstrap.childOption(entry.getKey(), entry.getValue()); } final InetSocketAddress socketAddress; if (null == hostname || hostname.isEmpty()) { socketAddress = new InetSocketAddress(port); } else { socketAddress = new InetSocketAddress(hostname, port); } bootstrap.bind(socketAddress).syncUninterruptibly(); }

 OK,以上就是sofa-rpc服务端启动的一个基本的流程,这里关注的只是简单的服务启动流程,没有深刻代码功能进行分析,在此基础上,咱们能够进一步探究代码的具体实现。

 

关注微信公众号,查看更多技术文章。

相关文章
相关标签/搜索