Netty服务端代码的hello worldjava
public class EchoServer {
private final int port;
public EchoServer(int port) {
this.port = port;
}
public static void main(String[]args)throws Exception{
new EchoServer(8888).start();
}
public void start() throws Exception{
final EchoServerHandler handler = new EchoServerHandler();
EventLoopGroup group = new NioEventLoopGroup();
try{
ServerBootstrap b = new ServerBootstrap();
b.group(group).channel(NioServerSocketChannel.class).localAddress(new InetSocketAddress(port))
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(handler);
}
});
ChannelFuture f = b.bind().sync();
f.channel().closeFuture().sync();
}finally {
group.shutdownGracefully().sync();
}
}
}
复制代码
Hello word版代码中用的是同一个NioEventLoop,实际中通常各自分配数组
在管道的最后添加ChannelInitializer的方式则会在管道注册完成以后,往管道中 添加一个ServerBootstrapAcceptor(它是InboundHandler),它持有对childGroup(client)和childHandler的引用,而ChannelInitializer这个InboundHandler在完成它的使命以后,就会从管道中被移除, 至此完成channel的初始化。bash
ServerBootstrapAcceptor 最开始在客户端创建链接的时候执行调用(后续读消息调用),入口是 doReadMessages,读到消息以后,从Head沿着InBoundHandler到ServerBootstrapAcceptor,触发读事件,此时执行注册childGroup到这个channel,也就是每次都用childGroup来处理读到的消息ide
public final void channelRegistered(ChannelHandlerContext ctx) throws Exception {
//管道注册完成以后触发
ChannelPipeline pipeline = ctx.pipeline();
boolean success = false;
try {
initChannel((C) ctx.channel()); //执行注册过程当中的方法,在这里就是往管道中添加ServerBootstrapAcceptor
pipeline.remove(this); //删除ChannelInitializer自己
ctx.fireChannelRegistered(); //继续沿着管道传递channel注册完成事件
success = true;
} catch (Throwable t) {
logger.warn("Failed to initialize a channel. Closing: " + ctx.channel(), t);
} finally {
if (pipeline.context(this) != null) {
pipeline.remove(this);
}
if (!success) {
ctx.close();
}
}
}
复制代码
新建的NioServerSocketChannel的部分类结构以下:工具
凡是经过 channel()方法获取的则是Netty自身的channel
public DefaultChannelPipeline(AbstractChannel channel) {
if (channel == null) {
throw new NullPointerException("channel");
}
this.channel = channel; //Netty自身的channel
tail = new TailContext(this);
head = new HeadContext(this);
head.next = tail;
tail.prev = head;
}
复制代码
凡是经过javachannel()调用的获取到的值便是jdk的channel
,而unsafe自己真正意义上执行的register、bind、connect、write、read操做均经过ServerSocketChannel实现netty nio底层的注册channel、绑定监听端口都是经过jdk自身的nio完成的。java nio中的select和channel是怎么使用的?oop
public class EchoServerHandler extends ChannelInboundHandlerAdapter {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
ByteBuf in = (ByteBuf) msg;
System.out.printf("Server get:"+in.toString(CharsetUtil.UTF_8));
}
@Override
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
//将目前暂存于ChannelOutboundBuffer中的消息在下一次flush或者writeAndFlush的时候冲刷到远程并关闭这个channel
ctx.writeAndFlush(Unpooled.EMPTY_BUFFER).addListener(ChannelFutureListener.CLOSE);
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
cause.printStackTrace();
ctx.close();
}
}
复制代码
public class EchoClient {
private final String host;
private final int port;
public EchoClient(String host,int port){
this.host=host;
this.port=port;
}
public void start() throws Exception{
EventLoopGroup group = new NioEventLoopGroup();
try{
Bootstrap b = new Bootstrap();
b.group(group).channel(NioSocketChannel.class)//指定NIO的传输方式
.remoteAddress(new InetSocketAddress(host,port))//指定远程地址
.handler(new ChannelInitializer<SocketChannel>() {//向channel的pipeline添加handler
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(new EchoClientHandler());//channelHander交给pipeline
}
});
ChannelFuture f = b.connect().sync();//链接到远程节点,阻塞直到链接完成
System.out.println("wait");
f.channel().closeFuture().sync();//阻塞直到链接关闭
System.out.println("over");
}finally {
System.out.println("shutdown");
group.shutdownGracefully().sync();//关闭线程池而且释放资源
}
}
public static void main(String[]args) throws Exception{
new EchoClient("localhost",8888).start();
}
}
复制代码
从代码自己能够看到与 server的差别化在于如下两个部分:post
注意这里的其实是没有指定本地的地址的ui
public class EchoClientHandler extends SimpleChannelInboundHandler<ByteBuf> {
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
ctx.writeAndFlush(Unpooled.copiedBuffer("Hello world",CharsetUtil.UTF_8));
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
cause.printStackTrace();
ctx.close();
}
@Override
protected void channelRead0(ChannelHandlerContext ctx, ByteBuf msg) throws Exception {
System.out.println("Client get:"+msg.toString(CharsetUtil.UTF_8));
}
}
复制代码