基于Netty实现服务端与客户端通讯

我的博客html

www.milovetingting.cnjava

基于Netty实现服务端与客户端通讯

前言

本文介绍基于Netty实现的服务端与客户端通讯的简单使用方法,并在此基础上实现一个简单的服务端-客户端指令通讯的Demo。git

Netty是什么

Netty是一个NIO客户端-服务器框架,能够快速轻松地开发网络应用程序,例如协议服务器和客户端。它极大地简化了网络编程,例如TCP和UDP套接字服务器的开发。提供一个异步事件驱动的网络应用程序框架和工具,以快速开发可维护的高性能和高可扩展性协议服务器和客户端。github

以上内容摘选自netty.io/wiki/user-g…编程

Netty具备如下特色:json

  • 适用于各类传输类型的统一API-阻塞和非阻塞套接字
  • 更高的吞吐量,更低的延迟
  • 减小资源消耗
  • 减小没必要要的内存复制
  • 完整的SSL / TLS和StartTLS支持

以上内容摘选自netty.io/bash

使用入门

Netty的使用,能够参照Netty的官方文档,这里以4.x为例来演示Netty在服务端和客户端上使用。文档地址:netty.io/wiki/user-g…服务器

这里用Eclipse来进行开发,服务端和客户端都放在一个工程里。网络

新建Java工程app

服务端

首先须要导入netty的jar包。这里使用netty-all-4.1.48.Final.jar。

NettyServer

新建NettyServer类

public class NettyServer {

	private int mPort;

	public NettyServer(int port) {
		this.mPort = port;
	}

	public void run() {
		EventLoopGroup bossGroup = new NioEventLoopGroup();
		EventLoopGroup workerGroup = new NioEventLoopGroup();
		try {
			ServerBootstrap b = new ServerBootstrap();
			b.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class)
					// 指定链接队列大小
					.option(ChannelOption.SO_BACKLOG, 128)
					//KeepAlive
					.childOption(ChannelOption.SO_KEEPALIVE, true)
					//Handler
					.childHandler(new ChannelInitializer<SocketChannel>() {

						@Override
						protected void initChannel(SocketChannel channel) throws Exception {
							channel.pipeline().addLast(new NettyServerHandler());
						}
					});
			ChannelFuture f = b.bind(mPort).sync();
			if (f.isSuccess()) {
				LogUtil.log("Server,启动Netty服务端成功,端口号:" + mPort);
			}
			// f.channel().closeFuture().sync();
		} catch (Exception e) {
			e.printStackTrace();
		} finally {
			// workerGroup.shutdownGracefully();
			// bossGroup.shutdownGracefully();
		}
	}

}
复制代码

NettyServerHandler

在初始化时,须要指定Handle,用来处理Channel相关业务。

public class NettyServerHandler extends ChannelInboundHandlerAdapter {

	@Override
	public void channelActive(ChannelHandlerContext ctx) throws Exception {
		LogUtil.log("Server,channelActive");
	}

	@Override
	public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
		LogUtil.log("Server,接收到客户端发来的消息:" + msg);
	}

	@Override
	public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
		LogUtil.log("Server,exceptionCaught");
		cause.printStackTrace();
	}

	@Override
	public void channelInactive(ChannelHandlerContext ctx) throws Exception {
		LogUtil.log("Server,channelInactive");
	}

}
复制代码

通过上面这些步骤后,服务端最基本的设置就完成了。

客户端

客户端和服务端在初始化时大致是相似的,不过相比服务端要简单一些。

NettyClient

public class NettyClient {

	private String mHost;

	private int mPort;

	private NettyClientHandler mClientHandler;

	private ChannelFuture mChannelFuture;

	public NettyClient(String host, int port) {
		this.mHost = host;
		this.mPort = port;
	}

	public void connect() {
		EventLoopGroup workerGroup = new NioEventLoopGroup();
		try {
			Bootstrap b = new Bootstrap();
			mClientHandler = new NettyClientHandler();
			b.group(workerGroup).channel(NioSocketChannel.class)
					// KeepAlive
					.option(ChannelOption.SO_KEEPALIVE, true)
					// Handler
					.handler(new ChannelInitializer<SocketChannel>() {

						@Override
						protected void initChannel(SocketChannel channel) throws Exception {
							channel.pipeline().addLast(mClientHandler);
						}
					});
			mChannelFuture = b.connect(mHost, mPort).sync();
			if (mChannelFuture.isSuccess()) {
				LogUtil.log("Client,链接服务端成功");
			}
			mChannelFuture.channel().closeFuture().sync();
		} catch (Exception e) {
			e.printStackTrace();
		} finally {
			workerGroup.shutdownGracefully();
		}
	}
}
复制代码

NettyClientHandler

public class NettyClientHandler extends ChannelInboundHandlerAdapter {

	@Override
	public void channelActive(ChannelHandlerContext ctx) throws Exception {
		LogUtil.log("Client,channelActive");
	}

	@Override
	public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
		LogUtil.log("Client,接收到服务端发来的消息:" + msg);
	}

	@Override
	public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
		LogUtil.log("Client,exceptionCaught");
		cause.printStackTrace();
	}

	@Override
	public void channelInactive(ChannelHandlerContext ctx) throws Exception {
		LogUtil.log("Client,channelInactive");
	}

}
复制代码

到这里,客户端最基本设置就完成了。

链接服务端

新建一个Main类,用于测试服务端和客户端是否能正常链接。

public class Main {

	public static void main(String[] args) {
		try {
			String host = "127.0.0.1";
			int port = 12345;
			NettyServer server = new NettyServer(port);
			server.run();
			Thread.sleep(1000);
			NettyClient client = new NettyClient(host, port);
			client.connect();
		} catch (Exception e) {
			e.printStackTrace();
		}
	}

}
复制代码

运行main方法,输出日志以下:

2020-4-13 0:11:02--Server,启动Netty服务端成功,端口号:12345
2020-4-13 0:11:03--Client,channelActive
2020-4-13 0:11:03--Client,链接服务端成功
2020-4-13 0:11:03--Server,channelActive
复制代码

能够看到,客户端成功链接上了服务端,服务端和客户端里设置的Handler的channelActive方法都会回调。

服务端与客户端通讯

在服务端与客户端链接成功后,咱们每每须要在双方间进行通讯。这里假定,在链接成功后,服务端给客户端发送一个欢迎信息"你好,客户端",而客户端在收到服务端的消息后,也给服务端回复一个消息"你好,服务端"。下面来实现具体的功能。

修改服务端NettyServerHandler中的channelActive方法和channelRead方法,在channelActive方法中给客户端发送消息,在channelRead方法中解析客户端发来的消息

public class NettyServerHandler extends ChannelInboundHandlerAdapter {

	@Override
	public void channelActive(ChannelHandlerContext ctx) throws Exception {
		LogUtil.log("Server,channelActive");
		ByteBuf byteBuf = Unpooled.copiedBuffer("你好,客户端", Charset.forName("utf-8"));
		ctx.writeAndFlush(byteBuf);
	}

	@Override
	public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
		ByteBuf buf = (ByteBuf) msg;
		byte[] buffer = new byte[buf.readableBytes()];
		buf.readBytes(buffer);
		String message = new String(buffer, "utf-8");
		LogUtil.log("Server,接收到客户端发来的消息:" + message);
	}

}
复制代码

修改客户端NettyClientHandler中的channelRead方法,当收到服务端的消息时,回复服务端

public class NettyClientHandler extends ChannelInboundHandlerAdapter {

	@Override
	public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
		ByteBuf buf = (ByteBuf) msg;
		byte[] buffer = new byte[buf.readableBytes()];
		buf.readBytes(buffer);
		String message = new String(buffer,"utf-8");
		LogUtil.log("Client,接收到服务端发来的消息:" + message);
		
		ByteBuf byteBuf = Unpooled.copiedBuffer("你好,服务端", Charset.forName("utf-8"));
		ctx.writeAndFlush(byteBuf);
	}

}
复制代码

运行后,输出日志以下:

2020-4-13 0:29:16--Server,启动Netty服务端成功,端口号:12345
2020-4-13 0:29:17--Client,channelActive
2020-4-13 0:29:17--Client,链接服务端成功
2020-4-13 0:29:17--Server,channelActive
2020-4-13 0:29:17--Client,接收到服务端发来的消息:你好,客户端
2020-4-13 0:29:17--Server,接收到客户端发来的消息:你好,服务端
复制代码

能够看到,服务端与客户端已经能够正常通讯。

粘包与拆包

在实际的使用场景中,可能会存在短期内大量数据发送的问题。咱们模拟这个场景。在客户端链接上服务端后,服务端给客户端发送100个消息,而为便于分析,客户端在收到服务端消息后,不做回复。

修改服务端中NettyServerHandler的channelActive方法

@Override
	public void channelActive(ChannelHandlerContext ctx) throws Exception {
		LogUtil.log("Server,channelActive");
		for (int i = 0; i < 100; i++) {
			ByteBuf byteBuf = Unpooled.copiedBuffer("你好,客户端", Charset.forName("utf-8"));
			ctx.writeAndFlush(byteBuf);
		}
	}
复制代码

修改客户端中NettyClientHandler的channelRead方法

@Override
	public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
		ByteBuf buf = (ByteBuf) msg;
		byte[] buffer = new byte[buf.readableBytes()];
		buf.readBytes(buffer);
		String message = new String(buffer, "utf-8");
		LogUtil.log("Client,接收到服务端发来的消息:" + message);

        //ByteBuf byteBuf = Unpooled.copiedBuffer("你好,服务端", Charset.forName("utf-8"));
        //ctx.writeAndFlush(byteBuf);
	}
复制代码

运行后,输出的部分结果以下:

2020-4-13 0:35:28--Server,启动Netty服务端成功,端口号:12345
2020-4-13 0:35:29--Client,channelActive
2020-4-13 0:35:29--Client,链接服务端成功
2020-4-13 0:35:29--Server,channelActive
2020-4-13 0:35:29--Client,接收到服务端发来的消息:你好,客户端
2020-4-13 0:35:29--Client,接收到服务端发来的消息:你好,客户端你好,客户端你好,客户端你好,客户端你好,客户端你好,客户端你好,客户端你好,客户端你好,客户端你好,客户端你好,客户端你好,客户端你好,客户端你好,客户端你好,客户端你好,客户端你好,客户端
2020-4-13 0:35:29--Client,接收到服务端发来的消息:你好,客户端

复制代码

能够看到,出现了多条消息"粘"在一块儿的状况。

什么是粘包与拆包

TCP是个"流"协议,所谓流,就是没有界限的一串数据。TCP底层并不了解上层业务数据的具体含义,它会根据TCP缓冲区的实际状况进行包的划分,因此在业务上认为,一个完整的包可能会被TCP拆分红多个包进行发送,也有可能把多个小的包封装成一个大的数据包发送,这就是所谓的TCP粘包和拆包问题。

以上内容摘选自TCP粘包/拆包与Netty解决方案

解决方案

在没有 Netty 的状况下,用户若是本身须要拆包,基本原理就是不断从 TCP 缓冲区中读取数据,每次读取完都须要判断是不是一个完整的数据包 若是当前读取的数据不足以拼接成一个完整的业务数据包,那就保留该数据,继续从 TCP 缓冲区中读取,直到获得一个完整的数据包。 若是当前读到的数据加上已经读取的数据足够拼接成一个数据包,那就将已经读取的数据拼接上本次读取的数据,构成一个完整的业务数据包传递到业务逻辑,多余的数据仍然保留,以便和下次读到的数据尝试拼接。

以上内容摘选自完全理解Netty,这一篇文章就够了

而使用Netty,则解决这个问题的方法就简单多了。Netty已经提供了四个拆包器:

  • FixedLengthFrameDecoder:固定长度的拆包器,Netty会把固定长度的数据包发送给下一个channelHandler
  • LineBasedFrameDecoder:行拆包器,每一个数据包以换行符分隔发送
  • DelimiterBasedFrameDecoder:分隔符拆包器,能够自定义分隔符,行拆包器是分隔符拆包器的一种特例
  • LengthFieldBasedFrameDecoder:基于长度域的拆包器,若是自定义协议中包含长度域的字段,就可使用这个拆包器

在这里,咱们选用分隔符拆包器

首先定义分隔符

public class Config {
	public static final String DATA_PACK_SEPARATOR = "#$&*";
}
复制代码

在服务端的channelHandler配置中,须要增长

@Override
protected void initChannel(SocketChannel channel) throws Exception {
    //这个配置须要在添加Handler前设置
	channel.pipeline().addLast(new DelimiterBasedFrameDecoder(1024,Unpooled.copiedBuffer(Config.DATA_PACK_SEPARATOR.getBytes())));
	channel.pipeline().addLast(new NettyServerHandler());
	}
复制代码

在客户端的channelHandler的配置中,一样也须要增长

@Override
protected void initChannel(SocketChannel channel) throws Exception {
    //这个配置须要在添加Handler前设置
	channel.pipeline().addLast(new DelimiterBasedFrameDecoder(1024,Unpooled.copiedBuffer(Config.DATA_PACK_SEPARATOR.getBytes())));
	channel.pipeline().addLast(new NettyServerHandler());
	}
复制代码

发送数据时,在数据的末尾增长分隔符:

@Override
	public void channelActive(ChannelHandlerContext ctx) throws Exception {
		LogUtil.log("Server,channelActive");
		for (int i = 0; i < 100; i++) {
			ByteBuf byteBuf = Unpooled.copiedBuffer("你好,客户端"+Config.DATA_PACK_SEPARATOR, Charset.forName("utf-8"));
			ctx.writeAndFlush(byteBuf);
		}
	}
复制代码

运行后,能够发现,已经解决"粘包"与"拆包"的问题。

心跳

在网络应用中,为了判断链接是否还存在,通常会经过发送心跳包来检测。在Netty中,配置心跳包的步骤以下

在客户端的channelHandler的配置中,须要增长

@Override
protected void initChannel(SocketChannel channel) throws Exception {
			channel.pipeline().addLast(new IdleStateHandler(5, 5, 10));
            //...
						}
复制代码

在NettyClientHandler中,重写userEventTriggered方法

@Override
	public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
		IdleStateEvent event = (IdleStateEvent) evt;
		LogUtil.log("Client,Idle:" + event.state());
		switch (event.state()) {
		case READER_IDLE:

			break;
		case WRITER_IDLE:
			ByteBuf byteBuf = Unpooled.copiedBuffer("心跳^v^v", Charset.forName("utf-8"));
			break;
		case ALL_IDLE:
			break;
		default:
			super.userEventTriggered(ctx, evt);
			break;
		}
	}
复制代码

当写空闲达到配置的时间时,往服务端发送一个心跳消息

运行后,日志输出以下:

2020-4-13 1:22:50--Server,启动Netty服务端成功,端口号:12345
2020-4-13 1:22:51--Client,channelActive
2020-4-13 1:22:51--Client,链接服务端成功
2020-4-13 1:22:51--Server,channelActive
2020-4-13 1:22:51--Client,接收到服务端发来的消息:你好,客户端
2020-4-13 1:22:56--Client,Idle:WRITER_IDLE
2020-4-13 1:22:56--Server,接收到客户端发来的消息:心跳^v^
2020-4-13 1:22:56--Client,Idle:READER_IDLE
2020-4-13 1:23:01--Client,Idle:WRITER_IDLE
2020-4-13 1:23:01--Server,接收到客户端发来的消息:心跳^v^
2020-4-13 1:23:01--Client,Idle:READER_IDLE
复制代码

能够看到,心跳包按咱们配置的时间正常输出了。

配置编码器与解码器

咱们上面在发送数据时,须要经过ByteBuf来转换String,而经过配置编码,解码器,咱们就能够直接发送字符串。配置以下:

在服务端与客户端的channelHandler分别增长如下配置:

@Override
protected void initChannel(SocketChannel channel) throws Exception {
	//...
	//这个配置须要在添加Handler前设置
	channel.pipeline().addLast("encoder", new StringEncoder());
	channel.pipeline().addLast("decoder", new StringDecoder());
    //...
}
复制代码

在发送消息时,则能够直接经过ctx.writeAndFlush("心跳^v^" + Config.DATA_PACK_SEPARATOR)的形式来发送。

源码

到此,最简单的服务端与客户端通讯的Demo已经完成。源码地址:github.com/milovetingt…

使用进阶

在上面的基础上,咱们来实现一个下面的需求:

  • 客户端须要登陆到服务端

  • 客户端登陆成功后,服务端能够给客户端发送指令消息,客户端在收到消息及处理完消息后,都须要上报给服务端

封装链接

为便于程序扩展,咱们将客户端链接服务端的部分抽取出来。经过一个接口来定义链接的方法,而链接的具体实现由子类来实现。

定义接口

public interface IConnection {

	/** * 链接服务器 * * @param host 服务器地址 * @param port 端口 * @param callback 链接回调 */
	public void connect(String host, int port, IConnectionCallback callback);

}
复制代码

在这里还须要定义链接的回调接口

public interface IConnectionCallback {

	/** * 链接成功 */
	public void onConnected();

}
复制代码

具体的链接实现类

public class NettyConnection implements IConnection {

	private NettyClient mClient;

	@Override
	public void connect(String host, int port, IConnectionCallback callback) {
		if (mClient == null) {
			mClient = new NettyClient(host, port);
			mClient.setConnectionCallBack(callback);
			mClient.connect();
		}
	}

}
复制代码

为便于管理链接,定义一个链接的管理类

public class ConnectionManager implements IConnection {

	private static IConnection mConnection;

	private ConnectionManager() {

	}

	static class ConnectionManagerInner {
		private static ConnectionManager INSTANCE = new ConnectionManager();
	}

	public static ConnectionManager getInstance() {
		return ConnectionManagerInner.INSTANCE;
	}

	public static void initConnection(IConnection connection) {
		mConnection = connection;
	}

	private void checkInit() {
		if (mConnection == null) {
			throw new IllegalAccessError("please invoke initConnection first!");
		}
	}

	@Override
	public void connect(String host, int port, IConnectionCallback callback) {
		checkInit();
		mConnection.connect(host, port, callback);
	}

}
复制代码

调用链接:

public class Main {

	public static void main(String[] args) {
		try {
			String host = "127.0.0.1";
			int port = 12345;
			NettyServer server = new NettyServer(port);
			server.run();
			Thread.sleep(1000);
			ConnectionManager.initConnection(new NettyConnection());
			ConnectionManager.getInstance().connect(host, port, new IConnectionCallback() {

				@Override
				public void onConnected() {
					LogUtil.log("Main,onConnected"););
				}
			});
		} catch (Exception e) {
			e.printStackTrace();
		}

	}

}
复制代码

在调用connect方法前,须要先调用initConnection来指定具体的链接类

消息Bean的定义

在链接成功后,服务端会给客户端发送一个欢迎的消息。为便于管理,咱们定义一个消息Bean

public class Msg {

	/** * 欢迎 */
	public static final int TYPE_WELCOME = 0;

	public int type;

	public String msg;

}
复制代码

服务端发送欢迎消息

服务端发送消息

public class NettyServerHandler extends ChannelInboundHandlerAdapter {

	private ChannelHandlerContextWrapper mChannelHandlerContextWrapper;

	@Override
	public void channelActive(ChannelHandlerContext ctx) throws Exception {
		LogUtil.log("Server,channelActive");
		mChannelHandlerContextWrapper = new ChannelHandlerContextWrapper(ctx);
		MsgUtil.sendWelcomeMsg(mChannelHandlerContextWrapper);
	}
}
复制代码

在这里,经过定义一个ChannelHandlerContextWrapper类来统一管理消息分隔符

public class ChannelHandlerContextWrapper {

	private ChannelHandlerContext mContext;

	public ChannelHandlerContextWrapper(ChannelHandlerContext context) {
		this.mContext = context;
	}

	/** * 包装writeAndFlush方法 * * @param object */
	public void writeAndFlush(Object object) {
		mContext.writeAndFlush(object + Config.DATA_PACK_SEPARATOR);
	}

}
复制代码

再进一步,经过定义MsgUtil类来封装发送欢迎消息

public class MsgUtil {

	/** * 发送欢迎消息 * * @param wrapper */
	public static void sendWelcomeMsg(ChannelHandlerContextWrapper wrapper) {
		Msg msg = new Msg();
		msg.type = Msg.TYPE_WELCOME;
		msg.msg = "你好,客户端";
		wrapper.writeAndFlush(Global.sGson.toJson(msg));
	}

}
复制代码

客户端消息接收

对于客户端而言,为方便处理消息,咱们须要定义一个方法来接收消息。经过在IConnection接口中新增一个registerMsgCallback方法来实现

public interface IConnection {

	/** * 链接服务器 * * @param host 服务器地址 * @param port 端口 * @param callback 链接回调 */
	public void connect(String host, int port, IConnectionCallback callback);

	/** * 注册消息回调 * * @param callback */
	public void registerMsgCallback(IMsgCallback callback);

}
复制代码

在这里,还须要新增IMsgCallback接口

public interface IMsgCallback {

	/** * 接收到消息时的回调 * * @param msg */
	public void onMsgReceived(Msg msg);

}
复制代码

对应到实现类

public class NettyConnection implements IConnection {

	private NettyClient mClient;

	@Override
	public void connect(String host, int port, IConnectionCallback callback) {
		if (mClient == null) {
			mClient = new NettyClient(host, port);
			mClient.setConnectionCallBack(callback);
			mClient.connect();
		}
	}

	@Override
	public void registerMsgCallback(IMsgCallback callback) {
		if (mClient == null) {
			throw new IllegalAccessError("please invoke connect first!");
		}
		mClient.registerMsgCallback(callback);
	}

}
复制代码

消息的分发

在客户端,为便于处理消息,咱们对消息类型进行划分

修改消息Bean

public class Msg {

	/** * 欢迎 */
	public static final int TYPE_WELCOME = 0;

	/** * 心跳 */
	public static final int TYPE_HEART_BEAT = 1;

	/** * 登陆 */
	public static final int TYPE_LOGIN = 2;

	public static final int TYPE_COMMAND_A = 3;

	public static final int TYPE_COMMAND_B = 4;

	public static final int TYPE_COMMAND_C = 5;

	public int type;

	public String msg;
}
复制代码

假定消息是串行的,须要一个一个地处理。为便于管理消息,增长MsgQueue类

public class MsgQueue {

	private PriorityBlockingQueue<Msg> mQueue;

	private boolean using;

	private MsgQueue() {
		mQueue = new PriorityBlockingQueue<>(128, new Comparator<Msg>() {

			@Override
			public int compare(Msg msg1, Msg msg2) {
				int res = msg2.priority - msg1.priority;
				if (res == 0 && msg1.time != msg2.time) {
					return (int) (msg2.time - msg1.time);
				}
				return res;
			}
		});
	}

	public static MsgQueue getInstance() {
		return MsgQueueInner.INSTANCE;
	}

	private static class MsgQueueInner {
		private static final MsgQueue INSTANCE = new MsgQueue();
	}

	/** * 将消息加入消息队列 * * @param msg */
	public void enqueueMsg(Msg msg) {
		mQueue.add(msg);
	}

	/** * 从消息队列获取消息 * * @return */
	public synchronized Msg next() {
		if (using) {
			return null;
		}
		Msg msg = mQueue.poll();
		if (msg != null) {
			makeUse(true);
		}
		return msg;
	}

	/** * 标记使用状态 * * @param use */
	public synchronized void makeUse(boolean use) {
		using = use;
	}

	/** * 是否可以使用 * * @return */
	public synchronized boolean canUse() {
		return !using;
	}

}
复制代码

增长消息的分发类MsgDispatcher

public class MsgDispatcher {

	private static Map<Integer, Class<? extends IMsgHandler>> mHandlerMap;

	static {
		mHandlerMap = new HashMap<>();
		mHandlerMap.put(Msg.TYPE_WELCOME, WelcomeMsgHandler.class);
		mHandlerMap.put(Msg.TYPE_HEART_BEAT, HeartBeatMsgHandler.class);
		mHandlerMap.put(Msg.TYPE_LOGIN, HeartBeatMsgHandler.class);
		mHandlerMap.put(Msg.TYPE_COMMAND_A, CommandAMsgHandler.class);
		mHandlerMap.put(Msg.TYPE_COMMAND_B, CommandBMsgHandler.class);
		mHandlerMap.put(Msg.TYPE_COMMAND_C, CommandCMsgHandler.class);
	}

	public static void dispatch() {
		if (MsgQueue.getInstance().canUse()) {
			Msg msg = MsgQueue.getInstance().next();
			if (msg == null) {
				return;
			}
			dispatch(msg);
		}
	}

	public static void dispatch(Msg msg) {
		try {
			IMsgHandler handler = (IMsgHandler) Class.forName(mHandlerMap.get(msg.type).getName()).newInstance();
			handler.handle(msg);
		} catch (InstantiationException e) {
			e.printStackTrace();
		} catch (IllegalAccessException e) {
			e.printStackTrace();
		} catch (ClassNotFoundException e) {
			e.printStackTrace();
		}
	}

}
复制代码

消息的处理

定义IMsgHandler,在这里定义了处理的方法,具体实现由子类实现

public interface IMsgHandler {

	/** * 处理消息 * * @param msg */
	public void handle(Msg msg);

}
复制代码

为统一管理,定义Base类BaseCommandHandler

public abstract class BaseCommandHandler implements IMsgHandler {

	@Override
	public void handle(Msg msg) {
		execute(msg);
	}

	public final void execute(Msg msg) {
		LogUtil.log("Client,received command:" + msg);
		doHandle(msg);
		MsgQueue.getInstance().makeUse(false);
		LogUtil.log("Client,report command:" + msg);
		MsgDispatcher.dispatch();
	}

	public abstract void doHandle(Msg msg);

}
复制代码

在BaseCommandHandler中,定义execute方法,顺序调用:上报消息已接收成功、处理消息、上报消息已处理完成。这里的消息上报部分,都只是输出一个日志来代替,在实际的业务中,能够抽取出一个抽象方法,让子类来实现。

定义子类,继承自BaseCommandHandler

public class LoginMsgHandler extends BaseCommandHandler {

	@Override
	public void doHandle(Msg msg) {
		LogUtil.log("Client,handle msg:" + msg);
	}

}
复制代码

对应的心跳类型消息、欢迎类型消息等,均可以新增对应的处理类来实现,这里再也不展开。

接收到消息时的处理

public class Main {

	public static void main(String[] args) {
		try {
			String host = "127.0.0.1";
			int port = 12345;
			NettyServer server = new NettyServer(port);
			server.run();
			Thread.sleep(1000);
			ConnectionManager.initConnection(new NettyConnection());
			ConnectionManager.getInstance().connect(host, port, new IConnectionCallback() {

				@Override
				public void onConnected() {
					LogUtil.log("Main,onConnected");

					ConnectionManager.getInstance().registerMsgCallback(new IMsgCallback() {

						@Override
						public void onMsgReceived(Msg msg) {
							MsgQueue.getInstance().enqueueMsg(msg);
							MsgDispatcher.dispatch();
						}
					});
				}
			});
		} catch (Exception e) {
			e.printStackTrace();
		}

	}

}
复制代码

客户端登陆

修改消息Bean,增长登陆的请求和响应

public class Msg {

	/** * 欢迎 */
	public static final int TYPE_WELCOME = 0;

	/** * 心跳 */
	public static final int TYPE_HEART_BEAT = 1;

	/** * 登陆 */
	public static final int TYPE_LOGIN = 2;

	public static final int TYPE_COMMAND_A = 3;

	public static final int TYPE_COMMAND_B = 4;

	public static final int TYPE_COMMAND_C = 5;

	public int type;

	public String msg;

	public int priority;

	public long time;

	/** * 登陆请求信息 * * @author Administrator * */
	public static class LoginRuquestInfo {
		/** * 用户名 */
		public String user;

		/** * 密码 */
		public String pwd;

		@Override
		public String toString() {
			return "LoginRuquestInfo [user=" + user + ", pwd=" + pwd + "]";
		}
	}

	/** * 登陆响应信息 * * @author Administrator * */
	public static class LoginResponseInfo {

		/** * 登陆成功 */
		public static final int CODE_SUCCESS = 0;

		/** * 登陆失败 */
		public static final int CODE_FAILED = 100;

		/** * 响应码 */
		public int code;

		/** * 响应数据 */
		public String data;

		public static class ResponseData {
			public String token;
		}

		@Override
		public String toString() {
			return "LoginResponseInfo [code=" + code + ", data=" + data + "]";
		}

	}
}
复制代码

发送登陆请求

public class Main {

	public static void main(String[] args) {
		try {
			String host = "127.0.0.1";
			int port = 12345;
			NettyServer server = new NettyServer(port);
			server.run();
			Thread.sleep(1000);
			ConnectionManager.initConnection(new NettyConnection());
			ConnectionManager.getInstance().connect(host, port, new IConnectionCallback() {

				@Override
				public void onConnected() {
					LogUtil.log("Main,onConnected");

					ConnectionManager.getInstance().registerMsgCallback(new IMsgCallback() {

						@Override
						public void onMsgReceived(Msg msg) {
							MsgQueue.getInstance().enqueueMsg(msg);
							MsgDispatcher.dispatch();
						}
					});

					Msg msg = new Msg();
					msg.type = Msg.TYPE_LOGIN;

					Msg.LoginRuquestInfo request = new LoginRuquestInfo();
					request.user = "wangyz";
					request.pwd = "wangyz";

					Gson gson = new Gson();
					msg.msg = gson.toJson(request);

					ConnectionManager.getInstance().sendMsg(msg);
				}
			});
		} catch (Exception e) {
			e.printStackTrace();
		}

	}

}
复制代码

这里,引入Gson,将消息Bean转成json字符串后发送。

对应到服务端,为便于解析出消息,也须要对应的修改消息的Bean。服务端对消息的具体分发与处理,和客户端相似,这里再也不展开。

源码

因为篇幅限制,Demo中指令的优先级处理,模拟服务端指令下发等,这里没有再进一步详细介绍,具体能够参考源码:github.com/milovetingt…

后记

本文介绍了基于Netty实现服务端与客户端通讯的基本用法,以及在此基础上,实现处理服务端指令并上报。Demo中通讯的数据格式,用到了json,而优化的作法,能够用protobuf来实现,这里只展现通讯的流程及简单的封装,于是未使用protobuf。Demo中只实现大致的流程,可能存在未测试到的Bug,权当一个参考的思路吧。

End~

相关文章
相关标签/搜索