在第二章中咱们学习了如何使用Work模式在多个worker之间派发时间敏感的任务。这种状况是不涉及到返回值的,worker执行任务就好。若是涉及返回值,就要用到本章提到的RPC(Remote Procedure Call)了。java
本章咱们使用RabbitMQ来构建一个RPC系统:一个客户端和一个可扩展的RPC服务端。咱们让RPC服务返回一个斐波那契数组。json
咱们建立一个简单的客户端类来演示如何使用RPC服务。call方法发送RPC请求,并阻塞知道结果返回。数组
FibonacciRpcClient fibonacciRpc = new FibonacciRpcClient(); String result = fibonacciRpc.call("4"); System.out.println( "fib(4) is " + result);
RPC贴士
虽然RPC的使用在计算机领域很是广泛,可是却常常受到批评。主要问题是编码人若是不注意使用的方法是本地仍是远程时,每每会形成问题。每每让系统变得不可预知,增长没必要要的复杂性和调试的难度。对此咱们有以下几点建议:安全
- 是本地方法仍是远程方法要一目了然
- 把系统的依赖写进文档
- 系统要处理好超时的问题
若是能够尽可能使用异步的pipeline来替代像RPC这种阻塞的操做。服务器
在RabbitMQ上实现RPC是很是简单的。客户端发送一个request message,服务端回应一个response message。为了接受response message咱们须要在发送request message的时候附带上'callback' queue的地址。咱们可使用默认的queue。app
callbackQueueName = channel.queueDeclare().getQueue(); BasicProperties props = new BasicProperties .Builder() .replyTo(callbackQueueName) .build(); channel.basicPublish("", "rpc_queue", props, message.getBytes()); // ... then code to read a response message from the callback_queue ...
Message的属性
AMQP 0-9-1协议预约义了14个消息属性,其中大部分不多使用,下面的属性较为经常使用dom
- deliverMode: 标记message为持久(设置为2)或其余值。
- contentType:message的编码类型,咱们常用JSON编码,则设置为application/json
- replyTo: 命名回调queue
- correlationId:将RPC的请求和回应关联起来
须要引入新的类异步
import com.rabbitmq.client.AMQP.BasicProperties;
在上面的代码中,每次RPC请求都会建立一个用于回调的临时queue,咱们有更好的方法,咱们为每个client建立一个回调queue。函数
可是这样有新的问题,从回调queue中收到response没法和相应的request关联起来。这时候就是correlationId属性发挥做用的时候了。为每一个request中设置惟一的值,在稍后的回调queue中收到的response里也有这个属性,基于此,咱们就能够关联以前的request了。若是咱们遇到一个匹配不到的correlationId,那么丢弃的行为是安全的。学习
你可能会问,为何咱们忽略这些没法匹配的message,而不是当作一个错误处理呢?主要是考虑服务端的竞态条件,若是RPC服务器在发送response以后就宕机了,可是却没有发送ack消息。那么当RPC Server重启以后,会继续执行这个request。这就是为何client须要幂等处理response。
咱们的RPC向下面这样进行工做:
斐波那契处理函数
private static int fib(int n) { if (n == 0) return 0; if (n == 1) return 1; return fib(n-1) + fib(n-2); }
这是一个简易的实现,若是传入一个较大的值,将会是个灾难。
RPC服务器的代码为RPCServer.java, 代码是很简单明确的
RPC客户端的代码为RPCClient.java,代码略微有点复杂
RPCClient.java完整代码
import com.rabbitmq.client.AMQP; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import java.io.IOException; import java.util.UUID; import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.BlockingQueue; import java.util.concurrent.TimeoutException; public class RPCClient implements AutoCloseable { private Connection connection; private Channel channel; private String requestQueueName = "rpc_queue"; public RPCClient() throws IOException, TimeoutException { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); connection = factory.newConnection(); channel = connection.createChannel(); } public static void main(String[] argv) { try (RPCClient fibonacciRpc = new RPCClient()) { for (int i = 0; i < 32; i++) { String i_str = Integer.toString(i); System.out.println(" [x] Requesting fib(" + i_str + ")"); String response = fibonacciRpc.call(i_str); System.out.println(" [.] Got '" + response + "'"); } } catch (IOException | TimeoutException | InterruptedException e) { e.printStackTrace(); } } public String call(String message) throws IOException, InterruptedException { final String corrId = UUID.randomUUID().toString(); String replyQueueName = channel.queueDeclare().getQueue(); AMQP.BasicProperties props = new AMQP.BasicProperties .Builder() .correlationId(corrId) .replyTo(replyQueueName) .build(); channel.basicPublish("", requestQueueName, props, message.getBytes("UTF-8")); final BlockingQueue<String> response = new ArrayBlockingQueue<>(1); String ctag = channel.basicConsume(replyQueueName, true, (consumerTag, delivery) -> { if (delivery.getProperties().getCorrelationId().equals(corrId)) { response.offer(new String(delivery.getBody(), "UTF-8")); } }, consumerTag -> { }); String result = response.take(); channel.basicCancel(ctag); return result; } public void close() throws IOException { connection.close(); } }
RPCServer.java完整代码
import com.rabbitmq.client.*; public class RPCServer { private static final String RPC_QUEUE_NAME = "rpc_queue"; private static int fib(int n) { if (n == 0) return 0; if (n == 1) return 1; return fib(n - 1) + fib(n - 2); } public static void main(String[] argv) throws Exception { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); try (Connection connection = factory.newConnection(); Channel channel = connection.createChannel()) { channel.queueDeclare(RPC_QUEUE_NAME, false, false, false, null); channel.queuePurge(RPC_QUEUE_NAME); channel.basicQos(1); System.out.println(" [x] Awaiting RPC requests"); Object monitor = new Object(); DeliverCallback deliverCallback = (consumerTag, delivery) -> { AMQP.BasicProperties replyProps = new AMQP.BasicProperties .Builder() .correlationId(delivery.getProperties().getCorrelationId()) .build(); String response = ""; try { String message = new String(delivery.getBody(), "UTF-8"); int n = Integer.parseInt(message); System.out.println(" [.] fib(" + message + ")"); response += fib(n); } catch (RuntimeException e) { System.out.println(" [.] " + e.toString()); } finally { channel.basicPublish("", delivery.getProperties().getReplyTo(), replyProps, response.getBytes("UTF-8")); channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false); // RabbitMq consumer worker thread notifies the RPC server owner thread synchronized (monitor) { monitor.notify(); } } }; channel.basicConsume(RPC_QUEUE_NAME, false, deliverCallback, (consumerTag -> { })); // Wait and be prepared to consume the message from RPC client. while (true) { synchronized (monitor) { try { monitor.wait(); } catch (InterruptedException e) { e.printStackTrace(); } } } } } }