RPC(Remote Procedure Call Protocol)——远程过程调用协议:它是一种经过网络从远程计算机程序上请求服务,而不须要了解底层网络技术的协议。RPC协议假定某些传输协议的存在,如TCP或UDP,为通讯程序之间携带信息数据。在OSI网络通讯模型中,RPC跨越了传输层和应用层。RPC使得开发包括网络分布式多程序在内的应用程序更加容易。编程
PRC采用客户端/服务端模式,请求程序就是一个客户机,而服务提供就是一个服务器。首先,客户机调用进程发送一个有进程参数的调用信息到服务进程,而后等待应答信息,在服务器,进程保持睡眠状态直到调用信息到达为止。当一个调用信息到达,服务器得到进程参数,计算结果,发送答复消息,而后等待下一个调用信息,最后,客户端调用进程接收答复信息,得到进程结果,而后调用执行继续前进。(整个过程有点相似:你到某大医院看病,你先到柜台交钱拿卡(医师费),拿卡去找医生(卡表明你的认证至关参数),医生根据卡给你把脉看病进行详谈沟通,医诊结束后给你开药,下一位患者进入。。。。。,医生有是服务端,患者是客户端,举例可能有点牵强,就是表达那个意思,)服务器
RPC是在计算机中一种常见的模式,是一般我要用消息队列3个关键点:网络
1、服务的寻址; 2、消息的接受; 3、消息的关联。
1、当客户端启动时,它会建立一个匿名的独占会回调队列; 2、对于一个RPC请求,客户端经过两个属性发送一条消息(从图中咱们也能够看到):relayTo 设置回调队列;correlationId,为每一个请求设置惟一的标识ID; 3、消息将发送到一个Rpc_queue 队列; 4、RPC工程线程(服务器)在该队列上等待请求,当请求出现,他将处理请求并把结果发回到客户端,使用队列在replayTo中设置; 5、客户端在回调队列上等待响应,当消息出现,它检查关联ID,若是匹配来自请求的关联ID值,返回队列消息到该应用程序。
correlationId 和 relayTo 参数异步
首先客户端经过RPC向服务端发送请求分布式
我这里有一堆东西须要你给我处理一下,correlationId :这是个人请求标识,relayTo :你处理完事后把结果返回到这个队列中。
服务端拿到请求,并开始处理并返回结果ui
correlationId :这是你的请求标识 ,原封不动的给你。 这时候客户端用本身的correlationId 与服务端返回的id进行对比。是个人,就接收。
但愿同步获得数据的场合,RPC合适;
但愿使用简单,则RPC;RPC操做基于接口,使用简单,使用方式模式本地调用。异步的方式编程比较复杂。
不但愿客户端受限于服务端的速度等,可使用Message Queue
RabbitMQ RPC的特色spa
Message Queue 把全部的请求消息存储起来,而后处理,和客户端解耦;
Message Queue 引用新的结点,系统的可靠性会受Message Queue 结点的影响;
Meaage Queue 是异步单向的消息,发送消息设计成是不须要等待消息处理的完成。
因此对于有同步返回需求,Message Queue 是个不错的方向
同步调用,对于要等待返回结果、处理结果的场景,RPC是能够很是天然直觉的使用方式,固然RPC也能够异步调用。
因为等待结果,客户端会有线程消耗。
若是以异步RPC的方式使用,客户端线程消耗能够去掉,但不能作到像消息同样暂存消息请求,压力会直接传导到服务端。
备注(建立两个解决方案:服务端和客户端)pwa
static void Main(string[] args) { using (var channel = GetConnection().CreateModel()) { channel.QueueDeclare("rpc_queue", true, false, false, null); channel.BasicQos(0, 1, false); var consumer = new EventingBasicConsumer(channel); // var consumer = new QueueingBasicConsumer(channel); channel.BasicConsume("rpc_queue", false, consumer); Console.WriteLine("等待 RPC 队列"); consumer.Received += (model, ea) => { // while (true) // { string response = null; //出列 // var ea = (BasicDeliverEventArgs)consumer.Queue.Dequeue(); var body = ea.Body; var props = ea.BasicProperties;
//内容的基本属性 var replyProps = channel.CreateBasicProperties();
//注意这里的correlationId replyProps.CorrelationId = props.CorrelationId; try { var message = Encoding.UTF8.GetString(body); int n = int.Parse(message); Console.WriteLine("显示内容" + message); response = fib(n).ToString(); } catch (Exception e) { Console.WriteLine("报错" + e.ToString()); response = ""; } finally { var responseBytes = Encoding.UTF8.GetBytes(response); channel.BasicPublish("", props.ReplyTo, replyProps, responseBytes); channel.BasicAck(ea.DeliveryTag, false); } // }; }; Console.WriteLine("发布成功!!!"); Console.ReadLine(); } } /// <summary> /// 私有方法 /// </summary> /// <param name="n"></param> /// <returns></returns> private static int fib(int n) { if (n == 0 || n == 1) { return n; } //Thread.Sleep()方法用于将当前线程休眠必定时间 时间单位是毫秒 1000毫秒= 1秒 //System.Threading.Thread.Sleep(2000);当前休眠2秒 //suspen()挂起当前线程。也能够指定挂起时间。 //close() 关闭当前线程。 Thread.Sleep(100 * 10); return n; // return fib(n - 1) + fib(n - 2); }
static void Main(string[] args) { for (int i = 0; i < 30; i++) { Stopwatch watch = new Stopwatch(); watch.Start(); var rpcClient = new HelpConnection(); Console.WriteLine("显示内容" + i.ToString()); var response = rpcClient.Call(i); Console.WriteLine("显示内容" + response); //当前链接关闭 rpcClient.Close(); watch.Stop(); Console.WriteLine(string.Format(" [x] Requesting complete {0} ,cost {1} ms", i, watch.Elapsed.TotalMilliseconds)); } Console.WriteLine(" complete!!!! "); Console.ReadLine(); }
/// <summary> /// 成员变量 /// </summary> private static IConnection connection { get; set; } private IModel channel { get; set; } private string replyQueueName { get; set; } private QueueingBasicConsumer consumer { get; set; } /// <summary> /// 构造方法:链接配置 /// </summary> public HelpConnection() { var factory = new ConnectionFactory() { //计算机名称,帐号,密码, HostName = "localhost", UserName = "zhangguangpo", Password = "guangpo1992", //RequestedHeartbeat = 60, AutomaticRecoveryEnabled = true //要启用自动链接恢复 }; //建立链接 connection = factory.CreateConnection(); channel = connection.CreateModel(); //而客户端为了得到处理结果,那么客户在向服务器发送请求时,同时发送一个回调队列地址reply_to replyQueueName = channel.QueueDeclare().QueueName; consumer = new QueueingBasicConsumer(channel); channel.BasicConsume(queue: replyQueueName, noAck: true, consumer: consumer); // return Connection; } /// <summary> /// 消息判断 /// </summary> /// <param name="message"></param> /// <returns></returns> public string Call(int message) { var corrId = Guid.NewGuid().ToString(); var props = channel.CreateBasicProperties(); props.ReplyTo = replyQueueName; props.CorrelationId = corrId; var messageBates = Encoding.UTF8.GetBytes(message.ToString()); channel.BasicPublish("", "rpc_queue", props, messageBates); while (true) { var ea = (BasicDeliverEventArgs)consumer.Queue.Dequeue(); if (ea.BasicProperties.CorrelationId == corrId) { var body = Encoding.UTF8.GetString(ea.Body); return body; } } } /// <summary> /// 当前链接关闭 /// </summary> public void Close() { connection.Close(); }
效果图线程