咱们上次讲到zbus网络通信的核心API:java
Dispatcher -- 负责-NIO网络事件Selector引擎的管理,对Selector引擎负载均衡git
IoAdaptor -- 网络事件的处理,服务器与客户端共用,负责读写,消息分包组包等服务器
Session -- 表明网络连接,能够读写消息网络
实际的应用,咱们几乎只须要作IoAdaptor的个性化实现就能完成高效的网络通信服务,今天咱们将举例说明如何个性化这个IoAdaptor。负载均衡
咱们今天要完成的目标是:实现MySQL服务器的透明代理。效果是,你访问代理服务器跟访问目标MySQL无差别。async
咱们在测试环境10.17.2.30:3306 这台机器上提供了MySql,在咱们本地机器上跑起来咱们今天基于zbus.NET实现的一个代理程序,就能达到下面的效果。ide
完成大概不到100 行的代码, Cool?Let’s roll! 工具
首先,咱们思考透明TCP代理到底在干啥,透明的TCP代理的业务逻辑其实很是简单,能够描述为,未来自代理上游(发起请求到代理)的数据转发到目标TCP服务器,把目标服务器回来的数据原路返回代理上游客户端。 注意这个原路,如何作到原路返回成为关键点。这个示例其实跟MySQL没有任何关系,原则上任何TCP层面的服务都应该适配。测试
基于zbus.NET怎么来将上面的逻辑在体现出来,也就是如何个性化IoAdaptor?直观的讲,咱们要处理的几个事件应该包括:1)从上游客户端发起的连接请求--代理服务器的Accept事件,2)代理服务器链接目标服务器的Connect事件,3)上下游的数据事件onMessage。this
zbus.NET的IoAdaptor提供的个性化事件以下
基本包括一个连接(客户端或者服务端)的生命周期,与消息的编解码。
咱们的代理IoAdaptor就是逐一个性化处理。
第一步,编解码: 透明代理对消息内容不作理解,因此不须要编解码。
// 透传不须要编解码,简单返回ByteBuffer数据 public IoBuffer encode(Object msg) { if (msg instanceof IoBuffer) { IoBuffer buff = (IoBuffer) msg; return buff; } else { throw new RuntimeException("Message Not Support"); } } // 透传不须要编解码,简单返回ByteBuffer数据 public Object decode(IoBuffer buff) { if (buff.remaining() > 0) { byte[] data = new byte[buff.remaining()]; buff.readBytes(data); return IoBuffer.wrap(data); } else { return null; } }
第二步,代理服务接入:
@Override protected void onSessionAccepted(Session sess) throws IOException { Session target = null; Dispatcher dispatcher = sess.getDispatcher(); try { target = dispatcher.createClientSession(targetAddress, this); } catch (Exception e) { sess.asyncClose(); return; } sess.chain = target; target.chain = sess; dispatcher.registerSession(SelectionKey.OP_CONNECT, target); }
这里的逻辑思路是,代理服务器每接受到一个请求--经过onSessionAccepted表达,咱们将同时建立一个到目标服务器的连接,今天的例子是目标MySQL服务器,注意上面的处理中把建立目标服务器Session过程与真正连接到目标服务分开(Dispatcher也提供合并两者的工具方法),是为了能在没有发生连接以前绑定上好上下游关系,经过Session的chain变量来表达,也就是当前Session的关联Session,关联好以后启动感兴趣Connect事件,逻辑处理完毕。
第三步,连接成功事件(第二步中须要连接到目标服务器)
@Override public void onSessionConnected(Session sess) throws IOException { Session chain = sess.chain; if(chain == null){ sess.asyncClose(); return; } if(sess.isActive() && chain.isActive()){ sess.register(SelectionKey.OP_READ); chain.register(SelectionKey.OP_READ); } }
这里的一个核心是当上下游都处于连接正常态,上下游Session都启动感兴趣消息读事件(写事件是在读取处理中自动触发),为何在这里作的缘由是必定要等上下游都正常态后才启动双方消息处理,否则会出现字节丢失。
第四步,处理上下游数据事件
@Override protected void onMessage(Object msg, Session sess) throws IOException { Session chain = sess.chain; if(chain == null){ sess.asyncClose(); return; } chain.write(msg); }
是否是很是简单,相似pipeline,从一端的数据写到另一端。
原则上面4步结束,整个透明代理就完成了,可是为了处理连接异常清理,咱们增长了Session清理处理,以下
@Override public void onSessionToDestroy(Session sess) throws IOException { try { sess.close(); } catch (IOException e) { //ignore } if (sess.chain == null) return; try { sess.chain.close(); sess.chain.chain = null; sess.chain = null; } catch (IOException e) { } }
工做就是解决上下游连接清理连接。
至此为止咱们的IoAdaptor个性化就完成了,是否是很是简单,如今咱们要跑起来测试了,下面的代码就是上一次讲到重复的设置,没有新意。
public static void main(String[] args) throws Exception { Dispatcher dispatcher = new Dispatcher(); IoAdaptor ioAdaptor = new TcpProxyAdaptor("10.17.2.30:3306"); final Server server = new Server(dispatcher, ioAdaptor, 3306); server.start(); }
骚年,包括渣渣import和少量注释加起来折腾了不到100行,该跑一跑了,仍是那句话,不是HelloWorld,你能够规模压力测。看看你是否在本地代理出来了你的目标服务MySQL,gl,hf, gogogo.
完整代码可运行代码以下,也可直接到zbus示例代码库中找到
package org.zbus.net; import java.io.IOException; import java.nio.channels.SelectionKey; import org.zbus.net.core.Dispatcher; import org.zbus.net.core.IoAdaptor; import org.zbus.net.core.IoBuffer; import org.zbus.net.core.Session; public class TcpProxyAdaptor extends IoAdaptor { private String targetAddress; public TcpProxyAdaptor(String targetAddress) { this.targetAddress = targetAddress; } // 透传不须要编解码,简单返回ByteBuffer数据 public IoBuffer encode(Object msg) { if (msg instanceof IoBuffer) { IoBuffer buff = (IoBuffer) msg; return buff; } else { throw new RuntimeException("Message Not Support"); } } // 透传不须要编解码,简单返回ByteBuffer数据 public Object decode(IoBuffer buff) { if (buff.remaining() > 0) { byte[] data = new byte[buff.remaining()]; buff.readBytes(data); return IoBuffer.wrap(data); } else { return null; } } @Override protected void onSessionAccepted(Session sess) throws IOException { Session target = null; Dispatcher dispatcher = sess.getDispatcher(); try { target = dispatcher.createClientSession(targetAddress, this); } catch (Exception e) { sess.asyncClose(); return; } sess.chain = target; target.chain = sess; dispatcher.registerSession(SelectionKey.OP_CONNECT, target); } @Override public void onSessionConnected(Session sess) throws IOException { Session chain = sess.chain; if(chain == null){ sess.asyncClose(); return; } if(sess.isActive() && chain.isActive()){ sess.register(SelectionKey.OP_READ); chain.register(SelectionKey.OP_READ); } } @Override protected void onMessage(Object msg, Session sess) throws IOException { Session chain = sess.chain; if(chain == null){ sess.asyncClose(); return; } chain.write(msg); } @Override public void onSessionToDestroy(Session sess) throws IOException { try { sess.close(); } catch (IOException e) { //ignore } if (sess.chain == null) return; try { sess.chain.close(); sess.chain.chain = null; sess.chain = null; } catch (IOException e) { } } @SuppressWarnings("resource") public static void main(String[] args) throws Exception { Dispatcher dispatcher = new Dispatcher(); IoAdaptor ioAdaptor = new TcpProxyAdaptor("10.17.2.30:3306"); final Server server = new Server(dispatcher, ioAdaptor, 3306); server.setServerName("TcpProxyServer"); server.start(); } }