生产者和消费者启动之后,都有一个接收事件,消费者是接收事件是处理调用方法之后等待生产者的返回,生产者的接收事件是处理接收生产者发送的消息,进行处理。消费者发送的时候要在回调队列中加入一个标识,标明是哪一个方法进行的调用 。生产者接收到消费之后,若是发现有消息标识 ,把消息标识继续返回去,这样消费者能够保证接收到的消息是哪一个方法调用的c#
主方法经过实例化rpcclient,而后调用rpcclient里面的方法,得到结果之后关闭安全
rpcclient的逻辑以下函数
static void Main(string[] args) { //TopicMessageTest(); RpcClient rpc = new RpcClient(); Console.WriteLine("开始启动"); var response = rpc.Call("30"); Console.WriteLine("the result is :"+response); rpc.Close(); Console.WriteLine("调用结束"); Console.ReadLine(); }
public class RpcClient { private readonly IConnection connection; private readonly IModel channel; private readonly string replyQueueName; private readonly EventingBasicConsumer consumer; private readonly IBasicProperties props; private readonly BlockingCollection<string> respQueue = new BlockingCollection<string>(); //线程安全的集合 public RpcClient() { var factory = new ConnectionFactory() { HostName="localhost"}; //建立一个实例 connection = factory.CreateConnection(); //建立链接 channel = connection.CreateModel(); //建立信道 replyQueueName = channel.QueueDeclare().QueueName; //建立队列 consumer = new EventingBasicConsumer(channel);//经过指定的model初台化消费者 props = channel.CreateBasicProperties(); var relationId = Guid.NewGuid().ToString(); props.CorrelationId = relationId;//应用相关标识 props.ReplyTo = replyQueueName; //回复队列指定 consumer.Received += (sender,e)=> { var body = e.Body; var response = Encoding.UTF8.GetString(body); if (e.BasicProperties.CorrelationId == relationId) { respQueue.Add(response); } }; } public string Call(string message) { var messageBytes = Encoding.UTF8.GetBytes(message); channel.BasicPublish(exchange: "", routingKey: "rpcqueue", basicProperties: props, body: messageBytes); channel.BasicConsume(consumer: consumer, queue: replyQueueName, autoAck: true); return respQueue.Take(); }
static void Main(string[] args) { var factory = new ConnectionFactory() { HostName = "localhost" }; using (var connection = factory.CreateConnection()) using (var channel = connection.CreateModel()) { channel.QueueDeclare(queue: "rpcqueue",durable:false,exclusive:false,autoDelete:false,arguments:null); channel.BasicQos(0, 1, false); var consumer = new EventingBasicConsumer(channel); channel.BasicConsume(queue: "rpcqueue", autoAck: false,consumer:consumer); Console.WriteLine("Waiting rpc requesting"); consumer.Received += (sender, e) => { string response = null; var body = e.Body; var props = e.BasicProperties; var replyProps = channel.CreateBasicProperties(); replyProps.CorrelationId = props.CorrelationId; var message = Encoding.UTF8.GetString(body); int n = int.Parse(message); Console.WriteLine("request message is :" + message); response = fib(n).ToString(); var responseBytes = Encoding.UTF8.GetBytes(response); channel.BasicPublish(exchange: "", routingKey: props.ReplyTo, basicProperties: replyProps, body: responseBytes); channel.BasicAck(deliveryTag: e.DeliveryTag, multiple: false); }; Console.WriteLine("over"); Console.ReadLine(); } Console.WriteLine(" Press [enter] to exit."); Console.ReadLine(); } private static int fib(int n) { if (n == 0 || n == 1) return n; return fib(n - 1) + fib(n - 2); }
我对个人文章负责,发现好多网上的文章 没有实践,都发出来的,让人走不少弯路,若是你在个人文章中遇到没法实现,或者没法走通的问题。能够直接在公众号《爱码农爱生活 》留言。一定会再次复查缘由。让每一篇 文章的流程都能顺利实现。测试