jsbintask的博客,转载请注明出处。java
Nowadays we use general purpose applications or libraries to communicate with each other. For example, we often use an HTTP client library to retrieve information from a web server and to invoke a remote procedure call via web services. However, a general purpose protocol or its implementation sometimes does not scale very well. It is like how we don't use a general purpose HTTP server to exchange huge files, e-mail messages, and near-realtime messages such as financial information and multiplayer game data. What's required is a highly optimized protocol implementation that is dedicated to a special purpose. For example, you might want to implement an HTTP server that is optimized for AJAX-based chat application, media streaming, or large file transfer. You could even want to design and implement a whole new protocol that is precisely tailored to your need. Another inevitable case is when you have to deal with a legacy proprietary protocol to ensure the interoperability with an old system. What matters in this case is how quickly we can implement that protocol while not sacrificing the stability and performance of the resulting application.git
这是netty的官方介绍,大概意思就是: 咱们常常但愿咱们的应用可以和其它应用互相通讯。例如,咱们常用http请求去查询信息或者使用rpc调用webservice,可是对于这种特定的协议(http,ftp等)来讲,是不易于专门针对 本身应用程序进行扩展的。比方说咱们不会使用http协议去传输大文件,邮件,即时通信(金融信息),这须要对现有协议作出较大的优化!这样咱们就可使用netty定制属于你本身的协议!程序员
这里借用知乎上一个回答:github
做为一个学Java的,若是没有研究过Netty,那么你对Java语言的使用和理解仅仅停留在表面水平,会点SSH,写几个MVC,访问数据库和缓存,这些只是初等Java程序员干的事。若是你要进阶,想了解Java服务器的深层高阶知识,Netty绝对是一个必需要过的门槛。有了Netty,你能够实现本身的HTTP服务器,FTP服务器,UDP服务器,RPC服务器,WebSocket服务器,Redis的Proxy服务器,MySQL的Proxy服务器等等。若是你想知道Nginx是怎么写出来的,若是你想知道Tomcat和Jetty,Dubbo是如何实现的,若是你也想实现一个简单的Redis服务器,那都应该好好理解一下Netty,它们高性能的原理都是相似的。web
while ture
events = takeEvents(fds) // 获取事件,若是没有事件,线程就休眠
for event in events {
if event.isAcceptable {
doAccept() // 新连接来了
} elif event.isReadable {
request = doRead() // 读消息
if request.isComplete() {
doProcess()
}
} elif event.isWriteable {
doWrite() // 写消息
}
}
}
复制代码
NIO的流程大体就是上面的伪代码描述的过程,跟实际真实的代码有较多差别,不过对于初学者,这样理解也是足够了。Netty是创建在NIO基础之上,Netty在NIO之上又提供了更高层次的抽象。在Netty里面,Accept链接可使用单独的线程池去处理,读写操做又是另外的线程池来处理。Accept链接和读写操做也可使用同一个线程池来进行处理。而请求处理逻辑既可使用单独的线程池进行处理,也能够跟放在读写线程一块处理。线程池中的每个线程都是NIO线程。用户能够根据实际状况进行组装,构造出知足系统需求的并发模型。Netty提供了内置的经常使用编解码器,包括行编解码器[一行一个请求],前缀长度编解码器[前N个字节定义请求的字节长度],可重放解码器[记录半包消息的状态],HTTP编解码器,WebSocket消息编解码器等等Netty提供了一些列生命周期回调接口,当一个完整的请求到达时,当一个链接关闭时,当一个链接创建时,用户都会收到回调事件,而后进行逻辑处理。Netty能够同时管理多个端口,可使用NIO客户端模型,这些对于RPC服务是颇有必要的。Netty除了能够处理TCP Socket以外,还能够处理UDP Socket。在消息读写过程当中,须要大量使用ByteBuffer,Netty对ByteBuffer在性能和使用的便捷性上都进行了优化和抽象。总之,Netty是Java程序员进阶的必备神奇。若是你知其然,还想知其因此然,必定要好好研究下Netty。若是你以为Java枯燥无谓,Netty则是从新开启你对Java兴趣大门的钥匙。数据库
总结:程序员水平进阶的利器!缓存
note: 对于本例中除了很是重要的核心类会讲解外,其余类不会过多讲解,本章只作入门,其它章节会重点讲解! 咱们已经知道了netty的做用(灵活优化定制你本身的协议),以及为何要学习netty。那接下来咱们就一步一步来定制本身的协议最后完成聊天室!服务器
既然咱们取名print协议,那就是打印的意思:服务端接受客服端的信息而且打印! 首先咱们编写一个ChannelInboundHandlerAdapter,用于处理接收到的消息,咱们首先分析下这个类的做用,继承关系以下: 架构
public class EchoServerHandler extends ChannelInboundHandlerAdapter {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) { // (2)
ByteBuf byteBuf = (ByteBuf) msg;
System.out.println(byteBuf.toString(Charset.forName("utf-8")));
ctx.writeAndFlush(msg);
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { // (4)
// Close the connection when an exception is raised.
cause.printStackTrace();
ctx.close();
}
}
复制代码
收到消息后打印,接着继续编写一个启动类,用于启动一个开启咱们本身协议的服务,PrintServerApp:并发
public class EchoServerApp {
private int port;
public EchoServerApp(int port) {
this.port = port;
}
public void run() throws Exception {
NioEventLoopGroup bossLoopGroup = new NioEventLoopGroup();
NioEventLoopGroup workLoopGroup = new NioEventLoopGroup();
try {
ServerBootstrap serverBootstrap = new ServerBootstrap();
serverBootstrap.group(bossLoopGroup, workLoopGroup)
.channel(NioServerSocketChannel.class)
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(new EchoServerHandler());
}
})
.option(ChannelOption.SO_BACKLOG, 128)
.childOption(ChannelOption.SO_KEEPALIVE, true);
ChannelFuture channelFuture = serverBootstrap.bind(port).sync();
channelFuture.channel().closeFuture().sync();
} finally {
bossLoopGroup.shutdownGracefully();
workLoopGroup.shutdownGracefully();
}
}
public static void main(String[] args) throws Exception {
new EchoServerApp(8080).run();
}
}
复制代码
启动。而后咱们使用win自带的telnet工具来测试(控制面板-》程序和控制-》开启或关闭window功能,勾选telnet)。打开cmd,输入
telnet localhost 8080
复制代码
首先同上面同样,写一个TimeServerHandler:
public class TimeServerHandler extends ChannelInboundHandlerAdapter {
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
ByteBuf timeBuf = ctx.alloc().buffer();
timeBuf.writeBytes(new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date()).getBytes());
ChannelFuture channelFuture = ctx.writeAndFlush(timeBuf);
channelFuture.addListener(new GenericFutureListener<Future<? super Void>>() {
@Override
public void operationComplete(Future<? super Void> future) throws Exception {
assert channelFuture == future;
// ctx.close();
}
});
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { // (4)
// Close the connection when an exception is raised.
cause.printStackTrace();
ctx.close();
}
}
复制代码
启动类同上,接下来,编写客户端TimeClientHandler:
public class TimeClientHandler extends ChannelInboundHandlerAdapter {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
try {
ByteBuf byteBuf = (ByteBuf) msg;
int length = byteBuf.readableBytes();
byte[] buff = new byte[1024];
byteBuf.readBytes(buff, 0, length);
System.out.println("current time: " + new String(buff, 0, length));
ctx.close();
} finally {
ReferenceCountUtil.release(msg);
}
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
// Close the connection when an exception is raised.
cause.printStackTrace();
ctx.close();
}
}
复制代码
分别启动服务端,客户端。
首先,客户端与服务端通讯的信息咱们抽象出一个对象,Message以及工具类:
@Data
@NoArgsConstructor
@AllArgsConstructor
public class Message {
private String username;
private Date sentTime;
private String msg;
}
复制代码
public class Utils {
public static String encodeMsg(Message message) {
return message.getUsername() + "~" + (new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(message.getSentTime())) + "~" + message.getMsg();
}
public static String formatDateTime(Date time) {
return new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(time);
}
public static Date parseDateTime(String time) {
try {
return new SimpleDateFormat("yyyy-MM-dd Hh:mm:ss").parse(time);
} catch (ParseException e) {
return null;
}
}
public static void printMsg(Message msg) {
System.out.println("=================================================================================================");
System.out.println(" " + Utils.formatDateTime(msg.getSentTime()) + " ");
System.out.println(msg.getUsername() + ": " + msg.getMsg());
System.out.println("=================================================================================================");
}
}
复制代码
三个属性分别表明用户名,发送时间,消息内容,接着编写一个用于处理输入消息的handler,用于将byte消息转换成Message,ServerTransferMsgHandler:
public class ServerTransferMsgHandler extends ByteToMessageDecoder {
@Override
protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
String totalMsg = in.readCharSequence(in.readableBytes(), Charset.forName("utf-8")).toString();
String[] content = totalMsg.split("~");
out.add(new Message(content[0], Utils.parseDateTime(content[1]), content[2]));
}
}
复制代码
接着,编写一个处理接收消息的Handler,用于打印客户端发送过来的消息,ServerMsgHandler:
public class ServerMsgHandler extends ChannelInboundHandlerAdapter {
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
System.out.println("jsbintask-client进入聊天室。");
Message message = new Message(Constants.SERVER, new Date(), "Hello, I'm jsbintask-server side.");
ByteBuf buffer = ctx.alloc().buffer();
String content = Utils.encodeMsg(message);
buffer.writeBytes(content.getBytes());
ctx.writeAndFlush(buffer);
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg1) throws Exception {
try {
Message msg = (Message) msg1;
Utils.printMsg(msg);
Scanner scanner = new Scanner(System.in);
System.out.print("jsbintask-server, please input msg: ");
String reply = scanner.nextLine();
Message message = new Message(Constants.SERVER, new Date(), reply);
ctx.writeAndFlush(message);
} finally {
ReferenceCountUtil.release(msg1);
}
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
cause.printStackTrace();
ctx.close();
}
}
复制代码
知道注意的是,channelActive方法,在客户端连接的时候,率先给客户端发送了一条消息,最后,在编写一个用户将服务端Message转成Byte消息的handler,MessageEncoder:
public class MessageEncoder extends MessageToByteEncoder<Message> {
@Override
protected void encode(ChannelHandlerContext ctx, Message message, ByteBuf out) throws Exception {
ByteBuf buffer = ctx.alloc().buffer();
String content = Utils.encodeMsg(message);
buffer.writeBytes(content.getBytes(StandardCharsets.UTF_8));
ctx.writeAndFlush(buffer);
}
}
复制代码
最后,编写server端启动类,ChatroomServerApp:
public class ChatroomServerApp {
public static void main(String[] args) throws Exception {
NioEventLoopGroup bossGroup = new NioEventLoopGroup();
NioEventLoopGroup workGroup = new NioEventLoopGroup();
try {
ServerBootstrap serverBootstrap = new ServerBootstrap();
serverBootstrap.group(bossGroup, workGroup)
.channel(NioServerSocketChannel.class)
.childHandler(new ChannelInitializer() {
@Override
protected void initChannel(Channel ch) throws Exception {
ch.pipeline().addLast(new MessageEncoder(), new ServerTransferMsgHandler(), new ServerMsgHandler());
}
})
.option(ChannelOption.SO_BACKLOG, 1024 * 10)
.childOption(ChannelOption.SO_KEEPALIVE, true);
ChannelFuture channelFuture = serverBootstrap.bind(8888).sync();
channelFuture.channel().closeFuture().sync();
} finally {
bossGroup.shutdownGracefully();
workGroup.shutdownGracefully();
}
}
}
复制代码
启动Server,继续编写ChatroomClient。
同server端同样,client的关键也是handler,ClientMsgHandler以下:
public class ClientMsgHandler extends SimpleChannelInboundHandler<Message> {
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
super.channelActive(ctx);
}
@Override
protected void channelRead0(ChannelHandlerContext ctx, Message msg) throws Exception {
try {
Utils.printMsg(msg);
Scanner scanner = new Scanner(System.in);
System.out.print("jsbintask-client, please input msg: ");
String reply = scanner.nextLine();
Message message = new Message(Constants.CLIENT, new Date(), reply);
ByteBuf buffer = ctx.alloc().buffer();
String content = message.getUsername() + "~" + Utils.formatDateTime(message.getSentTime()) + "~" + message.getMsg();
buffer.writeBytes(content.getBytes(StandardCharsets.UTF_8));
ctx.writeAndFlush(buffer);
} finally {
ReferenceCountUtil.release(msg);
}
}
}
复制代码
接着,一样有将byte转换成Message的转换器,CliengMsgHandler:
public class ClientTransferMsgHandler extends ByteToMessageDecoder {
@Override
protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
byte[] buff = new byte[2024];
int length = in.readableBytes();
in.readBytes(buff, 0, length);
String totalMsg = new String(buff, 0, length, StandardCharsets.UTF_8);
String[] content = totalMsg.split("~");
out.add(new Message(content[0], Utils.parseDateTime(content[1]), content[2]));
}
}
复制代码
最后,启动类ChatroomClientApp:
public class ChatroomClientApp {
public static void main(String[] args) throws Exception {
NioEventLoopGroup workLoopGroup = new NioEventLoopGroup();
try {
Bootstrap clientBootstrap = new Bootstrap();
clientBootstrap.group(workLoopGroup)
.channel(NioSocketChannel.class)
.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(new ClientTransferMsgHandler(), new ClientMsgHandler());
}
})
.option(ChannelOption.SO_KEEPALIVE, true);
ChannelFuture channelFuture = clientBootstrap.connect("localhost", 8888).sync();
channelFuture.channel().closeFuture().sync();
} finally {
workLoopGroup.shutdownGracefully();
}
}
}
复制代码
一样启动client,观察控制台。首先,server端提示client进入了聊天室,而且客户端看到了server端发送过来的”招呼“信息:
本章,咱们开启了学习netty的大门,首先介绍了netty,为何要学netty,而且经过三个案例一步一步实现了本身的协议,打印,时间,消息转换协议,成功踏入了netty的大门,下一章,咱们就来学习一下netty的架构!
关注我,这里只有干货!