分布式消息通讯ActiveMQ原理-消费消息策略-笔记

消息消费流程图java

消费端消费消息的原理缓存

  • 咱们经过上一节课的讲解,知道有两种方法能够接收消息,
    • 一种是使用同步阻塞的MessageConsumer#receive方法。
    • 另外一种是使用消息监听器MessageListener。
  • 这里须要注意的是,在同一个session下,这二者不能同时工做,
    • 也就是说不能针对不一样消息采用不一样的接收方式。
    • 不然会抛出异常。
  • 至于为何这么作,最大的缘由仍是在事务性会话中,两种消费模式的事务很差管控

ActiveMQMessageConsumer.receivesession

  • 消费端同步接收消息的源码入口
public Message receive() throws JMSException {
        checkClosed();
        checkMessageListener(); //检查receive和MessageListener是否同时配置在当前的会话中
        sendPullCommand(0); //若是PrefetchSizeSize为0而且unconsumerMessage为空,则发起pull命令
        MessageDispatch md = dequeue(-1); //从unconsumerMessage出队列获取消息
        if (md == null) {
            return null;
        }
        beforeMessageIsConsumed(md);
        afterMessageIsConsumed(md, false); //发送ack给到broker
        return createActiveMQMessage(md);//获取消息并返回
    }

sendPullCommand异步

  • 发送pull命令从broker上获取消息,前提是prefetchSize=0而且unconsumedMessages为空。
  • unconsumedMessage表示未消费的消息,这里面预读取的消息大小为prefetchSize的值
protected void sendPullCommand(long timeout) throws JMSException {
        clearDeliveredList();
        if (info.getCurrentPrefetchSize() == 0 && unconsumedMessages.isEmpty()) {
            MessagePull messagePull = new MessagePull();
            messagePull.configure(info);
            messagePull.setTimeout(timeout);
            session.asyncSendPacket(messagePull); //向服务端异步发送messagePull指令
        }
    }

clearDeliveredListasync

  • 在上面的sendPullCommand方法中,会先调用clearDeliveredList方法,
    • 主要用来清理已经分发的消息链表deliveredMessages
      • deliveredMessages,存储分发给消费者但还为应答的消息链表
      • Ø 若是session是事务的,则会遍历deliveredMessage中的消息放入到previouslyDeliveredMessage中来作重发
      • Ø 若是session是非事务的,根据ACK的模式来选择不一样的应答操做
private void clearDeliveredList() {
        if (clearDeliveredList) {
            synchronized (deliveredMessages) {
                if (clearDeliveredList) {
                    if (!deliveredMessages.isEmpty()) {
                        if (session.isTransacted()) {
                            if (previouslyDeliveredMessages == null) {
                                previouslyDeliveredMessages = new PreviouslyDeliveredMap<MessageId,Boolean>(session.getTransactionContext().getTransactionId());
                            }
                            for (MessageDispatch delivered : deliveredMessages) {
                                previouslyDeliveredMessages.put(delivered.getMessage().getMessageId(), false);
                            }
                            LOG.debug("{} tracking existing transacted {} delivered list ({}) on transport interrupt",
                                    getConsumerId(), previouslyDeliveredMessages.transactionId,
                                    deliveredMessages.size());
                        } else {
                            if (session.isClientAcknowledge()) {
                                LOG.debug("{} rolling back delivered list ({}) on transport interrupt", getConsumerId(), deliveredMessages.size());
                                // allow redelivery
                                if (!this.info.isBrowser()) {
                                    for (MessageDispatch md : deliveredMessages) {
                                        this.session.connection.rollbackDuplicate(this,
                                                md.getMessage());
                                    }
                                }
                            }
                            LOG.debug("{} clearing delivered list ({}) on transport interrupt",getConsumerId(), deliveredMessages.size());
                            deliveredMessages.clear();
                            pendingAck = null;
                        }
                    }
                    clearDeliveredList = false;
                }
            }
        }
    }

dequeue性能

  • 从unconsumedMessage中取出一个消息,
  • 在建立一个消费者时,就会未这个消费者建立一个为消费的消息通道,这个通道分为两种,
    • 一种是简单优先级队列分发通道SimplePriorityMessageDispatchChannel ;
    • 另外一种是先进先出的分发通道FifoMessageDispatchChannel.
  • 至于为何要存在这样一个消息分发通道,你们能够想象一下,
    • 若是消费者每次去消费完一个消息之后再去broker拿一个消息,效率是比较低的。
    • 因此经过这样的设计能够容许session可以一次性将多条消息分发给一个消费者。
    • 默认状况下对于queue来讲,prefetchSize的值是1000

beforeMessageIsConsumedfetch

  • 这里面主要是作消息消费以前的一些准备工做,
  • 若是ACK类型不是DUPS_OK_ACKNOWLEDGE或者队列模式(简单来讲就是除了Topic和DupAck这两种状况),
    • 全部的消息先放到deliveredMessages链表的开头。
  • 而且若是当前是事务类型的会话,
    • 则判断transactedIndividualAck,若是为true,表示单条消息直接返回ack。
    • 不然,调用ackLater,批量应答,
      • client端在消费消息后暂且不发送ACK,而是把它缓存下来(pendingACK),
        • 等到这些消息的条数达到必定阀值时,只须要经过一个ACK指令把它们所有确认;
        • 这比对每条消息都逐个确认,在性能上要提升不少
private void beforeMessageIsConsumed(MessageDispatch md) throws JMSException {
        md.setDeliverySequenceId(session.getNextDeliveryId());
        lastDeliveredSequenceId = md.getMessage().getMessageId().getBrokerSequenceId();
        if (!isAutoAcknowledgeBatch()) {
            synchronized(deliveredMessages) {
                deliveredMessages.addFirst(md);
            }
            if (session.getTransacted()) {
                if (transactedIndividualAck) {
                    immediateIndividualTransactedAck(md);
                } else {
                    ackLater(md, MessageAck.DELIVERED_ACK_TYPE);
                }
            }
        }
    }

afterMessageIsConsumed优化

  • 这个方法的主要做用是执行应答操做,这里面作如下几个操做
    • Ø 若是消息过时,则返回消息过时的ack
    • Ø 若是是事务类型的会话,则不作任何处理
    • Ø 若是是AUTOACK或者(DUPS_OK_ACK且是队列),而且是优化ack操做,则走批量确认ack
    • Ø 若是是DUPS_OK_ACK,则走ackLater逻辑
    • Ø 若是是CLIENT_ACK,则执行ackLater
private void afterMessageIsConsumed(MessageDispatch md, boolean messageExpired) throws
            JMSException {
        if (unconsumedMessages.isClosed()) {
            return;
        }
        if (messageExpired) {
            acknowledge(md, MessageAck.EXPIRED_ACK_TYPE);
            stats.getExpiredMessageCount().increment();
        } else {
            stats.onMessage();
            if (session.getTransacted()) {
                // Do nothing.
            } else if (isAutoAcknowledgeEach()) {
                if (deliveryingAcknowledgements.compareAndSet(false, true)) {
                    synchronized (deliveredMessages) {
                        if (!deliveredMessages.isEmpty()) {
                            if (optimizeAcknowledge) {
                                ackCounter++;
                                // AMQ-3956 evaluate both expired and normal msgs as
                                // otherwise consumer may get stalled
                                if (ackCounter + deliveredCounter >= (info.getPrefetchSize() * .65)
                                        || (optimizeAcknowledgeTimeOut > 0 && System.currentTimeMillis() >= (optimizeAckTimestamp +
                                        optimizeAcknowledgeTimeOut))) {
                                    MessageAck ack =
                                            makeAckForAllDeliveredMessages(MessageAck.STANDARD_ACK_TYPE);
                                    if (ack != null) {
                                        deliveredMessages.clear();
                                        ackCounter = 0;
                                        session.sendAck(ack);
                                        optimizeAckTimestamp = System.currentTimeMillis();
                                    }
                                    // AMQ-3956 - as further optimization send
                                    // ack for expired msgs when there are any.
                                    // This resets the deliveredCounter to 0 so that
                                    // we won't sent standard acks with every msg just
                                    // because the deliveredCounter just below
                                    // 0.5 * prefetch as used in ackLater()
                                    if (pendingAck != null && deliveredCounter > 0) {
                                        session.sendAck(pendingAck);
                                        pendingAck = null;
                                        deliveredCounter = 0;
                                    }
                                }
                            } else {
                                MessageAck ack =
                                        makeAckForAllDeliveredMessages(MessageAck.STANDARD_ACK_TYPE);
                                if (ack != null) {
                                    deliveredMessages.clear();
                                    session.sendAck(ack);
                                }
                            }
                        }
                    }
                    deliveryingAcknowledgements.set(false);
                }
            } else if (isAutoAcknowledgeBatch()) {
                ackLater(md, MessageAck.STANDARD_ACK_TYPE);
            } else if (session.isClientAcknowledge() || session.isIndividualAcknowledge()) {
                boolean messageUnackedByConsumer = false;
                synchronized (deliveredMessages) {
                    messageUnackedByConsumer = deliveredMessages.contains(md);
                }
                if (messageUnackedByConsumer) {
                    ackLater(md, MessageAck.DELIVERED_ACK_TYPE);
                }
            } else {
                throw new IllegalStateException("Invalid session state.");
            }
        }
    }
相关文章
相关标签/搜索