Apache ActiveMQ是Apache软件基金会所研发的开放源代码消息中间件,说白了就是个服务器,主要用来存放请求消息的java
这篇博客图文并茂,通俗易懂 ActiveMQ做用总结ajax
笔者将其精炼了一下,主要有4大应用场景:异步处理,应用解耦,流量削锋,消息通信spring
其核心思想都是把用户的请求先存放在MQ中,而后返回用户响应,后台再慢慢去处理MQ中的消息,不须要一条龙业务所有跑完再返回响应,这样的话单位时间内请求数能够更多,响应速度也更快,至关于提升了吞吐量。其实前3种场景都差很少,笔者看来没有绝对的边界,只不过异步处理强调非同时性,应用解耦强调子系统挂掉后MQ体现的做用,流量削锋强调MQ在高并发中体现的做用。消息通信的业务模式举例子:1.用微信和微信好友聊天 2.微信群聊天apache
源码地址:json
安装好activeMQ,如何安装自行百度tomcat
项目适用jdk1.8,采用idea多模块架构,涉及技术有spring, activemq, tomcatbash
client是模拟消费者,domain是公共工具包,被maven打成jar供其它项目适用,service是模拟消息生产者服务器
启动activemq服务器微信
双击activemq.bat启动session
登录 http://localhost:8161/admin/queues.jsp ,发现Queues是空的
看一下service的配置activemq_config.xml
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:context="http://www.springframework.org/schema/context"
xmlns:jms="http://www.springframework.org/schema/jms"
xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-3.0.xsd http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-3.0.xsd http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-3.0.xsd http://www.springframework.org/schema/jms http://www.springframework.org/schema/jms/spring-jms-3.0.xsd">
<!-- 这里暴露内部统一使用的MQ地址 -->
<bean id="internalTargetConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
<property name="brokerURL" value="tcp://localhost:61616" />
</bean>
<bean id="internalConnectionFactory" class="org.apache.activemq.pool.PooledConnectionFactory"
destroy-method="stop">
<property name="connectionFactory" ref="internalTargetConnectionFactory" />
<property name="maxConnections" value="20" />
</bean>
<!-- Spring提供的JMS工具类,它能够进行消息发送、接收等 -->
<bean id="internalJmsTemplate" class="org.springframework.jms.core.JmsTemplate">
<property name="connectionFactory" ref="internalConnectionFactory" />
</bean>
<!-- 推送给用户信息 建立一个Queue-->
<bean id="userServiceQueue" class="org.apache.activemq.command.ActiveMQQueue">
<constructor-arg>
<value>user.service.queue</value>
</constructor-arg>
</bean>
<!-- 推送给新闻信息 建立一个Queue-->
<bean id="newsServiceQueue" class="org.apache.activemq.command.ActiveMQQueue">
<constructor-arg>
<value>news.service.queue</value>
</constructor-arg>
</bean>
<!-- 推送给客户信息 建立一个Queue-->
<bean id="clientServiceQueue" class="org.apache.activemq.command.ActiveMQQueue">
<constructor-arg>
<value>client.service.queue</value>
</constructor-arg>
</bean>
</beans>复制代码
一共3种推送,每种推送对应1个队列名
PushService是1个通用接口,而后3种推送各对应1个实现,使用tomcat启动service服务
登录localhost:8080
按照如上填写后,咱们点击推送用户信息,出现以下提示框
登录 http://localhost:8161/admin/queues.jsp ,
发现新增了一个队列,待处理消息数量1,消费者数量0,消息排队1,消息已出列0
咱们看看后台执行过程
js经过ajax请求到后台
@RequestMapping(value="/user",method=RequestMethod.POST)
@ResponseBody
public ResultRespone userPush(User info){
ResultRespone respone = new ResultRespone();
try {
userPushService.push(info);
respone.setData(info);
} catch (Exception e) {
e.printStackTrace();
respone = new ResultRespone(false, e.getMessage());
}
return respone;
}复制代码
调用push()方法
@Autowired
@Qualifier("userServiceQueue")
private Destination destination;
@Override
public void push(final Object info) {
pushExecutor.execute(new Runnable() {
@Override
public void run() {
jmsTemplate.send(destination, new MessageCreator() {
public Message createMessage(Session session) throws JMSException {
User p = (User) info;
return session.createTextMessage(JSON.toJSONString(p));
}
});
}
});
}复制代码
这个过程实际上将用户属性值组成的字符串发送到了activemq服务器,到此,生产者的任务就完成了
主要经过3个listener来接收activemq发送过来的消息
看其中一个UserPushListener.java
@Component("userPushListener")
public class UserPushListener implements MessageListener {
protected static final Logger logger = Logger.getLogger(UserPushListener.class);
@Override
public void onMessage(Message message) {
logger.info("[UserPushListener.onMessage]:begin onMessage.");
TextMessage textMessage = (TextMessage) message;
try {
String jsonStr = textMessage.getText();
logger.info("[UserPushListener.onMessage]:receive message is,"+ jsonStr);
if (jsonStr != null) {
User info = JSON.parseObject(jsonStr, User.class);
System.out.println("==============================接受到的用户信息 开始====================================");
System.out.println(info.toString());
System.out.println("==============================接受到的用户信息 结束====================================");
WebsocketController.broadcast("user", jsonStr);
}
} catch (JMSException e) {
logger.error("[UserPushListener.onMessage]:receive message occured an exception",e);
}
logger.info("[UserPushListener.onMessage]:end onMessage.");
}
}复制代码
看一下消费端的配置
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:context="http://www.springframework.org/schema/context"
xmlns:jms="http://www.springframework.org/schema/jms"
xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-3.0.xsd http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-3.0.xsd http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-3.0.xsd http://www.springframework.org/schema/jms http://www.springframework.org/schema/jms/spring-jms-3.0.xsd">
<!-- 内部统一使用的MQ地址 -->
<bean id="targetConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
<property name="brokerURL" value="tcp://localhost:61616"/>
</bean>
<bean id="connectionFactory" class="org.apache.activemq.pool.PooledConnectionFactory" destroy-method="stop">
<property name="connectionFactory" ref="targetConnectionFactory"/>
<property name="maxConnections" value="50"/>
</bean>
<!-- Spring提供的JMS工具类,它能够进行消息发送、接收等 -->
<bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">
<property name="connectionFactory" ref="connectionFactory"/>
</bean>
<!-- 推送给用户信息 -->
<bean id="userPushListenerMQ" class="org.apache.activemq.command.ActiveMQQueue">
<constructor-arg>
<value>user.service.queue</value>
</constructor-arg>
</bean>
<!-- 推送给新闻信息 -->
<bean id="newsPushListenerMQ" class="org.apache.activemq.command.ActiveMQQueue">
<constructor-arg>
<value>news.service.queue</value>
</constructor-arg>
</bean>
<!-- 推送给客户信息 -->
<bean id="clientPushListenerMQ" class="org.apache.activemq.command.ActiveMQQueue">
<constructor-arg>
<value>client.service.queue</value>
</constructor-arg>
</bean>
<!-- 用户接受推送 -->
<bean id="userPushListenerConsumer"
class="org.springframework.jms.listener.DefaultMessageListenerContainer">
<property name="connectionFactory" ref="connectionFactory" />
<property name="destination" ref="userPushListenerMQ" />
<property name="messageListener" ref="userPushListener" />
</bean>
<!-- 新闻接受推送 -->
<bean id="newsPushListenerConsumer"
class="org.springframework.jms.listener.DefaultMessageListenerContainer">
<property name="connectionFactory" ref="connectionFactory" />
<property name="destination" ref="newsPushListenerMQ" />
<property name="messageListener" ref="newsPushListener" />
</bean>
<!-- 客户接受推送 -->
<bean id="clientPushListenerConsumer"
class="org.springframework.jms.listener.DefaultMessageListenerContainer">
<property name="connectionFactory" ref="connectionFactory" />
<property name="destination" ref="clientPushListenerMQ" />
<property name="messageListener" ref="clientPushListener" />
</bean>
</beans>复制代码
消费端监听了3个队列,因此队列一旦有消息,消费端就会监听到,并且activemq能够确认哪些消息被推送成功了
关闭service服务,启动client服务,观察日志
成功接收到消息,再次查看 http://localhost:8161/admin/queues.jsp
发现user.service.queue这个队列的消息是待处理消息数量0,消费者数量1,消息排队1,消息已出列1,代表消息推送完毕,另外两个新增的队列是客户端监听形成的,能够看出待处理消息的数量都是0