介绍:
初步学习了一下netty,都知道Netty是由JBOSS提供的一个java开源框架。Netty提供异步的、事件驱动的网络应用程序框架和工具,用以快速开发高性能、高可靠性的网络服务器和客户端程序。按照 http://netty.io/wiki/user-guide-for-4.x.html所作的介绍 尝试编写了几个demo 最后加入本身写的pojo对象,这里只是一些入门介绍 ,对于原理以及它如何高性能,高可靠性 后续会继续了解。后面会参考一个简单rpc框架的搭建。这里的版本html
<dependency> <groupId>io.netty</groupId> <artifactId>netty-all</artifactId> <version>4.1.6.Final</version> </dependency>
第一个:咱们尝试写一个简单的服务端程序 而且经过telnet 在服务端打印一些东西java
server端:bootstrap
package com.manyi.iw.agentcall.soa.server.service; import io.netty.bootstrap.ServerBootstrap; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelOption; import io.netty.channel.EventLoopGroup; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.nio.NioServerSocketChannel; /** * Created by huhaosumail on 16/12/22. */ public class Server { private int port; public Server(int port){ this.port=port; } public void run() throws Exception{ EventLoopGroup bossGroup = new NioEventLoopGroup(); EventLoopGroup workGroup=new NioEventLoopGroup(); try{ ServerBootstrap b= new ServerBootstrap(); b.group(bossGroup, workGroup).channel(NioServerSocketChannel.class).childHandler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel socketChannel) throws Exception { // socketChannel.pipeline().addLast(new TimeEncoder(),new TimeServerHandler()); socketChannel.pipeline().addLast(new TimeServerHandler()); } }).option(ChannelOption.SO_BACKLOG,128).childOption(ChannelOption.SO_KEEPALIVE,true); //绑定和开始接受连接 ChannelFuture f= b.bind(port).sync(); f.channel().closeFuture().sync(); }finally { //关闭 workGroup.shutdownGracefully(); bossGroup.shutdownGracefully(); } } public static void main(String[] args) throws Exception { int port=1080; new Server(port).run(); } }
serverhandler:服务器
package com.manyi.iw.agentcall.soa.server.service; import io.netty.buffer.ByteBuf; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelFutureListener; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelInboundHandlerAdapter; import io.netty.util.ReferenceCountUtil; /** * Created by huhaosumail on 16/12/23. */ public class TimeServerHandler extends ChannelInboundHandlerAdapter { @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { ByteBuf in = (ByteBuf) msg; try { while (in.isReadable()) { System.out.print((char) in.readByte()); System.out.flush(); } } finally { ReferenceCountUtil.release(msg); } } // @Override // public void channelActive(final ChannelHandlerContext ctx) throws Exception { //// final ByteBuf time= ctx.alloc().buffer(4); //// time.writeInt((int)(System.currentTimeMillis()/1000L+2208988800L)); //// final ChannelFuture f= ctx.writeAndFlush(time); //// f.addListener(new ChannelFutureListener() { //// @Override //// public void operationComplete(ChannelFuture channelFuture) throws Exception { //// assert f==channelFuture; //// ctx.close(); //// } //// }); // // ChannelFuture f=ctx.writeAndFlush(new Person(26,"胡浩是傻逼")); // f.addListener(ChannelFutureListener.CLOSE); // } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { super.exceptionCaught(ctx, cause); } }
这里启动以后 终端经过telnet localhost 1080 能够在终端输入信息 并在控制台打印 网络
第一个:咱们尝试编写一个服务端 写一些内容发送给客户端 并打印这个pojo对象 不过这里我用到序列化框架
不太规范 优先达到效果异步
server:socket
package com.manyi.iw.agentcall.soa.server.service; import io.netty.bootstrap.ServerBootstrap; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelOption; import io.netty.channel.EventLoopGroup; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.nio.NioServerSocketChannel; /** * Created by huhaosumail on 16/12/22. */ public class Server { private int port; public Server(int port){ this.port=port; } public void run() throws Exception{ EventLoopGroup bossGroup = new NioEventLoopGroup(); EventLoopGroup workGroup=new NioEventLoopGroup(); try{ ServerBootstrap b= new ServerBootstrap(); b.group(bossGroup, workGroup).channel(NioServerSocketChannel.class).childHandler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel socketChannel) throws Exception { socketChannel.pipeline().addLast(new TimeEncoder(),new TimeServerHandler()); // socketChannel.pipeline().addLast(new TimeServerHandler()); } }).option(ChannelOption.SO_BACKLOG,128).childOption(ChannelOption.SO_KEEPALIVE,true); //绑定和开始接受连接 ChannelFuture f= b.bind(port).sync(); f.channel().closeFuture().sync(); }finally { //关闭 workGroup.shutdownGracefully(); bossGroup.shutdownGracefully(); } } public static void main(String[] args) throws Exception { int port=8080; new Server(port).run(); } }
TimeServerHandler:
package com.manyi.iw.agentcall.soa.server.service; import io.netty.buffer.ByteBuf; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelFutureListener; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelInboundHandlerAdapter; import io.netty.util.ReferenceCountUtil; /** * Created by huhaosumail on 16/12/23. */ public class TimeServerHandler extends ChannelInboundHandlerAdapter { @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { ByteBuf in = (ByteBuf) msg; try { while (in.isReadable()) { System.out.print((char) in.readByte()); System.out.flush(); } } finally { ReferenceCountUtil.release(msg); } } @Override public void channelActive(final ChannelHandlerContext ctx) throws Exception { // final ByteBuf time= ctx.alloc().buffer(4); // time.writeInt((int)(System.currentTimeMillis()/1000L+2208988800L)); // final ChannelFuture f= ctx.writeAndFlush(time); // f.addListener(new ChannelFutureListener() { // @Override // public void operationComplete(ChannelFuture channelFuture) throws Exception { // assert f==channelFuture; // ctx.close(); // } // }); ChannelFuture f=ctx.writeAndFlush(new Person(26,"胡浩是傻逼")); f.addListener(ChannelFutureListener.CLOSE); } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { super.exceptionCaught(ctx, cause); } }
TimeEncoder:
package com.manyi.iw.agentcall.soa.server.service; import io.netty.buffer.ByteBuf; import io.netty.channel.ChannelHandlerContext; import io.netty.handler.codec.MessageToByteEncoder; import java.io.ByteArrayOutputStream; import java.io.ObjectOutputStream; import java.io.Serializable; /** * Created by huhaosumail on 16/12/23. */ public class TimeEncoder extends MessageToByteEncoder<Person> implements Serializable { @Override protected void encode(ChannelHandlerContext channelHandlerContext, Person person, ByteBuf byteBuf) throws Exception { // byteBuf.writeInt(person.getAge()); // byteBuf.writeBytes(person.getName().getBytes()); ByteArrayOutputStream bo=new ByteArrayOutputStream(); ObjectOutputStream oo=new ObjectOutputStream(bo); oo.writeObject(person); bo.close(); oo.close(); byteBuf.writeBytes(bo.toByteArray()); } }
client端:ide
package com.manyi.iw.agentcall.soa.server.service; import io.netty.bootstrap.Bootstrap; import io.netty.channel.*; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.nio.NioSocketChannel; import sun.misc.Unsafe; import java.lang.reflect.Field; import java.nio.ByteBuffer; import java.nio.channels.Selector; /** * Created by huhaosumail on 16/12/23. */ public class TimeClient { public static void main(String[] args) throws Exception{ EventLoopGroup workerGroup=new NioEventLoopGroup(); try{ Bootstrap b=new Bootstrap(); b.group(workerGroup); b.channel(NioSocketChannel.class); b.option(ChannelOption.SO_KEEPALIVE,true); b.handler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel socketChannel) throws Exception { socketChannel.pipeline().addLast(new TimeDecoder(),new TimeClientHandler()); } }); //开启客户端 ChannelFuture f=b.connect("localhost",8080).sync(); //等待直到链接关闭 f.channel().closeFuture().sync(); }finally { workerGroup.shutdownGracefully(); } } }
TimeDecoder:
package com.manyi.iw.agentcall.soa.server.service; import io.netty.buffer.ByteBuf; import io.netty.channel.ChannelHandlerContext; import io.netty.handler.codec.ByteToMessageDecoder; import java.io.ByteArrayInputStream; import java.io.ObjectInputStream; import java.io.Serializable; import java.util.List; /** * Created by huhaosumail on 16/12/23. */ public class TimeDecoder extends ByteToMessageDecoder implements Serializable { @Override protected void decode(ChannelHandlerContext channelHandlerContext, ByteBuf byteBuf, List<Object> list) throws Exception { if(byteBuf.readableBytes()<4){ return; } // list.add(byteBuf.readBytes(4)); // list.add(new Person(byteBuf.readInt())); // // byte[] req=new byte[byteBuf.readableBytes()]; // byteBuf.readBytes(req); // list.add(new Person(new String(req,"UTF-8"))); byte[] req=new byte[byteBuf.readableBytes()]; byteBuf.readBytes(req); ByteArrayInputStream bi = new ByteArrayInputStream(req); ObjectInputStream oi = new ObjectInputStream(bi); Person p=(Person)oi.readObject(); bi.close(); oi.close(); list.add(p); } }
TimeClientHandler:
package com.manyi.iw.agentcall.soa.server.service; import io.netty.buffer.ByteBuf; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelInboundHandlerAdapter; import java.util.Date; /** * Created by huhaosumail on 16/12/23. */ public class TimeClientHandler extends ChannelInboundHandlerAdapter{ @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { // ByteBuf f= (ByteBuf)msg; // try{ // long currentTimeMills=(f.readUnsignedInt()-2208988800L)*1000L; // System.out.println(new Date(currentTimeMills)); // ctx.close(); // }finally { // f.release(); // } Person p=(Person)msg; System.out.println("name:"+p.getName()+",age:"+p.getAge()); ctx.close(); } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { super.exceptionCaught(ctx, cause); ctx.close(); } }
效果工具