Thrift RPC实战(七) 基于zookeeper和thrift的RPC服务发布订阅

对于Thrift服务化的改造,主要是客户端,能够从以下几个方面进行:java

1.服务端的服务注册,客户端自动发现,无需手工修改配置,这里咱们使用zookeeper,但因为zookeeper自己提供的客户端使用较为复杂,所以采用curator-recipes工具类进行处理服务的注册与发现。node

2.客户端使用链接池对服务调用进行管理,提高性能,这里咱们使用Apache Commons项目commons-pool,能够大大减小代码的复杂度。git

3.关于Failover/LoadBalance,因为zookeeper的watcher,当服务端不可用是及时通知客户端,并移除不可用的服务节点,而LoadBalance有不少算法,这里咱们采用随机加权方式,也是常有的负载算法,至于其余的算法介绍参考:常见的负载均衡的基本算法算法

4.使thrift服务的注册和发现能够基于spring配置,能够提供不少的便利。spring

5.其余的改造如:apache

1)经过动态代理实现client和server端的交互细节透明化,让用户只需经过服务方提供的接口进行访问api

2)Thrift经过两种方式调用服务Client和Iface,Client API的方式, 不推荐, 咱们推荐Service接口的方式(服务化)。缓存

1、pom.xml引入依赖jar包

<dependencies>
    <dependency>
        <groupId>org.apache.zookeeper</groupId>
        <artifactId>zookeeper</artifactId>
        <version>3.4.12</version>
    </dependency>
    <dependency>
        <groupId>org.apache.thrift</groupId>
        <artifactId>libthrift</artifactId>
        <version>0.11.0</version>
    </dependency>

    <dependency>
        <groupId>org.springframework</groupId>
        <artifactId>spring-context</artifactId>
        <version>4.3.2.RELEASE</version>
    </dependency>
    <!-- 使用Netflix开源的zk开发-->
    <dependency>
        <groupId>org.apache.curator</groupId>
        <artifactId>curator-recipes</artifactId>
        <version>4.0.0</version>
    </dependency>

    <dependency>
        <groupId>commons-pool</groupId>
        <artifactId>commons-pool</artifactId>
        <version>1.6</version>
    </dependency>
</dependencies>

2、使用zookeeper管理服务节点配置

PC服务往平台化的方向发展, 会屏蔽掉更多的服务细节(服务的IP地址集群, 集群的扩容和迁移), 只暴露服务接口. 这部分的演化, 使得server端和client端彻底的解耦合. 二者的交互经过ConfigServer(MetaServer)的中介角色来搭线。服务器

注: 该图源自dubbo的官网
这边借助Zookeeper来扮演该角色, server扮演发布者的角色, 而client扮演订阅者的角色.网络

Zookeeper是分布式应用协做服务. 它实现了paxos的一致性算法, 在命名管理/配置推送/数据同步/主从切换方面扮演重要的角色。 其数据组织相似文件系统的目录结构: 

每一个节点被称为znode, 为znode节点依据其特性, 又能够分为以下类型:
  1). PERSISTENT: 永久节点
  2). EPHEMERAL: 临时节点, 会随session(client disconnect)的消失而消失
  3). PERSISTENT_SEQUENTIAL: 永久节点, 其节点的名字编号是单调递增的
  4). EPHEMERAL_SEQUENTIAL: 临时节点, 其节点的名字编号是单调递增的
  注: 临时节点不能成为父节点
  Watcher观察模式, client能够注册对节点的状态/内容变动的事件回调机制. 其Event事件的两类属性须要关注下:
  1). KeeperState: Disconnected,SyncConnected,Expired
  2). EventType: None,NodeCreated,NodeDeleted,NodeDataChanged,NodeChildrenChanged
RPC服务端:
  做为具体业务服务的RPC服务发布方, 对其自身的服务描述由如下元素构成.
  1). namespace: 命名空间,来区分不一样应用 
  2). service: 服务接口, 采用发布方的类全名来表示
  3). version: 版本号
  借鉴了Maven的GAV坐标系, 三维坐标系更符合服务平台化的大环境.
  *) 数据模型的设计
  具体RPC服务的注册路径为: /rpc/{namespace}/{service}/{version}, 该路径上的节点都是永久节点
  RPC服务集群节点的注册路径为: /rpc/{namespace}/{service}/{version}/{ip:port:weight}, 末尾的节点是临时节点.

1.定义Zookeeper的客户端的管理

ZookeeperFactory.java

package com.lpf.thrift.rpc.zookeeper;

import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.springframework.beans.factory.FactoryBean;

/**
 * Created by lpf on 2018-06-11.
 * 获取zookeeper客户端连接
 */
public class ZookeeperFactory implements FactoryBean<CuratorFramework>{
    private String zkHosts;
    // session超时
    private int sessionTimeout = 30000;
    private int connectionTimeout = 30000;

    // 共享一个zk连接
    private boolean singleton = true;
    private CuratorFramework zkClient;
    // 全局path前缀,经常使用来区分不一样的应用
    private String namespace;

    private final static String ROOT = "rpc";
    //注意必定要实现属性的set方法,不然在spring bean注入的地方会拿不到值
    public void setZkHosts(String zkHosts) {
        this.zkHosts = zkHosts;
    }

    public void setSessionTimeout(int sessionTimeout) {
        this.sessionTimeout = sessionTimeout;
    }

    public void setConnectionTimeout(int connectionTimeout) {
        this.connectionTimeout = connectionTimeout;
    }

    public void setSingleton(boolean singleton) {
        this.singleton = singleton;
    }

    public void setNamespace(String namespace) {
        this.namespace = namespace;
    }

    public void setZkClient(CuratorFramework zkClient) {
        this.zkClient = zkClient;
    }

    @Override
    public CuratorFramework getObject() throws Exception {
        if (singleton){
            if (zkClient == null){
                zkClient = create();
                zkClient.start();
            }
            return zkClient;
        }
        return  create();
    }

    private CuratorFramework create() throws Exception {
        if (namespace == null || namespace == ""){
            namespace = ROOT;
        }else {
            namespace = ROOT+"/"+namespace;
        }
        return create(zkHosts, sessionTimeout, connectionTimeout, namespace);
    }

    public static CuratorFramework create(String zkHosts, int sessionTimeout, int connectionTimeout, String namespace) {
        return  CuratorFrameworkFactory.builder()
                .connectString(zkHosts)
                .sessionTimeoutMs(sessionTimeout)
                .connectionTimeoutMs(connectionTimeout)
                .canBeReadOnly(true)
                .namespace(namespace)
                .retryPolicy(new ExponentialBackoffRetry(1000,Integer.MAX_VALUE))
                .defaultData(null)
                .build();
    }

    @Override
    public Class<?> getObjectType() {
        return CuratorFramework.class;
    }

    @Override
    public boolean isSingleton() {
        return singleton;
    }

    public void close() {
        if (zkClient != null) {
            zkClient.close();
        }
    }
}

2.服务端注册服务

因为服务端配置须要获取本机的IP地址,所以定义IP获取接口

ThriftServerIpResolve.java

package com.lpf.thrift.rpc.zookeeper;

/**
 * 解析thrift-server端IP地址,用于注册服务
 * 1) 能够从一个物理机器或者虚机的特殊文件中解析
 * 2) 能够获取指定网卡序号的Ip
 * 3) 其余
 * Created by lpf on 2018-06-11.
 */
public interface ThriftServerIpResolve {
    String getServerIp() throws Exception;

    void reset();

    //当IP变动时,将会调用reset方法
    static interface IpRestCalllBack{
        public void rest(String newIp);
    }
}

能够对该接口作不通的实现,下面咱们基于网卡获取IP地址,也能够经过配置serverIp
ThriftServerIpLocalNetworkResolve.java

package com.lpf.thrift.rpc.zookeeper;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.net.Inet6Address;
import java.net.InetAddress;
import java.net.NetworkInterface;
import java.net.SocketException;
import java.util.Enumeration;

/**
 * 基于网卡获取IP地址
 * Created by lpf on 2018-06-11.
 */
public class ThriftServerIpLocalNetworkResolve implements ThriftServerIpResolve{
    private Logger logger = LoggerFactory.getLogger(getClass());

    //缓存
    private String serverIp;


    @Override
    public String getServerIp() throws Exception {
        if (serverIp != null){
            return serverIp;
        }
    // 一个主机有多个网络接口
        try {
            Enumeration<NetworkInterface> netInterfaces = NetworkInterface.getNetworkInterfaces();
            while (netInterfaces.hasMoreElements()) {
                NetworkInterface netInterface = netInterfaces.nextElement();
                // 每一个网络接口,都会有多个"网络地址",好比必定会有lookback地址,会有siteLocal地址等.以及IPV4或者IPV6 .
                Enumeration<InetAddress> addresses = netInterface.getInetAddresses();
                while (addresses.hasMoreElements()) {
                    InetAddress address = addresses.nextElement();
                    if(address instanceof Inet6Address){
                        continue;
                    }
                    if (address.isSiteLocalAddress() && !address.isLoopbackAddress()) {
                        serverIp = address.getHostAddress();
                        logger.info("resolve rpc ip :"+ serverIp);
                        return serverIp;
                    }
                }
            }
        } catch (SocketException e) {
            e.printStackTrace();
        }
        return serverIp;
    }

    @Override
    public void reset() {
        serverIp = null;
    }
}

接下来咱们定义发布服务接口,并实现将服务信息(服务接口、版本号,IP、port、weight)发布到zookeeper中。
ThriftServerAddressRegister.java

package com.lpf.thrift.rpc.zookeeper;

/**
 * 发布服务地址及端口到服务注册中心,这里是zookeeper服务器
 * Created by lpf on 2018-06-11.
 */
public interface ThriftServerAddressRegister {
    /**
     * 发布服务接口
     * @param service 服务接口名称,一个产品中不能重复
     * @param version 服务接口的版本号,默认1.0.0
     * @param address 服务发布的地址和端口
     */
    void register(String service,String version,String address);
}

实现:ThriftServerAddressRegisterZookeeper.java

package com.lpf.thrift.rpc.zookeeper;

import com.lpf.thrift.rpc.ThriftException;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.imps.CuratorFrameworkState;
import org.apache.zookeeper.CreateMode;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * 注册服务列表到Zookeeper
 * Created by lpf on 2018-06-11.
 */
public class ThriftServerAddressRegisterZookeeper implements ThriftServerAddressRegister{
    private Logger logger = LoggerFactory.getLogger(getClass());

    private CuratorFramework zkClient;

    public ThriftServerAddressRegisterZookeeper(){}

    public ThriftServerAddressRegisterZookeeper(CuratorFramework zkClient){
        this.zkClient = zkClient;
    }

    public void setZkClient(CuratorFramework zkClient) {
        this.zkClient = zkClient;
    }

    @Override
    public void register(String service, String version, String address) {
        if(zkClient.getState() == CuratorFrameworkState.LATENT){
            zkClient.start();
        }
        if (version == null || version == ""){
            version ="1.0.0";
        }
        //建立临时节点
        try {
            zkClient.create()
                    .creatingParentsIfNeeded()
                    .withMode(CreateMode.EPHEMERAL)
                    .forPath("/"+service+"/"+version+"/"+address);
        } catch (Exception e) {
            logger.error("register api address to zookeeper exception:{}",e);
            throw new ThriftException("register api address to zookeeper exception:{}", e);
        }
    }

    public void close(){
        zkClient.close();
    }
}

3.客户端发现服务

定义获取服务地址接口

ThriftServerAddressProvider.java

package com.lpf.thrift.rpc.zookeeper;

import java.net.InetSocketAddress;
import java.util.List;

/**
 * 获取服务地址接口
 * Created by lpf on 2018-06-11.
 */
public interface ThriftServerAddressProvider {
    //获取服务名称
    String getService();

    /**
     * 获取全部服务端地址
     * @return
     */
    List<InetSocketAddress> findServerAddressList();

    /**
     * 选取一个合适的address,能够随机获取等'
     * 内部可使用合适的算法.
     * @return
     */
    InetSocketAddress selector();

    void close();
}

基于zookeeper服务地址自动发现实现:ThriftServerAddressProviderZookeeper.java

package com.lpf.thrift.rpc.zookeeper;

import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.imps.CuratorFrameworkState;
import org.apache.curator.framework.recipes.cache.ChildData;
import org.apache.curator.framework.recipes.cache.PathChildrenCache;
import org.apache.curator.framework.recipes.cache.PathChildrenCache.StartMode;
import org.apache.curator.framework.recipes.cache.PathChildrenCacheEvent;
import org.apache.curator.framework.recipes.cache.PathChildrenCacheListener;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.InitializingBean;

import java.io.Closeable;
import java.net.InetSocketAddress;
import java.util.*;
import java.util.concurrent.CountDownLatch;

/**
 * @author lpf
 * 使用zookeeper做为"config"中心,使用apache-curator方法库来简化zookeeper开发
 */
public class ThriftServerAddressProviderZookeeper implements ThriftServerAddressProvider, InitializingBean ,Closeable{

    private Logger logger = LoggerFactory.getLogger(getClass());

    // 注册服务
    private String service;
    // 服务版本号
    private String version = "1.0.0";
    //zk客户端
    private CuratorFramework zkClient;

    private CountDownLatch countDownLatch= new CountDownLatch(1);//避免 zk尚未链接上,就去调用服务

    private PathChildrenCache cachedPath;

    // 用来保存当前provider所接触过的地址记录,当zookeeper集群故障时,可使用trace中地址,做为"备份"
    private Set<String> trace = new HashSet<String>();

    private final List<InetSocketAddress> container = new ArrayList<InetSocketAddress>();

    private Queue<InetSocketAddress> inner = new LinkedList<InetSocketAddress>();

    private Object lock = new Object();

    // 默认权重
    private static final Integer DEFAULT_WEIGHT = 1;

    public void setService(String service) {
        this.service = service;
    }

    public void setVersion(String version) {
        this.version = version;
    }

    public void setZkClient(CuratorFramework zkClient) {
        this.zkClient = zkClient;
    }

    public ThriftServerAddressProviderZookeeper() {
    }

    public ThriftServerAddressProviderZookeeper(CuratorFramework zkClient) {
        this.zkClient = zkClient;
    }


    @Override
    public void afterPropertiesSet() throws Exception {
        // 若是zk还没有启动,则启动
        if (zkClient.getState() == CuratorFrameworkState.LATENT) {
            zkClient.start();
        }
        buildPathChildrenCache(zkClient, getServicePath(), true);
        cachedPath.start(StartMode.POST_INITIALIZED_EVENT);
        countDownLatch.await();
    }

    private String getServicePath(){
        return "/" + service + "/" + version;
    }
    private void buildPathChildrenCache(final CuratorFramework client, String path, Boolean cacheData) throws Exception {
        cachedPath = new PathChildrenCache(client, path, cacheData);
        cachedPath.getListenable().addListener(new PathChildrenCacheListener() {
            @Override
            public void childEvent(CuratorFramework client, PathChildrenCacheEvent event) throws Exception {
                PathChildrenCacheEvent.Type eventType = event.getType();
                switch (eventType) {
                    case CONNECTION_RECONNECTED:
                        logger.info("Connection is reconection.");
                        break;
                    case CONNECTION_SUSPENDED:
                        logger.info("Connection is suspended.");
                        break;
                    case CONNECTION_LOST:
                        logger.warn("Connection error,waiting...");
                        return;
                    case INITIALIZED:
                        // countDownLatch.countDown();
                        logger.warn("Connection init ...");
                    default:
                        //
                }
                // 任何节点的时机数据变更,都会rebuild,此处为一个"简单的"作法.
                cachedPath.rebuild();
                rebuild();
                countDownLatch.countDown();
            }

            protected void rebuild() throws Exception {
                List<ChildData> children = cachedPath.getCurrentData();
                if (children == null || children.isEmpty()) {
                    // 有可能全部的thrift server都与zookeeper断开了连接
                    // 可是,有可能,thrift client与thrift server之间的网络是良好的
                    // 所以此处是否须要清空container,是须要多方面考虑的.
                    container.clear();
                    logger.error("thrift rpc-cluster error....");
                    return;
                }
                List<InetSocketAddress> current = new ArrayList<InetSocketAddress>();
                String path = null;
                for (ChildData data : children) {
                    path = data.getPath();
                    logger.debug("get path:"+path);
                    path = path.substring(getServicePath().length()+1);
                    logger.debug("get serviceAddress:"+path);
                    String address = new String(path.getBytes(), "utf-8");
                    current.addAll(transfer(address));
                    trace.add(address);
                }
                Collections.shuffle(current);
                synchronized (lock) {
                    container.clear();
                    container.addAll(current);
                    inner.clear();
                    inner.addAll(current);

                }
            }
        });
    }

    private List<InetSocketAddress> transfer(String address) {
        String[] hostname = address.split(":");
        Integer weight = DEFAULT_WEIGHT;
        if (hostname.length == 3) {
            weight = Integer.valueOf(hostname[2]);
        }
        String ip = hostname[0];
        Integer port = Integer.valueOf(hostname[1]);
        List<InetSocketAddress> result = new ArrayList<InetSocketAddress>();
        // 根据优先级,将ip:port添加屡次到地址集中,而后随机取地址实现负载
        for (int i = 0; i < weight; i++) {
            result.add(new InetSocketAddress(ip, port));
        }
        return result;
    }

    @Override
    public List<InetSocketAddress> findServerAddressList() {
        return Collections.unmodifiableList(container);
    }

    @Override
    public synchronized InetSocketAddress selector() {
        if (inner.isEmpty()) {
            if (!container.isEmpty()) {
                inner.addAll(container);
            } else if (!trace.isEmpty()) {
                synchronized (lock) {
                    for (String hostname : trace) {
                        container.addAll(transfer(hostname));
                    }
                    Collections.shuffle(container);
                    inner.addAll(container);
                }
            }
        }
        return inner.poll();
    }

    @Override
    public void close() {
        try {
            cachedPath.close();
            zkClient.close();
        } catch (Exception e) {
        }
    }

    @Override
    public String getService() {
        return service;
    }

}

还能够经过配置获取服务地址

3、服务端服务注册实现

ThriftServiceServerFactory.java

package com.lpf.thrift.rpc;

import com.lpf.thrift.rpc.zookeeper.ThriftServerAddressRegister;
import com.lpf.thrift.rpc.zookeeper.ThriftServerIpLocalNetworkResolve;
import com.lpf.thrift.rpc.zookeeper.ThriftServerIpResolve;
import org.apache.thrift.TProcessor;
import org.apache.thrift.TProcessorFactory;
import org.apache.thrift.protocol.TBinaryProtocol;
import org.apache.thrift.server.TServer;
import org.apache.thrift.server.TThreadedSelectorServer;
import org.apache.thrift.transport.TFramedTransport;
import org.apache.thrift.transport.TNonblockingServerSocket;
import org.springframework.beans.factory.InitializingBean;

import java.lang.instrument.IllegalClassFormatException;
import java.lang.reflect.Constructor;

/**
 * 服务端注册服务工厂
 * Created by lpf on 2018-06-11.
 */
public class ThriftServiceServerFactory implements InitializingBean{

    // 服务注册本机端口
    private Integer port = 8299;
    // 优先级
    private Integer weight = 1;// default
    // 服务实现类
    private Object service;// serice实现类
    //服务版本号
    private String version;
    //服务注册
    private ThriftServerAddressRegister thriftServerAddressRegister;

    // 解析本机IP
    private ThriftServerIpResolve thriftServerIpResolve;

    private ServerThread serverThread;

    public void setPort(Integer port) {
        this.port = port;
    }

    public void setWeight(Integer weight) {
        this.weight = weight;
    }

    public void setService(Object service) {
        this.service = service;
    }

    public void setVersion(String version) {
        this.version = version;
    }

    public void setThriftServerAddressRegister(ThriftServerAddressRegister thriftServerAddressRegister) {
        this.thriftServerAddressRegister = thriftServerAddressRegister;
    }

    @Override
    public void afterPropertiesSet() throws Exception {
        if (thriftServerIpResolve == null){
            thriftServerIpResolve = new ThriftServerIpLocalNetworkResolve();
        }
        String serverIP = thriftServerIpResolve.getServerIp();
        if (serverIP == null || serverIP.equals("")){
            throw new ThriftException("cant find rpc ip...");
        }
        String hostname = serverIP + ":" + port + ":" + weight;
        Class<?>  serviceClass = service.getClass();
        // 获取实现类接口
        Class<?>[] interfaces = serviceClass.getInterfaces();
        if (interfaces.length == 0) {
            throw new IllegalClassFormatException("api-class should implements Iface");
        }
        // reflect,load "Processor";
        TProcessor processor = null;
        String serviceName = null;
        for (Class<?> clazz : interfaces) {
            String cname = clazz.getSimpleName();
            if (!cname.equals("Iface")) {
                continue;
            }
            serviceName = clazz.getEnclosingClass().getName();
            String pname = serviceName + "$Processor";
            try {
                ClassLoader classLoader = Thread.currentThread().getContextClassLoader();
                Class<?> pclass = classLoader.loadClass(pname);
                if (!TProcessor.class.isAssignableFrom(pclass)) {
                    continue;
                }
                Constructor<?> constructor = pclass.getConstructor(clazz);
                processor = (TProcessor) constructor.newInstance(service);
                break;
            } catch (Exception e) {
                //
            }
        }
        if (processor == null) {
            throw new IllegalClassFormatException("api-class should implements Iface");
        }
        //须要单独的线程,由于serve方法是阻塞的.
        serverThread = new ServerThread(processor, port);
        serverThread.start();
        // 注册服务
        if (thriftServerAddressRegister != null) {
            thriftServerAddressRegister.register(serviceName, version, hostname);
        }
    }

    class ServerThread extends Thread {
        private TServer server;
        ServerThread(TProcessor processor, int port) throws Exception {
            TNonblockingServerSocket serverTransport = new TNonblockingServerSocket(port);
            TThreadedSelectorServer.Args tArgs = new TThreadedSelectorServer.Args(serverTransport);
            TProcessorFactory processorFactory = new TProcessorFactory(processor);
            tArgs.processorFactory(processorFactory);
            tArgs.transportFactory(new TFramedTransport.Factory());
            tArgs.protocolFactory( new TBinaryProtocol.Factory(true, true));
            server = new TThreadedSelectorServer(tArgs);
        }

        @Override
        public void run(){
            try{
                //启动服务
                server.serve();
            }catch(Exception e){
                //
            }
        }

        public void stopServer(){
            server.stop();
        }
    }

    public void close() {
        serverThread.stopServer();
    }
}

4、客户端获取服务代理及链接池实现

客户端链接池实现:ThriftClientPoolFactory.java

package com.lpf.thrift.rpc;

import com.lpf.thrift.rpc.zookeeper.ThriftServerAddressProvider;
import org.apache.commons.pool.BasePoolableObjectFactory;
import org.apache.thrift.TServiceClient;
import org.apache.thrift.TServiceClientFactory;
import org.apache.thrift.protocol.TBinaryProtocol;
import org.apache.thrift.protocol.TProtocol;
import org.apache.thrift.transport.TFramedTransport;
import org.apache.thrift.transport.TSocket;
import org.apache.thrift.transport.TTransport;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.net.InetSocketAddress;

/**
 * 链接池,thrift-client for spring
 */
public class ThriftClientPoolFactory extends BasePoolableObjectFactory<TServiceClient> {

    private Logger logger = LoggerFactory.getLogger(getClass());

    private final ThriftServerAddressProvider serverAddressProvider;
    private final TServiceClientFactory<TServiceClient> clientFactory;
    private PoolOperationCallBack callback;

    protected ThriftClientPoolFactory(ThriftServerAddressProvider addressProvider, TServiceClientFactory<TServiceClient> clientFactory) throws Exception {
        this.serverAddressProvider = addressProvider;
        this.clientFactory = clientFactory;
    }

    protected ThriftClientPoolFactory(ThriftServerAddressProvider addressProvider, TServiceClientFactory<TServiceClient> clientFactory,
                                      PoolOperationCallBack callback) throws Exception {
        this.serverAddressProvider = addressProvider;
        this.clientFactory = clientFactory;
        this.callback = callback;
    }

    static interface PoolOperationCallBack {
        // 销毁client以前执行
        void destroy(TServiceClient client);

        // 建立成功是执行
        void make(TServiceClient client);
    }

    @Override
    public void destroyObject(TServiceClient client) throws Exception {
        if (callback != null) {
            try {
                callback.destroy(client);
            } catch (Exception e) {
                logger.warn("destroyObject:{}", e);
            }
        }
        logger.info("destroyObject:{}", client);
        TTransport pin = client.getInputProtocol().getTransport();
        pin.close();
        TTransport pout = client.getOutputProtocol().getTransport();
        pout.close();
    }

    @Override
    public void activateObject(TServiceClient client) throws Exception {
    }

    @Override
    public void passivateObject(TServiceClient client) throws Exception {
    }

    @Override
    public boolean validateObject(TServiceClient client) {
        TTransport pin = client.getInputProtocol().getTransport();
        logger.info("validateObject input:{}", pin.isOpen());
        TTransport pout = client.getOutputProtocol().getTransport();
        logger.info("validateObject output:{}", pout.isOpen());
        return pin.isOpen() && pout.isOpen();
    }

    @Override
    public TServiceClient makeObject() throws Exception {
        InetSocketAddress address = serverAddressProvider.selector();
        if(address==null){
            new ThriftException("No provider available for remote api");
        }
        TSocket tsocket = new TSocket(address.getHostName(), address.getPort());
        TTransport transport = new TFramedTransport(tsocket);
        TProtocol protocol = new TBinaryProtocol(transport);
        TServiceClient client = this.clientFactory.getClient(protocol);
        transport.open();
        if (callback != null) {
            try {
                callback.make(client);
            } catch (Exception e) {
                logger.warn("makeObject:{}", e);
            }
        }
        return client;
    }

}

客户端服务代理工厂实现:ThriftServiceClientProxyFactory.java

package com.lpf.thrift.rpc;

import com.lpf.thrift.rpc.zookeeper.ThriftServerAddressProvider;
import org.apache.commons.pool.impl.GenericObjectPool;
import org.apache.thrift.TServiceClient;
import org.apache.thrift.TServiceClientFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.FactoryBean;
import org.springframework.beans.factory.InitializingBean;

import java.io.Closeable;
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Proxy;

/**
 * 客户端代理工厂
 */
public class ThriftServiceClientProxyFactory implements FactoryBean, InitializingBean,Closeable {

    private Logger logger = LoggerFactory.getLogger(getClass());

    private Integer maxActive = 32;// 最大活跃链接数

    private Integer idleTime = 180000;// ms,default 3 min,连接空闲时间, -1,关闭空闲检测
    private ThriftServerAddressProvider serverAddressProvider;

    private Object proxyClient;
    private Class<?> objectClass;

    private GenericObjectPool<TServiceClient> pool;

    private ThriftClientPoolFactory.PoolOperationCallBack callback = new ThriftClientPoolFactory.PoolOperationCallBack() {
        @Override
        public void make(TServiceClient client) {
            logger.info("create");
        }

        @Override
        public void destroy(TServiceClient client) {
            logger.info("destroy");
        }
    };

    public void setMaxActive(Integer maxActive) {
        this.maxActive = maxActive;
    }

    public void setIdleTime(Integer idleTime) {
        this.idleTime = idleTime;
    }

    public void setServerAddressProvider(ThriftServerAddressProvider serverAddressProvider) {
        this.serverAddressProvider = serverAddressProvider;
    }

    @Override
    public void afterPropertiesSet() throws Exception {
        ClassLoader classLoader = Thread.currentThread().getContextClassLoader();
        // 加载Iface接口
        objectClass = classLoader.loadClass(serverAddressProvider.getService() + "$Iface");
        // 加载Client.Factory类
        Class<TServiceClientFactory<TServiceClient>> fi = (Class<TServiceClientFactory<TServiceClient>>) classLoader.loadClass(serverAddressProvider.getService() + "$Client$Factory");
        TServiceClientFactory<TServiceClient> clientFactory = fi.newInstance();

        ThriftClientPoolFactory clientPool = new ThriftClientPoolFactory(serverAddressProvider, clientFactory, callback);
        pool = new GenericObjectPool<TServiceClient>(clientPool, makePoolConfig());

//        InvocationHandler handler = makeProxyHandler();//方式1
        InvocationHandler handler = makeProxyHandler2();//方式2
        proxyClient = Proxy.newProxyInstance(classLoader, new Class[] { objectClass }, handler);
    }

    private InvocationHandler makeProxyHandler() throws Exception{
        ThriftServiceClient2Proxy handler = null;
        TServiceClient client = pool.borrowObject();
        try {
            handler = new ThriftServiceClient2Proxy(client);
            pool.returnObject(client);
        }catch (Exception e){
            pool.invalidateObject(client);
            throw new ThriftException("获取代理对象失败");
        }
        return handler;
    }


    private InvocationHandler makeProxyHandler2() throws Exception{
        ThriftServiceClientProxy handler = new ThriftServiceClientProxy(pool);
        return handler;
    }

    private GenericObjectPool.Config  makePoolConfig() {
        GenericObjectPool.Config poolConfig = new GenericObjectPool.Config();
        poolConfig.maxActive = maxActive;
        poolConfig.maxIdle = 1;
        poolConfig.minIdle = 0;
        poolConfig.minEvictableIdleTimeMillis = idleTime;
        poolConfig.timeBetweenEvictionRunsMillis = idleTime * 2L;
        poolConfig.testOnBorrow=true;
        poolConfig.testOnReturn=false;
        poolConfig.testWhileIdle=false;
        return poolConfig;
    }

    @Override
    public Object getObject() throws Exception {
        return proxyClient;
    }

    @Override
    public Class<?> getObjectType() {
        return objectClass;
    }

    @Override
    public boolean isSingleton() {
        return true;
    }

    @Override
    public void close() {
        if(pool!=null){
            try {
                pool.close();
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
        if (serverAddressProvider != null) {
            serverAddressProvider.close();
        }
    }
}

面咱们看一下服务端和客户端的配置;

服务端spring-context-thrift-server.xml

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
   xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:context="http://www.springframework.org/schema/context"
   xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-4.0.xsd
            http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-4.0.xsd
            "
   default-lazy-init="false">

   <!-- zookeeper -->
   <bean id="thriftZookeeper" class="com.lpf.thrift.rpc.zookeeper.ZookeeperFactory" destroy-method="close">
      <property name="zkHosts" value="192.168.10.42:2181"/>
      <property name="namespace" value="com.lpf.thrift.rpc" />
      <property name="connectionTimeout" value="3000" />
      <property name="sessionTimeout" value="3000" />
      <property name="singleton" value="true" />
   </bean>
   <bean id="serviceAddressRegister"
        class="com.lpf.thrift.rpc.zookeeper.ThriftServerAddressRegisterZookeeper" destroy-method="close">
      <property name="zkClient" ref="thriftZookeeper" />
   </bean>

   <bean id="smartService" class="com.lpf.thrift.server.service.SmartServiceImpl"/>
   <bean id="helloService" class="com.lpf.thrift.server.service.HelloServiceImpl"/>

   <bean  class="com.lpf.thrift.rpc.ThriftServiceServerFactory"
      destroy-method="close">
      <property name="service" ref="smartService" />
      <property name="port" value="9000" />
      <property name="version" value="1.0.0" />
      <property name="weight" value="1" />
      <property name="thriftServerAddressRegister" ref="serviceAddressRegister" />
   </bean>
   
   <bean  class="com.lpf.thrift.rpc.ThriftServiceServerFactory"
      destroy-method="close">
      <property name="service" ref="helloService" />
      <property name="port" value="9001" />
      <property name="version" value="1.0.0" />
      <property name="weight" value="1" />
      <property name="thriftServerAddressRegister" ref="serviceAddressRegister" />
   </bean>
</beans>

客户端:spring-context-thrift-client.xml

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
   xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:context="http://www.springframework.org/schema/context"
   xmlns:aop="http://www.springframework.org/schema/aop" xmlns:tx="http://www.springframework.org/schema/tx"
   xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-4.0.xsd
            http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-4.0.xsd
            http://www.springframework.org/schema/aop http://www.springframework.org/schema/aop/spring-aop-4.0.xsd
            http://www.springframework.org/schema/tx http://www.springframework.org/schema/tx/spring-tx-4.0.xsd"
   default-lazy-init="false">

   <!-- zookeeper -->
   <bean id="thriftZookeeper" class="com.lpf.thrift.rpc.zookeeper.ZookeeperFactory"
      destroy-method="close">
      <property name="zkHosts" value="192.168.10.42:2181" />
      <property name="namespace" value="com.lpf.thrift.rpc"/>
      <property name="connectionTimeout" value="3000" />
      <property name="sessionTimeout" value="3000" />
      <property name="singleton" value="true" />
   </bean>

   <bean id="smartService" class="com.lpf.thrift.rpc.ThriftServiceClientProxyFactory"
      destroy-method="close">
      <property name="maxActive" value="5" />
      <property name="idleTime" value="1800000" />
      <property name="serverAddressProvider">
         <bean class="com.lpf.thrift.rpc.zookeeper.ThriftServerAddressProviderZookeeper">
            <property name="service" value="com.lpf.thrift.api.SmartService" />
            <property name="version" value="1.0.0" />
            <property name="zkClient" ref="thriftZookeeper" />
         </bean>
      </property>
   </bean>

   <bean id="helloService" class="com.lpf.thrift.rpc.ThriftServiceClientProxyFactory"
      destroy-method="close">
      <property name="maxActive" value="5" />
      <property name="idleTime" value="1800000" />
      <property name="serverAddressProvider">
         <bean class="com.lpf.thrift.rpc.zookeeper.ThriftServerAddressProviderZookeeper">
            <property name="service" value="com.lpf.thrift.api.HelloService" />
            <property name="version" value="1.0.0" />
            <property name="zkClient" ref="thriftZookeeper" />
         </bean>
      </property>
   </bean>
</beans>

运行服务端后,咱们能够看见zookeeper注册了多个服务地址。

详细实例这里就不详述了,请参考实例代码 https://gitee.com/lemonLove/thrift_zk

相关文章
相关标签/搜索