rabbitmq是一个消息代理系统,为应用提供一个通用得消息发布,接受平台,为应用提供非阻塞的消息系统,方便进行异步处理。服务器
减小用户对没必要要的耗时操做的等待,处理结果以异步方式(邮件,消息推送)进行提醒。并发
当某个应用发展到必定规模的时候,须要把里面的模块分别拆出来进行解耦,而模块之间的通信方式是多样的,常见的有rpc,消息队列,http请求。其中消息队列在内部模块通讯是更为稳定。异步
若是突发遇到大量的数据请求的时候,服务器若是不作队列处理,一会儿处理所有的请求,会很容易形成宕机,若是把请求的数据都放入队列里,以后再逐个逐个地进行处理,能够平缓地渡过流量高峰期。编码
rabbitmq的工做方式以下,生产者(publisher)发送消息到交换机,交换机(exchange)根据本身的类型以及消息的路由键,路由到对应的队列里,队列分发消息到消费者(consumer)
debug
如今咱们假设有这个场景,客服A须要发送客户的下单信息给库存人员B,客服A有一个订单信息发送器,库存人员B拥有消息接收器。
首先库存人员B创建链接并接受消息,伪代码:设计
// 创建链接 conn, _ := amqp.Dial("amqp://localhost") ch, _ := conn.Channel() // 声明队列,不存在则建立,存在则不会进行任何操做 queue, _ := ch.QueueDeclare("order") // 从队列里面获取消息 deliver, _ := ch.Consume(q.Name) for d:= range deliver { // 输出消息主体 log.Printf("B Received a message: %s", d.Body) // 返回获取成功标识给队列 d.Ack(true) }
而后客服A也创建链接并发送消息,伪代码:代理
// 创建链接 conn, _ := amqp.Dial("amqp://localhost") ch, _ := conn.Chanenel() // 声明队列,不存在则建立,存在则不会进行任何操做 queue, _ := ch.QueueDeclare("order") // 发布消息 ch.Publish( q.Name, // 队列名字 amqp.Publishing{ Body:[]byte("new order" + product.String()), })
上面就是一种简单的直接经过队列进行链接的方法,可能会有人看出来,为何没有交换机的参与,其实上面的操做实际上是经过默认交换机进行消息传递,能够不指定交换机名字直接指定队列名字进行交互。日志
经过上面的简单例子,咱们能够更进一步地了解到rabbitmq的工做方式,下面我会更详细地讲解各个模块。code
消息是通讯内容的主体,消息对象有点像http的request,除了能够携带消息内容,还能够带有各类属性,如:对象
有些属性只是约定规范,如ContentType,ContentEncoding,须要程序本身作处理,有些属性rabbitmq会根据值来进行处理,如RemoteKey,交换机会根据消息的RemoteKey和自身的类型来决定投递到哪些队列,DeliveryMode能够决定是否持久化消息。
#消息投递 ch.Publish( "", # exchange名字,空为默认交换机 key, # routingkey 路由键 false, false, # 消息 amqp.Publishing{ DeliveryMode:amqp.Persistent, ContentType:"text/plain", Body:[]byte("hello world"), })
队列是存储消息的主体,队列自己所拥有的一些属性:
在代码里面队列声明,若是队列不存在则新建队列,若是已存在相同名字的队列且属性不一样的话则会报错。能够选择让系统自动生成队列,而后返回队列名字。
# 队列声明,参数依次为name,Durable,auto-delete,exclusive,no-wait.args amqp.QueueDeclare("queuename", true, false, flase, false, nil)
消费者用以消费队列里的消息的自定义程序片断,消费者获取队列里的消息有两种方式,一种是拉取(pull)的方式,经过channel.basicget方法,一种是订阅方式,队列推送消息到rabbitmq,这种方式用的最多。
消息处理,消费者端链接队列后,能够获得一个相似于句柄的东西,若是没有消息就会一直阻塞。
消费者在收到消息以后处理的状况多是成功的,也有多是失败的,为了确保消息已经成功处理而后队列删除消息,若是失败则进行其余机制,以避免消息一直重复在队列里面,或消息因消费者宕机而丢失。
若是消息成功地被消费者处理的话,须要有一个消息确认的机制。
rabbitmq提供两种确认机制:
通常而言咱们用的更多的是显式确认模式,若是消费者接收到消息没有进行确认以后就宕机了,队列里面的该消息仍是会存在的,而后会把消息转发到其余消费者。
若是消费者对消息的处理出现了一些问题,能够调用rabbitmq的basic.reject来拒绝消息,拒绝消息以后,能够作的是把消息放回到队列里面,或者直接删除消息。
其实若是出现问题的消息,即使是交给其余的消费者,很会很大几率继续出现问题,这时候咱们能够把消息放到其余专门处理记录问题的队列里面,交由另外的消费者处理。
交换机更像是消息的路由层,队列绑定到交换机,而后发布者能够发送的消息都是通过交换机的,而后经由消息的remote key(路由键)路由到交换机所绑定的队列里。
交换机分为4种类型:
直连型交换机(direct exchange)是根据消息携带的路由键(routing key)将消息投递给对应队列的。
其实初尝rabbitmq的例子里面,看上去没有绑定交换机,实际上也是绑定了直连交换机,只是是一个特殊的预先声明好的,名字为空字符串的交换机,叫默认交换机,每一个队列都会自动绑定到默认交换机上。
扇型交换机(funout exchange)将消息路由给绑定到它身上的全部队列,而不理会绑定的路由键。若是N个队列绑定到某个扇型交换机上,当有消息发送给此扇型交换机时,交换机会将消息的拷贝分别发送给这全部的N个队列。扇型用来交换机处理消息的广播路由(broadcast routing)。
主题交换机(topic exchanges)经过对消息的路由键和队列到交换机的绑定模式之间的匹配,将消息路由给一个或多个队列。主题交换机常常用来实现各类分发/订阅模式及其变种。主题交换机一般用来实现消息的多播路由(multicast routing)。
主题交换机在我看来就像添加了简单的通配符+字符串来达到一个路由的规则。
头交换机用的不是不少,有时消息的路由操做会涉及到多个属性,此时使用消息头就比用路由键更容易表达,头交换机(headers exchange)就是为此而生的。头交换机使用多个消息属性来代替路由键创建路由规则。经过判断消息头的值可否与指定的绑定相匹配来确立路由规则。
生产者代码
conn, _ := amqp.Dial("amqp://localhost") ch, _ := conn.Channel() ch.ExchangeDeclare( "hello", "fanout", true, false, false, false, nil, ) ch.Publish( "hello", "", // 因为是广播,因此能够不填写路由键 false, false, amqp.Publishing{ DeliveryMode:amqp.Persistent, Body:[]byte("hello"+time.Now().String()), })
消费者代码
conn, _ := amqp.Dial("amqp://localhost") ch, _ := conn.Channel() ch.ExchangeDeclare( "hello", // 交换机名字 "fanout", // 交换机类型 true, // durable false, // autoDelete false, // internal false, // noWait nil, // args ) q, _ := ch.QueueDeclare( "", false, // durable false, // autoDelete true, // exclusive false, // noWait nil, // ) ch.QueueBind( q.Name, // queuename "", // remote key,因为是广播,能够不填写路由键 "hello", // exchange name false, // nowait nil, ) msgs, _ := ch.Consume(q.Name,"", true, false, false,false,nil) for msg := range msgs { log.Printf("%s", msg.Body) }
设有以下场景:设计一个日志收集系统,日志有不一样的级别,debug,info,warn,error,日志格式为:
级别.模块名字 如:info.login
有不一样的队列负责收集不一样级别的日志,其中有个队列专门收集收集warn和error的数据,设计以下:
生产者
func main() { conn, _ := amqp.Dial("amqp://localhost") ch, _ := conn.Channel() ch.ExchangeDeclare( "logs", "topic", true, false, false, false, nil, ) ch.Publish( "logs", "debug.123", false, false, amqp.Publishing{ DeliveryMode:amqp.Persistent, Body:[]byte("hello"), }, ) }
消费者
func main() { conn, _ := amqp.Dial("amqp://localhost") ch, _ := conn.Channel() ch.ExchangeDeclare( "logs", "topic", true, false, false, false, nil, ) q, _ := ch.QueueDeclare( "log1", true, false, false, false, nil, ) // 队列绑定的remote key keys := []string{"error.*", "warn.*"} for _, key := range keys{ ch.QueueBind( q.Name, key, "logs", false, nil, ) } deliver, _ := ch.Consume( q.Name, "", false, false, false, false, nil, ) for d:= range deliver { fmt.Println(string(d.Body)) d.Ack(true) } }