前言
程序员
UDP 是面向无链接的通信协议,UDP 数据包括目的端口号和源端口号信息, 因为通信不须要链接,因此能够实现广播发送。Netty也为咱们封装相关支持UDP诸多组件、数据报文和处理器。编程
UDP 通信时不须要接收方确认,属于不可靠的传输,可能会出现丢包现象, 实际应用中要求程序员编程验证。网络
UDP 与 TCP 位于同一层,但它无论数据包的顺序、错误或重发。所以,UDP 不被应用于那些面向链接的服务,UDP 主要用于那些面向查询---应答的服务,例 如 NFS。相对于 FTP 或 Telnet,这些服务须要交换的信息量较小。使用 UDP 的服 务包括 NTP(网络时间协议)和 DNS(DNS 也使用 TCP),包总量较少的通讯(DNS、 SNMP 等);2.视频、音频等多媒体通讯(即时通讯);3.限定于 LAN 等特定网 络中的应用通讯;4.广播通讯(广播、多播)。app
经常使用的 QQ,就是一个以 UDP 为主,TCP 为辅的通信协议。 TCP 和 UDP 的优缺点没法简单地、绝对地去作比较:TCP 用于在传输层有 必要实现可靠传输的状况;而在一方面,UDP 主要用于那些对高速传输和实时 性有较高要求的通讯或广播通讯。TCP 和 UDP 应该根据应用的目的按需使用。dom
Netty中UDP的核心组件ide
Netty 的 DatagramPacket 是一个简单的消息容器,DatagramChannel 实现用它来和远程 节点通讯。相似于在咱们先前的类比中的明信片,它包含了接收者(和可选的发送者)的地 址以及消息的有效负载自己。oop
让咱们来运用Netty的核心组件,构建一个基于UDP的多播应用。this
代码设计实现编码
广播端部分spa
/**
* @author andychen https://blog.51cto.com/14815984
* @description:通知的广播端
*/
public class NoticeBroadcast {
//广播线程组
private final EventLoopGroup group;
//广播启动器
private final Bootstrap boot;
/**
* 默认构造
* @param remotePort 接收端端口
*/
public NoticeBroadcast(int remotePort) {
this.group = new NioEventLoopGroup();
this.boot = new Bootstrap();
//绑定NioDatagramChannel数据报通道
this.boot.group(group).channel(NioDatagramChannel.class)
//设置通道用于广播
.option(ChannelOption.SO_BROADCAST, true)
.handler(new NoticeEncoder(new InetSocketAddress(Constant.BROADCAST_IP, remotePort)));
}
/**
* 运行广播
*/
public void run() throws Exception {
int count = 0;
//绑定广播通道
Channel channel = this.boot.bind(0).sync().channel();
System.out.println("开始运行广播,发送通知,目标全部主机端口("+Constant.ACCEPTER_PORT+")...");
//循环广播通知
for (;;){
/**
* 发送通知到接收端
*/
channel.writeAndFlush(new Notice(++count, Constant.getNotice(),null));
//间隔3秒发送
try {
Thread.sleep(3000);
} catch (InterruptedException e) {
Thread.interrupted();
e.printStackTrace();
break;
}
}
}
/**
* 中止运行
*/
public void stop(){
try {
this.group.shutdownGracefully();
} catch (Exception e) {
e.printStackTrace();
}
}
}
/**
* @author andychen https://blog.51cto.com/14815984
* @description:广播运行器
*/
public class BroadcastRunner {
/**
* 运行消息广播
* @param args
*/
public static void main(String[] args) {
NoticeBroadcast broadcast = null;
try {
broadcast = new NoticeBroadcast(Constant.ACCEPTER_PORT);
broadcast.run();
} catch (Exception e) {
e.printStackTrace();
} finally {
broadcast.stop();
}
}
}
/**
* @author andychen https://blog.51cto.com/14815984
* @description:通知编码器
*/
public class NoticeEncoder extends MessageToMessageEncoder<Notice> {
//目的地
private final InetSocketAddress target;
public NoticeEncoder(InetSocketAddress target) {
this.target = target;
}
/**
* 编码方法实现
* @param ctx 处理器上下文
* @param notice 通知对象
* @param list 集合
* @throws Exception
*/
protected void encode(ChannelHandlerContext ctx, Notice notice, List<Object> list) throws Exception {
//内容数据
byte[] bytes = notice.getContent().getBytes(CharsetUtil.UTF_8);
//定义缓冲:一个int型+一个long型+内容长度+分隔符
int capacity = 4+8+bytes.length+1;
ByteBuf buf = ctx.alloc().buffer(capacity);
//写通知id
buf.writeInt(notice.getId());
//发送时间
buf.writeLong(notice.getTime());
//分隔符
buf.writeByte(Notice.SEPARATOR);
//内容
buf.writeBytes(bytes);
//加入消息列表
list.add(new DatagramPacket(buf, target));
}
}
接收端部分
/**
* @author andychen https://blog.51cto.com/14815984
* @description:通知接收器
*/
public class NoticeAccepter {
//通知线程组
private final EventLoopGroup group;
//启动器
private final Bootstrap boot;
public NoticeAccepter() {
this.group = new NioEventLoopGroup();
this.boot = new Bootstrap();
this.boot.group(this.group)
.channel(NioDatagramChannel.class)
//开启通道底层广播
.option(ChannelOption.SO_BROADCAST, true)
//端口重用
.option(ChannelOption.SO_REUSEADDR, true)
.handler(new ChannelInitializer<Channel>() {
@Override
protected void initChannel(Channel channel) throws Exception {
ChannelPipeline pipeline = channel.pipeline();
pipeline.addLast(new NoticeDecoder());
pipeline.addLast(new NoticeChannelHanler());
}
})
.localAddress(Constant.ACCEPTER_PORT);
}
/**
* 运行接收器
*/
public void run(){
try {
//设置不间断接收消息,并绑定通道
Channel channel = this.boot.bind().syncUninterruptibly().channel();
System.out.println("接收器启动,端口("+ Constant.ACCEPTER_PORT+"),等待接收通知...");
//通道阻塞,直到关闭
channel.closeFuture().sync();
} catch (InterruptedException e) {
e.printStackTrace();
}finally {
this.stop();
}
}
/**
* 中止接收消息
*/
public void stop(){
try {
this.group.shutdownGracefully();
} catch (Exception e) {
e.printStackTrace();
}
}
}
/**
* @author andychen https://blog.51cto.com/14815984
* @description:通知通道处理器
*/
public class NoticeChannelHanler extends SimpleChannelInboundHandler<Notice> {
/**
* 接收广播传递过来的报文
* @param channelHandlerContext
* @param notice
* @throws Exception
*/
@Override
protected void channelRead0(ChannelHandlerContext channelHandlerContext, Notice notice) throws Exception {
StringBuffer buffer = new StringBuffer();
buffer.append("时间[");
buffer.append(notice.getTime());
buffer.append("],广播源[");
buffer.append(notice.getSource().toString());
buffer.append("]=====[");
buffer.append(notice.getId());
buffer.append("]=====通知内容:");
buffer.append(notice.getContent());
//打印接收到的数据
System.out.println(buffer.toString());
}
/**
* 异常捕获
* @param ctx 上下文
* @param cause
* @throws Exception 异常信息
*/
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
cause.printStackTrace();
ctx.close();
}
}
/**
* @author andychen https://blog.51cto.com/14815984
* @description:通知解码器
*/
public class NoticeDecoder extends MessageToMessageDecoder<DatagramPacket> {
/**
* 解码器核心实现
* @param channelHandlerContext 处理器上下文
* @param datagramPacket 数据报
* @param list 消息列表
* @throws Exception
*/
@Override
protected void decode(ChannelHandlerContext channelHandlerContext, DatagramPacket datagramPacket, List<Object> list) throws Exception {
//数据报内容
ByteBuf data = datagramPacket.content();
//通知id
int id = data.readInt();
//发送时间
long time = data.readLong();
//分隔符
data.readByte();
//当前索引
int idx = data.readerIndex();
//通知内容
String content = data.slice(idx, data.readableBytes()).toString(CharsetUtil.UTF_8);
//加入消息列表
list.add(new Notice(id,content, datagramPacket.sender()));
}
}
/**
* @author andychen https://blog.51cto.com/14815984
* @description:消息接收器启动器
*/
public class AccepterRunner {
/**
* 运行通知接收任务
* @param args
*/
public static void main(String[] args) {
NoticeAccepter accepter = null;
try {
accepter = new NoticeAccepter();
accepter.run();
} catch (Exception e) {
e.printStackTrace();
}finally {
accepter.stop();
}
}
}
其它部分
/**
* @author andychen https://blog.51cto.com/14815984
* @description:通知信息
*/
public class Notice {
public int getId() {
return id;
}
public long getTime() {
return time;
}
public String getContent() {
return content;
}
public InetSocketAddress getSource() {
return source;
}
//通知id
private final int id;
//发送时间
private final long time;
//通知内容
private final String content;
//来源地址
private final InetSocketAddress source;
//分隔符
public static final byte SEPARATOR = (byte) ':';
public Notice(int id, String content, InetSocketAddress source) {
this.id = id;
this.content = content;
this.source = source;
this.time = System.currentTimeMillis();
}
}
/**
* @author andychen https://blog.51cto.com/14815984
* @description:系统常量
*/
public class Constant {
/**
* 广播地址
*/
public static final String BROADCAST_IP = "255.255.255.255";
/**
* 接收者端口(固定的)
*/
public static final int ACCEPTER_PORT = 8700;
/**
* 通知池
*/
private static final String[] NOTICE_POOL = {
"多国疫情忽然反弹,北京下一步怎么办?",
"端午假期,这位政法委书记去了边境",
"省委书记、省长们的端午假期",
"人社部回应图书馆留言大叔 送吴桂春们求职就业指南",
"北京通报病例状况,有句话出现十屡次",
"北京26日新增病例活动轨迹公布!涉及这些地方!\n",
"蚊蝇增可能是否会传播新冠病毒?官方回应来了",
"俄罗斯将扩大直接对华供应食品地区名单",
"一图看懂:新发地到底有多大多复杂?",
"倒闭、亏损、坏帐,影视行业如何“活下去”?",
"警察献血反被辱!香港医管局:致歉并展开调查",
"华为获准在英国建研发中心 美官员“打招呼”过问",
"105所高校经过认证!教育部公布一份重磅名单",
"北京新增确诊17例:北京16天确诊297例",
"印度陆军司令:已在中印实控线作长期准备",
"印度增兵边境作出两冒险动做 中方须作冲突升级准备",
"中印对峙印度下一步会有何行动?偷袭奇袭捞一把就走",
"印度大批军机飞向拉达克想洗刷耻辱 直升机紧急着陆",
"蓬佩奥为什么反对伊朗买歼10?将威胁美在波斯湾秩序",
"美国防受权法呼吁美军医疗船安慰号及仁慈号停靠台湾",
"印度造舰能力有多强?媒体:像当年的中日韩值得看好",
"中国6代战机究竟长啥样:机头尖锐无平尾",
"歼16电子战机有多强:干扰距离翻倍 优于美军EA-18G",
"我军PCL09车载炮为什么上高原 高低搭配火力覆盖没死角",
"胡锡进:这时候谁愿意去美国?签证留给黄之锋吧",
"美方因涉港问题对中方官员实施签证限制 中方回应",
"我军为什么选择6-25高炮放弃单35 根本缘由并不在火炮",
"在中国问题上 短视的是莫迪买单的是印度",
"解放军驻吉布提基地官兵已换装星空迷彩服",
"印度陆军司令向防长汇报:在中印实控线作长期准备",
"印度多架军机在中印边境密集活动 直升机紧急着陆",
"我军PCL09车载炮为什么上高原 高低搭配火力覆盖没死角",
"蓬佩奥为什么反对伊朗买歼10?将威胁美在波斯湾秩序",
"美国防受权法呼吁美军医疗船安慰号及仁慈号停靠台湾"
};
/**
* 获取消息
*/
public static String getNotice(){
Random r = new Random();
return NOTICE_POOL[r.nextInt(NOTICE_POOL.length)];
}
}
运行验证
UDP广播端模式,广播端和接收端并没有严格地启动顺序;通常来讲为了不开始消息接收不到的问题,可先启动接收端等待。接收端开多个模拟验证。
总结
由于UDP广播模式的发送针对局域网全部主机IP,因此更适合在公司内部使用项目,相似通知模块和须要全体一块儿接收的业务场景。但同时鉴于UDP是面向无链接的,消息的发送没有对端的应答等机制。因此是不可靠的传输协议,你们在项目中要评估好业务场景。固然也能够做为辅助手段和TCP协议结合使用为最佳。