ActiveMQ的安装和基本使用

ActiveMQ 是Apache出品,最流行的,能力强劲的开源消息总线。由多种语言和协议编写客户端。语言: Java, C, C++, C#, Ruby, Perl, Python, PHP。应用协议: OpenWire,Stomp REST,WS Notification,XMPP,AMQP。
ActiveMQ也对Spring支持,ActiveMQ可以很容易内嵌到使用Spring的系统里面去,而且也支持Spring2.0的特性。

ActiveMQ的消息形式:
一种是点对点的(Queue),即一个生产者和一个消费者一一对应;
另一种是发布/订阅模式(Topic),即一个生产者产生消息并进行发送后,可以由多个消费者进行接收。

ActiveMQ定义了五种不同的消息正文格式,以及调用的消息类型,允许你发送并接收以一些不同形式的数据,提供现有消息格式的一些级别的兼容性。
  • StreamMessage – Java原始值的数据流
  • MapMessage–一套名称-值对
  • TextMessage–一个字符串对象
  • ObjectMessage–一个序列化的 Java对象
  • BytesMessage–一个字节的数据流
最常用的就是TextMessage

ActiveMQ的安装

1.进入http://activemq.apache.org/下载ActiveMQ。
2. 安装环境:需要jdk,这里还用centOS系统进行安装。
3. 将ActiveMQ 的压缩包上传到Linux系统,解压缩。使用bin目录下的activemq命令启动:

[[email protected] bin]# ./activemq start
关闭:
[[email protected] bin]# ./activemq stop
查看状态:
[[email protected] bin]# ./activemq status

注意:如果ActiveMQ整合spring使用不要使用activemq-all-5.12.0.jar包。建议使用5.11.2
4. 启动之后,就可以进行访问ActiveMQ的管理后台
http://192.168.25.129:8161/
8186是ActiveMQ默认的启动时占用的端口号
在这里插入图片描述
第一次进入需要输入用户名和密码,都是admin,这也是ActiveMQ自己设置的。

ActiveMQ的使用方法

在这里插入图片描述

点对点(Queue)形式:

Producer表示消息的生产者,用于发送消息,它的使用如下:
先添加activeMq的jar包,使用5.11.2版本的jar包

<dependency>
		<groupId>org.apache.activemq</groupId>
		<artifactId>activemq-all</artifactId>
		<version>5.11.2</version>
	</dependency>
/**
* 点到点形式发送消息
 * @throws Exception 
 */
@Test
public void testQueueProducer() throws Exception{
	//1.创建一个链接工厂对象,需要指定activemq的服务id和端口
	//tcp://192.168.25.129 这种ip形式是activeMq规定的,61616是默认端口号
	ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://192.168.25.129:61616");
	
	//2.使用工厂对象,创建一个Connection对象
	Connection connection = connectionFactory.createConnection();
	
	//3.开启连接,就是调用Connection对象的start方法
	connection.start();
	
	//4.创建一个Session对象,需要指定两个参数
	//参数一:是否开启事务,如果true开始事务,则第二个参数就没有意义,一般不开启
	//参数二:应答模式。自动应答或者手动应答,一般自动应答
	Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
	
	//5.使用Session对象创建Destination对象,它有两个子接口,Queue和Topic,这里使用Queue
	// 参数:队列的名称。
	Queue queue = session.createQueue("test-queue");
	
	//6.使用Session对象创建一个Producer对象(生产者),这里使用它的子接口MessageProducer
	MessageProducer messageProducer = session.createProducer(queue);
	
	//7.创建一个Message对象,这是消息内容,这里使用TextMessage,这也是最常用的
	/*	方式一:
	 *  TextMessage message = new ActiveMQTextMessage();
		message.setText("hello ActiveMq");*/
	//方式二:使用session直接创建
	TextMessage message = session.createTextMessage("hello ActiveMq");
	
	//8.发送消息
	messageProducer.send(message);
	
	//9.关闭资源,要关闭所有的资源
	messageProducer.close();
	session.close();
	connection.close();
}

执行之后
在这里插入图片描述
因为此时还没有消费者启动,所以消息一直持久化在服务中。

Consumer表示消息的消费者,接收消息

/**
 * 点对点形式接受消息
 * @throws Exception
 */
@Test
public void testQueueConsumer() throws Exception{
	//1.创建一个链接工厂对象,需要指定activemq的服务id和端口
	//tcp://192.168.25.129 这种ip形式是activeMq规定的,61616是默认端口号
	ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://192.168.25.129:61616");
	
	//2.使用工厂对象,创建一个Connection对象
	Connection connection = connectionFactory.createConnection();
	
	//3.开启连接,就是调用Connection对象的start方法
	connection.start();
	
	//4.创建一个Session对象,需要指定两个参数
	//参数一:是否开启事务,如果true开始事务,则第二个参数就没有意义,一般不开启
	//参数二:应答模式。自动应答或者手动应答,一般自动应答
	Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
	
	//5.使用Session对象创建Destination对象,它有两个子接口,Queue和Topic,这里使用Queue
	// 参数:队列的名称。(注意:这里的队列的名称,要与消息生产者指定的名称一样,才能接收到指定队列的消息)
	Queue queue = session.createQueue("test-queue");
	
	//6.使用session创建一个消费者对象
	MessageConsumer consumer = session.createConsumer(queue);
	
	//7.接收消息,这里是让消费者对象consumer监听一个MessageListener事件,当有消息从生产者发送出来时,这里就马上接收到消息
	consumer.setMessageListener(new MessageListener() {
		
		@Override
		public void onMessage(Message message) {
			//接受内容,打印
			TextMessage textMessage = (TextMessage) message;
			try {
				String text = textMessage.getText();
				System.out.println(text);
			} catch (JMSException e) {
				e.printStackTrace();
			}
		}
	});
	
	//等待接受消息,因为不知道消息什么时候发送,所以这里为了测试,不直接关闭,在这里一直等待消息
	System.in.read();		//表示用户敲击在控制台输入任意键,才继续往下执行
	
	//9.关闭资源,要关闭所有的资源
	consumer.close();
	session.close();
	connection.close();
}

执行之后,可以在控制台看到输出 hello ActiveMq 字符串,此时如果不在控制台操作,则消费者会一直启动者,当再去运行消息的发送方法时,消息就会马上打印在控制台,因为消费者一直在监听消息的产生。
执行消息接收之后,ActivaMq的管理后台为就能看到,队列中的消息数变成了0,所以使用Queue形式,服务会默认保存没有被消费的消息,直到有消费方接收消息。

发布&订阅形式(Topic)

Producer表示消息的生产者

/**
* 发布&订阅形式(Topic)发送消息
 * @throws Exception 
 */
@Test
public void testTopicProducer() throws Exception{
	//1.创建一个链接工厂对象,需要指定activemq的服务id和端口
	//tcp://192.168.25.129 这种ip形式是activeMq规定的,61616是默认端口号
	ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://192.168.25.129:61616");
	
	//2.使用工厂对象,创建一个Connection对象
	Connection connection = connectionFactory.createConnection();
	
	//3.开启连接,就是调用Connection对象的start方法
	connection.start();
	
	//4.创建一个Session对象,需要指定两个参数
	//参数一:是否开启事务,如果true开始事务,则第二个参数就没有意义,一般不开启
	//参数二:应答模式。自动应答或者手动应答,一般自动应答
	Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
	
	//5.使用Session对象创建Destination对象,它有两个子接口,Queue和Topic,这里使用Topic
	// 参数:话题的名称
	Topic topic = session.createTopic("test-topic1");
	
	//6.使用Session对象创建一个Producer对象(生产者),这里使用它的子接口MessageProducer
	MessageProducer messageProducer = session.createProducer(topic);
	
	//7.创建一个Message对象,这是消息内容,这里使用TextMessage,这也是最常用的
	/*	方式一:
	 *  TextMessage message = new ActiveMQTextMessage();
		message.setText("hello ActiveMq");*/
	//方式二:使用session直接创建
	TextMessage message = session.createTextMessage("hello ActiveMq");
	
	//8.发送消息
	messageProducer.send(message);
	
	//9.关闭资源,要关闭所有的资源
	messageProducer.close();
	session.close();
	connection.close();
}

执行发布消息之后
在这里插入图片描述
从这个可以看到,使用Topic形式时,消息发送出来,如果没有消费者接收,则消息不会保存在服务中。

Consumer表示消息的消费者,接收消息

/**
	

     *  发布&订阅形式(Topic)接受消息
    	 * @throws Exception
    	 */
    	@Test
    	public void testTopicConsumer() throws Exception{
    		//1.创建一个链接工厂对象,需要指定activemq的服务id和端口
    		//tcp://192.168.25.129 这种ip形式是activeMq规定的,61616是默认端口号
    		ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://192.168.25.129:61616");
    		
    		//2.使用工厂对象,创建一个Connection对象
    		Connection connection = connectionFactory.createConnection();
    		
    		//3.开启连接,就是调用Connection对象的start方法
    		connection.start();
    		
    		//4.创建一个Session对象,需要指定两个参数
    		//参数一:是否开启事务,如果true开始事务,则第二个参数就没有意义,一般不开启
    		//参数二:应答模式。自动应答或者手动应答,一般自动应答
    		Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
    		
    		//5.使用Session对象创建Destination对象,它有两个子接口,Queue和Topic,这里使用Queue
    		// 参数:话题的名称。(注意:这里的话题的名称,要与消息生产者指定的名称一样,才能接收到指定队列的消息)
    		Topic topic = session.createTopic("test-topic1");
    		
    		//6.使用session创建一个消费者对象
    		MessageConsumer consumer = session.createConsumer(topic);
    		
    		//7.接收消息,这里是让消费者对象consumer监听一个MessageListener事件,当有消息从生产者发送出来时,这里就马上接收到消息
    		consumer.setMessageListener(new MessageListener() {
    			
    			@Override
    			public void onMessage(Message message) {
    				//接受内容,打印
    				TextMessage textMessage = (TextMessage) message;
    				try {
    					String text = textMessage.getText();
    					System.out.println(text);
    				} catch (JMSException e) {
    					e.printStackTrace();
    				}
    			}
    		});
    		
    		//这里显示当前启动的消费者的index,为了测试,这里启动三个消费者,index分别是1,2,3
    		System.out.println("topic消费者1启动...");
    		
    		//等待接受消息,因为不知道消息什么时候发送,所以这里为了测试,不直接关闭,在这里一直等待消息
    		System.in.read();		//表示用户敲击在控制台输入任意键,才继续往下执行
    		
    		//9.关闭资源,要关闭所有的资源
    		consumer.close();
    		session.close();
    		connection.close();
    	}

当启动三个消息消费者之后,再次执行发送消息的方法,结果如下

在这里插入图片描述
在这里插入图片描述
在这里插入图片描述
可以看到三个消费者都接受到了消息,而ActivaMq的管理后台中,也能看到test-topic1这个话题,有了三个消费者,同时,也有了三个消息的出队
在这里插入图片描述