本文翻译自RabbitMQ官网的Go语言客户端系列教程,本文首发于个人我的博客:liwenzhou.com,教程共分为六篇,本文是第六篇——RPC。html
这些教程涵盖了使用RabbitMQ建立消息传递应用程序的基础知识。
你须要安装RabbitMQ服务器才能完成这些教程,请参阅安装指南或使用Docker镜像。
这些教程的代码是开源的,官方网站也是如此。git
本教程假设RabbitMQ已安装并运行在本机上的标准端口(5672)。若是你使用不一样的主机、端口或凭据,则须要调整链接设置。程序员
(使用Go RabbitMQ客户端)github
在第二个教程中,咱们学习了如何使用工做队列在多个worker之间分配耗时的任务。web
可是,若是咱们须要在远程计算机上运行函数并等待结果怎么办?好吧,那是一个不一样的故事。这种模式一般称为远程过程调用或RPC。docker
在本教程中,咱们将使用RabbitMQ构建一个RPC系统:客户端和可伸缩RPC服务器。因为咱们没有值得分配的耗时任务,所以咱们将建立一个虚拟RPC服务,该服务返回斐波那契数。json
有关RPC的说明bash
尽管RPC是计算中很是常见的模式,但它常常受到批评。服务器
当程序员不知道函数调用是本地的仍是缓慢的RPC时,就会出现问题。这样的混乱会致使系统变幻莫测,并给调试增长了没必要要的复杂性。滥用RPC可能会致使没法维护的意大利面条式代码而不是简化软件,网络
牢记这一点,请考虑如下建议:
- 肯定哪一个函数调用是本地的,哪一个是远程的。
- 为你的系统编写文档。明确组件之间的依赖关系。
- 处理错误状况。 当RPC服务器长时间关闭时,客户端应如何处理?
一般,经过RabbitMQ进行RPC很容易。客户端发送请求消息,服务器发送响应消息。为了接收响应,咱们须要发送带有“回调”队列地址的请求。咱们可使用默认队列。让咱们尝试一下:
q, err := ch.QueueDeclare( "", // 不指定队列名,默认使用随机生成的队列名 false, // durable false, // delete when unused true, // exclusive false, // noWait nil, // arguments ) err = ch.Publish( "", // exchange "rpc_queue", // routing key false, // mandatory false, // immediate amqp.Publishing{ ContentType: "text/plain", CorrelationId: corrId, ReplyTo: q.Name, // 在这里指定callback队列名,也是在这个队列等回复 Body: []byte(strconv.Itoa(n)), })
消息属性
AMQP 0-9-1协议预约义了消息附带的14个属性集。除如下属性外,大多数属性不多使用:
persistent
:将消息标记为持久性(值为true
)或瞬态(false
)。你可能还记得第二个教程中的此属性。content_type
:用于描述编码的mime类型。例如,对于常用的JSON编码,将此属性设置为application/ json
是一个好习惯。reply_to
:经常使用于命名回调队列correlation_id
:有助于将RPC响应与请求相关联
在上面介绍的方法中,咱们建议为每一个RPC请求建立一个回调队列。这是至关低效的,可是幸运的是,有一种更好的方法——让咱们为每一个客户端建立一个回调队列。
这就引起了一个新问题,在该队列中收到响应后,尚不清楚响应属于哪一个请求。这个时候就该使用correlation_id
这个属性了。针对每一个请求咱们将为其设置一个惟一值。随后,当咱们在回调队列中收到消息时,咱们将查看该属性,并基于这个属性将响应与请求进行匹配。若是咱们看到未知的correlation_id
值,则能够放心地丢弃该消息——它不属于咱们的请求。
你可能会问,为何咱们应该忽略回调队列中的未知消息,而不是报错而失败?这是因为服务器端可能出现竞争情况。尽管可能性不大,但RPC服务器可能会在向咱们发送答案以后但在发送请求的确认消息以前死亡。若是发生这种状况,从新启动的RPC服务器将再次处理该请求。这就是为何在客户端上咱们必须妥善处理重复的响应,而且理想状况下RPC应该是幂等的。
咱们的RPC工做流程以下:
reply_to
(设置为回调队列)和correlation_id
(设置为每一个请求的惟一值)。rpc_queue
队列。replay_to
字段中的队列发回给客户端。correlation_id
属性。若是它与请求中的值匹配,则将响应返回给应用程序。斐波那契函数:
func fib(n int) int { if n == 0 { return 0 } else if n == 1 { return 1 } else { return fib(n-1) + fib(n-2) } }
声明咱们的斐波那契函数。它仅假设有效的正整数输入。 (不要期望这种方法适用于大量用户,它多是最慢的递归实现)。
咱们的RPC服务器rpc_server.go的代码以下所示:
package main import ( "log" "strconv" "github.com/streadway/amqp" ) func failOnError(err error, msg string) { if err != nil { log.Fatalf("%s: %s", msg, err) } } func fib(n int) int { if n == 0 { return 0 } else if n == 1 { return 1 } else { return fib(n-1) + fib(n-2) } } func main() { conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/") failOnError(err, "Failed to connect to RabbitMQ") defer conn.Close() ch, err := conn.Channel() failOnError(err, "Failed to open a channel") defer ch.Close() q, err := ch.QueueDeclare( "rpc_queue", // name false, // durable false, // delete when unused false, // exclusive false, // no-wait nil, // arguments ) failOnError(err, "Failed to declare a queue") err = ch.Qos( 1, // prefetch count 0, // prefetch size false, // global ) failOnError(err, "Failed to set QoS") msgs, err := ch.Consume( q.Name, // queue "", // consumer false, // auto-ack false, // exclusive false, // no-local false, // no-wait nil, // args ) failOnError(err, "Failed to register a consumer") forever := make(chan bool) go func() { for d := range msgs { n, err := strconv.Atoi(string(d.Body)) failOnError(err, "Failed to convert body to integer") log.Printf(" [.] fib(%d)", n) response := fib(n) err = ch.Publish( "", // exchange d.ReplyTo, // routing key false, // mandatory false, // immediate amqp.Publishing{ ContentType: "text/plain", CorrelationId: d.CorrelationId, Body: []byte(strconv.Itoa(response)), }) failOnError(err, "Failed to publish a message") d.Ack(false) } }() log.Printf(" [*] Awaiting RPC requests") <-forever }
服务器代码很是简单:
prefetch
设置。Channel.Consume
获取去队列,咱们从队列中接收消息。而后,咱们进入goroutine进行工做,并将响应发送回去。咱们的RPC客户端rpc_client.go的代码:
package main import ( "log" "math/rand" "os" "strconv" "strings" "time" "github.com/streadway/amqp" ) func failOnError(err error, msg string) { if err != nil { log.Fatalf("%s: %s", msg, err) } } func randomString(l int) string { bytes := make([]byte, l) for i := 0; i < l; i++ { bytes[i] = byte(randInt(65, 90)) } return string(bytes) } func randInt(min int, max int) int { return min + rand.Intn(max-min) } func fibonacciRPC(n int) (res int, err error) { conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/") failOnError(err, "Failed to connect to RabbitMQ") defer conn.Close() ch, err := conn.Channel() failOnError(err, "Failed to open a channel") defer ch.Close() q, err := ch.QueueDeclare( "", // name false, // durable false, // delete when unused true, // exclusive false, // noWait nil, // arguments ) failOnError(err, "Failed to declare a queue") msgs, err := ch.Consume( q.Name, // queue "", // consumer true, // auto-ack false, // exclusive false, // no-local false, // no-wait nil, // args ) failOnError(err, "Failed to register a consumer") corrId := randomString(32) err = ch.Publish( "", // exchange "rpc_queue", // routing key false, // mandatory false, // immediate amqp.Publishing{ ContentType: "text/plain", CorrelationId: corrId, ReplyTo: q.Name, Body: []byte(strconv.Itoa(n)), }) failOnError(err, "Failed to publish a message") for d := range msgs { if corrId == d.CorrelationId { res, err = strconv.Atoi(string(d.Body)) failOnError(err, "Failed to convert body to integer") break } } return } func main() { rand.Seed(time.Now().UTC().UnixNano()) n := bodyFrom(os.Args) log.Printf(" [x] Requesting fib(%d)", n) res, err := fibonacciRPC(n) failOnError(err, "Failed to handle RPC request") log.Printf(" [.] Got %d", res) } func bodyFrom(args []string) int { var s string if (len(args) < 2) || os.Args[1] == "" { s = "30" } else { s = strings.Join(args[1:], " ") } n, err := strconv.Atoi(s) failOnError(err, "Failed to convert arg to integer") return n }
如今是时候看看rpc_client.go和rpc_server.go的完整示例源代码了。
咱们的RPC服务现已准备就绪。咱们能够启动服务器:
go run rpc_server.go # => [x] Awaiting RPC requests
要请求斐波那契数,请运行客户端:
go run rpc_client.go 30 # => [x] Requesting fib(30)
这里介绍的设计不是RPC服务的惟一可能的实现,可是它具备一些重要的优势:
rpc_server.go
。咱们的代码仍然很是简单,而且不会尝试解决更复杂(但很重要)的问题,例如:
若是要进行实验,可能会发现管理后台界面对于查看队列颇有用。