JMS即Java Message Service(Java消息服务),它是一组规范,提供了方便的而且通用的方式来让Java程序建立,发送,接受和读取消息。java
在中大型的项目中,不少都用到了被叫作面向消息的中间件(Message Oriented Middleware),一般简称消息中间件,如今流行的消息中间件产品主要有ActiveMQ,RabbitMQ,Kafka,RocketMQ等等,不少大型的商业软件使用这些产品构建应用程序的消息系统,消息在应用程序中的各个模块(或者服务)之间传递,以此达到解耦,削峰,提升应用程序的吞吐量等目的。shell
由于产品众多,若是没有一个成熟的规范来约束,那么各个产商消息中间件的具体实现可能会截然不同,使得开发者在切换产品的时候很是麻烦,不得不从新编写大量代码来适应产品。JMS就是这么一个规范,它定义了一个消息中间件产品应该具备哪些组件,哪些接口等等,若是消息中间件都能根据规范定义的接口来实现的话,开发者在构建消息系统的时候只须要付出不多的代价来切换不一样的产品,由于接口都是同样的,几乎不须要如何修改代码。apache
JMS规范定义的几个组件以下所示:windows
JMS Message包含了以下几个部分:浏览器
以上是对JMS的简单介绍,更多关于JMS规范的内容,建议参看JSR 914文档(直接在搜索引擎中搜索该关键字就能够找到了),下面我将介绍JMS规范定义的接口并使用ActiveMQ来构建一个小Demo做为实践,最后会介绍两种常见消息传送模型:点对点传输模型和发布/订阅传输模型。session
注意,这里所说的消息不是狭义的消息,即不特指一句话或者一个邮件等,消息不只能够是字符串形式的,还能够是一个Java对象等。负载均衡
上图是JMS规范定义的接口,最左侧是JMS通用接口,右侧是两种不一样的消息传送模型的接口,下面来介绍通用接口:ide
下图是上述几个接口的关系图:学习
下面会看到,咱们在使用ActiveMQ编写代码的时候几乎就是根据这么一个关系图来写的。搜索引擎
标准的Java SE里没有实现JMS规范,因此标准JDK里没有上述提到的几个接口和对应的实现,但Java EE里有实现,不过搭建Java EE相对比较麻烦(只是一个Demo而已),因此这里就直接使用ActiveMQ这个产品。ActiveMQ实现了JMS的规范,提供了JMS规范定义的接口,之因此使用ActiveMQ而不是其余的产品,是由于ActiveMQ的实现比较“标准”,很是贴合JMS规范,且学习门槛叫其余几个产品低。
安装很简单,直接到ActiveMQ官网去下载便可,我这里使用的是5.15.6这个版本,在windows下,下载apache-activemq-5.15.6-bin.zip这个压缩包(在Unix系统下,下载apache-activemq-5.15.6-bin.tar.gz),而后解压,在命令行里进入bin目录,输入activemq start便可。
建议到官网的Get Start部分看看,安装的启动都讲得很详细,这里我就很少说了。
启动以后,能够在浏览器输入http://localhost:8161来查看ActiveMQ的各项数据,以下所示:
点击Manage ActiveMQ broker便可进入控制台,帐号和密码默认都是admin,进入以后以下所示:
在这里,能够选择各个标签栏来查看不一样的信息,例如Topic,Queue等,具体在这就很少说了,读者能够自行尝试。
若是使用的是Maven来构建项目,能够加入以下依赖:
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-all</artifactId>
<version>5.15.6</version>
</dependency>
复制代码
若是没有使用Maven,就须要本身往项目里导入jar包了,在刚刚下载的压缩包里,包含了activemq-all-5.15.6.jar这个jar包,将其导入到项目的类路径便可。
准备工做作完以后,就能够正式编写代码了,演示代码以下所示:
JMSProducer
package top.yeonon.jms;
import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
import javax.jms.*;
/** * @Author yeonon * @date 2018/10/20 0020 14:09 **/
public class JMSProducer {
private static final String USERNAME = ActiveMQConnection.DEFAULT_USER;
private static final String PASSWORD = ActiveMQConnection.DEFAULT_PASSWORD;
private static final String BROKER_URL = ActiveMQConnection.DEFAULT_BROKER_URL;
public static void main(String[] args) throws JMSException {
//链接工厂
ConnectionFactory connectionFactory;
//链接
Connection connection;
//Session会话
Session session;
//目的地
Destination destination;
//建立链接工厂实例
connectionFactory = new ActiveMQConnectionFactory(USERNAME,PASSWORD, BROKER_URL);
//从链接工厂实例中建立链接
connection = connectionFactory.createConnection();
//从链接中建立Session,第一个参数是是否开启事务,这里选择不开启,传入false
// 第二个参数是消息确认模式,这里选择的是自动确认
session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
//建立了一个队列,队列Queue(javax.jms包里的Queue)继承了Destination接口
destination = session.createQueue("Hello");
//建立一个消息生产者,须要传入一个destination,这样这个producer就和destination绑定了
MessageProducer producer = session.createProducer(destination);
//配置完成以后,启动链接
connection.start();
//发送消息
for (int i = 0; i < 10; i++) {
TextMessage message = session.createTextMessage("message " + i);
System.out.println("producer send message : " + message.getText());
producer.send(message);
}
//发送完成后,关闭链接
connection.close();
}
}
复制代码
JMSConsumer
package top.yeonon.jms;
import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
import javax.jms.*;
/** * @Author yeonon * @date 2018/10/20 0020 14:19 **/
public class JMSConsumer {
private static final String USERNAME = ActiveMQConnection.DEFAULT_USER;
private static final String PASSWORD = ActiveMQConnection.DEFAULT_PASSWORD;
private static final String BROKER_URL = ActiveMQConnection.DEFAULT_BROKER_URL;
public static void main(String[] args) throws JMSException {
//链接工厂
ConnectionFactory connectionFactory;
//链接
Connection connection;
//Session会话
Session session;
//目的地
Destination destination;
//建立链接工厂实例
connectionFactory = new ActiveMQConnectionFactory(USERNAME,PASSWORD, BROKER_URL);
//从链接工厂实例中建立链接
connection = connectionFactory.createConnection();
//从链接中建立Session,第一个参数是是否开启事务,这里选择不开启,传入false
// 第二个参数是消息确认模式,这里选择的是自动确认
session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
//建立了一个队列,队列Queue(javax.jms包里的Queue)继承了Destination接口
destination = session.createQueue("Hello");
//建立一个消息生产者,须要传入一个destination,这样这个producer就和destination绑定了
MessageConsumer consumer = session.createConsumer(destination);
//配置完成以后,启动链接
connection.start();
//接受消息
while (true) {
TextMessage message = (TextMessage) consumer.receive(10000);
if (message != null)
System.out.println("receive message : " + message.getText());
}
}
}
复制代码
代码中注释写的很清楚了,就只说说一个让人感受疑惑的Destination,在演示代码中,使用了Queue来表示destination,初次接触消息队列的话,可能会以为有些奇怪,一个Queue怎么就是一个目的地了?其实这是彻底能够的,由于目的地并不就必定是ip地址或者邮箱之类的东西,只要是一个能够接受消息的的载体便可,如今大多数的消息中间件都会或多或少只用队列来做为消息的载体,因此有时候直接把消息中间件简称为消息队列(Message Queue)。
先运行consumer,稍微等待一会,再运行producer,producer运行完毕以后能够在producer的控制台看到相似以下输出:
producer send message : message 0
producer send message : message 1
producer send message : message 2
producer send message : message 3
producer send message : message 4
producer send message : message 5
producer send message : message 6
producer send message : message 7
producer send message : message 8
producer send message : message 9
复制代码
而后去consumer的控制台看看,发现有相似以下输出:
receive message : message 0
receive message : message 1
receive message : message 2
receive message : message 3
receive message : message 4
receive message : message 5
receive message : message 6
receive message : message 7
receive message : message 8
receive message : message 9
复制代码
若是能看到如上输出,说明生产者已经顺利将消息发送到JMS provider(ActiveMQ的服务)了,而且消费者也已经顺利的从JMS provider里取出消息并进行读取,处理了。若是你开启了多个consumer,还能看到producer发送的消息被多个consumer共同消费,例若有两个consumer,可能会有以下输出:
consumer1:
receive message : message 0
receive message : message 2
receive message : message 4
receive message : message 6
receive message : message 8
复制代码
consumer2:
receive message : message 1
receive message : message 3
receive message : message 5
receive message : message 7
receive message : message 9
复制代码
有点负载均衡的意思,这是由于如今有两个consumer共享一个队列Hello,而后轮流从队列Hello中取出消息并消费(这是ActiveMQ的默认方式)。
JMS规范定义了两种消息传输模型:点对点传输模型和发布/订阅传输模型。
在点对点模型下,消息由生产者发送到队列里,队列再将消息发送给消费者。在这个过程当中,消息的目的地是队列,消费者监听队列,当队列中有消息的时候,队列就会把消息发送给监听了队列的消费者。能够简单的把队列看作是一个中转站,负责接受生产者发送过来的消息,而后将消息再发送给消费者。
一个队列能够有多个生产者和消费者,可是一个消息只能被一个消费者接受并消费,JMS provider将根据“先来先服务”的原则肯定消息该发送到哪一个消费者中。若是没有任何一个消费者在监听队列,那么消息再会在队列里堆积,可堆积的消息多少由队列的大小以及JMS provider的具体实现决定,若是超过了规定的大小,多余的消息可能会被抛弃,也可能会被存储到持久化存储器中(这取决于具体实现)。
在发布/订阅模型下,消息的目的地再也不是简单的队列,而是一个Topic(主题),消息由生产者发送到Topic中,而后再传递到订阅了该主题的消费者中。一个Topic也能够绑定多个生产者和多个消费者,和点对点模型不一样的是,一个消息能够被多个消费者消费,即假设有两个消费者同时订阅了一个Topic,那么当有消息发往这个Topic的时候,Topic会将消息的两个拷贝发送到两个消费者中。
值得注意的是,这里的Topic实际上是一个抽象的概念,其具体的形式也能够是一个队列或者多个队列组成的队列组(这取决于具体实现),只是在队列上再打上了一个Topic标签,生产者和消费者的目标再也不是简单的某个队列,而是一个Topic。举个例子,如今咱们定义了一个名为hello的Topic,生产者表示要将消息发送到这个hello Topic中,消费者也对这个hello Topic感兴趣(即订阅Topic),而这个Topic的底层是一个队列,当生产者发送消息的时候,底层其实就是发送到这个打上了hello topic标签的队列里,而后再把消息拷贝被传递到对hello topic感兴趣的消费者中。
这里的解释可能有些不那么合理,但愿读者能理解。不理解也没事,下面我会写一个发布/订阅模式的演示代码,方便读者理解这个模型。
在第3节中编写的Demo其实就是点对点模型,从点贵点模型切换到发布/订阅模型很是简单,对于上面的那个Demo,只须要修改两行代码便可:
//点对点模型中的producer
destination = session.createQueue("Hello");
//发布/订阅模型中的producer
destination = session.createTopic("Hello");
//点对点模型中的consumer
destination = session.createQueue("Hello");
//发布/订阅模型中的consumer
destination = session.createTopic("Hello");
复制代码
修改完毕以后,运行两个consumer,一个producer,而后看看控制台的输出。
producer的输出大体以下所示:
producer send message : message 0
producer send message : message 1
producer send message : message 2
producer send message : message 3
producer send message : message 4
producer send message : message 5
producer send message : message 6
producer send message : message 7
producer send message : message 8
producer send message : message 9
复制代码
consumer1:
receive message : message 0
receive message : message 1
receive message : message 2
receive message : message 3
receive message : message 4
receive message : message 5
receive message : message 6
receive message : message 7
receive message : message 8
receive message : message 9
复制代码
consumer2:
receive message : message 0
receive message : message 1
receive message : message 2
receive message : message 3
receive message : message 4
receive message : message 5
receive message : message 6
receive message : message 7
receive message : message 8
receive message : message 9
复制代码
发现consumer1和consumer2都收到了producer发布的10个消息,总共20个消息。这和点对点模型中不同,点对点模型中不管有多少个消费者,总共都只能接受10个消息。
本文简单介绍了JMS规范的一些组件,接口,了解JMS能够加深对各类消息中间件产品的理解。以后介绍了JMS定义的两种消息传输模型:点对点模型和发布/订阅模型。二者最大的区别是:点对点模型中,不管有多少个消费者,一个消息只能被一个消费者接受并消费,发布/订阅模型中,一个消息能够被多个消费者消费。文中还提供了一个Demo,让你们直观的感觉到这两种模型的区别,加深理解。