本文翻译自RabbitMQ官网的Go语言客户端系列教程,本文首发于个人我的博客:liwenzhou.com,共分为六篇,本文是第一篇——HelloWorld。html
这些教程涵盖了使用RabbitMQ建立消息传递应用程序的基础知识。
你须要安装RabbitMQ服务器才能完成这些教程,请参阅安装指南或使用Docker镜像。
这些教程的代码是开源的,官方网站也是如此。git
本教程假设RabbitMQ已安装并运行在本机上的标准端口(5672)。若是你使用不一样的主机、端口或凭据,则须要调整链接设置。github
RabbitMQ是一个消息代理:它接受并转发消息。你能够把它想象成一个邮局:当你把你想要邮寄的邮件放进一个邮箱时,你能够肯定邮差先生或女士最终会把邮件送到你的收件人那里。在这个比喻中,RabbitMQ是一个邮箱、一个邮局和一个邮递员。web
RabbitMQ和邮局的主要区别在于它不处理纸张,而是接受、存储和转发二进制数据块——消息。docker
RabbitMQ和通常的消息传递都使用一些术语。小程序
生产仅意味着发送。发送消息的程序是生产者:数组
队列是位于RabbitMQ内部的邮箱的名称。尽管消息经过RabbitMQ和你的应用程序流动,但它们只能存储在队列中。队列只受主机内存和磁盘限制的限制,实际上它是一个大的消息缓冲区。许多生产者能够向一个队列发送消息,而许多消费者能够尝试从一个队列接收数据。如下是咱们表示队列的方式:bash
消费与接收具备类似的含义。消费者是一个主要等待接收消息的程序:服务器
请注意,生产者,消费者和代理(broker)没必要位于同一主机上。实际上,在大多数应用程序中它们不是。一个应用程序既能够是生产者,也能够是消费者。异步
(使用Go RabbitMQ客户端)
在本教程的这一部分中,咱们将在Go中编写两个小程序:发送单个消息的生产者和接收消息并将其打印出来的消费者。咱们将忽略Go-RabbitMQ API中的一些细节,只关注很是简单的事情,以便开始教程。这是一个消息传递版的“Hello World”。
在下图中,“ P”是咱们的生产者,“ C”是咱们的消费者。中间的框是一个队列——RabbitMQ表明消费者保存的消息缓冲区。
Go RabbitMQ客户端库
RabbitMQ讲多种协议。本教程使用amqp0-9-1,这是一个开放的、通用的消息传递协议。RabbitMQ有许多不一样语言的客户端。在本教程中,咱们将使用Go amqp客户端。
首先,使用
go get
安装amqpgo get github.com/streadway/amqp
如今安装好amqp以后,咱们就能够编写一些代码。
咱们将消息发布者(发送者)称为 send.go
,将消息消费者(接收者)称为receive.go
。发布者将链接到RabbitMQ,发送一条消息,而后退出。
在send.go
中,咱们须要首先导入库:
package main import ( "log" "github.com/streadway/amqp" )
咱们还须要一个辅助函数来检查每一个amqp调用的返回值:
func failOnError(err error, msg string) { if err != nil { log.Fatalf("%s: %s", msg, err) } }
而后链接到RabbitMQ服务器
// 1. 尝试链接RabbitMQ,创建链接 // 该链接抽象了套接字链接,并为咱们处理协议版本协商和认证等。 conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/") failOnError(err, "Failed to connect to RabbitMQ") defer conn.Close()
链接抽象了socket链接,并为咱们处理协议版本协商和认证等。接下来,咱们建立一个通道,这是大多数用于完成任务的API所在的位置:
// 2. 接下来,咱们建立一个通道,大多数API都是用过该通道操做的。 ch, err := conn.Channel() failOnError(err, "Failed to open a channel") defer ch.Close()
要发送,咱们必须声明要发送到的队列。而后咱们能够将消息发布到队列:
// 3. 声明消息要发送到的队列 q, err := ch.QueueDeclare( "hello", // name false, // durable false, // delete when unused false, // exclusive false, // no-wait nil, // arguments ) failOnError(err, "Failed to declare a queue") body := "Hello World!" // 4.将消息发布到声明的队列 err = ch.Publish( "", // exchange q.Name, // routing key false, // mandatory false, // immediate amqp.Publishing { ContentType: "text/plain", Body: []byte(body), }) failOnError(err, "Failed to publish a message")
声明队列是幂等的——仅当队列不存在时才建立。消息内容是一个字节数组,所以你能够在此处编码任何内容。
上面是咱们的发布者。咱们的消费者监听来自RabbitMQ的消息,所以与发布单个消息的发布者不一样,咱们将使消费者保持运行状态以监听消息并打印出来。
该代码(在receive.go
中)具备与send
相同的导入和帮助功能:
package main import ( "log" "github.com/streadway/amqp" ) func failOnError(err error, msg string) { if err != nil { log.Fatalf("%s: %s", msg, err) } }
设置与发布者相同;咱们打开一个链接和一个通道,并声明要消耗的队列。请注意,这与send
发布到的队列匹配。
// 创建链接 conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/") failOnError(err, "Failed to connect to RabbitMQ") defer conn.Close() // 获取channel ch, err := conn.Channel() failOnError(err, "Failed to open a channel") defer ch.Close() // 声明队列 q, err := ch.QueueDeclare( "hello", // name false, // durable false, // delete when unused false, // exclusive false, // no-wait nil, // arguments ) failOnError(err, "Failed to declare a queue")
请注意,咱们也在这里声明队列。由于咱们可能在发布者以前启动使用者,因此咱们但愿在尝试使用队列中的消息以前确保队列存在。
咱们将告诉服务器将队列中的消息传递给咱们。因为它将异步地向咱们发送消息,所以咱们将在goroutine中从通道(由amqp::Consume
返回)中读取消息。
// 获取接收消息的Delivery通道 msgs, err := ch.Consume( q.Name, // queue "", // consumer true, // auto-ack false, // exclusive false, // no-local false, // no-wait nil, // args ) failOnError(err, "Failed to register a consumer") forever := make(chan bool) go func() { for d := range msgs { log.Printf("Received a message: %s", d.Body) } }() log.Printf(" [*] Waiting for messages. To exit press CTRL+C") <-forever
如今咱们能够运行两个脚本。在一个终端窗口,运行发布者:
go run send.go
而后,运行使用者:
go run receive.go
消费者将打印经过RabbitMQ从发布者那里获得的消息。使用者将持续运行,等待消息(使用Ctrl-C中止它),所以请尝试从另外一个终端运行发布者。
若是要检查队列,请尝试使用rabbitmqctl list_queues
命令。
接下来该继续教程的第二部分并创建一个简单的任务队列。