在第二个教程中,咱们学习了如何使用工做队列在多个worker之间分配耗时的任务。
可是若是咱们须要在远程计算机上运行功能并等待结果呢?嗯,这是另一件事情,这种模式一般被称为远程过程调用(RPC)。
在本教程中咱们将使用RabbitMQ的创建一个RPC系统:一个客户端和一个可伸缩的RPC服务器。因为咱们没有什么耗时的任务,咱们要建立一个返回斐波那契数虚设RPC服务。html
为了说明RPC如何使用,咱们将建立一个简单的客户端类。它将建立一个名为call的方法——发送RPC请求,而且处于阻塞状态,直到收到应答。java
FibonacciRpcClient fibonacciRpc = new FibonacciRpcClient(); String result = fibonacciRpc.call("4"); System.out.println( "fib(4) is " + result);
PRC笔记
尽管PRC是一个常见的模式,它常常受到批评。当程序员不知道他所调用的方法是本地的仍是一个缓慢的RPC,问题就出现了。这样的混乱在系统中形成不可预料的结果,并增长了没必要要的调试的复杂性,相比于简单的软件,PRC的滥用可能致使形成不可维护的面条式的代码。
考虑到这一点,请参考如下建议:git确保能明确分辨出哪些函数是本地的,哪些是远程的。
创建文档,让组件之间的依赖关系更清楚。
处理错误的case,若是RPC服务器挂了很长时间,客户端应该怎么处理?
若是对以上有疑问,请避免使用。若是没有,你也应该使用异步管道,而不是阻塞式的RPC调用,结果被异步地推到下一个计算阶段。程序员
通常来讲利用RabbitMQ来作RPC是很简单的。客户端发送请求消息,服务端回复应答消息。为了能收到回复,咱们须要发送一个“callback”队列地址在请求里面。咱们可使用默认队列(这是Java客户端独有的):github
callbackQueueName = channel.queueDeclare().getQueue(); BasicProperties props = new BasicProperties .Builder() .replyTo(callbackQueueName) .build(); channel.basicPublish("", "rpc_queue", props, message.getBytes()); // ... then code to read a response message from the callback_queue ...
消息属性
AMQP协议预约义了14个属性去发送消息。大部分的属性都不多使用,可是下列除外:json
deliveryMode:标记的消息为持久(值为2)或暂时的(任何其余值)。你可能还记得第二个教程中的此属性。
contentType:用于描述MIME类型的编码。例如,对于常用JSON编码,是一个很好的作法,将此属性设置为:application/json。
eplyTo: 经常使用于命名一个回调队列。
correlationId: 用于关联的RPC响应。服务器
咱们须要import:网络
import com.rabbitmq.client.AMQP.BasicProperties;
在上面介绍的方法中,咱们建议为每个RPC请求创建一个回调队列。这是至关低效的,幸亏有一个更好的办法 - 让咱们建立每一个客户端一个回调队列。
这样产生了一个新的问题,在收到该回调队列的响应的时候,咱们并不知道该响应是哪一个请求的响应,这就是correlationId属性的用处,咱们将它设置为每一个请求的惟一值。这样,当咱们在回调队列收到一条消息的时候,咱们将看看这个属性,就能找到与这个响应相对应的请求。若是咱们看到一个未知的correlationId,咱们彻底能够丢弃消息,由于他不并不属于咱们系统。
你也许会问,为何咱们选择丢弃这个消息,而不是抛出一个错误。这是为了解决服务器端有可能发生的竞争状况。尽管可能性不大,但RPC服务器仍是有可能在已将应答发送给咱们但还未将确认消息发送给请求的状况下死掉。若是这种状况发生,RPC在重启后会从新处理请求。这就是为何咱们必须在客户端优雅的处理重复响应,同时RPC也须要尽量保持幂等性。并发
咱们的RPC这样工做:
app
斐波那契数列任务:
private static int fib(int n) throws Exception { if (n == 0) return 0; if (n == 1) return 1; return fib(n-1) + fib(n-2); }
咱们定义一个斐波那契的方法,假定只有有效的正整数输入。(不要期望它为大数据工做,这多是最慢的递归实现)
咱们的RPC服务器RPCServer.java的代码以下:
private static final String RPC_QUEUE_NAME = "rpc_queue"; ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); channel.queueDeclare(RPC_QUEUE_NAME, false, false, false, null); channel.basicQos(1); QueueingConsumer consumer = new QueueingConsumer(channel); channel.basicConsume(RPC_QUEUE_NAME, false, consumer); System.out.println(" [x] Awaiting RPC requests"); while (true) { QueueingConsumer.Delivery delivery = consumer.nextDelivery(); BasicProperties props = delivery.getProperties(); BasicProperties replyProps = new BasicProperties .Builder() .correlationId(props.getCorrelationId()) .build(); String message = new String(delivery.getBody()); int n = Integer.parseInt(message); System.out.println(" [.] fib(" + message + ")"); String response = "" + fib(n); channel.basicPublish( "", props.getReplyTo(), replyProps, response.getBytes()); channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false); }
以上服务端的代码很简单:
RPCClient.java:
private Connection connection; private Channel channel; private String requestQueueName = "rpc_queue"; private String replyQueueName; private QueueingConsumer consumer; public RPCClient() throws Exception { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); connection = factory.newConnection(); channel = connection.createChannel(); replyQueueName = channel.queueDeclare().getQueue(); consumer = new QueueingConsumer(channel); channel.basicConsume(replyQueueName, true, consumer); } public String call(String message) throws Exception { String response = null; String corrId = java.util.UUID.randomUUID().toString(); BasicProperties props = new BasicProperties .Builder() .correlationId(corrId) .replyTo(replyQueueName) .build(); channel.basicPublish("", requestQueueName, props, message.getBytes()); while (true) { QueueingConsumer.Delivery delivery = consumer.nextDelivery(); if (delivery.getProperties().getCorrelationId().equals(corrId)) { response = new String(delivery.getBody()); break; } } return response; } public void close() throws Exception { connection.close(); }
客户端的代码稍微复杂:
客户端请求:
RPCClient fibonacciRpc = new RPCClient(); System.out.println(" [x] Requesting fib(30)"); String response = fibonacciRpc.call("30"); System.out.println(" [.] Got '" + response + "'"); fibonacciRpc.close();
以上的设计不是惟一可能的实现一个RPC服务的,但它有一些重要的优势:
咱们的代码依旧很是简单,并且没有试图去解决一些复杂(可是重要)的问题,如:
原文地址:https://www.rabbitmq.com/tutorials/tutorial-six-java.html
代码地址:https://github.com/aheizi/hi-mq
相关:
1.RabbitMQ之HelloWorld
2.RabbitMQ之任务队列
3.RabbitMQ之发布订阅
4.RabbitMQ之路由(Routing)
5.RabbitMQ之主题(Topic)
6.RabbitMQ之远程过程调用(RPC)