在微服务大行其道的今天,分布式系统愈来愈重要,实现服务化首先就要考虑服务之间的通讯问题。这里面涉及序列化、反序列化、寻址、链接等等问题。。不过,有了RPC框架,咱们就无需苦恼。node
RPC(Remote Procedure Call)— 远程过程调用,是一个计算机通讯协议。该协议容许运行于一台计算机的程序调用另外一台计算机的子程序,而程序员无需额外地为这个交互做用编程。git
值得注意是,两个或多个应用程序都分布在不一样的服务器上,它们之间的调用都像是本地方法调用同样。 程序员
RPC框架有不少,比较知名的如阿里的Dubbo、google的gRPC、Go语言的rpcx、Apache的thrift。固然了,还有Spring Cloud,不过对于Spring Cloud来讲,RPC只是它的一个功能模块。github
复杂的先不讲,若是要实现一个基本功能、简单的RPC,要涉及哪些东西呢?数据库
下面咱们一块儿经过代码来分析,怎么把这些技术点串到一块儿,实现咱们本身的RPC。编程
在开始以前,笔者先介绍一下所用到的软件环境。json
SpringBoot、Netty、zookeeper、zkclient、fastjsonbootstrap
整个RPC,咱们分为生产者和消费者。首先它们有一个共同的服务接口API。在这里,咱们搞一个操做用户信息的service接口。缓存
public interface InfoUserService {
List<InfoUser> insertInfoUser(InfoUser infoUser);
InfoUser getInfoUserById(String id);
void deleteInfoUserById(String id);
String getNameById(String id);
Map<String,InfoUser> getAllUser();
}
复制代码
做为生产者,它固然要有实现类,咱们建立InfoUserServiceImpl实现类,并用注解把它标注为RPC的服务,而后注册到Spring的Bean容器中。在这里,咱们把infoUserMap当作数据库,存储用户信息。bash
package com.viewscenes.netsupervisor.service.impl;
@RpcService
public class InfoUserServiceImpl implements InfoUserService {
Logger logger = LoggerFactory.getLogger(this.getClass());
//当作数据库,存储用户信息
Map<String,InfoUser> infoUserMap = new HashMap<>();
public List<InfoUser> insertInfoUser(InfoUser infoUser) {
logger.info("新增用户信息:{}", JSONObject.toJSONString(infoUser));
infoUserMap.put(infoUser.getId(),infoUser);
return getInfoUserList();
}
public InfoUser getInfoUserById(String id) {
InfoUser infoUser = infoUserMap.get(id);
logger.info("查询用户ID:{}",id);
return infoUser;
}
public List<InfoUser> getInfoUserList() {
List<InfoUser> userList = new ArrayList<>();
Iterator<Map.Entry<String, InfoUser>> iterator = infoUserMap.entrySet().iterator();
while (iterator.hasNext()){
Map.Entry<String, InfoUser> next = iterator.next();
userList.add(next.getValue());
}
logger.info("返回用户信息记录数:{}",userList.size());
return userList;
}
public void deleteInfoUserById(String id) {
logger.info("删除用户信息:{}",JSONObject.toJSONString(infoUserMap.remove(id)));
}
public String getNameById(String id){
logger.info("根据ID查询用户名称:{}",id);
return infoUserMap.get(id).getName();
}
public Map<String,InfoUser> getAllUser(){
logger.info("查询全部用户信息{}",infoUserMap.keySet().size());
return infoUserMap;
}
}
复制代码
元注解定义以下:
package com.viewscenes.netsupervisor.annotation;
@Target({ElementType.TYPE})
@Retention(RetentionPolicy.RUNTIME)
@Component
public @interface RpcService {}
复制代码
全部的请求信息和返回信息,咱们用两个JavaBean来表示。其中的重点是,返回信息要带有请求信息的ID。
package com.viewscenes.netsupervisor.entity;
public class Request {
private String id;
private String className;// 类名
private String methodName;// 函数名称
private Class<?>[] parameterTypes;// 参数类型
private Object[] parameters;// 参数列表
get/set ...
}
复制代码
package com.viewscenes.netsupervisor.entity;
public class Response {
private String requestId;
private int code;
private String error_msg;
private Object data;
get/set ...
}
复制代码
Netty做为高性能的NIO通讯框架,在不少RPC框架中都有它的身影。咱们也采用它当作通讯服务器。说到这,咱们先看个配置文件,重点有两个,zookeeper的注册地址和Netty通讯服务器的地址。
TOMCAT端口
server.port=8001
#zookeeper注册地址
registry.address=192.168.245.131:2181,192.168.245.131:2182,192.168.245.131:2183
#RPC服务提供者地址
rpc.server.address=192.168.197.1:18868
复制代码
为了方便管理,咱们把它也注册成Bean,同时实现ApplicationContextAware接口,把上面@RpcService注解的服务类捞出来,缓存起来,供消费者调用。同时,做为服务器,还要对客户端的链路进行心跳检测,超过60秒未读写数据,关闭此链接。
package com.viewscenes.netsupervisor.netty.server;
@Component
public class NettyServer implements ApplicationContextAware,InitializingBean{
private static final Logger logger = LoggerFactory.getLogger(NettyServer.class);
private static final EventLoopGroup bossGroup = new NioEventLoopGroup(1);
private static final EventLoopGroup workerGroup = new NioEventLoopGroup(4);
private Map<String, Object> serviceMap = new HashMap<>();
@Value("${rpc.server.address}")
private String serverAddress;
@Autowired
ServiceRegistry registry;
public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
Map<String, Object> beans = applicationContext.getBeansWithAnnotation(RpcService.class);
for(Object serviceBean:beans.values()){
Class<?> clazz = serviceBean.getClass();
Class<?>[] interfaces = clazz.getInterfaces();
for (Class<?> inter : interfaces){
String interfaceName = inter.getName();
logger.info("加载服务类: {}", interfaceName);
serviceMap.put(interfaceName, serviceBean);
}
}
logger.info("已加载所有服务接口:{}", serviceMap);
}
public void afterPropertiesSet() throws Exception {
start();
}
public void start(){
final NettyServerHandler handler = new NettyServerHandler(serviceMap);
new Thread(() -> {
try {
ServerBootstrap bootstrap = new ServerBootstrap();
bootstrap.group(bossGroup,workerGroup).
channel(NioServerSocketChannel.class).
option(ChannelOption.SO_BACKLOG,1024).
childOption(ChannelOption.SO_KEEPALIVE,true).
childOption(ChannelOption.TCP_NODELAY,true).
childHandler(new ChannelInitializer<SocketChannel>() {
//建立NIOSocketChannel成功后,在进行初始化时,将它的ChannelHandler设置到ChannelPipeline中,用于处理网络IO事件
protected void initChannel(SocketChannel channel) throws Exception {
ChannelPipeline pipeline = channel.pipeline();
pipeline.addLast(new IdleStateHandler(0, 0, 60));
pipeline.addLast(new JSONEncoder());
pipeline.addLast(new JSONDecoder());
pipeline.addLast(handler);
}
});
String[] array = serverAddress.split(":");
String host = array[0];
int port = Integer.parseInt(array[1]);
ChannelFuture cf = bootstrap.bind(host,port).sync();
logger.info("RPC 服务器启动.监听端口:"+port);
registry.register(serverAddress);
//等待服务端监听端口关闭
cf.channel().closeFuture().sync();
} catch (Exception e) {
e.printStackTrace();
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}).start();
}
}
复制代码
上面的代码就把Netty服务器启动了,在处理器中的构造函数中,咱们先把服务Bean的Map传进来,全部的处理要基于这个Map才能找到对应的实现类。在channelRead中,获取请求方法的信息,而后经过反射调用方法获取返回值。
package com.viewscenes.netsupervisor.netty.server;
@ChannelHandler.Sharable
public class NettyServerHandler extends ChannelInboundHandlerAdapter {
private final Logger logger = LoggerFactory.getLogger(NettyServerHandler.class);
private final Map<String, Object> serviceMap;
public NettyServerHandler(Map<String, Object> serviceMap) {
this.serviceMap = serviceMap;
}
public void channelActive(ChannelHandlerContext ctx) {
logger.info("客户端链接成功!"+ctx.channel().remoteAddress());
}
public void channelInactive(ChannelHandlerContext ctx) {
logger.info("客户端断开链接!{}",ctx.channel().remoteAddress());
ctx.channel().close();
}
public void channelRead(ChannelHandlerContext ctx, Object msg) {
Request request = JSON.parseObject(msg.toString(),Request.class);
if ("heartBeat".equals(request.getMethodName())) {
logger.info("客户端心跳信息..."+ctx.channel().remoteAddress());
}else{
logger.info("RPC客户端请求接口:"+request.getClassName()+" 方法名:"+request.getMethodName());
Response response = new Response();
response.setRequestId(request.getId());
try {
Object result = this.handler(request);
response.setData(result);
} catch (Throwable e) {
e.printStackTrace();
response.setCode(1);
response.setError_msg(e.toString());
logger.error("RPC Server handle request error",e);
}
ctx.writeAndFlush(response);
}
}
/**
* 经过反射,执行本地方法
* @param request
* @return
* @throws Throwable
*/
private Object handler(Request request) throws Throwable{
String className = request.getClassName();
Object serviceBean = serviceMap.get(className);
if (serviceBean!=null){
Class<?> serviceClass = serviceBean.getClass();
String methodName = request.getMethodName();
Class<?>[] parameterTypes = request.getParameterTypes();
Object[] parameters = request.getParameters();
Method method = serviceClass.getMethod(methodName, parameterTypes);
method.setAccessible(true);
return method.invoke(serviceBean, getParameters(parameterTypes,parameters));
}else{
throw new Exception("未找到服务接口,请检查配置!:"+className+"#"+request.getMethodName());
}
}
/**
* 获取参数列表
* @param parameterTypes
* @param parameters
* @return
*/
private Object[] getParameters(Class<?>[] parameterTypes,Object[] parameters){
if (parameters==null || parameters.length==0){
return parameters;
}else{
Object[] new_parameters = new Object[parameters.length];
for(int i=0;i<parameters.length;i++){
new_parameters[i] = JSON.parseObject(parameters[i].toString(),parameterTypes[i]);
}
return new_parameters;
}
}
public void userEventTriggered(ChannelHandlerContext ctx, Object evt)throws Exception {
if (evt instanceof IdleStateEvent){
IdleStateEvent event = (IdleStateEvent)evt;
if (event.state()== IdleState.ALL_IDLE){
logger.info("客户端已超过60秒未读写数据,关闭链接.{}",ctx.channel().remoteAddress());
ctx.channel().close();
}
}else{
super.userEventTriggered(ctx,evt);
}
}
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
logger.info(cause.getMessage());
ctx.close();
}
}
复制代码
咱们启动了Netty通讯服务器,而且把服务实现类加载到缓存,等待请求时调用。这一步,咱们要进行服务注册。为了简单化处理,咱们只注册通讯服务器的监听地址便可。 在上面代码中,bind以后咱们执行了registry.register(serverAddress);
它的做用就是,将Netty监听的IP端口注册到zookeeper。
package com.viewscenes.netsupervisor.registry;
@Component
public class ServiceRegistry {
Logger logger = LoggerFactory.getLogger(this.getClass());
@Value("${registry.address}")
private String registryAddress;
private static final String ZK_REGISTRY_PATH = "/rpc";
public void register(String data) {
if (data != null) {
ZkClient client = connectServer();
if (client != null) {
AddRootNode(client);
createNode(client, data);
}
}
}
//链接zookeeper
private ZkClient connectServer() {
ZkClient client = new ZkClient(registryAddress,20000,20000);
return client;
}
//建立根目录/rpc
private void AddRootNode(ZkClient client){
boolean exists = client.exists(ZK_REGISTRY_PATH);
if (!exists){
client.createPersistent(ZK_REGISTRY_PATH);
logger.info("建立zookeeper主节点 {}",ZK_REGISTRY_PATH);
}
}
//在/rpc根目录下,建立临时顺序子节点
private void createNode(ZkClient client, String data) {
String path = client.create(ZK_REGISTRY_PATH + "/provider", data, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
logger.info("建立zookeeper数据节点 ({} => {})", path, data);
}
}
复制代码
有一点须要注意,子节点必须是临时节点。这样,生产者端停掉以后,才能通知到消费者,把此服务从服务列表中剔除。到此为止,生产者端已经完成。咱们看一下它的启动日志:
加载服务类: com.viewscenes.netsupervisor.service.InfoUserService
已加载所有服务接口:{com.viewscenes.netsupervisor.service.InfoUserService=com.viewscenes.netsupervisor.service.impl.InfoUserServiceImpl@46cc127b}
Initializing ExecutorService 'applicationTaskExecutor'
Tomcat started on port(s): 8001 (http) with context path ''
Started RpcProviderApplication in 2.003 seconds (JVM running for 3.1)
RPC 服务器启动.监听端口:18868
Starting ZkClient event thread.
Socket connection established to 192.168.245.131/192.168.245.131:2183, initiating session
Session establishment complete on server 192.168.245.131/192.168.245.131:2183, sessionid = 0x367835b48970010, negotiated timeout = 4000
zookeeper state changed (SyncConnected)
建立zookeeper主节点 /rpc
建立zookeeper数据节点 (/rpc/provider0000000000 => 192.168.197.1:28868)
复制代码
首先,咱们须要把生产者端的服务接口API,即InfoUserService。以相同的目录放到消费者端。路径不一样,调用会找不到的哦。
RPC的目标其中有一条,《程序员无需额外地为这个交互做用编程。》因此,咱们在调用的时候,就像调用本地方法同样。就像下面这样:
@Controller
public class IndexController {
@Autowired
InfoUserService userService;
@RequestMapping("getById")
@ResponseBody
public InfoUser getById(String id){
logger.info("根据ID查询用户信息:{}",id);
return userService.getInfoUserById(id);
}
}
复制代码
那么,问题来了。消费者端并无此接口的实现,怎么调用到的呢?这里,首先就是代理。笔者这里用的是Spring的工厂Bean机制建立的代理对象,涉及的代码较多,就不在文章中体现了,若是有不懂的同窗,请想象一下,MyBatis中的Mapper接口怎么被调用的。能够参考笔者文章:Mybatis源码分析(四)mapper接口方法是怎样被调用到的
总之,在调用userService方法的时候,会调用到代理对象的invoke方法。在这里,封装请求信息,而后调用Netty的客户端方法发送消息。而后根据方法返回值类型,转成相应的对象返回。
package com.viewscenes.netsupervisor.configurer.rpc;
@Component
public class RpcFactory<T> implements InvocationHandler {
@Autowired
NettyClient client;
Logger logger = LoggerFactory.getLogger(this.getClass());
public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
Request request = new Request();
request.setClassName(method.getDeclaringClass().getName());
request.setMethodName(method.getName());
request.setParameters(args);
request.setParameterTypes(method.getParameterTypes());
request.setId(IdUtil.getId());
Object result = client.send(request);
Class<?> returnType = method.getReturnType();
Response response = JSON.parseObject(result.toString(), Response.class);
if (response.getCode()==1){
throw new Exception(response.getError_msg());
}
if (returnType.isPrimitive() || String.class.isAssignableFrom(returnType)){
return response.getData();
}else if (Collection.class.isAssignableFrom(returnType)){
return JSONArray.parseArray(response.getData().toString(),Object.class);
}else if(Map.class.isAssignableFrom(returnType)){
return JSON.parseObject(response.getData().toString(),Map.class);
}else{
Object data = response.getData();
return JSONObject.parseObject(data.toString(), returnType);
}
}
}
复制代码
在生产者端,咱们把服务IP端口都注册到zookeeper中,因此这里,咱们要去拿到服务地址,而后经过Netty链接。重要的是,还要对根目录进行监听子节点变化,这样随着生产者的上线和下线,消费者端能够及时感知。
package com.viewscenes.netsupervisor.connection;
@Component
public class ServiceDiscovery {
@Value("${registry.address}")
private String registryAddress;
@Autowired
ConnectManage connectManage;
// 服务地址列表
private volatile List<String> addressList = new ArrayList<>();
private static final String ZK_REGISTRY_PATH = "/rpc";
private ZkClient client;
Logger logger = LoggerFactory.getLogger(this.getClass());
@PostConstruct
public void init(){
client = connectServer();
if (client != null) {
watchNode(client);
}
}
//链接zookeeper
private ZkClient connectServer() {
ZkClient client = new ZkClient(registryAddress,30000,30000);
return client;
}
//监听子节点数据变化
private void watchNode(final ZkClient client) {
List<String> nodeList = client.subscribeChildChanges(ZK_REGISTRY_PATH, (s, nodes) -> {
logger.info("监听到子节点数据变化{}",JSONObject.toJSONString(nodes));
addressList.clear();
getNodeData(nodes);
updateConnectedServer();
});
getNodeData(nodeList);
logger.info("已发现服务列表...{}", JSONObject.toJSONString(addressList));
updateConnectedServer();
}
//链接生产者端服务
private void updateConnectedServer(){
connectManage.updateConnectServer(addressList);
}
private void getNodeData(List<String> nodes){
logger.info("/rpc子节点数据为:{}", JSONObject.toJSONString(nodes));
for(String node:nodes){
String address = client.readData(ZK_REGISTRY_PATH+"/"+node);
addressList.add(address);
}
}
}
复制代码
其中,connectManage.updateConnectServer(addressList);
就是根据服务地址,去链接生产者端的Netty服务。而后建立一个Channel列表,在发送消息的时候,从中选取一个Channel和生产者端进行通讯。
Netty客户端有两个方法比较重要,一个是根据IP端口链接服务器,返回Channel,加入到链接管理器;一个是用Channel发送请求数据。同时,做为客户端,空闲的时候还要往服务端发送心跳信息。
package com.viewscenes.netsupervisor.netty.client;
@Component
public class NettyClient {
Logger logger = LoggerFactory.getLogger(this.getClass());
private EventLoopGroup group = new NioEventLoopGroup(1);
private Bootstrap bootstrap = new Bootstrap();
@Autowired
NettyClientHandler clientHandler;
@Autowired
ConnectManage connectManage;
public Object send(Request request) throws InterruptedException{
Channel channel = connectManage.chooseChannel();
if (channel!=null && channel.isActive()) {
SynchronousQueue<Object> queue = clientHandler.sendRequest(request,channel);
Object result = queue.take();
return JSONArray.toJSONString(result);
}else{
Response res = new Response();
res.setCode(1);
res.setError_msg("未正确链接到服务器.请检查相关配置信息!");
return JSONArray.toJSONString(res);
}
}
public Channel doConnect(SocketAddress address) throws InterruptedException {
ChannelFuture future = bootstrap.connect(address);
Channel channel = future.sync().channel();
return channel;
}
....其余方法略
}
复制代码
咱们必须重点关注send方法,它是在代理对象invoke方法调用到的。首先从链接器中轮询选择一个Channel,而后发送数据。可是,Netty是异步操做,咱们还要转为同步,就是说要等待生产者端返回数据才往下执行。笔者在这里用的是同步队列SynchronousQueue,它的take方法会阻塞在这里,直到里面有数据可读。而后在处理器中,拿到返回信息写到队列中,take方法返回。
package com.viewscenes.netsupervisor.netty.client;
@Component
@ChannelHandler.Sharable
public class NettyClientHandler extends ChannelInboundHandlerAdapter {
@Autowired
NettyClient client;
@Autowired
ConnectManage connectManage;
Logger logger = LoggerFactory.getLogger(this.getClass());
private ConcurrentHashMap<String,SynchronousQueue<Object>> queueMap = new ConcurrentHashMap<>();
public void channelActive(ChannelHandlerContext ctx) {
logger.info("已链接到RPC服务器.{}",ctx.channel().remoteAddress());
}
public void channelInactive(ChannelHandlerContext ctx) {
InetSocketAddress address =(InetSocketAddress) ctx.channel().remoteAddress();
logger.info("与RPC服务器断开链接."+address);
ctx.channel().close();
connectManage.removeChannel(ctx.channel());
}
public void channelRead(ChannelHandlerContext ctx, Object msg)throws Exception {
Response response = JSON.parseObject(msg.toString(),Response.class);
String requestId = response.getRequestId();
SynchronousQueue<Object> queue = queueMap.get(requestId);
queue.put(response);
queueMap.remove(requestId);
}
public SynchronousQueue<Object> sendRequest(Request request,Channel channel) {
SynchronousQueue<Object> queue = new SynchronousQueue<>();
queueMap.put(request.getId(), queue);
channel.writeAndFlush(request);
return queue;
}
public void userEventTriggered(ChannelHandlerContext ctx, Object evt)throws Exception {
logger.info("已超过30秒未与RPC服务器进行读写操做!将发送心跳消息...");
if (evt instanceof IdleStateEvent){
IdleStateEvent event = (IdleStateEvent)evt;
if (event.state()== IdleState.ALL_IDLE){
Request request = new Request();
request.setMethodName("heartBeat");
ctx.channel().writeAndFlush(request);
}
}else{
super.userEventTriggered(ctx,evt);
}
}
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause){
logger.info("RPC通讯服务器发生异常.{}",cause);
ctx.channel().close();
}
}
复制代码
至此,消费者端也基本完成。一样的,咱们先看一下启动日志:
Waiting for keeper state SyncConnected
Opening socket connection to server 192.168.139.129/192.168.139.129:2181. Will not attempt to authenticate using SASL (unknown error)
Socket connection established to 192.168.139.129/192.168.139.129:2181, initiating session
Session establishment complete on server 192.168.139.129/192.168.139.129:2181, sessionid = 0x100000273ba002c, negotiated timeout = 20000
zookeeper state changed (SyncConnected)
/rpc子节点数据为:["provider0000000015"]
已发现服务列表...["192.168.100.74:18868"]
加入Channel到链接管理器./192.168.100.74:18868
已链接到RPC服务器./192.168.100.74:18868
Initializing ExecutorService 'applicationTaskExecutor'
Tomcat started on port(s): 7002 (http) with context path ''
Started RpcConsumerApplication in 4.218 seconds (JVM running for 5.569)
复制代码
咱们以Controller里面的两个方法为例,先开启100个线程调用insertInfoUser方法,而后开启1000个线程调用查询方法getAllUser。
public class IndexController {
Logger logger = LoggerFactory.getLogger(this.getClass());
@Autowired
InfoUserService userService;
@RequestMapping("insert")
@ResponseBody
public List<InfoUser> getUserList() throws InterruptedException {
long start = System.currentTimeMillis();
int thread_count = 100;
CountDownLatch countDownLatch = new CountDownLatch(thread_count);
for (int i=0;i<thread_count;i++){
new Thread(() -> {
InfoUser infoUser = new InfoUser(IdUtil.getId(),"Jeen","BeiJing");
List<InfoUser> users = userService.insertInfoUser(infoUser);
logger.info("返回用户信息记录:{}", JSON.toJSONString(users));
countDownLatch.countDown();
}).start();
}
countDownLatch.await();
long end = System.currentTimeMillis();
logger.info("线程数:{},执行时间:{}",thread_count,(end-start));
return null;
}
@RequestMapping("getAllUser")
@ResponseBody
public Map<String,InfoUser> getAllUser() throws InterruptedException {
long start = System.currentTimeMillis();
int thread_count = 1000;
CountDownLatch countDownLatch = new CountDownLatch(thread_count);
for (int i=0;i<thread_count;i++){
new Thread(() -> {
Map<String, InfoUser> allUser = userService.getAllUser();
logger.info("查询全部用户信息:{}",JSONObject.toJSONString(allUser));
countDownLatch.countDown();
}).start();
}
countDownLatch.await();
long end = System.currentTimeMillis();
logger.info("线程数:{},执行时间:{}",thread_count,(end-start));
return null;
}
}
复制代码
结果以下:
本文简单介绍了RPC的整个流程,若是你正在学习RPC的相关知识,能够根据文中的例子,本身实现一遍。相信写完以后,你会对RPC会有更深一些的认识。
生产者端流程:
消费者端流程:
限于篇幅,本文代码并不完整,若有须要,访问:https://github.com/taoxun/simple_rpc 或者添加笔者微信公众号:<清幽之地的博客>),获取完整项目。