RabbitMQ消息传递模型的核心思想是,生产者永远不会将任何消息直接发送到队列,实际上,一般生产者甚至不知道消息是否被传递到某个队列。git
相反,生产者只能向交换器发送消息。交换器一边接收来自生产者发布的消息一边将消息放入到队列当中。能够经过exchangeType来设置交换器对消息的处理,好比拼接到指定的队列,或是拼接到多个队列中,或是丢弃。github
exchange Type有如下几种:direct,topic,headers,fanout。咱们先使用最后一种建立相应的交换器并取名logs:日志
err = ch.ExchangeDeclare( "logs", // name "fanout", // type true, // durable false, // auto-deleted false, // internal false, // no-wait nil, // arguments )
fanout模式就是广播全部接收到的消息到它已知的全部队列当中code
使用如下命令能够罗列RabbitMQ中全部的交换器: sudo rabbitmqctl list_exchanges
在以前的例子中咱们没有使用交换器可是依旧能够发送消息到队列当中,说明咱们已经使用了默认的交换器,咱们能够看下之前的代码:router
err = ch.Publish( "", // exchange q.Name, // routing key false, // mandatory false, // immediate amqp.Publishing{ ContentType: "text/plain", Body: []byte(body), })
在这里咱们使用了默认的交换器:消息将被依据routering_key指定的名字路由到队列中.rabbitmq
一旦咱们定义好了交换器,则能够在生产者发送消息的时候使用:队列
err = ch.ExchangeDeclare( "logs", // name "fanout", // type true, // durable false, // auto-deleted false, // internal false, // no-wait nil, // arguments ) failOnError(err, "Failed to declare an exchange") body := bodyFrom(os.Args) err = ch.Publish( "logs", // exchange "", // routing key false, // mandatory false, // immediate amqp.Publishing{ ContentType: "text/plain", Body: []byte(body), })
咱们想要获取全部日志消息不仅是子集,同时咱们只对当前的信息流感兴趣,为了解决这个问题咱们须要两个东西:路由
首先,咱们须要一个新的空的队列无论咱们是否有连接Rabbit,咱们可使用一个随机名字建立一个队列,或是让系统指定给咱们string
其次,一旦咱们断开与消费者的连接,队列必须自动删除。it
在amqp客户端中,当咱们使用一个空的名字建立一个队列的时候:
q, err := ch.QueueDeclare( "", // name false, // durable false, // delete when usused true, // exclusive false, // no-wait nil, // arguments )
当咱们获得其返回的队列的时候,队列实例将会包含一个由RabbitMQ产生的名字,差很少这个样子:amq.gen-JzTY20BRgKO-HjmUJj0wLg
当咱们连接关闭的时候,队列将被删除由于它被声明为exclusive
在前面咱们已经建立了一个fanout类型的交换器和一个队列,接下来咱们咱们须要让交换器将消息发送到咱们队列中,将交换器(exchange)和队列(queue)关联起来称为绑定
err = ch.QueueBind( q.Name, // 队列名 name "", // routing key "logs", // 交换器名 false, nil )
通过以上关联以后,logs交换器就会将消息拼接到咱们的队列当中。
罗列出全部的绑定: rabbitmqctl list_bindings
完整代码以下:
emit.go
package main import ( "fmt" "log" "os" "strings" "github.com/streadway/amqp" ) func failOnError(err error, msg string) { if err != nil { log.Fatalf("%s: %s", msg, err) } } func main() { conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/") failOnError(err, "Failed to connect to RabbitMQ") defer conn.Close() ch, err := conn.Channel() failOnError(err, "Failed to open a channel") defer ch.Close() err = ch.ExchangeDeclare( "logs", // name "fanout", // type true, // durable false, // auto-deleted false, // internal false, // no-wait nil, // arguments ) failOnError(err, "Failed to declare an exchange") body := bodyFrom(os.Args) err = ch.Publish( "logs", // exchange "", // routing key false, // mandatory false, // immediate amqp.Publishing{ ContentType: "text/plain", Body: []byte(body), }) failOnError(err, "Failed to publish a message") log.Printf(" [x] Sent %s", body) } func bodyFrom(args []string) string { var s string if (len(args) < 2) || os.Args[1] == "" { s = "hello" } else { s = strings.Join(args[1:], " ") } return s }
receive.go
package main import ( "github.com/streadway/amqp" "log" ) func main() { conn,err := amqp.Dial("amqp://guest:guest@localhost:5672/") DealWithError(err,"Failed to connect to RabbitMQ") defer conn.Close() ch,err := conn.Channel() DealWithError(err,"Failed to open a channel") defer ch.Close() //声明交换器 ch.ExchangeDeclare( "logs", "fanout", true, false, false, false, nil, ) DealWithError(err,"Failed to declare an exchange") //声明了队列 q,err := ch.QueueDeclare( "", //队列名字为rabbitMQ自动生成 false, false, true, false, nil, ) DealWithError(err,"Failed to declare an exchange") //交换器跟队列进行绑定,交换器将接收到的消息放进队列中 err = ch.QueueBind( q.Name, "", "logs", false, nil, ) DealWithError(err,"Failed to bind a queue") msgs,err := ch.Consume( q.Name, "", true, false, false, false, nil, ) DealWithError(err,"Failed to register a consumer") forever := make(chan bool) go func() { for d := range msgs{ log.Printf(" [x] %s",d.Body) } }() log.Printf(" [*] Waiting for logs. To exit press CTRL+C") <-forever } func DealWithError(err error,msg string) { if err != nil { log.Fatalf("%s: %s", msg, err) } }