#<i class="icon-file">Gaea请求处理流程</i>java
Gaea支持tcp/http/telnet三种通讯信息,其中主要的通讯部分是由netty通讯框架完成,netty提供了一种高性能的非阻塞通讯工具。bootstrap
##<i class="icon-share">Gaea各服务启动</i>服务器
启动服务的配置,这里就tcp的配置简单介绍一下,telnet,http的基本相同。多线程
<property> <name>gaea.servers</name> <value>gaea.server.tcp,gaea.server.http,gaea.server.telnet</value> </property> <property> <name>gaea.server.tcp.enable</name> <value>true</value> </property> <property> <name>gaea.server.tcp.implement</name> <value>com.bj58.spat.gaea.server.core.communication.tcp.SocketServer</value> </property> //其中还有一些配置IP和端口,字符缓冲区大小等,就很少送,看配置便可明白
配置以上的配置,便可标明TCP服务的启动。也能够本身实现某一个服务,继承IServer接口,而后在配置文件中添加以上配置,便可在服务启动的时候,启动你的服务。如Gaea监控服务。app
IServer框架
public interface IServer { public void start() throws Exception; public void stop() throws Exception; }
SocketServer是TCP服务的IServer的实现类。dom
//SocketServer的主要代码 initSocketServer() { //初始化TCPServer SocketHandler handler = new SocketHandler(); //TCP服务处理事件的Handler bootstrap.setPipelineFactory(new SocketPipelineFactory(handler, Global.getSingleton().getServiceConfig().getInt("gaea.server.tcp.frameMaxLength"))); bootstrap.setOption("child.tcpNoDelay", tcpNoDelay); bootstrap.setOption("child.receiveBufferSize", Global.getSingleton().getServiceConfig().getInt("gaea.server.tcp.receiveBufferSize")); bootstrap.setOption("child.sendBufferSize", Global.getSingleton().getServiceConfig().getInt("gaea.server.tcp.sendBufferSize")); try { InetSocketAddress socketAddress = null; socketAddress = new InetSocketAddress(Global.getSingleton().getServiceConfig().getString("gaea.server.tcp.listenIP"), Global.getSingleton().getServiceConfig().getInt("gaea.server.tcp.listenPort")); Channel channel = bootstrap.bind(socketAddress); //绑定地址 allChannels.add(channel); //把生成的channel放入一个ChannelGroup中,提供后续使用 } catch (Exception e) { logger.error("init socket server error", e); System.exit(1); } }
关于netty,网上的资料也不少,能够多看看。异步
##<i class="icon-share">Gaea接收数据</i>socket
netty接收数据等事件处理,都在Handler中实现,所以在这里说一下SocketHandlerasync
在Channel被绑定的时候,触发ChannelOpen事件;
SocketServer.allChannels.add(e.getChannel()); /** * 若是当前服务启动权限认证,则增长当前链接对应的SecureContext */ if(Global.getSingleton().getGlobalSecureIsRights()){//是否经过链接认证 Global.getSingleton().addChannelMap(e.getChannel(), new SecureContext());此链接对应一个SecureContext。 }
在客户端链接的时候,触发channelConnected事件;
for(IFilter filter : Global.getSingleton().getConnectionFilterList()) { //Global中取出启动时,注册的链接过滤器。 filter.filter(new GaeaContext(new GaeaChannel(e.getChannel())));// 执行链接过滤器。 }
主要执行链接过滤器,对链接进行控制
接收数据
try { logger.debug("message receive"); ByteBuffer buffer = ((ChannelBuffer)e.getMessage()).toByteBuffer(); //Netty接收到的数据在ChannelBuffer中,在此转为NIO的ByteBuffer byte[] reciveByte = buffer.array(); logger.debug("reciveByte.length:" + reciveByte.length); byte[] headDelimiter = new byte[0]; System.arraycopy(reciveByte, 0, headDelimiter, 0, 0); byte[] requestBuffer = new byte[reciveByte.length]; System.arraycopy(reciveByte, 0, requestBuffer, 0, (reciveByte.length)); GaeaContext gaeaContext = new GaeaContext(requestBuffer, //生成Gaea上下文GaeaContext; GaeaContext将贯穿于整个消息的处理流程。 new GaeaChannel(e.getChannel()), ServerType.TCP, this); SocketServer.invokerHandle.invoke(gaeaContext);//处理这个GaeaContext } catch(Throwable ex) { byte[] response = ExceptionHelper.createErrorProtocol(); //若是是异常,按照异常协议,返回给调用者 e.getChannel().write(response); logger.error("SocketHandler invoke error", ex); }
在invoke中,Gaea提供同步和异步两种处理方式,固然同步应该是初期产物,如今处理基本都是异步处理
##<i class="icon-share">同步处理流程</i>
@Override public void invoke(GaeaContext context) throws Exception { try { for(IFilter f : Global.getSingleton().getGlobalRequestFilterList()) {//请求过滤器 if(context.getExecFilter() == ExecFilterType.All || context.getExecFilter() == ExecFilterType.RequestOnly) { f.filter(context); //执行请求过滤器的filter方法 } } if(context.isDoInvoke()) { doInvoke(context);//执行请求方法 } logger.debug("begin response filter"); // response filter for(IFilter f : Global.getSingleton().getGlobalResponseFilterList()) {//返回过滤器 if(context.getExecFilter() == ExecFilterType.All || context.getExecFilter() == ExecFilterType.ResponseOnly) { f.filter(context);//执行返回过滤器的filter方法 } } context.getServerHandler().writeResponse(context);//写回给调用者 } catch(Exception ex) { context.setError(ex); context.getServerHandler().writeResponse(context); //若是出现异常,将异常写回给调用者 logger.error("in async messageReceived", ex); } }
从以上代码中能够看出,Gaea先通过了一系列请求过滤器,而后才执行真正的方法,最终再执行返回过滤器,最后写回给客户端。
##<i class="icon-share">异步处理流程</i>
调用invoke以后,Gaea的此次请求将会被放入AsyncInvoker的异步执行器中执行,并快速返回,接收下次请求;
asyncInvoker.run(taskTimeOut, new IAsyncHandler(){...};
IAsynHandler中,具体有三个方法,run是执行任务,messageReceived是执行完以后,返回执行结果,exceptionCaught对执行过程当中的全部异常进行处理。IAsyncHandler定义以下:
public interface IAsyncHandler { public Object run() throws Throwable; public void messageReceived(Object obj); public void exceptionCaught(Throwable e); }
在执行asyncInvoker.run的时候,异步执行器,只是把任务扔给了64个队列,此处默认是64,也能够进行配置,配置项gaea.async.worker.count
AsyncTask task = new AsyncTask(timeOut, handler); //handler放入异步任务中,timeOut为此任务的超时时间, if(rr > 10000) { rr = 0; } int idx = rr % workers.length; //轮询放入多个队列 workers[idx].addTask(task); ++rr;
其中异步任务中有一个超时时间,若是在队列中的时间大于这个值,Gaea将抛弃此任务,保护总体服务的正常运行,这个也就是Gaea的服务器负载保护策略,防止服务端压力过大宕机,丢弃部分任务,以保护大多数任务的有效执行。
在初始化异步执行器的时候,启动了64个工做线程和一个线程池
private AsyncInvoker(int workerCount, boolean timeoutEffect) { workers = new AsyncWorker[workerCount]; ExecutorService executor = Executors.newCachedThreadPool(new ThreadRenameFactory("async task thread")); for(int i=0; i<workers.length; i++) { workers[i] = new AsyncWorker(executor, timeoutEffect); workers[i].setDaemon(true); workers[i].setName("async task worker[" + i + "]"); workers[i].start(); } }
在此,提供64个线程和一个线程池的做用是Gaea提供的两种处理任务的方式,一种是任务分离,64个队列各自处理各自的任务,一种是线程池,处理单个队列,并设置了任务的多执行时间。
private void execNoTimeLimitTask() { AsyncTask task = null; try { task = taskQueue.take(); if(task != null){ if((System.currentTimeMillis() - task.getAddTime()) > task.getQtimeout()) { //超时 task.getHandler().exceptionCaught(new TimeoutException("async task timeout!")); return; } else { Object obj = task.getHandler().run(); //执行 task.getHandler().messageReceived(obj); //返回 } }else{ logger.error("execNoTimeLimitTask take task is null"); } } catch(InterruptedException ie) { } catch(Throwable ex) { if(task != null) { task.getHandler().exceptionCaught(ex); //处理异常 } } }
try { final AsyncTask task = taskQueue.take(); if(task != null) { if((System.currentTimeMillis() - task.getAddTime()) > task.getQtimeout()) { task.getHandler().exceptionCaught(new TimeoutException("async task timeout!")); return; }else{ final CountDownLatch cdl = new CountDownLatch(1); executor.execute(new Runnable(){ @Override public void run() { try { Object obj = task.getHandler().run(); //执行 task.getHandler().messageReceived(obj); //返回 } catch(Throwable ex) { task.getHandler().exceptionCaught(ex); } finally { cdl.countDown(); } } } ); cdl.await(getTimeout(task.getTimeout(), taskQueue.size()), TimeUnit.MILLISECONDS); //等待cdl.countDown通知,不然超时,任务抛弃。 if(cdl.getCount() > 0) { task.getHandler().exceptionCaught(new TimeoutException("async task timeout!")); //异常 } } }else{ logger.error("execTimeoutTask take task is null"); } } catch(InterruptedException ie) { logger.error(""); } catch (Throwable e) { logger.error("get task from poll error", e); }
public Object run() throws Throwable { logger.debug("begin request filter"); // request filter for(IFilter f : Global.getSingleton().getGlobalRequestFilterList()) { if(context.getExecFilter() == ExecFilterType.All || context.getExecFilter() == ExecFilterType.RequestOnly) { f.filter(context); } } if(context.isDoInvoke()) { if(context.getServerType() == ServerType.HTTP){ //对http服务进行处理 httpThreadLocal.set(context.getHttpContext()); } doInvoke(context); } logger.debug("begin response filter"); // response filter for(IFilter f : Global.getSingleton().getGlobalResponseFilterList()) { if(context.getExecFilter() == ExecFilterType.All || context.getExecFilter() == ExecFilterType.ResponseOnly) { f.filter(context); } } return context; }
从以上代码能够看出,处理流程上,跟同步基本相同,只是加了对http服务的处理,所以能够看出,同步是不支持HTTP服务的。返回部分和异常也不是在run中执行,而是分散开,在messageReceived和exceptionCaught中进行处理。具体祥看代码。
##<i class="icon-share">请求过滤器</i>
请求过滤器,顾名思义,是在执行任务以前,Gaea对请求作的一些处理。Gaea默认的框架的请求过滤器都有哪些?
<property> <name>gaea.filter.global.request</name> <value>com.bj58.spat.gaea.server.filter.ProtocolParseFilter,com.bj58.spat.gaea.server.filter.HandclaspFilter,com.bj58.spat.gaea.server.filter.ExecuteMethodFilter</value> </property>
在Gaea刚接收到数据的时候,Gaea将请求的二进制流放入了上下文根GaeaContext,在此Gaea对二进制流进行了解析,还原,用于执行任务
public void filter(GaeaContext context) throws Exception { if(context.getServerType() == ServerType.TCP) { byte[] desKeyByte = null; String desKeyStr = null; boolean bool = false; Global global = Global.getSingleton(); if(global != null){ //判断当前服务启用权限认证 if(global.getGlobalSecureIsRights()){ SecureContext securecontext = global.getGlobalSecureContext(context.getChannel().getNettyChannel()); bool = securecontext.isRights(); if(bool){ desKeyStr = securecontext.getDesKey(); } } } if(desKeyStr != null){ desKeyByte = desKeyStr.getBytes("utf-8"); } Protocol protocol = Protocol.fromBytes(context.getGaeaRequest().getRequestBuffer(),global.getGlobalSecureIsRights(),desKeyByte); //根据deskey进行协议解析 context.getGaeaRequest().setProtocol(protocol); /** * 服务重启直接返回 */ if(Global.getSingleton().getServerState() == ServerStateType.Reboot && protocol.getPlatformType() == PlatformType.Java){ GaeaResponse response = new GaeaResponse(); ResetProtocol rp = new ResetProtocol(); rp.setMsg("This server is reboot!"); protocol.setSdpEntity(rp); response.setResponseBuffer(protocol.toBytes(global.getGlobalSecureIsRights(),desKeyByte)); context.setGaeaResponse(response); context.setExecFilter(ExecFilterType.None); context.setDoInvoke(false); } } }
整个流程如以上代码,先判断是否启用权限认证,再根据权限认证,对二进制流进行协议解析。关于Gaea协议,是一个自定义的二进制协议,具体另起文章详解。解析后若是是服务重启任务,则写入GaeaContext,供后续操做。
客户端第一次请求,会跟服务端进行DES加解密的交互,肯定客户端是否有权限调用此服务,具体过程见代码
/** * 权限认证filter */ @Override public void filter(GaeaContext context) throws Exception { Protocol protocol = context.getGaeaRequest().getProtocol(); if(protocol.getPlatformType() == PlatformType.Java && context.getServerType() == ServerType.TCP){//java 客户端支持权限认证 GaeaResponse response = new GaeaResponse(); Global global = Global.getSingleton(); //是否启用权限认证 if(Global.getSingleton().getGlobalSecureIsRights()){ SecureContext sc = global.getGlobalSecureContext(context.getChannel().getNettyChannel()); //判断当前channel是否经过认证 if(!sc.isRights()){ //没有经过认证 if(protocol != null && protocol.getSdpEntity() instanceof HandclaspProtocol){ SecureKey sk = new SecureKey(); HandclaspProtocol handclaspProtocol = (HandclaspProtocol)protocol.getSdpEntity(); /** * 接收 客户端公钥 */ if("1".equals(handclaspProtocol.getType())){ sk.initRSAkey(); //客户端发送公钥数据 String clientPublicKey = handclaspProtocol.getData(); if(null == clientPublicKey || "".equals(clientPublicKey)){ logger.warn("get client publicKey warn!"); } //java 客户端 if(protocol.getPlatformType() == PlatformType.Java){ //服务器生成公/私钥,公钥传送给客户端 sc.setServerPublicKey(sk.getStringPublicKey()); sc.setServerPrivateKey(sk.getStringPrivateKey()); sc.setClientPublicKey(clientPublicKey); handclaspProtocol.setData(sk.getStringPublicKey());//服务器端公钥 } protocol.setSdpEntity(handclaspProtocol); response.setResponseBuffer(protocol.toBytes()); context.setGaeaResponse(response); this.setInvokeAndFilter(context); logger.info("send server publieKey sucess!"); } /** * 接收权限文件 */ else if("2".equals(handclaspProtocol.getType())){ //客户端加密受权文件 String clientSecureInfo = handclaspProtocol.getData(); if(null == clientSecureInfo || "".equals(clientSecureInfo)){ logger.warn("get client secureKey warn!"); } //受权文件客户端原文(服务器私钥解密) String sourceInfo = sk.decryptByPrivateKey(clientSecureInfo, sc.getServerPrivateKey()); //校验受权文件是否相同 //判断是否合法,若是合法服务器端生成DES密钥,经过客户端提供的公钥进行加密传送给客户端 if(global.containsSecureMap(sourceInfo)){ logger.info("secureKey is ok!"); String desKey = StringUtils.getRandomNumAndStr(8); //设置当前channel属性 sc.setDesKey(desKey); sc.setRights(true); handclaspProtocol.setData(sk.encryptByPublicKey(desKey, sc.getClientPublicKey())); protocol.setSdpEntity(handclaspProtocol); response.setResponseBuffer(protocol.toBytes()); context.setGaeaResponse(response); }else{ logger.error("It's bad secureKey!"); this.ContextException(context, protocol, response, "受权文件错误!"); } this.setInvokeAndFilter(context); }else{ //权限认证 异常状况 logger.error("权限认证异常!"); this.ContextException(context, protocol, response, "权限认证 异常!"); } response = null; sk = null; handclaspProtocol = null; }else{ //客户端没有启动权限认证 logger.error("客户端没有启用权限认证!"); this.ContextException(context, protocol, response, "客户端没有启用权限认证!"); } } }else{ if(protocol != null && protocol.getSdpEntity() instanceof HandclaspProtocol){ //异常--当前服务器没有启动权限认证 logger.error("当前服务没有启用权限认证!"); this.ContextException(context, protocol, response, "当前服务没有启用权限认证!"); } } } }
服务端受权文件,对须要执行的方法,进行受权配置,当调用者调用的时候,此方法是否通过受权。在gaea中没有看到关于此受权配置的文件。这里就很少说,内部系统对于受权访问的控制并不严格。
##<i class="icon-share">执行任务</i>
执行任务是doInvoke(gaeaContext)
这一步比较简单,根据协议解析过滤器解析出来的请求数据,找到请求的类名,再根据类名,从IProxyFactory中取出与之对应的代理类,而后代理去执行真正的方法,关于IProxyFactory类,请看Gaea的启动过程当中如何生成的。
IProxyStub localProxy = Global.getSingleton().getProxyFactory().getProxy(request.getLookup()); //获取代理类 GaeaResponse gaeaResponse = localProxy.invoke(context);//执行真正的方法
在此过程当中,还对各类异常作了处理,全部的处理结果都放到了GaeaContext中。StopWatch主要记录调用信息,并在返回过滤器中,记录执行时间。
##<i class="icon-share">返回过滤器</i>
框架的返回过滤器
<!-- global response filter --> <property> <name>gaea.filter.global.response</name> <value>com.bj58.spat.gaea.server.filter.ProtocolCreateFilter,com.bj58.spat.gaea.server.filter.ExecuteTimeFilter</value> </property>
context.getGaeaResponse().setResponseBuffer(protocol.toBytes(Global.getSingleton().getGlobalSecureIsRights(),desKeyByte));
将最终执行结果转换为二进制流放入GaeaContext。
对方法的执行时间进行监控,并将结果发到一个UDP日志服务器。
public void filter(GaeaContext context) throws Exception { StopWatch sw = context.getStopWatch(); Collection<PerformanceCounter> pcList = sw.getMapCounter().values(); for(PerformanceCounter pc : pcList) { if(pc.getExecuteTime() > minRecordTime) { StringBuilder sbMsg = new StringBuilder(); sbMsg.append(serviceName); sbMsg.append("--"); sbMsg.append(pc.getKey()); sbMsg.append("--time: "); sbMsg.append(pc.getExecuteTime()); sbMsg.append(" [fromIP: "); sbMsg.append(sw.getFromIP()); sbMsg.append(";localIP: "); sbMsg.append(sw.getLocalIP()+"]"); udpClient.send(sbMsg.toString()); } }
##<i class="icon-share">结果返回</i>
在以上过滤器,执行等过程,都是将所获得的结果封装到了上下文GaeaContext中,在这一步将其返回
public void messageReceived(Object obj) { if(context.getServerType() == ServerType.HTTP){ httpThreadLocal.remove(); } if(obj != null) { GaeaContext ctx = (GaeaContext)obj; ctx.getServerHandler().writeResponse(ctx); } else { logger.error("context is null!"); } } if(context != null && context.getGaeaResponse() != null){ context.getChannel().write(context.getGaeaResponse().getResponseBuffer()); //getResponseBuffer 结果的二进制流 } else { context.getChannel().write(new byte[]{0}); logger.error("context is null or response is null in writeResponse"); }
##<i class="icon-share">异常处理</i>
Gaea服务的执行过程当中,全部的异常都会抛出,被exceptionCaught(exception)接收,并通过Gaea封装,序列化,返回给客户端,告诉调用者,是什么缘由致使调用失败。
public void exceptionCaught(Throwable e) { if(context.getServerType() == ServerType.HTTP){ httpThreadLocal.remove(); } if(context.getGaeaResponse() == null){ GaeaResponse respone = new GaeaResponse(); context.setGaeaResponse(respone); } try { byte[] desKeyByte = null; String desKeyStr = null; boolean bool = false; Global global = Global.getSingleton(); if(global != null){ //判断当前服务启用权限认证 if(global.getGlobalSecureIsRights()){ SecureContext securecontext = global.getGlobalSecureContext(context.getChannel().getNettyChannel()); bool = securecontext.isRights(); if(bool){ desKeyStr = securecontext.getDesKey(); } } } if(desKeyStr != null){ desKeyByte = desKeyStr.getBytes("utf-8"); } Protocol protocol = context.getGaeaRequest().getProtocol(); if(protocol == null){ protocol = Protocol.fromBytes(context.getGaeaRequest().getRequestBuffer(),global.getGlobalSecureIsRights(),desKeyByte); context.getGaeaRequest().setProtocol(protocol); } protocol.setSdpEntity(ExceptionHelper.createError(e)); context.getGaeaResponse().setResponseBuffer(protocol.toBytes(Global.getSingleton().getGlobalSecureIsRights(),desKeyByte)); } catch (Exception ex) { context.getGaeaResponse().setResponseBuffer(new byte[]{0}); logger.error("AsyncInvokerHandle invoke-exceptionCaught error", ex); } context.getServerHandler().writeResponse(context); logger.error("AsyncInvokerHandle invoke error", e); } });
##<i class="icon-share">总结</i>
至此,一个客户端的请求处理完毕,在Gaea的整个设计中能够看出,不少东西都留出了接口,可以很好的在框架自己,作一些适合本身业务的处理,整个设计,则决定了服务通讯框架的性能。
###le284