golang实现rabbitmq消费者模式 断线重连机制

在链接第三方组件的时候实现断线重连机制是颇有必要的事情,由于你不知道在啥时候他忽然就抽风挂掉了。以rabbitmq为例,在忽然面对大流量写入,或者链接数被打满(好比在资讯,直播等模块使用rabbitmq(惨痛的教训))mq就挂掉了,若是你的消费者没有重连机制,你的消费者进程在mq挂掉后,也会自动挂掉。而后等运维修复了mq,可是你的消费者进程却没法再次链接消费了,这就有点恐怖了。要不就写个shell 脚本 ,每隔30S或者1分钟去检测进程活度,挂掉了就重启消费者。固然最简单的仍是在消费者的代码里实现断线重连机制。代码以下git

package rabbitmq

import (
   "encoding/json"
   "fmt"
   "github.com/streadway/amqp"
   "time"
)

//测试用例
type Obj struct {
   Item1 string `json:"item1"`
   Item2 string `json:"item2"`
   Item3 string `json:"item3"`
}

func StartAMQPConsume() {
   defer func() {
      if err := recover(); err != nil {
         time.Sleep(3 * time.Second)
         fmt.Println("休息3秒")
         StartAMQPConsume()
      }
   }()
   conn, err := amqp.Dial("amqp://guest:guest@127.0.0.1:5672/") //mq链接地址
   if err != nil {
      fmt.Println(err)
   }
   defer conn.Close()
   ch, err := conn.Channel()
   if err != nil {
      fmt.Println(err)
   }
   defer ch.Close()
   closeChan := make(chan *amqp.Error, 1)
   notifyClose := ch.NotifyClose(closeChan) //一旦消费者的channel有错误,产生一个amqp.Error,channel监听并捕捉到这个错误
   closeFlag := false
   msgs, err := ch.Consume(
      "xly.test.queue",
      "",
      true,
      false,
      false,
      false, nil)
   var obj Obj
   for {
      select {
      case e := <-notifyClose:
         fmt.Println("chan通道错误,e:%s", e.Error())
         close(closeChan)
         time.Sleep(5 * time.Second)
         StartAMQPConsume()
         closeFlag = true
      case msg := <-msgs:
         //fmt.Println()
         if err := json.Unmarshal(msg.Body, &obj); err != nil {

            fmt.Println(err.Error())
         }
         fmt.Println(obj.Item1)

      }
      if closeFlag {
         break
      }
   }
}

只须要在main 方法里 调用这个方法便可,或者go协程调用 ,效果以下github

当我关闭mq服务时会有以下效果shell

再次启动mqjson

以上就是rabbitmq 消费者断线重连的完整代码运维