在上一篇文章Go 每日一库之 message-bus中,咱们介绍了一款小巧、实现简单的异步通讯库。做为学习,message-bus
确实不错。可是在实际使用上,message-bus
的功能就有点捉襟见肘了。例如,message-bus
将消息发送到订阅者管道以后就无论了,这样若是订阅者处理压力较大,会在管道中堆积太多消息,一旦订阅者异常退出,这些消息将会所有丢失!另外,message-bus
不负责保存消息,若是订阅者后启动,以前发布的消息,这个订阅者是没法收到的。这些问题,咱们将要介绍的watermill都能解决!git
watermill是 Go 语言的一个异步消息解决方案,它支持消息重传、保存消息,后启动的订阅者也能收到前面发布的消息。watermill
内置了多种订阅-发布实现,包括Kafka/RabbitMQ
,甚至还支持HTTP/MySQL binlog
。固然也能够编写本身的订阅-发布实现。此外,它还提供了监控、限流等中间件。github
watermill
内置了不少订阅-发布实现,最简单、直接的要属GoChannel
。咱们就以这个实现为例介绍watermill
的特性。golang
安装:windows
$ go get github.com/ThreeDotsLabs/watermill
复制代码
使用:服务器
package main
import (
"context"
"log"
"time"
"github.com/ThreeDotsLabs/watermill"
"github.com/ThreeDotsLabs/watermill/message"
"github.com/ThreeDotsLabs/watermill/pubsub/gochannel"
)
func main() {
pubSub := gochannel.NewGoChannel(
gochannel.Config{},
watermill.NewStdLogger(false, false),
)
messages, err := pubSub.Subscribe(context.Background(), "example.topic")
if err != nil {
panic(err)
}
go process(messages)
publishMessages(pubSub)
}
func publishMessages(publisher message.Publisher) {
for {
msg := message.NewMessage(watermill.NewUUID(), []byte("Hello, world!"))
if err := publisher.Publish("example.topic", msg); err != nil {
panic(err)
}
time.Sleep(time.Second)
}
}
func process(messages <-chan *message.Message) {
for msg := range messages {
log.Printf("received message: %s, payload: %s", msg.UUID, string(msg.Payload))
msg.Ack()
}
}
复制代码
首先,咱们建立一个GoChannel
对象,它是一个消息管理器。能够调用其Subscribe
订阅某个主题(topic)的消息,调用其Publish()
以某个主题发布消息。Subscribe()
方法会返回一个<-chan *message.Message
,一旦该主题有消息发布,GoChannel
就会将消息发送到该管道中。订阅者只需监听此管道,接收消息进行处理。在上面的例子中,咱们启动了一个消息处理的goroutine
,持续从管道中读取消息,而后打印输出。主goroutine
在一个死循环中每隔 1s 发布一次消息。微信
message.Message
这个结构是watermill
库的核心,每一个消息都会封装到该结构中发送。Message
保存的是原始的字节流([]byte
),因此能够将 JSON/protobuf/XML 等等格式的序列化结果保存到Message
中。并发
有两点注意:dom
Message
的Ack()
方法确认,不然GoChannel
会重发当前消息;Message
有一个UUID
字段,建议设置为惟一的,方便定位问题。watermill
提供方法NewUUID()
生成惟一 id。下面看示例运行:异步
上面的发布和订阅实现是很是底层的模式。在实际应用中,咱们一般想要监控、重试、统计等一些功能。并且上面的例子中,每一个消息处理结束须要手动调用Ack()
方法,消息管理器才会下发后面一条信息,很容易遗忘。还有些时候,咱们有这样的需求,处理完某个消息后,从新发布另一些消息。函数
这些功能都是比较通用的,为此watermill
提供了路由(Router)功能。直接拿来官网的图:
路由其实管理多个订阅者,每一个订阅者在一个独立的goroutine
中运行,彼此互不干扰。订阅者收到消息后,交由注册时指定的处理函数(HandlerFunc)。路由还能够设置插件(plugin)和中间件(middleware),插件是定制路由的行为,而中间件是定制处理器的行为。处理器处理消息后会返回若干消息,这些消息会被路由从新发布到(另外一个)管理器中。
var (
logger = watermill.NewStdLogger(false, false)
)
func main() {
router, err := message.NewRouter(message.RouterConfig{}, logger)
if err != nil {
panic(err)
}
pubSub := gochannel.NewGoChannel(gochannel.Config{}, logger)
go publishMessages(pubSub)
router.AddHandler("myhandler", "in_topic", pubSub, "out_topic", pubSub, myHandler{}.Handler)
router.AddNoPublisherHandler("print_in_messages", "in_topic", pubSub, printMessages)
router.AddNoPublisherHandler("print_out_messages", "out_topic", pubSub, printMessages)
ctx := context.Background()
if err := router.Run(ctx); err != nil {
panic(err)
}
}
func publishMessages(publisher message.Publisher) {
for {
msg := message.NewMessage(watermill.NewUUID(), []byte("Hello, world!"))
if err := publisher.Publish("in_topic", msg); err != nil {
panic(err)
}
time.Sleep(time.Second)
}
}
func printMessages(msg *message.Message) error {
fmt.Printf("\n> Received message: %s\n> %s\n>\n", msg.UUID, string(msg.Payload))
return nil
}
type myHandler struct {
}
func (m myHandler) Handler(msg *message.Message) ([]*message.Message, error) {
log.Println("myHandler received message", msg.UUID)
msg = message.NewMessage(watermill.NewUUID(), []byte("message produced by myHandler"))
return message.Messages{msg}, nil
}
复制代码
首先,咱们建立一个路由:
router, err := message.NewRouter(message.RouterConfig{}, logger)
复制代码
而后为路由注册处理器。注册的处理器有两种类型,一种是:
router.AddHandler("myhandler", "in_topic", pubSub, "out_topic", pubSub, myHandler{}.Handler)
复制代码
这个方法原型为:
func (r *Router) AddHandler( handlerName string, subscribeTopic string, subscriber Subscriber, publishTopic string, publisher Publisher, handlerFunc HandlerFunc, ) *Handler 复制代码
该方法的做用是建立一个名为handlerName
的处理器,监听subscriber
中主题为subscribeTopic
的消息,收到消息后调用handlerFunc
处理,将返回的消息以主题publishTopic
发布到publisher
中。
另一种处理器是下面这种形式:
router.AddNoPublisherHandler("print_in_messages", "in_topic", pubSub, printMessages)
router.AddNoPublisherHandler("print_out_messages", "out_topic", pubSub, printMessages)
复制代码
从名字咱们也能够看出,这种形式的处理器只处理接收到的消息,不发布新消息。
最后,咱们调用router.Run()
运行这个路由。
其中,建立GoChannel
发布消息和上面的没什么不一样。
使用路由还有个好处,处理器返回时,若无错误,路由会自动调用消息的Ack()
方法;若发生错误,路由会调用消息的Nack()
方法通知管理器重发这条消息。
上面只是路由的最基本用法,路由的强大之处在于中间件。
watermill
中内置了几个比较经常使用的中间件:
IgnoreErrors
:能够忽略指定的错误;Throttle
:限流,限制单位时间内处理的消息数量;Poison
:将处理失败的消息以另外一个主题发布;Retry
:重试,处理失败能够重试;Timeout
:超时,若是消息处理时间超过给定的时间,直接失败。InstantAck
:直接调用消息的Ack()
方法,无论后续成功仍是失败;RandomFail
:随机抛出错误,测试时使用;Duplicator
:调用两次处理函数,两次返回的消息都从新发布出去,double~Correlation
:处理函数生成的消息都统一设置成原始消息中的correlation id
,方便追踪消息来源;Recoverer
:捕获处理函数中的panic
,包装成错误返回。中间件的使用也是比较简单和直接的:调用router.AddMiddleware()
。例如,咱们想要把处理返回的消息 double 一下:
router.AddMiddleware(middleware.Duplicator)
复制代码
想重试?能够:
router.AddMiddleware(middleware.Retry{
MaxRetries: 3,
InitialInterval: time.Millisecond * 100,
Logger: logger,
}.Middleware)
复制代码
上面设置最大重试次数为 3,重试初始时间间隔为 100ms。
通常状况下,生产环境须要保证稳定性,某个处理异常不能影响后续的消息处理。故设置Recoverer
是比较好的选择:
router.AddMiddleware(middleware.Recoverer)
复制代码
也能够实现本身的中间件:
func MyMiddleware(h message.HandlerFunc) message.HandlerFunc {
return func(message *message.Message) ([]*message.Message, error) {
fields := watermill.LogFields{"name": m.Name}
logger.Info("myMiddleware before", fields)
ms, err := h(message)
logger.Info("myMiddleware after", fields)
return ms, err
}
}
复制代码
中间件有两种实现方式,若是不须要参数或依赖,那么直接实现为函数便可,像上面这样。若是须要有参数,那么能够实现为一个结构:
type myMiddleware struct {
Name string
}
func (m myMiddleware) Middleware(h message.HandlerFunc) message.HandlerFunc {
return func(message *message.Message) ([]*message.Message, error) {
fields := watermill.LogFields{"name": m.Name}
logger.Info("myMiddleware before", fields)
ms, err := h(message)
logger.Info("myMiddleware after", fields)
return ms, err
}
}
复制代码
这两种中间件的添加方式有所不一样,第一种直接添加:
router.AddMiddleware(MyMiddleware)
复制代码
第二种要构造一个对象,而后将其Middleware
方法传入,在该方法中能够访问MyMiddleware
对象的字段:
router.AddMiddleware(MyMiddleware{Name:"dj"}.Middleware)
复制代码
若是运行上面程序,你极可能会看到这样一条日志:
No subscribers to send message
复制代码
由于发布消息是在另外一个goroutine
,咱们没有控制什么时候发布,可能发布消息时,咱们还未订阅。咱们观察后面的处理日志,对比 uuid 发现这条消息直接被丢弃了。watermill
提供了一个选项,能够将消息都保存下来,订阅某个主题时将该主题以前的消息也发送给它:
pubSub := gochannel.NewGoChannel(
gochannel.Config{
Persistent: true,
}, logger)
复制代码
建立GoChannel
时将Config
中Persistent
字段设置为true
便可。此时运行,咱们仔细观察一下,出现No subscribers to send message
信息的消息后续确实被处理了。
除了GoChannel
,watermill
还内置了其余的发布-订阅实现。这些实现除了发布-订阅器建立的方式不一样,其余与咱们以前介绍的基本同样。这里咱们简单介绍一下RabbitMQ
,其余的可自行研究。
使用RabbitMQ
须要先运行RabbitMQ
程序,RabbitMQ
采用Erlang
开发。咱们以前不少文章也介绍过 windows 上的软件安装神器choco
。使用choco
安装RabbitMQ
:
$ choco install rabbitmq
复制代码
启动RabbitMQ
服务器:
$ rabbitmq-server.bat
复制代码
watermill
对RabbitMQ
的支持使用独立库的形式,须要另行安装:
$ go get -u github.com/ThreeDotsLabs/watermill-amqp/pkg/amqp
复制代码
发布订阅:
var amqpURI = "amqp://localhost:5672/"
func main() {
amqpConfig := amqp.NewDurableQueueConfig(amqpURI)
subscriber, err := amqp.NewSubscriber(
amqpConfig,
watermill.NewStdLogger(false, false),
)
if err != nil {
panic(err)
}
messages, err := subscriber.Subscribe(context.Background(), "example.topic")
if err != nil {
panic(err)
}
go process(messages)
publisher, err := amqp.NewPublisher(amqpConfig, watermill.NewStdLogger(false, false))
if err != nil {
panic(err)
}
publishMessages(publisher)
}
func publishMessages(publisher message.Publisher) {
for {
msg := message.NewMessage(watermill.NewUUID(), []byte("Hello, world!"))
if err := publisher.Publish("example.topic", msg); err != nil {
panic(err)
}
time.Sleep(time.Second)
}
}
func process(messages <-chan *message.Message) {
for msg := range messages {
log.Printf("received message: %s, payload: %s", msg.UUID, string(msg.Payload))
msg.Ack()
}
}
复制代码
若是有自定义发布-订阅实现的需求,能够参考RabbitMQ
的实现:github.com/ThreeDotsLa…。
watermill
提供丰富的功能,且预留了扩展点,可自行扩展。另外,源码中处理goroutine
建立和通讯、多种并发模式的应用都是值得一看的。官方 GitHub 上还有一个事件驱动示例:github.com/ThreeDotsLa…。
你们若是发现好玩、好用的 Go 语言库,欢迎到 Go 每日一库 GitHub 上提交 issue😄
欢迎关注个人微信公众号【GoUpUp】,共同窗习,一块儿进步~
本文由博客一文多发平台 OpenWrite 发布!