出自:https://my.oschina.net/hosee/blog/711632html
在学校期间你们都写过很多程序,好比写个hello world服务类,而后本地调用下,以下所示。这些程序的特色是服务消费方和服务提供方是本地调用关系。java
public class Test { public static void main(String[] args) { HelloWorldService helloWorldService = new HelloWorldServiceImpl(); helloWorldService.sayHello("test"); } }
而一旦踏入公司尤为是大型互联网公司就会发现,公司的系统都由成千上万大大小小的服务组成,各服务部署在不一样的机器上,由不一样的团队负责。node
这时就会遇到两个问题:web
因为各服务部署在不一样机器,服务间的调用免不了网络通讯过程,服务消费方每调用一个服务都要写一坨网络通讯相关的代码,不只复杂并且极易出错。编程
若是有一种方式能让咱们像调用本地服务同样调用远程服务,而让调用者对网络通讯这些细节透明,那么将大大提升生产力,好比服务消费方在执行helloWorldService.sayHello("test")时,实质上调用的是远端的服务。这种方式其实就是RPC(Remote Procedure Call Protocol),在各大互联网公司中被普遍使用,如阿里巴巴的hsf、dubbo(开源)、Facebook的thrift(开源)、Google grpc(开源)、Twitter的finagle(开源)等。网络
要让网络通讯细节对使用者透明,咱们须要对通讯细节进行封装,咱们先看下一个RPC调用的流程涉及到哪些通讯细节:数据结构
RPC的目标就是要2~8这些步骤都封装起来,让用户对这些细节透明。并发
怎么封装通讯细节才能让用户像以本地调用方式调用远程服务呢?对java来讲就是使用代理!java代理有两种方式:负载均衡
尽管字节码生成方式实现的代理更为强大和高效,但代码维护不易,大部分公司实现RPC框架时仍是选择动态代理方式。框架
下面简单介绍下动态代理怎么实现咱们的需求。咱们须要实现RPCProxyClient代理类,代理类的invoke方法中封装了与远端服务通讯的细节,消费方首先从RPCProxyClient得到服务提供方的接口,当执行helloWorldService.sayHello("test")方法时就会调用invoke方法。
public class RPCProxyClient implements java.lang.reflect.InvocationHandler{ private Object obj; public RPCProxyClient(Object obj){ this.obj=obj; } /** * 获得被代理对象; */ public static Object getProxy(Object obj){ return java.lang.reflect.Proxy.newProxyInstance(obj.getClass().getClassLoader(), obj.getClass().getInterfaces(), new RPCProxyClient(obj)); } /** * 调用此方法执行 */ public Object invoke(Object proxy, Method method, Object[] args) throws Throwable { //结果参数; Object result = new Object(); // ...执行通讯相关逻辑 // ... return result; } }
public class Test { public static void main(String[] args) { HelloWorldService helloWorldService = (HelloWorldService)RPCProxyClient.getProxy(HelloWorldService.class); helloWorldService.sayHello("test"); } }
上节讲了invoke里须要封装通讯细节(通讯细节再后面几章详细探讨),而通讯的第一步就是要肯定客户端和服务端相互通讯的消息结构。客户端的请求消息结构通常须要包括如下内容:
1)接口名称
在咱们的例子里接口名是“HelloWorldService”,若是不传,服务端就不知道调用哪一个接口了;
2)方法名
一个接口内可能有不少方法,若是不传方法名服务端也就不知道调用哪一个方法;
3)参数类型&参数值
参数类型有不少,好比有bool、int、long、double、string、map、list,甚至如struct(class);以及相应的参数值;
4)超时时间
5)requestID,标识惟一请求id,在下面一节会详细描述requestID的用处。
同理服务端返回的消息结构通常包括如下内容。
1)返回值
2)状态code
3)requestID
一旦肯定了消息的数据结构后,下一步就是要考虑序列化与反序列化了。
什么是序列化?序列化就是将数据结构或对象转换成二进制串的过程,也就是编码的过程。
什么是反序列化?将在序列化过程当中所生成的二进制串转换成数据结构或者对象的过程。
为何须要序列化?转换为二进制串后才好进行网络传输嘛!
为何须要反序列化?将二进制转换为对象才好进行后续处理!
现现在序列化的方案愈来愈多,每种序列化方案都有优势和缺点,它们在设计之初有本身独特的应用场景,那到底选择哪一种呢?从RPC的角度上看,主要看三点:
目前互联网公司普遍使用Protobuf、Thrift、Avro等成熟的序列化解决方案来搭建RPC框架,这些都是久经考验的解决方案。
消息数据结构被序列化为二进制串后,下一步就要进行网络通讯了。目前有两种经常使用IO通讯模型:1)BIO;2)NIO。通常RPC框架须要支持这两种IO模型。
如何实现RPC的IO通讯框架呢?
若是使用netty的话,通常会用channel.writeAndFlush()方法来发送消息二进制串,这个方法调用后对于整个远程调用(从发出请求到接收到结果)来讲是一个异步的,即对于当前线程来讲,将请求发送出来后,线程就能够日后执行了,至于服务端的结果,是服务端处理完成后,再以消息的形式发送给客户端的。因而这里出现如下两个问题:
以下图所示,线程A和线程B同时向client socket发送请求requestA和requestB,socket前后将requestB和requestA发送至server,而server可能将responseA先返回,尽管requestA请求到达时间更晚。咱们须要一种机制保证responseA丢给ThreadA,responseB丢给ThreadB。
怎么解决呢?
public Object get() { synchronized (this) { // 旋锁 while (!isDone) { // 是否有结果了 wait(); //没结果是释放锁,让当前线程处于等待状态 } } }
private void setDone(Response res) { this.res = res; isDone = true; synchronized (this) { //获取锁,由于前面wait()已经释放了callback的锁了 notifyAll(); // 唤醒处于等待的线程 } }
如何让别人使用咱们的服务呢?有同窗说很简单嘛,告诉使用者服务的IP以及端口就能够了啊。确实是这样,这里问题的关键在因而自动告知仍是人肉告知。
人肉告知的方式:若是你发现你的服务一台机器不够,要再添加一台,这个时候就要告诉调用者我如今有两个ip了,大家要轮询调用来实现负载均衡;调用者咬咬牙改了,结果某天一台机器挂了,调用者发现服务有一半不可用,他又只能手动修改代码来删除挂掉那台机器的ip。现实生产环境固然不会使用人肉方式。
有没有一种方法能实现自动告知,即机器的增添、剔除对调用方透明,调用者再也不须要写死服务提供方地址?固然能够,现现在zookeeper被普遍用于实现服务自动注册与发现功能!
简单来说,zookeeper能够充当一个服务注册表
(Service Registry),让多个服务提供者
造成一个集群,让服务消费者
经过服务注册表获取具体的服务访问地址(ip+端口)去访问具体的服务提供者。以下图所示:
具体来讲,zookeeper就是个分布式文件系统,每当一个服务提供者部署后都要将本身的服务注册到zookeeper的某一路径上: /{service}/{version}/{ip:port}, 好比咱们的HelloWorldService部署到两台机器,那么zookeeper上就会建立两条目录:分别为/HelloWorldService/1.0.0/100.19.20.01:16888 /HelloWorldService/1.0.0/100.19.20.02:16888。
zookeeper提供了“心跳检测”功能,它会定时向各个服务提供者发送一个请求(实际上创建的是一个 Socket 长链接),若是长期没有响应,服务中心就认为该服务提供者已经“挂了”,并将其剔除,好比100.19.20.02这台机器若是宕机了,那么zookeeper上的路径就会只剩/HelloWorldService/1.0.0/100.19.20.01:16888。
服务消费者会去监听相应路径(/HelloWorldService/1.0.0),一旦路径上的数据有任务变化(增长或减小),zookeeper都会通知服务消费方服务提供者地址列表已经发生改变,从而进行更新。
更为重要的是zookeeper与生俱来的容错容灾能力(好比leader选举),能够确保服务注册表的高可用性。
ipc.RPC类中有一些内部类,为了你们对RPC类有个初步的印象,就先罗列几个咱们感兴趣的分析一下吧:
Invocation :用于封装方法名和参数,做为数据传输层。
ClientCache :用于存储client对象,用socket factory做为hash key,存储结构为hashMap <SocketFactory, Client>。
Invoker :是动态代理中的调用实现类,继承了InvocationHandler.
Server :是ipc.Server的实现类。
public Object invoke(Object proxy, Method method, Object[] args) throws Throwable { ••• ObjectWritable value = (ObjectWritable) client.call(new Invocation(method, args), remoteId); ••• return value.get(); }
若是你发现这个invoke()方法实现的有些奇怪的话,那你就对了。通常咱们看到的动态代理的invoke()方法中总会有 method.invoke(ac, arg); 这句代码。而上面代码中却没有,这是为何呢?其实使用 method.invoke(ac, arg); 是在本地JVM中调用;而在hadoop中,是将数据发送给服务端,服务端将处理的结果再返回给客户端,因此这里的invoke()方法必然须要进行网络通讯。而网络通讯就是下面的这段代码实现的:
ObjectWritable value = (ObjectWritable) client.call(new Invocation(method, args), remoteId);
Invocation类在这里封装了方法名和参数。其实这里网络通讯只是调用了Client类的call()方法。那咱们接下来分析一下ipc.Client源码吧。和第一章同样,一样是3个问题
public Writable call(Writable param, ConnectionId remoteId) throws InterruptedException, IOException { Call call = new Call(param); //将传入的数据封装成call对象 Connection connection = getConnection(remoteId, call); //得到一个链接 connection.sendParam(call); // 向服务端发送call对象 boolean interrupted = false; synchronized (call) { while (!call.done) { try { call.wait(); // 等待结果的返回,在Call类的callComplete()方法里有notify()方法用于唤醒线程 } catch (InterruptedException ie) { // 因中断异常而终止,设置标志interrupted为true interrupted = true; } } if (interrupted) { Thread.currentThread().interrupt(); } if (call.error != null) { if (call.error instanceof RemoteException) { call.error.fillInStackTrace(); throw call.error; } else { // 本地异常 throw wrapException(remoteId.getAddress(), call.error); } } else { return call.value; //返回结果数据 } } }
具体代码的做用我已作了注释,因此这里再也不赘述。但到目前为止,你依然不知道RPC机制底层的网络链接是怎么创建的。分析代码后,咱们会发现和网络通讯有关的代码只会是下面的两句了:
Connection connection = getConnection(remoteId, call); //得到一个链接 connection.sendParam(call); // 向服务端发送call对象
先看看是怎么得到一个到服务端的链接吧,下面贴出ipc.Client类中的getConnection()方法。
private Connection getConnection(ConnectionId remoteId, Call call) throws IOException, InterruptedException { if (!running.get()) { // 若是client关闭了 throw new IOException("The client is stopped"); } Connection connection; //若是connections链接池中有对应的链接对象,就不需从新建立了;若是没有就需从新建立一个链接对象。 //但请注意,该//链接对象只是存储了remoteId的信息,其实还并无和服务端创建链接。 do { synchronized (connections) { connection = connections.get(remoteId); if (connection == null) { connection = new Connection(remoteId); connections.put(remoteId, connection); } } } while (!connection.addCall(call)); //将call对象放入对应链接中的calls池,就不贴出源码了 //这句代码才是真正的完成了和服务端创建链接哦~ connection.setupIOstreams(); return connection; }
下面贴出Client.Connection类中的setupIOstreams()方法:
private synchronized void setupIOstreams() throws InterruptedException { ••• try { ••• while (true) { setupConnection(); //创建链接 InputStream inStream = NetUtils.getInputStream(socket); //得到输入流 OutputStream outStream = NetUtils.getOutputStream(socket); //得到输出流 writeRpcHeader(outStream); ••• this.in = new DataInputStream(new BufferedInputStream (new PingInputStream(inStream))); //将输入流装饰成DataInputStream this.out = new DataOutputStream (new BufferedOutputStream(outStream)); //将输出流装饰成DataOutputStream writeHeader(); // 跟新活动时间 touch(); //当链接创建时,启动接受线程等待服务端传回数据,注意:Connection继承了Tread start(); return; } } catch (IOException e) { markClosed(e); close(); } }
再有一步咱们就知道客户端的链接是怎么创建的啦,下面贴出Client.Connection类中的setupConnection()方法:
private synchronized void setupConnection() throws IOException { short ioFailures = 0; short timeoutFailures = 0; while (true) { try { this.socket = socketFactory.createSocket(); //终于看到建立socket的方法了 this.socket.setTcpNoDelay(tcpNoDelay); ••• // 设置链接超时为20s NetUtils.connect(this.socket, remoteId.getAddress(), 20000); this.socket.setSoTimeout(pingInterval); return; } catch (SocketTimeoutException toe) { /* 设置最多链接重试为45次。 * 总共有20s*45 = 15 分钟的重试时间。 */ handleConnectionFailure(timeoutFailures++, 45, toe); } catch (IOException ie) { handleConnectionFailure(ioFailures++, maxRetries, ie); } } }
终于,咱们知道了客户端的链接是怎样创建的了,其实就是建立一个普通的socket进行通讯。
下面贴出Client.Connection类的sendParam()方法吧:
public void sendParam(Call call) { if (shouldCloseConnection.get()) { return; } DataOutputBuffer d=null; try { synchronized (this.out) { if (LOG.isDebugEnabled()) LOG.debug(getName() + " sending #" + call.id); //建立一个缓冲区 d = new DataOutputBuffer(); d.writeInt(call.id); call.param.write(d); byte[] data = d.getData(); int dataLength = d.getLength(); out.writeInt(dataLength); //首先写出数据的长度 out.write(data, 0, dataLength); //向服务端写数据 out.flush(); } } catch(IOException e) { markClosed(e); } finally { IOUtils.closeStream(d); } }
下面贴出Client.Connection类和Client.Call类中的相关方法:
方法一: public void run() { ••• while (waitForWork()) { receiveResponse(); //具体的处理方法 } close(); ••• } 方法二: private void receiveResponse() { if (shouldCloseConnection.get()) { return; } touch(); try { int id = in.readInt(); // 阻塞读取id if (LOG.isDebugEnabled()) LOG.debug(getName() + " got value #" + id); Call call = calls.get(id); //在calls池中找到发送时的那个对象 int state = in.readInt(); // 阻塞读取call对象的状态 if (state == Status.SUCCESS.state) { Writable value = ReflectionUtils.newInstance(valueClass, conf); value.readFields(in); // 读取数据 //将读取到的值赋给call对象,同时唤醒Client等待线程,贴出setValue()代码方法三 call.setValue(value); calls.remove(id); //删除已处理的call } else if (state == Status.ERROR.state) { ••• } else if (state == Status.FATAL.state) { ••• } } catch (IOException e) { markClosed(e); } } 方法三: public synchronized void setValue(Writable value) { this.value = value; callComplete(); //具体实现 } protected synchronized void callComplete() { this.done = true; notify(); // 唤醒client等待线程 }
完成的功能主要是:启动一个处理线程,读取从服务端传来的call对象,将call对象读取完毕后,唤醒client处理线程。就这么简单,客户端就获取了服务端返回的数据了哦~。客户端的源码分析就到这里了哦,下面咱们来分析Server端的源码吧。
为了让你们对ipc.Server有个初步的了解,咱们先分析一下它的几个内部类吧:
Call :用于存储客户端发来的请求
Listener : 监听类,用于监听客户端发来的请求,同时Listener内部还有一个静态类,Listener.Reader,当监听器监听到用户请求,便让Reader读取用户请求。
Responder :响应RPC请求类,请求处理完毕,由Responder发送给请求客户端。
Connection :链接类,真正的客户端请求读取逻辑在这个类中。
Handler :请求处理类,会循环阻塞读取callQueue中的call对象,并对其进行操做。
private void initialize(Configuration conf) throws IOException { ••• // 建立 rpc server InetSocketAddress dnSocketAddr = getServiceRpcServerAddress(conf); if (dnSocketAddr != null) { int serviceHandlerCount = conf.getInt(DFSConfigKeys.DFS_NAMENODE_SERVICE_HANDLER_COUNT_KEY, DFSConfigKeys.DFS_NAMENODE_SERVICE_HANDLER_COUNT_DEFAULT); //得到serviceRpcServer this.serviceRpcServer = RPC.getServer(this, dnSocketAddr.getHostName(), dnSocketAddr.getPort(), serviceHandlerCount, false, conf, namesystem.getDelegationTokenSecretManager()); this.serviceRPCAddress = this.serviceRpcServer.getListenerAddress(); setRpcServiceServerAddress(conf); } //得到server this.server = RPC.getServer(this, socAddr.getHostName(), socAddr.getPort(), handlerCount, false, conf, namesystem .getDelegationTokenSecretManager()); ••• this.server.start(); //启动 RPC server Clients只容许链接该server if (serviceRpcServer != null) { serviceRpcServer.start(); //启动 RPC serviceRpcServer 为HDFS服务的server } startTrashEmptier(conf); }
查看Namenode初始化源码得知:RPC的server对象是经过ipc.RPC类的getServer()方法得到的。下面我们去看看ipc.RPC类中的getServer()源码吧:
public static Server getServer(final Object instance, final String bindAddress, final int port, final int numHandlers, final boolean verbose, Configuration conf, SecretManager<? extends TokenIdentifier> secretManager) throws IOException { return new Server(instance, conf, bindAddress, port, numHandlers, verbose, secretManager); }
这时咱们发现getServer()是一个建立Server对象的工厂方法,但建立的倒是RPC.Server类的对象。哈哈,如今你明白了我前面说的“RPC.Server是ipc.Server的实现类”了吧。不过RPC.Server的构造函数仍是调用了ipc.Server类的构造函数的,因篇幅所限,就不贴出相关源码了。
初始化Server后,Server端就运行起来了,看看ipc.Server的start()源码吧:
/** 启动服务 */ public synchronized void start() { responder.start(); //启动responder listener.start(); //启动listener handlers = new Handler[handlerCount]; for (int i = 0; i < handlerCount; i++) { handlers[i] = new Handler(i); handlers[i].start(); //逐个启动Handler } }
分析过ipc.Client源码后,咱们知道Client端的底层通讯直接采用了阻塞式IO编程,当时咱们曾作出猜想:Server端是否是也采用了阻塞式IO。如今咱们仔细地分析一下吧,若是Server端也采用阻塞式IO,当链接进来的Client端不少时,势必会影响Server端的性能。hadoop的实现者们考虑到了这点,因此他们采用了java NIO来实现Server端,那Server端采用java NIO是怎么创建链接的呢?分析源码得知,Server端采用Listener监听客户端的链接,下面先分析一下Listener的构造函数吧:
public Listener() throws IOException { address = new InetSocketAddress(bindAddress, port); // 建立ServerSocketChannel,并设置成非阻塞式 acceptChannel = ServerSocketChannel.open(); acceptChannel.configureBlocking(false); // 将server socket绑定到本地端口 bind(acceptChannel.socket(), address, backlogLength); port = acceptChannel.socket().getLocalPort(); // 得到一个selector selector= Selector.open(); readers = new Reader[readThreads]; readPool = Executors.newFixedThreadPool(readThreads); //启动多个reader线程,为了防止请求多时服务端响应延时的问题 for (int i = 0; i < readThreads; i++) { Selector readSelector = Selector.open(); Reader reader = new Reader(readSelector); readers[i] = reader; readPool.execute(reader); } // 注册链接事件 acceptChannel.register(selector, SelectionKey.OP_ACCEPT); this.setName("IPC Server listener on " + port); this.setDaemon(true); }
在启动Listener线程时,服务端会一直等待客户端的链接,下面贴出Server.Listener类的run()方法:
public void run() { ••• while (running) { SelectionKey key = null; try { selector.select(); Iterator<SelectionKey> iter = selector.selectedKeys().iterator(); while (iter.hasNext()) { key = iter.next(); iter.remove(); try { if (key.isValid()) { if (key.isAcceptable()) doAccept(key); //具体的链接方法 } } catch (IOException e) { } key = null; } } catch (OutOfMemoryError e) { ••• }
下面贴出Server.Listener类中doAccept()方法中的关键源码吧:
void doAccept(SelectionKey key) throws IOException, OutOfMemoryError { Connection c = null; ServerSocketChannel server = (ServerSocketChannel) key.channel(); SocketChannel channel; while ((channel = server.accept()) != null) { //创建链接 channel.configureBlocking(false); channel.socket().setTcpNoDelay(tcpNoDelay); Reader reader = getReader(); //从readers池中得到一个reader try { reader.startAdd(); // 激活readSelector,设置adding为true SelectionKey readKey = reader.registerChannel(channel);//将读事件设置成兴趣事件 c = new Connection(readKey, channel, System.currentTimeMillis());//建立一个链接对象 readKey.attach(c); //将connection对象注入readKey synchronized (connectionList) { connectionList.add(numConnections, c); numConnections++; } ••• } finally { //设置adding为false,采用notify()唤醒一个reader,其实代码十三中启动的每一个reader都使 //用了wait()方法等待。因篇幅有限,就不贴出源码了。 reader.finishAdd(); } } }
当reader被唤醒,reader接着执行doRead()方法。
下面贴出Server.Listener.Reader类中的doRead()方法和Server.Connection类中的readAndProcess()方法源码:
方法一: void doRead(SelectionKey key) throws InterruptedException { int count = 0; Connection c = (Connection)key.attachment(); //得到connection对象 if (c == null) { return; } c.setLastContact(System.currentTimeMillis()); try { count = c.readAndProcess(); // 接受并处理请求 } catch (InterruptedException ieo) { ••• } ••• } 方法二: public int readAndProcess() throws IOException, InterruptedException { while (true) { ••• if (!rpcHeaderRead) { if (rpcHeaderBuffer == null) { rpcHeaderBuffer = ByteBuffer.allocate(2); } //读取请求头 count = channelRead(channel, rpcHeaderBuffer); if (count < 0 || rpcHeaderBuffer.remaining() > 0) { return count; } // 读取请求版本号 int version = rpcHeaderBuffer.get(0); byte[] method = new byte[] {rpcHeaderBuffer.get(1)}; ••• data = ByteBuffer.allocate(dataLength); } // 读取请求 count = channelRead(channel, data); if (data.remaining() == 0) { ••• if (useSasl) { ••• } else { processOneRpc(data.array());//处理请求 } ••• } } return count; } }
下面贴出Server.Connection类中的processOneRpc()方法和processData()方法的源码。
方法一: private void processOneRpc(byte[] buf) throws IOException, InterruptedException { if (headerRead) { processData(buf); } else { processHeader(buf); headerRead = true; if (!authorizeConnection()) { throw new AccessControlException("Connection from " + this + " for protocol " + header.getProtocol() + " is unauthorized for user " + user); } } } 方法二: private void processData(byte[] buf) throws IOException, InterruptedException { DataInputStream dis = new DataInputStream(new ByteArrayInputStream(buf)); int id = dis.readInt(); // 尝试读取id Writable param = ReflectionUtils.newInstance(paramClass, conf);//读取参数 param.readFields(dis); Call call = new Call(id, param, this); //封装成call callQueue.put(call); // 将call存入callQueue incRpcCount(); // 增长rpc请求的计数 }
RPC:
Web service
web service接口就是RPC中的stub组件,规定了server可以提供的服务(web service),这在server和client上是一致的,可是也是跨语言跨平台的。同时,因为web service规范中的WSDL文件的存在,如今各平台的web service框架,均可以基于WSDL文件,自动生成web service接口 。
其实二者差很少,只是传输的协议不一样。
1. http://www.cnblogs.com/LBSer/p/4853234.html
2. http://weixiaolu.iteye.com/blog/1504898
3. http://kyfxbl.iteye.com/blog/1745550