golang微服务框架go-micro 入门笔记2.3 micro工具之消息订阅和发布

本章节阐述micro消息订阅和发布相关内容git

阅读本文前你可能须要进行以下知识储备

broker代理

微服务之间须要经过broker来传递消息,go-micro支持http/nats/memory三种broker,其中http是默认的broker。github

同时,go-micro以强大的插件形式,提供以下几种常见的broker。golang

 $ls

gocloud/  googlepubsub/  grpc/  kafka/  mqtt/  nats/  nsq/  proxy/  rabbitmq/  redis/  snssqs/  sqs/  stan/  stomp/

复制代码

http

HTTP Broker 是基于HTTP的异步broker,源代码在github.com\micro\go-micro@v1.9.1\broker\broker.go中,默认DefaultBroker为httpweb

var (
	DefaultBroker Broker = newHttpBroker()
)

复制代码

httpbroker实际上就是一个结构体redis

type httpBroker struct {

	id      string  //微服务ID
	address string //主机地址
	opts    Options //一些配置
	mux *http.ServeMux  //经过这个监听其余端发送的http请求
	c *http.Client  //经过这个发送请求到其余端
	r registry.Registry 
	sync.RWMutex
	subscribers map[string][]*httpSubscriber  //订阅
	running     bool
	exit        chan chan error
	// offline message inbox
	mtx   sync.RWMutex 
	inbox map[string][][]byte   //数据缓存

}

复制代码

经过http.Client发送请求,经过http.ServeMux实现请求监听,经过inbox存储数据shell

redis

redis初始化代码以下json

//main.go

//初始化URL格式redis://密码@主机:端口/

b := redis.NewBroker(
		broker.Addrs("redis://user:secret@localhost:6379/"),
		)

//初始化

b.Init()

//链接

b.Connect()

// 新建service

service := grpc.NewService(
		micro.Name("go.micro.web.config"),
		micro.Version("latest"),
		micro.Broker(b),

	)

//初始化service
service.Init()
//启动,运行,监听
service.Run()

复制代码

启动应用程序须要指定broker为redisapi

go run main.go --broker=redis

复制代码

grpc

初始化过程以下缓存

//main.go

import (
	"github.com/micro/go-plugins/broker/grpc"
)



// 创建链接

b := grpc.NewBroker()
b.Init()
b.Connect()

// 订阅事件
sub, _ := b.Subscribe("events")
defer sub.Unsubscribe()

// 发布事件
b.Publish("events", &broker.Message{

	Headers: map[string]string{"type": "event"},

	Body: []byte(`an event`),

})

复制代码

启动应用程序须要指定broker为grpcbash

go run main.go --broker=grpc

复制代码

rabbitmq

初始化过程以下

//main.go

import (

	"github.com/micro/go-plugins/broker/grpc"

)

b := rabbitmq.NewBroker(

		broker.Addrs("amqp://用户名:密码@主机host:端口port"),

	)



	b.Init()

	b.Connect()

复制代码

启动应用程序须要指定broker为rabbitmq

go run main.go plugin.go --broker=rabbitmq

复制代码

mqtt

初始化过程以下

//main.go

import (

	"github.com/micro/go-micro"

	"github.com/micro/go-plugins/broker/mqtt"

)

func main() {

	service := micro.NewService(

		micro.Name("my.service"),

		micro.Broker(mqtt.NewBroker()),

	)

	//...

}

复制代码

启动应用程序须要指定broker为mqtt

go run main.go plugin.go --broker=mqtt

复制代码

其余

其余能够阅读代码

$GOPATH/src/github.com/micro/go-plugins/broker

复制代码

消息订阅和发布

经过micro.RegisterSubscriber实现消息订阅

消息订阅主要API接口以下,第一个参数标识消息主题,第二个参数表示服务实例。

// Register Struct as Subscriber

micro.RegisterSubscriber("go.micro.srv.testsrv", service.Server(), new(subscriber.Testsrv))

// Register Function as Subscriber

micro.RegisterSubscriber("go.micro.srv.testsrv", service.Server(), subscriber.Handler)

复制代码

重点注意第三个参数,第三个参数是处理函数,能够是函数,也能够是实现了

func Handler(ctx context.Context, msg *testsrv.Message) error方法的结构体,micro内部会根据参数类型自动适配。结构体中能够实现多个func Handler(ctx context.Context, msg *testsrv.Message) error类型方法

经过broker.Subscribe实现订阅

Broker提供以下接口

type Broker interface {

	Init(...Option) error

	Options() Options

	Address() string

	Connect() error

	Disconnect() error

	Publish(topic string, m *Message, opts ...PublishOption) error

	Subscribe(topic string, h Handler, opts ...SubscribeOption) (Subscriber, error)

	String() string

}

复制代码
  • Subscribe 订阅事件,topic表明主题,h事件处理函数

  • Publish 发布事件

消息处理函数Handler 定义

在上述涉及处处理函数handler,具体含义以下

type Handler func(Event) error // Event is given to a subscription handler for processing type Event interface {

	Topic() string

	Message() *Message

	Ack() error

}



type Message struct {

	Header map[string]string

	Body   []byte

}

复制代码

经过broker.Publish实现发布

举例以下

// 创建链接

b := grpc.NewBroker()

b.Init()

b.Connect()
// 订阅事件

sub, _ := b.Subscribe("events")

defer sub.Unsubscribe()

// 发布事件

b.Publish("events", &broker.Message{

	Headers: map[string]string{"type": "event"},

	Body: []byte(`an event`),

})

复制代码

经过micro publish实现发布

举例以下

micro publish "go.micro.web.config" "hello"

复制代码

实战和代码

效果

下载代码broker.zip 解压到techidea8.com/microapp/broker下运行,效果图忑

效果

  • 发布消息须要注意json格式字符串
micro publish go.micro.srv.broker "{\"say\":\"这是测试消息\"}"
复制代码

得到代码

关注公众号betaidea回复micro-broker便可得到

公众号betaidea

推荐阅读

扫微信二维码实现网站登录提供体验地址和源代码

开源项目golang go语言后台管理框架restgo-admin

支持手势触摸,可左右滑动的日历插件

你必须知道的18个互联网业务模型

相关文章
相关标签/搜索