AMQP(Advanced Message Queuing Protocol,高级消息队列协议)是一个进程间传递异步消息的网络协议。服务器
RabbitMQ 就是 amqp 协议的Erlang的实现。网络
AMQP的模型架构的主要角色,生产者、消费者、交换器、队列。架构
Rabbitmq中须要路由键和绑定键联合使用才能使生产者成功投递到队列中去。异步
生产者将消息投递到交换器,经过交换器绑定的队列,最终投递到对应的队列中去。性能
Rabbitmq共有4种交换器测试
topic 规则匹配,BindingKey中存在两种特殊字符ui
*
匹配零个或多个单词#
匹配一个单词在Golang中建立rabbitmq 生产者基本步骤是:spa
// connection connection, err := amqp.Dial("amqp://guest:guest@localhost:5672/") if err != nil { panic(err) } // channel channel, err := connection.Channel() if err != nil { panic(err) }
if err = channel.ExchangeDeclare("e1", "direct", true, false, false, true, nil); err != nil { panic(err) }
参数解析:code
参数说明要点:blog
自动删除功能必需要在交换器曾经绑定过队列或者交换器的状况下,处于再也不使用的时候才会自动删除,
若是是刚刚建立的还没有绑定队列或者交换器的交换器或者早已建立只是未进行队列或者交换器绑定的交换器是不会自动删除的。
内置交换器是一种特殊的交换器,这种交换器不能直接接收生产者发送的消息,
只能做为相似于队列的方式绑定到另外一个交换器,来接收这个交换器中路由的消息,
内置交换器一样能够绑定队列和路由消息,只是其接收消息的来源与普通交换器不一样。
当noWait为true时,声明时无需等待服务器的确认。
该通道可能因为错误而关闭。 添加一个NotifyClose侦听器应对任何异常。
建立交换器还有一个差很少的方法(ExchangeDeclarePassive),他主要是假定交换已存在,并尝试链接到
不存在的交换将致使RabbitMQ引起异常,可用于检测交换的存在。
if _, err := channel.QueueDeclare("q1", true, false, false, true, nil); err != nil { panic(err) }
参数解析:
参数说明要点:
排他队列只对首次建立它的链接可见,排他队列是基于链接(Connection)可见的,而且该链接内的全部信道(Channel)均可以访问这个排他队列,在这个链接断开以后,该队列自动删除,因而可知这个队列能够说是绑到链接上的,对同一服务器的其余链接不可见。
同一链接中不容许创建同名的排他队列的
这种排他优先于持久化,即便设置了队列持久化,在链接断开后,该队列也会自动删除。
非排他队列不依附于链接而存在,同一服务器上的多个链接均可以访问这个队列。
为true则设置队列为自动删除。
自动删除的前提是:至少有一个消费者链接到这个队列,以后全部与这个队列链接的消费者都断开时,才会自动删除。
不能把这个参数错误地理解为:"当链接到此队列的全部客户端断开时,这个队列自动删除",由于生产者客户端建立这个队列,或者没有消费者客户端与这个队列链接时,都不会自动删除这个队列。
建立队列还有一个差很少的方法(QueueDeclarePassive),他主要是假定队列已存在,并尝试链接到
不存在的队列将致使RabbitMQ引起异常,可用于检测队列的存在。
if err = channel.QueueBind("q1", "q1Key", "e1", true, nil); err != nil { panic(err) }
参数解析:
if err = channel.ExchangeBind("dest", "q1Key", "src", false, nil); err != nil { panic(err) }
参数解析:
生产者发送消息至交换器source中,交换器source根据路由键找到与其匹配的另外一个交换器destination,井把消息转发到destination中,进而存储在.destination绑定的队列queue中,某种程度上来讲destination交换器能够看做一个队列。如图:
if err = channel.Publish("e1", "q1Key", true, false, amqp.Publishing{ Timestamp: time.Now(), ContentType: "text/plain", Body: []byte("Hello Golang and AMQP(Rabbitmq)!"), }); err != nil { panic(err) }
参数解析:
参数说明要点:
消息发布的时候设置消息的 mandatory 属性用于设置消息在发送到交换器以后没法路由到队列的状况对消息的处理方式,
设置为 true 表示将消息返回到生产者,不然直接丢弃消息。
参数告诉服务器至少将该消息路由到一个队列中,不然将消息返回给生产者。imrnediate参数告诉服务器,若是该消息关联的队列上有消费者,则马上投递:若是全部匹配的队列上都没有消费者,则直接将消息返还给生产者,不用将消息存入队列而等待消费者了。
RabbitMQ 3.0版本开始去掉了对imrnediate参数的支持
Rabbitmq消费方式共有2种,分别是推模式和拉模式
推模式是经过持续订阅的方式来消费信息,
Consume将信道(Channel)直为接收模式,直到取消队列的订阅为止。在接收模式期间,RabbitMQ会不断地推送消息给消费者。
推送消息的个数仍是会受到channel.Qos的限制
deliveries, err := channel.Consume("q1", "any", false, false, false, true, nil) if err != nil { panic(err) }
若是ack设置为false则表示须要手动进行ack消费
v, ok := <-deliveries if ok { // 手动ack确认 // 注意: 这里只要调用了ack就是手动确认模式, // multiple 表示的是在此channel中先前全部未确认的deliveries都将被确认 // 并非表示设置为false就不进行当前ack确认 if err := v.Ack(true); err != nil { fmt.Println(err.Error()) } } else { fmt.Println("Channel close") }
参数解析:
参数说明要点:
设置为true则表示不能将同一个Connection中生产者发送的消息传送给这个Connection中的消费者
拉模式:
相对来讲比较简单,是由消费者主动拉取信息来消费,一样也须要进行ack确认消费
channel.Get(queue string, autoAck bool)
下面是一个简单示例,只是为了通讯测试,单条数据收发
func Connection() (*amqp.Connection) { conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/") if err != nil { panic(err) } return conn } func Sample() { var wg sync.WaitGroup wg.Add(1) go SampleConsumption(&wg) // 建立链接 connection := Connection() defer connection.Close() // 开启 channel channel, err := connection.Channel() if err != nil { panic(err) } defer channel.Close() if err = channel.ExchangeDeclare("e1", "direct", true, false, false, true, nil); err != nil { panic(err) } if _, err := channel.QueueDeclare("q1", true, false, false, true, nil); err != nil { panic(err) } if err = channel.QueueBind("q1", "q1Key", "e1", true, nil); err != nil { panic(err) } // mandatory true 未找到队列返回给消费者 returnChan := make(chan amqp.Return,0) channel.NotifyReturn(returnChan) // Publish if err = channel.Publish("e1", "q1Key", true, false, amqp.Publishing{ Timestamp: time.Now(), ContentType: "text/plain", Body: []byte("Hello Golang and AMQP(Rabbitmq)!"), }); err != nil { panic(err) } //for v := range returnChan{ // fmt.Printf("Return %#v\n",v) //} wg.Wait() } func SampleConsumption(wg *sync.WaitGroup) { connection := Connection() defer connection.Close() channel, err := connection.Channel() if err != nil { panic(err) } defer channel.Close() deliveries, err := channel.Consume("q1", "any", false, false, false, true, nil) if err != nil { panic(err) } // 这里只取一条,由于product只发一条 v, ok := <-deliveries if ok { if err := v.Ack(true); err != nil { fmt.Println(err.Error()) } } else { fmt.Println("Channel close") } wg.Done() }