RabbitMQ入门

什么是RabbitMQ?

AMQP,即Advanced Message Queuing Protocol,高级消息队列协议,是应用层协议的一个开放标准,为面向消息的中间件设计。java

消息中间件主要用于组件之间的解耦,消息的发送者无需知道消息使用者的存在,反之亦然。
AMQP的主要特征是面向消息、队列、路由(包括点对点和发布/订阅)、可靠性、安全。正则表达式

RabbitMQ是一个开源的AMQP实现,服务器端用Erlang语言编写,支持多种客户端,如:Python、Ruby、.NET、Java、JMS、C、PHP、ActionScript、XMPP、STOMP等,支持AJAX。用于在分布式系统中存储转发消息,在易用性、扩展性、高可用性等方面表现不俗。安全

做用

能够将一些无需即时返回且耗时的操做提取出来,进行异步处理(好比咱们项目订单系统中出票成功后记报表、发短信等操做)服务器

这样能够减轻服务器压力,大大节省服务器的请求响应时间,同时利于系统解耦。架构

系统架构

rabbitmq系统架构图

生产者发送消息

核心概念exchange、route、queue

rabbitmq_exchange类型

生产者在发送消息时,都须要指定一个RoutingKey和Exchange,Exchange在接到该RoutingKey之后,会判断该ExchangeType:异步

  1. 若是是Direct类型,则会将消息中的RoutingKey与该Exchange关联的全部Binding中的BindingKey进行比较,若是相等,则发送到该Binding对应的Queue中。分布式

  2. 若是是Fanout类型,则会将消息发送给全部与该Exchange定义过Binding的全部Queues中去,实际上是一种广播行为。ui

  3. 若是是Topic类型,则会按照正则表达式,对RoutingKey与BindingKey进行匹配,若是匹配成功,则发送到对应的Queue中。spa

发送消息(java)

  1. 创建链接设计

    ConnectionFactory factory = new ConnectionFactory();
    factory.setUsername(userName);
    factory.setPassword(password);
    factory.setVirtualHost(virtualHost);
    factory.setHost(hostName);
    factory.setPort(portNumber);
    Connection conn = factory.newConnection();
  2. 建立通道

    Channel channel = conn.createChannel();
  3. 申明exchange、queue以及它们之间的绑定关系

    channel.exchangeDeclare(exchangeName, "direct", true);
    String queueName = channel.queueDeclare().getQueue();
    channel.queueBind(queueName, exchangeName, routingKey);
  4. 发送消息

    byte[] messageBodyBytes = "Hello, world!".getBytes();
    channel.basicPublish(exchangeName, routingKey, null, messageBodyBytes);

消费者消费消息

  1. 一种是经过basic.consume命令,订阅某一个队列中的消息,Channel会自动在处理完上一条消息以后,接收下一条消息(同一个Channel消息处理是串行的)。除非关闭Channel或者取消订阅,不然客户端将会一直接收队列的消息。

  2. 另一种方式是经过basic.get命令主动获取队列中的消息,可是绝对不能够经过循环调用basic.get来代替basic.consume,这是由于basic.get在实际执行的时候,是首先Consume某一个队列,而后检索第一条消息,而后再取消订阅。若是是高吞吐率的消费者,最好仍是建议使用basic.consume。

若是有多个消费者同时订阅同一个队列的话,RabbitMQ是采用循环的方式分发消息的,每一条消息只能被一个订阅者接收。例如,有队列Queue,其中ClientA和ClientB都Consume了该队列,MessageA到达队列后,被分派到ClientA,ClientA回复服务器收到响应,服务器删除MessageA;再有一条消息MessageB抵达队列,服务器根据“循环推送”原则,将消息会发给ClientB,而后收到ClientB的确认后,删除MessageB;等到再下一条消息时,服务器会再将消息发送给ClientA。

这里咱们能够看出,消费者再接到消息之后,都须要给服务器发送一条确认命令,这个既能够在handleDelivery里显示的调用basic.ack实现,也能够在Consume某个队列的时候,设置autoACK属性为true实现。这个ACK仅仅是通知服务器能够安全的删除该消息,而不是通知生产者。 若是消费者在接到消息之后还没来得及返回ACK就断开了链接,消息服务器会重传该消息给下一个订阅者,若是没有订阅者就会存储该消息。

既然RabbitMQ提供了ACK某一个消息的命令,固然也提供了Reject某一个消息的命令。当客户端发生错误,调用basic.reject命令拒绝某一个消息时,能够设置一个requeue的属性,若是为true,则消息服务器会重传该消息给下一个订阅者;若是为false,则会直接删除该消息。固然,也能够经过ack,让消息服务器直接删除该消息而且不会重传。

相关文章
相关标签/搜索