上篇博文深刻浅出 JMS(一) – JMS 基本概念,咱们介绍了消息通讯的规范JMS,这篇博文介绍一款开源的 JMS 具体实现—— ActiveMQ。ActiveMQ 是一个易于使用的消息中间件。html
咱们简单的介绍一下消息中间件,对它有一个基本认识就好,消息中间件有不少的用途和优势:java
首先简单的介绍一下 MQ,MQ 英文名 MessageQueue,中文名也就是你们用的消息队列,干吗用的呢,说白了就是一个消息的接受和转发的容器,可用于消息推送。web
下面进入咱们今天的主题,为你们介绍 ActiveMQ。apache
Apache ActiveMQ ™ is the most popular and powerful open source messaging and Integration Patterns server. Apache ActiveMQ is fast, supports many Cross Language Clients and Protocols, comes with easy to use Enterprise Integration Patterns and many advanced features while fully supporting JMS 1.1 and J2EE 1.4.
ActiveMQ 是由 Apache 出品的,一款最流行的,能力强劲的开源消息总线。ActiveMQ 是一个彻底支持 JMS1.1 和 J2EE 1.4 规范的 JMS Provider 实现,它很是快速,支持多种语言的客户端和协议,并且能够很是容易的嵌入到企业的应用环境中,并有许多高级功能。后端
下面咱们下载一个版本,玩一玩。浏览器
ActiveMQ 下载网站:http://activemq.apache.org/activemq-5153-release.html安全
你们如今好以后,将 apache-activemq-5.15.3-bin.zip 解压缩,咱们能够看到它的总体目录结构:服务器
从它的目录来讲,仍是很简单的:网络
咱们了解 activemq 的基本目录,下面咱们运行一下 activemq 服务,双击 bin 目录下的 bin/win64/activemq.bat 脚本文件,就能够看下图的效果。session
从上图咱们能够看到 activemq 的存放地址,以及浏览器要访问的地址。
ActiveMQ 默认使用的 TCP 链接端口是 61616, 经过查看该端口的信息能够测试 ActiveMQ 是否成功启动
netstat -an | find "61616" TCP 0.0.0.0:61616 0.0.0.0:0 LISTENING
ActiveMQ 默认启动时,启动了内置的 jetty 服务器,提供一个用于监控 ActiveMQ 的 admin 应用。地址:http://127.0.0.1:8161/admin/
用户名和密码都是 admin
多个项目之间集成
下降系统间模块的耦合度,解耦
系统先后端隔离
们首先写一个简单的 Hello World 示例,让你们感觉下 Activemq,咱们须要实现接受者和发送者两部分代码的编写。
<dependency> <groupId>org.apache.activemq</groupId> <artifactId>activemq-all</artifactId> <version>5.15.3</version> </dependency>
public class Producer { public static void main(String[] args) throws JMSException { //1. 建立 ConnectionFactory 链接工厂 ConnectionFactory factory = new ActiveMQConnectionFactory( // (1) //ActiveMQConnectionFactory.DEFAULT_USER, //ActiveMQConnectionFactory.DEFAULT_PASSWORD, "admin", "password", "tcp://localhost:61616/" ); //2. 建立 connection,并启动链接 Connection connection = factory.createConnection(); // (2) connection.start(); //3. Session 是一个发送或接收消息的线程 Session session = connection.createSession(Boolean.FALSE, Session.AUTO_ACKNOWLEDGE); // (3) //4. 指定生产消息目标禾消费消息来源的 Destination 对象 Destination dest = session.createQueue("queue1"); // (4) //5. 建立生产者 MessageProducer producer = session.createProducer(dest); // (5) //6. 指定签收模式 producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT); // (6) //7. 建立消息 for (int i = 0; i < 10; i++) { // (7) TextMessage message = session.createTextMessage("aaaaaaaaaaaaaaa"); producer.send(message); } if (connection != null) { connection.close(); } } }
建立 ConnectionFactory 实例,ActiveMQConnectionFactory 构造函数传入三个参数,分别是用户名,密码,消息地址,tcp 端口能够在 conf/activemq.xml 中配制。
建立 connection,并启动链接,Connection 默认是关闭的。注意:使用结束后要关闭 connection.close()
。
建立 session 会话,一个 connection 能够建立多个 session,Session 是一个发送或接收消息的线程。
参数1:是否开启事务,若是开启事务,则在必须提交 session
for (int i = 0; i < 10; i++) { // (7) TextMessage message = session.createTextMessage("aaaaaaaaaaaaaaa"); producer.send(message); } session.commit();
参数2:签收模式,有 Session.AUTO_ACKNOWLEDGE
(自动)、Session.CLIENT_ACKNOWLEDGE
(手动 经常使用)、Session.DUPS_OK_ACKNOWLEDGE
(可能重复签收),如若选择 Session.CLIENT_ACKNOWLEDGE
,则必须在消费端确认,不然 ActiveMQ 不认为消息已经消费。生产中通常使用 Session.CLIENT_ACKNOWLEDGE
签收,不要相信自动签收方式。
# 客户端处理 TextMessage message = (TextMessage) consumer.receive(); message.acknowledge();
经过 session 建立 Destination 对象,指定生产消息目标禾消费消息来源的对象。在 PTP 模式中 Destination 被称做 Queue 即队列;在 Pub/sub 模式 Destination 被称做 Topic 即主题,在程序中可使用多个 Queue 和 Top。
建立消息的发送和接收对象(生产者和消贵者) MessageProducer/MessageConsumer
指定签收模式。使用 MessageProducer 的 setDeliveryMode 方法为其设置持久化特性和非持久化特性(DeliveryMode)
发送消息。使用 JMS 规范的 TextMessage 形式建立数据(经过 session 对象),而且 MessageProducer 的 send 方法发送数据。同理客户端使用 receive 方法进行接收数据。最后不要忘记关闭 Connection 链接。
public void send(Destination destination, Message message, int deliveryMode, int priority, long timeToLive)
destination
能够指定将不一样的消息发送到不一样的 destination,但消费端就必须指定对应的 destination
deliveryMode
是否开启持久化,默认为持久性,DeliveryMode.NON_PERSISTENT
、DeliveryMode.PERSISTENT
priority
优先级 0-9,默认为 4,优先级越高越先消费,几率
timeToLive
ActiveMQ 中消息保留的时间,单位秒,默认永久保存
import org.apache.activemq.ActiveMQConnectionFactory; import javax.jms.*; public class Consumer { public static void main(String[] args) throws JMSException { ConnectionFactory factory = new ActiveMQConnectionFactory( ActiveMQConnectionFactory.DEFAULT_USER, ActiveMQConnectionFactory.DEFAULT_PASSWORD, "tcp://localhost:61616/" ); Connection connection = factory.createConnection(); connection.start(); Session session = connection.createSession(Boolean.FALSE, Session.AUTO_ACKNOWLEDGE); Destination dest = session.createQueue("queue1"); // (1) MessageConsumer consumer = session.createConsumer(dest); while (true) { TextMessage message = (TextMessage) consumer.receive(); //message.acknowledge(); if (message == null) break; System.out.println(message.getText()); } if (connection != null) { connection.close(); } } }