在netty基本组件介绍中,咱们大体了解了netty的一些基本组件,今天咱们来搭建一个基于netty的Tcp服务端程序,经过代码来了解和熟悉这些组件的功能和使用方法。html
首先咱们本身建立一个Server类,命名为TCPServer服务器
第一步初始化ServerBootstrap,ServerBootstrap是netty中的一个服务器引导类,对ServerBootstrap的实例化就是建立netty服务器的入口微信
public class TCPServer { private Logger log = LoggerFactory.getLogger(getClass()); //端口号 private int port=5080; //服务器运行状态 private volatile boolean isRunning = false; //处理Accept链接事件的线程,这里线程数设置为1便可,netty处理连接事件默认为单线程,过分设置反而浪费cpu资源 private final EventLoopGroup bossGroup = new NioEventLoopGroup(1); //处理hadnler的工做线程,其实也就是处理IO读写 。线程数据默认为 CPU 核心数乘以2 private final EventLoopGroup workerGroup = new NioEventLoopGroup(); public void init() throws Exception{ //建立ServerBootstrap实例 ServerBootstrap serverBootstrap=new ServerBootstrap(); //初始化ServerBootstrap的线程组 serverBootstrap.group(workerGroup,workerGroup);// //设置将要被实例化的ServerChannel类 serverBootstrap.channel(NioServerSocketChannel.class);// //在ServerChannelInitializer中初始化ChannelPipeline责任链,并添加到serverBootstrap中 serverBootstrap.childHandler(new ServerChannelInitializer()); //标识当服务器请求处理线程全满时,用于临时存放已完成三次握手的请求的队列的最大长度 serverBootstrap.option(ChannelOption.SO_BACKLOG, 1024); // 是否启用心跳保活机机制 serverBootstrap.childOption(ChannelOption.SO_KEEPALIVE, true); //绑定端口后,开启监听 ChannelFuture channelFuture = serverBootstrap.bind(port).sync(); if(channelFuture.isSuccess()){ System.out.println("TCP服务启动 成功---------------"); } } /** * 服务启动 */ public synchronized void startServer() { try { this.init(); }catch(Exception ex) { } } /** * 服务关闭 */ public synchronized void stopServer() { if (!this.isRunning) { throw new IllegalStateException(this.getName() + " 未启动 ."); } this.isRunning = false; try { Future<?> future = this.workerGroup.shutdownGracefully().await(); if (!future.isSuccess()) { log.error("workerGroup 没法正常中止:{}", future.cause()); } future = this.bossGroup.shutdownGracefully().await(); if (!future.isSuccess()) { log.error("bossGroup 没法正常中止:{}", future.cause()); } } catch (InterruptedException e) { e.printStackTrace(); } this.log.info("TCP服务已经中止..."); } private String getName() { return "TCP-Server"; } }
上面的代码中主要使用到的ServerBootstrap类的方法有如下这些:框架
group :设置SeverBootstrap要用到的EventLoopGroup,也就是定义netty服务的线程模型,处理Acceptor连接的主"线程池"以及用于I/O工做的从"线程池";socket
channel:设置将要被实例化的SeverChannel类;ide
option :指定要应用到新建立SeverChannel的ChannelConfig的ChannelOption.其实也就是服务自己的一些配置;oop
chidOption:子channel的ChannelConfig的ChannelOption。也就是与客户端创建的链接的一些配置;post
childHandler:设置将被添加到已被接收的子Channel的ChannelPipeline中的ChannelHandler,其实就是让你在里面定义处理链接收发数据,须要哪些ChannelHandler按什么顺序去处理;this
第二步接下来咱们实现ServerChannelInitializer类,这个类继承实现自netty的ChannelInitializer抽象类,这个类的做用就是对channel(链接)的ChannelPipeline进行初始化工做,说白了就是你要把处理数据的方法添加到这个任务链中去,netty才知道每一步拿着socket链接和数据去作什么。编码
@ChannelHandler.Sharable public class ServerChannelInitializer extends ChannelInitializer<SocketChannel> { static final EventExecutorGroup group = new DefaultEventExecutorGroup(2); public ServerChannelInitializer() throws InterruptedException { } @Override protected void initChannel(SocketChannel socketChannel) throws Exception { ChannelPipeline pipeline = socketChannel.pipeline(); //IdleStateHandler心跳机制,若是超时触发Handle中userEventTrigger()方法 pipeline.addLast("idleStateHandler", new IdleStateHandler(15, 0, 0, TimeUnit.MINUTES)); // netty基于分割符的自带解码器,根据提供的分隔符解析报文,这里是0x7e;1024表示单条消息的最大长度,解码器在查找分隔符的时候,达到该长度还没找到的话会抛异常 // pipeline.addLast( // new DelimiterBasedFrameDecoder(1024, Unpooled.copiedBuffer(new byte[] { 0x7e }), // Unpooled.copiedBuffer(new byte[] { 0x7e }))); //自定义编解码器 pipeline.addLast( new MessagePacketDecoder(), new MessagePacketEncoder() ); //自定义Hadler pipeline.addLast("handler",new TCPServerHandler()); //自定义Hander,可用于处理耗时操做,不阻塞IO处理线程 pipeline.addLast(group,"BussinessHandler",new BussinessHandler()); } }
这里咱们注意下
pipeline.addLast(group,"BussinessHandler",new BussinessHandler());
在这里咱们能够把一些比较耗时的操做(如存储、入库)等操做放在BussinessHandler中进行,由于咱们为它单独分配了EventExecutorGroup 线程池执行,因此说即便这里发生阻塞,也不会影响TCPServerHandler中数据的接收。
最后就是各个部分的具体实现
解码器的实现:
public class MessagePacketDecoder extends ByteToMessageDecoder { public MessagePacketDecoder() throws Exception { } @Override protected void decode(ChannelHandlerContext ctx, ByteBuf buffer, List<Object> out) throws Exception { try { if (buffer.readableBytes() > 0) { // 待处理的消息包 byte[] bytesReady = new byte[buffer.readableBytes()]; buffer.readBytes(bytesReady); //这之间能够进行报文的解析处理 out.add(bytesReady ); } }finally { } } }
编码器的实现
public class MessagePacketEncoder extends MessageToByteEncoder<Object> { public MessagePacketEncoder() { } @Override protected void encode(ChannelHandlerContext ctx, Object msg, ByteBuf out) throws Exception { try { //在这以前能够实现编码工做。 out.writeBytes((byte[])msg); }finally { } } }
TCPServerHandler的实现
public class TCPServerHandler extends ChannelInboundHandlerAdapter { public TCPServerHandler() { } @Override public void channelRead(ChannelHandlerContext ctx, Object source) throws Exception { // 拿到传过来的msg数据,开始处理 ByteBuf recvmg = (ByteBuf) source;// 转化为ByteBuf ctx.writeAndFlush(respmsg);// 收到及发送,这里若是没有writeAndFlush,上面声明的ByteBuf须要ReferenceCountUtil.release主动释放 } }
BussinessHandler的实现与TCPServerHandler基本相似,它能够处理一些相对比较耗时的操做,咱们这里就不实现了。
经过以上的代码咱们能够看到,一个基于netty的TCP服务的搭建基本就是三大块:
一、对引导服务器类ServerBootstrap的初始化;
二、对ChannelPipeline的定义,也就是把多个ChannelHandler组成一条任务链;
三、对 ChannelHandler的具体实现,其中能够有编解码器,能够有对收发数据的业务处理逻辑;
以上代码只是在基于netty框架搭建一个最基本的TCP服务,其中包含了一些netty基本的特性和功能,固然这只是netty运用的一个简单的介绍,若有不正确的地方还望指出与海涵。
关注微信公众号,查看更多技术文章。