微服务之间是相互独立的,不像单个工程同样各个模块之间能够直接经过方法调用实现通讯,相互独立的服务直接通常的通讯方式是使用 HTTP协议
、rpc协议
或者使用消息中间件如RabbitMQ``Kafka
等html
在这篇文章 使用Golang和MongoDB构建微服务 已经实现了一个微服务的应用,在文章中已经实现了各个服务直接的通讯,是使用的 HTTP
的形式 ,那各个服务之间如何经过 RabbitMQ
进行消息通讯呢,咱们如今要实现一个功能,就是一个用户预订电影票的接口,须要服务 User Service(port 8000) 和 服务 **Booking Service(port 8003)**之间通讯,用户预订以后,把预订信息写入到 booking的数据库中git
RabbitMQ
安装 RabbitMQ
以前须要先安装 Erlang 的环境 ,而后下载安装RabbitMQ ,请选择对应的版本,安装完成以后,RabbitMQ在Windows上是做为一个服务在后台运行,关于 RabbitMQ
的接口如何使用,请参考官网的 教程,有各个主流语言的实现咱们使用的是Go
版本,请下载对应的实现接口 go get github.com/streadway/amqp
github
对RabbitMQ
的接口作一下简单的封装web
messaging/message.go
数据库
type IMessageClient interface {
ConnectToBroker(connectionStr string) error
PublishToQueue(data []byte, queueName string) error
SubscribeToQueue(queueName string, handlerFunc func(amqp.Delivery)) error
Close()
}
type MessageClient struct {
conn *amqp.Connection
}
复制代码
func (m *MessageClient) ConnectToBroker(connectionStr string) error {
if connectionStr == "" {
panic("the connection str mustnt be null")
}
var err error
m.conn, err = amqp.Dial(connectionStr)
return err
}
复制代码
func (m *MessageClient) PublishToQueue(body []byte, queueName string) error {
if m.conn == nil {
panic("before publish you must connect the RabbitMQ first")
}
ch, err := m.conn.Channel()
defer ch.Close()
failOnError(err, "Failed to open a channel")
q, err := ch.QueueDeclare(
queueName,
false,
false,
false,
false,
nil,
)
failOnError(err, "Failed to declare a queue")
err = ch.Publish(
"",
q.Name,
false,
false,
amqp.Publishing{
ContentType: "application/json",
Body: body,
},
)
failOnError(err, "Failed to publish a message")
return nil
}
复制代码
func (m *MessageClient) SubscribeToQueue(queueName string, handlerFunc func(amqp.Delivery)) error {
ch, err := m.conn.Channel()
//defer ch.Close()
failOnError(err, "Failed to open a channel")
q, err := ch.QueueDeclare(
queueName,
false,
false,
false,
false,
nil,
)
failOnError(err, "Failed to declare a queue")
msgs, err := ch.Consume(
q.Name,
"",
true,
false,
false,
false,
nil,
)
failOnError(err, "Failed to register a consumer")
go consumeLoop(msgs, handlerFunc)
return nil
}
复制代码
在 User Service中定义一个新的POST
接口 /user/{name}/booking
,实现用户的预订功能,预订以后,经过RabbitMQ
发布一个消息给 Booking Service,Booking Service接收到消息以后,作相应的处理(写入数据库)json
MessageClient
users/controllers/user.go
bash
var client messaging.IMessageClient
func init() {
client = &messaging.MessageClient{}
err := client.ConnectToBroker("amqp://guest:guest@localhost:5672/")
if err != nil {
fmt.Println("connect to rabbitmq error", err)
}
}
复制代码
routes.go
app
register("POST", "/user/{name}/booking", controllers.NewBooking, nil)
复制代码
users/controllers/user.go
微服务
func NewBooking(w http.ResponseWriter, r *http.Request) {
params := mux.Vars(r)
user_name := params["name"]
defer r.Body.Close()
var bookings models.Booking
body, _ := ioutil.ReadAll(r.Body)
err := json.Unmarshal(body, &bookings)
if err != nil {
fmt.Println("the format body error ", err)
}
fmt.Println("user name:", user_name, bookings)
go notifyMsg(body)
}
复制代码
func notifyMsg(body []byte) {
err := client.PublishToQueue(body, "new_booking")
if err != nil {
fmt.Println("Failed to publis message", err)
}
}
复制代码
var client messaging.IMessageClient
func initMessage() {
client = &messaging.MessageClient{}
err := client.ConnectToBroker("amqp://guest:guest@localhost:5672/")
if err != nil {
fmt.Println("Failed to connect to RabbitMQ", err)
}
err = client.SubscribeToQueue("new_booking", getBooking)
if err != nil {
fmt.Println("Failed to comsuer the msg", err)
}
}
复制代码
在 web服务以前启动oop
func main() {
initMessage()
r := routes.NewRouter()
http.ListenAndServe(":8003", r)
}
复制代码
func getBooking(delivery amqp.Delivery) {
var booking models.Booking
json.Unmarshal(delivery.Body, &booking)
booking.Id = bson.NewObjectId().Hex()
dao.Insert("Booking", "BookModel", booking)
fmt.Println("the booking msg", booking)
}
复制代码
验证,须要启动 User Service 和 Booking Service
使用 Postman
发送对应的数据
post 127.0.0.1:8000/user/kevin_woo/booking
{
"name":"kevin_woo",
"books":[
{
"date":"20180727",
"movies":["5b4c45d49d5e3e33c4a5b97a"]
},
{
"date":"20180810",
"movies":["5b4c45ea9d5e3e33c4a5b97b"]
}
]
}
复制代码
能够看到数据库已经有了一条新的预订信息
说明,我这里POST的数据就是booking数据库中的结构,实际状况须要对数据进行封装处理,在POST数据时,没有对数据进行验证, 在实际开发过程当中须要对各个数据作相应的验证,这里主要是看一下 RabbitMQ的消息传递处理的过程