RabbitMQ Go客户端教程6——RPC

本文翻译自RabbitMQ官网的Go语言客户端系列教程,本文首发于个人我的博客:liwenzhou.com,教程共分为六篇,本文是第六篇——RPC。html

这些教程涵盖了使用RabbitMQ建立消息传递应用程序的基础知识。
你须要安装RabbitMQ服务器才能完成这些教程,请参阅安装指南或使用Docker镜像
这些教程的代码是开源的,官方网站也是如此。git

先决条件

本教程假设RabbitMQ已安装并运行在本机上的标准端口(5672)。若是你使用不一样的主机、端口或凭据,则须要调整链接设置。程序员

远程过程调用(RPC)

(使用Go RabbitMQ客户端)github

在第二个教程中,咱们学习了如何使用工做队列在多个worker之间分配耗时的任务。web

可是,若是咱们须要在远程计算机上运行函数并等待结果怎么办?好吧,那是一个不一样的故事。这种模式一般称为远程过程调用RPCdocker

在本教程中,咱们将使用RabbitMQ构建一个RPC系统:客户端和可伸缩RPC服务器。因为咱们没有值得分配的耗时任务,所以咱们将建立一个虚拟RPC服务,该服务返回斐波那契数。json

有关RPC的说明bash

尽管RPC是计算中很是常见的模式,但它常常受到批评。服务器

当程序员不知道函数调用是本地的仍是缓慢的RPC时,就会出现问题。这样的混乱会致使系统变幻莫测,并给调试增长了没必要要的复杂性。滥用RPC可能会致使没法维护的意大利面条式代码而不是简化软件,网络

牢记这一点,请考虑如下建议:

  • 肯定哪一个函数调用是本地的,哪一个是远程的。
  • 为你的系统编写文档。明确组件之间的依赖关系。
  • 处理错误状况。 当RPC服务器长时间关闭时,客户端应如何处理?

回调队列

一般,经过RabbitMQ进行RPC很容易。客户端发送请求消息,服务器发送响应消息。为了接收响应,咱们须要发送带有“回调”队列地址的请求。咱们可使用默认队列。让咱们尝试一下:

q, err := ch.QueueDeclare(
  "",    // 不指定队列名,默认使用随机生成的队列名
  false, // durable
  false, // delete when unused
  true,  // exclusive
  false, // noWait
  nil,   // arguments
)

err = ch.Publish(
  "",          // exchange
  "rpc_queue", // routing key
  false,       // mandatory
  false,       // immediate
  amqp.Publishing{
    ContentType:   "text/plain",
    CorrelationId: corrId,
    ReplyTo:       q.Name,  // 在这里指定callback队列名,也是在这个队列等回复
    Body:          []byte(strconv.Itoa(n)),
})

消息属性

AMQP 0-9-1协议预约义了消息附带的14个属性集。除如下属性外,大多数属性不多使用:

  • persistent:将消息标记为持久性(值为true)或瞬态(false)。你可能还记得第二个教程中的此属性。
  • content_type:用于描述编码的mime类型。例如,对于常用的JSON编码,将此属性设置为application/ json是一个好习惯。
  • reply_to:经常使用于命名回调队列
  • correlation_id:有助于将RPC响应与请求相关联

关联ID(Correlation Id)

在上面介绍的方法中,咱们建议为每一个RPC请求建立一个回调队列。这是至关低效的,可是幸运的是,有一种更好的方法——让咱们为每一个客户端建立一个回调队列。

这就引起了一个新问题,在该队列中收到响应后,尚不清楚响应属于哪一个请求。这个时候就该使用correlation_id这个属性了。针对每一个请求咱们将为其设置一个惟一值。随后,当咱们在回调队列中收到消息时,咱们将查看该属性,并基于这个属性将响应与请求进行匹配。若是咱们看到未知的correlation_id值,则能够放心地丢弃该消息——它不属于咱们的请求。

你可能会问,为何咱们应该忽略回调队列中的未知消息,而不是报错而失败?这是因为服务器端可能出现竞争情况。尽管可能性不大,但RPC服务器可能会在向咱们发送答案以后但在发送请求的确认消息以前死亡。若是发生这种状况,从新启动的RPC服务器将再次处理该请求。这就是为何在客户端上咱们必须妥善处理重复的响应,而且理想状况下RPC应该是幂等的。

总结

img

咱们的RPC工做流程以下:

  • 客户端启动时,它将建立一个匿名排他回调队列。
  • 对于RPC请求,客户端发送一条消息,该消息具备两个属性:reply_to(设置为回调队列)和correlation_id(设置为每一个请求的惟一值)。
  • 该请求被发送到rpc_queue队列。
  • RPC工做程序(又名:服务器)正在等待该队列上的请求。当出现请求时,它会完成计算工做并把结果做为消息使用replay_to字段中的队列发回给客户端。
  • 客户端等待回调队列上的数据。出现消息时,它将检查correlation_id属性。若是它与请求中的值匹配,则将响应返回给应用程序。

完整示例

斐波那契函数:

func fib(n int) int {
        if n == 0 {
                return 0
        } else if n == 1 {
                return 1
        } else {
                return fib(n-1) + fib(n-2)
        }
}

声明咱们的斐波那契函数。它仅假设有效的正整数输入。 (不要期望这种方法适用于大量用户,它多是最慢的递归实现)。

咱们的RPC服务器rpc_server.go的代码以下所示:

package main

import (
        "log"
        "strconv"

        "github.com/streadway/amqp"
)

func failOnError(err error, msg string) {
        if err != nil {
                log.Fatalf("%s: %s", msg, err)
        }
}

func fib(n int) int {
        if n == 0 {
                return 0
        } else if n == 1 {
                return 1
        } else {
                return fib(n-1) + fib(n-2)
        }
}

func main() {
        conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
        failOnError(err, "Failed to connect to RabbitMQ")
        defer conn.Close()

        ch, err := conn.Channel()
        failOnError(err, "Failed to open a channel")
        defer ch.Close()

        q, err := ch.QueueDeclare(
                "rpc_queue", // name
                false,       // durable
                false,       // delete when unused
                false,       // exclusive
                false,       // no-wait
                nil,         // arguments
        )
        failOnError(err, "Failed to declare a queue")

        err = ch.Qos(
                1,     // prefetch count
                0,     // prefetch size
                false, // global
        )
        failOnError(err, "Failed to set QoS")

        msgs, err := ch.Consume(
                q.Name, // queue
                "",     // consumer
                false,  // auto-ack
                false,  // exclusive
                false,  // no-local
                false,  // no-wait
                nil,    // args
        )
        failOnError(err, "Failed to register a consumer")

        forever := make(chan bool)

        go func() {
                for d := range msgs {
                        n, err := strconv.Atoi(string(d.Body))
                        failOnError(err, "Failed to convert body to integer")

                        log.Printf(" [.] fib(%d)", n)
                        response := fib(n)

                        err = ch.Publish(
                                "",        // exchange
                                d.ReplyTo, // routing key
                                false,     // mandatory
                                false,     // immediate
                                amqp.Publishing{
                                        ContentType:   "text/plain",
                                        CorrelationId: d.CorrelationId,
                                        Body:          []byte(strconv.Itoa(response)),
                                })
                        failOnError(err, "Failed to publish a message")

                        d.Ack(false)
                }
        }()

        log.Printf(" [*] Awaiting RPC requests")
        <-forever
}

服务器代码很是简单:

  • 与往常同样,咱们首先创建链接,通道并声明队列。
  • 咱们可能要运行多个服务器进程。为了将负载平均分配给多个服务器,咱们须要在通道上设置prefetch设置。
  • 咱们使用Channel.Consume获取去队列,咱们从队列中接收消息。而后,咱们进入goroutine进行工做,并将响应发送回去。

咱们的RPC客户端rpc_client.go的代码:

package main

import (
        "log"
        "math/rand"
        "os"
        "strconv"
        "strings"
        "time"

        "github.com/streadway/amqp"
)

func failOnError(err error, msg string) {
        if err != nil {
                log.Fatalf("%s: %s", msg, err)
        }
}

func randomString(l int) string {
        bytes := make([]byte, l)
        for i := 0; i < l; i++ {
                bytes[i] = byte(randInt(65, 90))
        }
        return string(bytes)
}

func randInt(min int, max int) int {
        return min + rand.Intn(max-min)
}

func fibonacciRPC(n int) (res int, err error) {
        conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
        failOnError(err, "Failed to connect to RabbitMQ")
        defer conn.Close()

        ch, err := conn.Channel()
        failOnError(err, "Failed to open a channel")
        defer ch.Close()

        q, err := ch.QueueDeclare(
                "",    // name
                false, // durable
                false, // delete when unused
                true,  // exclusive
                false, // noWait
                nil,   // arguments
        )
        failOnError(err, "Failed to declare a queue")

        msgs, err := ch.Consume(
                q.Name, // queue
                "",     // consumer
                true,   // auto-ack
                false,  // exclusive
                false,  // no-local
                false,  // no-wait
                nil,    // args
        )
        failOnError(err, "Failed to register a consumer")

        corrId := randomString(32)

        err = ch.Publish(
                "",          // exchange
                "rpc_queue", // routing key
                false,       // mandatory
                false,       // immediate
                amqp.Publishing{
                        ContentType:   "text/plain",
                        CorrelationId: corrId,
                        ReplyTo:       q.Name,
                        Body:          []byte(strconv.Itoa(n)),
                })
        failOnError(err, "Failed to publish a message")

        for d := range msgs {
                if corrId == d.CorrelationId {
                        res, err = strconv.Atoi(string(d.Body))
                        failOnError(err, "Failed to convert body to integer")
                        break
                }
        }

        return
}

func main() {
        rand.Seed(time.Now().UTC().UnixNano())

        n := bodyFrom(os.Args)

        log.Printf(" [x] Requesting fib(%d)", n)
        res, err := fibonacciRPC(n)
        failOnError(err, "Failed to handle RPC request")

        log.Printf(" [.] Got %d", res)
}

func bodyFrom(args []string) int {
        var s string
        if (len(args) < 2) || os.Args[1] == "" {
                s = "30"
        } else {
                s = strings.Join(args[1:], " ")
        }
        n, err := strconv.Atoi(s)
        failOnError(err, "Failed to convert arg to integer")
        return n
}

如今是时候看看rpc_client.gorpc_server.go的完整示例源代码了。

咱们的RPC服务现已准备就绪。咱们能够启动服务器:

go run rpc_server.go
# => [x] Awaiting RPC requests

要请求斐波那契数,请运行客户端:

go run rpc_client.go 30
# => [x] Requesting fib(30)

这里介绍的设计不是RPC服务的惟一可能的实现,可是它具备一些重要的优势:

  • 若是RPC服务器太慢,则能够经过运行另外一台RPC服务器来进行扩展。尝试在新控制台中运行另外一个rpc_server.go
  • 在客户端,RPC只须要发送和接收一条消息。结果,RPC客户端只须要一个网络往返就能够处理单个RPC请求。

咱们的代码仍然很是简单,而且不会尝试解决更复杂(但很重要)的问题,例如:

  • 若是没有服务器在运行,客户端应如何反应?
  • 客户端是否应该为RPC设置某种超时时间?
  • 若是服务器发生故障并引起异常,是否应该将其转发给客户端?
  • 在处理以前防止无效的传入消息(例如检查边界,类型)。

若是要进行实验,可能会发现管理后台界面对于查看队列颇有用。

相关文章
相关标签/搜索