服务通讯框架Gaea---client的请求处理模型

Gaea的请求处理模型图

Gaea是一个服务通讯框架 java

 图片来源于“58同城的跨平台高性能,高可用的中间层服务架构设计分享”  安全

根据上图,咱们详细的来讲明一下Gaea的客户端,请求处理的过程。 服务器

Gaea1.0版本中并无实现异步化调用,那咱们就围绕这同步调用来讲明一下。处理过程以下: session

一、建立代理

为每个方法的调用建立代理proxy 架构

final String url = "tcp://demo/NewsService"; //demo服务名 NewsService 接口实现类
  INewsService newsService = ProxyFactory.create(INewsService.class, url);
InvocationHandler handler = new ProxyStandard(type, serviceName, lookup);
return Proxy.newProxyInstance(Thread.currentThread().getContextClassLoader(),
                new Class[]{type},
                handler);

在这里放了一个全局的cache,存储了每个代理, 框架

private static Map cache = new ConcurrentHashMap(); //key:服务名 value: proxy

建立代理,为每个方法制定统一的调用入口,每个方法在真正执行的时候,都将跳转到这里往下执行。也就是说,此方法将代理你所调用的方法,去执行后面的操做。 异步

public Object invoke(Object proxy, Method method, Object[] args) throws Throwable

二、等待窗口

在这里咱们暂时不讨论服务器的选择等策略,继续说明这个请求处理模型。在这个途中提到了一个颇有用的东西,即等待窗口,那到底这个等待窗口到底在Gaea中充当一个什么样的角色呢?在代码中咱们能够简单的看到,等待窗口,便是一个Map而已 socket

private ConcurrentHashMap<Integer, WindowData> WaitWindows = new ConcurrentHashMap<Integer, WindowData>();
在这个 等待窗口WaitWindows中Gaea放入了两个个很重要的东西key:session 和 value:WindowData

session是一个随着每一次调用自增加的一个int,它将被写入到Gaea的协议中,用于找到返回数据的归属 tcp

private int sessionId; 
private int createSessionId() {
        synchronized (this) { //同步,确保线程安全
            if (sessionId > GaeaConst.MAX_SESSIONID) { //当大于最大值后,从1继续累加
                sessionId = 1;
            }
            return sessionId++;
        }
    }
WindowData是一个对象类型,用于时间通知和存放返回值。
public class WindowData {

	AutoResetEvent _event; //用于事件通知
	byte[] _data; //接受返回值
	private byte flag; //Gaea1.0中没有用到
	private Exception exception; 
...........
}
在send以前,Gaea在 等待窗口waitWindows中注册了这个windowData
socket.registerRec(p.getSessionID());
public void registerRec(int sessionId) {
        AutoResetEvent event = new AutoResetEvent(); 
        WindowData wd = new WindowData(event);
        WaitWindows.put(sessionId, wd);
    }

在发送以前Gaea往等待窗口waitWindows中,利用sessionId注册了windowData,在windowData中Gaea又new了一个AutoResetEvent,用于通知接收程序。 ide

三、数据发送

在发送线程中,直接利用send函数发送了数据,也没有用到等待窗口waitWindows和WindowData用于拥塞控制,而是直接发送

socket.registerRec(p.getSessionID());
  socket.send(data);

在send中,直接添加了协议的头和尾就直接发送了,所以目前尚未实现设计中的拥塞控制

四、数据接收

返回数据的接受,是在建立server,socket时,就启动了一个死循环在接收数据,新版的Gaea中,此处已改成select方式获取数据。

@Override
    public void run() {
        while (true) {
            try {
                for (final CSocket socket : sockets) {
                    try {
                        pool.execute(Handle.getInstance(socket));
                    } catch (Throwable ex) {
                        logger.error(ex);
                    }
                }
                Thread.sleep(1);
            } catch (Throwable ex) {
                logger.error(ex);
            }
        }
    }
真正接收数据实在CSocket.java中
protected void frameHandle() throws IOException, InterruptedException
在frameHandle()中,Gaea从receive中获取到sessionId,由于sessionId是被写入到协议中来回传输的,

经过这个sessionId,很容易的就从等待窗口中取出了WindowData

WindowData wd = WaitWindows.get(pSessionId);
  if (wd != null) {
      wd.setData(pak);
      wd.getEvent().set();
 }
其中wd.setData(pak);就是向windowData中写入返回数据,wd.getEvent().set() 则是通知接收函数,这个sessionId的请求回来了,你能够返回给调用者了。

五、数据返回

由于讲的是同步,其中send,和 receive在同一个request()的方法中,send以后,receive一直在等待着,这次调用的返回

AutoResetEvent event = wd.getEvent();
        int timeout = getReadTimeout(socketConfig.getReceiveTimeout(), queueLen);
        if (!event.waitOne(timeout)) {
            throw new TimeoutException("Receive data timeout or error!timeout:" + timeout + "ms,queue length:" + queueLen);
        }
在这一段代码中,receive经过发送时的sessionId找到windowData,而后获取AutoResetEvent

if(!event.waitOne(timeout)){} 在这里程序将等待这个event的set()函数通知本身,数据已经被返回了,不然等待timeout之后,将再也不等待,执行if中的语句,返回超时。超时后,在异常处理中,删除这个等待窗口中的sessionId,则若是再返回,就会由于找不到这个sessionId,而抛弃返回值。

if (socket != null) {
      socket.unregisterRec(p.getSessionID());
 }
至此整个Gaea客户端的请求处理模型就说完了,能够说,整个模型都是围绕这 等待窗口WaitWindows、sessionId、windowData来处理的。
相关文章
相关标签/搜索