你在使用消息队列的时候关注过吞吐量吗?git
思考过吞吐量的影响因素吗?github
考虑过怎么提升吗?redis
总结过最佳实践吗?微信
本文带你一块儿探讨下消息队列消费端高吞吐的 Go
框架实现。Let’s go!网络
写入消息队列吞吐量取决于如下两个方面框架
最佳吞吐量是让其中之一打满,而通常状况下内网带宽都会很是高,不太可能被打满,因此天然就是讲消息队列的写入速度打满,这就就有两个点须要平衡函数
go-zero 的 PeriodicalExecutor
和 ChunkExecutor
就是为了这种状况设计的微服务
从消息队列里消费消息的吞吐量取决于如下两个方面ui
这里有个核心问题是不能不考虑业务处理速度,而读取过多的消息到内存里,不然可能会引发两个问题:atom
pod
也是有 memory limit
的pod
时堆积的消息来不及处理而致使消息丢失借用一下 Rob Pike
的一张图,这个跟队列消费殊途同归。左边4个 gopher
从队列里取,右边4个 gopher
接过去处理。比较理想的结果是左边和右边速率基本一致,没有谁浪费,没有谁等待,中间交换处也没有堆积。
咱们来看看 go-zero
是怎么实现的:
Producer
端for { select { case <-q.quit: logx.Info("Quitting producer") return default: if v, ok := q.produceOne(producer); ok { q.channel <- v } } }
没有退出事件就会经过 produceOne
去读取一个消息,成功后写入 channel
。利用 chan
就能够很好的解决读取和消费的衔接问题。
Consumer
端for { select { case message, ok := <-q.channel: if ok { q.consumeOne(consumer, message) } else { logx.Info("Task channel was closed, quitting consumer...") return } case event := <-eventChan: consumer.OnEvent(event) } }
这里若是拿到消息就去处理,当 ok
为 false
的时候表示 channel
已被关闭,能够退出整个处理循环了。同时咱们还在 redis queue
上支持了 pause/resume
,咱们原来在社交场景里大量使用这样的队列,能够通知 consumer
暂停和继续。
queue
,有了这些咱们就能够经过控制 producer/consumer
的数量来达到吞吐量的调优了func (q *Queue) Start() { q.startProducers(q.producerCount) q.startConsumers(q.consumerCount) q.producerRoutineGroup.Wait() close(q.channel) q.consumerRoutineGroup.Wait() }
这里须要注意的是,先要停掉 producer
,再去等 consumer
处理完。
到这里核心控制代码基本就讲完了,其实看起来仍是挺简单的,也能够到 https://github.com/tal-tech/go-zero/tree/master/core/queue 去看完整实现。
基本的使用流程:
producer
或 consumer
queue
对应到 queue
中,大体以下:
// 生产者建立工厂 producer := newMockedProducer() // 消费者建立工厂 consumer := newMockedConsumer() // 将生产者以及消费者的建立工厂函数传递给 NewQueue() q := queue.NewQueue(func() (Producer, error) { return producer, nil }, func() (Consumer, error) { return consumer, nil })
咱们看看 NewQueue
须要什么参数:
producer
工厂方法consumer
工厂方法将 producer & consumer
的工厂函数传递 queue
,由它去负责建立。框架提供了 Producer
和 Consumer
的接口以及工厂方法定义,而后整个流程的控制 queue
实现会自动完成。
message
咱们经过自定义一个 mockedProducer
来模拟:
type mockedProducer struct { total int32 count int32 // 使用waitgroup来模拟任务的完成 wait sync.WaitGroup } // 实现 Producer interface 的方法:Produce() func (p *mockedProducer) Produce() (string, bool) { if atomic.AddInt32(&p.count, 1) <= p.total { p.wait.Done() return "item", true } time.Sleep(time.Second) return "", false }
queue
中的生产者编写都必须实现:
Produce()
:由开发者编写生产消息的逻辑AddListener()
:添加事件 listener
message
咱们经过自定义一个 mockedConsumer
来模拟:
type mockedConsumer struct { count int32 } func (c *mockedConsumer) Consume(string) error { atomic.AddInt32(&c.count, 1) return nil }
queue
启动,而后验证咱们上述的生产者和消费者之间的数据是否传输成功:
func main() { // 建立 queue q := NewQueue(func() (Producer, error) { return newMockedProducer(), nil }, func() (Consumer, error) { return newMockedConsumer(), nil }) // 启动panic了也能够确保stop被执行以清理资源 defer q.Stop() // 启动 q.Start() }
以上就是 queue
最简易的实现示例。咱们经过这个 core/queue
框架实现了基于 redis
和 kafka
等的消息队列服务,在不一样业务场景中通过了充分的实践检验。你也能够根据本身的业务实际状况,实现本身的消息队列服务。
总体流程如上图:
channel
进行Producer
和 Consumer
的数量能够设定以匹配不一样业务需求Produce
和 Consume
具体实现由开发者定义,queue
负责总体流程本篇文章讲解了如何经过 channel
来平衡从队列中读取和处理消息的速度,以及如何实现一个通用的消息队列处理框架,并经过 mock
示例简单展现了如何基于 core/queue
实现一个消息队列处理服务。你能够经过相似的方式实现一个基于 rocketmq
等的消息队列处理服务。
关于 go-zero
更多的设计和实现文章,能够关注『微服务实践』公众号。
https://github.com/tal-tech/go-zero
欢迎使用 go-zero 并 star 支持咱们!
关注『微服务实践』公众号并点击 进群 获取社区群二维码。
go-zero 系列文章见『微服务实践』公众号