ActiveMQ(三)——理解和掌握JMS

1、JMS基本概念服务器

  • JMS是什么
    JMS Java Message Service,Java消息服务,是JavaEE中的一个技术。
  • JMS规范
    JMS定义了Java中访问消息中间件的接囗,并无给予实现,实现JMS接囗的消息中间件称为JMS Provider,例如ActiveMQ
    JMS provider:实现JMS接囗和规范的消息中间件
    JMS message:JMS的消息,JMS消息由如下三部分组成:
    1:消息头:每一个消息头字段都有相应的getter和setter方法
    2消息属性:若是须要除消息头字段以夕卜的值,那么可使用消息属性
    3:消息体:分装具体的消息数据
  • JMS producer:消息生产者,建立和发送JMS消息的客户端应用
  • JMS consumer:消息消费者,接收和处理JMS消息的客户端应用
    消息的消费能够采用如下两种方法之一
    1:同步消费:经过调用消费者的receive方法从目的地中显式提取消息,receive方法能够一直阻塞到消息到达。
    2:异步消费:客户能够为消费者注册一个消息监听器,以定义在消息达到时所采起的动做
  • JMS domains.消息传递域,JMS规范中定义了两种消息传递域:点对点(point-to-point,简写成PTP)消息传递域和发布/订阅消息传递域(publish/subscrmbe,简写成pub/sub)
    1:点对点消息传递域的特色以下
    (1)每一个消息只能有一个消费者
    (2)消息的生产者和消费者没有时间上的相关性。不管消费者在生产者发送消
    息的时候是否处于运行状态,它均可以提取消息。
    ActiveMQ(三)——理解和掌握JMS
    2:发布/订阅消传递域的特色以下.
    (1)每一个消息能够有多个消费者
    (2)生产者和消费者之间有时间上的相关性。订阅一个主题的消费者只能消费自它订阅以后发布的消息。JMS规范容许客户建立持久订阅,这在必定程度上放松了时间上的相关性要求。持久订阅容许消费者消费它在未处于激活状态时发送的消息。
    3:在点对点消息传递域中,目的被称为队列(queue);在发布/订阅消传递域中,
    目的被称为主题(topic)
    ActiveMQ(三)——理解和掌握JMS
    Connection factory:链接工厂,用来建立链接对象,以链接到JMS的provider
    JMS Connection:封装了客户与JMS提供者之间的一个虚拟的链接
    JMS Session:是生产的和消费消息的个单线程上下文会话用于建立消息生产者(producer)消息消费者(consumer)和消息(message)等。会话提供了一个事务性的上下文,在这个上卜文中,一组发送和接收被组合到了个原子操做中。
  • Destination:消息发送到的目的地
  • Acknowledge:签收
  • Transaction:事务
  • JMS client:用来收发消息的Java应用
  • Non-JMS client:使到JMS provider本地API写的应用,用来替换JMS API实现收发消息的功能,一般会提供其余的一些特性,好比:COR、RMIT等。
  • Administered objects:预约义的JMS对象,一般的provider规范中有定义,提供给JMS客户来司好比:ConnectionFactory和Destination

2、 jms的消息结构session

  • JMS消息山如下几部分组成,消息头,属性和消息体
  • 消息头包含消息的识别信息和路由信息,消息头包含一些标准的属性以下
    1.JMS Destination:由send方法设置
    2.JMSDeliveryMode:由send方法设置
    3.JMSExpiration:由send方法设置
    4.JMSPriority:由send方法设置
    5.JMSMessageID:由send方法设置
    6.JMSTimestamp:由客户端设置
    7.JMSCorre1ationID:由客户端设置
    8.JMSReplyTo:由客户端设置
    9.JMSType:由客户端设置
    10.JMSRedelivered:由JMS Provider设置
    标准的JMS消息头包含如下属性:
    1:JMSDestination:消息发送的目的地:主要是指Queue和Topic,自动分配
    2:JMSDe1iveryMode:传送模式。有两种:持久模式和非持久模式。一条持久性的消息应该被传送“一次仅仅一次”,这就意味者若是JMS提供者出现故障,该消息并不会丢失,它会在服务器恢复以后再次传递。一条非持久的消息最多会传送一次,这味这服务器出现故障,该消息将永远丢失。自动分配
    3:JMSExpiration:消息过时时间,等于Destination的send方法中的timeToLive值加上发送时刻的GMT时间值。若是timeToLive值等于零,则JMSExpiration被设为零,表示该消息永不过时。若是发送后,在消息过时时间以后消息尚未被发送到目的地,则该消息被清除。自动分配
    4:JMSPriority:消息优先级,从0-9十个级别,0到是普通消息,5一9是加急消息。JMS不要求JMS Provider严格接照这十个优先级发送消息,但必须保证加寻消息要先于普通消息到达。默认是4级。自动分配
    5:JMSMessageID:惟一识别每一个消息的标识,由JMS Provider产生。自动分配
    6:JMSTimestamp:一个JMS Provider在调用send()方法时自动设置的,它是消息被发送和消费者实际接收的时间差。自动分配
    7:JMSCorre1ationID:用来链接到另一个消息,典型的应用是在回复消息中链接到原消息。在大多数状况下,JMSCorre1ationID用于将一条消息标记为对JMSMessageID标示的上条消息的应答,不过,JMSCorre1ationID能够是任何值,不只仅是JMSMessageID。由开发者设置
    8:JMSReplyTo:提供本消息回复消息的目的地。由开发者设置
    9:JMSType:消息类类型的识别符。由开发者设置
    10:JHSRedelivered:若是一个客户端收到一个设置了JMSRede1ivered属性的消息,则表示可能客户端曾经在早些时候收到过该消息,但并无签收(acknowledged),若是该消息被从新传送,JMSRede1ivered=true反之,
    JMSRedelivered=false。自动设置dom

  • 消息体,JMS API定义了5种消息体格式,也叫消息类型,可使用不一样形式发送接收数据,并以兼容现有的消息格式。包括:TextMessage、apMessage、BytesMessage、StreamMessage和0bjectMessage
  • 消息属性,包含如下三种类型的属性
    1.应用程序设置和添加的属性,好比:
    Message.setStringProperty(username,username)
    2:JMS定义的属性
    史用“JMSX”做为属性名的前缀,
    connection.getMetaData0,getJMSXPropertyNames(),方法返回全部链接支持
    的JMSX属性的名字。
    3:JMS供应的特定的属性
  • JMS定义的属性以下:
    1:JMSXUserID:发送消息的用户标识,发送时提供商设置
    2:JMSXAppID:发送消息的应用标识,发送时提供商设置
    3:JMSXDe1iveryCount:转发消息重试次数,第一次是1,第二次是2,...。发送时提供商没置
    4:JMSXGroupID:消息所在消息组的标识,由客户端设置
    5:JMSXGroupSeq::组内消息的序号,第一个小时是1,第二个是2,...。由客户端设置
    6:JMSXProducerTXID:产生消息的事务的事务标识,发送时提供商设置
    7:JMSXConsumerTXID:消费消息的事务的事务标识,接收时提供商设置
    8:JMSXRcvTimestamp:JMS转发消息到消费者的时间,接收时提供商设置
    9:JMSXState:假定存在一个消息仓库,它存储了每一个消息的的单独拷贝,且这些消息从原始消息被发送时开始。每一个拷贝的状态有:1(等待),2(准备〕,3(到期)或,4(保留)。因为状态与生产者和消费者无关,因此它不是由它们来提供。它只和在仓库中查找消息相关,所以JMS没有提以这种API。由提供商设置异步

  • 测试案例
    ActiveMQ(三)——理解和掌握JMS
    ActiveMQ(三)——理解和掌握JMS

3、JMS的可靠性机制tcp

  • 消息接收确认
           JMS消息只有在被确认以后,才认为已经被成功地消费了。消息的成功消费一般包含三个阶段:客户接收消息、客户处理消息和消息被确认。
           在事务性会话中,当一个事务被提交的时候,确认自动发生。在非事务性会话中,消息什么时候被确认取决于建立会话时的应答模式(acknowledgementmode)。该参数有如下三个可选值:
           Session.AUTO_ACKNOWLEDGE:当客户成功的从receive方法返回的时候,或者从MessageListener.onMessage方法成功返回的时候,会话自动确认客户收到的消息。
           Session.CLIENT_ACKNOWLEDGE:客户经过调用消息的acknowledge方法确认消息。须要注意的是,在这种模式中,确认是在会话层上进行,确认一个被消费的消息将自动确认全部已被会话消费的消息。例如,若是一个消息消费者消费了10个消息,而后确认了第5个消息,那么全部10个消息都被确认。
           Session.DUPS_ACKNOWLEDGE:该选择只是会话迟钝的确认消息的提交。若是JMS provider失败,那么能会致使一些重复的消息。若是是重复的消息,那么JMS provider必须把消息头的JMSRedelivered字段设置为true。
    ActiveMQ(三)——理解和掌握JMS
  • 消息持久性,JMS支持如下两种消息提交模式:
    PERSISTENT:指示JMS provider持久保存消息,以保证的消息不会由于JMS provider的失败而丢失
    NON_PERSISTENT:不要求JMS provider持久保存消息
  • 消息优先级
    可使用消息优先级别来指示JMS provider首先提交紧急的消息。优先级分10个级别,从0(最低)到9(最高)。若是不指定优先级,默认级别是4。须要注意的是,JMS provider并不必定保证接照优先级的顺序提交消息
  • 消息过时
    能够设置消息在必定时间后过时,默认是永不过时
  • 消息的临时目的地
    能够经过会话上的createTemporaryQueue方法和createTemporaryTopic
    力法来建立临时目的地。它们的存在时间只限于建立它们的链接所保持的时间。
    只有建立临时目的地的链接上的消息消费者才可以从临时目的地中提取消息。
    ActiveMQ(三)——理解和掌握JMS
  • 持久订阅
           首先消息生产者必须使用PERSISTENT提交消息。客户能够经过会话上的
    createDurab1eSubscriber方法来建立一个持久订阅,该方法的第一个参数必须
    是一个topic。第二个参数是订阅的名称。
           JMS provider会存储发布到持久订阅对应的topic上的消息。若是最初建立
    持久订阅的客户或者任何其它客户,使用相同的链接工厂和链接的客户ID,相同的主题和相同的到订阅名,再次调用会话上的createDurab1eSubscriber方法,那么持久订阅就会被激活。JMS provider会向客户发送客户处于非激活状态时所发布的消息。
           持久订阅在某个时刻只能有一个激活的订阅者。持久订阅在建立以后会一直保留,直到应用程序调用会话上的unsubscribe方法。ide

  • 本地事务
           在一个JMS客户端,可使用本地事务来组合消息的发送和接收。JMS Session接口提供了commit和rollback方法。事务提交意味着生产的全部消息被发送,消费的全部消息被确认;事务回滚意味生产的的全部消息被销毁,消费的全部消息被恢复并从新提交,除非它们已通过期。
           事务性的会话老是牵涉到事务处理中,commit或rollback方法一旦被调用,一个事务就结束了,而另外一个事务被开始。关闭事务性会话将回滚其中的事务。得要注意的是,若是使用请求/回复机制,即发送一个消息,同时但愿在同一个事务中等待接收该消息的回复,那么程序将被挂起,由于知道事务提交,发送操做才会真正执行。
           须要注意的还有一个,消息的生产和消费不能包含在同一个事务中。

4、JMS的PTP模型工具

  • JMS PTP(Point-to-Point)模型定义了客户端如向队列发送消息,从队列接收消息,以及浏览队列中的消息
           PTP模型是基于队列的,生产者发消息到队列,消费者从队列接收消息,队列的存在使得消息的异步传输成为金可能。和邮件系统中的邮箱同样,队列能够包含各类消息,JMS Provider提供工具管理队列的建立、删除。
  • PTP的一些特色
    1:若是在Session关闭时,有一些消息己经被收到,但尚未被签收(acknowledged),那么,当消费者下次链接到相同的队列时,这些消息还会被再
    次接收
    2:若是用户在receive方法中设定了消息选择条件,那么不符合条件的消息会留在队列中,不会被接收到
    3:队列能够长久地保存消息直到消费者收到消息。消费者不须要由于担忧消息会丢失而时刻和队列保持激活的链接状态,充分体现了异步传输模式的优点

5、JMS的Pub/Sub模型测试

  • JMSPub/Sub模型定义了如何向一个内容节点发布和订阅消息,这些节点被称做topic主题,能够被认为是消息的传输中介,发布者(publisher)发布消息到主题,订阅者(subscribe〕从主题订阅消息。主题使得消息订阅者和消发布者保持互相独立,不须要接触便可保证消息的传送。
  • JMSPub/Sub的一些特色.
    1:消息订阅分为非持久订阅和持久订阅
           非持久订阅只有当客户端处于激活状态,也就是和JMS Provider保持链接状态才能收到发送到某个主题的消息,而当客户端处于离线状态,这个时间段发到主题的消息将会丢失,永远不会收到。
           持久订阅时,客户端向JMS注册一个识别本身身份的ID,当这个客户端处于离线时,JMS provider会为这个ID保存全部发送到主题的消息,当客户再次链接到JMS provider时,会根据本身的ID获得全部当本身处于离线时发送到主题的消息。
    2:若是用户在receive方法中设定了消息选择条件,那么不符合条件的消息不会被接收:非持久订阅状态下,不能恢复或从新派送一个未签收的消息。只有持久订阅才能恢复或从新派送一个未签收的消息。
    4:当全部的消息必须被接收,则用持久订阅。当丢失消息可以被容忍,则用非持久订阅
    6、JMS的API结构(和开发相关)
    ActiveMQ(三)——理解和掌握JMS

7、JMS应用开发的基本步骤
ActiveMQ(三)——理解和掌握JMS线程

8、非持久的Topic消息示例code

  • 对于非持久的Topic消息的发送
    基本跟前面发送队列信息是同样的,只是把建立Destination的地方,由创
    建队列替换成建立Topic,例如:
    Destination destination = session.createTopic(MyTopic");

  • 对于非持久的Topic消息的接收
    1:必需要接收方在线,而后客户端发送信息,接收方才能接收到消息
    2:一样把建立Destination的地方,山建立队列替换成建立Topic,如:
    Destination destination = session.createTopic('MyTopic);

3:因为不知道客户端发送多少信息,所以改为while循环的方式了,例如:

Message message = consumer.receive();
while(message!=null){
TextMessage txtMsg= (TextMessage)message;
System.out.println(“收到消息:”+txtMsg.getText());
message = consumer.receive(1000L);
}
  • 完整示例

    public class NoPersistenceSender {
    public static void main(String[] args) throws Exception{
        //链接工厂
        ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://localhost:61616");
        Connection connection = connectionFactory.createConnection();
        connection.start();
    
        //带事务的session
        Session session = connection.createSession(Boolean.TRUE,Session.AUTO_ACKNOWLEDGE);
        Destination destination = session.createTopic("myTopic");
        MessageProducer producer = session.createProducer(destination);
    
        for (int i = 0; i < 3; i++) {
            TextMessage message = session.createTextMessage("message---"+i);
            producer.send(message);
        }
        session.commit();
        session.close();
        connection.close();
    }
    }
public class NoPersistenceReceive {
    public static void main(String[] args) throws Exception{
        //链接工厂
        ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://localhost:61616");
        Connection connection = connectionFactory.createConnection();
        connection.start();

        //带事务的session
        final Session session = connection.createSession(Boolean.TRUE,Session.AUTO_ACKNOWLEDGE);

        Destination destination = session.createTopic("myTopic");
        MessageConsumer consumer = session.createConsumer(destination);

        Message message = consumer.receive();
        while(message!=null){
            TextMessage txtMsg= (TextMessage)message;
            System.out.println("收到消息:"+txtMsg.getText());
            message = consumer.receive(1000L);
        }
        session.commit();
        session.close();
        connection.close();
    }
}

注意:对于非持久性的Topic消息,则须要接收者保持运行状态,否则消息发送者发出的消息会接收不到。
ActiveMQ(三)——理解和掌握JMS

9、持久的Topic消息示例
1:须要在链接上设置消费者ID,用来识别消费者
2:须要建立TopicSubscriber来订阅
3:要设置好了事后再start这个connnection
4:消费者必定要先运行一次,等于向消息服务中间件注册这个消费者,而后再运行客户端发送消息,这个时候不管消费者是否在线,都会接收到,下次链接的时候,会把没有收过的消息都接收下来。

public class PersistenceSender {
    public static void main(String[] args) throws Exception{
        //链接工厂
        ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://localhost:61616");
        Connection connection = connectionFactory.createConnection();

        //带事务的session
        Session session = connection.createSession(Boolean.TRUE,Session.AUTO_ACKNOWLEDGE);
        Destination destination = session.createTopic("myTopic");
        MessageProducer producer = session.createProducer(destination);

        producer.setDeliveryMode(DeliveryMode.PERSISTENT);
        connection.start();

        for (int i = 0; i < 3; i++) {
            TextMessage message = session.createTextMessage("message---"+i);
            producer.send(message);
        }
        session.commit();
        session.close();
        connection.close();
    }
}
public class PersistenceReceive {
    public static void main(String[] args) throws Exception{
        //链接工厂
        ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://localhost:61616");
        Connection connection = connectionFactory.createConnection();
        connection.setClientID("cc1");

        //带事务的session
        final Session session = connection.createSession(Boolean.TRUE,Session.AUTO_ACKNOWLEDGE);

        Topic destination = session.createTopic("myTopic");
        TopicSubscriber ts = session.createDurableSubscriber(destination,"T1");
        connection.start();

        Message message = ts.receive();
        while(message!=null){
            TextMessage txtMsg= (TextMessage)message;
            System.out.println("收到消息:"+txtMsg.getText());
            message = ts.receive(1000L);
        }
        session.commit();
        session.close();
        connection.close();
    }
}

ActiveMQ(三)——理解和掌握JMS

10、总结

  • 持久化消息
    这是ActiveMQ的默认传送模式,此模式保证这些消息只被传送一次和成功使用一次。对于这些消息,可靠性是优先考虑的因素。可靠性的另外一个重要方面是确保持久性消息传送至目标后,消息服务在向消费者传送它们以前不会丢失这些消息。
    这意味着在持久性消息传送至目标时,消息服务将其放入持久性数据存储。若是消息服务因为某种缘由致使失败,它能够恢复此消息并将此消息传送至相应的消费皆。虽然这样增长了消息传送的开销,但却增长了可靠性。
  • 非持久化消息保证这些消息最多被传送一次。对于这些消息,可靠性并不是主要的考虑因素。此模式并不要求持久性的数据存储,不保证消息服务因为某种缘由致使失败后消息不会丢失。有两种方法指定传送模式:1.使用setDe1iveryMode方法,这样全部的消息都采用此传送模式;如producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT)2.使用send方法为每一条消息设置传送模式