做者的其余文章还不错 ! java
原文: 服务器
http://maoyidao.iteye.com/blog/1636923 架构
最近团队在开发基于移动互联网的项目,又一次涉及到post service,即在服务器集群之间投递消息。是的,又是一个RPC服务。RPC实现方式从笨重的CORBA,SOAP over HTTP,XMPP over TCP,到轻量级的protobuf,scribe和Avro。这里不想比较各自的应用场景(另外后面三种RPC方式极为接近,都是经过提供Object <-> 二进制映射来提升高效的传输),本文的目的是给你们一点能够实际操做的代码:java如何用protobuf 实现rpc socket
和protobuf-socket-rpc的区别
protobuf-socket-rpc(code.google.com/p/protobuf-socket-rpc/)是googlecode为rpc写的简单实现。本文介绍的代码和googlecode不一样之处在于: 工具
1,基于NIO post
2,增长了校验码 性能
高性能RPC over google protobuf
Google's protocol buffer library makes writing rpc services easy, but it does not contain a rpc implementation. The transport details are left up to the user to implement. ui
google把这问题留给了咱们,那么看看应该怎么实现。hellow world伪代码应该是这样的: google
Java代码
- MessageLite message = getMessage(); // get a proto message object by proto file
-
- OutputStream out = getOutputStream();
- InputStream in = getInputStream();
- message.writeDelimitedTo(out); // Like writeTo(OutputStream), but writes the size of the message as a varint before writing the data
- messageBuilder.mergeDelimitedFrom(in);
好了,这样就实现了序列化和反序列化。在真正的内容以前加入内容长度,这是一种最简单的实现。为了能可靠的进行传输,我在消息长度前加入了2个byte的验证码,下面就开始逐步构建个人rpc代码。 spa
定义你的proto文件,为传输多种消息,须要有“命令”字段:好比:Maoyidao.proto
List 1:
Java代码
- package com.maoyidao.rpc;
-
- message MaoyidaoPacket {
- required int32 cmd = 1;
- required int32 subcmd = 2;
- optional bytes content = 3;
- }
OK,compile it to Java class: protoc -I=$SRC_DIR --java_out=$DST_DIR $SRC_DIR/addressbook.proto
你会获得一个MaoyidaoPacket 类,而后你须要这样得到实例:
List 2:
Java代码
- Maoyidao.MaoyidaoPacket packet = Maoyidao.MaoyidaoPacket.newBuilder()
- .setCmd(mycmd)
- .setSubcmd(mysubcmd)
- .setContent(ByteString.copyFromUtf8("some message")).build();
咱们先不讨论怎么基于MINA建立一个NIO,先假设咱们得到了一个OutputStream,看看怎么把消息写出去(其中的关键是用一些特殊的字符来区分你的消息,这是RPC over TCP的基本要求):
List 3:
Java代码
- private final void writeObject(OutputStream os, Maoyidao.MaoyidaoPacket packet) {
- ByteArrayOutputStream baos = new ByteArrayOutputStream();
- com.google.protobuf.CodedOutputStream cos = com.google.protobuf.CodedOutputStream.newInstance(baos);
- cos.writeRawVarint32(3);
- cos.writeRawVarint32(7);
- cos.writeRawVarint32(packet.getSerializedSize());
- vpacket.writeTo(cos);
- cos.flush();
- os.write(baos.toByteArray());
- baos.close();
- }
- }
注意我不只写了分隔符,还写了content长度。
读进来的时候要用相同的方式解析,假设咱们获得了一个Bytebuffer,熟悉NIO的同窗知道,你老是会从ByteBuffer中读取数据。同时我须要用到com.google.protobuf.CodedInputStream:Reads and decodes protocol message fields. This class contains two kinds of methods: methods that read specific protocol message constructs and field types (e.g. readTag() and readInt32()) and methods that read low-level values (e.g. readRawVarint32() and readRawBytes(int)).)这样我就能够从inputstream中读到校验码:
Java代码
- ByteBuffer in = getByteBuffer();
- CodedInputStream cis = CodedInputStream.newInstance(in);
-
- int flag1 = cis.readRawVarint32();
- int flag2 = cis.readRawVarint32();
- if(flag1 != 3 || flag2 != 7){
- // find some error
- }
-
- int contentLength = cis.readRawVarint32();
- int contentLength0 = contentLength + CodedOutputStream.computeRawVarint32Size(contentLength);
-
- if(in.remaining() >= contentLength0){
- try {
- Maoyidao.MaoyidaoPacket.Builder builder = Maoyidao.MaoyidaoPacket.newBuilder();
- CodedInputStream.newInstance(getBytesFromIn(in,contentLength0)).readMessage(
- builder, ExtensionRegistry.getEmptyRegistry());
- out.write(builder.build());
- in.position(in.position() + protocolLength);
- return true;
- } catch (Exception e) {
- //
- }
- }
-
- // ByteBuffer没有足够的数据,等待下一次
- // do something
截止目前,咱们完成了带校验码的基于protobuf的消息序列化和反序列。在这个实现中,我更偏向把protobuf当作一个序列化工具来使用,总体仍是依赖MINA自己提供的架构,这部分将在本系列的下一篇中详细阐述。
本文系maoyidao原创,转载请引用原连接: