最近在看公司的 redis queue
时,发现底层使用的是 go-zero
的 queue
。本篇文章来看看 queue
的设计,也但愿能够从里面了解到 mq
的最小型设计实践。git
结合其余 mq
的使用经历,基本的使用流程:github
producer
或 consumer
mq
对应到 queue
中,大体也是这个:redis
// 生产者建立工厂 producer := newMockedProducer() // 消费者建立工厂 consumer := newMockedConsumer() // 将生产者以及消费者的建立工厂函数传递给 NewQueue() q := queue.NewQueue(func() (Producer, error) { return producer, nil }, func() (Consumer, error) { return consumer, nil })
咱们看看 NewQueue
须要什么构建条件:微信
producer constructor
consumer constructor
将双方的工厂函数传递给 queue
,由它去执行以及重试。架构
这两个须要的目的是将生产者/消费者的构建和消息生产/消费都封装在 mq
中,并且将生产者/消费者的整套逻辑交给开发者处理:函数
type ( // 开发者须要实现此接口 Producer interface { AddListener(listener ProduceListener) Produce() (string, bool) } ... // ProducerFactory定义了生成Producer的方法 ProducerFactory func() (Producer, error) )
mq
只负责生产者/消费者的消息传递和之间的调度。queue
本身来作调度或者重试。生产消息固然要回到生产者自己:微服务
type mockedProducer struct { total int32 count int32 // 使用waitgroup来模拟任务的完成 wait sync.WaitGroup } // 实现 Producer interface 的方法:Produce() func (p *mockedProducer) Produce() (string, bool) { if atomic.AddInt32(&p.count, 1) <= p.total { p.wait.Done() return "item", true } time.Sleep(time.Second) return "", false }
queue
中的生产者编写都必须实现:atom
Produce()
:由开发者编写生产消息的逻辑AddListener()
:生产者和生产者相似:spa
type mockedConsumer struct { count int32 } func (c *mockedConsumer) Consume(string) error { atomic.AddInt32(&c.count, 1) return nil }
启动,而后验证咱们上述的生产者和消费者之间的数据是否传输成功:设计
func TestQueue(t *testing.T) { producer := newMockedProducer(rounds) consumer := newMockedConsumer() // 建立 queue q := NewQueue(func() (Producer, error) { return producer, nil }, func() (Consumer, error) { return consumer, nil }) // 当生产者生产完毕,执行 Stop() 关闭生产端生产 go func() { producer.wait.Wait() // mq生产端中止生产,不是mq自己 Stop 运行 q.Stop() }() // 启动 q.Start() // 验证生产消费端是否消息消费完成 assert.Equal(t, int32(rounds), atomic.LoadInt32(&consumer.count)) }
以上就是 queue
最简易的入门使用代码。开发者能够根据本身的业务实际状况:自由定义生产者/消费者已经生产/消费逻辑。
总体流程如上图:
channel
进行listener
,以及事件触发 event
,至关于将触发器逻辑分离出来produceone
,这个是生产消息的逻辑,可是其中的 Produce()
是由开发者编写【上面的 interface
中正是这个函数】Consume()
基本的消息流动就入上图以及上述描写的,具体的代码分析咱们就留到下一篇,咱们😁分析里面,尤为是如何控制 channel
是整个设计的核心。
本篇文章从使用以及整个架构分析上简略介绍了 queue
的设计。下篇咱们将深刻源码,分析内部消息流转以及 channel
控制。
关于 go-zero
更多的设计和实现文章,能够持续关注咱们。欢迎你们去关注和使用。
https://github.com/tal-tech/go-zero
欢迎使用 go-zero 并 star 支持咱们!
关注『微服务实践』公众号并回复 进群 获取社区群二维码。
go-zero 系列文章见『微服务实践』公众号