rabbitmq知识库: http://rabbitmq.org.cn/html
Rabbitmq舍弃了繁重的事务消息而使用了消息确认机制实现了分布式事务,实在是解耦之一大神器。可是其配置起来挺麻烦,各类参数,各类调整。但国内貌似资料不多,找来找去都找不到,本身撸一发先spring
发送确认用来确保消息是否已送达消息队列。消息一旦到达消息服务,就会触发确认机制,能够分为两种状况:数据库
消费确认用来确保消费者是否成功的消费了消息。一旦有消费者成功注册到相应的消息服务,消息将会被消息服务经过basic.deliver推(push)给消费者,此时消息会包含一个deliver tag用来惟一的标识消息。若是此时是手动模式,就须要手动的确认消息已经被成功消费,不然消息服务将会重发消息(由于消息已经持久化到了硬盘上,因此不管消息服务是否是可能挂掉,都会重发消息)。并且必须确认,不管是成功或者失败,不然会引发很是严重的问题缓存
有三种状况可能进死信交换机服务器
只须要设置一个args,就ok拉网络
channel.exchangeDeclare("some.exchange.name", "direct"); Map<String, Object> args = new HashMap<String, Object>(); args.put("x-dead-letter-exchange", "some.exchange.name"); channel.queueDeclare("myqueue", false, false, false, args);
Channel Prefetch Setting (QoS),表示当前channel中未应答消息的数目,若是超过了,队列中将再也不接受新的消息。这里所谓的channel就是指从消息服务到消费者的一个通道,简单来讲就是指消息从消息队列发送到消费者了,若是没收到应答,就算是一个Qos并发
加大这个值会增长消息的发送速度(Throughput),可是会加剧消息队列的内存,因此100-300之间是一个比较理想的状态,可参考:http://next.rabbitmq.com/conf... :Channel Prefetch Setting (QoS)。分布式
暴力的设置微100-300是存在一些问题的,若是太大,可能消息所有都压在消费者中而得不到消费,看起来队列是空的,实际上所有积压在客户端;若是过小则得不到消费,浪费资源。具体该怎么设置要根据实际的网络吞吐量、以及消费者的消费能力。好比果消费者很快,是内存操做,那么你设置很大,甚至不设置均可以;可是若是消费者很慢,好比是个数据库操做,那么极可能将消息所有积压到消费者而得不到响应
。能够参考http://www.rabbitmq.com/blog/...微服务
Qos同时也会存在一个问题,一个channel是会被多个消费者的,因此必须计算出全部消费者中未应答的数目,这显然是很是不合理的,并且很麻烦,因此能够改成设置每一个消费者缓存(Prefetch Buf)能够容许的最大的数目。高并发
例子:
// 1. 一会儿设置全部的queue都是10条unacknowledged Channel channel = ...; Consumer consumer = ...; channel.basicQos(10); // Per consumer limit channel.basicConsume("my-queue", false, consumer); //2. 分别设置10条 Channel channel = ...; Consumer consumer1 = ...; Consumer consumer2 = ...; channel.basicQos(10); // Per consumer limit channel.basicConsume("my-queue1", false, consumer1); channel.basicConsume("my-queue2", false, consumer2); //3. 分别设置channel和consume,我的不推荐 Channel channel = ...; Consumer consumer1 = ...; Consumer consumer2 = ...; channel.basicQos(10, false); // Per consumer limit channel.basicQos(15, true); // Per channel limit channel.basicConsume("my-queue1", false, consumer1); channel.basicConsume("my-queue2", false, consumer2);
当前最大容许空闲的最大channel数。若是在高并发的环境中,若是值太小的话channel会关关开开很是频繁。因此在1.6的版本中,spring amqp将这个值从1提升到了25。同时你也能够从RabbitMQ Admin中心观察到channel关关开开,那么就能够考虑增大cache的值了。
当你遇到 connetion error的错误时,就能够考虑增大channel cache size了。
通道(channel)中的消息标志,按照正数递增,消息队列中用来标识消息的惟一标识
当消息服务器资源不足时,会向全部的生产者发送这个消息,咱们能够捕获这个消息并作处理,资源能够是内存不足,cpu负载太重等等。
ConnectionFactory factory = new ConnectionFactory(); Connection connection = factory.newConnection(); connection.addBlockedListener(new BlockedListener() { public void handleBlocked(String reason) throws IOException { // Connection is now blocked } public void handleUnblocked() throws IOException { // Connection is now unblocked } });
话说真出现block了也应该是在管理台直接给出警告,不过也能够作一下避免异常
全应答标识。若是设置为true,一条消息应答了,那么以前的所有消息将被应答。好比目前channel中有delivery tags为5,6,7,8的消息,那么一旦8被应答,那么5,6,7将都被应答,若是设置为false,那么5,6,7将不会被应答。(不建议设置,毕竟一个channel中会绑定好多consumer)
当消息服务发生异常时,不会发送basic.ack,反而会发送一个basic.nack,并且不会自动requeue,此时须要消息发送方手动处理,进行重发。只有一种状况会发送nack:“basic.nack will only be delivered if an internal error occurs in the Erlang process responsible for a queue”
1 mq配置的listener到什么地方,是配置到每一个微服务、或者是配置到mq中?若是在每一个服务里面都配置concurency,是否是随着节点的增长,listener数量也会无限增长?2 ssl和max-queue-length的配置