ActiveMQ之生产者消费者模式

ActiveMQ

 
ActiveMQ 是Apache出品,最流行的,能力强劲的开源消息总线。ActiveMQ 是一个完全支持JMS1.1和J2EE 1.4规范的 JMS Provider实现,尽管JMS规范出台已经是很久的事情了,但是JMS在当今的J2EE应用中间仍然扮演着特殊的地位。

简单来说ActiveMQ是JMS规范的实现者之一,因为它免费,所以用的人就多

要想使用ActiveMQ首先必须安装ActiveMQ服务,下载地址:

http://activemq.apache.org/download.html


找到对应电脑的版本号启动:


ActiveMQ 服务启动地址:http://127.0.0.1:8161/admin/ 用户名/密码 admin/admin,看到如下页面表示安装成功


创建项目需要引用服务目录下的jar包:



消息的发布和订阅,意义在于有人订阅我的消息,当我生产消息的时候即刻让我的订阅者收到我的消息.,否则,生产者也不知道向哪个服务发送消息.

创建订阅者:

package com.java1234.activemq2;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MessageConsumer;
import javax.jms.Session;
import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
/**
 * 消息消费者-消息订阅者一
 * @author Administrator
 *
 */
public class JMSConsumer {
private static final String USERNAME=ActiveMQConnection.DEFAULT_USER; // 默认的连接用户名
private static final String PASSWORD=ActiveMQConnection.DEFAULT_PASSWORD; // 默认的连接密码
private static final String BROKEURL=ActiveMQConnection.DEFAULT_BROKER_URL; // 默认的连接地址

public static void main(String[] args) {
ConnectionFactory connectionFactory; // 连接工厂
Connection connection = null; // 连接
Session session; // 会话 接受或者发送消息的线程
Destination destination; // 消息的目的地
MessageConsumer messageConsumer; // 消息的消费者
// 实例化连接工厂
connectionFactory=new ActiveMQConnectionFactory(JMSConsumer.USERNAME, JMSConsumer.PASSWORD, JMSConsumer.BROKEURL);
try {
connection=connectionFactory.createConnection();  // 通过连接工厂获取连接
connection.start(); // 启动连接
session=connection.createSession(Boolean.FALSE, Session.AUTO_ACKNOWLEDGE); // 创建Session
// destination=session.createQueue("FirstQueue1");  // 创建连接的消息队列
destination=session.createTopic("FirstTopic1");
messageConsumer=session.createConsumer(destination); // 创建消息消费者
messageConsumer.setMessageListener(new Listener()); // 注册消息监听
} catch (JMSException e) {
// TODO Auto-generated catch block
e.printStackTrace();

}
}

监听消息:

package com.java1234.activemq2;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.TextMessage;
/**
 * 消息监听-订阅者一
 * @author Administrator
 *
 */
public class Listener implements MessageListener{
@Override
public void onMessage(Message message) {
// TODO Auto-generated method stub
try {
System.out.println("订阅者一收到的消息:"+((TextMessage)message).getText());
} catch (JMSException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}

}

创建两个雷同的订阅者和消息监听程序,需要更改注册消息监听:

        messageConsumer.setMessageListener(new Listener2());

                System.out.println("订阅者二收到的消息:"+((TextMessage)message).getText());

创建消息生产者:

package com.java1234.activemq2;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;
import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
/**
 * 消息生产者-消息发布者
 * @author Administrator
 *
 */
public class JMSProducer {
private static final String USERNAME=ActiveMQConnection.DEFAULT_USER; // 默认的连接用户名
private static final String PASSWORD=ActiveMQConnection.DEFAULT_PASSWORD; // 默认的连接密码
private static final String BROKEURL=ActiveMQConnection.DEFAULT_BROKER_URL; // 默认的连接地址
private static final int SENDNUM=10; // 发送的消息数量
public static void main(String[] args) {
ConnectionFactory connectionFactory; // 连接工厂
Connection connection = null; // 连接
Session session; // 会话 接受或者发送消息的线程
Destination destination; // 消息的目的地
MessageProducer messageProducer; // 消息生产者
// 实例化连接工厂
connectionFactory=new ActiveMQConnectionFactory(JMSProducer.USERNAME, JMSProducer.PASSWORD, JMSProducer.BROKEURL);
try {
connection=connectionFactory.createConnection(); // 通过连接工厂获取连接
connection.start(); // 启动连接
session=connection.createSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE); // 创建Session
// destination=session.createQueue("FirstQueue1"); // 创建消息队列
destination=session.createTopic("FirstTopic1");
messageProducer=session.createProducer(destination); // 创建消息生产者
sendMessage(session, messageProducer); // 发送消息
session.commit();
} catch (Exception e) {
// TODO Auto-generated catch block
e.printStackTrace();
} finally{
if(connection!=null){
try {
connection.close();
} catch (JMSException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
}
}
/**
* 发送消息
* @param session
* @param messageProducer
* @throws Exception
*/
public static void sendMessage(Session session,MessageProducer messageProducer)throws Exception{
for(int i=0;i<JMSProducer.SENDNUM;i++){
TextMessage message=session.createTextMessage("ActiveMQ 发送的消息"+i);
System.out.println("发送消息:"+"ActiveMQ 发布的消息"+i);
messageProducer.send(message);
}
}
}

查看ActiveMQ服务是否有消费端和服务端,以及是否被消费

当我启动消费端的时候,显示的消费和生产数量都为零,但是服务已经正常启动



消息一但被创建,即刻被监听到:



ActiveMQ的消息类型有5种:

//文本消息     TextMessage textMessage = session.createTextMessage("文本消息");  //键值对消息     MapMessage mapMessage = session.createMapMessage();     //流消息     StreamMessage streamMessage = session.createStreamMessage();     //字节消息     BytesMessage bytesMessage = session.createBytesMessage();     //对象消息     User user = new User("obj", "对象消息"); //User对象必须实现Serializable接口     ObjectMessage objectMessage = session.createObjectMessage();     objectMessage.setObject(user);