activeMQ消息发送过程与原理浅析

这篇文章重点分析使用JMS向activeMQ中间件发送消息的过程分析。javascript

同步发送与异步发送
activeMQ发送消息客户端发送消息分为同步发送与异步发送java

同步发送,发送者发送一条消息会阻塞直到broker反馈一个确认消息给发送者,表示消息已经被broker处理。这个机 制提供了消息的安全性保障,因为是阻塞的操做,会影响到客户端消息发送的性能。web

异步发送,发送者不须要等待broker提供反馈给发送者,性能相对较高。可是可能会出现消息丢失的状况。所 以使用异步发送的前提是在某些状况下容许出现数据丢失的状况。apache

那么在什么状况下选择同步发生,什么状况下是选择异步发送呢?
除去咱们人为设置的方式,其默认的选择策略总结以下:
1.非持久化的消息都是异步发送的。
2.持久化消息在非事务模式下是同步发送的。
3.在开启事务的状况下,消息都是异步发送。
因为异步发送的效率会比同步发送性能更高,在发送持久化消息的时候,尽可能去开启事务会话。或者设置使用异步发送。安全

以上三种默认策略,在源码中都会体现出来,稍后在源码中分析。bash

除了默认设置,咱们能够认为设置发送模式。总结以下三种方式。session

1.ConnectionFactory connectionFactory=new ActiveMQConnectionFactory("tcp://192.168.0.15:61616? jms.useAsyncSend=true");
2.((ActiveMQConnectionFactory) connectionFactory).setUseAsyncSend(true);
3.((ActiveMQConnection)connection).setUseAsyncSend(true);

异步发送分析
首先分析异步发送过程,其数据流转以下图所示。
这里写图片描述
producerWindow
producerWindow主要是针对异步发送消息时,控制容许可以积压的消息累计大小,这些消息没有获得broker的ack,在获得ack以后会减去相应的消息size,释放producerWindow。
这个producerWindow的大小设置有2种方式
1.在brokerUrl中设置: “tcp://localhost:61616?jms.producerWindowSize=1048576”,这种设置将会对全部的 producer生效。 2.在destinationUri中设置: “test-queue?producer.windowSize=1048576”,此参数只会对使用此Destination实例 的producer失效,将会覆盖brokerUrl中的producerWindowSize值。
经过设置producerWindow大小能够控制消息发送的流量控制。异步

下面开始跟踪源码分析图中的流程。
ActiveMQMessageProducer.send
发送方法跟踪,首先的关键方法就是这个,源码以下。async

public void send(Destination destination, Message message, int deliveryMode, int priority, long timeToLive, AsyncCallback onComplete) throws JMSException {
        checkClosed();
        if (destination == null) {
            if (info.getDestination() == null) {
                throw new UnsupportedOperationException("A destination must be specified.");
            }
            throw new InvalidDestinationException("Don't understand null destinations");
        }

        ActiveMQDestination dest;
        if (destination.equals(info.getDestination())) {
            dest = (ActiveMQDestination)destination;
        } else if (info.getDestination() == null) {
            dest = ActiveMQDestination.transform(destination);
        } else {
            throw new UnsupportedOperationException("This producer can only send messages to: " + this.info.getDestination().getPhysicalName());
        }
        if (dest == null) {
            throw new JMSException("No destination specified");
        }

        if (transformer != null) {
            Message transformedMessage = transformer.producerTransform(session, this, message);
            if (transformedMessage != null) {
                message = transformedMessage;
            }
        }

        if (producerWindow != null) {
            try {
            //窗口大小,执行是否阻塞
                producerWindow.waitForSpace();
            } catch (InterruptedException e) {
                throw new JMSException("Send aborted due to thread interrupt.");
            }
        }

        this.session.send(this, dest, message, deliveryMode, priority, timeToLive, producerWindow, sendTimeout, onComplete);

        stats.onMessage();
    }

上面的代码根据异步的流程图分析,看到是先须要根据producerWindow,来判断是否阻塞的,若是producerWindow不够,那么producerWindow.waitForSpace();就会阻塞等待。tcp

ActiveMQSession.send

protected void send(ActiveMQMessageProducer producer, ActiveMQDestination destination, Message message, int deliveryMode, int priority, long timeToLive,
                        MemoryUsage producerWindow, int sendTimeout, AsyncCallback onComplete) throws JMSException {

        checkClosed();
        if (destination.isTemporary() && connection.isDeleted(destination)) {
            throw new InvalidDestinationException("Cannot publish to a deleted Destination: " + destination);
        }
        //获取会话的监视器
        synchronized (sendMutex) {
            // tell the Broker we are about to start a new transaction
            //开启事务
            doStartTransaction();
            TransactionId txid = transactionContext.getTransactionId();
            long sequenceNumber = producer.getMessageSequence();

           //......组装消息内容
           //。。。。。。

             //判断是一部仍是同步发送
            if (onComplete==null && sendTimeout <= 0 && !msg.isResponseRequired() && !connection.isAlwaysSyncSend() && (!msg.isPersistent() || connection.isUseAsyncSend() || txid != null)) {
                this.connection.asyncSendPacket(msg);
                if (producerWindow != null) {
                    // Since we defer lots of the marshaling till we hit the
                    // wire, this might not
                    // provide and accurate size. We may change over to doing
                    // more aggressive marshaling,
                    // to get more accurate sizes.. this is more important once
                    // users start using producer window
                    // flow control.
                    int size = msg.getSize();
                    producerWindow.increaseUsage(size);
                }
            } else {
                if (sendTimeout > 0 && onComplete==null) {
                    this.connection.syncSendPacket(msg,sendTimeout);
                }else {
                    this.connection.syncSendPacket(msg, onComplete);
                }
            }

        }
    }

上面这段代码,根据异步流程图来分析,主要看的就是判断是异步仍是同步发送的代码,异步发送须要知足:

onComplete==null && sendTimeout <= 0 && !msg.isResponseRequired() && !connection.isAlwaysSyncSend() && (!msg.isPersistent() || connection.isUseAsyncSend() || txid != null)

onComplete==null:发送回调为空。默认知足
sendTimeout <= 0:发送超时时间小于等于0。默认知足
!msg.isResponseRequired():消息不须要响应。默认知足
!connection.isAlwaysSyncSend():链接不能设置为同步发送。默认知足
(!msg.isPersistent() || connection.isUseAsyncSend() || txid != null):是非持久化消息或者设置为使用异步发送或者开启了事务,从这里能够看出默认策略中的:
1.非持久化的消息都是异步发送的。!msg.isPersistent()
3.在开启事务的状况下,消息都是异步发送。txid != null

ActiveMQConnection.asyncSendPacket->doAsyncSendPacket
跟踪发送,到以下代码。

private void doAsyncSendPacket(Command command) throws JMSException {
        try {
            this.transport.oneway(command);
        } catch (IOException e) {
            throw JMSExceptionSupport.create(e);
        }
    }

上面代码的transport对象在定义里面没有进行构造,那么猜想应该是在建立ActiveMQConnection时,构造方法里面注入的,看一下注入的实例是什么。

ActiveMQConnectionFactory.createTransport
从connectionFactory.createConnection()方法一路跟踪,最后到了以下代码

protected Transport createTransport() throws JMSException {
        try {
            URI connectBrokerUL = brokerURL;
            //获取url中定义的链接模式
            String scheme = brokerURL.getScheme();
            if (scheme == null) {
                throw new IOException("Transport not scheme specified: [" + brokerURL + "]");
            }
            if (scheme.equals("auto")) {
                connectBrokerUL = new URI(brokerURL.toString().replace("auto", "tcp"));
            } else if (scheme.equals("auto+ssl")) {
                connectBrokerUL = new URI(brokerURL.toString().replace("auto+ssl", "ssl"));
            } else if (scheme.equals("auto+nio")) {
                connectBrokerUL = new URI(brokerURL.toString().replace("auto+nio", "nio"));
            } else if (scheme.equals("auto+nio+ssl")) {
                connectBrokerUL = new URI(brokerURL.toString().replace("auto+nio+ssl", "nio+ssl"));
            }

            return TransportFactory.connect(connectBrokerUL);
        } catch (Exception e) {
            throw JMSExceptionSupport.create("Could not create Transport. Reason: " + e, e);
        }
    }

来到上面这段代码,是根据咱们brokerURL中的scheme来构建一个uri对象,而后用这个uri对象调用TransportFactory.connect(connectBrokerUL);获取对应的transport实例。咋一看之下,有点像dubbo源码中利用url参数驱动,适配器根据url参数的不一样,加载不一样的实现类实例。接着跟踪TransportFactory.connect(connectBrokerUL);看是否是这样。

TransportFactory.connect

public static Transport connect(URI location) throws Exception {
        TransportFactory tf = findTransportFactory(location);
        return tf.doConnect(location);
    }
    public static TransportFactory findTransportFactory(URI location) throws IOException {
        String scheme = location.getScheme();
        if (scheme == null) {
            throw new IOException("Transport not scheme specified: [" + location + "]");
        }
        TransportFactory tf = TRANSPORT_FACTORYS.get(scheme);
        if (tf == null) {
            // Try to load if from a META-INF property.
            try {
                tf = (TransportFactory)TRANSPORT_FACTORY_FINDER.newInstance(scheme);
                TRANSPORT_FACTORYS.put(scheme, tf);
            } catch (Throwable e) {
                throw IOExceptionSupport.create("Transport scheme NOT recognized: [" + scheme + "]", e);
            }
        }
        return tf;
    }

能够看出是在(TransportFactory)TRANSPORT_FACTORY_FINDER.newInstance(scheme);建立实例的,查看TRANSPORT_FACTORY_FINDER的定义以下。

private static final FactoryFinder TRANSPORT_FACTORY_FINDER = new FactoryFinder("META-INF/services/org/apache/activemq/transport/");

在newInstance方法中,代码以下

public Object newInstance(String key) throws IllegalAccessException, InstantiationException, IOException, ClassNotFoundException {
        return objectFactory.create(path+key);
    }

上面的path就是”“META-INF/services/org/apache/activemq/transport/”“,key在咱们的brokerUrl就是tcp,猜想
就是找一个在该路径下的文件中定义的类全路径做为实例化的类型信息,查看路径下的文件信息如下

还真有一个tcp文件,打开看

class=org.apache.activemq.transport.tcp.TcpTransportFactory

后续能估计到就是实例化一个TcpTransportFactory,而后回到下面的代码。

public static Transport connect(URI location) throws Exception {
        TransportFactory tf = findTransportFactory(location);
        return tf.doConnect(location);
    }

TcpTransportFactory.doConnect

public Transport doConnect(URI location) throws Exception {
        try {
            Map<String, String> options = new HashMap<String, String>(URISupport.parseParameters(location));
            if( !options.containsKey("wireFormat.host") ) {
                options.put("wireFormat.host", location.getHost());
            }
            WireFormat wf = createWireFormat(options);
            //建立Transport
            Transport transport = createTransport(location, wf);
            //包装Transport
            Transport rc = configure(transport, wf, options);
            //remove auto
            IntrospectionSupport.extractProperties(options, "auto.");

            if (!options.isEmpty()) {
                throw new IllegalArgumentException("Invalid connect parameters: " + options);
            }
            return rc;
        } catch (URISyntaxException e) {
            throw IOExceptionSupport.create(e);
        }
    }

上面代码就执行了真正的建立Transport,同时给Transport包装了外城处理逻辑,具体包装在,以下代码

public Transport configure(Transport transport, WireFormat wf, Map options) throws Exception {
        transport = compositeConfigure(transport, wf, options);

        transport = new MutexTransport(transport);
        transport = new ResponseCorrelator(transport);

        return transport;
    }
    public Transport compositeConfigure(Transport transport, WireFormat format, Map options) {
    //若是配置了写超时则执行包装WriteTimeoutFilter
        if (options.containsKey(WRITE_TIMEOUT_FILTER)) {
            transport = new WriteTimeoutFilter(transport);
            String soWriteTimeout = (String)options.remove(WRITE_TIMEOUT_FILTER);
            if (soWriteTimeout!=null) {
                ((WriteTimeoutFilter)transport).setWriteTimeout(Long.parseLong(soWriteTimeout));
            }
        }
        IntrospectionSupport.setProperties(transport, options);
        return transport;
    }

因此随后执行oneway的Transport实例是这样的:ResponseCorrelator(MutexTransport(TcpTransport)),在调用时就会构成调用链条逐层调用。

ResponseCorrelator.oneway

public void oneway(Object o) throws IOException {
        Command command = (Command)o;
        command.setCommandId(sequenceGenerator.getNextSequenceId());
        command.setResponseRequired(false);
        next.oneway(command);
    }

MutexTransport.oneway
负责枷锁

@Override
    public void oneway(Object command) throws IOException {
        writeLock.lock();
        try {
            next.oneway(command);
        } finally {
            writeLock.unlock();
        }
    }

TcpTransport.oneWay
异步发送消息

@Override
    public void oneway(Object command) throws IOException {
        checkStarted();
        wireFormat.marshal(command, dataOut);
        dataOut.flush();
    }

这种链式代码风格值得学习,逻辑抽出也很巧妙的分布在不一样的处理器中。
差点忘了,最后还要增长producerWindowSize

if (producerWindow != null) {
                    // Since we defer lots of the marshaling till we hit the
                    // wire, this might not
                    // provide and accurate size. We may change over to doing
                    // more aggressive marshaling,
                    // to get more accurate sizes.. this is more important once
                    // users start using producer window
                    // flow control.
                    int size = msg.getSize();
                    producerWindow.increaseUsage(size);
                }

而后,异步发送的流程搞定。

同步发送消息分析

ActiveMqConnection.syncSendPacket

public Response syncSendPacket(Command command, int timeout) throws JMSException {
        if (isClosed()) {
            throw new ConnectionClosedException();
        } else {

            try {
            //调用Transport的request'方法
                Response response = (Response)(timeout > 0
                        ? this.transport.request(command, timeout)
                        : this.transport.request(command));
                if (response.isException()) {
                    ExceptionResponse er = (ExceptionResponse)response;
                    if (er.getException() instanceof JMSException) {
                        throw (JMSException)er.getException();
                    } else {
                        if (isClosed()||closing.get()) {
                            LOG.debug("Received an exception but connection is closing");
                        }
                        JMSException jmsEx = null;
                        try {
                            jmsEx = JMSExceptionSupport.create(er.getException());
                        } catch(Throwable e) {
                            LOG.error("Caught an exception trying to create a JMSException for " +er.getException(),e);
                        }
                        if (er.getException() instanceof SecurityException && command instanceof ConnectionInfo){
                            forceCloseOnSecurityException(er.getException());
                        }
                        if (jmsEx !=null) {
                            throw jmsEx;
                        }
                    }
                }
                return response;
            } catch (IOException e) {
                throw JMSExceptionSupport.create(e);
            }
        }
    }

在异步分析时已经知道Transport是一个调用链,首先来到的是
ResponseCorrelator.request

public Object request(Object command) throws IOException {
        FutureResponse response = asyncRequest(command, null);
        return response.getResult();
    }

而后你会发现,其实同步发送也是先执行异步发送请求asyncRequest,而后再阻塞response.getResult()获取响应的,因此实际上是先异步后同步的过程。

public FutureResponse asyncRequest(Object o, ResponseCallback responseCallback) throws IOException {
        Command command = (Command) o;
        command.setCommandId(sequenceGenerator.getNextSequenceId());
        command.setResponseRequired(true);
        FutureResponse future = new FutureResponse(responseCallback, this);
        IOException priorError = null;
        synchronized (requestMap) {
            priorError = this.error;
            if (priorError == null) {
                requestMap.put(new Integer(command.getCommandId()), future);
            }
        }

        if (priorError != null) {
            future.set(new ExceptionResponse(priorError));
            throw priorError;
        }
        //异步流程
        next.oneway(command);
        return future;
    }