本章介绍java
UDP介绍算法
UDP程序结构和设计数据库
日志事件POJO编程
编写广播器bootstrap
编写监听者服务器
使用广播器和监听者网络
Summaryapp
前面的章节都是在示例中使用TCP协议,这一章,咱们将使用UDP。UDP是一种无链接协议,若须要很高的性能和对数据的完成性没有严格要求,那使用UDP是一个很好的方法。最著名的基于UDP协议的是用来域名解析的DNS。dom
Netty使用了统一的传输API,这使得编写基于UDP的应用程序很容易。能够重用现有的ChannelHandler和其余公共组件来编写另外的Netty程序。看完本章后,你就会知道什么是无链接协议以及为何UDP可能适合你的应用程序。socket
13.1 UDP介绍
在深刻探讨UDP以前,咱们先了解UDP是什么,以及UDP有什么限制或问题。UDP是一种无链接的协议,也就是说客户端和服务器在交互数据以前不会像TCP那样事先创建链接。
UDP是User Datagram Protocol的简称,即用户数据报协议。UDP有不提供数据报分组、组装和不能对数据报进行排序的缺点,也就是说,当数据报发送以后是没法确认数据是否完整到达的。
UDP协议的主要做用是将网络数据流量压缩成数据包的形式。一个典型的数据包就是一个二进制数据的传输单位。每个数据包的前8个字节用来包含报头信息,剩余字节则用来包含具体的传输数据。
在选择使用协议的时候,选择UDP必需要谨慎。在网络质量使人十分不满意的环境下,UDP协议数据包丢失会比较严重。可是因为UDP的特性:它不属于链接型协议,于是具备资源消耗小,处理速度快的优势,因此一般音频、视频和普通数据在传送时使用UDP较多,由于它们即便偶尔丢失一两个数据包,也不会对接收结果产生太大影响。好比咱们聊天用的ICQ和QQ就是使用的UDP协议。
13.2 UDP程序结构和设计
本章例子中,程序打开一个文件并将文件内容一行一行的经过UDP广播到其余的接收主机。
对于像发送日志的需求,UDP很是适合这样的应用程序,并可使用UDP经过网络发送大量的“事件”。
每一个UDP报文分UDP报头和UDP数据区两部分,报头由四个16位长(2字节)字段组成,分别说明该报文的源端口、目的端口、报文长度以及校验值;数据库就是传输的具体数据。
UDP有以下特性:
1.UDP是一个无链接协议,传输数据以前源端和终端不创建链接,当它想传送时就简单地去抓取来自应用程序的数据,并尽量快地把它扔到网络上。在发送端,UDP传送数据的速度仅仅是受应用程序生成数据的速度、计算机的能力和传输带宽的限制;在接收端,UDP把每一个消息段放在队列中,应用程序每次从队列中读一个消息段。
2.因为传输数据不创建链接,所以也就不须要维护链接状态,包括收发状态等,所以一台服务机可同时向多个客户机传输相同的消息。
3.UDP信息包的标题很短,只有8个字节,相对于TCP的20个字节信息包的额外开销很小。
4.吞吐量不受拥挤控制算法的调节,只受应用软件生成数据的速率、传输带宽、源端和终端主机性能的限制。
5.UDP使用尽最大努力交付,即不保证可靠交付,所以主机不须要维持复杂的连接状态表(这里面有许多参数)。
6.UDP是面向报文的。发送方的UDP对应用程序交下来的报文,在添加首部后就向下交付给IP层。既不拆分,也不合并,而是保留这些报文的边界,所以,应用程序须要选择合适的报文大小。
本章UDP程序例子的示意图入以下:
从上图能够看出,例子程序由两部分组成:广播日志文件和“监控器”,监控器用于接收广播。为了简单,咱们将不作任何形式的身份验证或加密。
13.3 日志事件POJO
咱们的应用程序一般须要某种“消息POJO”用于保存消息,咱们把这个消息POJO当作是一个“事件消息”在本例子中咱们也建立一个POJO叫作LogEvent,LogEvent用来存储事件数据,而后将数据输出到日志文件。看下面代码:
package netty.in.action.udp; import java.net.InetSocketAddress; public class LogEvent { public static final byte SEPARATOR = (byte) '|'; private final InetSocketAddress source; private final String logfile; private final String msg; private final long received; public LogEvent(String logfile, String msg) { this(null, -1, logfile, msg); } public LogEvent(InetSocketAddress source, long received, String logfile, String msg) { this.source = source; this.logfile = logfile; this.msg = msg; this.received = received; } public InetSocketAddress getSource() { return source; } public String getLogfile() { return logfile; } public String getMsg() { return msg; } public long getReceived() { return received; } }
接下来的章节,咱们将用这个POJO类来实现具体的逻辑。
13.4 编写广播器
咱们要作的是广播一个DatagramPacket日志条目,以下图所示:
上图显示咱们有一个从日志条路到DatagramPacket一对一的关系。如同全部的基于Netty的应用程序同样,它由一个或多个ChannelHandler和一些实体对象绑定,用于引导该应用程序。首先让咱们来看看LogEventBroadcaster的ChannelPipeline以及做为数据载体的LogEvent的流向,看下图:
上图显示,LogEventBroadcaster使用LogEvent消息并将消息写入本地Channel,全部的信息封装在LogEvent消息中,这些消息被传到ChannelPipeline中。流进ChannelPipeline的LogEvent消息被编码成DatagramPacket消息,最后经过UDP广播到远程对等通道。
这能够归结为有一个自定义的ChannelHandler,从LogEvent消息编程成DatagramPacket消息。
回忆咱们在第七章讲解的编解码器,咱们定义个LogEventEncoder,代码以下:
package netty.in.action.udp; import io.netty.buffer.ByteBuf; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.socket.DatagramPacket; import io.netty.handler.codec.MessageToMessageEncoder; import io.netty.util.CharsetUtil; import java.net.InetSocketAddress; import java.util.List; public class LogEventEncoder extends MessageToMessageEncoder<LogEvent> { private final InetSocketAddress remoteAddress; public LogEventEncoder(InetSocketAddress remoteAddress){ this.remoteAddress = remoteAddress; } @Override protected void encode(ChannelHandlerContext ctx, LogEvent msg, List<Object> out) throws Exception { ByteBuf buf = ctx.alloc().buffer(); buf.writeBytes(msg.getLogfile().getBytes(CharsetUtil.UTF_8)); buf.writeByte(LogEvent.SEPARATOR); buf.writeBytes(msg.getMsg().getBytes(CharsetUtil.UTF_8)); out.add(new DatagramPacket(buf, remoteAddress)); } }
下面咱们再编写一个广播器:
package netty.in.action.udp; import io.netty.bootstrap.Bootstrap; import io.netty.channel.Channel; import io.netty.channel.ChannelOption; import io.netty.channel.EventLoopGroup; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.nio.NioDatagramChannel; import java.io.File; import java.io.IOException; import java.io.RandomAccessFile; import java.net.InetSocketAddress; public class LogEventBroadcaster { private final EventLoopGroup group; private final Bootstrap bootstrap; private final File file; public LogEventBroadcaster(InetSocketAddress address, File file) { group = new NioEventLoopGroup(); bootstrap = new Bootstrap(); bootstrap.group(group).channel(NioDatagramChannel.class) .option(ChannelOption.SO_BROADCAST, true) .handler(new LogEventEncoder(address)); this.file = file; } public void run() throws IOException { Channel ch = bootstrap.bind(0).syncUninterruptibly().channel(); long pointer = 0; for (;;) { long len = file.length(); if (len < pointer) { pointer = len; } else { RandomAccessFile raf = new RandomAccessFile(file, "r"); raf.seek(pointer); String line; while ((line = raf.readLine()) != null) { ch.write(new LogEvent(null, -1, file.getAbsolutePath(), line)); } ch.flush(); pointer = raf.getFilePointer(); raf.close(); } try { Thread.sleep(1000); } catch (InterruptedException e) { Thread.interrupted(); break; } } } public void stop() { group.shutdownGracefully(); } public static void main(String[] args) throws Exception { int port = 4096; String path = System.getProperty("user.dir") + "/log.txt"; LogEventBroadcaster broadcaster = new LogEventBroadcaster(new InetSocketAddress( "255.255.255.255", port), new File(path)); try { broadcaster.run(); } finally { broadcaster.stop(); } } }
13.5 编写监听者
这一节咱们编写一个监听者:EventLogMonitor,也就是用来接收数据的程序。EventLogMonitor作下面事情:
接收LogEventBroadcaster广播的DatagramPacket
解码LogEvent消息
输出LogEvent
EventLogMonitor的示意图以下:
解码器代码以下:
package netty.in.action.udp; import io.netty.buffer.ByteBuf; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.socket.DatagramPacket; import io.netty.handler.codec.MessageToMessageDecoder; import io.netty.util.CharsetUtil; import java.util.List; public class LogEventDecoder extends MessageToMessageDecoder<DatagramPacket> { @Override protected void decode(ChannelHandlerContext ctx, DatagramPacket msg, List<Object> out) throws Exception { ByteBuf buf = msg.content(); int i = buf.indexOf(0, buf.readableBytes(), LogEvent.SEPARATOR); String filename = buf.slice(0, i).toString(CharsetUtil.UTF_8); String logMsg = buf.slice(i + 1, buf.readableBytes()).toString(CharsetUtil.UTF_8); LogEvent event = new LogEvent(msg.sender(), System.currentTimeMillis(), filename, logMsg); out.add(event); } }
处理消息的Handler代码以下:
package netty.in.action.udp; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.SimpleChannelInboundHandler; public class LogEventHandler extends SimpleChannelInboundHandler<LogEvent> { @Override protected void channelRead0(ChannelHandlerContext ctx, LogEvent msg) throws Exception { StringBuilder builder = new StringBuilder(); builder.append(msg.getReceived()); builder.append(" ["); builder.append(msg.getSource().toString()); builder.append("] ["); builder.append(msg.getLogfile()); builder.append("] : "); builder.append(msg.getMsg()); System.out.println(builder.toString()); } }
EventLogMonitor代码以下:
package netty.in.action.udp; import io.netty.bootstrap.Bootstrap; import io.netty.channel.Channel; import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelOption; import io.netty.channel.ChannelPipeline; import io.netty.channel.EventLoopGroup; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.nio.NioDatagramChannel; import java.net.InetSocketAddress; public class LogEventMonitor { private final EventLoopGroup group; private final Bootstrap bootstrap; public LogEventMonitor(InetSocketAddress address) { group = new NioEventLoopGroup(); bootstrap = new Bootstrap(); bootstrap.group(group).channel(NioDatagramChannel.class) .option(ChannelOption.SO_BROADCAST, true) .handler(new ChannelInitializer<Channel>() { @Override protected void initChannel(Channel channel) throws Exception { ChannelPipeline pipeline = channel.pipeline(); pipeline.addLast(new LogEventDecoder()); pipeline.addLast(new LogEventHandler()); } }).localAddress(address); } public Channel bind() { return bootstrap.bind().syncUninterruptibly().channel(); } public void stop() { group.shutdownGracefully(); } public static void main(String[] args) throws InterruptedException { LogEventMonitor monitor = new LogEventMonitor(new InetSocketAddress(4096)); try { Channel channel = monitor.bind(); System.out.println("LogEventMonitor running"); channel.closeFuture().sync(); } finally { monitor.stop(); } } }
13.6 使用LogEventBroadcaster和LogEventMonitor
为避免LogEventMonitor接收不到数据,咱们必须先启动LogEventMonitor后,再启动LogEventBroadcaster,输出内容就不贴图了,读者能够本身运营本例子测试。
13.7 Summary
本章依然没按照原书中的来翻译,主要是以一个例子来讲明UDP在Netty中的使用。概念性的东西都是从网上复制的,读者只须要了解UDP的概念再了解清楚例子代码的含义,并试着运行一些例子。