(接上篇《架构设计:系统间通讯(13)——RPC实例Apache Thrift 下篇(1)》)java
我已经在CSDN的资源区上传了这个示例工程的全部代码(http://download.csdn.net/detail/yinwenjie/9289999)。读者能够直接到资源下载站进行下载(不收积分哦~~^_^)。这篇文章将紧接上文,主要介绍这个工程几个主要的类代码。node
服务端主程序的类名:processor.MainProcessor,它负责在服务端启动Apache Thrift而且在服务监听启动成功后,链接到zookeeper,注册这个服务的基本信息。apache
这里要注意一下,Apache Thrift的服务监听是阻塞式的,因此processor.MainProcessor的Apache Thrift操做应该另起线程进行(processor.MainProcessor.StartServerThread),而且经过线程间的锁定操做,保证zookeeper的链接必定是在Apache Thrift成功启动后才进行。json
package processor; import java.io.IOException; import java.util.Set; import java.util.concurrent.Executors; import net.sf.json.JSONObject; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.log4j.BasicConfigurator; import org.apache.thrift.TProcessor; import org.apache.thrift.protocol.TBinaryProtocol; import org.apache.thrift.protocol.TProtocol; import org.apache.thrift.server.ServerContext; import org.apache.thrift.server.TServerEventHandler; import org.apache.thrift.server.TThreadPoolServer; import org.apache.thrift.server.TThreadPoolServer.Args; import org.apache.thrift.transport.TServerSocket; import org.apache.thrift.transport.TTransport; import org.apache.thrift.transport.TTransportException; import org.apache.zookeeper.CreateMode; import org.apache.zookeeper.KeeperException; import org.apache.zookeeper.WatchedEvent; import org.apache.zookeeper.Watcher; import org.apache.zookeeper.ZooKeeper; import org.apache.zookeeper.ZooDefs.Ids; import org.apache.zookeeper.data.Stat; import business.BusinessServicesMapping; import thrift.iface.DIYFrameworkService; import thrift.iface.DIYFrameworkService.Iface; public class MainProcessor { static { BasicConfigurator.configure(); } /** * 日志 */ private static final Log LOGGER = LogFactory.getLog(MainProcessor.class); private static final Integer SERVER_PORT = 8090; /** * 专门用于锁定以保证这个主线程不退出的一个object对象 */ private static final Object WAIT_OBJECT = new Object(); /** * 标记apache thrift是否启动成功了 * 只有apache thrift启动成功了,才须要链接到zk */ private boolean isthriftStart = false; public static void main(String[] args) { /* * 主程序要作的事情: * * 一、启动thrift服务。而且服务调用者的请求 * 二、链接到zk,并向zk注册本身提供的服务名称,告知zk真实的访问地址、访问端口 * (向zk注册的服务,存储在BusinessServicesMapping这个类的K-V常量中) * */ //一、========启动thrift服务 MainProcessor mainProcessor = new MainProcessor(); mainProcessor.startServer(); // 一直等待,apache thrift启动完成 synchronized (mainProcessor) { try { while(!mainProcessor.isthriftStart) { mainProcessor.wait(); } } catch (InterruptedException e) { MainProcessor.LOGGER.error(e); System.exit(-1); } } //二、========链接到zk try { mainProcessor.connectZk(); } catch (IOException | KeeperException | InterruptedException e) { MainProcessor.LOGGER.error(e); System.exit(-1); } // 这个wait在业务层面,没有任何意义。只是为了保证这个守护线程不会退出 synchronized (MainProcessor.WAIT_OBJECT) { try { MainProcessor.WAIT_OBJECT.wait(); } catch (InterruptedException e) { MainProcessor.LOGGER.error(e); System.exit(-1); } } } /** * 这个私有方法用于链接到zk上,而且注册相关服务 * @throws IOException * @throws InterruptedException * @throws KeeperException */ private void connectZk() throws IOException, KeeperException, InterruptedException { // 读取这个服务提供者,须要在zk上注册的服务 Set<String> serviceNames = BusinessServicesMapping.SERVICES_MAPPING.keySet(); // 若是没有任何服务须要注册到zk,那么这个服务提供者就没有继续注册的必要了 if(serviceNames == null || serviceNames.isEmpty()) { return; } // 默认的监听器 MyDefaultWatcher defaultWatcher = new MyDefaultWatcher(); // 链接到zk服务器集群,添加默认的watcher监听 ZooKeeper zk = new ZooKeeper("192.168.61.128:2181", 120000, defaultWatcher); //建立一个父级节点Service Stat pathStat = null; try { pathStat = zk.exists("/Service", defaultWatcher); //若是条件成立,说明节点不存在(只须要判断一个节点的存在性便可) //建立的这个节点是一个“永久状态”的节点 if(pathStat == null) { zk.create("/Service", "".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); } } catch(Exception e) { System.exit(-1); } // 开始添加子级节点,每个子级节点都表示一个这个服务提供者提供的业务服务 for (String serviceName : serviceNames) { JSONObject nodeData = new JSONObject(); nodeData.put("ip", "127.0.0.1"); nodeData.put("port", MainProcessor.SERVER_PORT); zk.create("/Service/" + serviceName, nodeData.toString().getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL); } //执行到这里,说明全部的service都启动完成了 MainProcessor.LOGGER.info("===================全部service都启动完成了,主线程开始启动==================="); } /** * 这个私有方法用于开启Apache thrift服务端,并进行持续监听 * @throws TTransportException */ private void startServer() { Thread startServerThread = new Thread(new StartServerThread()); startServerThread.start(); } private class StartServerThread implements Runnable { @Override public void run() { MainProcessor.LOGGER.info("看到这句就说明thrift服务端准备工做 ...."); // 服务执行控制器(只要是调度服务的具体实现该如何运行) TProcessor tprocessor = new DIYFrameworkService.Processor<Iface>(new DIYFrameworkServiceImpl()); // 基于阻塞式同步IO模型的Thrift服务,正式生产环境不建议用这个 TServerSocket serverTransport = null; try { serverTransport = new TServerSocket(MainProcessor.SERVER_PORT); } catch (TTransportException e) { MainProcessor.LOGGER.error(e); System.exit(-1); } // 为这个服务器设置对应的IO网络模型、设置使用的消息格式封装、设置线程池参数 Args tArgs = new Args(serverTransport); tArgs.processor(tprocessor); tArgs.protocolFactory(new TBinaryProtocol.Factory()); tArgs.executorService(Executors.newFixedThreadPool(100)); // 启动这个thrift服务 TThreadPoolServer server = new TThreadPoolServer(tArgs); server.setServerEventHandler(new StartServerEventHandler()); server.serve(); } } /** * 为这个TThreadPoolServer对象,设置是一个事件处理器。 * 以便在TThreadPoolServer正式开始监听服务请求前,通知mainProcessor: * “Apache Thrift已经成功启动了” * @author yinwenjie * */ private class StartServerEventHandler implements TServerEventHandler { @Override public void preServe() { /* * 须要实现这个方法,以便在服务启动成功后, * 通知mainProcessor: “Apache Thrift已经成功启动了” * */ MainProcessor.this.isthriftStart = true; synchronized (MainProcessor.this) { MainProcessor.this.notify(); } } /* (non-Javadoc) * @see org.apache.thrift.server.TServerEventHandler#createContext(org.apache.thrift.protocol.TProtocol, org.apache.thrift.protocol.TProtocol) */ @Override public ServerContext createContext(TProtocol input, TProtocol output) { /* * 无需实现 * */ return null; } @Override public void deleteContext(ServerContext serverContext, TProtocol input, TProtocol output) { /* * 无需实现 * */ } @Override public void processContext(ServerContext serverContext, TTransport inputTransport, TTransport outputTransport) { /* * 无需实现 * */ } } /** * 这是默认的watcher,什么也没有,也不须要有什么<br> * 由于按照功能需求,服务器端并不须要监控zk上的任何目录变化事件 * @author yinwenjie */ private class MyDefaultWatcher implements Watcher { public void process(WatchedEvent event) { } } }123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241
服务端具体实现的代码很简单,就是在IDL脚本生成了java代码后,对DIYFrameworkService接口进行的实现。服务器
package processor; import java.nio.ByteBuffer; import net.sf.json.JSONObject; import org.apache.commons.lang.StringUtils; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.thrift.TException; import business.BusinessService; import business.BusinessServicesMapping; import business.exception.BizException; import business.exception.ResponseCode; import business.pojo.AbstractPojo; import business.pojo.BusinessResponsePojo; import business.pojo.DescPojo; import thrift.iface.DIYFrameworkService.Iface; import thrift.iface.EXCCODE; import thrift.iface.RESCODE; import thrift.iface.Reponse; import thrift.iface.Request; import thrift.iface.ServiceException; import utils.JSONUtils; /** * IDL文件中,咱们定义的惟一服务接口DIYFrameworkService.Iface的惟一实现 * @author yinwenjie * */ public class DIYFrameworkServiceImpl implements Iface { /** * 日志 */ public static final Log LOGGER = LogFactory.getLog(DIYFrameworkServiceImpl.class); /* (non-Javadoc) * @see thrift.iface.DIYFrameworkService.Iface#send(thrift.iface.Request) */ @SuppressWarnings("unchecked") @Override public Reponse send(Request request) throws ServiceException, TException { /* * 因为MainProcessor中,在Apache Thrift 服务端启动时已经加入了线程池,因此这里就不须要再使用线程池了 * 这个服务方法的实现,须要作如下事情: * * 一、根据request中,描述的具体服务名称,在配置信息中查找具体的服务类 * 二、使用java的反射机制,调用具体的服务类(BusinessService接口的实现类)。 * 三、根据具体的业务处理结构,构造Reponse对象,并进行返回 * */ //一、=================== String serviceName = request.getServiceName(); String className = BusinessServicesMapping.SERVICES_MAPPING.get(serviceName); //未发现服务 if(StringUtils.isEmpty(className)) { return this.buildErrorReponse("无效的服务" , null); } //二、=================== // 首先获得以json为描述格式的请求参数信息 JSONObject paramJSON = null; try { byte [] paramJSON_bytes = request.getParamJSON(); if(paramJSON_bytes != null && paramJSON_bytes.length > 0) { String paramJSON_string = new String(paramJSON_bytes); paramJSON = JSONObject.fromObject(paramJSON_string); } } catch(Exception e) { DIYFrameworkServiceImpl.LOGGER.error(e); // 向调用者抛出异常 throw new ServiceException(EXCCODE.PARAMNOTFOUND, e.getMessage()); } // 试图进行反射 BusinessService<AbstractPojo> businessServiceInstance = null; try { businessServiceInstance = (BusinessService<AbstractPojo>)Class.forName(className).newInstance(); } catch (Exception e) { DIYFrameworkServiceImpl.LOGGER.error(e); // 向调用者抛出异常 throw new ServiceException(EXCCODE.SERVICENOTFOUND, e.getMessage()); } // 进行调用 AbstractPojo returnPojo = null; try { returnPojo = businessServiceInstance.handle(paramJSON); } catch (BizException e) { DIYFrameworkServiceImpl.LOGGER.error(e); return this.buildErrorReponse(e.getMessage() , e.getResponseCode()); } // 构造处理成功状况下的返回信息 BusinessResponsePojo responsePojo = new BusinessResponsePojo(); responsePojo.setData(returnPojo); DescPojo descPojo = new DescPojo("", ResponseCode._200); responsePojo.setDesc(descPojo); // 生成json String returnString = JSONUtils.toString(responsePojo); byte[] returnBytes = returnString.getBytes(); ByteBuffer returnByteBuffer = ByteBuffer.allocate(returnBytes.length); returnByteBuffer.put(returnBytes); // 构造response Reponse reponse = new Reponse(RESCODE._200, returnByteBuffer); return reponse; } /** * 这个私有方法,用于构造“Thrift中错误的返回信息” * @param erroe_mess * @return */ private Reponse buildErrorReponse(String erroe_mess , ResponseCode responseCode) { // 构造返回信息 BusinessResponsePojo responsePojo = new BusinessResponsePojo(); responsePojo.setData(null); DescPojo descPojo = new DescPojo(erroe_mess, responseCode == null?ResponseCode._504:responseCode); responsePojo.setDesc(descPojo); // 存储byteBuffer; String responseJSON = JSONUtils.toString(responsePojo); byte[] responseJSON_bytes = responseJSON.getBytes(); ByteBuffer byteBuffer = ByteBuffer.allocate(responseJSON_bytes.length); byteBuffer.put(byteBuffer); Reponse reponse = new Reponse(RESCODE._500, byteBuffer); return reponse; } }123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133
在上文中已经介绍过了,客户端有两件事情须要作:链接到zookeeper查询注册的服务该如何访问;而后向真实的服务提供者发起请求。代码以下:网络
package client; import java.nio.ByteBuffer; import java.util.List; import net.sf.json.JSONObject; import org.apache.commons.lang.StringUtils; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.log4j.BasicConfigurator; import org.apache.thrift.protocol.TBinaryProtocol; import org.apache.thrift.protocol.TProtocol; import org.apache.thrift.transport.TSocket; import org.apache.zookeeper.CreateMode; import org.apache.zookeeper.WatchedEvent; import org.apache.zookeeper.Watcher; import org.apache.zookeeper.ZooKeeper; import org.apache.zookeeper.ZooDefs.Ids; import org.apache.zookeeper.data.Stat; import thrift.iface.DIYFrameworkService.Client; import thrift.iface.Reponse; import thrift.iface.Request; import utils.JSONUtils; public class ThriftClient { /** * 日志 */ private static final Log LOGGER = LogFactory.getLog(ThriftClient.class); private static final String SERVCENAME = "queryUserDetailService"; static { BasicConfigurator.configure(); } public static final void main(String[] main) throws Exception { /* * 服务治理框架的客户端示例,要作如下事情: * * 一、链接到zk,查询当前zk下提供的服务列表中是否有本身须要的服务名称(queryUserDetailService) * 二、若是没有找到须要的服务名称,则客户端终止工做 * 三、若是找到了服务,则经过服务给出的ip,port,基于Thrift进行正式请求 * (这时,和zookeeper是否断开,关系就不大了) * */ // 一、=========================== // 默认的监听器 ClientDefaultWatcher defaultWatcher = new ClientDefaultWatcher(); // 链接到zk服务器集群,添加默认的watcher监听 ZooKeeper zk = new ZooKeeper("192.168.61.128:2181", 120000, defaultWatcher); /* * 为何客户端链接上来之后,也可能建立一个Service根目录呢? * 由于正式的环境下,不能保证客户端一点就在服务器端所有准备好的状况下,再来作调用请求 * */ Stat pathStat = null; try { pathStat = zk.exists("/Service", defaultWatcher); //若是条件成立,说明节点不存在(只须要判断一个节点的存在性便可) //建立的这个节点是一个“永久状态”的节点 if(pathStat == null) { zk.create("/Service", "".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); } } catch(Exception e) { System.exit(-1); } // 二、=========================== //获取服务列表(不须要作任何的事件监听,因此第二个参数能够为false) List<String> serviceList = zk.getChildren("/Service", false); if(serviceList == null || serviceList.isEmpty()) { ThriftClient.LOGGER.info("未发现相关服务,客户端退出"); return; } //而后查看要找寻的服务是否在存在 boolean isFound = false; byte[] data; for (String serviceName : serviceList) { if(StringUtils.equals(serviceName, ThriftClient.SERVCENAME)) { isFound = true; break; } } if(!isFound) { ThriftClient.LOGGER.info("未发现相关服务,客户端退出"); return; } else { data = zk.getData("/Service/" + ThriftClient.SERVCENAME, false, null); } /* * 执行到这里,zk的工做就完成了,接下来zk是否断开,就不重要了 * */ zk.close(); if(data == null || data.length == 0) { ThriftClient.LOGGER.info("未发现有效的服务端地址,客户端退出"); return; } // 获得服务器地值说明 JSONObject serverTargetJSON = null; String serverIp; String serverPort; try { serverTargetJSON = JSONObject.fromObject(new String(data)); serverIp = serverTargetJSON.getString("ip"); serverPort = serverTargetJSON.getString("port"); } catch(Exception e) { ThriftClient.LOGGER.error(e); return; } //三、=========================== TSocket transport = new TSocket(serverIp, Integer.parseInt(serverPort)); TProtocol protocol = new TBinaryProtocol(transport); // 准备调用参数 JSONObject jsonParam = new JSONObject(); jsonParam.put("username", "yinwenjie"); byte[] params = jsonParam.toString().getBytes(); ByteBuffer buffer = ByteBuffer.allocate(params.length); buffer.put(params); Request request = new Request(buffer, ThriftClient.SERVCENAME); // 开始调用 Client client = new Client(protocol); // 准备传输 transport.open(); // 正式调用接口 Reponse reponse = client.send(request); // 必定要记住关闭 transport.close(); // 将返回信息显示出来 ThriftClient.LOGGER.info(JSONUtils.toString(reponse)); } } /** * 这是默认的watcher,什么也没有,也不须要有什么<br> * 由于按照功能需求,客户端并不须要监控zk上的任何目录变化事件 * @author yinwenjie */ class ClientDefaultWatcher implements Watcher { public void process(WatchedEvent event) { } }123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149
以上代码是服务器端、客户端的主要代码。整个工程还有其余的辅助代码,为了让各位读者可以看得清楚直接,咱们将整个工程结构进行一下说明,下载后导入的工程结构以下图所示:架构
这是一个典型的JAVA工程。请使用 JDK 1.6+ 版本。咱们将讲解整个工程结构。首先来看看这个工程中主要的package和它们的做用。app
business:具体的业务层逻辑都在这个包里面,其中exception包含了一个业务层异常的定义BizException,还有错误代码ResponseCode;impl包中放置具体的业务层实现,它们都必须实现BusinessService接口;Pojo是业务层对象模型。client:为了简单起见,我将服务端的实现和客户端的实现放置在一个工程中,client这个包就是客户端的实现代码了;utils包放置了两个工具类,用来进行日期格式化的DataUtils和用来进行json转换的JSONUtils。框架
定义的apache thrift IDL文件放置在thrift文件夹下面,名字叫作:demoHello.thrift;您可使用它生成各类语言的代码;maven
工程须要maven的支持。