【译】RabbitMQ:远程过程调用(RPC)

在教程二中,咱们学习了如何使用工做队列在多个工做线程中分发耗时的任务。但若是咱们须要去执行远程机器上的方法而且等待结果会怎么样呢?那又是另一回事了。这种模式一般被称为远程过程调用(RPC)。html

本教程中咱们将使用RabbitMQ构建一个远程过程调用系统:一个客户端和一个可扩展的服务器。因为没有什么耗时的任务值得分发,咱们将建立一个虚拟的RPC服务用于返回斐波那契数列。git

客户端接口

为了阐释如何使用RPC服务咱们将建立一个简单的客户端类。类中奖公开一个方法用于发送一个RPC请求,而后阻塞知道收到应答,方法名称叫作call:程序员

1 var rpcClient = new RPCClient();
2 
3 Console.WriteLine(" [x] Requesting fib(30)");
4 var response = rpcClient.Call("30");
5 Console.WriteLine(" [.] Got '{0}'", response);
6 
7 rpcClient.Close();

 

RPC注记github

尽管RPC在计算机技术中是一种很是常见的模式,可是它却饱受批判,问题发生在程序员不知道一个调用是本地的仍是一个耗时的RPC。这样的混乱,致使不可预知的系统,并将没必要要的复杂性调价到调试过程当中。误用RPC将致使不可维护的混乱的代码,而不是简化软件。json

铭记这些限制,考虑下面的建议:安全

  • 确保方法是本地调用仍是远程调用能清晰明了
  • 将系统归档备案,使组件间的依赖关系足够清晰
  • 捕获异常,当RPC服务宕机很长时间客户端做何响应?

应该在不能肯定的时候避免使用RPC,若是能够的话,你可使用异步管道,而不是类RPC的阻塞,结果被异步推送到下一个计算阶段。服务器

 

回调队列

通常来讲,在RabbitMQ之上构建RPC很是的容易,客户端发送请求消息,服务返回应答消息。为了可以接收到应答的消息,咱们须要在请求时指定一个回调队列地址:网络

 1 var corrId = Guid.NewGuid().ToString();
 2 var props = channel.CreateBasicProperties();
 3 props.ReplyTo = replyQueueName;
 4 props.CorrelationId = corrId;
 5 
 6 var messageBytes = Encoding.UTF8.GetBytes(message);
 7 channel.BasicPublish(exchange: "",
 8                      routingKey: "rpc_queue",
 9                      basicProperties: props,
10                      body: messageBytes);
11 
12 // ... 而后是从回调队列中读取消息的代码 ... 

消息属性

AMQP协议预约义了一个包含14个属性的属性集做用于消息之上,大多数都不多使用,除了下面这些:app

  • deliveryMode:将消息标记为持续(使用数值2)或瞬时(其余任意值)的,经过教程二你应该还记得这个属性。
  • contentType:用于描述媒体类型编码,例如:针对经常使用的JSON编码,最好的作法是把这个属性设置为:application/json
  • relayTo:一般用于命名一个回调队列。
  • correlationId:关联RPC请求和响应的时候很是有用

关联ID

在上面准备的方法中,咱们建议为每个RPC请求建立一个回调队列。这样至关低效,辛运的是有更好的方法,让咱们为每个客户端建立一个回调队列。异步

这样引出了一个新问题,当收到一个响应的时候,它没法清楚的知道响应属于哪个请求。这就是correlationId派上用场的时候。咱们将为每个请求设置一个惟一的关联ID,以后当咱们从回调队列收到一个响应的时候,咱们将检查这个属性,基于此,便能将响应和请求关联起来了。若是发现一个未知的关联ID值,咱们能够安全的销毁消息,由于消息不属于任何一个请求。

你可能会奇怪,为何咱们忽略掉未知关联ID值得消息,而不是用错误来标记失败?这是由于在服务器端可能存在争用条件。尽管不太可能,可是RPC服务器可能在发送了响应消息而未发送消息确认的状况下出现故障,若是出现这样的状况,在RPC服务器重启以后将再次处理该请求。这就是为何咱们必须在客户端优雅的捕获重复的请求,而且RPC理论上应该是幂等的。

总结

                                 

咱们的RPC将这样工做:

  • 当客户端启动时,它会建立一个匿名的独占回调队列。
  • 对于一个RPC请求,客户端经过两个属性发送一条消息:relayTo,设置回调队列;correlationId,为每一个请求设置一个惟一值。
  • 消息将被发送到一个rpc_queue队列。
  • RPC工做线程(即,服务器)在该队列上等待请求。当请求出现,他将处理请求并把结果发回给客户端,使用的队列是在replayTo中设置的。
  • 客户端在回调队列上等待响应,当消息出现,它检查关联ID,若是匹配来自请求的关联ID值,返回消息到该应用程序。

组合在一块儿

斐波那契任务:

1 private static int fib(int n)
2 {
3     if (n == 0 || n == 1) return n;
4     return fib(n - 1) + fib(n - 2);
5 }

咱们定义斐波那契函数,它只采用正整数做为输入。(别期望它能在大数值的状况下工做,并且这多是最慢的一种递归实现)

RPC服务器RPCServer.cs中的代码看起来是这样的:

 1 using System;
 2 using RabbitMQ.Client;
 3 using RabbitMQ.Client.Events;
 4 using System.Text;
 5 
 6 class RPCServer
 7 {
 8     public static void Main()
 9     {
10         var factory = new ConnectionFactory() { HostName = "localhost" };
11         using(var connection = factory.CreateConnection())
12         using(var channel = connection.CreateModel())
13         {
14             channel.QueueDeclare(queue: "rpc_queue",
15                                  durable: false,
16                                  exclusive: false,
17                                  autoDelete: false,
18                                  arguments: null);
19             channel.BasicQos(0, 1, false);
20             var consumer = new QueueingBasicConsumer(channel);
21             channel.BasicConsume(queue: "rpc_queue",
22                                  noAck: false,
23                                  consumer: consumer);
24             Console.WriteLine(" [x] Awaiting RPC requests");
25 
26             while(true)
27             {
28                 string response = null;
29                 var ea = (BasicDeliverEventArgs)consumer.Queue.Dequeue();
30 
31                 var body = ea.Body;
32                 var props = ea.BasicProperties;
33                 var replyProps = channel.CreateBasicProperties();
34                 replyProps.CorrelationId = props.CorrelationId;
35 
36                 try
37                 {
38                     var message = Encoding.UTF8.GetString(body);
39                     int n = int.Parse(message);
40                     Console.WriteLine(" [.] fib({0})", message);
41                     response = fib(n).ToString();
42                 }
43                 catch(Exception e)
44                 {
45                     Console.WriteLine(" [.] " + e.Message);
46                     response = "";
47                 }
48                 finally
49                 {
50                     var responseBytes = Encoding.UTF8.GetBytes(response);
51                     channel.BasicPublish(exchange: "",
52                                          routingKey: props.ReplyTo,
53                                          basicProperties: replyProps,
54                                          body: responseBytes);
55                     channel.BasicAck(deliveryTag: ea.DeliveryTag,
56                                      multiple: false);
57                 }
58             }
59         }
60     }
61 
62     /// <summary>
63     /// Assumes only valid positive integer input.
64     /// Don't expect this one to work for big numbers,
65     /// and it's probably the slowest recursive implementation possible.
66     /// </summary>
67     private static int fib(int n)
68     {
69         if(n == 0 || n == 1)
70         {
71             return n;
72         }
73 
74         return fib(n - 1) + fib(n - 2);
75     }
76 }

服务端代码至关简单:

  • 一般状况下,咱们都会以建立连接、信道和申明队列做为开始。
  • 咱们可能但愿运行不止一个服务器进程。为了将加载均匀分布到多个服务器,咱们须要将prefetchCount设置为channel.basicQos
  • 咱们使用basicConsume来访问队列。以后进入While循环,等待请求消息,完成工做,而后发回响应。

RPC客户端RPCClient.cs中的代码:

 1 using System;
 2 using System.Collections.Generic;
 3 using System.Linq;
 4 using System.Text;
 5 using System.Threading.Tasks;
 6 using RabbitMQ.Client;
 7 using RabbitMQ.Client.Events;
 8 
 9 class RPCClient
10 {
11     private IConnection connection;
12     private IModel channel;
13     private string replyQueueName;
14     private QueueingBasicConsumer consumer;
15 
16     public RPCClient()
17     {
18         var factory = new ConnectionFactory() { HostName = "localhost" };
19         connection = factory.CreateConnection();
20         channel = connection.CreateModel();
21         replyQueueName = channel.QueueDeclare().QueueName;
22         consumer = new QueueingBasicConsumer(channel);
23         channel.BasicConsume(queue: replyQueueName,
24                              noAck: true,
25                              consumer: consumer);
26     }
27 
28     public string Call(string message)
29     {
30         var corrId = Guid.NewGuid().ToString();
31         var props = channel.CreateBasicProperties();
32         props.ReplyTo = replyQueueName;
33         props.CorrelationId = corrId;
34 
35         var messageBytes = Encoding.UTF8.GetBytes(message);
36         channel.BasicPublish(exchange: "",
37                              routingKey: "rpc_queue",
38                              basicProperties: props,
39                              body: messageBytes);
40 
41         while(true)
42         {
43             var ea = (BasicDeliverEventArgs)consumer.Queue.Dequeue();
44             if(ea.BasicProperties.CorrelationId == corrId)
45             {
46                 return Encoding.UTF8.GetString(ea.Body);
47             }
48         }
49     }
50 
51     public void Close()
52     {
53         connection.Close();
54     }
55 }
56 
57 class RPC
58 {
59     public static void Main()
60     {
61         var rpcClient = new RPCClient();
62 
63         Console.WriteLine(" [x] Requesting fib(30)");
64         var response = rpcClient.Call("30");
65         Console.WriteLine(" [.] Got '{0}'", response);
66 
67         rpcClient.Close();
68     }
69 }

客户端的代码要稍微复杂一些:

  • 建立一个连接、信道、为响应申明独占的回调队列。
  • 订阅回调队列,以便接收RPC响应。
  • call方法完成实际的RPC调用。
  • 首先建立一个惟一的关联Id而且保存它,while循环使用它去匹配合适的应答。
  • 接下来,咱们发布请求消息,使用了两个属性:replyTocorrelationId
  • 这时咱们就能够坐等正确的响应到达了。
  • While循环作的事情很是简单,检测每个响应,若是correlactionId是咱们须要的,就保存该响应。
  • 最后,把响应返回给用户。

构建客户端请求:

1 RPCClient fibonacciRpc = new RPCClient();
2 
3 System.out.println(" [x] Requesting fib(30)");
4 String response = fibonacciRpc.call("30");
5 System.out.println(" [.] Got '" + response + "'");
6 
7 fibonacciRpc.close();

如今是时候来看看完整示例的源代码了(包含基本的异常处理)。RPCClient.csRPCServer.cs

编译(参见教程一):

1 $ csc /r:"RabbitMQ.Client.dll" RPCClient.cs
2 $ csc /r:"RabbitMQ.Client.dll" RPCServer.cs

如今RPC服务已经准备就绪,能够启动服务了:

1 $ RPCServer.exe
2  [x] Awaiting RPC requests

运行客户端去请求斐波那契数列:

1 $ RPCClient.exe
2  [x] Requesting fib(30)

这里介绍的设计并不是RPC服务的惟一实现方式,可是它有一些重要的优点:

  • 若是RPC服务太慢,你能够经过运行另一个实例来对其进行横向扩展,试着在一个新的控制台里面运行另外一个服务器。
  • 在客户端,RPC只要求发送和接收一条消息,没有如同declareQueue的同步调用被要求。做为结果,RPC客户端对于一个RPC请求,只须要一个网络往返。

咱们的代码依然很是简单,并无尝试去解决一些复杂(可是重要)的问题,好比:

  • 若是没有运行中的服务器,客户端将做何响应?
  • 客户端对于RPC是否能够有某种形式的超时?
  • 若是服务器发生故障,引起异常,是否应当被转发给客户端?
  • 在处理以前,避免无效的输入数据,好比:检查边界、类型等。

 

若是你想尝试,你能够找到有用的RabbitMQ管理插件去浏览队列。

 

原文连接:http://www.rabbitmq.com/tutorials/tutorial-six-dotnet.html

相关文章
相关标签/搜索