在教程二中,咱们学习了如何使用工做队列在多个工做线程中分发耗时的任务。但若是咱们须要去执行远程机器上的方法而且等待结果会怎么样呢?那又是另一回事了。这种模式一般被称为远程过程调用(RPC)。html
本教程中咱们将使用RabbitMQ构建一个远程过程调用系统:一个客户端和一个可扩展的服务器。因为没有什么耗时的任务值得分发,咱们将建立一个虚拟的RPC服务用于返回斐波那契数列。git
为了阐释如何使用RPC服务咱们将建立一个简单的客户端类。类中奖公开一个方法用于发送一个RPC请求,而后阻塞知道收到应答,方法名称叫作call:程序员
1 var rpcClient = new RPCClient(); 2 3 Console.WriteLine(" [x] Requesting fib(30)"); 4 var response = rpcClient.Call("30"); 5 Console.WriteLine(" [.] Got '{0}'", response); 6 7 rpcClient.Close();
RPC注记github
尽管RPC在计算机技术中是一种很是常见的模式,可是它却饱受批判,问题发生在程序员不知道一个调用是本地的仍是一个耗时的RPC。这样的混乱,致使不可预知的系统,并将没必要要的复杂性调价到调试过程当中。误用RPC将致使不可维护的混乱的代码,而不是简化软件。json
铭记这些限制,考虑下面的建议:安全
应该在不能肯定的时候避免使用RPC,若是能够的话,你可使用异步管道,而不是类RPC的阻塞,结果被异步推送到下一个计算阶段。服务器
通常来讲,在RabbitMQ之上构建RPC很是的容易,客户端发送请求消息,服务返回应答消息。为了可以接收到应答的消息,咱们须要在请求时指定一个回调队列地址:网络
1 var corrId = Guid.NewGuid().ToString(); 2 var props = channel.CreateBasicProperties(); 3 props.ReplyTo = replyQueueName; 4 props.CorrelationId = corrId; 5 6 var messageBytes = Encoding.UTF8.GetBytes(message); 7 channel.BasicPublish(exchange: "", 8 routingKey: "rpc_queue", 9 basicProperties: props, 10 body: messageBytes); 11 12 // ... 而后是从回调队列中读取消息的代码 ...
AMQP协议预约义了一个包含14个属性的属性集做用于消息之上,大多数都不多使用,除了下面这些:app
在上面准备的方法中,咱们建议为每个RPC请求建立一个回调队列。这样至关低效,辛运的是有更好的方法,让咱们为每个客户端建立一个回调队列。异步
这样引出了一个新问题,当收到一个响应的时候,它没法清楚的知道响应属于哪个请求。这就是correlationId派上用场的时候。咱们将为每个请求设置一个惟一的关联ID,以后当咱们从回调队列收到一个响应的时候,咱们将检查这个属性,基于此,便能将响应和请求关联起来了。若是发现一个未知的关联ID值,咱们能够安全的销毁消息,由于消息不属于任何一个请求。
你可能会奇怪,为何咱们忽略掉未知关联ID值得消息,而不是用错误来标记失败?这是由于在服务器端可能存在争用条件。尽管不太可能,可是RPC服务器可能在发送了响应消息而未发送消息确认的状况下出现故障,若是出现这样的状况,在RPC服务器重启以后将再次处理该请求。这就是为何咱们必须在客户端优雅的捕获重复的请求,而且RPC理论上应该是幂等的。
咱们的RPC将这样工做:
斐波那契任务:
1 private static int fib(int n) 2 { 3 if (n == 0 || n == 1) return n; 4 return fib(n - 1) + fib(n - 2); 5 }
咱们定义斐波那契函数,它只采用正整数做为输入。(别期望它能在大数值的状况下工做,并且这多是最慢的一种递归实现)
RPC服务器RPCServer.cs中的代码看起来是这样的:
1 using System; 2 using RabbitMQ.Client; 3 using RabbitMQ.Client.Events; 4 using System.Text; 5 6 class RPCServer 7 { 8 public static void Main() 9 { 10 var factory = new ConnectionFactory() { HostName = "localhost" }; 11 using(var connection = factory.CreateConnection()) 12 using(var channel = connection.CreateModel()) 13 { 14 channel.QueueDeclare(queue: "rpc_queue", 15 durable: false, 16 exclusive: false, 17 autoDelete: false, 18 arguments: null); 19 channel.BasicQos(0, 1, false); 20 var consumer = new QueueingBasicConsumer(channel); 21 channel.BasicConsume(queue: "rpc_queue", 22 noAck: false, 23 consumer: consumer); 24 Console.WriteLine(" [x] Awaiting RPC requests"); 25 26 while(true) 27 { 28 string response = null; 29 var ea = (BasicDeliverEventArgs)consumer.Queue.Dequeue(); 30 31 var body = ea.Body; 32 var props = ea.BasicProperties; 33 var replyProps = channel.CreateBasicProperties(); 34 replyProps.CorrelationId = props.CorrelationId; 35 36 try 37 { 38 var message = Encoding.UTF8.GetString(body); 39 int n = int.Parse(message); 40 Console.WriteLine(" [.] fib({0})", message); 41 response = fib(n).ToString(); 42 } 43 catch(Exception e) 44 { 45 Console.WriteLine(" [.] " + e.Message); 46 response = ""; 47 } 48 finally 49 { 50 var responseBytes = Encoding.UTF8.GetBytes(response); 51 channel.BasicPublish(exchange: "", 52 routingKey: props.ReplyTo, 53 basicProperties: replyProps, 54 body: responseBytes); 55 channel.BasicAck(deliveryTag: ea.DeliveryTag, 56 multiple: false); 57 } 58 } 59 } 60 } 61 62 /// <summary> 63 /// Assumes only valid positive integer input. 64 /// Don't expect this one to work for big numbers, 65 /// and it's probably the slowest recursive implementation possible. 66 /// </summary> 67 private static int fib(int n) 68 { 69 if(n == 0 || n == 1) 70 { 71 return n; 72 } 73 74 return fib(n - 1) + fib(n - 2); 75 } 76 }
服务端代码至关简单:
RPC客户端RPCClient.cs中的代码:
1 using System; 2 using System.Collections.Generic; 3 using System.Linq; 4 using System.Text; 5 using System.Threading.Tasks; 6 using RabbitMQ.Client; 7 using RabbitMQ.Client.Events; 8 9 class RPCClient 10 { 11 private IConnection connection; 12 private IModel channel; 13 private string replyQueueName; 14 private QueueingBasicConsumer consumer; 15 16 public RPCClient() 17 { 18 var factory = new ConnectionFactory() { HostName = "localhost" }; 19 connection = factory.CreateConnection(); 20 channel = connection.CreateModel(); 21 replyQueueName = channel.QueueDeclare().QueueName; 22 consumer = new QueueingBasicConsumer(channel); 23 channel.BasicConsume(queue: replyQueueName, 24 noAck: true, 25 consumer: consumer); 26 } 27 28 public string Call(string message) 29 { 30 var corrId = Guid.NewGuid().ToString(); 31 var props = channel.CreateBasicProperties(); 32 props.ReplyTo = replyQueueName; 33 props.CorrelationId = corrId; 34 35 var messageBytes = Encoding.UTF8.GetBytes(message); 36 channel.BasicPublish(exchange: "", 37 routingKey: "rpc_queue", 38 basicProperties: props, 39 body: messageBytes); 40 41 while(true) 42 { 43 var ea = (BasicDeliverEventArgs)consumer.Queue.Dequeue(); 44 if(ea.BasicProperties.CorrelationId == corrId) 45 { 46 return Encoding.UTF8.GetString(ea.Body); 47 } 48 } 49 } 50 51 public void Close() 52 { 53 connection.Close(); 54 } 55 } 56 57 class RPC 58 { 59 public static void Main() 60 { 61 var rpcClient = new RPCClient(); 62 63 Console.WriteLine(" [x] Requesting fib(30)"); 64 var response = rpcClient.Call("30"); 65 Console.WriteLine(" [.] Got '{0}'", response); 66 67 rpcClient.Close(); 68 } 69 }
客户端的代码要稍微复杂一些:
构建客户端请求:
1 RPCClient fibonacciRpc = new RPCClient(); 2 3 System.out.println(" [x] Requesting fib(30)"); 4 String response = fibonacciRpc.call("30"); 5 System.out.println(" [.] Got '" + response + "'"); 6 7 fibonacciRpc.close();
如今是时候来看看完整示例的源代码了(包含基本的异常处理)。RPCClient.cs和RPCServer.cs。
编译(参见教程一):
1 $ csc /r:"RabbitMQ.Client.dll" RPCClient.cs 2 $ csc /r:"RabbitMQ.Client.dll" RPCServer.cs
如今RPC服务已经准备就绪,能够启动服务了:
1 $ RPCServer.exe 2 [x] Awaiting RPC requests
运行客户端去请求斐波那契数列:
1 $ RPCClient.exe 2 [x] Requesting fib(30)
这里介绍的设计并不是RPC服务的惟一实现方式,可是它有一些重要的优点:
咱们的代码依然很是简单,并无尝试去解决一些复杂(可是重要)的问题,好比:
若是你想尝试,你能够找到有用的RabbitMQ管理插件去浏览队列。
原文连接:http://www.rabbitmq.com/tutorials/tutorial-six-dotnet.html