publish和subscribe模式使用场景仍是不少的,记得之前面试的时候有被问到若是要你作一个版本自动升级的,你会怎么作?当时正想在项目中引入etcd,而后考虑了下以为也能够用publish和subscribe的思路去解决这个问题。git
publish是gitlab产生的代码merge到master消息,能够考虑配置webhook来实现,固然用jenkins也没问题,在构建完成以后,jenkins经过curl发送最终的构建结果。github
subscribe是各个节点服务器须要安装的一个agent程序,这个agent程序watch本身关注的项目的更新,一旦发现更新以后,拉取最新版本的程序进行更新。web
publish能够经过长连接也能够经过http接口来实现,问题不大。subscribe若是要保证明时性,须要经过长连接来实现,若是对实时性要求不高,设定时间间隔轮训调用http接口也彻底没问题。面试
整个功能分红三个模块:json
简单画一个草图: bash
大体就这么个思路。而后考虑了下,好像整个模型代码写起来也不是很复杂,而后想就动手写起来了。服务器
首先是broker,broker要服务于publisher和subscriber,publisher能够采用标准的htt接口,subscriber能够采用websocket或者本身手写一个长链接,也能够用开源的一些长链接保活的库,像smux是我常常会用到的一个tcp库。websocket
websocket一般状况不用处理编解码的问题,可是还须要添加心跳保活,smux不须要处理心跳保活(库自己提供有心跳机制),可是编解码要本身另外处理。app
publisher能够不用实现,提供接口规范就行。curl
subscriber实现一个长链接的客户端,读到消息就打印消息内容便可(不考虑版本更新这一过程设计)
首先开始设计broker和subscribe的通讯协议 proto.go
package proto
const (
CMD_S2B_HEARTBEAT = iota
CMD_B2S_HEARTBEAT
CMD_B2S_MSG
)
type B2SBody S2BBody
type S2BBody struct {
Cmd int `json:"cmd"`
Data interface{} `json:"data"`
}
type S2BSubscribe struct {
Topics []string `json:"topics"`
}
type S2BHeartbeat struct{}
type B2SHeartbeat struct{}
复制代码
ok,而后能够开始写broker代码了,broker和subscriber最终采用websocket来进行, broker和publisher采用http协议。
import (
"encoding/json"
"io/ioutil"
"log"
"net/http"
"sync"
"github.com/ICKelin/pubsub/proto"
"github.com/gorilla/websocket"
)
type Broker struct {
addr string
muEntry sync.RWMutex
entry map[string][]*subscriber
done chan struct{}
}
func NewBroker(pubaddr, subaddr string) *Broker {
return &Broker{
addr: pubaddr,
entry: make(map[string][]*subscriber),
done: make(chan struct{}),
}
}
func (b *Broker) Run() {
http.HandleFunc("/pub", b.onPublish)
http.HandleFunc("/sub", b.onSubscriber)
http.ListenAndServe(b.addr, nil)
}
type pubBody struct {
Topic string `json:"topic"`
Msg interface{} `json:"msg"`
}
func (b *Broker) onPublish(w http.ResponseWriter, r *http.Request) {
bytes, err := ioutil.ReadAll(r.Body)
if err != nil {
log.Println(err)
return
}
body := &pubBody{}
err = json.Unmarshal(bytes, &body)
if err != nil {
log.Println(err)
return
}
topic := body.Topic
msg := body.Msg
log.Println("publish topic ", topic, msg)
b.muEntry.RLock()
subscribers := b.entry[topic]
b.muEntry.RUnlock()
if subscribers == nil {
// drop message once no subscriber
// TODO: store msg
return
}
for _, s := range subscribers {
s.push(msg)
}
}
type subscribeMsg struct {
Topics []string `json:"topics"`
}
func (b *Broker) onSubscriber(w http.ResponseWriter, r *http.Request) {
upgrade, err := websocket.Upgrade(w, r, nil, 1024, 1024)
if err != nil {
log.Println(err)
return
}
subMsg := proto.S2BSubscribe{}
upgrade.ReadJSON(&subMsg)
s := newSubscriber(subMsg.Topics, upgrade.RemoteAddr().String())
b.muEntry.Lock()
for _, topic := range s.topics {
b.entry[topic] = append(b.entry[topic], s)
}
b.muEntry.Unlock()
s.serveSubscriber(upgrade)
upgrade.Close()
b.muEntry.Lock()
for _, topic := range s.topics {
for i, s := range b.entry[topic] {
if s.raddr == upgrade.RemoteAddr().String() {
log.Println("remove subscriber: ", s.raddr, " from topic ", topic)
if i == len(b.entry[topic])-1 {
b.entry[topic] = b.entry[topic][:i]
} else {
b.entry[topic] = append(b.entry[topic][:i], b.entry[topic][i+1:]...)
}
break
}
}
}
b.muEntry.Unlock()
}
复制代码
broker的subscriber处理
import (
"log"
"time"
"github.com/ICKelin/pubsub/proto"
"github.com/gorilla/websocket"
)
type subscriber struct {
topics []string
raddr string
done chan struct{}
writebuf chan *proto.B2SBody
// parent string
// children []*subscriber
}
func newSubscriber(topics []string, raddr string) *subscriber {
return &subscriber{
topics: topics,
raddr: raddr,
done: make(chan struct{}),
writebuf: make(chan *proto.B2SBody),
}
}
func (s *subscriber) serveSubscriber(conn *websocket.Conn) {
go s.reader(conn)
s.writer(conn)
}
func (s *subscriber) reader(conn *websocket.Conn) {
defer close(s.done)
for {
var obj proto.S2BBody
err := conn.ReadJSON(&obj)
if err != nil {
log.Println(err)
break
}
switch obj.Cmd {
case proto.CMD_S2B_HEARTBEAT:
conn.SetWriteDeadline(time.Now().Add(time.Second * 5))
conn.WriteJSON(&proto.S2BBody{Cmd: proto.CMD_B2S_HEARTBEAT})
conn.SetWriteDeadline(time.Time{})
}
}
}
func (s *subscriber) writer(conn *websocket.Conn) {
for {
select {
case buf := <-s.writebuf:
conn.SetWriteDeadline(time.Now().Add(time.Second * 5))
conn.WriteJSON(buf)
conn.SetWriteDeadline(time.Time{})
// drop msg once writejson fail
case <-s.done:
return
}
}
}
func (s *subscriber) push(data interface{}) {
s.writebuf <- &proto.B2SBody{Cmd: proto.CMD_B2S_MSG, Data: data}
}
复制代码
这两段代码还有几个很是明显的问题
为了解决上面三个问题,可能须要考虑数据持久化以及消息ack等机制,这个也是一大难题。
再来看看subscriber的实现,subscriber能够很简单实现,一个goroutine发心跳包,一个goroutine收数据包,收到的消息以后调用回调函数进行相应的处理。
package main
import (
"log"
"time"
"github.com/ICKelin/pubsub/proto"
"github.com/gorilla/websocket"
)
type subscriber struct {
topics []string
broker string
cb func(msg interface{})
}
func newSubscriber(topics []string, broker string, cb func(msg interface{})) *subscriber {
return &subscriber{
topics: topics,
broker: broker,
cb: cb,
}
}
func (s *subscriber) Run() error {
conn, _, err := websocket.DefaultDialer.Dial(s.broker, nil)
if err != nil {
return err
}
defer conn.Close()
subMsg := proto.S2BSubscribe{Topics: s.topics}
err = conn.WriteJSON(&subMsg)
if err != nil {
return err
}
go func() {
for {
body := &proto.S2BBody{
Cmd: proto.CMD_S2B_HEARTBEAT,
Data: &proto.S2BHeartbeat{},
}
conn.WriteJSON(body)
time.Sleep(time.Second * 3)
}
}()
for {
var body = proto.S2BBody{}
conn.ReadJSON(&body)
switch body.Cmd {
case proto.CMD_B2S_HEARTBEAT:
log.Println("hb from broker")
case proto.CMD_B2S_MSG:
s.cb(body.Data)
}
}
}
func main() {
s := newSubscriber([]string{"gtun", "https://www.notr.tech"}, "ws://127.0.0.1:10002/sub", func(msg interface{}) {
log.Println(msg)
})
s.Run()
}
复制代码