RPC即远程过程调用,它的提出旨在消除通讯细节、屏蔽繁杂且易错的底层网络通讯操做,像调用本地服务通常地调用远程服务,让业务开发者更多关注业务开发而没必要考虑网络、硬件、系统的异构复杂环境。node
先看看集群中RPC的整个通讯过程,假设从节点node1开始一个RPC调用,数组
上面整个过程是在只有一条线程的状况下,一切看起来没什么问题,但若是有多条线程并发调用则会致使一个问题:线程与响应的对应关系将被打乱,没法肯定哪一个线程对应哪几个响应。bash
由于NIO通讯框架不会每一个线程都独自使用一个socket通道,为提升性能通常都是使用长链接,全部线程共用一个socket通道,这时就算线程一比线程二先放入通讯框架也不能保证响应一比响应二先接收到,因此接收到响应一后不知道该通知线程一仍是线程二。只有解决了这个问题才能保证RPC调用的正确性。网络
要解决线程与响应对应的问题就须要维护一个线程响应关系列表,响应从关系列表中就能查找对应的线程,如图,在发送以前生成一个UUID标识,此标识要保证同socket中惟一,再把UUID与线程对象关系对应起来,可以使用Map数据结构实现,UUID的值做为key,线程对应的锁对象为value。数据结构
接着制定一个协议报文,UUID做为报文的其中一部分,报文发往另外一个节点node2后将响应信息message放入报文中并返回,node1对接收到的报文进行解包根据UUID去查找并唤起对应的线程,告诉它“你要的消息已经收到,往下处理吧”。但在集群环境下,咱们更但愿是集群中全部节点的消息都接收到了才往下处理,如图下半部分,一个UUID1的请求报文会发往node二、node3和node4三个节点,这时假如只接收到一个响应则不唤起线程,直到node二、node3对应UUID1的响应报文都接收到后才唤起对应线程往下执行。一样地,UUID二、UUID3的报文消息都是如此处理,最后集群中对应的响应都能正确回到各自线程上。多线程
用简单代码实现一个RPC例子,自行选择一个集群通讯框架负责底层通讯,接着往下:并发
public interface RpcCallback {
public Serializable replyRequest(Serializable msg, Member sender);
public void leftOver(Serializable msg, Member sender);
}
复制代码
public class RpcMessage implements Externalizable {
protected Serializable message;
protected byte[] uuid;
protected byte[] rpcId;
protected boolean reply = false;
public RpcMessage() {}
public RpcMessage(byte[] rpcId, byte[] uuid, Serializable message) {
this.rpcId = rpcId;
this.uuid = uuid;
this.message = message;
}
@Override
public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
reply = in.readBoolean();
int length = in.readInt();
uuid = new byte[length];
in.readFully(uuid);
length = in.readInt();
rpcId = new byte[length];
in.readFully(rpcId);
message = (Serializable) in.readObject();
}
@Override
public void writeExternal(ObjectOutput out) throws IOException {
out.writeBoolean(reply);
out.writeInt(uuid.length);
out.write(uuid, 0, uuid.length);
out.writeInt(rpcId.length);
out.write(rpcId, 0, rpcId.length);
out.writeObject(message);
}
}
复制代码
public class RpcResponseType {
public static final int FIRST_REPLY = 1;
public static final int MAJORITY_REPLY = 2;
public static final int ALL_REPLY = 3;
public static final int NO_REPLY = 4;
}
复制代码
public class RpcResponse {
private Member source;
private Serializable message;
public RpcResponse() {}
public RpcResponse(Member source, Serializable message) {
this.source = source;
this.message = message;
}
public void setSource(Member source) {
this.source = source;
}
public void setMessage(Serializable message) {
this.message = message;
}
public Member getSource() {
return source;
}
public Serializable getMessage() {
return message;
}
}
复制代码
public class RpcCollector {
public ArrayList<RpcResponse> responses = new ArrayList<RpcResponse>();
public byte[] key;
public int options;
public int destcnt;
public RpcCollector(byte[] key, int options, int destcnt) {
this.key = key;
this.options = options;
this.destcnt = destcnt;
}
public void addResponse(Serializable message, Member sender){
RpcResponse resp = new RpcResponse(sender,message);
responses.add(resp);
}
public boolean isComplete() {
if ( destcnt <= 0 ) return true;
switch (options) {
case RpcResponseType.ALL_REPLY:
return destcnt == responses.size();
case RpcResponseType.MAJORITY_REPLY:
{
float perc = ((float)responses.size()) / ((float)destcnt);
return perc >= 0.50f;
}
case RpcResponseType.FIRST_REPLY:
return responses.size()>0;
default:
return false;
}
}
public RpcResponse[] getResponses() {
return responses.toArray(new RpcResponse[responses.size()]);
}
}
复制代码
public class RpcChannel implements ChannelListener {
private Channel channel;
private RpcCallback callback;
private byte[] rpcId;
private int replyMessageOptions = 0;
private HashMap<byte[], RpcCollector> responseMap = new HashMap<byte[], RpcCollector>();
public RpcChannel(byte[] rpcId, Channel channel, RpcCallback callback) {
this.rpcId = rpcId;
this.channel = channel;
this.callback = callback;
channel.addChannelListener(this);
}
public RpcResponse[] send(Member[] destination, Serializable message, int rpcOptions,
int channelOptions, long timeout) throws ChannelException {
int sendOptions = channelOptions & ~Channel.SEND_OPTIONS_SYNCHRONIZED_ACK;
byte[] key = UUIDGenerator.randomUUID(false);
RpcCollector collector = new RpcCollector(key, rpcOptions, destination.length);
try {
synchronized (collector) {
if (rpcOptions != RpcResponseType.NO_REPLY) responseMap.put(key, collector);
RpcMessage rmsg = new RpcMessage(rpcId, key, message);
channel.send(destination, rmsg, sendOptions);
if (rpcOptions != RpcResponseType.NO_REPLY) collector.wait(timeout);
}
} catch (InterruptedException ix) {
Thread.currentThread().interrupt();
} finally {
responseMap.remove(key);
}
return collector.getResponses();
}
@Override
public void messageReceived(Serializable msg, Member sender) {
RpcMessage rmsg = (RpcMessage) msg;
byte[] key = rmsg.uuid;
if (rmsg.reply) {
RpcCollector collector = responseMap.get(key);
if (collector == null) {
callback.leftOver(rmsg.message, sender);
} else {
synchronized (collector) {
if (responseMap.containsKey(key)) {
collector.addResponse(rmsg.message, sender);
if (collector.isComplete()) collector.notifyAll();
} else {
callback.leftOver(rmsg.message, sender);
}
}
}
} else {
Serializable reply = callback.replyRequest(rmsg.message, sender);
rmsg.reply = true;
rmsg.message = reply;
try {
channel.send(new Member[] {sender}, rmsg,
replyMessageOptions & ~Channel.SEND_OPTIONS_SYNCHRONIZED_ACK);
} catch (Exception x) {}
}
}
@Override
public boolean accept(Serializable msg, Member sender) {
if (msg instanceof RpcMessage) {
RpcMessage rmsg = (RpcMessage) msg;
return Arrays.equals(rmsg.rpcId, rpcId);
} else
return false;
}
}
复制代码
public class MyRPC implements RpcCallback {
@Override
public Serializable replyRequest(Serializable msg, Member sender) {
RpcMessage mapmsg = (RpcMessage) msg;
mapmsg.message = "hello,response for you!";
return mapmsg;
}
@Override
public void leftOver(Serializable msg, Member sender) {
System.out.println("receive a leftover message!");
}
public static void main(String[] args) {
MyRPC myRPC = new MyRPC();
byte[] rpcId = new byte[] {1, 1, 1, 1};
byte[] key = new byte[] {0, 0, 0, 0};
String message = "hello";
int sendOptions = Channel.SEND_OPTIONS_SYNCHRONIZED_ACK | Channel.SEND_OPTIONS_USE_ACK;
RpcMessage msg = new RpcMessage(rpcId, key, (Serializable) message);
RpcChannel rpcChannel = new RpcChannel(rpcId, channel, myRPC);
RpcResponse[] resp =
rpcChannel.send(channel.getMembers(), msg, RpcResponseType.FIRST_REPLY, sendOptions, 3000);
while (true)
Thread.currentThread().sleep(1000);
}
}
复制代码
能够看到经过上面的RPC封装后,上层能够把更多的精力关注到消息逻辑处理上面了,而没必要关注具体的网络IO如何实现,屏蔽了繁杂重复的网络传输操做,为上层提供了很大的方便。框架
=============广告时间===============dom
公众号的菜单已分为“分布式”、“机器学习”、“深度学习”、“NLP”、“Java深度”、“Java并发核心”、“JDK源码”、“Tomcat内核”等,可能有一款适合你的胃口。机器学习
鄙人的新书《Tomcat内核设计剖析》已经在京东销售了,有须要的朋友能够购买。感谢各位朋友。
=========================
欢迎关注: