参考资料: 很丰富的面试题 html
broker
: 实体服务器 VirtualHost
:缩小版的RabbitMq服务器,拥有本身独立的交换器、消息队列和相关的对象 Exchange
:接受生产者的消息,并将这些消息路由到具体的Queue中 Binding
:Exchange和Queue之间的关联 Queue:
用来保存消息直到发送给消费者,它是消息的容器。 Channel
:多路复用链接中的独立的双向数据流通道,由于创建和销毁TCP的Connection开销node
会将消息放到全部绑定到该exchange的队列上
面试
public class ConnectionUtils {
private static Connection connection;
private static String lock = "aaa";
/**
* 获取rabbitmq链接
* @return
*/
public static Connection getConnection() {
if (null != connection) {
return connection;
}
synchronized (lock) {
if (null != connection) {
return connection;
}
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("host");
connectionFactory.setUsername("userName");
connectionFactory.setPassword("password");
connectionFactory.setVirtualHost("/vhost");
try {
connection = connectionFactory.newConnection();
} catch (IOException e) {
throw new RuntimeException("IoException", e);
} catch (TimeoutException e) {
throw new RuntimeException("timeOutException", e);
}
return connection;
}
}
}
复制代码
public class FanOutProducer {
private static final String EXCHAGE_NAME = "test_exchange_li_fanout";
public static void main(String[] args) throws IOException, TimeoutException {
//创建链接
Connection connection = ConnectionUtils.getConnection();
Channel channel = connection.createChannel();
//声明一个exchange,fanout类型,不持久化
channel.exchangeDeclare(EXCHAGE_NAME, "fanout", false);
//发送消息
StringBuilder stringBuilder = new StringBuilder("message");
for (int i = 0; i < 10; i++) {
channel.basicPublish(EXCHAGE_NAME, "", null, stringBuilder.append(i).toString().getBytes("utf-8"));
}
//关闭链接
channel.close();
connection.close();
}
}
复制代码
public class FanOutConsumer01 {
private static final String EXCHAGE_NAME = "test_exchange_li_fanout";
private static final String QUEUE_NAME = "test_queue_Name_li_fanout";
public static void main(String[] args) throws IOException {
//创建连接
Connection connection = ConnectionUtils.getConnection();
Channel channel = connection.createChannel();
//声明exchange和queue--exchange要和生产者一致
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
channel.exchangeDeclare(EXCHAGE_NAME, "fanout");
//将queue绑定到exchange上
channel.queueBind(QUEUE_NAME, EXCHAGE_NAME, "");
//定义消费者
Consumer consumer = new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties, byte[] body)
throws IOException {
String message = new String(body, "utf-8");
System.out.println(message);
}
};
//开始消费
channel.basicConsume(QUEUE_NAME, true, consumer);
}
}
复制代码
public class FanOutConsumer02 {
private static final String EXCHAGE_NAME = "test_exchange_li_fanout";
private static final String QUEUE_NAME = "test_queue_Name_li_fanout02";
....
}
复制代码
两个消费者都能打印以下
message0
message01
message012
message0123
message01234
message012345
message0123456
message01234567
message012345678
message0123456789
复制代码
只会把消息routingkey一致的queue中
安全
public class DirectProducer {
private static final String EXCHAGE_NAME = "test_exchange_li_direct";
private static final String ROUTING_KEY = "direct_routing_key";
public static void main(String[] args) throws IOException, TimeoutException {
//创建链接
Connection connection = ConnectionUtils.getConnection();
Channel channel = connection.createChannel();
//声明一个exchange,direct类型,不持久化
channel.exchangeDeclare(EXCHAGE_NAME, "direct", false);
//发送消息
StringBuilder stringBuilder = new StringBuilder("message");
for (int i = 0; i < 10; i++) {
channel.basicPublish(EXCHAGE_NAME, ROUTING_KEY, null, stringBuilder.append(i).toString().getBytes("utf-8"));
}
//关闭链接
channel.close();
connection.close();
}
}
复制代码
public class DirectConsumer01 {
private static final String EXCHAGE_NAME = "test_exchange_li_direct";
private static final String QUEUE_NAME = "test_queue_Name_li_direct";
private static final String ROUTING_KEY = "direct_routing_key";
public static void main(String[] args) throws IOException {
//创建连接
Connection connection = ConnectionUtils.getConnection();
Channel channel = connection.createChannel();
//声明exchange和queue--exchange要和生产者一致
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
channel.exchangeDeclare(EXCHAGE_NAME, "direct");
//将queue绑定到exchange上
channel.queueBind(QUEUE_NAME, EXCHAGE_NAME, ROUTING_KEY);
//定义消费者
Consumer consumer = new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties, byte[] body)
throws IOException {
String message = new String(body, "utf-8");
System.out.println(message);
}
};
//开始消费
channel.basicConsume(QUEUE_NAME, true, consumer);
}
}
复制代码
public class DirectConsumer02 {
private static final String EXCHAGE_NAME = "test_exchange_li_direct";
private static final String QUEUE_NAME = "test_queue_Name_li_direct01";
private static final String ROUTING_KEY = "direct_routing_key02";
....
}
复制代码
运行完成,只有DirectConsumer01能收到消息
message0
message01
message012
message0123
message01234
message012345
message0123456
message01234567
message012345678
message0123456789
复制代码
对key进行模式匹配后进行投递,符号”#”匹配一个或多个词,符号””匹配正好一个词。例如”abc.#”匹配”abc.def.ghi”,”abc.”只匹配”abc.def”bash
public class TopicProducer {
private static final String EXCHAGE_NAME = "test_exchange_li_topic";
private static final String ROUTING_KEY = "test.routingkey.01";
public static void main(String[] args) throws IOException, TimeoutException {
//创建链接
Connection connection = ConnectionUtils.getConnection();
Channel channel = connection.createChannel();
//声明一个exchange,topic类型,不持久化
channel.exchangeDeclare(EXCHAGE_NAME, "topic", false);
//发送消息
StringBuilder stringBuilder = new StringBuilder("message");
for (int i = 0; i < 10; i++) {
channel.basicPublish(EXCHAGE_NAME,ROUTING_KEY, null, stringBuilder.append(i).toString().getBytes("utf-8"));
}
//关闭链接
channel.close();
connection.close();
}
}
复制代码
public class TopicConsumer01 {
private static final String EXCHAGE_NAME = "test_exchange_li_topic";
private static final String QUEUE_NAME = "test_queue_Name_li_topic_01";
private static final String ROUTING_KEY = "test.routingkey.#";
public static void main(String[] args) throws IOException {
//创建连接
Connection connection = ConnectionUtils.getConnection();
Channel channel = connection.createChannel();
//声明exchange和queue--exchange要和生产者一致
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
channel.exchangeDeclare(EXCHAGE_NAME, "topic");
//将queue绑定到exchange上
channel.queueBind(QUEUE_NAME, EXCHAGE_NAME, ROUTING_KEY);
//定义消费者
Consumer consumer = new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties, byte[] body)
throws IOException {
String message = new String(body, "utf-8");
System.out.println(message);
}
};
//开始消费
channel.basicConsume(QUEUE_NAME, true, consumer);
}
}
复制代码
为了节省篇幅,
public class TopicConsumer02 {
private static final String EXCHAGE_NAME = "test_exchange_li_topic";
private static final String QUEUE_NAME = "test_queue_Name_li_topic_02";
private static final String ROUTING_KEY = "test.routingkey.*";
......
}
public class TopicConsumer03 {
private static final String EXCHAGE_NAME = "test_exchange_li_topic";
private static final String QUEUE_NAME = "test_queue_Name_li_topic_03";
private static final String ROUTING_KEY = "test.routingkey";
....
}
复制代码
输出结果只有TopicConsumer01和TopicConsumer02有日志输出
message0
message01
message012
message0123
message01234
message012345
message0123456
message01234567
message012345678
message0123456789
复制代码
durable 表示持久化
com.rabbitmq.client.Channel
Queue.DeclareOk queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete,
Map<String, Object> arguments) throws IOException;
复制代码
void basicPublish(String exchange, String routingKey, BasicProperties props, byte[] body) throws IOException;
public static class BasicProperties extends com.rabbitmq.client.impl.AMQBasicProperties {
private String contentType;
private String contentEncoding;
private Map<String,Object> headers;
private Integer deliveryMode;
private Integer priority;
private String correlationId;
private String replyTo;
private String expiration;
private String messageId;
private Date timestamp;
private String type;
private String userId;
private String appId;
private String clusterId;
其中deliveryMode=2表示持久化
复制代码
com.rabbitmq.client.Channel
Exchange.DeclareOk exchangeDeclare(String exchange,
String type,
boolean durable,
boolean autoDelete,
boolean internal,
Map<String, Object> arguments) throws IOException;
复制代码
queue
和exchange
都被定义为持久化的状况下,两种个相关联的binding也会被保存下来,若是queue
或者exchange
被删除,这个绑定关系也会被删除就算上面三个的持久化方式都开启了,也不能保证消息在使用过程当中彻底不丢失,若是消费者autoAck=true,在收到消息以后自动确认了,可是处理时服务崩了,就会致使消息的丢失,因此须要确认机制的支持服务器
效率低,影响吞吐量
channel设置成确认模式以后,全部提交的消息都会被分配一条惟一的ID,当消息被投递到匹配的队列中,信道会向生产者发出确认消息,而且消息中带上这个Id。确认模式是异步的,生产者能够发送完一条消息后继续发送下一条消息。调用channel的confirmSelect方法开启确认模式
网络
消费者获取消息时,能够指定预取的消息数量 经过channel的basicQos方法设置app
至关于为一个队列设置一个备用的队列,在出现如下状况的时候将所谓的死亡信息推送到死亡信息队列
requeue=false
什么是blackhole问题 生产者向exchange投递message,而因为各类缘由致使该 message 丢失,但发送者殊不知道异步
使用了不存在的绑定关系
如何防止blackhole 没有特别好的办法,只能在具体实践中经过各类方式保证相关 fabric 的存在。另外,若是在执行 Basic.Publish 时设置 mandatory=true ,则在遇到可能出现 blackholed 状况时,服务器会经过返回 Basic.Return 告之当前 message 没法被正确投递(内含缘由 312 NO_ROUTE)。ide
channel消费queue的时候,可能由于某些缘由致使消费中止,
channel.basicCancel(consumerTag);
复制代码
正常状况下,第一种状况,由于是消费者本身发起的,本身能够感知到,可是第二种状况若是没有一个通知机制的话,可能会致使消费者一直傻傻的在等一个不可能来的消息 因此为了不上面这些状况出现,RabbitMQ引入了扩展特性:因为消息中间件代理出现的异常或者正常状况致使消费者取消,会向对应的消费者(信道)发送basic.cancel,可是由客户端信道主动向消息中间件代理发送basic.cancel以取消消费者的状况下不会受到消息中间件代理的basic.cancel回复。
channel.basicConsume("throwable.queue.direct", new DefaultConsumer(channel) {
@Override
public void handleCancelOk(String consumerTag) {
System.out.println("收到来自消息中间件代理的basic.cancel-ok回复,consumerTag=" + consumerTag);
}
@Override
public void handleCancel(String consumerTag) throws IOException {
System.out.println("收到来自消息中间件代理的basic.cancel回复,consumerTag=" + consumerTag);
}
});
复制代码
参考资料: www.cnblogs.com/xishuai/p/r… www.ywnds.com/?p=4741 blog.csdn.net/zhu_tianwei… 对异常状况解释的比较多
rabbitmq的元数据
a. 队列元数据:队列名称和它的属性; b. 交换器元数据:交换器名称、类型和属性; c. 绑定元数据:一张简单的表格展现了如何将消息路由到队列; d. vhost元数据:为vhost内的队列、交换器和绑定提供命名空间和安全属性; e. 元数据信息须要保存的磁盘中,因此每一个集群中至少须要一个disk节点
所以,当用户访问其中任何一个RabbitMQ节点时,经过rabbitmqctl查询到的queue/user/exchange/vhost等信息都是相同的。
保存数据到磁盘和内存中,若是集群中群都是内存节点,那就不能中止他们,不然元数据就会丢。
RabbitMQ只要求集群中至少有一个磁盘节点,若是只有一个磁盘节点,恰好又崩溃了,集群能够继续路由消息,但不能建立队列、交换器、绑定、添加用户、更改权限等操做。因此,
建议设置两个磁盘节点
,当内存节点重启后,会链接到预先配置的磁盘节点,下载当前集群元数据拷贝,因此要将全部磁盘节点告诉内存节点。
数据只保存到内存中,除非遇到
内存节点的特色就是执行效率高
不是每一个节点都有全部队列的彻底拷贝,若是在集群中建立队列,只会在单个节点上建立完整的队列信息(元数据、状态、内容),全部其余节点只知道队列的元数据和指向该队列的节点指针。
既然一个队列的数据只存在一个节点上,那么在链接集群内其余节点的时候,是如何进行发布消息和消费消息的呢?
若是消息生产者所链接的是节点2或者节点3,此时队列1的完整数据不在该两个节点上,那么在发送消息过程当中这两个节点主要起了一个路由转发做用,根据这两个节点上的元数据(也就是上文提到的:指向queue的owner node的指针)转发至节点1上,最终发送的消息仍是会存储至节点1的队列1上。 一样,若是消息消费者所链接的节点2或者节点3,那这两个节点也会做为路由节点起到转发做用,将会从节点1的队列1中拉取消息进行消费。
若是节点崩溃了,附加在队列上的消费者也就没法接收新的消息了。可让消费者重连到集群并从新建立队列,这种作法仅当队列没设置持久化时才可行,若是作了队列持久化或消息持久化,必须等到对应的节点恢复了才能被消费
,这是为了确保当失败的节点恢复后加入集群,节点上的队列消息不会丢失。
为何不将队列内容和状态复制到全部节点:
优势:
缺点:
把须要的队列作成镜像队列,存在于多个节点,属于RabbitMQ的HA方案
根据策略能够为节点定义镜像节点,镜像节点之间能够实现队列中消息实体的同步。 对于发送方确认消息,Rabbit会在全部队列和队列的从拷贝安全地接收到消息时,才会通知发送方。
优势:
缺点: