在咱们的生产环境中搭了两台rabbitmq, 前面架设了一台HAProxy作负载均衡,当咱们的客户端链接到HAProxy,而后由HAProxy负责将连接分配给其中一台rabbitmq,客户端须要须要负责断线重连,须要将获取的数据,分配消息给相应的处理方法,而后还须要回复给rabbitmq ACK,这其中客户端须要负责断线重连的逻辑是很重要的,由于有可能客户端和HAProxy的链接是正常的,可是HAProxy和rabbitmq的连接由于网络波动断开了,那么这个时候客户端实际上是没有工做的,而且会在rabbitmq中不断积累消息。mysql
下面的内容给出了一个比较完善的处理逻辑,以供参考。git
从以前的说明来看,这是一个典型的观察者模式,由RabbitMQ对象负责维护链接,获取消息,而后定义若干个接收者注册到RabbitMQ对象中,这时候RabbitMQ对象一旦收到了由RabbitMQ发来的数据,就能够将该消息分发到相应的接收者去处理,当接收者处理完成后告诉RabbitMQ对象消息消费成功,而后由RabbitMQ对象回复RabbitMQ ACK,固然能够在其中加上重试机制,接收者有可能由于某种状况处理失败,那么每隔必定的时间RabbitMQ对象须要从新调用一次接收者从新处理,直至成功,而后再返回ACK。github
先来看看基本的接口约定redis
// Receiver 观察者模式须要的接口 // 观察者用于接收指定的queue到来的数据 type Receiver interface { QueueName() string // 获取接收者须要监听的队列 RouterKey() string // 这个队列绑定的路由 OnError(error) // 处理遇到的错误,当RabbitMQ对象发生了错误,他须要告诉接收者处理错误 OnReceive([]byte) bool // 处理收到的消息, 这里须要告知RabbitMQ对象消息是否处理成功 }
这样就将接收者和RabbitMQ对象之间就解耦了,这样后期若是须要添加新的接收者那就很容易了。sql
下面来看一看RabbitMQ对象的定义:
这里用到的RabbitMQ client是RabbitMQ官方的 Github数据库
// RabbitMQ 用于管理和维护rabbitmq的对象 type RabbitMQ struct { wg sync.WaitGroup channel *amqp.Channel exchangeName string // exchange的名称 exchangeType string // exchange的类型 receivers []Receiver } // New 建立一个新的操做RabbitMQ的对象 func New() *RabbitMQ { // 这里能够根据本身的须要去定义 return &RabbitMQ{ exchangeName: ExchangeName, exchangeType: ExchangeType, } }
这里RabbitMQ对象须要初始化交换机,注册接收者并初始化接收者监听的Queue,以及断线重连的机制网络
// prepareExchange 准备rabbitmq的Exchange func (mq *RabbitMQ) prepareExchange() error { // 申明Exchange err := mq.channel.ExchangeDeclare( mq.exchangeName, // exchange mq.exchangeType, // type true, // durable false, // autoDelete false, // internal false, // noWait nil, // args ) if nil != err { return err } return nil } // run 开始获取链接并初始化相关操做 func (mq *RabbitMQ) run() { if !config.Global.RabbitMQ.Refresh() { log.Errorf("rabbit刷新链接失败,将要重连: %s", config.Global.RabbitMQ.URL) return } // 获取新的channel对象 mq.channel = config.Global.RabbitMQ.Channel() // 初始化Exchange mq.prepareExchange() for _, receiver := range mq.receivers { mq.wg.Add(1) go mq.listen(receiver) // 每一个接收者单独启动一个goroutine用来初始化queue并接收消息 } mq.wg.Wait() log.Errorf("全部处理queue的任务都意外退出了") // 理论上mq.run()在程序的执行过程当中是不会结束的 // 一旦结束就说明全部的接收者都退出了,那么意味着程序与rabbitmq的链接断开 // 那么则须要从新链接,这里尝试销毁当前链接 config.Global.RabbitMQ.Distory() } // Start 启动Rabbitmq的客户端 func (mq *RabbitMQ) Start() { for { mq.run() // 一旦链接断开,那么须要隔一段时间去重连 // 这里最好有一个时间间隔 time.Sleep(3 * time.Second) } }
// RegisterReceiver 注册一个用于接收指定队列指定路由的数据接收者 func (mq *RabbitMQ) RegisterReceiver(receiver Receiver) { mq.receivers = append(mq.receivers, receiver) } // Listen 监听指定路由发来的消息 // 这里须要针对每个接收者启动一个goroutine来执行listen // 该方法负责从每个接收者监听的队列中获取数据,并负责重试 func (mq *RabbitMQ) listen(receiver Receiver) { defer mq.wg.Done() // 这里获取每一个接收者须要监听的队列和路由 queueName := receiver.QueueName() routerKey := receiver.RouterKey() // 申明Queue _, err := mq.channel.QueueDeclare( queueName, // name true, // durable false, // delete when usused false, // exclusive(排他性队列) false, // no-wait nil, // arguments ) if nil != err { // 当队列初始化失败的时候,须要告诉这个接收者相应的错误 receiver.OnError(fmt.Errorf("初始化队列 %s 失败: %s", queueName, err.Error())) } // 将Queue绑定到Exchange上去 err = mq.channel.QueueBind( queueName, // queue name routerKey, // routing key mq.exchangeName, // exchange false, // no-wait nil, ) if nil != err { receiver.OnError(fmt.Errorf("绑定队列 [%s - %s] 到交换机失败: %s", queueName, routerKey, err.Error())) } // 获取消费通道 mq.channel.Qos(1, 0, true) // 确保rabbitmq会一个一个发消息 msgs, err := mq.channel.Consume( queueName, // queue "", // consumer false, // auto-ack false, // exclusive false, // no-local false, // no-wait nil, // args ) if nil != err { receiver.OnError(fmt.Errorf("获取队列 %s 的消费通道失败: %s", queueName, err.Error())) } // 使用callback消费数据 for msg := range msgs { // 当接收者消息处理失败的时候, // 好比网络问题致使的数据库链接失败,redis链接失败等等这种 // 经过重试能够成功的操做,那么这个时候是须要重试的 // 直到数据处理成功后再返回,而后才会回复rabbitmq ack for !receiver.OnReceive(msg.Body) { log.Warnf("receiver 数据处理失败,将要重试") time.Sleep(1 * time.Second) } // 确认收到本条消息, multiple必须为false msg.Ack(false) } }
接收者的逻辑这里就不写的,只要根据实际的业务逻辑并实现了接口就能够了,这个比较容易。app
获取RabbitMQ的链接负载均衡
rabbitmqConn, err = amqp.Dial(url) if err != nil { panic("RabbitMQ 初始化失败: " + err.Error()) } rabbitmqChannel, err = rabbitmqConn.Channel() if err != nil { panic("打开Channel失败: " + err.Error()) }
// 启动并开始处理数据 func main() { // 假设这里有一个AReceiver和BReceiver aReceiver := NewAReceiver() bReceiver := NewBReceiver() mq := rabbitmq.New() // 将这个接收者注册到 mq.RegisterReceiver(aReceiver) mq.RegisterReceiver(bReceiver) mq.Start() }
举一个咱们本身用于生产环境的例子:框架
咱们主要是用于接收Mysql的变动,并增量更新Elasticsearch的索引,负责数据库变动监听的服务用的是Canel,它假装成一个mysql slave,用于接收mysql binlog的变动通知,而后将变动的数据格式化后写入RabbitMQ,而后由go实现的消费者去订阅数据库的变动通知。
因为客户端并不关心表中哪些字段发生了变化,只须要知道数据库指定的表有变动,那么就将这次变动写入Elasticsearch,这个逻辑对于每一张监听的表都是同样的,那么这样咱们就能够将须要监听表变动的操做彻底配置化,我只要再配置文件中指定一个接收者并指定待消费的队列,而后就能够由程序自动生成若干的接收者而且依次注册进RabbitMQ对象中,这样咱们只须要针对一些特殊的操做写相应地代码便可,这样大大简化了咱们地工做量,来看一看配置文件:
[[autoReceivers]] receiverName = "article_receiver" database = "blog" tableName = "articles" primaryKey = "articleId" queueName = "articles_queue" routerKey = "blog.articles.*" esIndex = "articles_idx" [[autoReceivers]] receiverName = "comment_receiver" database = "blog" tableName = "comments" primaryKey = "commentId" queueName = "comments_queue" routerKey = "blog.comments.*" esIndex = "comments_idx"
这个时候就须要调整一下接收者地注册函数了:
// WalkReceivers 使用callback遍历处理全部的接收者 // 这里地callback就是上面提到地 mq.RegisterReceiver func WalkReceivers(callback func(rabbitmq.Receiver)) { successCount := 0 // 遍历每个配置项,依次生成须要自动建立接收者 // 这里的congfig是统一获取配置地对象,你们根据实际状况处理就能够了 for _, receiverCfg := range config.Global.AutoReceivers { // 验证每个接收者的合法性 err := receiverCfg.Validate() if err != nil { log.Criticalf("生成 %s 失败: %s, 使用该配置: %+v", receiverCfg.ReceiverName, err.Error(), receiverCfg) continue } // 将接收者注册到监听rabbitmq的对象中 callback(NewAutoReceiver(receiverCfg)) log.Infof("生成 %s 成功使用该配置: %+v", receiverCfg.ReceiverName, receiverCfg) successCount++ } if successCount != len(config.Global.AutoReceivers) || successCount == 0 { panic("没法启动全部的接收者,请检查配置") } // 若有必要,这里能够继续添加须要手工建立的接收者 }
启动地流程也须要进行微调一下:
func registeAndStart() { mq := rabbitmq.New() // 遍历全部的receiver,将他们注册到rabbitmq中去 WalkReceivers(mq.RegisterReceiver) log.Info("初始化全部的Receiver成功") mq.Start() }
这样就定义好了两个receiver,启动程序后,就能够接收到数据库地变动并更新elasticsearch中地索引了,很是地方便。
这个是对平时工做地一点总结,但愿能够给你们带来帮助,若是文中有纰漏之处,还望指正,这里完整地代码就不贴了,文章里已经搭起了一个完整地框架了,剩下地就是业务逻辑了,若是有必要地化,我会整理成一个完整地项目放到github上。