Go RabbitMQ(三)发布订阅模式

RabbitMQ

  • 在上一节中咱们建立了工做队列,而且假设每个任务都可以准确的到达对应的worker。在本节中咱们将介绍如何将一个消息传递到多个消费者,这也就是所说的发布订阅模式
  • 为了验证该模式咱们使用两个创建一个简单的打印系统,一个负责发出消息,另外一个负责接收并打印。在该系统多个receiver中,其中一个直接将日志写入到硬盘,另外一个负责从屏幕上查看日志
  • 在以前的简介中,咱们能够做如下简单总结:
    • 生产者负责发送消息
    • 队列是一个存储消息的缓冲区
    • 消费者负责接收消息

RabbitMQ消息传递模型的核心思想是,生产者永远不会将任何消息直接发送到队列,实际上,一般生产者甚至不知道消息是否被传递到某个队列。git

相反,生产者只能向交换器发送消息。交换器一边接收来自生产者发布的消息一边将消息放入到队列当中。能够经过exchangeType来设置交换器对消息的处理,好比拼接到指定的队列,或是拼接到多个队列中,或是丢弃。github

exchange Type有如下几种:direct,topic,headers,fanout。咱们先使用最后一种建立相应的交换器并取名logs:日志

err = ch.ExchangeDeclare(
  "logs",   // name
  "fanout", // type
  true,     // durable
  false,    // auto-deleted
  false,    // internal
  false,    // no-wait
  nil,      // arguments
)

fanout模式就是广播全部接收到的消息到它已知的全部队列当中code

使用如下命令能够罗列RabbitMQ中全部的交换器:
sudo rabbitmqctl list_exchanges

在以前的例子中咱们没有使用交换器可是依旧能够发送消息到队列当中,说明咱们已经使用了默认的交换器,咱们能够看下之前的代码:router

err = ch.Publish(
  "",     // exchange
  q.Name, // routing key
  false,  // mandatory
  false,  // immediate
  amqp.Publishing{
    ContentType: "text/plain",
    Body:        []byte(body),
})

在这里咱们使用了默认的交换器:消息将被依据routering_key指定的名字路由到队列中.rabbitmq

一旦咱们定义好了交换器,则能够在生产者发送消息的时候使用:队列

err = ch.ExchangeDeclare(
  "logs",   // name
  "fanout", // type
  true,     // durable
  false,    // auto-deleted
  false,    // internal
  false,    // no-wait
  nil,      // arguments
)
failOnError(err, "Failed to declare an exchange")

body := bodyFrom(os.Args)
err = ch.Publish(
  "logs", // exchange
  "",     // routing key
  false,  // mandatory
  false,  // immediate
  amqp.Publishing{
          ContentType: "text/plain",
          Body:        []byte(body),
  })

临时队列

咱们想要获取全部日志消息不仅是子集,同时咱们只对当前的信息流感兴趣,为了解决这个问题咱们须要两个东西:路由

首先,咱们须要一个新的空的队列无论咱们是否有连接Rabbit,咱们可使用一个随机名字建立一个队列,或是让系统指定给咱们string

其次,一旦咱们断开与消费者的连接,队列必须自动删除。it

在amqp客户端中,当咱们使用一个空的名字建立一个队列的时候:

q, err := ch.QueueDeclare(
  "",    // name
  false, // durable
  false, // delete when usused
  true,  // exclusive
  false, // no-wait
  nil,   // arguments
)

当咱们获得其返回的队列的时候,队列实例将会包含一个由RabbitMQ产生的名字,差很少这个样子:amq.gen-JzTY20BRgKO-HjmUJj0wLg
当咱们连接关闭的时候,队列将被删除由于它被声明为exclusive

绑定

在前面咱们已经建立了一个fanout类型的交换器和一个队列,接下来咱们咱们须要让交换器将消息发送到咱们队列中,将交换器(exchange)和队列(queue)关联起来称为绑定

err = ch.QueueBind(
  q.Name, // 队列名 name
  "",     // routing key
  "logs", // 交换器名
  false,
  nil
)

通过以上关联以后,logs交换器就会将消息拼接到咱们的队列当中。

罗列出全部的绑定:
rabbitmqctl list_bindings

完整代码以下:

emit.go

package main

import (
        "fmt"
        "log"
        "os"
        "strings"

        "github.com/streadway/amqp"
)

func failOnError(err error, msg string) {
        if err != nil {
                log.Fatalf("%s: %s", msg, err)
        }
}

func main() {
        conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
        failOnError(err, "Failed to connect to RabbitMQ")
        defer conn.Close()

        ch, err := conn.Channel()
        failOnError(err, "Failed to open a channel")
        defer ch.Close()

        err = ch.ExchangeDeclare(
                "logs",   // name
                "fanout", // type
                true,     // durable
                false,    // auto-deleted
                false,    // internal
                false,    // no-wait
                nil,      // arguments
        )
        failOnError(err, "Failed to declare an exchange")

        body := bodyFrom(os.Args)
        err = ch.Publish(
                "logs", // exchange
                "",     // routing key
                false,  // mandatory
                false,  // immediate
                amqp.Publishing{
                        ContentType: "text/plain",
                        Body:        []byte(body),
                })
        failOnError(err, "Failed to publish a message")

        log.Printf(" [x] Sent %s", body)
}

func bodyFrom(args []string) string {
        var s string
        if (len(args) < 2) || os.Args[1] == "" {
                s = "hello"
        } else {
                s = strings.Join(args[1:], " ")
        }
        return s
}

receive.go

package main

import (
    "github.com/streadway/amqp"
    "log"
)

func main() {
    conn,err := amqp.Dial("amqp://guest:guest@localhost:5672/")
    DealWithError(err,"Failed to connect to RabbitMQ")
    defer conn.Close()

    ch,err := conn.Channel()
    DealWithError(err,"Failed to open a channel")
    defer ch.Close()
    //声明交换器
    ch.ExchangeDeclare(
        "logs",
        "fanout",
        true,
        false,
        false,
        false,
        nil,
        )
    DealWithError(err,"Failed to declare an exchange")
    //声明了队列
    q,err := ch.QueueDeclare(
        "", //队列名字为rabbitMQ自动生成
        false,
        false,
        true,
        false,
        nil,
        )
    DealWithError(err,"Failed to declare an exchange")
    //交换器跟队列进行绑定,交换器将接收到的消息放进队列中
    err = ch.QueueBind(
        q.Name,
        "",
        "logs",
        false,
        nil,
        )
    DealWithError(err,"Failed to bind a queue")
    msgs,err := ch.Consume(
        q.Name,
        "",
        true,
        false,
        false,
        false,
        nil,
        )
    DealWithError(err,"Failed to register a consumer")
    forever := make(chan bool)
    go func() {
        for d := range msgs{
            log.Printf(" [x] %s",d.Body)
        }
    }()
    log.Printf(" [*] Waiting for logs. To exit press CTRL+C")
    <-forever
}

func DealWithError(err error,msg string)  {
    if err != nil {
        log.Fatalf("%s: %s", msg, err)
    }
}
相关文章
相关标签/搜索