1、JMS基本概念服务器
2、 jms的消息结构session
消息头包含消息的识别信息和路由信息,消息头包含一些标准的属性以下
1.JMS Destination:由send方法设置
2.JMSDeliveryMode:由send方法设置
3.JMSExpiration:由send方法设置
4.JMSPriority:由send方法设置
5.JMSMessageID:由send方法设置
6.JMSTimestamp:由客户端设置
7.JMSCorre1ationID:由客户端设置
8.JMSReplyTo:由客户端设置
9.JMSType:由客户端设置
10.JMSRedelivered:由JMS Provider设置
标准的JMS消息头包含如下属性:
1:JMSDestination:消息发送的目的地:主要是指Queue和Topic,自动分配
2:JMSDe1iveryMode:传送模式。有两种:持久模式和非持久模式。一条持久性的消息应该被传送“一次仅仅一次”,这就意味者若是JMS提供者出现故障,该消息并不会丢失,它会在服务器恢复以后再次传递。一条非持久的消息最多会传送一次,这味这服务器出现故障,该消息将永远丢失。自动分配
3:JMSExpiration:消息过时时间,等于Destination的send方法中的timeToLive值加上发送时刻的GMT时间值。若是timeToLive值等于零,则JMSExpiration被设为零,表示该消息永不过时。若是发送后,在消息过时时间以后消息尚未被发送到目的地,则该消息被清除。自动分配
4:JMSPriority:消息优先级,从0-9十个级别,0到是普通消息,5一9是加急消息。JMS不要求JMS Provider严格接照这十个优先级发送消息,但必须保证加寻消息要先于普通消息到达。默认是4级。自动分配
5:JMSMessageID:惟一识别每一个消息的标识,由JMS Provider产生。自动分配
6:JMSTimestamp:一个JMS Provider在调用send()方法时自动设置的,它是消息被发送和消费者实际接收的时间差。自动分配
7:JMSCorre1ationID:用来链接到另一个消息,典型的应用是在回复消息中链接到原消息。在大多数状况下,JMSCorre1ationID用于将一条消息标记为对JMSMessageID标示的上条消息的应答,不过,JMSCorre1ationID能够是任何值,不只仅是JMSMessageID。由开发者设置
8:JMSReplyTo:提供本消息回复消息的目的地。由开发者设置
9:JMSType:消息类类型的识别符。由开发者设置
10:JHSRedelivered:若是一个客户端收到一个设置了JMSRede1ivered属性的消息,则表示可能客户端曾经在早些时候收到过该消息,但并无签收(acknowledged),若是该消息被从新传送,JMSRede1ivered=true反之,
JMSRedelivered=false。自动设置dom
JMS定义的属性以下:
1:JMSXUserID:发送消息的用户标识,发送时提供商设置
2:JMSXAppID:发送消息的应用标识,发送时提供商设置
3:JMSXDe1iveryCount:转发消息重试次数,第一次是1,第二次是2,...。发送时提供商没置
4:JMSXGroupID:消息所在消息组的标识,由客户端设置
5:JMSXGroupSeq::组内消息的序号,第一个小时是1,第二个是2,...。由客户端设置
6:JMSXProducerTXID:产生消息的事务的事务标识,发送时提供商设置。
7:JMSXConsumerTXID:消费消息的事务的事务标识,接收时提供商设置
8:JMSXRcvTimestamp:JMS转发消息到消费者的时间,接收时提供商设置
9:JMSXState:假定存在一个消息仓库,它存储了每一个消息的的单独拷贝,且这些消息从原始消息被发送时开始。每一个拷贝的状态有:1(等待),2(准备〕,3(到期)或,4(保留)。因为状态与生产者和消费者无关,因此它不是由它们来提供。它只和在仓库中查找消息相关,所以JMS没有提以这种API。由提供商设置。异步
3、JMS的可靠性机制tcp
持久订阅
首先消息生产者必须使用PERSISTENT提交消息。客户能够经过会话上的
createDurab1eSubscriber方法来建立一个持久订阅,该方法的第一个参数必须
是一个topic。第二个参数是订阅的名称。
JMS provider会存储发布到持久订阅对应的topic上的消息。若是最初建立
持久订阅的客户或者任何其它客户,使用相同的链接工厂和链接的客户ID,相同的主题和相同的到订阅名,再次调用会话上的createDurab1eSubscriber方法,那么持久订阅就会被激活。JMS provider会向客户发送客户处于非激活状态时所发布的消息。
持久订阅在某个时刻只能有一个激活的订阅者。持久订阅在建立以后会一直保留,直到应用程序调用会话上的unsubscribe方法。ide
4、JMS的PTP模型工具
5、JMS的Pub/Sub模型测试
7、JMS应用开发的基本步骤线程
8、非持久的Topic消息示例code
对于非持久的Topic消息的发送
基本跟前面发送队列信息是同样的,只是把建立Destination的地方,由创
建队列替换成建立Topic,例如:
Destination destination = session.createTopic(MyTopic");
3:因为不知道客户端发送多少信息,所以改为while循环的方式了,例如:
Message message = consumer.receive(); while(message!=null){ TextMessage txtMsg= (TextMessage)message; System.out.println(“收到消息:”+txtMsg.getText()); message = consumer.receive(1000L); }
完整示例
public class NoPersistenceSender { public static void main(String[] args) throws Exception{ //链接工厂 ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://localhost:61616"); Connection connection = connectionFactory.createConnection(); connection.start(); //带事务的session Session session = connection.createSession(Boolean.TRUE,Session.AUTO_ACKNOWLEDGE); Destination destination = session.createTopic("myTopic"); MessageProducer producer = session.createProducer(destination); for (int i = 0; i < 3; i++) { TextMessage message = session.createTextMessage("message---"+i); producer.send(message); } session.commit(); session.close(); connection.close(); } }
public class NoPersistenceReceive { public static void main(String[] args) throws Exception{ //链接工厂 ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://localhost:61616"); Connection connection = connectionFactory.createConnection(); connection.start(); //带事务的session final Session session = connection.createSession(Boolean.TRUE,Session.AUTO_ACKNOWLEDGE); Destination destination = session.createTopic("myTopic"); MessageConsumer consumer = session.createConsumer(destination); Message message = consumer.receive(); while(message!=null){ TextMessage txtMsg= (TextMessage)message; System.out.println("收到消息:"+txtMsg.getText()); message = consumer.receive(1000L); } session.commit(); session.close(); connection.close(); } }
注意:对于非持久性的Topic消息,则须要接收者保持运行状态,否则消息发送者发出的消息会接收不到。
9、持久的Topic消息示例
1:须要在链接上设置消费者ID,用来识别消费者
2:须要建立TopicSubscriber来订阅
3:要设置好了事后再start这个connnection
4:消费者必定要先运行一次,等于向消息服务中间件注册这个消费者,而后再运行客户端发送消息,这个时候不管消费者是否在线,都会接收到,下次链接的时候,会把没有收过的消息都接收下来。
public class PersistenceSender { public static void main(String[] args) throws Exception{ //链接工厂 ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://localhost:61616"); Connection connection = connectionFactory.createConnection(); //带事务的session Session session = connection.createSession(Boolean.TRUE,Session.AUTO_ACKNOWLEDGE); Destination destination = session.createTopic("myTopic"); MessageProducer producer = session.createProducer(destination); producer.setDeliveryMode(DeliveryMode.PERSISTENT); connection.start(); for (int i = 0; i < 3; i++) { TextMessage message = session.createTextMessage("message---"+i); producer.send(message); } session.commit(); session.close(); connection.close(); } }
public class PersistenceReceive { public static void main(String[] args) throws Exception{ //链接工厂 ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://localhost:61616"); Connection connection = connectionFactory.createConnection(); connection.setClientID("cc1"); //带事务的session final Session session = connection.createSession(Boolean.TRUE,Session.AUTO_ACKNOWLEDGE); Topic destination = session.createTopic("myTopic"); TopicSubscriber ts = session.createDurableSubscriber(destination,"T1"); connection.start(); Message message = ts.receive(); while(message!=null){ TextMessage txtMsg= (TextMessage)message; System.out.println("收到消息:"+txtMsg.getText()); message = ts.receive(1000L); } session.commit(); session.close(); connection.close(); } }
10、总结