HTTP协议会携带诸如header和cookie等信息,其自己对字节的利用率也较低,这使得HTTP协议比较臃肿,在承载相同信息的状况下,HTTP协议将须要发送更多的数据包;
HTTP协议是基于TCP的短链接,其在每次请求和响应的时候都须要进行三次握手和四次挥手,因为服务的交互设计通常都要求可以承载高并发的请求,于是HTTP协议这种频繁的握手和挥手动做会极大的影响服务之间交互的效率;
服务之间每每有一些根据其自身业务特性所独有的需求,而HTTP协议没法很好的服务于这些业务需求。
基于上面的缘由,通常的服务之间进行交互时都会使用自定义协议,常见的框架,诸如dubbo,kafka,zookeeper都实现了符合其自身业务需求的协议,本文主要讲解如何使用Netty实现一款自定义的协议。
bootstrap
所谓协议,其本质其实就是定义了一个将数据转换为字节,或者将字节转换为数据的一个规范。一款自定义协议,其通常包含两个部分:消息头和消息体。
消息头的长度通常是固定的,或者说是可肯定的,其定义了这次消息的一些公有信息,好比当前服务的版本,消息的sessionId,消息的类型等等;消息体则主要是这次消息所须要发送的内容,通常在消息头的最后必定的字节中保存了当前消息的消息体的长度。下面是咱们为当前自定义协议所作的一些规定:服务器
上述协议定义中,咱们除了定义经常使用的请求和响应消息类型之外,还定义了Ping和Pong消息。Ping和Pong消息的做用通常是,在服务处于闲置状态达到必定时长,好比2s时,客户端服务会向服务端发送一个Ping消息,则会返回一个Pong消息,这样才表示客户端与服务端的链接是无缺的。
若是服务端没有返回相应的消息,客户端就会关闭与服务端的链接或者是从新创建与服务端的链接。这样的优势在于能够防止忽然会产生的客户端与服务端的大量交互。
cookie
经过上面的定义其实咱们能够发现,所谓协议,就是定义了一个规范,基于这个规范,咱们能够将消息转换为相应的字节流,而后经由TCP传输到目标服务,目标服务则也基于该规范将字节流转换为相应的消息,这样就达到了相互交流的目的。
这里面最重要的主要是如何基于该规范将消息转换为字节流或者将字节流转换为消息。
这一方面,Netty为咱们提供了ByteToMessageDecoder和MessageToByteEncoder用于进行消息和字节流的相互转换。首先咱们定义了以下消息实体:session
public class Message {
private int magicNumber;
private byte mainVersion;
private byte subVersion;
private byte modifyVersion;
private String sessionId;
private MessageTypeEnum messageType;
private Map<String, String> attachments = new HashMap<>();
private String body;
public Map<String, String> getAttachments() {
return Collections.unmodifiableMap(attachments);
}
public void setAttachments(Map<String, String> attachments) {
this.attachments.clear();
if (null != attachments) {
this.attachments.putAll(attachments);
}
}
public void addAttachment(String key, String value) {
attachments.put(key, value);
}
// getter and setter...
}
上述消息中,咱们将协议中所规定的各个字段都进行了定义,而且定义了一个标志消息类型的枚举MessageTypeEnum,以下是该枚举的源码:并发
public enum MessageTypeEnum {
REQUEST((byte)1), RESPONSE((byte)2), PING((byte)3), PONG((byte)4), EMPTY((byte)5);
private byte type;
MessageTypeEnum(byte type) {
this.type = type;
}
public int getType() {
return type;
}
public static MessageTypeEnum get(byte type) {
for (MessageTypeEnum value : values()) {
if (value.type == type) {
return value;
}
}
throw new RuntimeException("unsupported type: " + type);
}
}
上述主要是定义了描述自定义协议相关的实体属性,对于消息的编码,本质就是依据上述协议方式将消息实体转换为字节流,以下是转换字节流的代码:app
public class MessageEncoder extends MessageToByteEncoder<Message> {
@Override
protected void encode(ChannelHandlerContext ctx, Message message, ByteBuf out) {
// 这里会判断消息类型是否是EMPTY类型,若是是EMPTY类型,则表示当前消息不须要写入到管道中
if (message.getMessageType() != MessageTypeEnum.EMPTY) {
out.writeInt(Constants.MAGIC_NUMBER);// 写入当前的魔数
out.writeByte(Constants.MAIN_VERSION);// 写入当前的主版本号
out.writeByte(Constants.SUB_VERSION);// 写入当前的次版本号
out.writeByte(Constants.MODIFY_VERSION);// 写入当前的修订版本号
if (!StringUtils.hasText(message.getSessionId())) {
// 生成一个sessionId,并将其写入到字节序列中
String sessionId = SessionIdGenerator.generate();
message.setSessionId(sessionId);
out.writeCharSequence(sessionId, Charset.defaultCharset());
}
out.writeByte(message.getMessageType().getType());// 写入当前消息的类型
out.writeShort(message.getAttachments().size());// 写入当前消息的附加参数数量
message.getAttachments().forEach((key, value) -> {
Charset charset = Charset.defaultCharset();
out.writeInt(key.length());// 写入键的长度
out.writeCharSequence(key, charset);// 写入键数据
out.writeInt(value.length());// 希尔值的长度
out.writeCharSequence(value, charset);// 写入值数据
});
if (null == message.getBody()) {
out.writeInt(0);// 若是消息体为空,则写入0,表示消息体长度为0
} else {
out.writeInt(message.getBody().length());
out.writeCharSequence(message.getBody(), Charset.defaultCharset());
}
}
}
}
对于消息的解码,其过程与上面的消息编码方式基本一致,主要是基于协议所规定的将字节流数据转换为消息实体数据。以下是其转换过程:框架
public class MessageDecoder extends ByteToMessageDecoder {
@Override
protected void decode(ChannelHandlerContext ctx, ByteBuf byteBuf, List<Object> out) throws Exception {
Message message = new Message();
message.setMagicNumber(byteBuf.readInt()); // 读取魔数
message.setMainVersion(byteBuf.readByte()); // 读取主版本号
message.setSubVersion(byteBuf.readByte()); // 读取次版本号
message.setModifyVersion(byteBuf.readByte());// 读取修订版本号
CharSequence sessionId = byteBuf.readCharSequence(
Constants.SESSION_ID_LENGTH, Charset.defaultCharset());// 读取sessionId
message.setSessionId((String)sessionId);
message.setMessageType(MessageTypeEnum.get(byteBuf.readByte()));// 读取当前的消息类型
short attachmentSize = byteBuf.readShort();// 读取附件长度
for (short i = 0; i < attachmentSize; i++) {
int keyLength = byteBuf.readInt();// 读取键长度和数据
CharSequence key = byteBuf.readCharSequence(keyLength, Charset.defaultCharset());
int valueLength = byteBuf.readInt();// 读取值长度和数据
CharSequence value = byteBuf.readCharSequence(valueLength, Charset.defaultCharset());
message.addAttachment(key.toString(), value.toString());
}
int bodyLength = byteBuf.readInt();// 读取消息体长度和数据
CharSequence body = byteBuf.readCharSequence(bodyLength, Charset.defaultCharset());
message.setBody(body.toString());
out.add(message);
}
}
如此,咱们自定义消息与字节流的相互转换工做已经完成。对于消息的处理,主要是要根据消息的不一样类型,对消息进行相应的处理,好比对于request类型消息,要写入响应数据,对于ping消息,要写入pong消息做为回应。
下面咱们经过定义Netty handler的方式实现对消息的处理:dom
// 服务端消息处理器
public class ServerMessageHandler extends SimpleChannelInboundHandler<Message> {
// 获取一个消息处理器工厂类实例
private MessageResolverFactory resolverFactory = MessageResolverFactory.getInstance();
@Override
protected void channelRead0(ChannelHandlerContext ctx, Message message) throws Exception {
Resolver resolver = resolverFactory.getMessageResolver(message);// 获取消息处理器
Message result = resolver.resolve(message);// 对消息进行处理并获取响应数据
ctx.writeAndFlush(result);// 将响应数据写入处处理器中
}
@Override
public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
resolverFactory.registerResolver(new RequestMessageResolver());// 注册request消息处理器
resolverFactory.registerResolver(new ResponseMessageResolver());// 注册response消息处理器
resolverFactory.registerResolver(new PingMessageResolver());// 注册ping消息处理器
resolverFactory.registerResolver(new PongMessageResolver());// 注册pong消息处理器
}
}
// 客户端消息处理器
public class ClientMessageHandler extends ServerMessageHandler {
// 建立一个线程,模拟用户发送消息
private ExecutorService executor = Executors.newSingleThreadExecutor();
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
// 对于客户端,在创建链接以后,在一个独立线程中模拟用户发送数据给服务端
executor.execute(new MessageSender(ctx));
}
/**
* 这里userEventTriggered()主要是在一些用户事件触发时被调用,这里咱们定义的事件是进行心跳检测的
* ping和pong消息,当前触发器会在指定的触发器指定的时间返回内若是客户端没有被读取消息或者没有写入
* 消息到管道,则会触发当前方法
*/
@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
if (evt instanceof IdleStateEvent) {
IdleStateEvent event = (IdleStateEvent) evt;
if (event.state() == IdleState.READER_IDLE) {
// 必定时间内,当前服务没有发生读取事件,也即没有消息发送到当前服务来时,
// 其会发送一个Ping消息到服务器,以等待其响应Pong消息
Message message = new Message();
message.setMessageType(MessageTypeEnum.PING);
ctx.writeAndFlush(message);
} else if (event.state() == IdleState.WRITER_IDLE) {
// 若是当前服务在指定时间内没有写入消息到管道,则关闭当前管道
ctx.close();
}
}
}
private static final class MessageSender implements Runnable {
private static final AtomicLong counter = new AtomicLong(1);
private volatile ChannelHandlerContext ctx;
public MessageSender(ChannelHandlerContext ctx) {
this.ctx = ctx;
}
@Override
public void run() {
try {
while (true) {
// 模拟随机发送消息的过程
TimeUnit.SECONDS.sleep(new Random().nextInt(3));
Message message = new Message();
message.setMessageType(MessageTypeEnum.REQUEST);
message.setBody("this is my " + counter.getAndIncrement() + " message.");
message.addAttachment("name", "xufeng");
ctx.writeAndFlush(message);
}
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
上述代码中,因为客户端和服务端须要处理的消息类型是彻底同样的,于是客户端处理类继承了服务端处理类。可是对于客户端而言,其还须要定时向服务端发送心跳消息,用于检测客户端与服务器的链接是否健在,于是客户端还会实现userEventTriggered()方法,在该方法中定时向服务器发送心跳消息。
userEventTriggered()方法主要是在客户端被闲置必定时间后,其会根据其读取或者写入消息的限制时长来选择性的触发读取或写入事件。
上述实现中,咱们看到,对于具体类型消息的处理,咱们是经过一个工厂类来获取对应的消息处理器,而后处理相应的消息,下面咱们该工厂类的代码:ide
public final class MessageResolverFactory {
// 建立一个工厂类实例
private static final MessageResolverFactory resolverFactory = new MessageResolverFactory();
private static final List<Resolver> resolvers = new CopyOnWriteArrayList<>();
private MessageResolverFactory() {}
// 使用单例模式实例化当前工厂类实例
public static MessageResolverFactory getInstance() {
return resolverFactory;
}
public void registerResolver(Resolver resolver) {
resolvers.add(resolver);
}
// 根据解码后的消息,在工厂类处理器中查找能够处理当前消息的处理器
public Resolver getMessageResolver(Message message) {
for (Resolver resolver : resolvers) {
if (resolver.support(message)) {
return resolver;
}
}
throw new RuntimeException("cannot find resolver, message type: " + message.getMessageType());
}
}
上述工厂类比较简单,主要就是经过单例模式获取一个工厂类实例,而后提供一个根据具体消息来查找其对应的处理器的方法。下面咱们来看看各个消息处理器的代码:高并发
// request类型的消息
public class RequestMessageResolver implements Resolver {
private static final AtomicInteger counter = new AtomicInteger(1);
@Override
public boolean support(Message message) {
return message.getMessageType() == MessageTypeEnum.REQUEST;
}
@Override
public Message resolve(Message message) {
// 接收到request消息以后,对消息进行处理,这里主要是将其打印出来
int index = counter.getAndIncrement();
System.out.println("[trx: " + message.getSessionId() + "]"
+ index + ". receive request: " + message.getBody());
System.out.println("[trx: " + message.getSessionId() + "]"
+ index + ". attachments: " + message.getAttachments());
// 处理完成后,生成一个响应消息返回
Message response = new Message();
response.setMessageType(MessageTypeEnum.RESPONSE);
response.setBody("nice to meet you too!");
response.addAttachment("name", "xufeng");
response.addAttachment("hometown", "wuhan");
return response;
}
}
// 响应消息处理器
public class ResponseMessageResolver implements Resolver {
private static final AtomicInteger counter = new AtomicInteger(1);
@Override
public boolean support(Message message) {
return message.getMessageType() == MessageTypeEnum.RESPONSE;
}
@Override
public Message resolve(Message message) {
// 接收到对方服务的响应消息以后,对响应消息进行处理,这里主要是将其打印出来
int index = counter.getAndIncrement();
System.out.println("[trx: " + message.getSessionId() + "]"
+ index + ". receive response: " + message.getBody());
System.out.println("[trx: " + message.getSessionId() + "]"
+ index + ". attachments: " + message.getAttachments());
// 响应消息不须要向对方服务再发送响应,于是这里写入一个空消息
Message empty = new Message();
empty.setMessageType(MessageTypeEnum.EMPTY);
return empty;
}
}
// ping消息处理器
public class PingMessageResolver implements Resolver {
@Override
public boolean support(Message message) {
return message.getMessageType() == MessageTypeEnum.PING;
}
@Override
public Message resolve(Message message) {
// 接收到ping消息后,返回一个pong消息返回
System.out.println("receive ping message: " + System.currentTimeMillis());
Message pong = new Message();
pong.setMessageType(MessageTypeEnum.PONG);
return pong;
}
}
// pong消息处理器
public class PongMessageResolver implements Resolver {
@Override
public boolean support(Message message) {
return message.getMessageType() == MessageTypeEnum.PONG;
}
@Override
public Message resolve(Message message) {
// 接收到pong消息后,不须要进行处理,直接返回一个空的message
System.out.println("receive pong message: " + System.currentTimeMillis());
Message empty = new Message();
empty.setMessageType(MessageTypeEnum.EMPTY);
return empty;
}
}
如此,对于自定义协议的消息处理过程已经完成,下面则是使用用Netty实现的客户端与服务端代码:
// 服务端
public class Server {
public static void main(String[] args) {
EventLoopGroup bossGroup = new NioEventLoopGroup();
EventLoopGroup workerGroup = new NioEventLoopGroup();
try {
ServerBootstrap bootstrap = new ServerBootstrap();
bootstrap.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.option(ChannelOption.SO_BACKLOG, 1024)
.handler(new LoggingHandler(LogLevel.INFO))
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
// 添加用于处理粘包和拆包问题的处理器
pipeline.addLast(new LengthFieldBasedFrameDecoder(1024, 0, 4, 0, 4));
pipeline.addLast(new LengthFieldPrepender(4));
// 添加自定义协议消息的编码和解码处理器
pipeline.addLast(new MessageEncoder());
pipeline.addLast(new MessageDecoder());
// 添加具体的消息处理器
pipeline.addLast(new ServerMessageHandler());
}
});
ChannelFuture future = bootstrap.bind(8585).sync();
future.channel().closeFuture().sync();
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}
}
public class Client {
public static void main(String[] args) {
NioEventLoopGroup group = new NioEventLoopGroup();
Bootstrap bootstrap = new Bootstrap();
try {
bootstrap.group(group)
.channel(NioSocketChannel.class)
.option(ChannelOption.TCP_NODELAY, Boolean.TRUE)
.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
// 添加用于解决粘包和拆包问题的处理器
pipeline.addLast(new LengthFieldBasedFrameDecoder(1024, 0, 4, 0, 4));
pipeline.addLast(new LengthFieldPrepender(4));
// 添加用于进行心跳检测的处理器
pipeline.addLast(new IdleStateHandler(1, 2, 0));
// 添加用于根据自定义协议将消息与字节流进行相互转换的处理器
pipeline.addLast(new MessageEncoder());
pipeline.addLast(new MessageDecoder());
// 添加客户端消息处理器
pipeline.addLast(new ClientMessageHandler());
}
});
ChannelFuture future = bootstrap.connect("127.0.0.1", 8585).sync();
future.channel().closeFuture().sync();
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
group.shutdownGracefully();
}
}
}
运行上述代码以后,咱们能够看到客户端和服务器分别打印了以下数据:
// 客户端
receive pong message: 1555123429356
[trx: d05024d2]1. receive response: nice to meet you too!
[trx: d05024d2]1. attachments: {hometown=wuhan, name=xufeng}
[trx: 66ee1438]2. receive response: nice to meet you too!
// 服务器
receive ping message: 1555123432279
[trx: f582444f]4. receive request: this is my 4 message.
[trx: f582444f]4. attachments: {name=xufeng}