ActiveMQ 消息队列服务

 

ActiveMQ简介

1.1 ActiveMQ是什么

ActiveMQ是一个消息队列应用服务器推送服务器。支持JMS规范html

 

1.1.1 JMS概述

全称:Java Message Service 即为Java消息服务是一套java消息服务的API标准。(标准即接口)前端

实现了JMS标准的系统,称之为JMS Providerjava

1.1.2 消息队列

1.1.2.1 概念

消息队列是在消息的传输过程当中保存消息的容器,提供一种不一样进程或者同一进程不一样线程直接通信的方式node

 

Producer:消息生产者,负责产生和发送消息到 Brokermysql

Broker:消息处理中心。负责消息存储、确认、重试等,通常其中会包含多个 queueweb

Consumer:消息消费者,负责从 Broker 中获取消息,并进行相应处理;spring

 

1.1.2.2 常见消息队列应用

1)、ActiveMQsql

ActiveMQ 是Apache出品,最流行的,能力强劲的开源消息总线。ActiveMQ 是一个彻底支持JMS1.1和J2EE 1.4规范的 JMS Provider实现数据库

2)、RabbitMQapache

RabbitMQ是一个在AMQP基础上完成的,可复用的企业消息系统。他遵循Mozilla Public License开源协议。开发语言为Erlang。

3)、RocketMQ

由阿里巴巴定义开发的一套消息队列应用服务。

1.2 ActiveMQ能作什么

1)实现两个不一样应用(程序)之间的消息通信。

2)实现同一个应用,不一样模块之间的消息通信。确保数据发送的稳定性

 

1.3 ActiveMQ下载

ActiveMQ官网地址: http://activemq.apache.org

ActiveMQ下载地址:http://activemq.apache.org/download-archives.html

 

--可供下载的历史版本

--说明:

ActiveMQ 5.10.x以上版本必须使用JDK1.8才能正常使用。

ActiveMQ 5.9.x及如下版本使用JDK1.7便可正常使用。

 

 

 

 

--根据操做系统,选择下载版本。(本教程下载Linux版本)

 

 

 

1.4 ActiveMQ主要特色

1)支持多语言、多协议客户端。语言: Java,C,C++,C#,Ruby,Perl,Python,PHP。应用协议: OpenWire,Stomp REST,WS Notification,XMPP,AMQP

2Spring的支持ActiveMQ能够很容易整合到Spring的系统里面去

3)支持高可用、高性能的集群模式。

 

 

入门示例

2.1 需求

使用ActiveMQ实现消息队列模型。

 

2.2 配置步骤说明

1)搭建ActiveMQ消息服务器。

2)建立一个java项目。

3)建立消息生产者,发送消息。

4)建立消息消费者,接收消息。

 

2.3 第一部分:搭建ActiveMQ消息服务器

2.3.1 第一步:下载、上传至Linux

--说明:确保已经安装了jdk

 

 

 

2.3.2 第二步:安装到/usr/local/activemq目录

1)解压到/usr/local目录下  

[root@node07192 ~]# tar -zxvf apache-activemq-5.9.0-bin.tar.gz -C /usr/local

 

2)修更名称为activemq

[root@node07192 ~]# cd /usr/local/

[root@node07192 local]# mv apache-activemq-5.9.0/ activemq

2.3.3 第三步启动ActiveMQ服务器

--说明:ActiveMQ是免安装软件,解压便可启动服务。

[root@node07192 local]# cd activemq/bin

[root@node07192 bin]# ./activemq start

 

--查看ActiveMQ启动状态

[root@node07192 bin]# ./activemq status

 

 

 

 

2.3.4 第四步:浏览器访问ActiveMQ管理界面

2.3.4.1 Step1:查看ActiveMQ管理界面的服务端口。在/conf/jetty.xml

--访问管理控制台的服务端口,默认为:8161

[root@node07192 bin]# cd ../conf

[root@node07192 conf]# vim jetty.xml

 

 

 

 

2.3.4.2 Step2:查看ActiveMQ用户、密码。在/conf/users.properties:

--默认的用户名、密码均为amdin

[root@node07192 conf]# vim users.properties

 

 

 

 

2.3.4.3 Step3:访问ActiveMQ管理控制台。地址:http://ip:8161/

--注意:防火墙是没有配置该服务的端口的。

所以,要访问该服务,必须在防火墙中配置。

 

1)修改防火墙,开放8161端口

[root@node07192 conf]# vim /etc/sysconfig/iptables

 

2)重启防火墙

[root@node07192 conf]# service iptables restart

 

3)登陆管理控制台

--登录,用户名、密码均为admin

 

 

 

--控制台主界面

 

 

 

--搭建ActiveMQ服务器成功!!!

2.4 第二部分:建立java项目,导入jar

--导包说明:

ActiveMQ的解压包中,提供了运行ActiveMQ的全部jar

 

 

 

--建立项目

 

 

 

2.5 第三部分:建立消息生成者,发送消息

--说明:ActiveMQ是实现了JMS规范的。在实现消息服务的时候,必须基于API接口规范。

 

2.5.1 JMS经常使用的API说明

下述API都是接口类型,定义在javax.jms包中,是JMS标准接口定义。ActiveMQ彻底实现这一套api标准。

2.5.1.1 ConnectionFactory

连接工厂, 用于建立连接的工厂类型。

2.5.1.2 Connection

连接,用于创建访问ActiveMQ链接的类型, 由连接工厂建立。

2.5.1.3 Session

会话, 一次持久有效、有状态的访问,由连接建立。

2.5.1.4 Destination  &  Queue & Topic

目的地, 即本次访问ActiveMQ消息队列的地址,由Session会话建立。

1interface Queue extends Destination

2Queue:队列模型,只有一个消费者。消息一旦被消费,默认删除。

3Topic:主题订阅中的消息,会发送给全部的消费者同时处理。

2.5.1.5 Message

消息,在消息传递过程当中数据载体对象,是全部消息【文本消息TextMessage,对象消息ObjectMessage等】具体类型的顶级接口,能够经过会话建立或经过会话从ActiveMQ服务中获取。

 

2.5.1.6 MessageProducer

消息生成者, 在一次有效会话中, 用于发送消息给ActiveMQ服务的工具,由Session会话建立。

 

2.5.1.7 MessageCustomer

消息消费者【消息订阅者,消息处理者】, 在一次有效会话中, 用于ActiveMQ服务中获取消息的工具,由Session会话建立。

 

 

咱们定义的消息生产者和消费者,都是基于上面API实现的。

 

2.5.2 第一步建立MyProducer类,定义sendMessage方法

package cn.gzsxt.mq.producer;

 

import javax.jms.Connection;

import javax.jms.ConnectionFactory;

import javax.jms.Destination;

import javax.jms.JMSException;

import javax.jms.Message;

import javax.jms.MessageProducer;

import javax.jms.Session;

 

import org.apache.activemq.ActiveMQConnectionFactory;

 

public class MyProducer {

 

// 定义连接工厂

ConnectionFactory connectionFactory = null;

// 定义连接

Connection connection = null;

// 定义会话

Session session = null;

// 定义目的地

Destination destination = null;

// 定义消息生成者

MessageProducer producer = null;

// 定义消息

Message message = null;

 

public void sendToMQ(){

 

try{

 

/*

 * 建立连接工厂

 * ActiveMQConnectionFactory - ActiveMQ实现的ConnectionFactory接口实现类.

 * 构造方法: public ActiveMQConnectionFactory(String userName, String password, String brokerURL)

 *  userName - 访问ActiveMQ服务的用户名, 用户名能够经过jetty-realm.properties配置文件配置.

 *  password - 访问ActiveMQ服务的密码, 密码能够经过jetty-realm.properties配置文件配置.

 *  brokerURL - 访问ActiveMQ服务的路径地址. 路径结构为 - 协议名://主机地址:端口号

 *      此连接基于TCP/IP协议.

 */

connectionFactory = new ActiveMQConnectionFactory("admin", "admin", "tcp://192.168.23.13:61616");

 

// 建立连接对象

connection = connectionFactory.createConnection();

// 启动连接

connection.start();

 

/*

 * 建立会话对象

 * 方法 - connection.createSession(boolean transacted, int acknowledgeMode);

 *  transacted - 是否使用事务, 可选值为true|false

 *      true - 使用事务, 当设置此变量值, acknowledgeMode参数无效, 建议传递的acknowledgeMode参数值为

 *          Session.SESSION_TRANSACTED

 *      false - 不使用事务, 设置此变量值, acknowledgeMode参数必须设置.

 *  acknowledgeMode - 消息确认机制, 可选值为:

 *      Session.AUTO_ACKNOWLEDGE - 自动确认消息机制

 *      Session.CLIENT_ACKNOWLEDGE - 客户端确认消息机制

 *      Session.DUPS_OK_ACKNOWLEDGE - 有副本的客户端确认消息机制

 */

session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);

 

// 建立目的地, 目的地命名即队列命名, 消息消费者须要经过此命名访问对应的队列

destination = session.createQueue("test-mq");

 

// 建立消息生成者, 建立的消息生成者与某目的地对应, 即方法参数目的地.

producer = session.createProducer(destination);

 

// 建立消息对象, 建立一个文本消息, 此消息对象中保存要传递的文本数据.

message = session.createTextMessage("hello,activeme");

 

// 发送消息

producer.send(message);

System.out.println("消息发送成功!");

}catch(Exception e){

e.printStackTrace();

System.out.println("访问ActiveMQ服务发生错误!!");

}finally{

try {

// 回收消息发送者资源

if(null != producer)

producer.close();

} catch (JMSException e) {

e.printStackTrace();

}

try {

// 回收会话资源

if(null != session)

session.close();

} catch (JMSException e) {

e.printStackTrace();

}

try {

// 回收连接资源

if(null != connection)

connection.close();

} catch (JMSException e) {

e.printStackTrace();

}

}

}

 

}

 

 

2.5.3 第二步:建立一个测试类MessageTest

--添加junit类库快捷键ctrl+1

package cn.gzsxt.mq.test;

 

import org.junit.Test;

 

import cn.gzsxt.mq.producer.MyProducer;

 

public class MessageTest {

 

@Test

public void sendToMQ(){

MyProducer producer = new MyProducer();

producer.sendToMQ();

}

}

 

2.5.4 第三步:测试

1设置防火墙,配置61616端口。注意修改以后重启防火墙

2)测试结果:

--查看控制台

 

 

 

--查看ActiveMQ管理控制界面

 

 

 

--消息发送成功!!!

 

2.6 第四部分:建立消息消费者,消费消息

2.6.1 第一步:建立MyConsumer

package cn.gzsxt.mq.consumer;

 

import javax.jms.Connection;

import javax.jms.ConnectionFactory;

import javax.jms.Destination;

import javax.jms.JMSException;

import javax.jms.Message;

import javax.jms.MessageConsumer;

import javax.jms.Session;

import javax.jms.TextMessage;

 

import org.apache.activemq.ActiveMQConnectionFactory;

 

/**

 * @ClassName:MyConsumer

 * @Description: 消息消费者代码

*/

public class MyConsumer {

 

// 定义连接工厂

ConnectionFactory connectionFactory = null;

// 定义连接

Connection connection = null;

// 定义会话

Session session = null;

// 定义目的地

Destination destination = null;

// 定义消息消费者

MessageConsumer consumer = null;

// 定义消息

Message message = null;

 

public void recieveFromMQ(){

 

try{

 

/*

 * 建立连接工厂

 * ActiveMQConnectionFactory - ActiveMQ实现的ConnectionFactory接口实现类.

 * 构造方法: public ActiveMQConnectionFactory(String userName, String password, String brokerURL)

 *  userName - 访问ActiveMQ服务的用户名, 用户名能够经过jetty-realm.properties配置文件配置.

 *  password - 访问ActiveMQ服务的密码, 密码能够经过jetty-realm.properties配置文件配置.

 *  brokerURL - 访问ActiveMQ服务的路径地址. 路径结构为 - 协议名://主机地址:端口号

 *      此连接基于TCP/IP协议.

 */

connectionFactory = new ActiveMQConnectionFactory("admin", "admin", "tcp://192.168.23.13:61616");

 

// 建立连接对象

connection = connectionFactory.createConnection();

// 启动连接

connection.start();

 

/*

 * 建立会话对象

 * 方法 - connection.createSession(boolean transacted, int acknowledgeMode);

 *  transacted - 是否使用事务, 可选值为true|false

 *      true - 使用事务, 当设置此变量值, acknowledgeMode参数无效, 建议传递的acknowledgeMode参数值为

 *          Session.SESSION_TRANSACTED

 *      false - 不使用事务, 设置此变量值, acknowledgeMode参数必须设置.

 *  acknowledgeMode - 消息确认机制, 可选值为:

 *      Session.AUTO_ACKNOWLEDGE - 自动确认消息机制

 *      Session.CLIENT_ACKNOWLEDGE - 客户端确认消息机制

 *      Session.DUPS_OK_ACKNOWLEDGE - 有副本的客户端确认消息机制

 */

session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);

 

// 建立目的地, 目的地命名即队列命名, 消息消费者须要经过此命名访问对应的队列

destination = session.createQueue("test-mq");

 

// 建立消息消费者, 建立的消息消费者与某目的地对应, 即方法参数目的地.

consumer = session.createConsumer(destination);

 

// ActiveMQ服务中获取消息

message = consumer.receive();

 

TextMessage tMsg = (TextMessage) message;

 

System.out.println("MQ中获取的消息是:"+tMsg.getText());

 

}catch(Exception e){

e.printStackTrace();

System.out.println("访问ActiveMQ服务发生错误!!");

 

}finally{

try {

// 回收消息消费者资源

if(null != consumer)

consumer.close();

} catch (JMSException e) {

e.printStackTrace();

}

try {

// 回收会话资源

if(null != session)

session.close();

} catch (JMSException e) {

e.printStackTrace();

}

try {

// 回收连接资源

if(null != connection)

connection.close();

} catch (JMSException e) {

e.printStackTrace();

}

}

}

}

 

2.6.2 第二步:修改测试类MessageTest,新增测试方法

@Test

public void recieveFromMQ(){

MyConsumer consumer = new MyConsumer();

consumer.recieveFromMQ();

}

 

2.6.3 第三步:测试

--查看Eclipse控制台

 

--查看ActiveMQ管理控制界面

 

--消息被消费了测试成功!!!

ActiveMQ监听器

问题:在前面的示例中,咱们发现消费者每次只能消费一条消息。当队列中有多条消息的时候,咱们须要屡次运行消费者,才能消费完这些消息。很麻烦!!!!如何解决这个问题呢?咱们但愿一次将全部的消息所有接收。

答:使用ActiveMQ监听器来监听队列,持续消费消息。

 

3.1 配置步骤说明

1)建立一个监听器对象。

2)修改消费者代码,加载监听器。

 

3.2 配置步骤

3.2.1 第一步:建立监听器MyListener

--说明:自定义监听器须要实现MessageListener接口

package cn.gzsxt.mq.listener;

 

import javax.jms.JMSException;

import javax.jms.Message;

import javax.jms.MessageListener;

import javax.jms.TextMessage;

 

public class MyListener implements MessageListener{

 

@Override

public void onMessage(Message message) {

 

if(null!=message){

TextMessage tMsg = (TextMessage) message;

 

try {

System.out.println("MQ中获取的消息是:"+tMsg.getText());

} catch (JMSException e) {

e.printStackTrace();

}

}

}

}

 

3.2.2 第二步:修改MyConsumer代码,加载监听器

--说明:监听器须要持续加载,所以消费程序不能结束。

这里咱们使用输入流阻塞消费线程结束。(实际开发中,使用web项目加载)

package cn.gzsxt.mq.consumer;

 

import javax.jms.Connection;

import javax.jms.ConnectionFactory;

import javax.jms.Destination;

import javax.jms.JMSException;

import javax.jms.Message;

import javax.jms.MessageConsumer;

import javax.jms.Session;

import javax.jms.TextMessage;

 

import org.apache.activemq.ActiveMQConnectionFactory;

 

import cn.gzsxt.mq.listener.MyListener;

 

/**

 * @ClassName:MyConsumer

 * @Description: 消息消费者代码

*/

public class MyConsumer {

 

// 定义连接工厂

ConnectionFactory connectionFactory = null;

// 定义连接

Connection connection = null;

// 定义会话

Session session = null;

// 定义目的地

Destination destination = null;

// 定义消息消费者

MessageConsumer consumer = null;

// 定义消息

Message message = null;

 

public Message recieveFromMQ(){

 

try{

 

/*

 * 建立连接工厂

 * ActiveMQConnectionFactory - ActiveMQ实现的ConnectionFactory接口实现类.

 * 构造方法: public ActiveMQConnectionFactory(String userName, String password, String brokerURL)

 *  userName - 访问ActiveMQ服务的用户名, 用户名能够经过jetty-realm.properties配置文件配置.

 *  password - 访问ActiveMQ服务的密码, 密码能够经过jetty-realm.properties配置文件配置.

 *  brokerURL - 访问ActiveMQ服务的路径地址. 路径结构为 - 协议名://主机地址:端口号

 *      此连接基于TCP/IP协议.

 */

connectionFactory = new ActiveMQConnectionFactory("admin", "admin", "tcp://192.168.23.13:61616");

 

// 建立连接对象

connection = connectionFactory.createConnection();

// 启动连接

connection.start();

 

/*

 * 建立会话对象

 * 方法 - connection.createSession(boolean transacted, int acknowledgeMode);

 *  transacted - 是否使用事务, 可选值为true|false

 *      true - 使用事务, 当设置此变量值, acknowledgeMode参数无效, 建议传递的acknowledgeMode参数值为

 *          Session.SESSION_TRANSACTED

 *      false - 不使用事务, 设置此变量值, acknowledgeMode参数必须设置.

 *  acknowledgeMode - 消息确认机制, 可选值为:

 *      Session.AUTO_ACKNOWLEDGE - 自动确认消息机制

 *      Session.CLIENT_ACKNOWLEDGE - 客户端确认消息机制

 *      Session.DUPS_OK_ACKNOWLEDGE - 有副本的客户端确认消息机制

 */

session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);

 

// 建立目的地, 目的地命名即队列命名, 消息消费者须要经过此命名访问对应的队列

destination = session.createQueue("test-mq");

 

// 建立消息消费者, 建立的消息消费者与某目的地对应, 即方法参数目的地.

consumer = session.createConsumer(destination);

 

// // ActiveMQ服务中获取消息

// message = consumer.receive();

//

// TextMessage tMsg = (TextMessage) message;

//

// System.out.println("MQ中获取的消息是:"+tMsg.getText());

//加载监听器

consumer.setMessageListener(new MyListener());

//监听器须要持续加载,这里咱们使用输入流阻塞当前线程结束。

System.in.read();

 

}catch(Exception e){

e.printStackTrace();

System.out.println("访问ActiveMQ服务发生错误!!");

 

}finally{

try {

// 回收消息消费者资源

if(null != consumer)

consumer.close();

} catch (JMSException e) {

e.printStackTrace();

}

try {

// 回收会话资源

if(null != session)

session.close();

} catch (JMSException e) {

e.printStackTrace();

}

try {

// 回收连接资源

if(null != connection)

connection.close();

} catch (JMSException e) {

e.printStackTrace();

}

}

return message;

}

}

3.3 测试

1)屡次运行生产者,发送多条消息到队列中。

 

 

 

2)运行消费者。观察结果

--查看Eclipse控制台,一次消费了3条消息

 

 

 

--查看ActiveMQ管理控制界面,全部消息都被消费了!

 

 

--测试成功!!!

 

ActiveMQ消息服务模式

问题:在入门示例中,只能向一个消费者发送消息。可是有一些场景,需求有多个消费者都能接收到消息,好比:美团APP天天的消息推送。该如何实现呢?

答:ActiveMQ是经过不一样的服务模式来解决这个问题的。

 

因此,要搞清楚这个问题,必须知道ActiveMQ有哪些应用模式。

 

4.1 PTP模式(point to point

--消息模型

 

 

 

消息生产者生产消息发送到queue中,而后消息消费者从queue中取出而且消费消息。

消息被消费之后,queue中再也不有存储,因此消息消费者不可能消费到已经被消费的消息。

Queue支持存在多个消费者,可是对一个消息而言,只会有一个消费者能够消费、其它的则不能消费此消息了。

当消费者不存在时,消息会一直保存,直到有消费消费

 

咱们的入门示例,就是采用的这种PTP服务模式。

4.2 TOPIC(主题订阅模式)

--消息模型

 

 

 

 

消息生产者(发布)将消息发布到topic中,同时有多个消息消费者(订阅)消费该消息。

和点对点方式不一样,发布到topic的消息会被全部订阅者消费。

 

当生产者发布消息,不论是否有消费者。都不会保存消息

因此,主题订阅模式下,必定要先有消息的消费者(订阅者),后有消息的生产者(发布者)

咱们前面已经实现了PTP模式,下面咱们来实现TOPIC模式。

 

Topic模式实现

5.1 配置步骤说明

1)搭建ActiveMQ消息服务器。(已实现)

2)建立主题订阅者。

3)建立主题发布者。

 

5.2 配置步骤

5.2.1 第一部分:搭建消息服务器。(已实现)

 

5.2.2 第二部分:建立主题订阅者MySubscriber

--说明:主题订阅模式下,能够有多个订阅者。咱们这里用多线程来模拟。

 

配置步骤:

1)建立订阅者(线程类)

2)修改测试类。

3)查看测试结果。

 

5.2.2.1 第一步:建立MySubscriber类,实现Runnable接口

package cn.gzsxt.mq.subscribe;

 

import javax.jms.JMSException;

import javax.jms.Message;

import javax.jms.Session;

import javax.jms.TextMessage;

import javax.jms.Topic;

import javax.jms.TopicConnection;

import javax.jms.TopicConnectionFactory;

import javax.jms.TopicSession;

import javax.jms.TopicSubscriber;

 

import org.apache.activemq.ActiveMQConnectionFactory;

 

public class MySubscirber implements Runnable{

 

TopicConnectionFactory factory = null;

TopicConnection connection = null;

    TopicSession session = null;

    Topic topic = null;

    TopicSubscriber subscriber = null;

    Message message =null;

 

@Override

public void run() {

try{

// 建立连接工厂

factory = new ActiveMQConnectionFactory("admin", "admin", "tcp://192.168.23.13:61616");

// 经过工厂建立一个链接

connection = factory.createTopicConnection();

        // 启动链接

        connection.start();

// 建立一个session会话

session = connection.createTopicSession(Boolean.FALSE, Session.AUTO_ACKNOWLEDGE);

// 建立一个消息队列

topic = session.createTopic("gzsxt.topic");

        // 建立消息制做者

        subscriber = session.createSubscriber(topic);

        message = subscriber.receive();

        if(null!=message){

         TextMessage tMsg = (TextMessage) message;

         System.out.println(Thread.currentThread().getName()+"订阅的内容是:"+tMsg.getText());

        }

} catch (Exception e) {

e.printStackTrace();

System.out.println("消息订阅异常");

} finally {

        // 关闭释放资源

if (session != null) {

try {

session.close();

} catch (JMSException e) {

e.printStackTrace();

}

}

if (session != null) {

try {

session.close();

} catch (JMSException e) {

e.printStackTrace();

}

}

if (connection != null) {

try {

connection.close();

} catch (JMSException e) {

e.printStackTrace();

}

}

}

}

}

 

5.2.2.2 第二步:修改测试类MessageTest

--说明:junit单元测试,不支持多线程测试

 

因此,这里咱们在测试类的main方法中测试。

 

--修改MessageTest类,新增main方法。

public static void main(String[] args) {

MySubscirber subscirber = new MySubscirber();

Thread t1 = new Thread(subscirber);

Thread t2 = new Thread(subscirber);

t1.start();

t2.start();

}

 

5.2.2.3 第三步:查看测试结果

--查看AcitveMQ管理界面

 

 

--测试成功!!!

5.2.3 第三部分建立主题发布者MyPublisher

-配置步骤说明:

1)建立发布者

2)修改测试类测试

3)查看测试结果

 

5.2.3.1 第一步:建立MyPublish

package cn.gzsxt.mq.topic;

 

import javax.jms.JMSException;

import javax.jms.Message;

import javax.jms.Session;

import javax.jms.Topic;

import javax.jms.TopicConnection;

import javax.jms.TopicConnectionFactory;

import javax.jms.TopicPublisher;

import javax.jms.TopicSession;

 

import org.apache.activemq.ActiveMQConnectionFactory;

 

public class MyPublisher {

 

 

TopicConnectionFactory factory = null;

TopicConnection connection = null;

    TopicSession session = null;

    Topic topic = null;

    TopicPublisher publisher = null;

    Message message =null;

    

    public void publishTopic(){

     try {

     // 建立连接工厂

factory = new ActiveMQConnectionFactory("admin", "admin", "tcp://192.168.23.13:61616");

// 经过工厂建立一个链接

connection = factory.createTopicConnection();

// 启动链接

        connection.start();

// 建立一个session会话

session = connection.createTopicSession(Boolean.FALSE, Session.AUTO_ACKNOWLEDGE);

// 建立一个消息队列

topic = session.createTopic("gzsxt.topic");

// 建立主题发布者

publisher = session.createPublisher(topic);

// 建立消息

message = session.createTextMessage("hello,topic");

// 发布消息

publisher.publish(message);

} catch (JMSException e) {

// TODO Auto-generated catch block

e.printStackTrace();

}finally {

// 关闭释放资源

if (publisher != null) {

try {

publisher.close();

} catch (JMSException e) {

e.printStackTrace();

}

}

if (session != null) {

try {

session.close();

} catch (JMSException e) {

e.printStackTrace();

}

}

if (connection != null) {

try {

connection.close();

} catch (JMSException e) {

e.printStackTrace();

}

}

}

    }

}

 

5.2.3.2 第二步:修改测试类,新增测试方法

@Test

public void publishTopic(){

MyPublisher publisher = new MyPublisher();

publisher.publishTopic();

}

 

5.2.3.3 第三步:查看测试结果

--查看Eclipse控制。发现订阅者消费到了消息

 

 

 

--查看ActiveMQ主界面

 

 

--消息被消费了测试成功

 

5.3 Topic小结

1Topic模式可以实现多个订阅者同时消费消息。

2Topic主题模式下,消息不会保存,只有在线的订阅者才会接收到消息。

 

一般能够用来解决公共消息推送的相关业务。

ActiveMQ持久化

问题:当队列中有未被消费的消息时,咱们从新启动ActiveMQ服务器后,发现消息仍然在队列中。消息时如何保持的呢?

答:ActiveMQ是支持持久化的,能够永久保存消息。

 

消息是保存在内存中的。当内存空间不足,或者ActiveMQ服务关闭的时候,消息会被持久化到磁盘上。

 

被消费的时候,再加载到内存空间中。

 

--说明:ActiveMQ持久化方式在/conf/activemq.xml中指定

[root@node07192 conf]# vim activemq.xml

 

6.1 kahadb方式

ActiveMQ默认的持久化策略。不会保存已经被消费过的消息。

 

 

 

 

--消息存储位置

 

 

 

6.2 AMQ方式(已过期)

--说明:5.3版本以前,如今已通过时,不考虑。

 

6.3 JDBC持久化方式(了解)

ActiveMQ将数据持久化到数据库中。可使用任意的数据库。

本教程中使用MySQL数据库。

6.3.1 配置步骤说明

1)建立数据库。

2)添加数据库链接jar依赖到ActiveMQ服务器。

3)修改ActiveMQ配置,建立数据源。

4)修改ActiveMQ配置,修改持久化方式为jdbc

 

6.3.2 配置步骤

6.3.2.1 第一步:建立数据库

数据库最好不要跟ActiveMQ服务器在同一台机器。

由于当cpu线程资源不足时,往队列中写入消息时,若是数据库上一次持久化还没结束,容易形成线程阻塞。

 

 

 

6.3.2.2 第二步添加jar依赖

--配置数据源时是支持链接池的咱们这里使用dbcp2做为链接池。

 

jdbc驱动、dbcp2jar上传到/lib/目录下。

 

 

 

6.3.2.3 第三步:修改/conf/activemq.xml,建立数据源

--<broker>节点外,建立数据源节点

<bean id="mysql-ds" class="org.apache.commons.dbcp.BasicDataSource"

destroy-method="close">

<property name="driverClassName" value="com.mysql.jdbc.Driver"/>

<property name="url"

value="jdbc:mysql://192.168.7.149:3306/activemq?relaxAutoCommit=true"/>

<property name="username" value="root"/>

<property name="password" value="gzsxt"/>

<property name="maxActive" value="200"/>

<property name="poolPreparedStatements" value="true"/>

</bean>

 

--数据源节点位置以下以下

 

 

 

6.3.2.4 第四步修改/conf/activemq.xml,修改成jdbc持久化方式

--<broker>节点内部,注释kahadb方式,添加jdbc方式

添加以下配置:

<persistenceAdapter>

<jdbcPersistenceAdapter dataSource="#mysql-ds"

createTablesOnStartup="true/>

</persistenceAdapter>

 

--注意:注释<kahaDB>节点

 

 

6.3.3 测试

1)从新启动ActiveMQ

[root@node07192 bin]# ./activemq restart

 

2)查看数据库,发现生成了三张表

 

 

 

3)运行入门示例中的测试类,往队列中写入一条消息

--数据库表activemq_msgs中,新写入了一条数据

 

 

 

--配置成功!!!

 

6.3.4 三张表说明

数据表名称

做用

activemq_msgs

存储消息,QueueTopic都存储在这个表中

activemq_acks

用于存储订阅关系。订阅模式下有效

activemq_lock

集群模式下,存储主从节点关系

 

6.3.5 补充说明

Jdbc持久化方式,只要Mysql数据库稳定运行,就能保证队列中消息的安全。

安全级别高,可是效率低。

所以,在实际开发中,除非是像银行这类对数据安全极高的业务,咱们通常都是使用默认持久化方式kahadb

 

 

ActiveMQ应用场景

7.1 多模块解耦(模块之间消息通信)

咱们判断一个程序的优劣,有一个很重要的指标:高内聚、低耦合

高内聚:同一个模块中,功能是高度紧密的。

低耦合:各模块之间,业务尽可能不要交叉。

 

可是有一些业务功能,必须涉及到两个不一样的业务,那咱们就要想办法,尽可能将它们解耦开来。

 

以咱们前面学习的solr为例。咱们知道solr的数据来自数据库。这就意味着,当数据库中的商品发生变化时,咱们须要同步更新索引库。

 

这个时候咱们就可使用消息队列模型来解耦添加添加业务和同步索引库业务。

 

 

 

--后面的电商项目中,会重点讲解这个应用场景!!!

7.2 流量削峰(解决并发请求)

 

 

 

订单处理,就能够由前端应用将订单信息放到队列,后端应用从队列里依次得到消息处理,高峰时的大量订单能够积压在队列里慢慢处理掉。因为同步一般意味着阻塞,而大量线程的阻塞会下降计算机的性能。

 

7.3 日志处理

日志处理是指将消息队列用在日志处理中,好比Kafka的应用,解决大量日志传输的问题。架构简化以下:

 

 

 

7.4 同步业务异步处理

须要:当咱们在网站注册的时候,有时候须要认证邮箱或者手机号,这个时候保存数据到数据库以前,须要先等待认证结束。若是说认证程序耗时比较大,会影响影响用户注册的业务。

这个时候,咱们可使用消息队列模型,将同步执行的业务,经过队列,变成异步处理

 

 

1)在保存数据到数据库的时候,只须要将用户的邮箱写入队列,不须要等待邮箱认证程序执行结束,才把数据保存到数据库。

2)认证程序,经过监听队列,从中获取用户的邮箱地址,发送认证连接。

 

Spring整合ActiveMQ

8.1 必要性

Spring已经整合了jms规范了(spring-jms.jar),而ActiveMQ是实现了jms规范的。这就意味着Spring整合ActiveMQ是很是方便的。

而且Spring-jms,提供了一个JmsTemplate类,用来简化消息读写的业务代码。Spring整合ActivMQ以后,就可使用该类,简化开发!!!

 

8.2 需求

使用Spring整合ActiveMQ,模拟限时抢购下的流量削峰问题。

 

8.3 配置步骤说明

1)搭建环境。(建立项目,导入jar包)

2spring整合SpringMVC

3spring整合ActiveMQ

 

8.4 配置步骤

8.4.1 第一部分:建立项目(使用maven

8.4.1.1 第一步:使用maven建立项目

--注意:maven建立web项目时,默认建立web.xml文件。

 

/WEB-INF/目录下,手动建立一个web.xml文件。

 

 

 

8.4.1.2 第二步:导入pom依赖

导包说明:

     Spring核心包+AOP

     common-logging

     activemq核心包

     spring整合jms包  

     jsp相关依赖

       

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">

  <modelVersion>4.0.0</modelVersion>

  <groupId>cn.gzsxt.activemq</groupId>

  <artifactId>activemq-demo-02-spring</artifactId>

  <version>1.0</version>

  <packaging>war</packaging>

  

  <dependencies>

<!-- ActiveMQ客户端完整jar包依赖 -->

<dependency>

<groupId>org.apache.activemq</groupId>

<artifactId>activemq-all</artifactId>

<version>5.9.0</version>

</dependency>

 

<dependency>

     <groupId>org.apache.activemq</groupId>

     <artifactId>activemq-pool</artifactId>

     <version>5.9.0</version>

</dependency>

 

<!-- Spring-JMS插件相关jar包依赖 -->

<dependency>

<groupId>org.springframework</groupId>

<artifactId>spring-jms</artifactId>

<version>4.1.6.RELEASE</version>

</dependency>

<!-- Spring框架上下文jar包依赖 -->

<dependency>

<groupId>org.springframework</groupId>

<artifactId>spring-context</artifactId>

<version>4.1.6.RELEASE</version>

</dependency>

<!-- SpringMVC插件jar包依赖 -->

<dependency>

<groupId>org.springframework</groupId>

<artifactId>spring-webmvc</artifactId>

<version>4.1.6.RELEASE</version>

</dependency>

<!-- jsp相关 -->

   <dependency>

<groupId>jstl</groupId>

<artifactId>jstl</artifactId>

<version>1.2</version>

</dependency>

<dependency>

<groupId>javax.servlet</groupId>

<artifactId>servlet-api</artifactId>

<version>2.5</version>

<scope>provided</scope>

</dependency>

<dependency>

<groupId>javax.servlet</groupId>

<artifactId>jsp-api</artifactId>

<version>2.0</version>

<scope>provided</scope>

</dependency>

   </dependencies>

   <build>

<plugins>

<!-- 配置Tomcat插件 -->

<plugin>

<groupId>org.apache.tomcat.maven</groupId>

<artifactId>tomcat7-maven-plugin</artifactId>

<version>2.2</version>

<configuration>

<port>9099</port>

<path>/</path>

</configuration>

</plugin>

</plugins>

</build>

</project>

 

8.4.2 第二部分spring整合springmvc

8.4.2.1 第一步:修改web.xml,配置springmvc核心控制器

<?xml version="1.0" encoding="UTF-8"?>

<web-app version="2.5" xmlns="http://java.sun.com/xml/ns/javaee" xmlns:xml="http://www.w3.org/XML/1998/namespace" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://java.sun.com/xml/ns/javaee http://xmlns.jcp.org/xml/ns/javaee/web-app_2_5.xsd ">

<!-- 编码过滤器 -->

  <filter>

   <filter-name>characterEncodingFilter</filter-name>

   <filter-class>org.springframework.web.filter.CharacterEncodingFilter</filter-class>

   <init-param>

   <param-name>encoding</param-name>

   <param-value>utf-8</param-value>

   </init-param>

  </filter>

  <filter-mapping>

   <filter-name>characterEncodingFilter</filter-name>

   <url-pattern>/*</url-pattern>

  </filter-mapping>

  

  <!-- 配置springmvc核心控制器 -->

  <servlet>

   <servlet-name>dispatcherServlet</servlet-name>

   <servlet-class>org.springframework.web.servlet.DispatcherServlet</servlet-class>

  

   <init-param>

   <param-name>contextConfigLocation</param-name>

   <param-value>classpath:springmvc.xml</param-value>

   </init-param>

  

   <load-on-startup>1</load-on-startup>

  </servlet>

  <servlet-mapping>

   <servlet-name>dispatcherServlet</servlet-name>

   <url-pattern>*.action</url-pattern>

  </servlet-mapping>

 

</web-app>

 

8.4.2.2 第二步:配置springmvc.xml核心配置文件

<?xml version="1.0" encoding="UTF-8"?>

<beans xmlns="http://www.springframework.org/schema/beans"

xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"

xmlns:context="http://www.springframework.org/schema/context"

xmlns:mvc="http://www.springframework.org/schema/mvc"

xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd

http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-4.1.xsd

http://www.springframework.org/schema/mvc http://www.springframework.org/schema/mvc/spring-mvc-4.1.xsd">

 

<context:component-scan base-package="cn.gzsxt.controller" />

<mvc:annotation-driven />

 

</beans>

 

8.4.2.3 第三步建立相关jsp页面

--订单页面order.jsp

<%@ page language="java" contentType="text/html; charset=UTF-8"

    pageEncoding="UTF-8"%>

<!DOCTYPE html PUBLIC "-//W3C//DTD HTML 4.01 Transitional//EN" "http://www.w3.org/TR/html4/loose.dtd">

<html>

<head>

<meta http-equiv="Content-Type" content="text/html; charset=UTF-8">

<title>Insert title here</title>

</head>

<body>

<form action="/save.action" method="post">

用户编号:<input type="text" name="userid"><br>

订单金额:<input type="text" name="price"><br>

<input type="submit" value="提交">

</form>

</body>

</html>

 

--成功页面success.jsp

<%@ page language="java" contentType="text/html; charset=UTF-8"

    pageEncoding="UTF-8"%>

<!DOCTYPE html PUBLIC "-//W3C//DTD HTML 4.01 Transitional//EN" "http://www.w3.org/TR/html4/loose.dtd">

<html>

<head>

<meta http-equiv="Content-Type" content="text/html; charset=UTF-8">

<title>Insert title here</title>

</head>

<body>

订单提交成功!!!请稍后去结算中心支付。。。

</body>

</html>

 

8.4.2.4 第四步:java代码实现

--建立订单Order

package cn.gzsxt.jms.pojo;

 

public class Order {

 

private Integer id;

 

private Integer userid;

 

private float price;

 

public Order() {

super();

}

 

public Integer getId() {

return id;

}

 

public void setId(Integer id) {

this.id = id;

}

 

public Integer getUserid() {

return userid;

}

 

public void setUserid(Integer userid) {

this.userid = userid;

}

 

public float getPrice() {

return price;

}

 

public void setPrice(float price) {

this.price = price;

}

 

}

 

--建立OrderController

package cn.gzsxt.jms.controller;

 

import org.springframework.stereotype.Controller;

import org.springframework.web.bind.annotation.RequestMapping;

 

import cn.gzsxt.jms.pojo.Order;

 

@Controller

public class OrderController {

 

@RequestMapping("/save.action")

public String save(Order order){

 

System.out.println("当前提交的订单用户是:"+order.getUserid()+",订单金额:"+order.getPrice());

 

return "/success.jsp";

}

}

 

8.4.2.5 第五步:整合测试

--tomcat插件启动项目,访问订单业务,提交订单

 

 

 

--整合springmvc成功!!!

 

8.4.3 第三部分:Spring整合ActiveMQ

整合步骤说明:

1)搭建ActiveMQ服务器。(已实现)

2)建立消息生产者

3)建立消息消费者

4spring整合activemq

 

8.4.3.1 第一步:搭建ActiveMQ服务器。(已实现)

 

8.4.3.2 第二步:建立消息生成者OrderProducer

--说明:在这里,咱们注入JmsTemplate类,来简化代码

package cn.gzsxt.jms.producer;

 

import javax.jms.JMSException;

import javax.jms.Message;

import javax.jms.ObjectMessage;

import javax.jms.Session;

 

import org.springframework.beans.factory.annotation.Autowired;

import org.springframework.jms.core.JmsTemplate;

import org.springframework.jms.core.MessageCreator;

import org.springframework.stereotype.Component;

 

import cn.gzsxt.jms.pojo.Order;

 

@Component

public class OrderProducer {

 

@Autowired

private JmsTemplate jmsTemplate;

 

//注意:内部类调用外部类属性,须要用final修饰

public void sendToMQ(final Order order){

//指定队列名称 order-mq

jmsTemplate.send("order-mq", new MessageCreator() {

 

@Override

public Message createMessage(Session session) throws JMSException {

//ActiveMQ处理对象消息时,对象须要实现序列化

Message message = session.createObjectMessage(order);

 

return message;

}

});

}

}

 

--注意事项

1ActiveMQ处理对象时,对象必须实现序列化

 

--修改Order类,实现序列化接口

 

 

 

2)匿名内部类访问外部类属性,该属性须要用final修饰。

 

8.4.3.3 第三步建立消息消费者OrderListener

--这里使用监听器模式

package cn.gzsxt.jms.listener;

 

import javax.jms.JMSException;

import javax.jms.Message;

import javax.jms.MessageListener;

import javax.jms.ObjectMessage;

 

import org.springframework.stereotype.Component;

 

import cn.gzsxt.jms.pojo.Order;

 

@Component

public class OrderListener implements MessageListener{

 

@Override

public void onMessage(Message message) {

 

if(null!=message){

ObjectMessage oMsg = (ObjectMessage) message;

 

try {

Order order = (Order) oMsg.getObject();

System.out.println("当前提交的订单用户是:"+order.getUserid()+",订单金额:"+order.getPrice());

/*

 * 伪代码:

 *

 * orderDao.save(order);

 */

} catch (JMSException e) {

// TODO Auto-generated catch block

e.printStackTrace();

}

}

}

}

 

8.4.3.4 第四步spring整合ActiveMQ

--建立spring-jms.xml文件

<?xml version="1.0" encoding="UTF-8"?>

<beans xmlns="http://www.springframework.org/schema/beans"

xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"

xmlns:jms="http://www.springframework.org/schema/jms"

xsi:schemaLocation="http://www.springframework.org/schema/jms http://www.springframework.org/schema/jms/spring-jms-4.1.xsd

http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-4.1.xsd">

 

<!-- 1、配置activemq链接工程

使用链接池好处:连接只须要初始化一次,每次要使用的时候,直接从链接池获取,用完以后还给链接池。省去了每次建立、销毁链接的时间。                               

-->

<bean name="pooledConnectionFactory" class="org.apache.activemq.pool.PooledConnectionFactory">

<property name="connectionFactory">

<bean class="org.apache.activemq.ActiveMQConnectionFactory">

<property name="brokerURL" value="tcp://192.168.23.12:61616"/>

<property name="userName" value="admin"/>

<property name="password" value="admin"/>

</bean>

</property>

<property name="maxConnections" value="20"></property>

</bean>

 

    <!-- 2spring整合activemq连接工厂

     能够缓存session

    -->

<bean name="cachingConnectionFactory" class="org.springframework.jms.connection.CachingConnectionFactory">

    <property name="targetConnectionFactory" ref="pooledConnectionFactory"></property>

<property name="sessionCacheSize" value="5"></property>

</bean>

 

<!-- 3spring整合消息操做对象JmsTemplate

使用jmsTemplate能够简化代码,不须要本身去建立消息的发送对象。

-->

<bean name="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">

<property name="connectionFactory" ref="cachingConnectionFactory"></property>

</bean>

 

 

<!-- 4spring加载监听器

acknowledge="auto"  表示消息获取以后,自动出队列

container-type    表示的容器的类型   default|simple

    default:支持session缓存。

-->

<jms:listener-container acknowledge="auto"

container-type="default"

destination-type="queue"

connection-factory="cachingConnectionFactory">

<!-- 指定监听器

destination="order-mq"  指定监听的是哪个队列

ref="orderListener"         指定监听器对象  使用注解的时候,对象的名称是类名首字母小写

 -->

<jms:listener destination="order-mq" ref="orderListener"/>

</jms:listener-container>

 

</beans> 

 

8.4.3.5 第五步:修改web.xml文件,加载jms配置

<!-- 配置springmvc核心控制器 -->

  <servlet>

   <servlet-name>dispatcherServlet</servlet-name>

   <servlet-class>org.springframework.web.servlet.DispatcherServlet</servlet-class>

  

   <init-param>

   <param-name>contextConfigLocation</param-name>

   <!-- <param-value>classpath:springmvc.xml</param-value> -->

   <param-value>classpath:spring*.xml</param-value>

   </init-param>

  

   <load-on-startup>1</load-on-startup>

  </servlet>

  <servlet-mapping>

   <servlet-name>dispatcherServlet</servlet-name>

   <url-pattern>*.action</url-pattern>

  </servlet-mapping>

 

8.5 整合测试

8.5.1 第一步:修改OrderController

--注入OrderProducer,修改业务逻辑

@Controller

public class OrderController {

    @Autowired

private OrderProducer producer;

 

@RequestMapping("/save.action")

public String save(Order order){

 

// System.out.println("当前提交的订单用户是:"+order.getUserid()+",订单金额:"+order.getPrice());

 

producer.sendToMQ(order);

 

return "/success.jsp";

}

}

 

8.5.2 第二步:从新启动项目,提交多个订单

--查看Eclipse控制台

 

 

 

--查看ActiveMQ控制台

 

 

 

--整合成功!!!