安装环境:
Activemq5.11.1, jdk1.7(activemq5.11.1版本须要jdk升级到1.7),虚拟机: 192.168.147.131
[root@localhost software]# pwd
/export/software [root@localhost software]# tar -zxvf apache-activemq-5.11.1-bin.tar.gz [root@localhost software]# mv apache-activemq-5.11.1 /usr/local
配置Nginx代理Activemq后台管理应用默认绑定的8161端口
upstream tomcat_tools.activemq.local { server 127.0.0.1:8161 weight=10 max_fails=2 fail_timeout=300s; } server { listen 80; server_name tools.activemq.local.com; root /usr/local/apache-activemq-5.11.1/webapps/; access_log /usr/local/apache-activemq-5.11.1/logs/tools.activemq.local.com_access.log main; error_log /usr/local/apache-activemq-5.11.1/logs/tools.activemq.local.com_error.log warn; error_page 403 404 /40x.html; location / { index index.html index.htm; proxy_next_upstream http_500 http_502 http_503 http_504 error timeout invalid_header; proxy_set_header Host $host; proxy_set_header X-Real-IP $remote_addr; proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for; proxy_pass http://tomcat_tools.activemq.local;
} #静态文件,nginx本身处理 location ~ ^/(images|javascript|js|css|flash|media|static)/ { #过时30天,静态文件不怎么更新,过时能够设大一点, #若是频繁更新,则能够设置得小一点。 expires 30d; } }
重启nginx
启动activemq
[root@localhost linux-x86-64]# pwd
/usr/local/apache-activemq-5.11.1/bin/linux-x86-64 [root@localhost linux-x86-64]# ./activemq start
配置host[192.168.147.131 tools.activemq.local.com]
登陆activemq的后台,默认帐号 admin/admin
http://tools.activemq.local.com/admin
实例展现MQ消息的发送和接收[消息类型分为queue 和 Topic]
pom引入
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-all</artifactId>
<version>5.11.1</version>
</dependency>
Queue类型消息
一、定义消息destination和brokerUrl[61616为activemq用于消息通信的端口]
public class Constant { public static final String brokerURL = "tcp://192.168.147.131:61616"; public static final String queueDestination = "testQueue"; }
二、编写消息的发送程序
package com.mq.base.queue; import javax.jms.*; import org.apache.activemq.ActiveMQConnection; import org.apache.activemq.ActiveMQConnectionFactory; /** * created on 2015/6/4 * @author dennisit@163.com * @version 1.0 */
public class MqSender { public static void main(String[] args) throws JMSException { // 默认的帐号和密码为null
String username = ActiveMQConnection.DEFAULT_USER; String password = ActiveMQConnection.DEFAULT_PASSWORD; // 初始化链接工厂, DEFAULT_BROKER_URL =failover://tcp://localhost:61616
ConnectionFactory factory = new ActiveMQConnectionFactory(username, password, Constant.brokerURL); // 建立链接
Connection connection = factory.createConnection(); connection.start(); // 建立会话
Session session = connection.createSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE); // 建立消息主题Queue
Destination destination = session.createQueue(Constant.queueDestination); // MessageProducer负责发送消息
MessageProducer producer = session.createProducer(destination); // 消息不持久化
producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT); ObjectMessage message = session.createObjectMessage("hello world..."); producer.send(message); // 只有commit以后,消息才会进入队列
session.commit(); System.out.println("send..."); // 测试状态,这里把关闭会话和链接注释掉了。 // session.close(); // connection.close();
} }
执行消息发送,在管理后台查看javascript
三、编写消息的消费程序css
package com.mq.base.queue; import javax.jms.Connection; import javax.jms.ConnectionFactory; import javax.jms.Destination; import javax.jms.JMSException; import javax.jms.MessageConsumer; import javax.jms.ObjectMessage; import javax.jms.Session; import org.apache.activemq.ActiveMQConnection; import org.apache.activemq.ActiveMQConnectionFactory; /** * created on 2015/6/4 * @author dennisit@163.com * @version 1.0 */
public class MqReceiver { public static void main(String[] args) throws JMSException { // 默认的帐号和密码为null
String username = ActiveMQConnection.DEFAULT_USER; String password = ActiveMQConnection.DEFAULT_PASSWORD; // 初始化链接工厂, DEFAULT_BROKER_URL =failover://tcp://localhost:61616
ConnectionFactory factory = new ActiveMQConnectionFactory(username, password, Constant.brokerURL); // 建立链接
Connection connection = factory.createConnection(); connection.start(); // 建立会话
Session session = connection.createSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE); Destination destination = session.createQueue(Constant.queueDestination); // MessageConsumer负责接受消息
MessageConsumer consumer = session.createConsumer(destination); ObjectMessage message = (ObjectMessage)consumer.receive(); if (null != message) { String messageString = (String)message.getObject(); System.out.println("Receive : " + messageString); } // 测试状态,这里把关闭会话和链接注释掉了。 // session.close(); // connection.close();
} }
执行这段代码会输出接收到的消息内容:html
管理后台在查看queue中心结果以下:java
Topic类型消息linux
一、定义消息destination和brokerUrl[61616为activemq用于消息通信的端口]nginx
public class Constant { public static final String brokerURL = "tcp://192.168.147.131:61616"; public static final String topicDestination = "testTopic"; }
二、编写消息生产者web
package com.mq.base.topic; import org.apache.activemq.ActiveMQConnection; import org.apache.activemq.ActiveMQConnectionFactory; import javax.jms.*; /** * created on 2015/6/4 * @author dennisit@163.com * @version 1.0 */
public class MqSender { public static void main(String[] args) throws JMSException { // 默认的帐号和密码为null
String username = ActiveMQConnection.DEFAULT_USER; String password = ActiveMQConnection.DEFAULT_PASSWORD; // 初始化链接工厂, DEFAULT_BROKER_URL =failover://tcp://localhost:61616
ConnectionFactory factory = new ActiveMQConnectionFactory(username, password, com.mq.base.queue.Constant.brokerURL); // 建立链接
Connection connection = factory.createConnection(); connection.start(); // 建立会话
Session session = connection.createSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE); // 建立消息主题Topic,和Queue的区别就在此
Destination destination = session.createTopic(Constant.topicDestination); // MessageProducer负责发送消息
MessageProducer producer = session.createProducer(destination); // 消息不持久化
producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT); TextMessage message = session.createTextMessage(); // createObjectMessage("hello world...");
message.setStringProperty("msgId","topicMessage"); producer.send(message); // 只有commit以后,消息才会进入队列
session.commit(); System.out.println("send..."); // 测试状态,这里把关闭会话和链接注释掉了。 // session.close(); // connection.close();
} }
三、编写消息消费者数据库
package com.mq.base.topic; import org.apache.activemq.ActiveMQConnection; import org.apache.activemq.ActiveMQConnectionFactory; import javax.jms.*; /** * created on 2015/6/4 * @author dennisit@163.com * @version 1.0 */
public class MqReceiver { public static void main(String[] args) throws JMSException { // 默认的帐号和密码为null
String username = ActiveMQConnection.DEFAULT_USER; String password = ActiveMQConnection.DEFAULT_PASSWORD; // 初始化链接工厂, DEFAULT_BROKER_URL =failover://tcp://localhost:61616
ConnectionFactory factory = new ActiveMQConnectionFactory(username, password, com.mq.base.queue.Constant.brokerURL); // 建立链接
Connection connection = factory.createConnection(); connection.start(); // 建立会话
Session session = connection.createSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE); Destination destination = session.createTopic(Constant.topicDestination); // MessageConsumer负责接受消息
MessageConsumer consumer = session.createConsumer(destination); TextMessage message = (TextMessage)consumer.receive(); if (null != message) { String messageString = message.getStringProperty("msgId"); System.out.println("Receive : " + messageString); session.commit(); } // 测试状态,这里把关闭会话和链接注释掉了。 // session.close(); // connection.close();
} }
先启动消费者:apache
启动生产者,生产消息,此时会接收到消息如图:tomcat
观察topic后台管理
Queue模型消息和Topic模型消息区别
queue[点对点模型]
一、只有一个消费者
每条消息只有一个消费者,若是这条消息被消费,那么其它消费者不能接受到此消息。
二、时间无关性
消息的消费和时间无关,只要消息被发送了,在消息过时以前,若是没有其余消费者消费了这个消息,那么客户端能够在任什么时候候来消费这条消息。
三、消费者必须确认
消费者收到消息以后,必须向Message Provider确认,不然会被认为消息没有被消费,仍然能够被其余消费者消费。能够设置自动确认。这个特色其实也是保证一条消息只能由一个消费者来消费。
四、非持久化的消息只发一次
非持久化的消息,可能会丢失,由于消息会过时,另外Message Provider可能宕机。
五、持久化的消息严格发一次
消息能够被持久化,好比持久化在文件系统或者数据库中,这样能够避免Message Provider的异常或者其它异常致使消息丢失。
Topic[发布者/订阅者模型]
一、每条消息能够有多个订阅者
二、订阅者只能消费它们订阅topic以后的消息
三、非持久化订阅,订阅者必须保持为活动状态才能使用这些消息,若是一个订阅者A断开了10分钟,那么A就会收不到这10分钟内的消息。
四、持久化订阅,Message Provider会保存这些消息,即便订阅者由于网络缘由断开了,再从新链接之后,能让消费这些消息。
五、是否使用持久化订阅,须要根据业务场景判断。