RabbitMQ学习之Publish/Subscribe(3)

 上一个教程中,咱们建立了一个work queue. 其中的每一个task都会被精确的传送到一个worker. 这节,咱们将会讲把一个message传送到多个consumers. 这种模式叫作publish/subscribe(发布/订阅).html

为了说明这种模式,咱们将建立一个简单的日志系统(logging system. 它由两个程序组成,一个是发送日志message而且另外一个接收。安全

最重要的,发布的日志message将会被广播到全部的receivers服务器

Exchangs

前面咱们讲的包含下面的:producer,queue,consumerspa

它的主要思想是producer毫不直接发送任何messagequeue. 不少状况下,producer甚至不知道一个message是否会被发送到任何queue.3d

如图,它会直接发送messages到一个exchange. 而对于exchange,一方面它接收来自producermessage,另外一方面它把这些message推送到queues. 至于,messages是否会被发送一个特定的queue或者发送到不少queue或者丢弃,这些规则都由exchange type定义。rest

Exchange type: direct , topic , headers , fanout.日志

咱们这节主要讲fanout,它会控制广播。code

channel.ExchangeDeclare("logs", "fanout");

对于fanout exchange ,它会广播它收到的全部的messages 到它知道的全部的queue.htm

Listing exchanges

对于列出服务器上的exchanges , 你可使用rabbitmqctlblog

sudo rabbitmqctl list_exchanges
The default exchange

在前面的教程中,咱们不知道exchanges,可是咱们仍然能够发送messages queues. 由于咱们使用到了一个默认的exchange(a default exchange).这个默认的exchange是被空字符串(“”)定义。

回想下,咱们以前怎样发送message

 var message = GetMessage(args);
    var body = Encoding.UTF8.GetBytes(message);
    channel.BasicPublish(exchange: "",  //默认的exchange
                         routingKey: "hello",
                         basicProperties: null,
                         body: body);

此时,messages会根据指定的routingKey被路由到queue.

如今,咱们能够发布到指定的exchange.

var message = GetMessage(args);var body = Encoding.UTF8.GetBytes(message);
channel.BasicPublish(exchange: "logs",
                     routingKey: "",
                     basicProperties: null,
                     body: body);

Temporary queues

以前咱们使用过不少指定名称的queues(例如hellotask_queue). 能够命名一个queue是很重要的,咱们能够指定workers到同一个queue。 并且使你能够在多个producersconsumers以前共享这个queue. 

We’re also interested only in currently flowing messages not in the old ones. 咱们想要最新的message而不是仅仅以前的。

这须要解决两个事情。

  1. 首先,不管何时咱们链接Rabbit,咱们须要一个新的,空的queue。为了达到这个目的,咱们能够建立一个带随机名称的queue。更好的办法,咱们可让服务器给咱们选择一个随机的queue名称。
  2. 第二,一旦咱们断开与consumer的链接,这个queue应该被自动删除。 

.NET客户端中,咱们使用下面的语句建立一个带随机名称的queue (when we supply no parameters to QueueDeclare() we create a non-durable, exclusive, autodelete queue with a generated name)

var queueName = channel.QueueDeclare().QueueName;

Bindings

咱们已经建立好了exchangequeue,它们之间的关系咱们叫作binding. 用来告诉exchange发送messagesqueue. 

channel.QueueBind(queue: queueName,  //绑定
                  exchange: "logs",
                  routingKey: "");

如今,在logs exchange上会把messages发到咱们的queue

Listing bindings
rabbitmqctl list_bindings

代码

这种fanout exchanges ,在发送时,会忽视routingKey的值。

EmitLog.cs(发送)

using System;using RabbitMQ.Client;using System.Text;
class EmitLog
{
    public static void Main(string[] args)
    {
        var factory = new ConnectionFactory() { HostName = "localhost" };
        using(var connection = factory.CreateConnection())
        using(var channel = connection.CreateModel())
        {
            channel.ExchangeDeclare(exchange: "logs", type: "fanout");  //声明exchange

            var message = GetMessage(args);
            var body = Encoding.UTF8.GetBytes(message);
            channel.BasicPublish(exchange: "logs",  //发送到logs exchange
                                 routingKey: "",
                                 basicProperties: null,
                                 body: body);
            Console.WriteLine(" [x] Sent {0}", message);
        }

        Console.WriteLine(" Press [enter] to exit.");
        Console.ReadLine();
    }

    private static string GetMessage(string[] args)
    {
        return ((args.Length > 0)
               ? string.Join(" ", args)
               : "info: Hello World!");
    }
}

不容许发送到一个不存在的exchange.

若是没有queue绑定到exchangemessages将会丢失。若是没有consumer正在监听,咱们能够安全的丢弃这些message.

ReceiveLogs.cs

using System;using RabbitMQ.Client;using RabbitMQ.Client.Events;using System.Text;
class ReceiveLogs
{
    public static void Main()
    {
        var factory = new ConnectionFactory() { HostName = "localhost" };
        using(var connection = factory.CreateConnection())
        using(var channel = connection.CreateModel())
        {
            channel.ExchangeDeclare(exchange: "logs", type: "fanout"); //声明exchange

            var queueName = channel.QueueDeclare().QueueName;  //得到随机queue name
            channel.QueueBind(queue: queueName,  //定义queue和exchange的关系
                              exchange: "logs",
                              routingKey: "");

            Console.WriteLine(" [*] Waiting for logs.");

            var consumer = new EventingBasicConsumer(channel);  //回调
            consumer.Received += (model, ea) =>
            {
                var body = ea.Body;
                var message = Encoding.UTF8.GetString(body);
                Console.WriteLine(" [x] {0}", message);
            };
            channel.BasicConsume(queue: queueName,
                                 autoAck: true,
                                 consumer: consumer);

            Console.WriteLine(" Press [enter] to exit.");
            Console.ReadLine();
        }
    }
}

参考网址:

https://www.rabbitmq.com/tutorials/tutorial-three-dotnet.html

相关文章
相关标签/搜索