高性能服务通讯框架Gaea的详细实现--server请求处理流程

#<i class="icon-file">Gaea请求处理流程</i>java

Gaea支持tcp/http/telnet三种通讯信息,其中主要的通讯部分是由netty通讯框架完成,netty提供了一种高性能的非阻塞通讯工具。bootstrap

##<i class="icon-share">Gaea各服务启动</i>服务器

启动服务的配置,这里就tcp的配置简单介绍一下,telnet,http的基本相同。多线程

<property>
<name>gaea.servers</name> 
<value>gaea.server.tcp,gaea.server.http,gaea.server.telnet</value>
</property>
<property>
<name>gaea.server.tcp.enable</name>
<value>true</value>
</property>
<property>
<name>gaea.server.tcp.implement</name>
<value>com.bj58.spat.gaea.server.core.communication.tcp.SocketServer</value>
</property>
//其中还有一些配置IP和端口,字符缓冲区大小等,就很少送,看配置便可明白

配置以上的配置,便可标明TCP服务的启动。也能够本身实现某一个服务,继承IServer接口,而后在配置文件中添加以上配置,便可在服务启动的时候,启动你的服务。如Gaea监控服务。app

IServer框架

public interface IServer {

	public void start() throws Exception;

	public void stop() throws Exception;
}

SocketServer是TCP服务的IServer的实现类。dom

//SocketServer的主要代码
    initSocketServer() { //初始化TCPServer
        SocketHandler handler = new SocketHandler(); //TCP服务处理事件的Handler
        bootstrap.setPipelineFactory(new SocketPipelineFactory(handler, 
        		Global.getSingleton().getServiceConfig().getInt("gaea.server.tcp.frameMaxLength")));
        bootstrap.setOption("child.tcpNoDelay", tcpNoDelay);
        bootstrap.setOption("child.receiveBufferSize", 
        		Global.getSingleton().getServiceConfig().getInt("gaea.server.tcp.receiveBufferSize"));
        bootstrap.setOption("child.sendBufferSize", 
        		Global.getSingleton().getServiceConfig().getInt("gaea.server.tcp.sendBufferSize"));

        try {
        	InetSocketAddress socketAddress = null;
        	socketAddress = new InetSocketAddress(Global.getSingleton().getServiceConfig().getString("gaea.server.tcp.listenIP"),
        			Global.getSingleton().getServiceConfig().getInt("gaea.server.tcp.listenPort"));
            Channel channel = bootstrap.bind(socketAddress); //绑定地址
            allChannels.add(channel); //把生成的channel放入一个ChannelGroup中,提供后续使用
		} catch (Exception e) {
			logger.error("init socket server error", e);
			System.exit(1);
		}
    
    }

关于netty,网上的资料也不少,能够多看看。异步

##<i class="icon-share">Gaea接收数据</i>socket

netty接收数据等事件处理,都在Handler中实现,所以在这里说一下SocketHandlerasync

  1. channelOpen

在Channel被绑定的时候,触发ChannelOpen事件;

SocketServer.allChannels.add(e.getChannel());
		/**
		 * 若是当前服务启动权限认证,则增长当前链接对应的SecureContext
		 */
		if(Global.getSingleton().getGlobalSecureIsRights()){//是否经过链接认证
			Global.getSingleton().addChannelMap(e.getChannel(), new SecureContext());此链接对应一个SecureContext。
		}
  1. channelConnected

在客户端链接的时候,触发channelConnected事件;

for(IFilter filter : Global.getSingleton().getConnectionFilterList()) { //Global中取出启动时,注册的链接过滤器。
			filter.filter(new GaeaContext(new GaeaChannel(e.getChannel())));// 执行链接过滤器。
		}

主要执行链接过滤器,对链接进行控制

  1. messageReceived

接收数据

try {
			logger.debug("message receive");
			ByteBuffer buffer = ((ChannelBuffer)e.getMessage()).toByteBuffer(); //Netty接收到的数据在ChannelBuffer中,在此转为NIO的ByteBuffer
			byte[] reciveByte = buffer.array();
			logger.debug("reciveByte.length:" + reciveByte.length);
			
			byte[] headDelimiter = new byte[0];
			System.arraycopy(reciveByte, 0, headDelimiter, 0, 0);
			
			byte[] requestBuffer = new byte[reciveByte.length];
			System.arraycopy(reciveByte, 0, requestBuffer, 0, (reciveByte.length));
			
			GaeaContext gaeaContext = new GaeaContext(requestBuffer, //生成Gaea上下文GaeaContext; GaeaContext将贯穿于整个消息的处理流程。
					new GaeaChannel(e.getChannel()), 
					ServerType.TCP,
					this);
			
			SocketServer.invokerHandle.invoke(gaeaContext);//处理这个GaeaContext
		} catch(Throwable ex) {
			byte[] response = ExceptionHelper.createErrorProtocol(); //若是是异常,按照异常协议,返回给调用者
			e.getChannel().write(response);
			logger.error("SocketHandler invoke error", ex);
		}

在invoke中,Gaea提供同步和异步两种处理方式,固然同步应该是初期产物,如今处理基本都是异步处理

##<i class="icon-share">同步处理流程</i>

@Override
	public void invoke(GaeaContext context) throws Exception {
		try {
			for(IFilter f : Global.getSingleton().getGlobalRequestFilterList()) {//请求过滤器
				if(context.getExecFilter() == ExecFilterType.All || context.getExecFilter() == ExecFilterType.RequestOnly) {
					f.filter(context); //执行请求过滤器的filter方法
				}
			}
			
			if(context.isDoInvoke()) {
				doInvoke(context);//执行请求方法
			}
			
			logger.debug("begin response filter");
			// response filter
			for(IFilter f : Global.getSingleton().getGlobalResponseFilterList()) {//返回过滤器
				if(context.getExecFilter() == ExecFilterType.All || context.getExecFilter() == ExecFilterType.ResponseOnly) {
					f.filter(context);//执行返回过滤器的filter方法
				}
			}
			context.getServerHandler().writeResponse(context);//写回给调用者
		} catch(Exception ex) {
			context.setError(ex);
			context.getServerHandler().writeResponse(context); //若是出现异常,将异常写回给调用者
			logger.error("in async messageReceived", ex);
		}
	}

从以上代码中能够看出,Gaea先通过了一系列请求过滤器,而后才执行真正的方法,最终再执行返回过滤器,最后写回给客户端。

##<i class="icon-share">异步处理流程</i>

调用invoke以后,Gaea的此次请求将会被放入AsyncInvoker的异步执行器中执行,并快速返回,接收下次请求;

asyncInvoker.run(taskTimeOut, new IAsyncHandler(){...};

IAsynHandler中,具体有三个方法,run是执行任务,messageReceived是执行完以后,返回执行结果,exceptionCaught对执行过程当中的全部异常进行处理。IAsyncHandler定义以下:

public interface IAsyncHandler {
	public Object run() throws Throwable;
	public void messageReceived(Object obj);
	public void exceptionCaught(Throwable e);
}

在执行asyncInvoker.run的时候,异步执行器,只是把任务扔给了64个队列,此处默认是64,也能够进行配置,配置项gaea.async.worker.count

AsyncTask task = new AsyncTask(timeOut, handler); //handler放入异步任务中,timeOut为此任务的超时时间,
		if(rr > 10000) {
			rr = 0;
		}
		int idx = rr % workers.length; //轮询放入多个队列
		workers[idx].addTask(task);
		++rr;

其中异步任务中有一个超时时间,若是在队列中的时间大于这个值,Gaea将抛弃此任务,保护总体服务的正常运行,这个也就是Gaea的服务器负载保护策略,防止服务端压力过大宕机,丢弃部分任务,以保护大多数任务的有效执行。

在初始化异步执行器的时候,启动了64个工做线程和一个线程池

private AsyncInvoker(int workerCount, boolean timeoutEffect) {
		workers = new AsyncWorker[workerCount];
		ExecutorService executor = Executors.newCachedThreadPool(new ThreadRenameFactory("async task thread"));
		
		for(int i=0; i<workers.length; i++) {
			workers[i] = new AsyncWorker(executor, timeoutEffect);
			workers[i].setDaemon(true);
			workers[i].setName("async task worker[" + i + "]");
			workers[i].start();
		}
	}

在此,提供64个线程和一个线程池的做用是Gaea提供的两种处理任务的方式,一种是任务分离,64个队列各自处理各自的任务,一种是线程池,处理单个队列,并设置了任务的多执行时间。

  1. 64个队列处理各自任务 优势:资源隔离,互不影响 肯定:若有一个任务执行慢,将影响此队列中剩余任务,致使整个队列抛弃。 解决方案:处理现场不止处理本身队列的任务,在线程空闲时,可窃取其它队列的任务。
private void execNoTimeLimitTask() {
		AsyncTask task = null;
		try {
			task = taskQueue.take();
			if(task != null){
				if((System.currentTimeMillis() - task.getAddTime()) > task.getQtimeout()) { //超时
					task.getHandler().exceptionCaught(new TimeoutException("async task timeout!"));
					return;
				} else {
					Object obj = task.getHandler().run(); //执行
					task.getHandler().messageReceived(obj); //返回
				}
			}else{
				logger.error("execNoTimeLimitTask take task is null");
			}
		} catch(InterruptedException ie) {
			
		} catch(Throwable ex) {
			if(task != null) {
				task.getHandler().exceptionCaught(ex); //处理异常
			}
		}
	}
  1. 线程池处理单个队列任务 缺点:队列异常,可致使全部任务受到影响。newCachedThreadPool的线程池,可致使建立过多线程 优势:若有个别任务较慢,也不影响其它任务执行。
try {
			final AsyncTask task = taskQueue.take();
			if(task != null) {
				if((System.currentTimeMillis() - task.getAddTime()) > task.getQtimeout()) {
					task.getHandler().exceptionCaught(new TimeoutException("async task timeout!"));
					return;
				}else{
					final CountDownLatch cdl = new CountDownLatch(1); 
					executor.execute(new Runnable(){
							@Override
							public void run() {
								try {
									Object obj = task.getHandler().run(); //执行
									task.getHandler().messageReceived(obj); //返回
								} catch(Throwable ex) {
									task.getHandler().exceptionCaught(ex);
								} finally {
									cdl.countDown();
								}
							}
						}
					);
					cdl.await(getTimeout(task.getTimeout(), taskQueue.size()), TimeUnit.MILLISECONDS); //等待cdl.countDown通知,不然超时,任务抛弃。
					if(cdl.getCount() > 0) {
						task.getHandler().exceptionCaught(new TimeoutException("async task timeout!")); //异常
					}
				}
			}else{
				logger.error("execTimeoutTask take task is null");
			}
		} catch(InterruptedException ie) {
			logger.error("");
		} catch (Throwable e) {
			logger.error("get task from poll error", e);
		}
  1. run执行过程
public Object run() throws Throwable {
				logger.debug("begin request filter");
				// request filter
				for(IFilter f : Global.getSingleton().getGlobalRequestFilterList()) {
					if(context.getExecFilter() == ExecFilterType.All || context.getExecFilter() == ExecFilterType.RequestOnly) {
						f.filter(context);
					}
				}
				
				if(context.isDoInvoke()) {
					if(context.getServerType() == ServerType.HTTP){ //对http服务进行处理
						httpThreadLocal.set(context.getHttpContext());
					}
					doInvoke(context);
				}
				
				logger.debug("begin response filter");
				// response filter
				for(IFilter f : Global.getSingleton().getGlobalResponseFilterList()) {
					if(context.getExecFilter() == ExecFilterType.All || context.getExecFilter() == ExecFilterType.ResponseOnly) {
						f.filter(context);
					}
				}
				return context;
			}

从以上代码能够看出,处理流程上,跟同步基本相同,只是加了对http服务的处理,所以能够看出,同步是不支持HTTP服务的。返回部分和异常也不是在run中执行,而是分散开,在messageReceived和exceptionCaught中进行处理。具体祥看代码。

##<i class="icon-share">请求过滤器</i>

请求过滤器,顾名思义,是在执行任务以前,Gaea对请求作的一些处理。Gaea默认的框架的请求过滤器都有哪些?

<property>
<name>gaea.filter.global.request</name>
<value>com.bj58.spat.gaea.server.filter.ProtocolParseFilter,com.bj58.spat.gaea.server.filter.HandclaspFilter,com.bj58.spat.gaea.server.filter.ExecuteMethodFilter</value>
</property>
  1. ProtocolParseFilter 解析协议过滤器

在Gaea刚接收到数据的时候,Gaea将请求的二进制流放入了上下文根GaeaContext,在此Gaea对二进制流进行了解析,还原,用于执行任务

public void filter(GaeaContext context) throws Exception {
		if(context.getServerType() == ServerType.TCP) {
			byte[] desKeyByte = null;
			String desKeyStr = null;
			boolean bool = false;
			
			Global global = Global.getSingleton();
			if(global != null){
				//判断当前服务启用权限认证
				if(global.getGlobalSecureIsRights()){
					SecureContext securecontext = global.getGlobalSecureContext(context.getChannel().getNettyChannel());
					bool = securecontext.isRights();
					if(bool){
						desKeyStr = securecontext.getDesKey();
					}
				}
			}
			
			if(desKeyStr != null){
				desKeyByte = desKeyStr.getBytes("utf-8");
			}
			Protocol protocol = Protocol.fromBytes(context.getGaeaRequest().getRequestBuffer(),global.getGlobalSecureIsRights(),desKeyByte); //根据deskey进行协议解析
			context.getGaeaRequest().setProtocol(protocol);
			/**
			 * 服务重启直接返回
			 */	
			if(Global.getSingleton().getServerState() == ServerStateType.Reboot && protocol.getPlatformType() == PlatformType.Java){
				GaeaResponse response = new GaeaResponse();
				ResetProtocol rp = new ResetProtocol();
				rp.setMsg("This server is reboot!");
				protocol.setSdpEntity(rp);
				response.setResponseBuffer(protocol.toBytes(global.getGlobalSecureIsRights(),desKeyByte));
				context.setGaeaResponse(response);
				context.setExecFilter(ExecFilterType.None);
				context.setDoInvoke(false);
			}
		}
	}

整个流程如以上代码,先判断是否启用权限认证,再根据权限认证,对二进制流进行协议解析。关于Gaea协议,是一个自定义的二进制协议,具体另起文章详解。解析后若是是服务重启任务,则写入GaeaContext,供后续操做。

  1. HandclaspFilter 链接过滤器

客户端第一次请求,会跟服务端进行DES加解密的交互,肯定客户端是否有权限调用此服务,具体过程见代码

/**
	 * 权限认证filter
	 */
	@Override
	public void filter(GaeaContext context) throws Exception {
		
		Protocol protocol = context.getGaeaRequest().getProtocol();		
		if(protocol.getPlatformType() == PlatformType.Java && context.getServerType() == ServerType.TCP){//java 客户端支持权限认证
			GaeaResponse response = new GaeaResponse();
			Global global = Global.getSingleton();
			//是否启用权限认证
			if(Global.getSingleton().getGlobalSecureIsRights()){
				SecureContext sc = global.getGlobalSecureContext(context.getChannel().getNettyChannel());
				//判断当前channel是否经过认证
				if(!sc.isRights()){
					//没有经过认证
					if(protocol != null && protocol.getSdpEntity() instanceof HandclaspProtocol){
						SecureKey sk = new SecureKey();
						HandclaspProtocol handclaspProtocol = (HandclaspProtocol)protocol.getSdpEntity();
						/**
						 * 接收 客户端公钥
						 */
						if("1".equals(handclaspProtocol.getType())){
							sk.initRSAkey();
							//客户端发送公钥数据
							String clientPublicKey = handclaspProtocol.getData();
							if(null == clientPublicKey || "".equals(clientPublicKey)){
								logger.warn("get client publicKey warn!");
							}
							//java 客户端
							if(protocol.getPlatformType() == PlatformType.Java){
								//服务器生成公/私钥,公钥传送给客户端
								sc.setServerPublicKey(sk.getStringPublicKey());
								sc.setServerPrivateKey(sk.getStringPrivateKey());
								sc.setClientPublicKey(clientPublicKey);
								handclaspProtocol.setData(sk.getStringPublicKey());//服务器端公钥
							}
							
							protocol.setSdpEntity(handclaspProtocol);
							response.setResponseBuffer(protocol.toBytes());
							context.setGaeaResponse(response);
							this.setInvokeAndFilter(context);
							logger.info("send server publieKey sucess!");
						} 
						/**
						 * 接收权限文件
						 */
						else if("2".equals(handclaspProtocol.getType())){
							//客户端加密受权文件
							String clientSecureInfo = handclaspProtocol.getData();
							if(null == clientSecureInfo || "".equals(clientSecureInfo)){
								logger.warn("get client secureKey warn!");
							}
							//受权文件客户端原文(服务器私钥解密)
							String sourceInfo = sk.decryptByPrivateKey(clientSecureInfo, sc.getServerPrivateKey());
							//校验受权文件是否相同
							//判断是否合法,若是合法服务器端生成DES密钥,经过客户端提供的公钥进行加密传送给客户端
							if(global.containsSecureMap(sourceInfo)){
								logger.info("secureKey is ok!");
								String desKey = StringUtils.getRandomNumAndStr(8);
								//设置当前channel属性
								sc.setDesKey(desKey);
								sc.setRights(true);
								handclaspProtocol.setData(sk.encryptByPublicKey(desKey, sc.getClientPublicKey()));
								protocol.setSdpEntity(handclaspProtocol);
								response.setResponseBuffer(protocol.toBytes());
								context.setGaeaResponse(response);
							}else{
								logger.error("It's bad secureKey!");
								this.ContextException(context, protocol, response, "受权文件错误!");
							}
							this.setInvokeAndFilter(context);
						}else{
							//权限认证 异常状况
							logger.error("权限认证异常!");
							this.ContextException(context, protocol, response, "权限认证 异常!");
						}
						response = null;
						sk = null;
						handclaspProtocol = null;
					}else{
						//客户端没有启动权限认证
						logger.error("客户端没有启用权限认证!");
						this.ContextException(context, protocol, response, "客户端没有启用权限认证!");
					}
				}
			}else{
				if(protocol != null && protocol.getSdpEntity() instanceof HandclaspProtocol){
					//异常--当前服务器没有启动权限认证
					logger.error("当前服务没有启用权限认证!");
					this.ContextException(context, protocol, response, "当前服务没有启用权限认证!");
				}
			}
		}
	}
  1. ExecuteMethodFilter 执行方法过滤器

服务端受权文件,对须要执行的方法,进行受权配置,当调用者调用的时候,此方法是否通过受权。在gaea中没有看到关于此受权配置的文件。这里就很少说,内部系统对于受权访问的控制并不严格。

##<i class="icon-share">执行任务</i>

执行任务是doInvoke(gaeaContext)

这一步比较简单,根据协议解析过滤器解析出来的请求数据,找到请求的类名,再根据类名,从IProxyFactory中取出与之对应的代理类,而后代理去执行真正的方法,关于IProxyFactory类,请看Gaea的启动过程当中如何生成的。

IProxyStub localProxy = Global.getSingleton().getProxyFactory().getProxy(request.getLookup()); //获取代理类
    
    GaeaResponse gaeaResponse = localProxy.invoke(context);//执行真正的方法

在此过程当中,还对各类异常作了处理,全部的处理结果都放到了GaeaContext中。StopWatch主要记录调用信息,并在返回过滤器中,记录执行时间。

##<i class="icon-share">返回过滤器</i>

框架的返回过滤器

<!-- global response filter -->
<property>
<name>gaea.filter.global.response</name>
<value>com.bj58.spat.gaea.server.filter.ProtocolCreateFilter,com.bj58.spat.gaea.server.filter.ExecuteTimeFilter</value>
</property>
  1. ProtocolCreateFilter 建立协议过滤器
context.getGaeaResponse().setResponseBuffer(protocol.toBytes(Global.getSingleton().getGlobalSecureIsRights(),desKeyByte));

将最终执行结果转换为二进制流放入GaeaContext。

  1. ExecuteTimeFilter 执行时间监视过滤器

对方法的执行时间进行监控,并将结果发到一个UDP日志服务器。

public void filter(GaeaContext context) throws Exception {
		StopWatch sw = context.getStopWatch();
		Collection<PerformanceCounter> pcList = sw.getMapCounter().values();
		for(PerformanceCounter pc : pcList) {
			if(pc.getExecuteTime() > minRecordTime) {
				StringBuilder sbMsg = new StringBuilder();
				sbMsg.append(serviceName);
				sbMsg.append("--");
				sbMsg.append(pc.getKey());
				sbMsg.append("--time: ");
				sbMsg.append(pc.getExecuteTime());
				
				sbMsg.append(" [fromIP: ");
				sbMsg.append(sw.getFromIP());
				sbMsg.append(";localIP: ");
				sbMsg.append(sw.getLocalIP()+"]");
				
				udpClient.send(sbMsg.toString());
			}
		}

##<i class="icon-share">结果返回</i>

在以上过滤器,执行等过程,都是将所获得的结果封装到了上下文GaeaContext中,在这一步将其返回

public void messageReceived(Object obj) {
				if(context.getServerType() == ServerType.HTTP){
					httpThreadLocal.remove();
				}
				if(obj != null) {
					GaeaContext ctx = (GaeaContext)obj;
					ctx.getServerHandler().writeResponse(ctx);
				} else {
					logger.error("context is null!");
				}
			}
		if(context != null && context.getGaeaResponse() != null){
			context.getChannel().write(context.getGaeaResponse().getResponseBuffer()); //getResponseBuffer 结果的二进制流
		} else {
			context.getChannel().write(new byte[]{0}); 
			logger.error("context is null or response is null in writeResponse");
		}

##<i class="icon-share">异常处理</i>

Gaea服务的执行过程当中,全部的异常都会抛出,被exceptionCaught(exception)接收,并通过Gaea封装,序列化,返回给客户端,告诉调用者,是什么缘由致使调用失败。

public void exceptionCaught(Throwable e) {
				if(context.getServerType() == ServerType.HTTP){
					httpThreadLocal.remove();
				}
				
				if(context.getGaeaResponse() == null){
					GaeaResponse respone = new GaeaResponse();
					context.setGaeaResponse(respone);
				}
				
				try {
					byte[] desKeyByte = null;
					String desKeyStr = null;
					boolean bool = false;
					
					Global global = Global.getSingleton();
					if(global != null){
						//判断当前服务启用权限认证
						if(global.getGlobalSecureIsRights()){
							SecureContext securecontext = global.getGlobalSecureContext(context.getChannel().getNettyChannel());
							bool = securecontext.isRights();
							if(bool){
								desKeyStr = securecontext.getDesKey();
							}
						}
					}
					
					if(desKeyStr != null){
						desKeyByte = desKeyStr.getBytes("utf-8");
					}
					
					Protocol protocol = context.getGaeaRequest().getProtocol();
					if(protocol == null){
						protocol = Protocol.fromBytes(context.getGaeaRequest().getRequestBuffer(),global.getGlobalSecureIsRights(),desKeyByte);
						context.getGaeaRequest().setProtocol(protocol);
					}
					protocol.setSdpEntity(ExceptionHelper.createError(e));
					context.getGaeaResponse().setResponseBuffer(protocol.toBytes(Global.getSingleton().getGlobalSecureIsRights(),desKeyByte));
				} catch (Exception ex) {
					context.getGaeaResponse().setResponseBuffer(new byte[]{0});
					logger.error("AsyncInvokerHandle invoke-exceptionCaught error", ex);
				}
				
				context.getServerHandler().writeResponse(context);
				logger.error("AsyncInvokerHandle invoke error", e);
			}
		});

##<i class="icon-share">总结</i>

至此,一个客户端的请求处理完毕,在Gaea的整个设计中能够看出,不少东西都留出了接口,可以很好的在框架自己,作一些适合本身业务的处理,整个设计,则决定了服务通讯框架的性能。

###le284

相关文章
相关标签/搜索