消息队列-ActiveMQ学习笔记(三)-发布-订阅消息模式实现

发布-订阅消息模式与点对点模式类似,只不过在session创建消息队列时,由session.createQuene()变为session.createTopic()。


消息发布者代码:

[java]  view plain  copy
  1. package com.feiyang.activemq2;  
  2.   
  3. import javax.jms.Connection;  
  4. import javax.jms.ConnectionFactory;  
  5. import javax.jms.Destination;  
  6. import javax.jms.JMSException;  
  7. import javax.jms.MessageProducer;  
  8. import javax.jms.Session;  
  9. import javax.jms.TextMessage;  
  10.   
  11. import org.apache.activemq.ActiveMQConnectionFactory;  
  12.   
  13.   
  14. /** 
  15.  * 消息发布者 
  16.  *  
  17.  * @author MCL 
  18.  * 
  19.  */  
  20. public class JMSProducer {  
  21.     private static final String USERNAME = ActiveMQConnectionFactory.DEFAULT_USER;// 默认的连接的用户名  
  22.     private static final String PASSWORD = ActiveMQConnectionFactory.DEFAULT_PASSWORD;// 默认的连接密码  
  23.     private static final String BROKEURL = ActiveMQConnectionFactory.DEFAULT_BROKER_URL;// 默认的连接地址  
  24.     private static final int SENDNUM = 10;// 发送消息的数量  
  25.   
  26.     public static void main(String[] args) {  
  27.         ConnectionFactory connectionFactory;// 连接工厂  
  28.         Connection connection = null;// 连接  
  29.         Session session;// 会话 发送或者接受消息的线程  
  30.         Destination destination;// 消息的目的地  
  31.         MessageProducer messageProducer;// 消息生产者  
  32.         // 实例化连接工厂  
  33.         connectionFactory = new ActiveMQConnectionFactory(JMSProducer.USERNAME, JMSProducer.PASSWORD,  
  34.                 JMSProducer.BROKEURL);  
  35.   
  36.         try {  
  37.             // 通过连接工厂获取连接  
  38.             connection = connectionFactory.createConnection();  
  39.             // 启动连接  
  40.             connection.start();  
  41.             session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);  
  42.             destination = session.createTopic("FirstTopic");// 创建消息队列  
  43.             messageProducer = session.createProducer(destination);// 创建消息生产者  
  44.             sendMessage(session, messageProducer);  
  45.             // 由于设置添加事务,这里需要使用提交才能将数据发送出去  
  46.             session.commit();  
  47.   
  48.         } catch (JMSException e) {  
  49.             // TODO Auto-generated catch block  
  50.             e.printStackTrace();  
  51.         } finally {  
  52.             if (connection != null) {  
  53.                 try {  
  54.                     connection.close();  
  55.                 } catch (JMSException e) {  
  56.                     // TODO Auto-generated catch block  
  57.                     e.printStackTrace();  
  58.                 }  
  59.             }  
  60.         }  
  61.     }  
  62.   
  63.     // 发送消息  
  64.     public static void sendMessage(Session session, MessageProducer messageProducer) {  
  65.         for (int i = 0; i < JMSProducer.SENDNUM; i++) {  
  66.             try {  
  67.                 TextMessage message = session.createTextMessage("Active MQ发送消息" + i);  
  68.                 System.out.println("发布消息:Active MQ发送消息");  
  69.                 messageProducer.send(message);  
  70.             } catch (JMSException e) {  
  71.                 // TODO Auto-generated catc h block  
  72.                 e.printStackTrace();  
  73.             }  
  74.         }  
  75.     }  
  76. }  

消息订阅者代码:

[java]  view plain  copy
  1. package com.feiyang.activemq2;  
  2.   
  3. import javax.jms.Connection;  
  4. import javax.jms.ConnectionFactory;  
  5. import javax.jms.Destination;  
  6. import javax.jms.JMSException;  
  7. import javax.jms.MessageConsumer;  
  8. import javax.jms.Session;  
  9.   
  10. import org.apache.activemq.ActiveMQConnection;  
  11. import org.apache.activemq.ActiveMQConnectionFactory;  
  12.   
  13. /** 
  14.  * 消息订阅者一 
  15.  * @author MCL 
  16.  * 
  17.  */  
  18. public class JMSConsumer {  
  19.     private static final String USERNAME = ActiveMQConnection.DEFAULT_USER;// 默认的连接的用户名  
  20.     private static final String PASSWORD = ActiveMQConnection.DEFAULT_PASSWORD;// 默认的连接密码  
  21.     private static final String BROKEURL = ActiveMQConnection.DEFAULT_BROKER_URL;// 默认的连接地址  
  22.   
  23.     public static void main(String[] args) {  
  24.         ConnectionFactory connectionFactory; // 连接工厂  
  25.         Connection connection = null// 连接  
  26.         Session session; // 会话 接受或者发送消息的线程  
  27.         Destination destination; // 消息的目的地  
  28.         MessageConsumer messageConsumer; // 消息的消费者  
  29.           
  30.   
  31.         connectionFactory = new ActiveMQConnectionFactory(JMSConsumer.USERNAME, JMSConsumer.PASSWORD,  
  32.                 JMSConsumer.BROKEURL);  
  33.         try {  
  34.             // 通过连接工厂获取连接  
  35.             connection=connectionFactory.createConnection();  // 通过连接工厂获取连接  
  36.             connection.start(); // 启动连接  
  37.             session=connection.createSession(Boolean.FALSE, Session.AUTO_ACKNOWLEDGE); // 创建Session  
  38.             destination=session.createTopic("FirstTopic");   
  39.             messageConsumer=session.createConsumer(destination); // 创建消息消费者  
  40.             messageConsumer.setMessageListener(new Listener()); // 注册消息监听  
  41.               
  42.         } catch (JMSException e) {  
  43.             // TODO Auto-generated catch block  
  44.             e.printStackTrace();  
  45.         }   
  46.     }  
  47. }  

监听器代码:

[java]  view plain  copy
  1. package com.feiyang.activemq2;  
  2.   
  3. import javax.jms.JMSException;  
  4. import javax.jms.Message;  
  5. import javax.jms.MessageListener;  
  6. import javax.jms.TextMessage;  
  7. /** 
  8.  * 消息监听器 
  9.  * @author MCL 
  10.  * 
  11.  */  
  12. public class Listener implements MessageListener{  
  13.   
  14.     @Override  
  15.     public void onMessage(Message message) {  
  16.         // TODO Auto-generated method stub  
  17.         try {  
  18.             System.out.println("消息订阅者一收到的消息:"+((TextMessage)message).getText());  
  19.         } catch (JMSException e) {  
  20.             // TODO Auto-generated catch block  
  21.             e.printStackTrace();  
  22.         }  
  23.     }  
  24.   
  25. }  

我们可以定义多个消息订阅者及其监听器。这里定义了两个订阅者,由于代码相似所以只粘贴一份。

由于发布-订阅模型的关系,需要先进行订阅后,才能接收发布者的消息。
先启动 订阅者一和订阅者二的线程,然后用发布者 发布消息。打开后台管理界面,点击Topics

有上述图片可以看出,FirstTopic中,消息发布者发布了10条信息,并由两个订阅者进行消费,每人消费10条信息。