深刻浅出Java分布式系统通讯

什么是分布式系统

以前我有篇文章已经简单介绍了分布式通讯,有兴趣的朋友能够去看看:html

大型网站系统架构实践(二)分布式模块之间的通讯java

那么今天我详细的说下我对java分布式系统通讯的理解数据库

1.集群模式,将相同应用模块部署多份后端

2.业务拆分模式,将业务拆分红多个模块,并分别部署服务器

3.存储分布式session

因为分布式概念太大,咱们能够缩小下讨论的范围:架构

如下分布式的狭义定义为:并发

业务拆分,但不限于水平拆分,而是拆分出底层模块,功能模块,上层模块等等异步

一个系统功能繁多,且有层次依赖,那么咱们须要将其分为不少模块,并分别部署 。socket

举例:

好比咱们如今开发一个相似于钱包的系统,那么它会有以下功能模块:用户模块(用户数据),

应用模块(如手机充值等),业务模块(处理核心业务),交易模块(与银行发生交易),

前置模块(与客户端通讯) 等等

咱们会获得一个系统架构图:

clip_image002

为何要分布式

1) 将系统功能模块化,且部署在不一样的地方,对于底层模块,只要保持接口不变,

上层系统调用底层模块将不关心其具体实现,且底层模块作内部逻辑变动,上层系统

都不须要再作发布,能够极大限度的解耦合

2) 解耦合以后,能够复用共同的功能,且业务扩展更为方便,加快开发和发布的速度

3) 系统分开部署,充分利用硬件,能够提升系统性能

4) 减小数据库链接资源的消耗

分布式通讯方案

场景:服务端与服务端的通讯

方案1:基于socket短链接

方案2:基于socket长链接同步通讯

方案3:基于socket长链接异步通讯

tcp短链接通讯方案

定义:

短链接:http短链接,或者socket短链接,是指每次客户端和服务端通讯的时候,都要新

创建一个socket链接,本次通讯完毕后,当即关闭该链接,也就是说每次通讯都须要开启一个新的链接 。

传输图以下:

clip_image004

io通讯用mina实现

客户端示例代码:

NioSocketConnector connector = new NioSocketConnector();
connector.setConnectTimeoutMillis(CONNECT_TIMEOUT);
//设置读缓冲,传输的内容必须小于此缓冲
connector.getSessionConfig().setReadBufferSize(2048*2048);
//设置编码解码器
connector.getFilterChain().addLast("codec",
new ProtocolCodecFilter(new ObjectSerializationCodecFactory()));
//设置日志过滤器
connector.getFilterChain().addLast("logger", new LoggingFilter());
//设置Handler
connector.setHandler(new MyClientHandler());

//获取链接,该方法为异步执行
ConnectFuture future = connector.connect(new InetSocketAddress(
        HOSTNAME, PORT));
//等待链接创建
future.awaitUninterruptibly();
//获取session
IoSession session = future.getSession();

//等待session关闭
session.getCloseFuture().awaitUninterruptibly();
//释放connector
connector.dispose();

下面咱们进行性能测试:

 

测试场景:

每一个请求的业务处理时间110ms

100个线程并发测试,每一个线程循环请求服务端

测试环境:

客户端服务器:

Cpu为4线程 2400mhz

服务端cpu: 4线程 3000Mhz

测试结果:

在通过10分钟测试以后,稳定状况下的tps

Tps:554左右

客户端Cpu:30%

服务端cpu:230%

 

该方案的优势:

程序实现起来简单

该方案的缺点:

1. Socket发送消息时,须要先发送至socket缓冲区,所以系统为每一个socket分配缓冲区

当缓冲不足时,就达到了最大链接数的限制

2. 链接数大,也就意味着系统内核调用的越多,socket的accept和close调用

3.每次通讯都从新开启新的tcp链接,握手协议耗时间,tcp是三次握手

4.tcp是慢启动,TCP 数据传输的性能还取决于 TCP 链接的使用期(age)。TCP 链接会随着时间进行自我“调谐”,起初会限制链接的最大速度,若是数据成功传输,会随着时间的推移提升传输的速度。这种调谐被称为 TCP 慢启动(slow start),用于防止因特网的突

然过载和拥塞 。

 

tcp长链接同步通讯

长链接同步的传输图

clip_image008

一个socket链接在同一时间只能传递一个请求的信息

只有等到response以后,第二个请求才能开始使用这个通道

为了提升并发性能,能够提供多个链接,创建一个链接池,链接被使用的时候标志为正在使用,

使用完放回链接池,标识为空闲,这和jdbc链接池是同样的。

假设后端服务器,tps是1000,即每秒处理业务数是1000

如今内网传输耗时是5毫秒,业务处理一次请求的时间为150毫秒

那么一次请求从客户端发起请求到获得服务端的响应,总共耗时150毫秒+5毫秒*2

=160毫秒,若是只有一个链接通讯,那么1秒内只能完成2次业务处理,即tps为2

若是要使tps达到1000,那么理论上须要500个链接,可是当链接数上升的时候,其性能却在降低,

所以该方案将会下降网站的吞吐量。

实现挑战:

mina的session.write()和receive消息都是异步的,那么须要在主线程上阻塞以等待响应的到达。

链接池代码:

/**
* 空闲链接池
*/
private static BlockingQueue<Connection> idlePool = new LinkedBlockingQueue<Connection>();
    
/**
* 使用中的链接池
*/
public static BlockingQueue<Connection> activePool = new LinkedBlockingQueue<Connection>();

public static Connection getConn() throws InterruptedException{
    long time1 = System.currentTimeMillis();
    Connection connection = null;
    connection = idlePool.take();            
    activePool.add(connection);
    long time2 = System.currentTimeMillis();
    //log.info("获取链接耗时:"+(time2-time1));
    return connection;
}

客户端代码:

public TransInfo send(TransInfo info) throws InterruptedException {
    Result result = new Result();
    //获取tcp链接
    Connection connection = ConnectFutureFactory.getConnection(result);
    ConnectFuture connectFuture = connection.getConnection();
    IoSession session = connectFuture.getSession();
    session.setAttribute("result", result);
    //发送信息
    session.write(info);
    //同步阻塞获取响应
    TransInfo synGetInfo = result.synGetInfo();
    //此处并非真正关闭链接,而是将链接放回链接池
    ConnectFutureFactory.close(connection,result);
    return synGetInfo;
}

阻塞获取服务端响应代码:

public synchronized TransInfo synGetInfo() {
    //等待消息返回
    //必需要在同步的状况下执行
    if (!done) {
        try {                    
            wait();
        } catch (InterruptedException e) {
            log.error(e.getMessage(), e);
        }
    }
    return info;
}

public synchronized void synSetInfo(TransInfo info) {
    this.info = info;
    this.done = true;
    notify();
}

测试场景:

每一个请求的业务处理时间110ms

300个线程300个链接并发测试,每一个线程循环请求服务端

测试环境:

客户端服务器:

Cpu为4线程 2400mhz

服务端cpu: 4线程 3000Mhz

测试结果:

在通过10分钟测试以后,稳定状况下的tps

Tps:2332左右

客户端Cpu:90%

服务端cpu:250%

从测试结果能够看出,当链接数足够大的时候,系统性能会下降,开启的tcp链接数越多,那么

系统开销将会越大。 

tcp长链接异步通讯

通讯图:

image

一个socket链接在同一时间内传输屡次请求的信息,输入通道接收多条响应消息,消息是连续发出,连续收回的。

业务处理和发消息是异步的,一个业务线程告诉通道发送消息后,再也不占用通道,而是等待响应到达,而此时其它

业务线程也能够往该链接通道发信息,这样能够充分利用通道来进行通讯。

实现挑战

但该方案使编码变得复杂,如上图,请求request1,request2,request3顺序发出,可是服务端处理请求并非

排队的,而是并行处理的,有可能request3先于request1响应给客户端,那么一个request将没法找到他的response,

这时候咱们须要在request和response报文中添加惟一标识,如通讯序列号,在一个通讯通道里面保持惟一,

那么能够根据序列号去获取对应的响应报文。

个人方案是:

1.客户端获取一个tcp链接

2.调用session.write()发送信息,并将消息的惟一序列号存入一个Result对象

result对象存入一个map 

3.同步阻塞获取结果,线程在result对象进行同步阻塞

4.接收消息,并经过惟一序列号从map里面获取result对象,并唤醒阻塞在result对象上的线程

客户端发送消息示例代码:

public TransInfo send(TransInfo info) throws InterruptedException {
    Result result = new Result();
    result.setInfo(info);
    //获取socket链接
    ConnectFuture connectFuture = ConnectFutureFactory
        .getConnection(result);
    IoSession session = connectFuture.getSession();
    //将result放入ConcurrentHashMap
    ConcurrentHashMap<Long, Result> resultMap = (ConcurrentHashMap<Long, Result>)session.getAttribute("resultMap");
    resultMap.put(info.getId(), result);
    //发送消息
    session.write(info);
    //同步阻塞获取结果
    return result.synGetInfo();
}

同步阻塞和唤醒方法:

public synchronized TransInfo synGetInfo() {
    //等待消息返回
    //必需要在同步的状况下执行
    while (!done) {
        try {                    
            wait();
        } catch (InterruptedException e) {
            log.error(e.getMessage(), e);
        }
    }
    return info;
}

public synchronized void synSetInfo(TransInfo info) {
    this.info = info;
    this.done = true;
    notify();
}

接收消息示例代码:

public void messageReceived(IoSession session, Object message)
        throws Exception {
    TransInfo info = (TransInfo) message;    
    //根据惟一序列号从resultMap中获取result
    ConcurrentHashMap<Long, Result> resultMap = (ConcurrentHashMap<Long, Result>)session.getAttribute("resultMap");
    //移除result
    Result result = resultMap.remove(info.getId());        
    //唤醒阻塞线程
    result.synSetInfo(info);
}

测试场景:

每一个请求的业务处理时间110ms

300个线程10个链接并发测试,每一个线程循环请求服务端

测试环境:

客户端服务器:

Cpu为4线程 2400mhz

服务端cpu: 4线程 3000Mhz

测试结果:

在通过10分钟测试以后,稳定状况下的tps

Tps:2600左右

客户端Cpu:25%

服务端cpu:250%

经测试发现,异步通讯能够用更少的tcp链接实现一样高效的通讯,极大的减小了系统性能开销。

今天暂时写到这里。

 

参考 文章

http://www.2cto.com/os/201203/125511.html

wireshark-win32-1.6.5.exe:

http://down.51cto.com/data/685517

RPC与消息队列的区别

http://oldratlee.com/post/2013-02-01/synchronous-rpc-vs-asynchronous-message

tcp长链接与短链接的区别

http://www.cnblogs.com/liuyong/archive/2011/07/01/2095487.html

http://blog.chinaunix.net/uid-354915-id-3587924.html

keep-alived详解

http://wudi.in/archives/446.html

http://www.nowamagic.net/academy/detail/23350305

wireshark抓包详解

http://www.cnblogs.com/TankXiao/archive/2012/10/10/2711777.html

长链接,同步异步参考

http://www.yeolar.com/note/2012/11/10/c10k/

同步队列:

http://ifeve.com/java-synchronousqueue/

netty:

http://www.infoq.com/cn/articles/netty-reliability

相关文章
相关标签/搜索