在以前内容中咱们经过一个队列实现了消息的发送跟接收。接下来咱们建立工做队列(Work Queue),用于在多个工做者之间分配耗时的任务fetch
工做队列(任务队列)背后的核心主要是避免当即执行资源密集型的任务,必须等待其工做完成。咱们将任务封装为消息后将其发送到队列,后台的工做进程将弹出任务并最终执行,当咱们运行不少Worker时候,任务将在它们之间共享spa
为了确保消息不会丢失,RabbitMQ支持消息确认,消费者消费了一个消息以后会发送一个ack给RabbitMQ,这样RabbitMQ就能够删除掉这个消息code
若是一个消费者异常(通道关闭或连接关闭或TCP连接丢失)没有发送ACK给rabbitMQ,rabbitMQ会将该消息从新放入队列当中。此时若是有其余消费者在线,rabbitMQ会从新将该消息再次投递到另外一个消费者rabbitmq
msgs,err := ch.Consume( q.Name, "", false,//将autoAck设置为false,则须要在消费者每次消费完成 // 消息的时候调用d.Ack(false)来告诉RabbitMQ该消息已经消费 false, false, false, nil, ) FailError(err,"Failed to register a consumer") forever := make(chan bool) go func() { for d := range msgs{ log.Printf("Received a message: %s", d.Body) dot_count := bytes.Count(d.Body, []byte(".")) t := time.Duration(dot_count) time.Sleep(t * time.Second) log.Printf("Done") //multiple为true的时候:这次交付和以前没有确认的交付都会在经过同一个通道交付,这在批量处理的时候颇有用 //为false的时候只交付本次。只有该方法执行了,RabbitMQ收到该确认才会将消息删除 d.Ack(false) } }() log.Printf(" [*] Waiting for messages. To exit press CTRL+C") <-forever
使用以上设置后,咱们能够保证即便worker在执行任务的时候意外退出也不会丢失消息。在worker意外退出的不久以后消息将会被从新投递。确认ack必须使用接收到消息的通道,若是使用不一样的通道将会致使一个通道协议异常队列
忘记确认ack进程
Listing queues for vhost / ... name messages_ready messages_unacknowledged hello 0 1
咱们已经知道如何确保即便消费者意外退出的状况下保证任务不会丢失。可是若是RabbitMQ服务中止的话任务仍是会丢失。当RabbitMQ退出或异常的时候,它将会丢失队列和消息,除非你设置RabbitMQ的两个地方:将队列和消息进行标记为持久的ip
首先设置队列durable为true内存
q, err := ch.QueueDeclare( "hello", // name true, // durable false, // delete when unused false, // exclusive false, // no-wait nil, // arguments )
RabbitMQ不容许使用不一样参数从新定义一个已经存在的队列,因此队列已经存在的话修改了上面的配置后运行程序是不会改变已经存在的队列的资源
而后设置消息为持久化存储:路由
err = ch.Publish( "", // exchange q.Name, // routing key false, // mandatory false, amqp.Publishing { DeliveryMode: amqp.Persistent, ContentType: "text/plain", Body: []byte(body), })
注意:设置消息持久化并不能保证消息不会丢失,由于仍然有一小段时间片处于RabbitMQ收到消息可是还没保存,它可能只是保存在内存当中。可是已经知足咱们的基本使用,若是你须要强保证的话可使用publisher confirms
err = ch.Qos( 1, // prefetch count 0, // prefetch size false, // global )
注意:消费者必需要设置,生产者不用设置
func main() { conn,err := amqp.Dial("amqp://guest:guest@localhost:5672/") failError(err,"send:Failed to connect to RabbitMQ") defer conn.Close() ch,err := conn.Channel() failError(err,"Failed to open a channel") defer ch.Close() q,err := ch.QueueDeclare( "task_queue", true,// 设置为true以后RabbitMQ将永远不会丢失队列,不然重启或异常退出的时候会丢失 false, false, false, nil, ) failError(err,"Failed to declare a queue") fmt.Println(q.Name) body := bodyFrom(os.Args) //生产者将消息发送到默认交换器中,不是发送到队列中 ch.Publish( "",//默认交换器 q.Name,//使用队列的名字来看成route-key是由于声明的每个队列都有一个隐式路由到默认交换器 false, false, amqp.Publishing{ DeliveryMode:amqp.Persistent, ContentType:"text/plain", Body:[]byte(body), }) failError(err,"Failed to publish a message") log.Printf(" [x] Sent %s",body) } func bodyFrom(args []string)string { var s string if len(args) < 2 || os.Args[1] == "" { s = "hello" }else { s = strings.Join(args[1:]," ") } return s } func failError(err error,msg string) { if err != nil { log.Fatal("%s : %s",msg,err) } }
func main() { conn,err := amqp.Dial("amqp://guest:guest@localhost:5672/") FailError1(err,"receive:Failed to connect to RabbitMQ") defer conn.Close() ch,err := conn.Channel() FailError1(err,"receive:Failed to open a channel") defer ch.Close() q,err := ch.QueueDeclare( "task_queue", true, false, false, false, nil, ) err = ch.Qos( 1, //// 在没有返回ack以前,最多只接收1个消息 0, false, ) FailError1(err,"Failed to set Qos") msgs,err := ch.Consume( q.Name, "", false,//将autoAck设置为false,则须要在消费者每次消费完成 // 消息的时候调用d.Ack(false)来告诉RabbitMQ该消息已经消费 false, false, false, nil, ) FailError1(err,"Failed to register a consumer") forever := make(chan bool) go func() { for d := range msgs{ log.Printf("Received a message: %s", d.Body) dot_count := bytes.Count(d.Body, []byte(".")) t := time.Duration(dot_count) fmt.Println() time.Sleep(t * time.Second) log.Printf("Done") //multiple为true的时候:这次交付和以前没有确认的交付都会在经过同一个通道交付,这在批量处理的时候颇有用 //为false的时候只交付本次。只有该方法执行了,RabbitMQ收到该确认才会将消息删除 d.Ack(false) } }() log.Printf(" [*] Waiting for messages. To exit press CTRL+C") <-forever } func FailError1(err error,msg string) { if err != nil { log.Fatal("%s : %s",msg,err) } }