一文带你理解最简消息队列实现

最近在看公司的 redis queue 时,发现底层使用的是 go-zeroqueue 。本篇文章来看看 queue 的设计,也但愿能够从里面了解到 mq 的最小型设计实践。git

使用

结合其余 mq 的使用经历,基本的使用流程:github

  1. 建立 producerconsumer
  2. 启动 mq
  3. 生产消息/消费消息

对应到 queue 中,大体也是这个:redis

建立 queue

// 生产者建立工厂
producer := newMockedProducer()
// 消费者建立工厂
consumer := newMockedConsumer()
// 将生产者以及消费者的建立工厂函数传递给 NewQueue()
q := queue.NewQueue(func() (Producer, error) {
  return producer, nil
}, func() (Consumer, error) {
  return consumer, nil
})

咱们看看 NewQueue 须要什么构建条件:微信

  1. producer constructor
  2. consumer constructor

将双方的工厂函数传递给 queue ,由它去执行以及重试。架构

这两个须要的目的是将生产者/消费者的构建和消息生产/消费都封装在 mq 中,并且将生产者/消费者的整套逻辑交给开发者处理:函数

type (
    // 开发者须要实现此接口
    Producer interface {
        AddListener(listener ProduceListener)
        Produce() (string, bool)
    }
    ...
    // ProducerFactory定义了生成Producer的方法
    ProducerFactory func() (Producer, error)
)
  1. 其实也就是将生产者的逻辑交个开发者本身完成,mq 只负责生产者/消费者的消息传递和之间的调度。
  2. 工厂方法的设计,是将生产者自己和生产消息,这两个任务都交给 queue 本身来作调度或者重试。

生产msg

生产消息固然要回到生产者自己:微服务

type mockedProducer struct {
    total int32
    count int32
  // 使用waitgroup来模拟任务的完成
    wait  sync.WaitGroup
}
// 实现 Producer interface 的方法:Produce()
func (p *mockedProducer) Produce() (string, bool) {
    if atomic.AddInt32(&p.count, 1) <= p.total {
        p.wait.Done()
        return "item", true
    }
    time.Sleep(time.Second)
    return "", false
}

queue 中的生产者编写都必须实现:atom

  • Produce():由开发者编写生产消息的逻辑
  • AddListener():生产者

消费msg

和生产者相似:spa

type mockedConsumer struct {
    count  int32
}

func (c *mockedConsumer) Consume(string) error {
    atomic.AddInt32(&c.count, 1)
    return nil
}

启动 queue

启动,而后验证咱们上述的生产者和消费者之间的数据是否传输成功:设计

func TestQueue(t *testing.T) {
    producer := newMockedProducer(rounds)
    consumer := newMockedConsumer()
    // 建立 queue
    q := NewQueue(func() (Producer, error) {
        return producer, nil
    }, func() (Consumer, error) {
        return consumer, nil
    })
    // 当生产者生产完毕,执行 Stop() 关闭生产端生产
    go func() {
        producer.wait.Wait()
    // mq生产端中止生产,不是mq自己 Stop 运行
        q.Stop()
    }()
    // 启动
    q.Start()
    // 验证生产消费端是否消息消费完成
    assert.Equal(t, int32(rounds), atomic.LoadInt32(&consumer.count))
}

以上就是 queue 最简易的入门使用代码。开发者能够根据本身的业务实际状况:自由定义生产者/消费者已经生产/消费逻辑。

总体设计

image-20210506224102836

总体流程如上图:

  1. 全体的通讯都由 channel 进行
  2. 经过加入监听器 listener ,以及事件触发 event ,至关于将触发器逻辑分离出来
  3. 生产者有 produceone ,这个是生产消息的逻辑,可是其中的 Produce() 是由开发者编写【上面的 interface 中正是这个函数】
  4. 同理消费者,Consume()

基本的消息流动就入上图以及上述描写的,具体的代码分析咱们就留到下一篇,咱们😁分析里面,尤为是如何控制 channel 是整个设计的核心。

总结

本篇文章从使用以及整个架构分析上简略介绍了 queue 的设计。下篇咱们将深刻源码,分析内部消息流转以及 channel 控制。

关于 go-zero 更多的设计和实现文章,能够持续关注咱们。欢迎你们去关注和使用。

项目地址

https://github.com/tal-tech/go-zero

欢迎使用 go-zero 并 star 支持咱们!

微信交流群

关注『微服务实践』公众号并回复 进群 获取社区群二维码。

go-zero 系列文章见『微服务实践』公众号