上一篇咱们简单过了一遍RPC是什么,三个过程,为何咱们须要它,它的特性和适用场景,RPC的流程及协议定义还有它的框架的一些小知识。理论常常会看的人昏昏欲睡,不知所云。若是可以结合一些代码来讲明的话,那就方便理解不少了spring
从零开始的高并发(一)--- Zookeeper的基础概念缓存
从零开始的高并发(二)--- Zookeeper实现分布式锁服务器
从零开始的高并发(三)--- Zookeeper集群的搭建和leader选举网络
从零开始的高并发(四)--- Zookeeper的分布式队列并发
从零开始的高并发(五)--- Zookeeper的配置中心应用app
从零开始的高并发(六)--- Zookeeper的Master选举及官网小览框架
从零开始的高并发(七)--- RPC的介绍,协议及框架dom
其实这个在上一篇的2 - ① 也已经提到过了,若是忘了,不要紧,我再复制过来socket
stub:分布式计算中的存根是一段代码,它转换在远程过程调用期间Client和server之间传递的参数分布式
1.客户端处理过程当中调用client stub(就像调用本地方法同样),传入参数
2.Client stub将参数编组为消息,而后经过系统调用向服务端发送消息
3.客户端本地操做系统将消息从客户端机器发送到服务端机器
4.服务端操做系统将接收到的数据包传递给client stub
5.server stub解组消息为参数
6.server stub再调用服务端的过程,过程执行结果以反方向的相同步骤响应给客户端
1.定义过程接口
2.服务端实现接口的整个过程
3.客户端使用生成的stub代理对象
复制代码
客户端生成过程接口的代理对象,经过设计一个客户端代理工厂,使用JDK动态代理便可生成接口的代理对象
Student类有三个属性name(String),age(int),sex(String),节省篇幅就不贴代码了,提供getter,setter和toString方法便可
public interface StudentService {
/**
* 获取信息
* @return
*/
public Student getInfo();
//打印student的信息并返回一个boolean值
public boolean printInfo(Student student);
}
复制代码
而且提供一个简单的实现,其实就是打印一个Student的信息出来而已
@Service(StudentService.class)
public class StudentServiceImpl implements StudentService {
public Student getInfo() {
Student person = new Student();
person.setAge(25);
person.setName("说出你的愿望吧~");
person.setSex("男");
return person;
}
public boolean printInfo(Student person) {
if (person != null) {
System.out.println(person);
return true;
}
return false;
}
public static void main(String[] args) {
new Thread(()->{
System.out.println("111");
}).start();;
}
}
复制代码
首先,客户端经过咱们的本地代理,得到咱们的StudentService的代理类,此时咱们客户端本地是确定不存在StudentService的实现的,此时寻址咱们是直接给出来了
public class ClientTest {
@Test
public void test() {
// 本地没有接口实现,经过代理得到接口实现实例
RpcClientProxy proxy = new RpcClientProxy("192.168.80.1", 9998);
StudentService service = proxy.getProxy(StudentService.class);
System.out.println(service.getInfo());
Student student = new Student();
student.setAge(23);
student.setName("hashmap");
student.setSex("男");
System.out.println(service.printInfo(student));
}
}
复制代码
此时咱们的关注点转到客户端是如何帮咱们进行代理的
/**
* RpcClientProxy
* 客户端代理服务,客户端往服务端发起的调用将经过客户端代理来发起
*/
public class RpcClientProxy implements InvocationHandler{
private String host; // 服务端地址
private int port; // 服务端口号
public RpcClientProxy(String host, int port){
this.host = host;
this.port = port;
}
/**
* 生成业务接口的代理对象,代理对象作的事情,在invoke方法中。
* @param clazz 代理类型(接口)
* @return
*/
@SuppressWarnings("unchecked")
public <T> T getProxy(Class<T> clazz){
// clazz 不是接口不能使用JDK动态代理
return (T) Proxy.newProxyInstance(clazz.getClassLoader(), new Class<?>[]{ clazz }, RpcClientProxy.this);
}
/**
* 动态代理作的事情,接口的实现不在本地,在网络中的其余进程中,咱们经过实现了Rpc客户端的对象来发起远程服务的调用。
*/
public Object invoke(Object obj, Method method, Object[] params) throws Throwable {
// 调用前
System.out.println("执行远程方法前,能够作些事情");
// 调用远程服务,须要封装参数,相似于序列化的过程
RpcRequest request = new RpcRequest();
request.setClassName(method.getDeclaringClass().getName());
request.setMethodName(method.getName());
request.setParamTypes(method.getParameterTypes());
request.setParams(params);
// 连接服务器调用服务
RpcClient client = new RpcClient();
Object rst = client.start(request, host, port);
// 调用后
System.out.println("执行远程方法后,也能够作些事情");
return rst;
}
}
复制代码
JDK提供了Proxy类来实现咱们的动态代理,能够经过newProxyInstance(ClassLoader var0, Class<?>[] var1, InvocationHandler var2)方法来实例化一个代理对象,此时咱们传入的参数clazz是规定必须为一个接口的,若是不是接口就不能使用JDK动态代理
而第三个参数RpcClientProxy.this则是newProxyInstance()方法虽然帮咱们建立好了实例,可是建立实例完成后的具体动做必须由这个InvocationHandler来提供
InvocationHandler这个接口里面仅仅只有一个 Object invoke(Object var1, Method var2, Object[] var3) throws Throwable,这个方法的参数相信不难理解,第一个是代理对象,第二个是执行的方法,第三个是所需的参数集
回到咱们刚刚的代码,在我执行System.out.println(service.getInfo())这条语句的时候,咱们的逻辑就会跳到invoke()的实现中来,在invoke()方法的注释中也把过程很详细的说明了,首先咱们须要调用远程服务了,进行一个参数的封装,以后就进行一个网络链接把这些参数发送给咱们的服务端,此时咱们须要用到RpcClient了
在start()方法中,咱们的RpcRequest request是实现了Serializable接口的,因此此时封装好的数据会转换成一个二进制而后被flush()过去,此时咱们消息已经发送了,须要等待服务端的响应,响应咱们就须要经过咱们的服务端ObjectOutputStream来接收一个输入流
/**
* RpcClient
* Rpc客户端,表明业务代码做为客户端,往远端服务发起请求。
*/
public class RpcClient {
/**
* 经过网络IO,打开远端服务链接,将请求数据写入网络中,并得到响应结果。
*
* @param request 将要发送的请求数据
* @param host 远端服务域名或者ip地址
* @param port 远端服务端口号
* @return 服务端响应结果
* @throws Throwable 抛出的异常
*/
public Object start(RpcRequest request, String host, int port) throws Throwable{
// 打开远端服务链接
Socket server = new Socket(host, port);
ObjectInputStream oin = null;
ObjectOutputStream oout = null;
try {
// 1. 服务端输出流,写入请求数据,发送请求数据
oout = new ObjectOutputStream(server.getOutputStream());
oout.writeObject(request);
oout.flush();
// 2. 服务端输入流,获取返回数据,转换参数类型
// 相似于反序列化的过程
oin = new ObjectInputStream(server.getInputStream());
Object res = oin.readObject();
RpcResponse response = null;
if(!(res instanceof RpcResponse)){
throw new InvalidClassException("返回参数不正确,应当为:"+RpcResponse.class+" 类型");
}else{
response = (RpcResponse) res;
}
// 3. 返回服务端响应结果
if(response.getError() != null){ // 服务器产生异常
throw response.getError();
}
return response.getResult();
}finally{
try { // 清理资源,关闭流
if(oin != null) oin.close();
if(oout != null) oout.close();
if(server != null) server.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
}
复制代码
/**
* RpcRequest
* Rpc请求对象,请求远端服务服务的内容,在网络上进行传输。
*/
public class RpcRequest implements Serializable{
// 须要请求的类名
private String className;
// 需求请求的方法名
private String methodName;
// 请求方法的参数类型
private Class<?>[] paramTypes;
// 请求的参数值
private Object[] params;
public String getClassName() {
return className;
}
public void setClassName(String className) {
this.className = className;
}
public String getMethodName() {
return methodName;
}
public void setMethodName(String methodName) {
this.methodName = methodName;
}
public Class<?>[] getParamTypes() {
return paramTypes;
}
public void setParamTypes(Class<?>[] paramTypes) {
this.paramTypes = paramTypes;
}
public Object[] getParams() {
return params;
}
public void setParams(Object[] params) {
this.params = params;
}
}
复制代码
同时也是实现了JDK默认的序列化Serializable
/**
* RpcResponse
* Rpc服务端响应结果包装类,在网络上进行传输。
*/
public class RpcResponse implements Serializable {
// 可能抛出的异常
private Throwable error;
// 响应的内容或结果
private Object result;
public Throwable getError() {
return error;
}
public void setError(Throwable error) {
this.error = error;
}
public Object getResult() {
return result;
}
public void setResult(Object result) {
this.result = result;
}
}
复制代码
public class ServerTest {
@Test
public void startServer() {
RpcServer server = new RpcServer();
server.start(9998, "rpc.simple.RpcServer");
}
public static void main(String[] args) {
}
}
复制代码
给到一个端口号,参数中带有一个包,功能是扫描某个包下的服务
建立一个Map类型的集合services存放扫描到提供rpc服务的类,此时由于没有放在注册中心上因此就不存在寻址了。后面将会把它放入zookeeper的注册中心
getService()下,咱们在ServerTest不是提供了一个包名吗,此时咱们先去找到了它们全部的classes(请参考getClasses()方法),getClasses()中咱们其实主要是先根据提供的包名往下找,要是目录都有问题的话就抛出异常,若是没问题,就开始遍历此目录下的全部文件,遍历出来的结果若是发现这个文件是class文件,就把其实例化,而且进行判断是否存在一个自定义注解@service,标注了这个注解的类就是RPC服务的实现类。若是存在这个注解,那就是咱们须要找的rpc服务,就把它装到一个结果集classes中,若是目录下面仍然是目录,那就本身调用本身,直到看到class文件为止
当咱们把全部的class都找到了,回到getService()方法下,就都集中放于一个classList中,而后把它们Map化,就是把接口的名称做为key,把实例做为value(services.put(cla.getAnnotation(Service.class).value().getName(), obj))。
最后再回到start(),进行完服务扫描以后还会有一个RpcServerHandler来进行处理
/**
* RpcServer
* Rpc服务提供者
*/
public class RpcServer {
/**
* 启动指定的网络端口号服务,并监听端口上的请求数据。得到请求数据之后将请求信息委派给服务处理器,放入线程池中执行。
* @param port 监听端口
* @param clazz 服务类所在包名,多个用英文逗号隔开
*/
public void start(int port, String clazz) {
ServerSocket server = null;
try {
// 1. 建立服务端指定端口的socket链接
server = new ServerSocket(port);
// 2. 获取全部rpc服务类
Map<String, Object> services = getService(clazz);
// 3. 建立线程池
Executor executor = new ThreadPoolExecutor(5, 10, 10, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>());
while(true){
// 4. 获取客户端链接
Socket client = server.accept();
// 5. 放入线程池中执行
RpcServerHandler service = new RpcServerHandler(client, services);
executor.execute(service);
}
} catch (IOException e) {
e.printStackTrace();
}finally{
//关闭监听
if(server != null)
try {
server.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
/**
* 实例化全部rpc服务类,也可用于暴露服务信息到注册中心。
* @param clazz 服务类所在包名,多个用英文逗号隔开
* @return
*/
public Map<String,Object> getService(String clazz){
try {
Map<String, Object> services = new HashMap<String, Object>();
// 获取全部服务类
String[] clazzes = clazz.split(",");
List<Class<?>> classes = new ArrayList<Class<?>>();
for(String cl : clazzes){
List<Class<?>> classList = getClasses(cl);
classes.addAll(classList);
}
// 循环实例化
for(Class<?> cla:classes){
Object obj = cla.newInstance();
services.put(cla.getAnnotation(Service.class).value().getName(), obj);
}
return services;
} catch (Exception e) {
throw new RuntimeException(e);
}
}
/**
* 获取包下全部有@Sercive注解的类
* @param pckgname
* @return
* @throws ClassNotFoundException
*/
public static List<Class<?>> getClasses(String pckgname) throws ClassNotFoundException {
// 须要查找的结果
List<Class<?>> classes = new ArrayList<Class<?>>();
// 找到指定的包目录
File directory = null;
try {
ClassLoader cld = Thread.currentThread().getContextClassLoader();
if (cld == null)
throw new ClassNotFoundException("没法获取到ClassLoader");
String path = pckgname.replace('.', '/');
URL resource = cld.getResource(path);
if (resource == null)
throw new ClassNotFoundException("没有这样的资源:" + path);
directory = new File(resource.getFile());
} catch (NullPointerException x) {
throw new ClassNotFoundException(pckgname + " (" + directory + ") 不是一个有效的资源");
}
if (directory.exists()) {
// 获取包目录下的全部文件
String[] files = directory.list();
File[] fileList = directory.listFiles();
// 获取包目录下的全部文件
for (int i = 0; fileList != null && i < fileList.length; i++) {
File file = fileList[i];
//判断是不是Class文件
if (file.isFile() && file.getName().endsWith(".class")) {
Class<?> clazz = Class.forName(pckgname + '.' + files[i].substring(0, files[i].length() - 6));
if(clazz.getAnnotation(Service.class) != null){
classes.add(clazz);
}
}else if(file.isDirectory()){ //若是是目录,递归查找
List<Class<?>> result = getClasses(pckgname+"."+file.getName());
if(result != null && result.size() != 0){
classes.addAll(result);
}
}
}
} else{
throw new ClassNotFoundException(pckgname + "不是一个有效的包名");
}
return classes;
}
}
复制代码
和刚刚的RpcClient很是相似,都是序列化和反序列化的过程,主要是第三步中得到了实例和方法及其参数后,再调用invoke()方法而后把结果放入response的过程
/**
* RpcServerHandler
* 服务端请求处理,处理来自网络IO的服务请求,并响应结果给网络IO。
*/
public class RpcServerHandler implements Runnable {
// 客户端网络请求socket,能够从中得到网络请求信息
private Socket clientSocket;
// 服务端提供处理请求的类集合
private Map<String, Object> serviceMap;
/**
* @param client 客户端socket
* @param services 全部服务
*/
public RpcServerHandler(Socket client, Map<String, Object> services) {
this.clientSocket = client;
this.serviceMap = services;
}
/**
* 读取网络中客户端请求的信息,找到请求的方法,执行本地方法得到结果,写入网络IO输出中。
*
*/
public void run() {
ObjectInputStream oin = null;
ObjectOutputStream oout = null;
RpcResponse response = new RpcResponse();
try {
// 1. 获取流以待操做
oin = new ObjectInputStream(clientSocket.getInputStream());
oout = new ObjectOutputStream(clientSocket.getOutputStream());
// 2. 从网络IO输入流中请求数据,强转参数类型
Object param = oin.readObject();
RpcRequest request = null;
if(!(param instanceof RpcRequest)){
response.setError(new Exception("参数错误"));
oout.writeObject(response);
oout.flush();
return;
}else{
// 反序列化RpcRequest
request = (RpcRequest) param;
}
// 3. 查找并执行服务方法
Object service = serviceMap.get(request.getClassName());
Class<?> clazz= service.getClass();
Method method = clazz.getMethod(request.getMethodName(), request.getParamTypes());
Object result = method.invoke(service, request.getParams());
// 4. 返回RPC响应,序列化RpcResponse
response.setResult(result);
// 序列化结果
oout.writeObject(response);
oout.flush();
return;
} catch (Exception e) {
try { //异常处理
if(oout != null){
response.setError(e);
oout.writeObject(response);
oout.flush();
}
} catch (Exception e1) {
e1.printStackTrace();
}
return;
}finally{
try { // 回收资源,关闭流
if(oin != null) oin.close();
if(oout != null) oout.close();
if(clientSocket != null) clientSocket.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
}
复制代码
先开启ServerTest再开启ClientTest,简单快捷,注意别去右键跑main方法便可
设计客户端的时候,在ClientStubInvocationHandler中须要完成的两件事为编组消息和发送网络请求,而将请求的内容编组为消息这件事就交由客户端的stub代理,它除了消息协议和网络层的事务之外,可能还存在一个服务信息发现,此外消息协议可能也是会存在变化的,咱们也须要去支持多种协议,这个实际上是和框架对协议的支持广度有关的。好比dubbo相对于spring cloud而言对协议的支持就相对灵活一些
此时咱们须要得知某服务用的是什么协议,因此咱们须要引入一个服务发现者
咱们想要作到支持多种协议,类该如何设计(面向接口,策略模式,组合)
此时咱们的协议须要抽象出来,对于协议的内容须要进行编组和解组,好比咱们上面提供的JSON和HTTP两种不一样的实现,而此时客户端的存根里面就不只仅只是须要服务发现者,还须要咱们对于这个协议的支持
主要看regist()方法,咱们在注册的时候把服务信息进行了拼接,并建立成临时节点,父节点为持久节点。servicePath是相似于dubbo的一个目录结构,一个根目录/rpc+服务名称serviceName+service,获取服务的方法loadServiceResouces()也不难,根据这些地址获取它们下面的子节点,把全部的url加载出来给到调用者
public class RegistCenter {
ZkClient client = new ZkClient("localhost:2181");
private String centerRootPath = "/rpc";
public RegistCenter() {
client.setZkSerializer(new MyZkSerializer());
}
public void regist(ServiceResource serviceResource) {
String serviceName = serviceResource.getServiceName();
String uri = JsonMapper.toJsonString(serviceResource);
try {
uri = URLEncoder.encode(uri, "UTF-8");
} catch (UnsupportedEncodingException e) {
e.printStackTrace();
}
String servicePath = centerRootPath + "/"+serviceName+"/service";
if(! client.exists(servicePath)) {
client.createPersistent(servicePath, true);
}
String uriPath = servicePath+"/"+uri;
client.createEphemeral(uriPath);
}
/**
* 加载配置中心中服务资源信息
* @param serviceName
* @return
*/
public List<ServiceResource> loadServiceResouces(String serviceName) {
String servicePath = centerRootPath + "/"+serviceName+"/service";
List<String> children = client.getChildren(servicePath);
List<ServiceResource> resources = new ArrayList<ServiceResource>();
for(String ch : children) {
try {
String deCh = URLDecoder.decode(ch, "UTF-8");
ServiceResource r = JsonMapper.fromJsonString(deCh, ServiceResource.class);
resources.add(r);
} catch (UnsupportedEncodingException e) {
e.printStackTrace();
}
}
return resources;
}
private void sub(String serviceName, ChangeHandler handler) {
/*
String path = centerRootPath + "/"+serviceName+"/service";
client.subscribeChildChanges(path, new IZkChildListener() {
@Override
public void handleChildChange(String parentPath, List<String> currentChilds) throws Exception {
handler();
}
});
client.subscribeDataChanges(path, new IZkDataListener() {
@Override
public void handleDataDeleted(String dataPath) throws Exception {
handler();
}
@Override
public void handleDataChange(String dataPath, Object data) throws Exception {
handler();
}
});
*/
}
interface ChangeHandler {
/**
* 发生变化后给一个完整的属性对象
* @param resource
*/
void itemChange(ServiceResource resource);
}
}
复制代码
/**
* ClientStubProxyFactory
* 客户端存根代理工厂
*/
public class ClientStubProxyFactory {
private ServiceInfoDiscoverer sid;
private Map<String, MessageProtocol> supportMessageProtocols;
private NetClient netClient;
private Map<Class<?>, Object> objectCache = new HashMap<>();
/**
*
*
* @param <T>
* @param interf
* @return
*/
@SuppressWarnings("unchecked")
public <T> T getProxy(Class<T> interf) {
T obj = (T) this.objectCache.get(interf);
if (obj == null) {
obj = (T) Proxy.newProxyInstance(interf.getClassLoader(), new Class<?>[] { interf },
new ClientStubInvocationHandler(interf));
this.objectCache.put(interf, obj);
}
return obj;
}
public ServiceInfoDiscoverer getSid() {
return sid;
}
public void setSid(ServiceInfoDiscoverer sid) {
this.sid = sid;
}
public Map<String, MessageProtocol> getSupportMessageProtocols() {
return supportMessageProtocols;
}
public void setSupportMessageProtocols(Map<String, MessageProtocol> supportMessageProtocols) {
this.supportMessageProtocols = supportMessageProtocols;
}
public NetClient getNetClient() {
return netClient;
}
public void setNetClient(NetClient netClient) {
this.netClient = netClient;
}
/**
* ClientStubInvocationHandler
* 客户端存根代理调用实现
* @date 2019年4月12日 下午2:38:30
*/
private class ClientStubInvocationHandler implements InvocationHandler {
private Class<?> interf;
public ClientStubInvocationHandler(Class<?> interf) {
super();
this.interf = interf;
}
@Override
public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
// 一、得到服务信息
String serviceName = this.interf.getName();
ServiceInfo sinfo = sid.getServiceInfo(serviceName);
if (sinfo == null) {
throw new Exception("远程服务不存在!");
}
// 二、构造request对象
Request req = new Request();
req.setServiceName(sinfo.getName());
req.setMethod(method.getName());
req.setPrameterTypes(method.getParameterTypes());
req.setParameters(args);
// 三、协议层编组
// 得到该方法对应的协议
MessageProtocol protocol = supportMessageProtocols.get(sinfo.getProtocol());
// 编组请求
byte[] data = protocol.marshallingRequest(req);
// 四、调用网络层发送请求
byte[] repData = netClient.sendRequest(data, sinfo);
// 5解组响应消息
Response rsp = protocol.unmarshallingResponse(repData);
// 六、结果处理
if (rsp.getException() != null) {
throw rsp.getException();
}
return rsp.getReturnValue();
}
}
}
复制代码
ClientStub中有两个引用,一个是服务发现接口ServiceInfoDiscoverer,做用为根据服务名得到远程服务信息,提供一个ServiceInfo getServiceInfo(String name)方法,还有就是对于不一样协议的支持supportMessageProtocols,MessageProtocol咱们也是定义了一个接口,这个接口就须要比较详细了,编码成二级制,和解码成Request等,对于response也是一样这么个过程
/**
* 通讯协议接口
* MessageProtocol
*/
public interface MessageProtocol {
/**
* 编组请求消息
* @param req
* @return
*/
byte[] marshallingRequest(Request req);
/**
* 解编组请求消息
* @param data
* @return
*/
Request unmarshallingRequest(byte[] data);
/**
* 编组响应消息
* @param rsp
* @return
*/
byte[] marshallingResponse(Response rsp);
/**
* 解编组响应消息
* @param data
* @return
*/
Response unmarshallingResponse(byte[] data);
}
复制代码
此时又存在一些问题,单纯依靠编组和解组的方法是不够的,编组和解组的操做对象是请求,响应,可是它们的内容是不一样的,此时咱们又须要定义框架标准的请求响应类
request有具体的服务名,服务方法,消息头,参数类型和参数,一样的response也有状态(经过枚举),消息头,返回值及类型以及是否存在异常。
此时协议层扩展为4个方法
将消息协议独立为一层,客户端和服务端都须要使用
网络层的工做主要是发送请求和得到响应,此时咱们若是须要发起网络请求一定先要知道服务地址,此时咱们利用下图中serviceInfo对象做为必须依赖,setRequest()方法里面会存在发送数据,还有发送给谁,此时给出了BIO和Netty两种实现
因此咱们须要的三个依赖就都出来了,一个是服务发现者,一个是协议支持,再而后就是咱们网络层的NetClient
紫色表明客户端代理部分,浅绿色属于服务发现,浅蓝色属于协议部分
由于这些代码和主要的思路已经没有瓜葛了,只是一些功能代码,因此能够直接忽略了。若是实在是想本身跑一下,也能够问我要一个小样。
能够和内容二的RpcClientProxy作一个对比,在原有的基础上加上了三个依赖ServiceInfoDiscoverer,supportMessageProtocols,netClient
在ClientStubProxyFactory中对Object作了一个缓存,若是已经存在这个缓存就直接返回,没有的话加入到缓存中而后new出来,只是一个小小的不一样。
@Override
public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
// 一、得到服务信息
String serviceName = this.interf.getName();
ServiceInfo sinfo = sid.getServiceInfo(serviceName);
if (sinfo == null) {
throw new Exception("远程服务不存在!");
}
// 二、构造request对象
Request req = new Request();
req.setServiceName(sinfo.getName());
req.setMethod(method.getName());
req.setPrameterTypes(method.getParameterTypes());
req.setParameters(args);
// 三、协议层编组
// 得到该方法对应的协议
MessageProtocol protocol = supportMessageProtocols.get(sinfo.getProtocol());
// 编组请求
byte[] data = protocol.marshallingRequest(req);
// 四、调用网络层发送请求
byte[] repData = netClient.sendRequest(data, sinfo);
// 五、解组响应消息
Response rsp = protocol.unmarshallingResponse(repData);
// 六、结果处理
if (rsp.getException() != null) {
throw rsp.getException();
}
return rsp.getReturnValue();
}
复制代码
首先是服务发现,在咱们执行 ① 中提到的getProxy()方法时,此时代理的接口已经直接告诉咱们了,因此咱们就直接得到了接口信息interf,而后调用getName()方法获取接口的名称,经过接口名,调用服务发现者ServiceInfo提供的getServiceInfo()方法就能获取服务的具体信息,而后放入请求参数request里面,接下来给request的各个属性赋值
以后咱们就开始寻找这个服务所对应的协议,得到协议以后能够获取协议支持对象,以后进行编组请求,转换成二进制,经过netClient发送过去,顺带连同服务端信息给出去。获取结果repData进行解组(二进制回到response),以后进行结果处理。
以前也提到了,服务发现者ServiceInfoDiscoverer是做为一个接口提供了getServiceInfo()方法的
有两种不一样的实现,本地实现咱们能够本身搞一个配置文件加载进来,把相关的服务信息弄进去得了
zookeeper的服务发现实现以下,相似于咱们一开始在2 - ① 中补充的zookeeper的内容
public class ZookeeperServiceInfoDiscoverer implements ServiceInfoDiscoverer {
ZkClient client = new ZkClient("localhost:2181");
private String centerRootPath = "/rpc";
public ZookeeperServiceInfoDiscoverer() {
client.setZkSerializer(new MyZkSerializer());
}
public void regist(ServiceInfo serviceResource) {
String serviceName = serviceResource.getName();
String uri = JSON.toJSONString(serviceResource);
try {
uri = URLEncoder.encode(uri, "UTF-8");
} catch (UnsupportedEncodingException e) {
e.printStackTrace();
}
String servicePath = centerRootPath + "/"+serviceName+"/service";
if(! client.exists(servicePath)) {
client.createPersistent(servicePath, true);
}
String uriPath = servicePath+"/"+uri;
client.createEphemeral(uriPath);
}
/**
* 加载配置中心中服务资源信息
* @param serviceName
* @return
*/
public List<ServiceInfo> loadServiceResouces(String serviceName) {
String servicePath = centerRootPath + "/"+serviceName+"/service";
List<String> children = client.getChildren(servicePath);
List<ServiceInfo> resources = new ArrayList<ServiceInfo>();
for(String ch : children) {
try {
String deCh = URLDecoder.decode(ch, "UTF-8");
ServiceInfo r = JSON.parseObject(deCh, ServiceInfo.class);
resources.add(r);
} catch (UnsupportedEncodingException e) {
e.printStackTrace();
}
}
return resources;
}
@Override
public ServiceInfo getServiceInfo(String name) {
List<ServiceInfo> list = loadServiceResouces(name);
ServiceInfo info = list.get(0);
list.forEach((e)->{
if(e != info) {
info.addAddress(e.getAddress().get(0));
}
});
return info;
}
}
复制代码
这里只实现了JSON的,经过fastJSON来实现
public class JSONMessageProtocol implements MessageProtocol {
@Override
public byte[] marshallingRequest(Request req) {
Request temp = new Request();
temp.setServiceName(req.getServiceName());
temp.setMethod(req.getMethod());
temp.setHeaders(req.getHeaders());
temp.setPrameterTypes(req.getPrameterTypes());
if (req.getParameters() != null) {
Object[] params = req.getParameters();
Object[] serizeParmas = new Object[params.length];
for (int i = 0; i < params.length; i++) {
serizeParmas[i] = JSON.toJSONString(params[i]);
}
temp.setParameters(serizeParmas);
}
return JSON.toJSONBytes(temp);
}
@Override
public Request unmarshallingRequest(byte[] data) {
Request req = JSON.parseObject(data, Request.class);
if(req.getParameters() != null) {
Object[] serizeParmas = req.getParameters();
Object[] params = new Object[serizeParmas.length];
for(int i = 0; i < serizeParmas.length; i++) {
Object param = JSON.parseObject(serizeParmas[i].toString(), Object.class);
params[i] = param;
}
req.setParameters(params);
}
return req;
}
@Override
public byte[] marshallingResponse(Response rsp) {
Response resp = new Response();
resp.setHeaders(rsp.getHeaders());
resp.setException(rsp.getException());
resp.setReturnValue(rsp.getReturnValue());
resp.setStatus(rsp.getStatus());
return JSON.toJSONBytes(resp);
}
@Override
public Response unmarshallingResponse(byte[] data) {
return JSON.parseObject(data, Response.class);
}
}
复制代码
public class BioNetClient implements NetClient {
@Override
public byte[] sendRequest(byte[] data, ServiceInfo sinfo) {
List<String> addressList = sinfo.getAddress();
int randNum = new Random().nextInt(addressList.size());
String address = addressList.get(randNum);
String[] addInfoArray = address.split(":");
try {
return startSend(data, addInfoArray[0], Integer.valueOf(addInfoArray[1]));
} catch (Throwable e) {
e.printStackTrace();
}
return null;
}
/**
* 经过网络IO,打开远端服务链接,将请求数据写入网络中,并得到响应结果。
*
* @param requestData 将要发送的请求数据
* @param host 远端服务域名或者ip地址
* @param port 远端服务端口号
* @return 服务端响应结果
* @throws Throwable 抛出的异常
*/
private byte[] startSend(byte[] requestData, String host, int port) throws Throwable{
// 打开远端服务链接
Socket serverSocket = new Socket(host, port);
InputStream in = null;
OutputStream out = null;
try {
// 1. 服务端输出流,写入请求数据,发送请求数据
out = serverSocket.getOutputStream();
out.write(requestData);
out.flush();
// 2. 服务端输入流,获取返回数据,转换参数类型
// 相似于反序列化的过程
in = serverSocket.getInputStream();
byte[] res = new byte[1024];
int readLen = -1;
ByteArrayOutputStream baos = new ByteArrayOutputStream();
while((readLen = in.read(res)) > 0) {
baos.write(res, 0, readLen);
}
return baos.toByteArray();
}finally{
try { // 清理资源,关闭流
if(in != null) in.close();
if(out != null) out.close();
if(serverSocket != null) serverSocket.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
}
复制代码
public class NettyNetClient implements NetClient {
private SendHandler sendHandler;
private Map<String, SendHandler> sendHandlerMap = new ConcurrentHashMap<String, SendHandler>();
@Override
public byte[] sendRequest(byte[] data, ServiceInfo sinfo) {
try {
List<String> addressList = sinfo.getAddress();
int randNum = new Random().nextInt(addressList.size());
String address = addressList.get(randNum);
String[] addInfoArray = address.split(":");
SendHandler handler = sendHandlerMap.get(address);
if(handler == null) {
sendHandler = new SendHandler(data);
new Thread(()->{
try {
connect(addInfoArray[0], Integer.valueOf(addInfoArray[1]));
} catch (NumberFormatException e) {
e.printStackTrace();
} catch (Exception e) {
e.printStackTrace();
}
}).start();
}
byte[] respData = (byte[]) sendHandler.rspData();
return respData;
} catch (NumberFormatException e) {
e.printStackTrace();
} catch (Exception e) {
e.printStackTrace();
}
return null;
}
public void connect(String host, int port) throws Exception {
// 配置客户端
EventLoopGroup group = new NioEventLoopGroup();
try {
Bootstrap b = new Bootstrap();
//EchoClientHandler handler = new EchoClientHandler();
b.group(group)
.channel(NioSocketChannel.class)
.option(ChannelOption.TCP_NODELAY, true)
.handler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline p = ch.pipeline();
p.addLast(sendHandler);
}
});
// 启动客户端链接
ChannelFuture f = b.connect(host, port).sync();
// 等待客户端链接关闭
f.channel().closeFuture().sync();
} finally {
// 释放线程组资源
group.shutdownGracefully();
}
}
}
复制代码
能够自行模拟一个消费者和一个生产者进行测试,这里就不贴出来了
以后会继续dubbo的内容
下一篇:从零开始的高并发(九)--- dubbo的核心功能及协议