rabbitMq

rabbitmq的基础知识

参考资料: 很丰富的面试题 html

rabbitmq.png

基本概念

broker: 实体服务器 VirtualHost:缩小版的RabbitMq服务器,拥有本身独立的交换器、消息队列和相关的对象 Exchange:接受生产者的消息,并将这些消息路由到具体的Queue中 Binding:Exchange和Queue之间的关联 Queue:用来保存消息直到发送给消费者,它是消息的容器。 Channel:多路复用链接中的独立的双向数据流通道,由于创建和销毁TCP的Connection开销node

rabbitmq对消息的保存方式

  1. disk 后缀.rdp a. 在发送时指定须要持久化或者服务器内存紧张时会将部分中的内存消息保存到磁盘中 b. 单个文件增长到16M后会生成新的文件 c. 文件中的消息被标记删除的比例达到阈值时会触发文件的合并,提升磁盘的利用率
  2. RAM,内存保存,效率高

Exchange 类型

订阅模式(Fanout Exchange):

会将消息放到全部绑定到该exchange的队列上 面试

rabbitMq_fanOut.png

public class ConnectionUtils {

    private static Connection connection;
    private static String lock = "aaa";

  /**
     * 获取rabbitmq链接
     * @return
     */
    public static Connection getConnection() {
        if (null != connection) {
            return connection;
        }
        synchronized (lock) {
            if (null != connection) {
                return connection;
            }

            ConnectionFactory connectionFactory = new ConnectionFactory();
            connectionFactory.setHost("host");
            connectionFactory.setUsername("userName");
            connectionFactory.setPassword("password");
            connectionFactory.setVirtualHost("/vhost");
            try {
                connection = connectionFactory.newConnection();
            } catch (IOException e) {
                throw new RuntimeException("IoException", e);
            } catch (TimeoutException e) {
                throw new RuntimeException("timeOutException", e);
            }
            return connection;
        }
    }

}
复制代码
public class FanOutProducer {

    private static final String EXCHAGE_NAME = "test_exchange_li_fanout";

    public static void main(String[] args) throws IOException, TimeoutException {
        //创建链接
        Connection connection = ConnectionUtils.getConnection();
        Channel channel = connection.createChannel();

        //声明一个exchange,fanout类型,不持久化
        channel.exchangeDeclare(EXCHAGE_NAME, "fanout", false);

        //发送消息
        StringBuilder stringBuilder = new StringBuilder("message");
        for (int i = 0; i < 10; i++) {
            channel.basicPublish(EXCHAGE_NAME, "", null, stringBuilder.append(i).toString().getBytes("utf-8"));
        }

        //关闭链接
        channel.close();
        connection.close();
    }
}
复制代码
public class FanOutConsumer01 {

    private static final String EXCHAGE_NAME = "test_exchange_li_fanout";
    private static final String QUEUE_NAME = "test_queue_Name_li_fanout";

    public static void main(String[] args) throws IOException {
        //创建连接
        Connection connection = ConnectionUtils.getConnection();
        Channel channel = connection.createChannel();

        //声明exchange和queue--exchange要和生产者一致
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        channel.exchangeDeclare(EXCHAGE_NAME, "fanout");

        //将queue绑定到exchange上
        channel.queueBind(QUEUE_NAME, EXCHAGE_NAME, "");

        //定义消费者
        Consumer consumer = new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties, byte[] body)
                throws IOException {
                String message = new String(body, "utf-8");
                System.out.println(message);
            }
        };

        //开始消费
        channel.basicConsume(QUEUE_NAME, true, consumer);
    }
}
复制代码
public class FanOutConsumer02 {

    private static final String EXCHAGE_NAME = "test_exchange_li_fanout";
    private static final String QUEUE_NAME = "test_queue_Name_li_fanout02";
....
}
复制代码
两个消费者都能打印以下
message0
message01
message012
message0123
message01234
message012345
message0123456
message01234567
message012345678
message0123456789
复制代码

Direct Exchange

只会把消息routingkey一致的queue中 安全

rabbitMq_binding.png

rabbitMq_Direct.png

public class DirectProducer {

    private static final String EXCHAGE_NAME = "test_exchange_li_direct";
    private static final String ROUTING_KEY = "direct_routing_key";

    public static void main(String[] args) throws IOException, TimeoutException {
        //创建链接
        Connection connection = ConnectionUtils.getConnection();
        Channel channel = connection.createChannel();

        //声明一个exchange,direct类型,不持久化
        channel.exchangeDeclare(EXCHAGE_NAME, "direct", false);

        //发送消息
        StringBuilder stringBuilder = new StringBuilder("message");
        for (int i = 0; i < 10; i++) {
            channel.basicPublish(EXCHAGE_NAME, ROUTING_KEY, null, stringBuilder.append(i).toString().getBytes("utf-8"));
        }

        //关闭链接
        channel.close();
        connection.close();
    }
}
复制代码
public class DirectConsumer01 {

    private static final String EXCHAGE_NAME = "test_exchange_li_direct";
    private static final String QUEUE_NAME = "test_queue_Name_li_direct";
    private static final String ROUTING_KEY = "direct_routing_key";

    public static void main(String[] args) throws IOException {
        //创建连接
        Connection connection = ConnectionUtils.getConnection();
        Channel channel = connection.createChannel();

        //声明exchange和queue--exchange要和生产者一致
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        channel.exchangeDeclare(EXCHAGE_NAME, "direct");

        //将queue绑定到exchange上
        channel.queueBind(QUEUE_NAME, EXCHAGE_NAME, ROUTING_KEY);

        //定义消费者
        Consumer consumer = new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties, byte[] body)
                throws IOException {
                String message = new String(body, "utf-8");
                System.out.println(message);
            }
        };

        //开始消费
        channel.basicConsume(QUEUE_NAME, true, consumer);
    }
}
复制代码
public class DirectConsumer02 {

    private static final String EXCHAGE_NAME = "test_exchange_li_direct";
    private static final String QUEUE_NAME = "test_queue_Name_li_direct01";
    private static final String ROUTING_KEY = "direct_routing_key02";
....
}
复制代码
运行完成,只有DirectConsumer01能收到消息
message0
message01
message012
message0123
message01234
message012345
message0123456
message01234567
message012345678
message0123456789
复制代码

Topic Exchange

对key进行模式匹配后进行投递,符号”#”匹配一个或多个词,符号””匹配正好一个词。例如”abc.#”匹配”abc.def.ghi”,”abc.”只匹配”abc.def”bash

rabbitMq_Topic.png

public class TopicProducer {

    private static final String EXCHAGE_NAME = "test_exchange_li_topic";

    private static final String ROUTING_KEY = "test.routingkey.01";

    public static void main(String[] args) throws IOException, TimeoutException {
        //创建链接
        Connection connection = ConnectionUtils.getConnection();
        Channel channel = connection.createChannel();

        //声明一个exchange,topic类型,不持久化
        channel.exchangeDeclare(EXCHAGE_NAME, "topic", false);

        //发送消息
        StringBuilder stringBuilder = new StringBuilder("message");
        for (int i = 0; i < 10; i++) {
            channel.basicPublish(EXCHAGE_NAME,ROUTING_KEY, null, stringBuilder.append(i).toString().getBytes("utf-8"));
        }
        //关闭链接
        channel.close();
        connection.close();
    }
}
复制代码
public class TopicConsumer01 {

    private static final String EXCHAGE_NAME = "test_exchange_li_topic";
    private static final String QUEUE_NAME = "test_queue_Name_li_topic_01";
    private static final String ROUTING_KEY = "test.routingkey.#";

    public static void main(String[] args) throws IOException {
        //创建连接
        Connection connection = ConnectionUtils.getConnection();
        Channel channel = connection.createChannel();

        //声明exchange和queue--exchange要和生产者一致
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        channel.exchangeDeclare(EXCHAGE_NAME, "topic");

        //将queue绑定到exchange上
        channel.queueBind(QUEUE_NAME, EXCHAGE_NAME, ROUTING_KEY);

        //定义消费者
        Consumer consumer = new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties, byte[] body)
                throws IOException {
                String message = new String(body, "utf-8");
                System.out.println(message);
            }
        };

        //开始消费
        channel.basicConsume(QUEUE_NAME, true, consumer);
    }
}
复制代码
为了节省篇幅,
public class TopicConsumer02 {

    private static final String EXCHAGE_NAME = "test_exchange_li_topic";
    private static final String QUEUE_NAME = "test_queue_Name_li_topic_02";
    private static final String ROUTING_KEY = "test.routingkey.*";
......
}

public class TopicConsumer03 {

    private static final String EXCHAGE_NAME = "test_exchange_li_topic";
    private static final String QUEUE_NAME = "test_queue_Name_li_topic_03";
    private static final String ROUTING_KEY = "test.routingkey";
....
}
复制代码
输出结果只有TopicConsumer01和TopicConsumer02有日志输出
message0
message01
message012
message0123
message01234
message012345
message0123456
message01234567
message012345678
message0123456789
复制代码

内容的持久化durable 表示持久化

  1. Queue的持久化
com.rabbitmq.client.Channel

Queue.DeclareOk queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete,
                                 Map<String, Object> arguments) throws IOException;

复制代码
  1. Message的持久化
void basicPublish(String exchange, String routingKey, BasicProperties props, byte[] body) throws IOException;


public static class BasicProperties extends com.rabbitmq.client.impl.AMQBasicProperties {
        private String contentType;
        private String contentEncoding;
        private Map<String,Object> headers;
        private Integer deliveryMode;
        private Integer priority;
        private String correlationId;
        private String replyTo;
        private String expiration;
        private String messageId;
        private Date timestamp;
        private String type;
        private String userId;
        private String appId;
        private String clusterId;

其中deliveryMode=2表示持久化
复制代码
  1. Exchange的持久化
com.rabbitmq.client.Channel

Exchange.DeclareOk exchangeDeclare(String exchange,
                                              String type,
                                              boolean durable,
                                              boolean autoDelete,
                                              boolean internal,
                                              Map<String, Object> arguments) throws IOException;

复制代码
  1. binding是如何持久化的 当queueexchange都被定义为持久化的状况下,两种个相关联的binding也会被保存下来,若是queue或者exchange被删除,这个绑定关系也会被删除

就算上面三个的持久化方式都开启了,也不能保证消息在使用过程当中彻底不丢失,若是消费者autoAck=true,在收到消息以后自动确认了,可是处理时服务崩了,就会致使消息的丢失,因此须要确认机制的支持服务器

消息投递的确认模式

  1. 默认状况下,生产者投递消息后,broker时不会作出任何返回的
  2. 解决方式以下:
    1. 使用Amqp协议中的事务机制效率低,影响吞吐量
    2. 将信道channel设置成确认模式

channel信道的确认模式

channel设置成确认模式以后,全部提交的消息都会被分配一条惟一的ID,当消息被投递到匹配的队列中,信道会向生产者发出确认消息,而且消息中带上这个Id。确认模式是异步的,生产者能够发送完一条消息后继续发送下一条消息。调用channel的confirmSelect方法开启确认模式网络

  1. 普通方式,发送完成以后调用waitForConfirms
  2. 异步回调模式,addConfirmListener注册回调函数

消息消费的应答模式

  1. autoAck,若是等于true,会在消息发送过来以后自动响应--队列会将该消息删除,可能会致使消息消费失败了,可是消息已经被删除的状况
  2. autoAck=false,须要业务逻辑在处理完成以后,调用channel.basicAck作显示的响应

消费者获取消息时,能够指定预取的消息数量 经过channel的basicQos方法设置app

rabbitmq 的死信队列至关于为一个队列设置一个备用的队列,在出现如下状况的时候将所谓的死亡信息推送到死亡信息队列

  1. 消息被拒绝(basic.reject/ basic.nack)而且再也不从新投递 requeue=false
  2. 消息超期 (rabbitmq Time-To-Live -> messageProperties.setExpiration())
  3. 队列超载 具体内容参考:my.oschina.net/u/2948566/b…

rabbitmq的blackhole问题

什么是blackhole问题 生产者向exchange投递message,而因为各类缘由致使该 message 丢失,但发送者殊不知道异步

致使balckhole问题的缘由
  1. 向未绑定 queue 的 exchange 发送 message
  2. exchange 以 binding_key key_A绑定了 queue queue_A,但向该 exchange 发送 message 使用的 routing_key 倒是 key_B。使用了不存在的绑定关系

如何防止blackhole 没有特别好的办法,只能在具体实践中经过各类方式保证相关 fabric 的存在。另外,若是在执行 Basic.Publish 时设置 mandatory=true ,则在遇到可能出现 blackholed 状况时,服务器会经过返回 Basic.Return 告之当前 message 没法被正确投递(内含缘由 312 NO_ROUTE)。ide

Consumer Cancellation Notification 机制

channel消费queue的时候,可能由于某些缘由致使消费中止,

  1. 消费者发起 basic.cacncel命令
channel.basicCancel(consumerTag);
复制代码
  1. 队列被删除或者节点失败也有可能致使消费被取消

正常状况下,第一种状况,由于是消费者本身发起的,本身能够感知到,可是第二种状况若是没有一个通知机制的话,可能会致使消费者一直傻傻的在等一个不可能来的消息 因此为了不上面这些状况出现,RabbitMQ引入了扩展特性:因为消息中间件代理出现的异常或者正常状况致使消费者取消,会向对应的消费者(信道)发送basic.cancel,可是由客户端信道主动向消息中间件代理发送basic.cancel以取消消费者的状况下不会受到消息中间件代理的basic.cancel回复。

channel.basicConsume("throwable.queue.direct", new DefaultConsumer(channel) {

				@Override
				public void handleCancelOk(String consumerTag) {
					System.out.println("收到来自消息中间件代理的basic.cancel-ok回复,consumerTag=" + consumerTag);
				}

				@Override
				public void handleCancel(String consumerTag) throws IOException {
					System.out.println("收到来自消息中间件代理的basic.cancel回复,consumerTag=" + consumerTag);
				}
			});
复制代码

rebbitmq的集群知识

参考资料: www.cnblogs.com/xishuai/p/r… www.ywnds.com/?p=4741 blog.csdn.net/zhu_tianwei… 对异常状况解释的比较多

为何须要集群

  1. 摆脱单机资源上的限制,提供更大的吞吐量
  2. 提供更加稳定的更高可用的服务

集群中节点之间须要同步的信息rabbitmq的元数据

a. 队列元数据:队列名称和它的属性; b. 交换器元数据:交换器名称、类型和属性; c. 绑定元数据:一张简单的表格展现了如何将消息路由到队列; d. vhost元数据:为vhost内的队列、交换器和绑定提供命名空间和安全属性; e. 元数据信息须要保存的磁盘中,因此每一个集群中至少须要一个disk节点

所以,当用户访问其中任何一个RabbitMQ节点时,经过rabbitmqctl查询到的queue/user/exchange/vhost等信息都是相同的。

rabbitmq集群中同步元数据.png

rabbitmq 的集群模式

集群内节点的类型

磁盘节点

保存数据到磁盘和内存中,若是集群中群都是内存节点,那就不能中止他们,不然元数据就会丢。

RabbitMQ只要求集群中至少有一个磁盘节点,若是只有一个磁盘节点,恰好又崩溃了,集群能够继续路由消息,但不能建立队列、交换器、绑定、添加用户、更改权限等操做。因此,建议设置两个磁盘节点,当内存节点重启后,会链接到预先配置的磁盘节点,下载当前集群元数据拷贝,因此要将全部磁盘节点告诉内存节点。

内存节点

数据只保存到内存中,除非遇到

  1. publish消息的时候指定须要持久化
  2. 内存吃紧的时候,会把部分消息持久化到磁盘

内存节点的特色就是执行效率高

普通模式

不是每一个节点都有全部队列的彻底拷贝,若是在集群中建立队列,只会在单个节点上建立完整的队列信息(元数据、状态、内容),全部其余节点只知道队列的元数据和指向该队列的节点指针。

既然一个队列的数据只存在一个节点上,那么在链接集群内其余节点的时候,是如何进行发布消息和消费消息的呢? 若是消息生产者所链接的是节点2或者节点3,此时队列1的完整数据不在该两个节点上,那么在发送消息过程当中这两个节点主要起了一个路由转发做用,根据这两个节点上的元数据(也就是上文提到的:指向queue的owner node的指针)转发至节点1上,最终发送的消息仍是会存储至节点1的队列1上。 一样,若是消息消费者所链接的节点2或者节点3,那这两个节点也会做为路由节点起到转发做用,将会从节点1的队列1中拉取消息进行消费。

若是节点崩溃了,附加在队列上的消费者也就没法接收新的消息了。可让消费者重连到集群并从新建立队列,这种作法仅当队列没设置持久化时才可行,若是作了队列持久化或消息持久化,必须等到对应的节点恢复了才能被消费,这是为了确保当失败的节点恢复后加入集群,节点上的队列消息不会丢失。

为何不将队列内容和状态复制到全部节点:

  1. 存储空间,若是每一个集群节点都拥有全部队列的彻底拷贝,添加新节点不会带来更多存储空间;
  2. 性能,消息的发布者须要将消息复制到每个集群节点,对于持久化消息,网络和磁盘复制都会增长。

优势:

  1. 使用集群能很好的实现服务能力的水平拓展

缺点:

  1. 由于单个队列只维持在单个节点上,也很难认为是高可用
  2. 若是是不持久化的消息和队列,单机宕机后消息会丢失

镜像模式 把须要的队列作成镜像队列,存在于多个节点,属于RabbitMQ的HA方案

根据策略能够为节点定义镜像节点,镜像节点之间能够实现队列中消息实体的同步。 对于发送方确认消息,Rabbit会在全部队列和队列的从拷贝安全地接收到消息时,才会通知发送方。

rabbitmq镜像集群模式的策略.jpg

优势:

  1. 由于能对节点维护的队列中的消息实体作了同步,能够保证

缺点:

  1. 由于要进行消息实体的复制,因此势必会影响系统的性能
  2. 网络通讯也会加大,若是消息量比较大话
相关文章
相关标签/搜索