手册:http://www.rabbitmq.com/getstarted.htmlcss
安装:http://www.rabbitmq.com/download.htmlhtml
参考:http://blog.csdn.net/whycold/article/details/41119807java
AMQP,即Advanced Message Queuing Protocol,高级消息队列协议,是应用层协议的一个开放标准,为面向消息的中间件设计。消息中间件主要用于组件之间的解耦,消息的发送者无需知道消息使用者的存在,反之亦然。nginx
AMQP的主要特征是面向消息、队列、路由(包括点对点和发布/订阅)、可靠性、安全。git
RabbitMQ是一个开源的AMQP实现,服务器端用Erlang语言编写,支持多种客户端,如:Go、Python、Ruby。用于在分布式系统中存储转发消息。github
ubuntu直接下载deb文件安装,默认已经启动,sudo敲入:正则表达式
sudo rabbitmq-server start sudo lsof -i:5672
启用插件,进入UI:ubuntu
sudo rabbitmq-plugins enable rabbitmq_management
登陆http://127.0.0.1:15672(默认guest只能localhost访问,要远程访问,须要使用可远程访问的管理员帐号)
segmentfault
用户名:密码=guest:guest缓存
# 敲入查看帮助 sudo rabbitmqctl # 建立用户 sudo rabbitmqctl add_user 登陆用户名 密码 # 能够建立管理员用户,负责整个MQ的运维 sudo rabbitmqctl set_user_tags 登陆用户名 administrator # 能够建立RabbitMQ监控用户,负责整个MQ的监控 sudo rabbitmqctl set_user_tags 登陆用户名 monitoring # 能够建立某个项目的专用用户,只能访问项目本身的virtual hosts sudo rabbitmqctl set_user_tags 登陆用户名 management # 查看用户 sudo rabbitmqctl list_users # 受权 # 该命令使用户具备/这个virtual host中全部资源的配置、写、读权限以便管理其中的资源 # set_permissions [-p <vhostpath>] <user> <conf> <write> <read> # 其中,<conf> <write> <read>的位置分别用正则表达式来匹配特定的资源,如'^(amq\.gen.*|amq\.default)$'能够匹配server生成的和默认的exchange,'^$'不匹配任何资源 sudo rabbitmqctl set_permissions -p / 登陆用户名 '.*' '.*' '.*'
ConnectionFactory、Connection、Channel都是RabbitMQ对外提供的API中最基本的对象。
Connection是RabbitMQ的socket连接,它封装了socket协议相关部分逻辑。
ConnectionFactory为Connection的制造工厂。
Channel是咱们与RabbitMQ打交道的最重要的一个接口,咱们大部分的业务操做是在Channel这个接口中完成的,包括定义Queue、定义Exchange、绑定Queue与Exchange、发布消息等。
Queue(队列)是RabbitMQ的内部对象,用于存储消息,用下图表示。
RabbitMQ中的消息都只能存储在Queue中,生产者(下图中的P)生产消息并最终投递到Queue中,消费者(下图中的C)能够从Queue中获取消息并消费。
多个消费者能够订阅同一个Queue,这时Queue中的消息会被平均分摊给多个消费者进行处理,而不是每一个消费者都收到全部的消息并处理。
在实际应用中,可能会发生消费者收到Queue中的消息,但没有处理完成就宕机(或出现其余意外)的状况,这种状况下就可能会致使消息丢失。为了不这种状况发生,咱们能够要求消费者在消费完消息后发送一个回执给RabbitMQ,RabbitMQ收到消息回执(Message acknowledgment)后才将该消息从Queue中移除;若是RabbitMQ没有收到回执并检测到消费者的RabbitMQ链接断开,则RabbitMQ会将该消息发送给其余消费者(若是存在多个消费者)进行处理。这里不存在timeout概念,一个消费者处理消息时间再长也不会致使该消息被发送给其余消费者,除非它的RabbitMQ链接断开。
这里会产生另一个问题,若是咱们的开发人员在处理完业务逻辑后,忘记发送回执给RabbitMQ,这将会致使严重的bug——Queue中堆积的消息会愈来愈多;消费者重启后会重复消费这些消息并重复执行业务逻辑…
另外pub message是没有ack的。(??)
若是咱们但愿即便在RabbitMQ服务重启的状况下,也不会丢失消息,咱们能够将Queue与Message都设置为可持久化的(durable),这样能够保证绝大部分状况下咱们的RabbitMQ消息不会丢失。但依然解决不了小几率丢失事件的发生(好比RabbitMQ服务器已经接收到生产者的消息,但还没来得及持久化该消息时RabbitMQ服务器就断电了),若是咱们须要对这种小几率事件也要管理起来,那么咱们要用到事务。因为这里仅为RabbitMQ的简单介绍,因此这里将不讲解RabbitMQ相关的事务。
前面咱们讲到若是有多个消费者同时订阅同一个Queue中的消息,Queue中的消息会被平摊给多个消费者。这时若是每一个消息的处理时间不一样,就有可能会致使某些消费者一直在忙,而另一些消费者很快就处理完手头工做并一直空闲的状况。咱们能够经过设置prefetchCount来限制Queue每次发送给每一个消费者的消息数,好比咱们设置prefetchCount=1,则Queue每次给每一个消费者发送一条消息;消费者处理完这条消息后Queue会再给该消费者发送一条消息。就是变慢而已。订阅模式如何平摊?这种模式是一个消费者一次性拿不少条消息?
在上一节咱们看到生产者将消息投递到Queue中,实际上这在RabbitMQ中这种事情永远都不会发生。实际的状况是,生产者将消息发送到Exchange(交换器,下图中的X),由Exchange将消息路由到一个或多个Queue中(或者丢弃)。
Exchange是按照什么逻辑将消息路由到Queue的?这个将在Binding一节介绍。
RabbitMQ中的Exchange有四种类型,不一样的类型有着不一样的路由策略,这将在Exchange Types一节介绍。
生产者在将消息发送给Exchange的时候,通常会指定一个routing key,来指定这个消息的路由规则,而这个routing key须要与Exchange Type及binding key联合使用才能最终生效。
在Exchange Type与binding key固定的状况下(在正常使用时通常这些内容都是固定配置好的),咱们的生产者就能够在发送消息给Exchange时,经过指定routing key来决定消息流向哪里。
RabbitMQ为routing key设定的长度限制为255 bytes。
RabbitMQ中经过Binding将Exchange与Queue关联起来,这样RabbitMQ就知道如何正确地将消息路由到指定的Queue了。
在绑定(Binding)Exchange与Queue的同时,通常会指定一个binding key;消费者将消息发送给Exchange时,通常会指定一个routing key;当binding key与routing key相匹配时,消息将会被路由到对应的Queue中。这个将在Exchange Types章节会列举实际的例子加以说明。
在绑定多个Queue到同一个Exchange的时候,这些Binding容许使用相同的binding key。 binding key 并非在全部状况下都生效,它依赖于Exchange Type,好比fanout类型(广播)的Exchange就会无视binding key,而是将消息路由到全部绑定到该Exchange的Queue。
RabbitMQ经常使用的Exchange Type有fanout、direct、topic、headers这四种(AMQP规范里还提到两种Exchange Type,分别为system与自定义,这里不予以描述),下面分别进行介绍。
fanout类型的Exchange路由规则很是简单,它会把全部发送到该Exchange的消息路由到全部与它绑定的Queue中。
direct类型的Exchange路由规则也很简单,它会把消息路由到那些binding key与routing key彻底匹配的Queue中。
以上图的配置为例,咱们以routingKey=”error”发送消息到Exchange,则消息会路由到Queue1(amqp.gen-S9b…,这是由RabbitMQ自动生成的Queue名称)和Queue2(amqp.gen-Agl…);若是咱们以routingKey=”info”或routingKey=”warning”来发送消息,则消息只会路由到Queue2。若是咱们以其余routingKey发送消息,则消息不会路由到这两个Queue中。
前面讲到direct类型的Exchange路由规则是彻底匹配binding key与routing key,但这种严格的匹配方式在不少状况下不能知足实际业务需求。topic类型的Exchange在匹配规则上进行了扩展,它与direct类型的Exchage类似,也是将消息路由到binding key与routing key相匹配的Queue中,但这里的匹配规则有些不一样,它约定:
1. routing key为一个句点号“. ”分隔的字符串(咱们将被句点号“. ”分隔开的每一段独立的字符串称为一个单词),如“stock.usd.nyse”、“nyse.vmw”、“quick.orange.rabbit” 2. binding key与routing key同样也是句点号“. ”分隔的字符串 3. binding key中能够存在两种特殊字符“*”与“#”,用于作模糊匹配,其中“*”用于匹配一个单词,“#”用于匹配多个单词(能够是零个)
以上图中的配置为例,routingKey=”quick.orange.rabbit”的消息会同时路由到Q1与Q2,routingKey=”lazy.orange.fox”的消息会路由到Q1与Q2,routingKey=”lazy.brown.fox”的消息会路由到Q2,routingKey=”lazy.pink.rabbit”的消息会路由到Q2(只会投递给Q2一次,虽然这个routingKey与Q2的两个bindingKey都匹配);routingKey=”quick.brown.fox”、routingKey=”orange”、routingKey=”quick.orange.male.rabbit”的消息将会被丢弃,由于它们没有匹配任何bindingKey
headers类型的Exchange不依赖于routing key与binding key的匹配规则来路由消息,而是根据发送的消息内容中的headers属性进行匹配。
在绑定Queue与Exchange时指定一组键值对;当消息发送到Exchange时,RabbitMQ会取到该消息的headers(也是一个键值对的形式),对比其中的键值对是否彻底匹配Queue与Exchange绑定时指定的键值对;若是彻底匹配则消息会路由到该Queue,不然不会路由到该Queue。
该类型的Exchange没有用到过(不过也应该颇有用武之地),因此不作介绍。
MQ自己是基于异步的消息处理,前面的示例中全部的生产者(P)将消息发送到RabbitMQ后不会知道消费者(C)处理成功或者失败(甚至连有没有消费者来处理这条消息都不知道)。
但实际的应用场景中,咱们极可能须要一些同步处理,须要同步等待服务端将个人消息处理完成后再进行下一步处理。这至关于RPC(Remote Procedure Call,远程过程调用)。在RabbitMQ中也支持RPC。
RabbitMQ中实现RPC的机制是:
http://www.rabbitmq.com/tutorials/tutorial-one-go.html
请看官方示例:
https://github.com/rabbitmq/rabbitmq-tutorials/blob/master/go/send.go
上面的例子仔细看,有必要看源码!
四种模式
发布方,一个!
package main import ( "fmt" "log" "os" "strings" "github.com/streadway/amqp" ) func failOnError(err error, msg string) { if err != nil { log.Fatalf("%s: %s", msg, err) panic(fmt.Sprintf("%s: %s", msg, err)) } } func main() { // 拨号,下面例子都同样 conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/") failOnError(err, "Failed to connect to RabbitMQ") defer conn.Close() // 这个是最重要的 ch, err := conn.Channel() failOnError(err, "Failed to open a channel") defer ch.Close() // 申明一个队列 // https://godoc.org/github.com/streadway/amqp#Channel.QueueDeclare q, err := ch.QueueDeclare( "task_queue", // name 有名字! true, // durable 持久性的,若是事前已经声明了该队列,不能重复声明 false, // delete when unused false, // exclusive 若是是真,链接一断开,队列删除 false, // no-wait nil, // arguments ) failOnError(err, "Failed to declare a queue") body := bodyFrom(os.Args) // 发布 err = ch.Publish( "", // exchange 默认模式,exchange为空 q.Name, // routing key 默认模式路由到同名队列,便是task_queue false, // mandatory false, amqp.Publishing{ // 持久性的发布,由于队列被声明为持久的,发布消息必须加上这个(可能不用),但消息仍是可能会丢,如消息到缓存但MQ挂了来不及持久化。 DeliveryMode: amqp.Persistent, ContentType: "text/plain", Body: []byte(body), }) failOnError(err, "Failed to publish a message") log.Printf(" [x] Sent %s", body) } func bodyFrom(args []string) string { var s string if (len(args) < 2) || os.Args[1] == "" { s = "hello" } else { s = strings.Join(args[1:], " ") } return s }
工做方,多个,拿发布方的消息
package main import ( "bytes" "fmt" "github.com/streadway/amqp" "log" "time" ) func failOnError(err error, msg string) { if err != nil { log.Fatalf("%s: %s", msg, err) panic(fmt.Sprintf("%s: %s", msg, err)) } } func main() { conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/") failOnError(err, "Failed to connect to RabbitMQ") defer conn.Close() ch, err := conn.Channel() failOnError(err, "Failed to open a channel") defer ch.Close() // 指定队列! q, err := ch.QueueDeclare( "task_queue", // name true, // durable false, // delete when unused false, // exclusive false, // no-wait nil, // arguments ) failOnError(err, "Failed to declare a queue") // Fair dispatch 预取,每一个工做方每次拿一个消息,确认后才拿下一次,缓解压力 err = ch.Qos( 1, // prefetch count // 待解释 0, // prefetch size false, // global ) failOnError(err, "Failed to set QoS") // 消费根据队列名 msgs, err := ch.Consume( q.Name, // queue "", // consumer false, // auto-ack 设置为真自动确认消息 false, // exclusive false, // no-local false, // no-wait nil, // args ) failOnError(err, "Failed to register a consumer") forever := make(chan bool) go func() { for d := range msgs { log.Printf("Received a message: %s", d.Body) dot_count := bytes.Count(d.Body, []byte(".")) t := time.Duration(dot_count) time.Sleep(t * time.Second) log.Printf("Done") // 确认消息被收到!!若是为真的,那么同在一个channel,在该消息以前未确认的消息都会确认,适合批量处理 // 真时场景:每十条消息确认一次,相似 d.Ack(false) } }() log.Printf(" [*] Waiting for messages. To exit press CTRL+C") <-forever }
发布方
package main import ( "fmt" "log" "os" "strings" "github.com/streadway/amqp" ) func failOnError(err error, msg string) { if err != nil { log.Fatalf("%s: %s", msg, err) panic(fmt.Sprintf("%s: %s", msg, err)) } } func main() { conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/") failOnError(err, "Failed to connect to RabbitMQ") defer conn.Close() ch, err := conn.Channel() failOnError(err, "Failed to open a channel") defer ch.Close() // 默认模式有默认交换机,广播本身定义一个交换机,交换机可与队列进行绑定 err = ch.ExchangeDeclare( "logs", // name "fanout", // type 广播模式 true, // durable false, // auto-deleted false, // internal false, // no-wait nil, // arguments ) failOnError(err, "Failed to declare an exchange") body := bodyFrom(os.Args) // 发布 err = ch.Publish( "logs", // exchange 消息发送到交换机,这个时候没队列绑定交换机,消息会丢弃 "", // routing key 广播模式不须要这个,它会把全部消息路由到绑定的全部队列 false, // mandatory false, // immediate amqp.Publishing{ ContentType: "text/plain", Body: []byte(body), }) failOnError(err, "Failed to publish a message") log.Printf(" [x] Sent %s", body) } func bodyFrom(args []string) string { var s string if (len(args) < 2) || os.Args[1] == "" { s = "hello" } else { s = strings.Join(args[1:], " ") } return s }
订阅方
package main import ( "fmt" "log" "github.com/streadway/amqp" ) func failOnError(err error, msg string) { if err != nil { log.Fatalf("%s: %s", msg, err) panic(fmt.Sprintf("%s: %s", msg, err)) } } func main() { conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/") failOnError(err, "Failed to connect to RabbitMQ") defer conn.Close() ch, err := conn.Channel() failOnError(err, "Failed to open a channel") defer ch.Close() // 一样要申明交换机 err = ch.ExchangeDeclare( "logs", // name "fanout", // type true, // durable false, // auto-deleted false, // internal false, // no-wait nil, // arguments ) failOnError(err, "Failed to declare an exchange") // 新建队列,这个队列没名字,随机生成一个名字 q, err := ch.QueueDeclare( "", // name false, // durable false, // delete when usused true, // exclusive 表示链接一断开,这个队列自动删除 false, // no-wait nil, // arguments ) failOnError(err, "Failed to declare a queue") // 队列和交换机绑定,便是队列订阅了发到这个交换机的消息 err = ch.QueueBind( q.Name, // queue name 队列的名字 "", // routing key 广播模式不须要这个 "logs", // exchange 交换机名字 false, nil) failOnError(err, "Failed to bind a queue") // 开始消费消息,可开多个订阅方,由于队列是临时生成的,全部每一个订阅方都能收到一样的消息 msgs, err := ch.Consume( q.Name, // queue 队列名字 "", // consumer true, // auto-ack 自动确认 false, // exclusive false, // no-local false, // no-wait nil, // args ) failOnError(err, "Failed to register a consumer") forever := make(chan bool) go func() { for d := range msgs { log.Printf(" [x] %s", d.Body) } }() log.Printf(" [*] Waiting for logs. To exit press CTRL+C") <-forever }
发布-订阅每一个绑定的队列都收到同样的消息,如今不想!使用路由功能,队列绑定进行分发。
发布方
package main import ( "fmt" "log" "os" "strings" "github.com/streadway/amqp" ) func failOnError(err error, msg string) { if err != nil { log.Fatalf("%s: %s", msg, err) panic(fmt.Sprintf("%s: %s", msg, err)) } } func main() { conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/") failOnError(err, "Failed to connect to RabbitMQ") defer conn.Close() ch, err := conn.Channel() failOnError(err, "Failed to open a channel") defer ch.Close() // 交换机申明,且类型为点对点默认 err = ch.ExchangeDeclare( "logs_direct", // name "direct", // type true, // durable false, // auto-deleted false, // internal false, // no-wait nil, // arguments ) failOnError(err, "Failed to declare an exchange") body := bodyFrom(os.Args) // 发布 err = ch.Publish( "logs_direct", // exchange 发到这个交换机 severityFrom(os.Args), // routing key 且路由key是由命令行指定,以下方,指定了error false, // mandatory false, // immediate amqp.Publishing{ ContentType: "text/plain", Body: []byte(body), }) failOnError(err, "Failed to publish a message") log.Printf(" [x] Sent %s", body) } func bodyFrom(args []string) string { var s string if (len(args) < 3) || os.Args[2] == "" { s = "hello" } else { s = strings.Join(args[2:], " ") } return s } func severityFrom(args []string) string { var s string if (len(args) < 2) || os.Args[1] == "" { s = "info" } else { s = os.Args[1] } return s }
发消息到交换机,路由key为error
go run *.go error "Run. Run. Or it will explode."
消费方
package main import ( "fmt" "log" "os" "github.com/streadway/amqp" ) func failOnError(err error, msg string) { if err != nil { log.Fatalf("%s: %s", msg, err) panic(fmt.Sprintf("%s: %s", msg, err)) } } func main() { conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/") failOnError(err, "Failed to connect to RabbitMQ") defer conn.Close() ch, err := conn.Channel() failOnError(err, "Failed to open a channel") defer ch.Close() // 惯例 err = ch.ExchangeDeclare( "logs_direct", // name "direct", // type true, // durable false, // auto-deleted false, // internal false, // no-wait nil, // arguments ) failOnError(err, "Failed to declare an exchange") // 申明临时队列 q, err := ch.QueueDeclare( "", // name false, // durable false, // delete when usused true, // exclusive false, // no-wait nil, // arguments ) failOnError(err, "Failed to declare a queue") if len(os.Args) < 2 { log.Printf("Usage: %s [info] [warning] [error]", os.Args[0]) os.Exit(0) } # 绑定队列和交换机,绑定多个路由key,见下方 for _, s := range os.Args[1:] { log.Printf("Binding queue %s to exchange %s with routing key %s", q.Name, "logs_direct", s) // 下面同个队列能够收到不一样路由key的消息 ,广播模式除外! err = ch.QueueBind( q.Name, // queue name s, // routing key "logs_direct", // exchange false, nil) failOnError(err, "Failed to bind a queue") } // 消费队列 msgs, err := ch.Consume( q.Name, // queue "", // consumer true, // auto ack false, // exclusive false, // no local false, // no wait nil, // args ) failOnError(err, "Failed to register a consumer") forever := make(chan bool) go func() { for d := range msgs { log.Printf(" [x] %s", d.Body) } }() log.Printf(" [*] Waiting for logs. To exit press CTRL+C") <forever }
消费这些key:info warning error
go run *.go info warning error
上面的路由都是标准的,就是固定字符串名字,话题模式可使用类正则的路由,这样模糊匹配更棒!!
路由相似于这样 *.love.*
* (star) can substitute for exactly one word. # (hash) can substitute for zero or more words.
发布方
package main import ( "fmt" "log" "os" "strings" "github.com/streadway/amqp" ) func failOnError(err error, msg string) { if err != nil { log.Fatalf("%s: %s", msg, err) panic(fmt.Sprintf("%s: %s", msg, err)) } } func main() { conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/") failOnError(err, "Failed to connect to RabbitMQ") defer conn.Close() ch, err := conn.Channel() failOnError(err, "Failed to open a channel") defer ch.Close() // 交换机,话题模式 err = ch.ExchangeDeclare( "logs_topic", // name "topic", // type true, // durable false, // auto-deleted false, // internal false, // no-wait nil, // arguments ) failOnError(err, "Failed to declare an exchange") body := bodyFrom(os.Args) // 相似上面 err = ch.Publish( "logs_topic", // exchange severityFrom(os.Args), // routing key 路由能够不标准了 false, // mandatory false, // immediate amqp.Publishing{ ContentType: "text/plain", Body: []byte(body), }) failOnError(err, "Failed to publish a message") log.Printf(" [x] Sent %s", body) } func bodyFrom(args []string) string { var s string if (len(args) < 3) || os.Args[2] == "" { s = "hello" } else { s = strings.Join(args[2:], " ") } return s } func severityFrom(args []string) string { var s string if (len(args) < 2) || os.Args[1] == "" { s = "anonymous.info" } else { s = os.Args[1] } return s }
命令运行
To receive all the logs:
go run *.go "#"
To receive all logs from the facility “kern”:
go run *.go "kern.*"
Or if you want to hear only about “critical” logs:
go run *.go "*.critical"
You can create multiple bindings:
go run *.go "kern.*" "*.critical"
消费方
package main import ( "fmt" "log" "os" "github.com/streadway/amqp" ) func failOnError(err error, msg string) { if err != nil { log.Fatalf("%s: %s", msg, err) panic(fmt.Sprintf("%s: %s", msg, err)) } } func main() { conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/") failOnError(err, "Failed to connect to RabbitMQ") defer conn.Close() ch, err := conn.Channel() failOnError(err, "Failed to open a channel") defer ch.Close() // 同理 err = ch.ExchangeDeclare( "logs_topic", // name "topic", // type true, // durable false, // auto-deleted false, // internal false, // no-wait nil, // arguments ) failOnError(err, "Failed to declare an exchange") // 临时队列 q, err := ch.QueueDeclare( "", // name false, // durable false, // delete when usused true, // exclusive false, // no-wait nil, // arguments ) failOnError(err, "Failed to declare a queue") if len(os.Args) < 2 { log.Printf("Usage: %s [binding_key]...", os.Args[0]) os.Exit(0) } for _, s := range os.Args[1:] { log.Printf("Binding queue %s to exchange %s with routing key %s", q.Name, "logs_topic", s) // 绑定也是相似的 err = ch.QueueBind( q.Name, // queue name s, // routing key "logs_topic", // exchange false, nil) failOnError(err, "Failed to bind a queue") } msgs, err := ch.Consume( q.Name, // queue "", // consumer true, // auto ack false, // exclusive false, // no local false, // no wait nil, // args ) failOnError(err, "Failed to register a consumer") forever := make(chan bool) go func() { for d := range msgs { log.Printf(" [x] %s", d.Body) } }() log.Printf(" [*] Waiting for logs. To exit press CTRL+C") <forever }
命令运行
go run *.go "kern.critical" "A critical kernel error"
应答方
package main import ( "fmt" "log" "strconv" "github.com/streadway/amqp" ) func failOnError(err error, msg string) { if err != nil { log.Fatalf("%s: %s", msg, err) panic(fmt.Sprintf("%s: %s", msg, err)) } } func fib(n int) int { if n == 0 { return 0 } else if n == 1 { return 1 } else { return fib(n-1) + fib(n-2) } } func main() { // 拨号 conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/") failOnError(err, "Failed to connect to RabbitMQ") defer conn.Close() // channel ch, err := conn.Channel() failOnError(err, "Failed to open a channel") defer ch.Close() // 声明匿名队列 q, err := ch.QueueDeclare( "rpc_queue", // name false, // durable false, // delete when usused false, // exclusive false, // no-wait nil, // arguments ) failOnError(err, "Failed to declare a queue") // 公平分发 没有这个则round-robbin:https://segmentfault.com/a/1190000004492447 err = ch.Qos( 1, // prefetch count 0, // prefetch size false, // global ) failOnError(err, "Failed to set QoS") // 消费,等待请求 msgs, err := ch.Consume( q.Name, // queue "", // consumer false, // auto-ack false, // exclusive false, // no-local false, // no-wait nil, // args ) failOnError(err, "Failed to register a consumer") forever := make(chan bool) go func() { //请求来了 for d := range msgs { n, err := strconv.Atoi(string(d.Body)) failOnError(err, "Failed to convert body to integer") log.Printf(" [.] fib(%d)", n) // 计算 response := fib(n) // 回答 err = ch.Publish( "", // exchange d.ReplyTo, // routing key 回答队列 false, // mandatory false, // immediate amqp.Publishing{ ContentType: "text/plain", CorrelationId: d.CorrelationId, 序列号 Body: []byte(strconv.Itoa(response)), }) failOnError(err, "Failed to publish a message") // 确认回答完毕 d.Ack(false) } }() log.Printf(" [*] Awaiting RPC requests") <forever }
请教方
package main import ( "fmt" "log" "math/rand" "os" "strconv" "strings" "time" "github.com/streadway/amqp" ) func failOnError(err error, msg string) { if err != nil { log.Fatalf("%s: %s", msg, err) panic(fmt.Sprintf("%s: %s", msg, err)) } } func randomString(l int) string { bytes := make([]byte, l) for i := 0; i < l; i++ { bytes[i] = byte(randInt(65, 90)) } return string(bytes) } func randInt(min int, max int) int { return min + rand.Intn(max-min) } func fibonacciRPC(n int) (res int, err error) { // 拨号 conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/") failOnError(err, "Failed to connect to RabbitMQ") defer conn.Close() ch, err := conn.Channel() failOnError(err, "Failed to open a channel") defer ch.Close() // 队列声明 q, err := ch.QueueDeclare( "", // name false, // durable false, // delete when usused true, // exclusive 为真即链接断开就删除 false, // noWait nil, // arguments ) failOnError(err, "Failed to declare a queue") // 消费 msgs, err := ch.Consume( q.Name, // queue "", // consumer true, // auto-ack false, // exclusive 这个为真,服务器会认为这是该队列惟一的消费者 false, // no-local false, // no-wait nil, // args ) failOnError(err, "Failed to register a consumer") corrId := randomString(32) // 请教! err = ch.Publish( "", // exchange "rpc_queue", // routing key 问题发到这里 false, // mandatory false, // immediate amqp.Publishing{ ContentType: "text/plain", CorrelationId: corrId, ReplyTo: q.Name, 但愿回答被发到这里 Body: []byte(strconv.Itoa(n)), }) failOnError(err, "Failed to publish a message") // 取答案 for d := range msgs { if corrId == d.CorrelationId { res, err = strconv.Atoi(string(d.Body)) failOnError(err, "Failed to convert body to integer") break } } return } func main() { rand.Seed(time.Now().UTC().UnixNano()) n := bodyFrom(os.Args) log.Printf(" [x] Requesting fib(%d)", n) res, err := fibonacciRPC(n) failOnError(err, "Failed to handle RPC request") log.Printf(" [.] Got %d", res) } func bodyFrom(args []string) int { var s string if (len(args) < 2) || os.Args[1] == "" { s = "30" } else { s = strings.Join(args[1:], " ") } n, err := strconv.Atoi(s) failOnError(err, "Failed to convert arg to integer") return n }
http://www.rabbitmq.com/tutorials/tutorial-six-go.html
package main import ( "fmt" "github.com/streadway/amqp" "log" "time" ) func failOnError(err error, msg string) { if err != nil { log.Fatalf("%s: %s", msg, err) panic(fmt.Sprintf("%s: %s", msg, err)) } } // Queue属性测试 // // durable属性和auto-delete属性能够同时生效; // durable属性和exclusive属性会有性质上的冲突,二者同时设置时,仅exclusive属性生效; // auto_delete属性和exclusive属性能够同时生效; // // auto_delete若是有链接存在消费者订阅该http://www.lenggirl.com/tool/RabbitMQ.htmlqueue,正常,若是消费者所有消失,自动删除队列 // 能够在没有建立consumer的状况下,建立出具备auto-delete属性的queue。 // // exclusive,若是声明该队列的链接断开,自动删除队列 // queue的存在条件是在声明该队列的链接上存在某个consumer订阅了该queue。 // func main() { conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/") failOnError(err, "Failed to connect to RabbitMQ") defer conn.Close() ch, err := conn.Channel() failOnError(err, "Failed to open a channel") defer ch.Close() // 默认模式有默认交换机,广播本身定义一个交换机,交换机可与队列进行绑定 /* ExchangeDeclare declares an exchange on the server. If the exchange does not already exist, the server will create it. If the exchange exists, the server verifies that it is of the provided type, durability and auto-delete flags. ExchangeDeclare方法在服务器声明一个exchange。若是不存在,新建一个,存在的话则确认type和durability和auto-delete的标志是否一致。 Errors returned from this method will close the channel. 若是方法返回错误,channel会被关闭。 Exchange names starting with "amq." are reserved for pre-declared and standardized exchanges. The client MAY declare an exchange starting with "amq." if the passive option is set, or the exchange already exists. Names can consists of a non-empty sequence of letters, digits, hyphen, underscore, period, or colon. 名字以"amq."开头的Exchange是为以前已经声明和标准化的exchange们保留的, 在exchange已经存在的状况下,或者passive选项设置为真,客户端才有可能声明一个这样的exchange。 exchange的名字是一个非空序列,仅能包含字母,数字,连字符-,下划线_,句号.,冒号: 另外的方法ExchangeDeclarePassive主要用来检测exchange是否已经存在。 Each exchange belongs to one of a set of exchange kinds/types implemented by the server. The exchange types define the functionality of the exchange - i.e. how messages are routed through it. Once an exchange is declared, its type cannot be changed. The common types are "direct", "fanout", "topic" and "headers". Durable and Non-Auto-Deleted exchanges will survive server restarts and remain declared when there are no remaining bindings. This is the best lifetime for long-lived exchange configurations like stable routes and default exchanges. Non-Durable and Auto-Deleted exchanges will be deleted when there are no remaining bindings and not restored on server restart. This lifetime is useful for temporary topologies that should not pollute the virtual host on failure or after the consumers have completed. Non-Durable and Non-Auto-deleted exchanges will remain as long as the server is running including when there are no remaining bindings. This is useful for temporary topologies that may have long delays between bindings. Durable and Auto-Deleted exchanges will survive server restarts and will be removed before and after server restarts when there are no remaining bindings. These exchanges are useful for robust temporary topologies or when you require binding durable queues to auto-deleted exchanges. Note: RabbitMQ declares the default exchange types like 'amq.fanout' as durable, so queues that bind to these pre-declared exchanges must also be durable. Exchanges declared as `internal` do not accept accept publishings. Internal exchanges are useful for when you wish to implement inter-exchange topologies that should not be exposed to users of the broker. When noWait is true, declare without waiting for a confirmation from the server. The channel may be closed as a result of an error. Add a NotifyClose listener to respond to any exceptions. Optional amqp.Table of arguments that are specific to the server's implementation of the exchange can be sent for exchange types that require extra parameters. func (me *Channel) ExchangeDeclare(name, kind string, durable, autoDelete, internal, noWait bool, args Table) error { */ err = ch.ExchangeDeclare("logs", "fanout", true, false, false, false, nil) failOnError(err,