ActiveMQ发送、接收消息

本文主要以一个简单的示例展现ActiveMQ收发消息:html

 

1、ActiveMQ说明java

2、代码示例apache

 

1、ActiveMQ说明session

一、当前ActiveMQ的最新版本为: ActiveMQ 5.15.3 Release  ====》点击:官网下载ActiveMQ地址dom

 

二、下载好以后解压,而后启动ActiveMQ,启动时要根据系统的位数来选择tcp



 

 

2、代码示例ide

 

导入依赖jar包spa

<dependency>
   <groupId>org.apache.activemq</groupId>
   <artifactId>activemq-all</artifactId>
   <version> 5.15 . 3 </version>
</dependency>

 

(1)生产者代码code

package com.chinasoft.activemq;

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;

import org.apache.activemq.ActiveMQConnectionFactory;

/**
 * ActiveMQ消息的生产者 生产者发送消息到消息中间件,实现JMS规范接口的消息中间件称为JMS Provider
 * 
 * @author Freedom
 *
 */
public class Sender {

	public static void main(String[] args) throws JMSException {
		sender();
	}

	public static void sender() throws JMSException {

		// 1.创建ConnectionFactory
		ConnectionFactory f = new ActiveMQConnectionFactory(ActiveMQConnectionFactory.DEFAULT_USER,
				ActiveMQConnectionFactory.DEFAULT_PASSWORD, "tcp://localhost:61616");

		// 2.经过ConnectionFactory工厂创建Connection的链接,并调用start开启链接
		Connection c = f.createConnection();
		c.start();

		// 3.经过Connection对象建立session会话,用于接收消息,参数1:是否开启事务,参数2:设置签收方式
		Session session = c.createSession(false, Session.AUTO_ACKNOWLEDGE);// 自动签收

		// 4.经过session建立Destination对象,指的是一个客户端用来指定生产消息的目标和消费消息的来源对象
		// 在PTP的模式中Destination被称为Queue队列;在pub/sub模式下Destination被称为Topic主题
		Destination d = session.createQueue("firstMQ");

		// 5.经过session建立消息的发送者和接收者(生产者和消费者)
		MessageProducer p = session.createProducer(null);// 在生产者发送消息的同时指定Destination

		// 6.MessageProducer设置持久化特性和非持久化特性

		// 7.JMS规范的TextMessage形式数据经过Session来建立,并用MessageProducer发送消息,客户端receive方法进行数据的接受
		// 使用完成必定要关闭Connection链接
		TextMessage msg = null;
		for (int i = 0; i < 100; i++) {
			msg = session.createTextMessage();
			msg.setText("ActiveMQ做为消息中间件:" + i);
			System.out.println("生产者发送的消息==" + msg.getText());
			try {
				Thread.sleep(1000);
			} catch (InterruptedException e) {
				// TODO Auto-generated catch block
				e.printStackTrace();
			}

			// 发送消息
			p.send(d, msg);
		}

		// 关闭链接
		c.close();

	}

}

 

注意:htm

①为了保证消息中间件中数据的可靠性,数据默认保存到kahadb中

 

 

②经过管控台能够看到生产者数据的变化

ActiveMQ内置jetty容器,访问管控台(http://localhost:8161/admin)



 

 

(2)消费者代码示例

package com.chinasoft.activemq;

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MessageConsumer;
import javax.jms.Session;
import javax.jms.TextMessage;

import org.apache.activemq.ActiveMQConnectionFactory;

public class Consumer {

	public static void main(String[] args) throws JMSException {
		receiver();
	}

	public static void receiver() throws JMSException {

		// 1.创建ConnectionFactory
		ConnectionFactory f = new ActiveMQConnectionFactory(ActiveMQConnectionFactory.DEFAULT_USER,
				ActiveMQConnectionFactory.DEFAULT_PASSWORD, "tcp://localhost:61616");

		// 2.经过ConnectionFactory工厂创建Connection的链接,并调用start开启链接
		Connection c = f.createConnection();
		c.start();

		// 3.经过Connection对象建立session会话,用于接收消息,参数1:是否开启事务,参数2:设置签收方式
		Session session = c.createSession(false, Session.AUTO_ACKNOWLEDGE);// 自动签收

		// 4.经过session建立Destination对象,指的是一个客户端用来指定生产消息的目标和消费消息的来源对象
		// 在PTP的模式中Destination被称为Queue队列;在pub/sub模式下Destination被称为Topic主题
		Destination d = session.createQueue("firstMQ");

		// 5.经过session建立消息的发送者和接收者(生产者和消费者)
		MessageConsumer consumer = session.createConsumer(d); // 消费者用于接受MQ的数据

		// 7.JMS规范的TextMessage形式数据经过Session来建立,并用MessageProducer发送消息,客户端receive方法进行数据的接受
		TextMessage msg = null;
		while (true) {
			// 阻塞等待接受MQ的数据
			msg = (TextMessage) consumer.receive();
			System.out.println("消费者接受消息===" + msg.getText());
		}
	}

}

 

控制台显示结果:



 

管控台显示结果: