JMS简介和ActiveMQ简单使用

一、JMS是什么java

“The Java Message Service (JMS) API is a messaging standard that allows application components based on the Java Platform Enterprise Edition (Java EE) to create, send, receive, and read messages. It enables distributed communication that is loosely coupled, reliable, and asynchronous.”。翻译成中文就是:JMS(JAVA Message Service,java消息服务)API是一个消息服务的标准或者说是规范,容许应用程序组件基于JavaEE平台建立、发送、接收和读取消息。它使分布式通讯耦合度更低,消息服务更加可靠以及异步性。spring

 

二、JMS的消息模式:apache

1)队列模式:消费者一个一个消费(按次序分配)安全

2)主题模式:每一个订阅者都能消费任意消息(订阅后)session

3)队列模式与主题模式最大的区别是:app

(1)队列模式:平均接收消息生产者产生的消息负载均衡

(2)主题模式:所有接收生产者产生的全部消息异步

 

三、JMS编码接口之间的关系async

对上图解析:maven

1)又链接工厂(ConnectionFactory)建立链接(Connection);

2)由链接(Connection)建立会话(Session)(链接能够建立多个会话,链接可共多个线程同时使用,而会话是单线程的,它只能在单个线程里面有效。会话可作一些事务上的处理。一个会话是一个线程上下文);

3)由会话(Session)建立生产者(MessageProducer)和消费者(MessageConsumer);

4)会话(Session)建立新的消息(Message);

5)生产者(MessageProducer)将新消息发送到指定目的地,消费者(MessageConsumer)到指定目的地接收新消息。

四、Spring集成jms链接ActiveMQ

1)ConnectionFactory用于管理链接功工厂(Spring提供的链接池,spring提供了SingleConnectionFactory和CachingConnectionFactory)

2)JmsTemplate用于发送和接收消息的模板类

(1)每次发送消息都会从新建立链接、会话和productor。

(2)是spring提供的,只须要向spring容器内注册这个类就可使用JmsTemplate方便的操做Jms。

(3)JmsTemplate类是线程安全的,能够在整个应用范围使用。

3)MessageListerner消息监听器

(1)实现一个onMessage方法,该方法只接收一个Message参数,就能够处理消息了。

五、ActiveMQ集群配置

1)对消息中间件集群缘由:

(1)实现高可用,以排除单点故障引发的服务中断

(2)实现负载均衡,以提高效率为更多客户提供服务

2)ActiveMQ集群的方式:

(1)客户端集群:让多个消费者消费同一个队列

(2)Broker cluster:多个Broker之间同步消息

(3)Master Slave:实现高可用

3)ActiveMQ的队列模式实例:

(1)引用ActiveMQ的maven依赖

<!-- ActiveMQ -->
		<dependency>
			<groupId>org.apache.activemq</groupId>
			<artifactId>activemq-all</artifactId>
			<version>5.15.2</version>
		</dependency>

(2)AppProducer.java

import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.jms.Connection;

import org.apache.activemq.ActiveMQConnectionFactory;

/**
 * 队列模式实例 - 生产者 
 */
public class AppProducer {

	// 默认端口
	private static final String url = "tcp://127.0.0.1:61616";
	private static final String queueName = "queue-test";
	
	
	public static void main(String[] args) throws JMSException {
		// 一、建立链接工厂ConnectionFactory
		ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(url);
		// 二、建立链接
		Connection connection = connectionFactory.createConnection();
		// 三、启动链接
		connection.start();
		// 四、建立会话
		Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
		// 五、建立一个目标
		Destination destination = session.createQueue(queueName);
		// 六、建立生产者
		MessageProducer producer = session.createProducer(destination);
		for (int i = 0; i < 100; i++) {
			// 七、建立消息
			TextMessage textMessage = session.createTextMessage("test queue" + i);
			// 八、发布消息
			producer.send(textMessage);
		}
		// 九、关闭链接
		connection.close();
	}

}

(3)AppConsumer.java

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.Session;
import javax.jms.TextMessage;

import org.apache.activemq.ActiveMQConnectionFactory;

/**
 * 队列模式实例 -消费者
 */
public class AppConsumer {

	// 默认端口
	private static final String url = "tcp://127.0.0.1:61616";
	private static final String queueName = "queue-test";
	
	
	public static void main(String[] args) throws JMSException {
		// 一、建立链接工厂ConnectionFactory
		ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(url);
		// 二、建立链接
		Connection connection = connectionFactory.createConnection();
		// 三、启动链接
		connection.start();
		// 四、建立会话
		Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
		// 五、建立一个目标
		Destination destination = session.createQueue(queueName);
		// 六、建立一个消费者
		MessageConsumer consumer = session.createConsumer(destination);
		// 七、建立一个监听器(注意消息的监听是个异步的过程)
		consumer.setMessageListener(new MessageListener() {
			
			@Override
			public void onMessage(Message message) {
				TextMessage textMessage = (TextMessage) message;
				try {
					System.out.println("队列模式消费消息 : " + textMessage.getText());
				} catch (JMSException e) {
				}
			}
		});
		// 八、关闭链接(注意消息的监听是个异步的过程,因此在此不能先关闭链接)
//		connection.close();
	}

}

(4)运行:先运行提供者类,再运行多个消费者类,发现队列中的消费者中不重复消费掉了全部消息。

4)ActiveMQ的主题模式实例:

(1)一样,先引用ActiveMQ的maven依赖

<!-- ActiveMQ -->
		<dependency>
			<groupId>org.apache.activemq</groupId>
			<artifactId>activemq-all</artifactId>
			<version>5.15.2</version>
		</dependency>

(2)AppProducer.java

import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.jms.Connection;

import org.apache.activemq.ActiveMQConnectionFactory;

/**
 * 主题模式实例 - 生产者 
 */
public class AppProducer {

	// 默认端口
	private static final String url = "tcp://127.0.0.1:61616";
	private static final String topicName = "topic-test";
	
	
	public static void main(String[] args) throws JMSException {
		// 一、建立链接工厂ConnectionFactory
		ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(url);
		// 二、建立链接
		Connection connection = connectionFactory.createConnection();
		// 三、启动链接
		connection.start();
		// 四、建立会话
		Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
		// 五、建立一个目标,(注意是建立主题模式目标session.createTopic();)
		Destination destination = session.createTopic(topicName);
		// 六、建立生产者
		MessageProducer producer = session.createProducer(destination);
		for (int i = 0; i < 100; i++) {
			// 七、建立消息
			TextMessage textMessage = session.createTextMessage("test queue" + i);
			// 八、发布消息
			producer.send(textMessage);
		}
		// 九、关闭链接
		connection.close();
	}

}

(3)AppConsumer.java

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.Session;
import javax.jms.TextMessage;

import org.apache.activemq.ActiveMQConnectionFactory;

/**
 * 主题模式实例 - 消费者
 */
public class AppConsumer {

	// 默认端口
	private static final String url = "tcp://127.0.0.1:61616";
	private static final String topicName = "topic-test";
	
	
	public static void main(String[] args) throws JMSException {
		// 一、建立链接工厂ConnectionFactory
		ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(url);
		// 二、建立链接
		Connection connection = connectionFactory.createConnection();
		// 三、启动链接
		connection.start();
		// 四、建立会话
		Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
		// 五、建立一个目标(注意是建立队列模式目标session.createQueue())
		Destination destination = session.createTopic(topicName);
		// 六、建立一个消费者
		MessageConsumer consumer = session.createConsumer(destination);
		// 七、建立一个监听器(注意消息的监听是个异步的过程)
		consumer.setMessageListener(new MessageListener() {
			
			@Override
			public void onMessage(Message message) {
				TextMessage textMessage = (TextMessage) message;
				try {
					System.out.println("主题模式消费消息 : " + textMessage.getText());
				} catch (JMSException e) {
				}
			}
		});
		// 八、关闭链接(注意消息的监听是个异步的过程,因此在此不能先关闭链接)
//		connection.close();
	}

}

(4)运行:先运行提供者类,再运行多个消费者类,发现队列中的每一个消费者到能够消费掉全部的消息。