这周姐姐入职了新公司,老板想探探他的底,看了一眼他的简历,呦呵,精通kafka,这小姑娘有两下子,既然这样,那你写一个消息队列吧。由于要用go语言写,这可给姐姐愁坏了。赶忙来求助我,我这么坚贞不屈一人,在姐姐的软磨硬泡下仍是答应他了,因此接下来我就手把手教姐姐怎么写一个消息队列。下面咱们就来看一看我是怎么写的吧~~~。本代码已上传到个人github:git
有须要的小伙伴,可自行下载,顺便给个小星星吧~~~github
姐姐真是把我愁坏了,本身写的精通kafka
,居然不知道什么是消息队列,因而,一贯好脾气的我开始给姐姐讲一讲什么是消息队列。golang
消息队列,咱们通常称它为MQ(Message Queue)
,两个单词的结合,这两个英文单词想必你们都应该知道吧,其实最熟悉的仍是Queue
吧,即队列。队列是一种先进先出的数据结构,队列的使用仍是比较广泛的,可是已经有队列了,怎么还须要MQ
呢?面试
我:问你呢,姐姐,知道吗?为何还须要MQ
?姐姐:快点讲,想挨打呀?编程
我:噗。。。 算我多嘴,哼~~~设计模式
欠欠的我开始了接下来的耐心讲解......安全
举一个简单的例子,假设如今咱们要作一个系统,该登录系统须要在用户登录成功后,发送封邮件到用户邮箱进行提醒,需求仍是很简单的,咱们先开看一看没有MQ
,咱们该怎么实现呢?画一个时序图来看一看:数据结构
看这个图,邮件发送在请求登录时进行,当密码验证成功后,就发送邮件,而后返回登录成功。这样是能够的,可是他是有缺陷的。这让咱们的登录操做变得复杂了,每次请求登录都须要进行邮件发送,若是这里出现错误,整个登录请求也出现了错误,致使登录不成功;还有一个问题,原本咱们登录请求调用接口仅仅须要100ms,由于中间要作一次发送邮件的等待,那么调用一次登录接口的时间就要增加,这就是问题所在,一封邮件他的优先级 不是很高的,用户也不须要实时收到这封邮件,因此这时,就体现了消息队列的重要性了,咱们用消息队列进行改进一下。架构
这里咱们将发送邮件请求放到Mq
中,这样咱们就能提升用户体验的吞吐量,这个很重要,顾客就是上帝嘛,毕竟也没有人喜欢用一个很慢很慢的app。并发
这里只是举了MQ
众多应用中的其中一个,即异步应用,MQ
还在系统解藕、削峰/限流中有着重要应用,这两个我就不具体讲解了,原理都同样,好好思考一下,大家都能懂得。
好啦,姐姐终于知道什么是消息队列了,可是如今仍是无法进行消息队列开发的,由于还差一个知识点,即go语言中的channel
。这个很重要,咱们还须要靠这个来开发咱们的消息队列呢。
因篇幅有限,这里不详细介绍channel
,只介绍基本使用方法。
channel
Goroutine 和 Channel 是 Go 语言并发编程的两大基石。Goroutine 用于执行并发任务,Channel 用于 goroutine 之间的同步、通讯。Go提倡使用通讯的方法代替共享内存,当一个Goroutine须要和其余Goroutine资源共享时,Channel就会在他们之间架起一座桥梁,并提供确保安全同步的机制。channel
本质上其实仍是一个队列,遵循FIFO原则。具体规则以下:
建立通道须要用到关键字 make ,格式以下:
通道实例 := make(chan 数据类型)
Go语言中无缓冲的通道(unbuffered channel)是指在接收前没有能力保存任何值的通道。这种类型的通道要求发送 goroutine 和接收 goroutine 同时准备好,才能完成发送和接收操做。
无缓冲通道的定义方式以下:
通道实例 := make(chan 通道类型)
写个例子来帮助你们理解一下吧:
package main import ( "sync" "time" ) func main() { c := make(chan string) var wg sync.WaitGroup wg.Add(2) go func() { defer wg.Done() c <- `Golang梦工厂` }() go func() { defer wg.Done() time.Sleep(time.Second * 1) println(`Message: `+ <-c) }() wg.Wait() }
Go语言中有缓冲的通道(buffered channel)是一种在被接收前能存储一个或者多个值的通道。这种类型的通道并不强制要求 goroutine 之间必须同时完成发送和接收。通道会阻塞发送和接收动做的条件也会不一样。只有在通道中没有要接收的值时,接收动做才会阻塞。只有在通道没有可用缓冲区容纳被发送的值时,发送动做才会阻塞。
有缓冲通道的定义方式以下:
通道实例 := make(chan 通道类型, 缓冲大小)
来写一个例子讲解一下:
package main import ( "sync" "time" ) func main() { c := make(chan string, 2) var wg sync.WaitGroup wg.Add(2) go func() { defer wg.Done() c <- `Golang梦工厂` c <- `asong` }() go func() { defer wg.Done() time.Sleep(time.Second * 1) println(`公众号: `+ <-c) println(`做者: `+ <-c) }() wg.Wait() }
好啦,通道的概念就介绍到这里了,若是须要,下一篇我出一个channel
详细讲解的文章。
终于开始进入主题了,姐姐都听的快要睡着了,我轰隆一嗓子,立马精神,可是呢,asong也是挨了一顿小电炮,代价惨痛呀,呜呜呜............
在开始编写代码编写直接,我须要构思咱们的整个代码架构,这才是正确的编码方式。咱们先来定义一个接口,把咱们须要实现的方法先列出来,后期对每个代码进行实现就能够了。所以能够列出以下方法:
type Broker interface { publish(topic string, msg interface{}) error subscribe(topic string) (<-chan interface{}, error) unsubscribe(topic string, sub <-chan interface{}) error close() broadcast(msg interface{}, subscribers []chan interface{}) setConditions(capacity int) }
publish
:进行消息的推送,有两个参数即topic
、msg
,分别是订阅的主题、要传递的消息subscribe
:消息的订阅,传入订阅的主题,便可完成订阅,并返回对应的channel
通道用来接收数据unsubscribe
:取消订阅,传入订阅的主题和对应的通道close
:这个的做用就是很明显了,就是用来关闭消息队列的broadCast
:这个属于内部方法,做用是进行广播,对推送的消息进行广播,保证每个订阅者均可以收到setConditions
:这里是用来设置条件,条件就是消息队列的容量,这样咱们就能够控制消息队列的大小了细心的大家有没有发现什么问题,这些代码我都定义的是内部方法,也就是包外不可用。为何这么作呢,由于这里属于代理要作的事情,咱们还须要在封装一层,也就是客户端能直接调用的方法,这样才符合软件架构。所以能够写出以下代码:
package mq type Client struct { bro *BrokerImpl } func NewClient() *Client { return &Client{ bro: NewBroker(), } } func (c *Client)SetConditions(capacity int) { c.bro.setConditions(capacity) } func (c *Client)Publish(topic string, msg interface{}) error{ return c.bro.publish(topic,msg) } func (c *Client)Subscribe(topic string) (<-chan interface{}, error){ return c.bro.subscribe(topic) } func (c *Client)Unsubscribe(topic string, sub <-chan interface{}) error { return c.bro.unsubscribe(topic,sub) } func (c *Client)Close() { c.bro.close() } func (c *Client)GetPayLoad(sub <-chan interface{}) interface{}{ for val:= range sub{ if val != nil{ return val } } return nil }
上面只是准好了代码结构,可是消息队列实现的结构咱们尚未设计,如今咱们就来设计一下。
type BrokerImpl struct { exit chan bool capacity int topics map[string][]chan interface{} // key: topic value : queue sync.RWMutex // 同步锁 }
exit
:也是一个通道,这个用来作关闭消息队列用的capacity
:即用来设置消息队列的容量topics
:这里使用一个map结构,key便是topic
,其值则是一个切片,chan
类型,这里这么作的缘由是咱们一个topic能够有多个订阅者,因此一个订阅者对应着一个通道sync.RWMutex
:读写锁,这里是为了防止并发状况下,数据的推送出现错误,因此采用加锁的方式进行保证好啦,如今咱们已经准备的很充分啦,开始接下来方法填充之旅吧~~~
Publish
和broadcast
这里两个合在一块儿讲的缘由是braodcast
是属于publish
里的。这里的思路很简单,咱们只须要把传入的数据进行广播便可了,下面咱们来看代码实现:
func (b *BrokerImpl) publish(topic string, pub interface{}) error { select { case <-b.exit: return errors.New("broker closed") default: } b.RLock() subscribers, ok := b.topics[topic] b.RUnlock() if !ok { return nil } b.broadcast(pub, subscribers) return nil } func (b *BrokerImpl) broadcast(msg interface{}, subscribers []chan interface{}) { count := len(subscribers) concurrency := 1 switch { case count > 1000: concurrency = 3 case count > 100: concurrency = 2 default: concurrency = 1 } pub := func(start int) { for j := start; j < count; j += concurrency { select { case subscribers[j] <- msg: case <-time.After(time.Millisecond * 5): case <-b.exit: return } } } for i := 0; i < concurrency; i++ { go pub(i) } }
publish
方法中没有什么好讲的,这里主要说一下broadcast
的实现:
这里主要对数据进行广播,因此数据推送出去就能够了,不必一直等着他推送成功,因此这里咱们咱们采用goroutine
。在推送的时候,当推送失败时,咱们也不能一直等待呀,因此这里咱们加了一个超时机制,超过5毫秒就中止推送,接着进行下面的推送。
可能大家会有疑惑,上面怎么还有一个switch
选项呀,干什么用的呢?考虑这样一个问题,当有大量的订阅者时,,好比10000个,咱们一个for循环去作消息的推送,那推送一次就会耗费不少时间,而且不一样的消费者之间也会产生延时,,因此采用这种方法进行分解能够下降必定的时间。
subscribe
和 unsubScribe
咱们先来看代码:
func (b *BrokerImpl) subscribe(topic string) (<-chan interface{}, error) { select { case <-b.exit: return nil, errors.New("broker closed") default: } ch := make(chan interface{}, b.capacity) b.Lock() b.topics[topic] = append(b.topics[topic], ch) b.Unlock() return ch, nil } func (b *BrokerImpl) unsubscribe(topic string, sub <-chan interface{}) error { select { case <-b.exit: return errors.New("broker closed") default: } b.RLock() subscribers, ok := b.topics[topic] b.RUnlock() if !ok { return nil } // delete subscriber var newSubs []chan interface{} for _, subscriber := range subscribers { if subscriber == sub { continue } newSubs = append(newSubs, subscriber) } b.Lock() b.topics[topic] = newSubs b.Unlock() return nil }
这里其实就很简单了:
subscribe
:这里的实现则是为订阅的主题建立一个channel
,而后将订阅者加入到对应的topic
中就能够了,而且返回一个接收channel
。unsubScribe
:这里实现的思路就是将咱们刚才添加的channel
删除就能够了。close
func (b *BrokerImpl) close() { select { case <-b.exit: return default: close(b.exit) b.Lock() b.topics = make(map[string][]chan interface{}) b.Unlock() } return }
这里就是为了关闭整个消息队列,这句代码b.topics = make(map[string][]chan interface{})
比较重要,这里主要是为了保证下一次使用该消息队列不发生冲突。
setConditions
GetPayLoad
还差最后两个方法,一个是设置咱们的消息队列容量,另外一个是封装一个方法来获取咱们订阅的消息:
func (b *BrokerImpl)setConditions(capacity int) { b.capacity = capacity } func (c *Client)GetPayLoad(sub <-chan interface{}) interface{}{ for val:= range sub{ if val != nil{ return val } } return nil }
好啦,代码这么快就被写完了,接下来咱们进行测试一下吧。
正式测试以前,咱们仍是须要先进行一下单元测试,养成好的习惯,只有先自测了,才能有底气说个人代码没问题,要不直接跑程序,会出现不少bug
的。
这里咱们测试方法以下:咱们向不一样的topic
发送不一样的信息,当订阅者收到消息后,就行取消订阅。
func TestClient(t *testing.T) { b := NewClient() b.SetConditions(100) var wg sync.WaitGroup for i := 0; i < 100; i++ { topic := fmt.Sprintf("Golang梦工厂%d", i) payload := fmt.Sprintf("asong%d", i) ch, err := b.Subscribe(topic) if err != nil { t.Fatal(err) } wg.Add(1) go func() { e := b.GetPayLoad(ch) if e != payload { t.Fatalf("%s expected %s but get %s", topic, payload, e) } if err := b.Unsubscribe(topic, ch); err != nil { t.Fatal(err) } wg.Done() }() if err := b.Publish(topic, payload); err != nil { t.Fatal(err) } } wg.Wait() }
测试经过,没问题,接下来咱们在写几个方法测试一下
这里分为两种方式测试
测试一:使用一个定时器,向一个主题定时推送消息.
// 一个topic 测试 func OnceTopic() { m := mq.NewClient() m.SetConditions(10) ch,err :=m.Subscribe(topic) if err != nil{ fmt.Println("subscribe failed") return } go OncePub(m) OnceSub(ch,m) defer m.Close() } // 定时推送 func OncePub(c *mq.Client) { t := time.NewTicker(10 * time.Second) defer t.Stop() for { select { case <- t.C: err := c.Publish(topic,"asong真帅") if err != nil{ fmt.Println("pub message failed") } default: } } } // 接受订阅消息 func OnceSub(m <-chan interface{},c *mq.Client) { for { val := c.GetPayLoad(m) fmt.Printf("get message is %sn",val) } }
测试二:使用一个定时器,定时向多个主题发送消息:
//多个topic测试 func ManyTopic() { m := mq.NewClient() defer m.Close() m.SetConditions(10) top := "" for i:=0;i<10;i++{ top = fmt.Sprintf("Golang梦工厂_%02d",i) go Sub(m,top) } ManyPub(m) } func ManyPub(c *mq.Client) { t := time.NewTicker(10 * time.Second) defer t.Stop() for { select { case <- t.C: for i:= 0;i<10;i++{ //多个topic 推送不一样的消息 top := fmt.Sprintf("Golang梦工厂_%02d",i) payload := fmt.Sprintf("asong真帅_%02d",i) err := c.Publish(top,payload) if err != nil{ fmt.Println("pub message failed") } } default: } } } func Sub(c *mq.Client,top string) { ch,err := c.Subscribe(top) if err != nil{ fmt.Printf("sub top:%s failedn",top) } for { val := c.GetPayLoad(ch) if val != nil{ fmt.Printf("%s get message is %sn",top,val) } } }
终于帮助姐姐解决了这个问题,姐姐开心死了,给我一顿亲,啊不对,是一顿夸,夸的人家都很差意思了。
这一篇你学会了吗?没学会没关系,赶快去把源代码下载下来,好好通读一下,很好理解的~~~。
其实这一篇是为了接下来的kafka学习打基础的,学好了这一篇,接下来学习的kafka就会容易不少啦~~~
github地址: https://github.com/asong2020/...若是能给一个小星星就行了~~~
结尾给你们发一个小福利吧,最近我在看[微服务架构设计模式]这一本书,讲的很好,本身也收集了一本PDF,有须要的小伙能够到自行下载。获取方式:关注公众号:[Golang梦工厂],后台回复:[微服务],便可获取。
我翻译了一份GIN中文文档,会按期进行维护,有须要的小伙伴后台回复[gin]便可下载。
我是asong,一名普普统统的程序猿,让我一块儿慢慢变强吧。我本身建了一个golang
交流群,有须要的小伙伴加我vx
,我拉你入群。欢迎各位的关注,咱们下期见~~~
推荐往期文章: