本文咱们将经过RabbitMQ和AMQP协议在Go微服务之间进行消息传递。html
微服务是将应用程序的业务领域分离成具备清晰分离域的边界上下文,运行进程分离,其中任何跨域边界的持久关系必须依赖最终一致性,而不是相似于ACID事务或外键约束。其中不少概念都来自域驱动设计,或受其启发。领域驱动设计是另一个很大的话题,足以用一个文章系列来介绍。linux
在咱们Go语言微服务博客系列的上下文和微服务大致架构中,实现服务间的松耦合的一种模式是使用消息传递来进行服务间通讯,不须要严格的请求/响应消息交换或相似的消息交换。也就是说,使用消息传递只是便于服务间松耦合的众多策略中的一种。git
在Spring Cloud中,RabbitMQ彷佛是选择的消息中间人(代理), 特别是由于在第八部分中咱们看到的,Spring Cloud Config服务器具备RabbitMQ运行时依赖。github
本文中,将会让accountservice服务每当读取特殊帐号对象时,就在RabbitMQ exchange上放一条消息。这个消息会被一个咱们本文所实现的全新微服务消费。咱们也将处理Go代码在多微服务间的复用问题,将多服务复用代码放在common类库中,这样每一个微服务均可以import它。spring
还记得咱们在第一部分中的系统景观的图片吗? 下面是在本部分完成以后看起来的样子:docker
依然还有不少元素还没有实现。 不要担忧,咱们慢慢都会作到的。数据库
这一部分有不少源代码,本文不会包含全部代码。 要查看完整代码,可克隆并切换到P9分支,或者直接查看https://github.com/callistaen...。json
咱们将实现一个简单的虚构(make-believe)用例: 当特定VIP帐号在读取accountservice服务时,咱们但愿通知一个vip offer服务,在某些状况下,它将为帐户持有人产生"offer"。在适当设计的领域模型中,帐户对象和VIP offer对象时两个独立领域,它们应该尽量少的互相了解。跨域
换言之,accountservice不能直接访问VIP服务的存储。这个例子中,咱们经过RabbitMQ传递一个消息给vipservice, 彻底将业务逻辑和持久化都委托给vipservice。数组
咱们将使用AMQP协议作全部通讯,这个协议是面向互操做性消息传递的ISO标准应用程序层协议。咱们的选择使用的Go类库是streadway/amqp, 相似在第八部分中咱们消费配置更新时候使用的。
让咱们重复在AMQP中exchange和publisher, consumer和queue之间的关系:
也就是说消息被发布到exchange, 而后将消息副本基于路由规则和可能已经注册消费者的绑定分布到queue。在quora.com网站上的这个帖子对这个话题进行了很好的解释。
Thread vs Post: 在论坛中,经常使用Thread和Post代指某些东西。可是这二者有什么区别呢?
通俗的讲Thread就是论坛中最初发起的某个主题的话题, 包含不少Post(A thread is a group of posts on a single topic.)。中文社区一般所谓的楼主发的第一个东西。 而Post则是对楼主最初发的内容作的回复或跟帖。
参考连接: https://www.drupal.org/projec...。
现实中的(Quora中的答案)例子:
假设你在Apple商店里边,先要买耳机。 店里就会有人过来问你:"须要什么?" 你告诉他你须要买耳机,而后他就把你带到他的同事的柜台前的排队队列以后等待。由于不少其余人也在买东西,销售员正在处理队列前面的那个消费者。 若是这个时候,另一我的进店了,刚才招呼你的人会一样询问对方须要什么帮助。刚进来的人须要修下手机,被找呼的人带到了另一个修理手机的柜台等待了。
这个例子中问你须要什么的人就是exchange, 他会根据须要把你路由到恰当的队列中排队等待。在队列的后面有不少员工,也就是对应队列的worker, 或者消费者。一次处理一个请求,基于先进先出的原则。也可能会根据最早到的人作一个简单轮询。
若是店里没有导流的服务员,那么你就须要来回在每一个柜台前来回问是否能帮到你,直到找到你须要办理业务的柜台后开始排队。
固然,导航苹果商店的工做不复杂,但在应用程序中,你可能有不少队列,服务不一样类型的请求,基于路由和绑定具备交换路由消息的键来讲很是有帮助。 发布者只须要关心添加正确的路由密匙,而消费者只须要关心用正确的绑定密匙建立正确的队列,就能够作到"我对这些消息感兴趣。"
既然咱们须要在accountservice和vipservice中使用消息传递代码和从Spring Cloud Config服务器上加载配置的代码,咱们能够建立可共享的库。
咱们在goblog目录下面建立一个common目录来保存咱们可复用的东西:
mkdir -p common/messaging mkdir -p common/config
咱们将全部AMQP相关的代码放在messaging目录,配置相关的放在config目录。这样你能够把以前的goblog/accountservice/config中的代码移到common/config目录中,并相应的修改import语句中的代码位置。能够看看已完成代码看它是如何支持的。
消息传递代码在单独文件中封装起来,里边定义了咱们应用将用于链接、发布和订阅的接口以及具体实现。老实说,对于使用streadway/amqp的AMQP消息传递来讲有不少样板代码,所以无需在乎代码的实现细节。
在common/messaging/下面建立一个messagingclient.go文件:
package messaging import ( "github.com/streadway/amqp" "fmt" "log" ) // Defines our interface for connecting and consuming messages. type IMessagingClient interface { ConnectToBroker(connectionString string) Publish(msg []byte, exchangeName string, exchangeType string) error PublishOnQueue(msg []byte, queueName string) error Subscribe(exchangeName string, exchangeType string, consumerName string, handlerFunc func(amqp.Delivery)) error SubscribeToQueue(queueName string, consumerName string, handlerFunc func(amqp.Delivery)) error Close() } // Real implementation, encapsulates a pointer to an amqp.Connection type MessagingClient struct { conn *amqp.Connection }
上面代码片断,定义了messaging的接口。 这就是accountservice和vipservice须要消息传递的时候须要使用它们进行处理的,但愿能从不少复杂的东西里边抽象出来。注意我已经选择两种变体"Product"和"Consume"来使用topics和direct/queue消息模式。
接下来,咱们定义了一个保存amqp.Connection指针的结构体,咱们会将必要的方法绑定到它上面(隐式的,由于Go语言中都是这样干的), 这样就实现了咱们声明的接口。
func (m *MessagingClient) ConnectToBroker(connectionString string) { if connectionString == "" { panic("Cannot initialize connection to broker, connectionString not set. Have you initialized?") } var err error m.conn, err = amqp.Dial(fmt.Sprintf("%s/", connectionString)) if err != nil { panic("Failed to connect to AMQP compatible broker at: " + connectionString) } } func (m *MessagingClient) PublishOnQueue(body []byte, queueName string) error { if m.conn == nil { panic("Tried to send message before connection was initialized. Don't do that.") } ch, err := m.conn.Channel() // Get a channel from the connection defer ch.Close() queue, err := ch.QueueDeclare(// Declare a queue that will be created if not exists with some args queueName, // our queue name false, // durable false, // delete when unused false, // exclusive false, // no-wait nil, // arguments ) // Publishes a message onto the queue. err = ch.Publish( "", // exchange queue.Name, // routing key false, // mandatory false, // immediate amqp.Publishing{ ContentType: "application/json", Body: body, // Our JSON body as []byte }) fmt.Printf("A message was sent to queue %v: %v", queueName, body) return err }
ConnectToBroker中展现了咱们如何获取链接指针的,例如amqp.Dial方法。若是咱们没有配置或者没法链接咱们的broker, 会panic咱们的微服务,容器编排会尝试使用新实例从新尝试。 传入的链接字符串就像这样:
amqp://guest:guest@rabbitmq:5672/
注意咱们如今使用的是Docker Swarm模式下的RabbitMQ broker的服务名。
PublishOnQueue()函数至关长,它或多或少是从官方例子派生过来的,这里我对其进行了简化,带比较少的参数。要发布消息到命名队列,咱们须要传入的参数有:
要了解更多exchange的详情,能够参考RabbitMQ的官方文档。
PublishOnQueue()方法样本代码使用的很重,可是很容易理解。声明队列(若是不存在就建立它), 而后发布咱们的[]byte消息到它里边。发布消息到命名exchange更加复杂,它须要样板代码首先声明一个exchange,一个队列,而后实现将它们绑定一块儿的代码。 详细请查看完整代码。
继续,实际使用咱们MessagingClient的是在goblog/accountservice/service/handlers.go中,所以咱们添加一个字段,并硬编码检查是否为VIP, 而后若是请求帐号id是10000的话,咱们就发送一个消息传递。
var DBClient dbclient.IBoltClient var MessagingClient messaging.IMessagingClient // 添加新行 var isHealthy = true func GetAccount(w http.ResponseWriter, r *http.Request) { // Read the 'accountId' path parameter from the mux map var accountId = mux.Vars(r)["accountId"] // Read the account struct BoltDB account, err := DBClient.QueryAccount(accountId) account.ServedBy = util.GetIP() // If err, return a 404 if err != nil { fmt.Println("Some error occured serving " + accountId + ": " + err.Error()) w.WriteHeader(http.StatusNotFound) return } notifyVIP(account) // 添加新行 同时发送VIP通知。 // NEW call the quotes-service quote, err := getQuote() if err == nil { account.Quote = quote } // If found, marshal into JSON, write headers and content data, _ := json.Marshal(account) writeJsonResponse(w, http.StatusOK, data) } // If our hard-coded "VIP" account, spawn a goroutine to send a message. func notifyVIP(account model.Account) { if account.Id == "10000" { go func(account model.Account) { vipNotification := model.VipNotification{AccountId: account.Id, ReadAt: time.Now().UTC().String()} data, _ := json.Marshal(vipNotification) fmt.Printf("Notifying VIP account %v\n", account.Id) err := MessagingClient.PublishOnQueue(data, "vip_queue") if err != nil { fmt.Println(err.Error()) } }(account) } }
借此机会,咱们展现调用新goroutine的内联匿名函数, 也就是说使用了go关键词的。既然咱们没有什么理由在发送消息传递的时候须要阻塞执行HTTP处理的主goroutine, 那么这种状况就是使用goroutine实现并行的最佳时机。
main.go文件也须要更新一点代码以即可以在启动的时候使用加载的并注入到Viper中的配置来初始化AMQ链接。
... func main() { fmt.Printf("Starting %v\n", appName) config.LoadConfigurationFromBranch( viper.GetString("configServerUrl"), appName, viper.GetString("profile"), viper.GetString("configBranch")) initializeBoltClient() initializeMessaging() // 新增行,初始化消息传递 handleSigterm(func() { service.MessagingClient.Close() }) service.StartWebServer(viper.GetString("server_port")) } func initializeMessaging() { if !viper.IsSet("amqp_server_url") { panic("No 'amqp_server_url' set in configuration, cannot start") } service.MessagingClient = &messaging.MessagingClient{} service.MessagingClient.ConnectToBroker(viper.GetString("amqp_server_url")) service.MessagingClient.Subscribe(viper.GetString("config_event_bus"), "topic", appName, config.HandleRefreshEvent) } ...
没有什么大不了的东西 - 咱们建立一个空的MessagingClient实例并将其地址赋值给service.MessagingClient, 而后使用配置amqp_server_url来调用ConnectToBroker方法。若是配置中没有broker_url,咱们就panic()退出,由于咱们不但愿在甚至都没有可能链接到broker的状况下运行服务。
若是成功的链接到broker, 那么咱们就调用Subscribe方法来订阅由配置指定的topic。
咱们在咱们的.yml配置文件中添加amqp_broker_url
属性到第八部分中的配置文件中,这些东西已经没有人管了。
broker_url: amqp://guest:guest@192.168.99.100:5672 _(dev)_ broker_url: amqp://guest:guest@rabbitmq:5672 _(test)_
注意test profile, 咱们使用的是Swarm服务名"rabbitmq", 而不是我笔记本上看到的Swarm的网络IP地址。(你实际的IP地址可能会变化,192.168.99.100彷佛是运行Docker Toolbox的标准IP)。
配置文件中使用明文的用户名和密码是不推荐的,在现实生活中,咱们通常会使用第八部分中看到的Spring Cloud Config服务器内置的加密特性。
固然,咱们应该至少编写一个单元测试,确保咱们handlers.go中的GetAccount函数当某人请求神奇的并不是常特殊的帐号标识为10000的帐号时尝试发送一个消息。为此,咱们须要模拟IMessagingClient和handlers_test.go中添加新的测试用例实现。让咱们开始模拟吧。 此次咱们将使用第三方工具mockery来产生IMessagingClient接口的实现:(记住在命令行运行这些命令的时候使用恰当的GOPATH设置)。
> go get github.com/vektra/mockery/.../ > cd $GOPATH/src/github.com/callistaenterprise/goblog/common/messaging > ./$GOPATH/bin/mockery -all -output . Generating mock for: IMessagingClient
咱们如今在当前目录有一个IMessagingClient.go模拟文件。 我不太喜欢这样的文件名字,不喜欢驼峰,因此我将它重命名为一个明显的东西,它模拟并遵循本博客系列中文件名的约定。
mv IMessagingClient.go mockmessagingclient.go
可能须要调整通常文件中的import语句,删除import别名。 除了那些,咱们使用一个黑盒方式来达到这个特殊模拟 - 仅假设它在咱们开始写测试的时候会工做。
请随意检查生成的模拟实现的源代码,它很是相似咱们以前第四部分中手工写的东西。
切到handlers_test.go,咱们添加一个新的测试用例:
// declare mock types to make test code a bit more readable var anyString = mock.AnythingOfType("string") var anyByteArray = mock.AnythingOfType("[]uint8") // == []byte func TestNotificationIsSentForVIPAccount(t *testing.T) { // Set up the DB client mock mockRepo.On("QueryAccount", "10000").Return(model.Account{Id:"10000", Name:"Person_10000"}, nil) DBClient = mockRepo mockMessagingClient.On("PublishOnQueue", anyByteArray, anyString).Return(nil) MessagingClient = mockMessagingClient Convey("Given a HTTP req for a VIP account", t, func() { req := httptest.NewRequest("GET", "/accounts/10000", nil) resp := httptest.NewRecorder() Convey("When the request is handled by the Router", func() { NewRouter().ServeHTTP(resp, req) Convey("Then the response should be a 200 and the MessageClient should have been invoked", func() { So(resp.Code, ShouldEqual, 200) time.Sleep(time.Millisecond * 10) // Sleep since the Assert below occurs in goroutine So(mockMessagingClient.AssertNumberOfCalls(t, "PublishOnQueue", 1), ShouldBeTrue) }) })}) }
能够查看注释了解详情。我不喜欢在断言调用数以前人为添加10毫秒睡眠,但因为模拟是在goroutine中调用,和主线程是独立的,咱们须要容许它有一些时间来完成。 但愿在涉及到有goroutine或者channel的时候,有更好的单元测试方式。
我认可,模拟这种方式比使用相似Mockito的东西更冗余, 当写Java应用的单元测试的时候。不过,我认为可读性和易读性仍是不错的。
确保测试经过:
go test ./...
若是你尚未作的话,先运行springcloud.sh脚本更新配置服务器。 而后,运行copyall.sh并等几秒钟更新accountservice。咱们将使用curl来获取咱们特殊的帐号:
> curl http://$ManagerIP:6767/accounts/10000 {"id":"10000","name":"Person_0","servedBy":"10.255.0.11"}
若是全部进行顺利的话,咱们能够打开RabbitMQ管理控制台,并看咱们是否在名为vipQueue的队列上得到了一个消息。
在上面截图最底下,咱们看到vipQueue有一个消息。若是咱们使用RabbitMQ管理控制台的Get Message功能, 咱们会看到下面的消息:
最后,是时候从头开始写一个全新的微服务了, 咱们须要用它来展现如何从RabbitMQ消费消息。咱们将确保应用在前面内容中学到的模式包括:
若是你已经切出P9分支的代码了,那么在你goblog目录下面就已经有了vipservice了。
我不会一行行过每一个代码文件的内容,由于有些和accountservice里边的重复了。相反我将聚焦在刚才发送消息的消费方面。须要注意一些事情:
咱们会使用common/messaging的SubscribeToQueue函数,例如:
SubscribeToQueue(queueName string, consumerName string, handlerFunc func(amqp.Delivery)) error
这里咱们应该提供的最重要的是:
vip_queue
)。实际上将咱们的回调函数绑定到队列的SubscribeToQueue实现的实现并不奇怪,若是咱们须要了解细节,能够查阅源代码。
继续快速看看vipservice的入口文件main.go, 看看咱们如何设置的:
package main import ( "flag" "fmt" "github.com/callistaenterprise/goblog/common/config" "github.com/callistaenterprise/goblog/common/messaging" "github.com/callistaenterprise/goblog/vipservice/service" "github.com/spf13/viper" "github.com/streadway/amqp" "os" "os/signal" "syscall" ) var appName = "vipservice" var messagingClient messaging.IMessagingClient func init() { configServerUrl := flag.String("configServerUrl", "http://configserver:8888", "Address to config server") profile := flag.String("profile", "test", "Environment profile, something similar to spring profiles") configBranch := flag.String("configBranch", "master", "git branch to fetch configuration from") flag.Parse() viper.Set("profile", *profile) viper.Set("configServerUrl", *configServerUrl) viper.Set("configBranch", *configBranch) } func main() { fmt.Println("Starting " + appName + "...") config.LoadConfigurationFromBranch(viper.GetString("configServerUrl"), appName, viper.GetString("profile"), viper.GetString("configBranch")) initializeMessaging() // Makes sure connection is closed when service exits. handleSigterm(func() { if messagingClient != nil { messagingClient.Close() } }) service.StartWebServer(viper.GetString("server_port")) } func onMessage(delivery amqp.Delivery) { fmt.Printf("Got a message: %v\n", string(delivery.Body)) } func initializeMessaging() { if !viper.IsSet("amqp_server_url") { panic("No 'broker_url' set in configuration, cannot start") } messagingClient = &messaging.MessagingClient{} messagingClient.ConnectToBroker(viper.GetString("amqp_server_url")) // Call the subscribe method with queue name and callback function err := messagingClient.SubscribeToQueue("vip_queue", appName, onMessage) failOnError(err, "Could not start subscribe to vip_queue") err = messagingClient.Subscribe(viper.GetString("config_event_bus"), "topic", appName, config.HandleRefreshEvent) failOnError(err, "Could not start subscribe to "+viper.GetString("config_event_bus")+" topic") } // Handles Ctrl+C or most other means of "controlled" shutdown gracefully. Invokes the supplied func before exiting. func handleSigterm(handleExit func()) { c := make(chan os.Signal, 1) signal.Notify(c, os.Interrupt) signal.Notify(c, syscall.SIGTERM) go func() { <-c handleExit() os.Exit(1) }() } func failOnError(err error, msg string) { if err != nil { fmt.Printf("%s: %s", msg, err) panic(fmt.Sprintf("%s: %s", msg, err)) } }
看起来和accountservice很是类似,对不对? 咱们可能会重复如何安装和启动咱们添加的每一个微服务的基本知识。
onMessage函数在这里仅仅打印咱们接到的vip消息的body。若是咱们须要实现更多虚构的用例,它会调用一些花哨的逻辑来肯定帐号持有人是否有资格得到"超级可怕的购买咱们全部东西(TM)"的offer, 而且可能写一个offer给"VIP offer数据库"。你能够随意实现并提交一个PR。
没有什么可补充的。除了这个片断,当咱们按下Ctrl + C或者当Swarm认为是时候杀死服务实例:
func handleSigterm(handleExit func()) { c := make(chan os.Signal, 1) signal.Notify(c, os.Interrupt) signal.Notify(c, syscall.SIGTERM) go func() { <-c handleExit() os.Exit(1) }() }
不是最容易读的代码片断,它所作的就是注册通道c做为os.Interrupt和syscall的监听器。SIGTERM和goroutine会阻塞在c上的消息监听,知道接收到这两种信号。 这样就使得咱们很是确定咱们提供的handleExit()函数在微服务被杀掉的时候都会被调用。怎么肯定? Ctrl + C或docker swarm扩展也工做良好。kill也同样。 kill -9不会。 所以请求不要使用kill -9中止,除非你必需要这样作。
它将调用咱们在IMessageConsumer接口中声明的Close()函数, 它实现的时候确保AMQP链接被正确关闭。
咱们对copyall.sh内容进行了修改:
#!/bin/bash export GOOS=linux export CGO_ENABLED=0 cd accountservice;go get;go build -o accountservice-linux-amd64;echo built `pwd`;cd .. cd healthchecker;go get;go build -o healthchecker-linux-amd64;echo built `pwd`;cd .. cd vipservice;go get;go build -o vipservice-linux-amd64;echo built `pwd`;cd .. export GOOS=darwin cp healthchecker/healthchecker-linux-amd64 accountservice/ cp healthchecker/healthchecker-linux-amd64 vipservice/ docker build -t someprefix/accountservice accountservice/ docker service rm accountservice docker service create --name=accountservice --replicas=1 --network=my_network -p=6767:6767 someprefix/accountservice docker build -t someprefix/vipservice vipservice/ docker service rm vipservice docker service create --name=vipservice --replicas=1 --network=my_network someprefix/vipservice
运行这个脚本,等待几秒钟,让服务从新构建部署完成。而后查看:
> docker service ls ID NAME REPLICAS IMAGE kpb1j3mus3tn accountservice 1/1 someprefix/accountservice n9xr7wm86do1 configserver 1/1 someprefix/configserver r6bhneq2u89c rabbitmq 1/1 someprefix/rabbitmq sy4t9cbf4upl vipservice 1/1 someprefix/vipservice u1qcvxm2iqlr viz 1/1 manomarks/visualizer:latest
或者可使用dvizz Docker Swarm服务呈现来查看:
既然docker service logs特性已经在1.13.0中被标记为试验阶段,咱们依然可使用前面的方式来查看vipservice的日志。首先,运行docker ps找出容器id:
> docker ps CONTAINER ID IMAGE a39e6eca83b3 someprefix/vipservice:latest b66584ae73ba someprefix/accountservice:latest d0074e1553c7 someprefix/configserver:latest
而后使用vipservice的容器id来查看日志:
> docker logs -f a39e6eca83b3 Starting vipservice... 2017/06/06 19:27:22 Declaring Queue () 2017/06/06 19:27:22 declared Exchange, declaring Queue () 2017/06/06 19:27:22 declared Queue (0 messages, 0 consumers), binding to Exchange (key 'springCloudBus') Starting HTTP service at 6868
而后另外打开一个窗口,执行下面的请求:
> curl http://$ManagerIP:6767/accounts/10000
而后你就会在刚才日志里边看到多了下面一行信息:
Got a message: {"accountId":"10000","readAt":"2017-02-15 20:06:27.033757223 +0000 UTC"}
也就是说咱们的vipservice成功的消费了从accountservice发布的消息。
跨越服务的多个实例的分布式work模式是利用了work队列的概念。每一个vip消息应该只能被单个vipservice实例处理。
所以让咱们看看当咱们将vipservice规模扩大到2个的时候会发生什么:
> docker service scale vipservice=2
数秒以后新的实例就可使用了。既然咱们使用的是AMQP中的direct/queue方式,咱们但愿有轮询的行为。使用curl触发四个VIP帐户查询。
> curl http://$ManagerIP:6767/accounts/10000 > curl http://$ManagerIP:6767/accounts/10000 > curl http://$ManagerIP:6767/accounts/10000 > curl http://$ManagerIP:6767/accounts/10000
而后在看看日志:
> docker logs -f a39e6eca83b3 Got a message: {"accountId":"10000","readAt":"2017-02-15 20:06:27.033757223 +0000 UTC"} Got a message: {"accountId":"10000","readAt":"2017-02-15 20:06:29.073682324 +0000 UTC"}
正如咱们预料的,咱们看到第一个实例处理了四条消息中的两条。若是咱们对其余的vipservice进行docker logs查询,咱们会看到其余的消息在它们里边消费了。很是满意。
此次不会作性能测试,在发送和接受一些消息后,快速查看内存使用就足够了:
CONTAINER CPU % MEM USAGE / LIMIT vipservice.1.tt47bgnmhef82ajyd9s5hvzs1 0.00% 1.859MiB / 1.955GiB accountservice.1.w3l6okdqbqnqz62tg618szsoj 0.00% 3.434MiB / 1.955GiB rabbitmq.1.i2ixydimyleow0yivaw39xbom 0.51% 129.9MiB / 1.955GiB
上买呢在服务了一些请求后获得的信息。新的vipservice和accountservice同样不是很复杂,所以和预料的同样启动的时候占用的内存很是小。
本文多是这个系列目前最长的一篇文章了!咱们完成了:
在第十部分,咱们将作一些轻量的但在现实世界很是重要的模型 - 使用Logrus, Docker GELF日志驱动记录结构化日志以及将日志发不到Laas提供者商。