micro.newService() 中的 newOptions
func newOptions(opts ...Option) Options { opt := Options{ Auth: auth.DefaultAuth, Broker: broker.DefaultBroker, Cmd: cmd.DefaultCmd, Config: config.DefaultConfig, Client: client.DefaultClient, Server: server.DefaultServer, Store: store.DefaultStore, Registry: registry.DefaultRegistry, Router: router.DefaultRouter, Runtime: runtime.DefaultRuntime, Transport: transport.DefaultTransport, Context: context.Background(), Signal: true, } for _, o := range opts { o(&opt) } return opt }
这里初始化了一堆基础设置,先来看看 Broker: broker.DefaultBroker。
在 broker/broker.go 中:DefaultBroker Broker = NewBroker()
// NewBroker returns a new http broker func NewBroker(opts ...Option) Broker { return newHttpBroker(opts...) } func newHttpBroker(opts ...Option) Broker { options := Options{ Codec: json.Marshaler{}, Context: context.TODO(), Registry: registry.DefaultRegistry, } for _, o := range opts { o(&options) } // set address addr := DefaultAddress if len(options.Addrs) > 0 && len(options.Addrs[0]) > 0 { addr = options.Addrs[0] } h := &httpBroker{ id: uuid.New().String(), address: addr, opts: options, r: options.Registry, c: &http.Client{Transport: newTransport(options.TLSConfig)}, subscribers: make(map[string][]*httpSubscriber), exit: make(chan chan error), mux: http.NewServeMux(), inbox: make(map[string][][]byte), } // specify the message handler h.mux.Handle(DefaultPath, h) // get optional handlers if h.opts.Context != nil { handlers, ok := h.opts.Context.Value("http_handlers").(map[string]http.Handler) if ok { for pattern, handler := range handlers { h.mux.Handle(pattern, handler) } } } return h }
这里做了几件事,重点看这一句:
h.mux.Handle(DefaultPath, h)
h 就是 httpBroker,httpBroker 实现了 ServeHTTP(),所以所有请求都通过它来处理,即所有订阅的消息都是通过 httpBroker.ServeHTTP() 来处理的。
下面看看 example/broker,一个 broker 的示例:
var ( topic = "go.micro.topic.foo" ) func pub() { tick := time.NewTicker(time.Second) i := 0 for _ = range tick.C { msg := &broker.Message{ Header: map[string]string{ "id": fmt.Sprintf("%d", i), }, Body: []byte(fmt.Sprintf("%d: %s", i, time.Now().String())), } if err := broker.Publish(topic, msg); err != nil { log.Printf("[pub] failed: %v", err) } else { fmt.Println("[pub] pubbed message:", string(msg.Body)) } i++ } } func sub() { _, err := broker.Subscribe(topic, func(p broker.Event) error { fmt.Println("[sub] received message:", string(p.Message().Body), "header", p.Message().Header) return nil }) if err != nil { fmt.Println(err) } } func main() { cmd.Init() if err := broker.Init(); err != nil { log.Fatalf("Broker Init error: %v", err) } if err := broker.Connect(); err != nil { log.Fatalf("Broker Connect error: %v", err) } go pub() go sub() <-time.After(time.Second * 10) }
cmd.Init() 请见 micro cmd 篇,有详细介绍。
cmd.opts.Broker 默认使用的是上文分析的 httpBroker。
先看 broker.Init():
func (h *httpBroker) Init(opts ...Option) error { h.RLock() if h.running { h.RUnlock() return errors.New("cannot init while connected") } h.RUnlock() h.Lock() defer h.Unlock() for _, o := range opts { o(&h.opts) } if len(h.opts.Addrs) > 0 && len(h.opts.Addrs[0]) > 0 { h.address = h.opts.Addrs[0] } if len(h.id) == 0 { h.id = "go.micro.http.broker-" + uuid.New().String() } // get registry reg := h.opts.Registry if reg == nil { reg = registry.DefaultRegistry } // get cache if rc, ok := h.r.(cache.Cache); ok { rc.Stop() } // set registry h.r = cache.New(reg) // reconfigure tls config if c := h.opts.TLSConfig; c != nil { h.c = &http.Client{ Transport: newTransport(c), } } return nil }
这里做了如下几件事情:
上读写锁,再进行后面的操作。
再看 broker.Connect():
func (h *httpBroker) Connect() error { h.RLock() if h.running { h.RUnlock() return nil } h.RUnlock() h.Lock() defer h.Unlock() var l net.Listener var err error if h.opts.Secure || h.opts.TLSConfig != nil { config := h.opts.TLSConfig fn := func(addr string) (net.Listener, error) { if config == nil { hosts := []string{addr} // check if its a valid host:port if host, _, err := net.SplitHostPort(addr); err == nil { if len(host) == 0 { hosts = maddr.IPs() } else { hosts = []string{host} } } // generate a certificate cert, err := mls.Certificate(hosts...) if err != nil { return nil, err } config = &tls.Config{Certificates: []tls.Certificate{cert}} } return tls.Listen("tcp", addr, config) } l, err = mnet.Listen(h.address, fn) } else { fn := func(addr string) (net.Listener, error) { return net.Listen("tcp", addr) } l, err = mnet.Listen(h.address, fn) } if err != nil { return err } addr := h.address h.address = l.Addr().String() go http.Serve(l, h.mux) go func() { h.run(l) h.Lock() h.opts.Addrs = []string{addr} h.address = addr h.Unlock() }() // get registry reg := h.opts.Registry if reg == nil { reg = registry.DefaultRegistry } // set cache h.r = cache.New(reg) // set running h.running = true return nil }
这里做了如下几件事情:
上读写锁,再进行后面的操作。
启动 http.Serve(l, h.mux),请求交给 newHttpBroker() 中指定的 handle 函数 ServeHTTP() 处理。
(标准库 http 提供了 Handler 接口,用于开发者实现自己的 handler,只要实现接口的 ServeHTTP 方法即可。)
看看刚才的 run() 做了什么:
func (h *httpBroker) run(l net.Listener) { t := time.NewTicker(registerInterval) defer t.Stop() for { select { // heartbeat for each subscriber case <-t.C: h.RLock() for _, subs := range h.subscribers { for _, sub := range subs { _ = h.r.Register(sub.svc, registry.RegisterTTL(registerTTL)) } } h.RUnlock() // received exit signal case ch := <-h.exit: ch <- l.Close() h.RLock() for _, subs := range h.subscribers { for _, sub := range subs { _ = h.r.Deregister(sub.svc) } } h.RUnlock() return } } }
这里做了如下几件事情:
收到退出信号后,调用 Deregister() 注销 subscribers 中的订阅。
接下来看看怎么订阅消息:broker.Subscribe()
func (h *httpBroker) Subscribe(topic string, handler Handler, opts ...SubscribeOption) (Subscriber, error) { var err error var host, port string options := NewSubscribeOptions(opts...) // parse address for host, port host, port, err = net.SplitHostPort(h.Address()) if err != nil { return nil, err } addr, err := maddr.Extract(host) if err != nil { return nil, err } var secure bool if h.opts.Secure || h.opts.TLSConfig != nil { secure = true } // register service node := ®istry.Node{ Id: topic + "-" + h.id, Address: mnet.HostPort(addr, port), Metadata: map[string]string{ "secure": fmt.Sprintf("%t", secure), "broker": "http", "topic": topic, }, } // check for queue group or broadcast queue version := options.Queue if len(version) == 0 { version = broadcastVersion } service := ®istry.Service{ Name: serviceName, Version: version, Nodes: []*registry.Node{node}, } // generate subscriber subscriber := &httpSubscriber{ opts: options, hb: h, id: node.Id, topic: topic, fn: handler, svc: service, } // subscribe now if err := h.subscribe(subscriber); err != nil { return nil, err } // return the subscriber return subscriber, nil } func (h *httpBroker) subscribe(s *httpSubscriber) error { h.Lock() defer h.Unlock() if err := h.r.Register(s.svc, registry.RegisterTTL(registerTTL)); err != nil { return err } h.subscribers[s.topic] = append(h.subscribers[s.topic], s) return nil }
这里做了如下几件事情:
调用 subscribe(),返回 subscriber。
获取消息通过调用 Subscribe 时传递的处理函数,示例如下:
// Subscribe to the topic with a handler that prints the body and header of
// each received message.
broker.Subscribe(topic, func(evt broker.Event) error {
	fmt.Println(
		"[sub] received message:", string(evt.Message().Body),
		"header", evt.Message().Header,
	)
	return nil
})
Event及httpEvent定义
// Event is given to a subscription handler for processing type Event interface { Topic() string Message() *Message Ack() error Error() error } type httpEvent struct { m *Message t string err error } func (h *httpEvent) Ack() error { return nil } func (h *httpEvent) Error() error { return h.err } func (h *httpEvent) Message() *Message { return h.m } func (h *httpEvent) Topic() string { return h.t }
可以看到 httpEvent 实现了 Event,这样 p.Message() 就可以得到消息了。
获取消息在 ServeHTTP() 中:收到消息后,调用传入的 fn 处理即可。
下面再看发布消息,只需要定义 broker.Message,再调用 broker.Publish() 即可,示例如下:
// Build a message with an "id" header and a "<id>: <current time>" body.
msg := &broker.Message{
	Header: map[string]string{
		"id": fmt.Sprintf("%d", i),
	},
	Body: []byte(fmt.Sprintf("%d: %s", i, time.Now().String())),
}

// Publish it and report the outcome.
switch err := broker.Publish(topic, msg); {
case err != nil:
	log.Printf("[pub] failed: %v", err)
default:
	fmt.Println("[pub] pubbed message:", string(msg.Body))
}
看看broker.Publish()
func (h *httpBroker) Publish(topic string, msg *Message, opts ...PublishOption) error { // create the message first m := &Message{ Header: make(map[string]string), Body: msg.Body, } for k, v := range msg.Header { m.Header[k] = v } m.Header["Micro-Topic"] = topic // encode the message b, err := h.opts.Codec.Marshal(m) if err != nil { return err } // save the message h.saveMessage(topic, b) // now attempt to get the service h.RLock() s, err := h.r.GetService(serviceName) if err != nil { h.RUnlock() return err } h.RUnlock() pub := func(node *registry.Node, t string, b []byte) error { scheme := "http" // check if secure is added in metadata if node.Metadata["secure"] == "true" { scheme = "https" } vals := url.Values{} vals.Add("id", node.Id) uri := fmt.Sprintf("%s://%s%s?%s", scheme, node.Address, DefaultPath, vals.Encode()) r, err := h.c.Post(uri, "application/json", bytes.NewReader(b)) if err != nil { return err } // discard response body io.Copy(ioutil.Discard, r.Body) r.Body.Close() return nil } srv := func(s []*registry.Service, b []byte) { for _, service := range s { var nodes []*registry.Node for _, node := range service.Nodes { // only use nodes tagged with broker http if node.Metadata["broker"] != "http" { continue } // look for nodes for the topic if node.Metadata["topic"] != topic { continue } nodes = append(nodes, node) } // only process if we have nodes if len(nodes) == 0 { continue } switch service.Version { // broadcast version means broadcast to all nodes case broadcastVersion: var success bool // publish to all nodes for _, node := range nodes { // publish async if err := pub(node, topic, b); err == nil { success = true } } // save if it failed to publish at least once if !success { h.saveMessage(topic, b) } default: // select node to publish to node := nodes[rand.Int()%len(nodes)] // publish async to one node if err := pub(node, topic, b); err != nil { // if failed save it h.saveMessage(topic, b) } } } } // do the rest async go func() { // get a third 
of the backlog messages := h.getMessage(topic, 8) delay := (len(messages) > 1) // publish all the messages for _, msg := range messages { // serialize here srv(s, msg) // sending a backlog of messages if delay { time.Sleep(time.Millisecond * 100) } } }() return nil }
这里做了如下几件事情:
在消息的 Header 中加入 Micro-Topic,值是 topic
调用 saveMessage() 保存消息
数组大于64条则只取前64条重新赋值给 inbox,【`震惊,后面直接丢了!!!`】
调用 httpBroker.r.GetService(serviceName) 获取 service,serviceName 默认为 "micro.http.broker"
创建处理函数 pub()
创建处理函数 srv(),其中调用 pub() 发送,失败则重新调用 saveMessage()
创建协程
调用 getMessage() 取出消息
调用 srv() 处理每条消息,如果刚才取出的消息大于1条,每次发送间隔 Millisecond*100
至此,整个 broker 的流程比较清晰了。
总结:
写到这里又要推荐大家看 Micro In Action 了:
Micro In Action(四):Pub/Sub
Micro In Action(五):Message Broker
go micro 分析系列文章
go micro server 启动分析
go micro client
go micro broker
go micro cmd
go micro config
go micro store
go micro registry
go micro router
go micro runtime
go micro transport
go micro web
go micro registry 插件consul
go micro plugin
go micro jwt 网关鉴权
go micro 链路追踪
go micro 熔断与限流
go micro wrapper 中间件
go micro metrics 接入Prometheus、Grafana