golang可以使用库github.com/streadway/amqp操做rabbitmqgit
安装github
go get github.com/streadway/amqp
链接golang
conn, err := amqp.Dial(amqp://guest:guest@172.17.84.205:5672/)
创建通道code
ch, err := conn.Channel()
声明Queuerabbitmq
q, err := ch.QueueDeclare( "testqueue", //Queue name true, //durable false, false, false, nil, )
其中durable设为true则queue持久化,不然不会作持久化。get
发布消息string
err = ch.Publish( "", //exchange q.Name, //routing key(queue name) false, false, amqp.Publishing{ DeliveryMode: amqp.Persistent, //Msg set as persistent ContentType: "text/plain", Body: []byte(msgBody), })
其中amqp.Publishing的DeliveryMode若是设为amqp.Persistent则消息会持久化。
须要注意的是若是须要消息持久化Queue也是须要设定为持久化才有效it
接收消息test
msgs, err := ch.Consume( q.Name, "MsgWorkConsumer", false, //Auto Ack false, false, false, nil, )
其中Auto ack能够设置为true。若是设为true则消费者一接收到就从queue中去除了,若是消费者处理消息中发生意外该消息就丢失了。
若是Auto ack设为false。consumer在处理完消息后,调用msg.Ack(false)后消息才从queue中去除。即使当前消费者处理该消息发生意外,只要没有执行msg.Ack(false)那该消息就仍然在queue中,不会丢失。import
生成的Queue在生成是设定的参数,下次使用时不能更改设定参数,不然会报错
例子代码以下
conf.go
package config const ( RMQADDR = "amqp://guest:guest@172.17.84.205:5672/" QUEUENAME = "msgQueueWithPersist" PRODUCERCNT = 5 CONSUMERCNT = 20 )
producer.go
package main import ( config "xxx/conf" "fmt" "log" "sync" "github.com/streadway/amqp" ) func main() { conn, err := amqp.Dial(config.RMQADDR) failOnError(err, "Failed to connect to RabbitMQ") defer conn.Close() var wg sync.WaitGroup wg.Add(config.PRODUCERCNT) for routine := 0; routine < config.PRODUCERCNT; routine++ { go func(routineNum int) { ch, err := conn.Channel() failOnError(err, "Failed to open a channel") defer ch.Close() q, err := ch.QueueDeclare( config.QUEUENAME, //Queue name true, //durable false, false, false, nil, ) failOnError(err, "Failed to declare a queue") for i := 0; i < 500; i++ { msgBody := fmt.Sprintf("Message_%d_%d", routineNum, i) err = ch.Publish( "", //exchange q.Name, //routing key false, false, amqp.Publishing{ DeliveryMode: amqp.Persistent, //Msg set as persistent ContentType: "text/plain", Body: []byte(msgBody), }) log.Printf(" [x] Sent %s", msgBody) failOnError(err, "Failed to publish a message") } wg.Done() }(routine) } wg.Wait() log.Println("All messages sent!!!!") } func failOnError(err error, msg string) { if err != nil { fmt.Printf("%s: %s\n", msg, err) } }
consumer.go
package main import ( config "xxx/conf" "fmt" "log" "github.com/streadway/amqp" ) func main() { conn, err := amqp.Dial(config.RMQADDR) failOnError(err, "Failed to connect to RabbitMQ") defer conn.Close() forever := make(chan bool) for routine := 0; routine < config.CONSUMERCNT; routine++ { go func(routineNum int) { ch, err := conn.Channel() failOnError(err, "Failed to open a channel") defer ch.Close() q, err := ch.QueueDeclare( config.QUEUENAME, true, //durable false, false, false, nil, ) failOnError(err, "Failed to declare a queue") msgs, err := ch.Consume( q.Name, "MsgWorkConsumer", false, //Auto Ack false, false, false, nil, ) if err != nil { log.Fatal(err) } for msg := range msgs { log.Printf("In %d consume a message: %s\n", 0, msg.Body) log.Printf("Done") msg.Ack(false) //Ack } }(routine) } <-forever } func failOnError(err error, msg string) { if err != nil { fmt.Printf("%s: %s\n", msg, err) } }