使用go
语言基于redis
写了一个简单的消息队列
源码地址
使用demogit
redis的 list
很是的灵活,能够从左边或者右边添加元素,固然也以从任意一头读取数据github
添加数据和获取数据的操做也是很是简单的 LPUSH
从左边插入数据 RPUSH
大右边插入数据 LPOP
从左边取出一个数据 RPOP
从右边取出一个数据redis
127.0.0.1:6379> LPUSH list1 a
(integer) 1
127.0.0.1:6379> RPUSH list1 b
(integer) 2
127.0.0.1:6379> LPOP list1
"a"
127.0.0.1:6379> RPOP list1
"b"
复制代码
或者使用 BLPOP
BRPOP
来读取数据,不一样之处是取数据时,若是没有数据会等待指定的时间, 若是这期间有数据写入,则会读取并返回,没有数据则会返回空 在一个窗口1
读取json
127.0.0.1:6379> BLPOP list1 10
1) "list1"
2) "a"
复制代码
在另外一个窗口2
写入bash
127.0.0.1:6379> RPUSH list1 a b c
(integer) 3
复制代码
再开一个窗口3
读取,第二次读取时,list
是空的,因此等待1秒后返回空。并发
127.0.0.1:6379> BRPOP list1 1
1) "list1"
2) "c"
127.0.0.1:6379> BRPOP list1 1
(nil)
(1.04s)
复制代码
若是咱们只从一边新增元素,向另外一边取出元素,这就不是一个消息队列么。但我估计你会有一个疑问,在消费数据时,同一个消息会不会同时被多个consumer
消费掉? ui
固然不会,由于redis是单线程的,在从list
取数据时自然不会出现并发问题。可是这是一个简单的消息队列,消费不成功怎么处理仍是须要咱们本身写代码来实现的spa
下面我说一下使用list
实现一个简单的消息队列的总体思路线程
consumer
主要作的就是从list里读取数据,使用LPOP
或者BLPOP
均可以, 这里作了一个开关 options
的UseBLopp
若是为true
时会使用BLPOP
。code
type consumer struct {
once sync.Once
redisCmd redis.Cmdable
ctx context.Context
topicName string
handler Handler
rateLimitPeriod time.Duration
options ConsumerOptions
_ struct{}
}
type ConsumerOptions struct {
RateLimitPeriod time.Duration
UseBLPop bool
}
复制代码
看一下建立consumer
的代码,最后面的opts
参数是可选的配置
type Consumer = *consumer
func NewSimpleMQConsumer(ctx context.Context, redisCmd redis.Cmdable, topicName string, opts ...ConsumerOption) Consumer {
consumer := &consumer{
redisCmd: redisCmd,
ctx: ctx,
topicName: topicName,
}
for _, o := range opts {
o(&consumer.options)
}
if consumer.options.RateLimitPeriod == 0 {
consumer.options.RateLimitPeriod = time.Microsecond * 200
}
return consumer
}
复制代码
读取数据后具体怎么进行处理调用者能够根据本身的业务逻辑进行相应处理 有一个小的interface
调用者根据本身的逻辑去实现
type Handler interface {
HandleMessage(msg *Message)
}
复制代码
读取数据的逻辑使用一个gorouting实现
func (s *consumer) startGetMessage() {
go func() {
ticker := time.NewTicker(s.options.RateLimitPeriod)
defer func() {
log.Println("stop get message.")
ticker.Stop()
}()
for {
select {
case <-s.ctx.Done():
log.Printf("context Done msg: %#v \n", s.ctx.Err())
return
case <-ticker.C:
var revBody []byte
var err error
if !s.options.UseBLPop {
revBody, err = s.redisCmd.LPop(s.topicName).Bytes()
} else {
revs := s.redisCmd.BLPop(time.Second, s.topicName)
err = revs.Err()
revValues := revs.Val()
if len(revValues) >= 2 {
revBody = []byte(revValues[1])
}
}
if err == redis.Nil {
continue
}
if err != nil {
log.Printf("LPOP error: %#v \n", err)
continue
}
if len(revBody) == 0 {
continue
}
msg := &Message{}
json.Unmarshal(revBody, msg)
if s.handler != nil {
s.handler.HandleMessage(msg)
}
}
}
}()
}
复制代码
Producer
仍是很简单的就是把数据推送到 reids
type Producer struct {
redisCmd redis.Cmdable
_ struct{}
}
func NewProducer(cmd redis.Cmdable) *Producer {
return &Producer{redisCmd: cmd}
}
func (p *Producer) Publish(topicName string, body []byte) error {
msg := NewMessage("", body)
sendData, _ := json.Marshal(msg)
return p.redisCmd.RPush(topicName, string(sendData)).Err()
}
复制代码