This article follows tutorial 6 (Remote procedure call - RPC) from the official RabbitMQ website; its sample code uses .NET C#.
In the second tutorial we learned how to use Work Queues to distribute time-consuming tasks among multiple workers.
But what if we need to run a function on a remote computer and wait for the result? Well, that's a different story. This pattern is commonly known as Remote Procedure Call or RPC.
In this tutorial we're going to use RabbitMQ to build an RPC system: a client and a scalable RPC server. As we don't have any time-consuming tasks that are worth distributing, we're going to create a dummy RPC service that returns Fibonacci numbers.
Client interface
To illustrate how an RPC service could be used, we're going to create a simple client class. It's going to expose a method named Call, which sends an RPC request and blocks until the answer is received:
```csharp
var rpcClient = new RpcClient();
Console.WriteLine(" [x] Requesting fib(30)");
var response = rpcClient.Call("30");
Console.WriteLine(" [.] Got '{0}'", response);
rpcClient.Close();
```
A note on RPC
Although RPC is a pretty common pattern in computing, it's often criticised. The problems arise when a programmer is not aware whether a function call is local or if it's a slow RPC. Confusions like that result in an unpredictable system and add unnecessary complexity to debugging. Instead of simplifying software, misused RPC can result in unmaintainable spaghetti code.
Bearing that in mind, consider the following advice:
- Make sure it's obvious which function call is local and which is remote.
- Document your system. Make the dependencies between components clear.
- Handle error cases. How should the client react when the RPC server is down for a long time?
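The last point can be made concrete. The client shown later in this tutorial blocks in respQueue.Take() with no limit, so a server that is down leaves the caller hanging. A minimal sketch of one possible reaction, assuming replies are handed over through a BlockingCollection<string> as in that client: wait with a deadline and surface a TimeoutException. The ReplyWaiter class and its TakeWithTimeout method are hypothetical names, not part of RabbitMQ.Client.

```csharp
using System;
using System.Collections.Concurrent;

// Hypothetical helper (not part of RabbitMQ.Client): waits for an RPC reply
// with a deadline instead of blocking forever.
public static class ReplyWaiter
{
    // Returns the next reply, or throws TimeoutException if none arrives in time.
    public static string TakeWithTimeout(BlockingCollection<string> replies, TimeSpan timeout)
    {
        if (replies.TryTake(out var reply, timeout))
        {
            return reply;
        }
        throw new TimeoutException(
            $"No RPC reply within {timeout.TotalSeconds} s; is the server running?");
    }
}
```

The caller can then decide whether to retry, fall back, or report the failure, rather than hanging indefinitely.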
When in doubt, avoid RPC. If you can, use an asynchronous pipeline: instead of RPC-like blocking, results are asynchronously pushed to the next computation stage.
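As an in-process analogy only (no RabbitMQ involved), the sketch below uses System.Threading.Channels (available in .NET Core 3.0 and later) to chain stages that push results forward instead of a caller blocking on each reply. The FibPipeline class is hypothetical; in a real system each stage would more likely be a consumer that publishes its output to the next queue.

```csharp
using System.Collections.Generic;
using System.Threading.Channels;
using System.Threading.Tasks;

// Hypothetical in-process pipeline: each stage pushes its output to the next.
public static class FibPipeline
{
    public static async Task<List<string>> RunAsync(IEnumerable<int> inputs)
    {
        var requests = Channel.CreateUnbounded<int>();
        var results = Channel.CreateUnbounded<string>();

        // Compute stage: reads work items and pushes each result onward.
        var compute = Task.Run(async () =>
        {
            try
            {
                await foreach (var n in requests.Reader.ReadAllAsync())
                {
                    await results.Writer.WriteAsync($"fib({n}) = {Fib(n)}");
                }
            }
            finally
            {
                // Tell the next stage that no more results are coming.
                results.Writer.Complete();
            }
        });

        // Producer stage: feeds inputs without waiting for any individual answer.
        foreach (var n in inputs)
        {
            await requests.Writer.WriteAsync(n);
        }
        requests.Writer.Complete();

        // Final stage: consumes results as they are pushed.
        var collected = new List<string>();
        await foreach (var line in results.Reader.ReadAllAsync())
        {
            collected.Add(line);
        }
        await compute; // surfaces any exception from the compute stage
        return collected;
    }

    private static long Fib(int n)
    {
        long a = 0, b = 1;
        for (int i = 0; i < n; i++)
        {
            (a, b) = (b, a + b);
        }
        return a;
    }
}
```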
Callback queue
In general doing RPC over RabbitMQ is easy. A client sends a request message and a server replies with a response message. In order to receive a response we need to send a 'callback' queue address with the request:
```csharp
var corrId = Guid.NewGuid().ToString();
var props = channel.CreateBasicProperties();
props.ReplyTo = replyQueueName;
props.CorrelationId = corrId;

var messageBytes = Encoding.UTF8.GetBytes(message);
channel.BasicPublish(exchange: "",
                     routingKey: "rpc_queue",
                     basicProperties: props,
                     body: messageBytes);

// ... then code to read a response message from the callback_queue ...
```
Message properties
The AMQP 0-9-1 protocol predefines a set of 14 properties that go with a message. Most of the properties are rarely used, with the exception of the following:
- deliveryMode: Marks a message as persistent (with a value of 2) or transient (any other value). You may remember this property from the second tutorial.
- contentType: Used to describe the mime-type of the encoding. For example, for the often used JSON encoding it is a good practice to set this property to application/json.
- replyTo: Commonly used to name a callback queue.
- correlationId: Useful to correlate RPC responses with requests.
In the .NET client these are exposed as the DeliveryMode, ContentType, ReplyTo and CorrelationId properties of IBasicProperties.
CorrelationId
In the method presented above we suggest creating a callback queue for every RPC request. That's pretty inefficient, but fortunately there is a better way - let's create a single callback queue per client.
That raises a new issue: having received a response in that queue, it's not clear which request the response belongs to. That's when the correlationId property is used. We're going to set it to a unique value for every request. Later, when we receive a message in the callback queue, we'll look at this property, and based on that we'll be able to match a response with a request. If we see an unknown correlationId value, we may safely discard the message: it doesn't belong to our requests.
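With a single callback queue per client, several requests can be outstanding at once. A minimal sketch of the matching step, assuming one TaskCompletionSource per request kept in a ConcurrentDictionary keyed by correlation id: the consumer would call TryDeliver for each message on the callback queue, and a false result means the id is unknown and the message can be discarded. PendingRequests, Register and TryDeliver are hypothetical names; wiring them to BasicPublish and the Received event is left out.

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading.Tasks;

// Hypothetical helper: tracks outstanding requests by correlation id so that
// one callback queue can serve many requests from the same client.
public sealed class PendingRequests
{
    private readonly ConcurrentDictionary<string, TaskCompletionSource<string>> pending =
        new ConcurrentDictionary<string, TaskCompletionSource<string>>();

    // Call before publishing: returns the id to put in CorrelationId and a
    // task that completes when the matching reply arrives.
    public (string CorrelationId, Task<string> Reply) Register()
    {
        var id = Guid.NewGuid().ToString();
        var tcs = new TaskCompletionSource<string>(TaskCreationOptions.RunContinuationsAsynchronously);
        pending[id] = tcs;
        return (id, tcs.Task);
    }

    // Call for every message on the callback queue. Returns false when the id
    // is unknown (or already answered); the caller then discards the message.
    public bool TryDeliver(string correlationId, string body)
    {
        if (correlationId == null || !pending.TryRemove(correlationId, out var tcs))
        {
            return false;
        }
        tcs.TrySetResult(body);
        return true;
    }
}
```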
You may ask, why should we ignore unknown messages in the callback queue, rather than failing with an error? It's due to a possibility of a race condition on the server side. Although unlikely, it is possible that the RPC server will die just after sending us the answer, but before sending an acknowledgment message for the request.
If that happens, the restarted RPC server will process the request again. That's why on the client we must handle the duplicate responses gracefully, and the RPC should ideally be idempotent.
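For a client that waits for one request at a time, handling duplicate responses gracefully can be as simple as accepting only the first matching reply and then forgetting the id, so a redelivered copy is not mistaken for the answer to the following request. A minimal sketch, assuming correlation ids are plain strings (the "req-1" style values in the test are placeholders); SingleReplyFilter is a hypothetical name.

```csharp
using System;

// Hypothetical filter for a client with one outstanding request at a time.
// The first reply whose id matches is accepted and the id is then cleared,
// so a duplicate (same id) from a restarted server is discarded.
public sealed class SingleReplyFilter
{
    private readonly object sync = new object();
    private string expectedId;

    // Call just before publishing a request.
    public void Expect(string correlationId)
    {
        lock (sync) { expectedId = correlationId; }
    }

    // Call for every message on the callback queue; false means "drop it".
    public bool Accept(string correlationId)
    {
        lock (sync)
        {
            if (expectedId == null || correlationId != expectedId)
            {
                return false; // unknown id or duplicate
            }
            expectedId = null; // only the first matching reply counts
            return true;
        }
    }
}
```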
Summary
Our RPC will work like this:
- When the client starts up, it creates an anonymous exclusive callback queue.
- For an RPC request, the client sends a message with two properties: replyTo, which is set to the callback queue, and correlationId, which is set to a unique value for every request.
- The request is sent to an rpc_queue queue.
- The RPC worker (aka: server) is waiting for requests on that queue. When a request appears, it does the job and sends a message with the result back to the client, using the queue from the replyTo field.
- The client waits for data on the callback queue. When a message appears, it checks the correlationId property. If it matches the value from the request, it returns the response to the application.
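The flow above can be modelled without a broker to see how replyTo and correlationId fit together. This is a rough in-memory sketch, assuming BlockingCollection<T> queues looked up by name stand in for RabbitMQ queues and the default exchange; FakeBroker, InMemoryRpcServer and InMemoryRpcClient are hypothetical names, and nothing here talks to a real server.

```csharp
using System;
using System.Collections.Concurrent;

// A "message" carrying the two properties the flow relies on.
public sealed class Message
{
    public Message(string body, string replyTo, string correlationId)
    {
        Body = body;
        ReplyTo = replyTo;
        CorrelationId = correlationId;
    }

    public string Body { get; }
    public string ReplyTo { get; }
    public string CorrelationId { get; }
}

// Stand-in for the broker: queues looked up by name, like publishing to the
// default exchange with routingKey = queue name.
public static class FakeBroker
{
    private static readonly ConcurrentDictionary<string, BlockingCollection<Message>> queues =
        new ConcurrentDictionary<string, BlockingCollection<Message>>();

    public static BlockingCollection<Message> Queue(string name) =>
        queues.GetOrAdd(name, _ => new BlockingCollection<Message>());
}

public static class InMemoryRpcServer
{
    // Handles one request from "rpc_queue" and replies to its ReplyTo queue,
    // copying the CorrelationId.
    public static void ServeOne()
    {
        var request = FakeBroker.Queue("rpc_queue").Take();
        var n = int.Parse(request.Body);
        long a = 0, b = 1;
        for (int i = 0; i < n; i++) { (a, b) = (b, a + b); }
        FakeBroker.Queue(request.ReplyTo).Add(new Message(a.ToString(), null, request.CorrelationId));
    }
}

public sealed class InMemoryRpcClient
{
    // Private callback queue name, chosen once when the client starts.
    private readonly string replyTo = "callback-" + Guid.NewGuid().ToString("N");

    public string Call(string body)
    {
        var correlationId = Guid.NewGuid().ToString(); // unique per request
        FakeBroker.Queue("rpc_queue").Add(new Message(body, replyTo, correlationId));
        while (true)
        {
            var reply = FakeBroker.Queue(replyTo).Take();
            if (reply.CorrelationId == correlationId)
            {
                return reply.Body;
            }
            // Unknown correlation id: discard and keep waiting.
        }
    }
}
```

Usage: start InMemoryRpcServer.ServeOne() on a background task, then call new InMemoryRpcClient().Call("30"), which returns "832040".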
Putting it all together
The Fibonacci task:
```csharp
private static int fib(int n)
{
    if (n == 0 || n == 1) return n;
    return fib(n - 1) + fib(n - 2);
}
```
We declare our Fibonacci function. It assumes only valid non-negative integer input. (Don't expect this one to work for big numbers: it's probably the slowest recursive implementation possible, and its int result overflows beyond fib(46).)
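If larger inputs are needed, an iterative loop avoids the exponential recursion. A minimal sketch, assuming a long result is acceptable: fib(92) is the largest Fibonacci number that fits in a long, so the hypothetical Fibonacci.Iterative method rejects anything above that rather than silently overflowing.

```csharp
using System;

public static class Fibonacci
{
    // Largest n whose Fibonacci number fits in a long.
    public const int MaxN = 92;

    // Iterative O(n) version of fib; never computes a value beyond fib(n).
    public static long Iterative(int n)
    {
        if (n < 0 || n > MaxN)
        {
            throw new ArgumentOutOfRangeException(nameof(n), $"n must be between 0 and {MaxN}.");
        }
        if (n == 0)
        {
            return 0;
        }
        long previous = 0, current = 1; // fib(0), fib(1)
        for (int i = 1; i < n; i++)
        {
            (previous, current) = (current, previous + current);
        }
        return current; // fib(n)
    }
}
```

For example, Fibonacci.Iterative(30) returns 832040, the same value the tutorial's client requests.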
The code for our RPC server RPCServer.cs looks like this:
```csharp
using System;
using System.Text;
using RabbitMQ.Client;
using RabbitMQ.Client.Events;

class RPCServer
{
    public static void Main()
    {
        var factory = new ConnectionFactory() { HostName = "localhost" };
        using (var connection = factory.CreateConnection())
        using (var channel = connection.CreateModel())
        {
            channel.QueueDeclare(queue: "rpc_queue", durable: false,
                exclusive: false, autoDelete: false, arguments: null);
            // Hand each server at most one unacknowledged request at a time.
            channel.BasicQos(prefetchSize: 0, prefetchCount: 1, global: false);

            var consumer = new EventingBasicConsumer(channel);
            // Register the handler before starting to consume.
            consumer.Received += (model, ea) =>
            {
                string response = null;
                var body = ea.Body.ToArray(); // RabbitMQ.Client 6.x: Body is ReadOnlyMemory<byte>
                var props = ea.BasicProperties;
                var replyProps = channel.CreateBasicProperties();
                replyProps.CorrelationId = props.CorrelationId;

                try
                {
                    var message = Encoding.UTF8.GetString(body);
                    int n = int.Parse(message);
                    Console.WriteLine(" [.] fib({0})", message);
                    response = fib(n).ToString();
                }
                catch (Exception e)
                {
                    Console.WriteLine(" [.] " + e.Message);
                    response = "";
                }
                finally
                {
                    // Always reply (possibly empty) and ack, so the client is not left waiting.
                    var responseBytes = Encoding.UTF8.GetBytes(response);
                    channel.BasicPublish(exchange: "", routingKey: props.ReplyTo,
                        basicProperties: replyProps, body: responseBytes);
                    channel.BasicAck(deliveryTag: ea.DeliveryTag, multiple: false);
                }
            };
            channel.BasicConsume(queue: "rpc_queue", autoAck: false, consumer: consumer);
            Console.WriteLine(" [x] Awaiting RPC requests");

            Console.WriteLine(" Press [enter] to exit.");
            Console.ReadLine();
        }
    }

    /// <summary>
    /// Assumes only valid non-negative integer input.
    /// Don't expect this one to work for big numbers, and it's
    /// probably the slowest recursive implementation possible.
    /// </summary>
    private static int fib(int n)
    {
        if (n == 0 || n == 1)
        {
            return n;
        }
        return fib(n - 1) + fib(n - 2);
    }
}
```
The server code is rather straightforward:
- As usual we start by establishing the connection and channel, and declaring the queue.
- We might want to run more than one server process. To spread the load equally over multiple servers, we set prefetchCount to 1 with channel.BasicQos, so RabbitMQ hands each server only one unacknowledged request at a time.
- We register a delivery handler (the consumer's Received event) in which we do the work and send the response back, and we use BasicConsume to start consuming from the queue.
The code for our RPC client RPCClient.cs:
```csharp
using System;
using System.Collections.Concurrent;
using System.Text;
using RabbitMQ.Client;
using RabbitMQ.Client.Events;

public class RpcClient
{
    private readonly IConnection connection;
    private readonly IModel channel;
    private readonly string replyQueueName;
    private readonly EventingBasicConsumer consumer;
    private readonly BlockingCollection<string> respQueue = new BlockingCollection<string>();
    private readonly object sync = new object();
    private string pendingCorrelationId; // id of the request awaiting a reply, guarded by sync

    public RpcClient()
    {
        var factory = new ConnectionFactory() { HostName = "localhost" };
        connection = factory.CreateConnection();
        channel = connection.CreateModel();
        // Server-named, exclusive callback queue for this client.
        replyQueueName = channel.QueueDeclare().QueueName;
        consumer = new EventingBasicConsumer(channel);

        consumer.Received += (model, ea) =>
        {
            var response = Encoding.UTF8.GetString(ea.Body.ToArray()); // 6.x: Body is ReadOnlyMemory<byte>
            lock (sync)
            {
                // Discard replies with an unknown correlation id, and duplicates
                // of a reply that was already accepted.
                if (pendingCorrelationId == null ||
                    ea.BasicProperties.CorrelationId != pendingCorrelationId)
                {
                    return;
                }
                pendingCorrelationId = null;
            }
            respQueue.Add(response);
        };

        // Subscribe to the callback queue once, when the client starts.
        channel.BasicConsume(consumer: consumer, queue: replyQueueName, autoAck: true);
    }

    public string Call(string message)
    {
        // Fresh properties and a unique correlation id for every request.
        var props = channel.CreateBasicProperties();
        props.CorrelationId = Guid.NewGuid().ToString();
        props.ReplyTo = replyQueueName;
        lock (sync)
        {
            pendingCorrelationId = props.CorrelationId;
        }

        var messageBytes = Encoding.UTF8.GetBytes(message);
        channel.BasicPublish(exchange: "", routingKey: "rpc_queue",
            basicProperties: props, body: messageBytes);

        // Block until the Received handler hands over the matching reply.
        return respQueue.Take();
    }

    public void Close()
    {
        connection.Close();
    }
}

public class Rpc
{
    public static void Main()
    {
        var rpcClient = new RpcClient();
        Console.WriteLine(" [x] Requesting fib(30)");
        var response = rpcClient.Call("30");
        Console.WriteLine(" [.] Got '{0}'", response);
        rpcClient.Close();
    }
}
```
The client code is slightly more involved:
- We establish a connection and channel and declare an exclusive 'callback' queue for replies.
- We subscribe to the 'callback' queue, so that we can receive RPC responses.
- Our Call method makes the actual RPC request.
- A unique correlationId is generated and saved; the consumer's Received handler uses this value to catch the appropriate response.
- Next, we publish the request message with two properties: ReplyTo and CorrelationId.
- At this point we can sit back and wait until the proper response arrives: respQueue.Take() blocks until the Received handler supplies one.
- The Received handler does a very simple job: for every response message it checks whether the CorrelationId is the one we're looking for. If so, it adds the response to respQueue, a BlockingCollection<string>.
- Finally we return the response back to the user.
Making the Client request:
```csharp
var rpcClient = new RpcClient();
Console.WriteLine(" [x] Requesting fib(30)");
var response = rpcClient.Call("30");
Console.WriteLine(" [.] Got '{0}'", response);
rpcClient.Close();
```
Now is a good time to take a look at our full example source code (which includes basic exception handling) for RPCClient.cs and RPCServer.cs.
Set up as usual (see tutorial one):
Our RPC service is now ready. We can start the server:
```shell
cd RPCServer
dotnet run
# => [x] Awaiting RPC requests
```
To request a fibonacci number run the client:
```shell
cd RPCClient
dotnet run
# => [x] Requesting fib(30)
```
The design presented here is not the only possible implementation of an RPC service, but it has some important advantages:
- If the RPC server is too slow, you can scale up by just running another one. Try running a second RPCServer in a new console.
- On the client side, the RPC requires sending and receiving only one message. No synchronous calls like QueueDeclare are required per request. As a result, the RPC client needs only one network round trip for a single RPC request.
Our code is still pretty simplistic and doesn't try to solve more complex (but important) problems, like:
- How should the client react if there are no servers running?
- Should a client have some kind of timeout for the RPC?
- If the server malfunctions and raises an exception, should it be forwarded to the client?
- Protecting against invalid incoming messages (e.g. checking bounds, type) before processing.
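The last point can be sketched for this tutorial's Fibonacci server. The helper below, a hypothetical FibRequest.TryParse, accepts only plain digits and caps n at 46, since fib(46) = 1836311903 is the largest result that fits in the int returned by fib. It assumes the server would reply with the error text instead of calling fib when validation fails.

```csharp
using System;
using System.Globalization;

public static class FibRequest
{
    // fib(46) = 1836311903 is the largest value that fits in the int returned by fib.
    public const int MaxN = 46;

    // Validates a request body before any work is done.
    public static bool TryParse(string body, out int n, out string error)
    {
        n = 0;
        // NumberStyles.None: digits only, no sign, no whitespace.
        if (!int.TryParse(body, NumberStyles.None, CultureInfo.InvariantCulture, out var value))
        {
            error = "not a non-negative integer";
            return false;
        }
        if (value > MaxN)
        {
            error = $"n must be <= {MaxN}";
            return false;
        }
        n = value;
        error = null;
        return true;
    }
}
```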