Java消息队列-Spring整合ActiveMq

一、概述


 

  首先和你们一块儿回顾一下Java 消息服务,在我以前的博客《Java消息队列-JMS概述》中,我为你们分析了:html

  1. 消息服务:一个中间件,用于解决两个活多个程序之间的耦合,底层由Java 实现。
  2. 优点:异步、可靠
  3. 消息模型:点对点,发布/订阅
  4. JMS中的对象

  而后在另外一篇博客《Java消息队列-ActiveMq实战》中,和你们一块儿从0到1的开启了一个ActiveMq 的项目,在项目开发的过程当中,咱们对ActiveMq有了必定的了解:  java

  1. 多种语言和协议编写客户端。语言: Java, C, C++, C#, Ruby, Perl, Python, PHP。应用协议: OpenWire,Stomp REST,WS Notification,XMPP,AMQP
  2. 彻底支持JMS1.1和J2EE 1.4规范 (持久化,XA消息,事务)
  3. 对Spring的支持,ActiveMQ能够很容易内嵌到使用Spring的系统里面去,并且也支持Spring2.0的特性
  4. 经过了常见J2EE服务器(如 Geronimo,JBoss 4, GlassFish,WebLogic)的测试,其中经过JCA 1.5 resource adaptors的配置,可让ActiveMQ能够自动的部署到任何兼容J2EE 1.4 商业服务器上
  5. 支持多种传送协议:in-VM,TCP,SSL,NIO,UDP,JGroups,JXTA
  6. 支持经过JDBC和journal提供高速的消息持久化
  7. 从设计上保证了高性能的集群,客户端-服务器,点对点
  8. 支持Ajax
  9. 支持与Axis的整合
  10. 能够很容易得调用内嵌JMS provider,进行测试

  在接下来的这篇博客中,我会和你们一块儿来整合Spring 和ActiveMq,这篇博文,咱们基于Spring+JMS+ActiveMQ+Tomcat,实现了Point-To-Point的异步队列消息和PUB/SUB(发布/订阅)模型,简单实例,不包含任何业务。web

 

二、目录结构

 


  2.1 项目目录

      IDE选择了IDEA(建议你们使用),为了不下载jar 的各类麻烦,底层使用maven搭建了一个项目,整合了Spring 和ActiveMqspring

   

   

    2.2 pom.xml

  View Code

    由于这里pom.xml 文件有点长,就不展开了。apache

    咱们能够看到其实依赖也就几个,一、Spring 核心依赖 二、ActiveMq core和pool(这里若是同窗们选择导入jar,能够直接导入咱们上一篇博客中说道的那个activemq-all 这个jar包)三、java servlet 相关依赖后端

    这里面咱们选择的ActiveMq pool 的依赖版本会和以后的dtd 有关系,须要版本对应,因此同窗们等下配置activemq 文件的时候,须要注意dtd 版本选择spring-mvc

 

    2.3 web.xml

    web.xml 也大同小异,指定Spring 配置文件,springMvc 命名,编码格式tomcat

复制代码
<?xml version="1.0" encoding="UTF-8"?>
<web-app xmlns="http://java.sun.com/xml/ns/javaee"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://java.sun.com/xml/ns/javaee
          http://java.sun.com/xml/ns/javaee/web-app_3_0.xsd"
         version="3.0">

  <display-name>Archetype Created Web Application</display-name>

  <!-- 加载spring的配置文件,例如hibernate、jms等集成 -->
  <context-param>
    <param-name>contextConfigLocation</param-name>
    <param-value>
      classpath:applicationContext*.xml;
    </param-value>
  </context-param>

  <listener>
    <listener-class>org.springframework.web.context.ContextLoaderListener</listener-class>
  </listener>

  <servlet>
    <servlet-name>springMVC</servlet-name>
    <servlet-class>org.springframework.web.servlet.DispatcherServlet</servlet-class>
    <init-param>
      <param-name>contextConfigLocation</param-name>
      <param-value>classpath:spring-mvc.xml</param-value>
    </init-param>
    <load-on-startup>1</load-on-startup>
  </servlet>
  <servlet-mapping>
    <servlet-name>springMVC</servlet-name>
    <url-pattern>/</url-pattern>
  </servlet-mapping>

  <!-- 处理编码格式 -->
  <filter>
    <filter-name>characterEncodingFilter</filter-name>
    <filter-class>org.springframework.web.filter.CharacterEncodingFilter</filter-class>
    <init-param>
      <param-name>encoding</param-name>
      <param-value>UTF-8</param-value>
    </init-param>
    <init-param>
      <param-name>forceEncoding</param-name>
      <param-value>true</param-value>
    </init-param>
  </filter>
  <filter-mapping>
    <filter-name>characterEncodingFilter</filter-name>
    <url-pattern>/*</url-pattern>
  </filter-mapping>

</web-app>
复制代码

 

 

    2.4 SpringMvc 和applicationContext.xml

      这里面的SpringMVC没什么特别,有须要的同窗能够参考一下:服务器

  View Code

      applicationContext.xml 主要使用来装载Bean,咱们项目中并无什么特别的Java Bean,所以只用来指出包扫描路径:session

  View Code

 

   

    2.5 applicationContext-ActiveMQ.xml

复制代码
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:amq="http://activemq.apache.org/schema/core"
       xmlns:jms="http://www.springframework.org/schema/jms"
       xmlns:context="http://www.springframework.org/schema/context"
       xmlns:mvc="http://www.springframework.org/schema/mvc"
       xsi:schemaLocation="
        http://www.springframework.org/schema/beans
        http://www.springframework.org/schema/beans/spring-beans-4.1.xsd
        http://www.springframework.org/schema/context
        http://www.springframework.org/schema/context/spring-context-4.1.xsd
        http://www.springframework.org/schema/mvc
        http://www.springframework.org/schema/mvc/spring-mvc-4.1.xsd
        http://www.springframework.org/schema/jms
        http://www.springframework.org/schema/jms/spring-jms-4.1.xsd
        http://activemq.apache.org/schema/core
        http://activemq.apache.org/schema/core/activemq-core-5.12.1.xsd"
>

    <context:component-scan base-package="com.Jayce" />
    <mvc:annotation-driven />

    <amq:connectionFactory id="amqConnectionFactory"
                           brokerURL="tcp://192.168.148.128:61616"
                           userName="admin"
                           password="admin" />

    <!-- 配置JMS链接工长 -->
    <bean id="connectionFactory"
          class="org.springframework.jms.connection.CachingConnectionFactory">
        <constructor-arg ref="amqConnectionFactory" />
        <property name="sessionCacheSize" value="100" />
    </bean>

    <!-- 定义消息队列(Queue) -->
    <bean id="demoQueueDestination" class="org.apache.activemq.command.ActiveMQQueue">
        <!-- 设置消息队列的名字 -->
        <constructor-arg>
            <value>Jaycekon</value>
        </constructor-arg>
    </bean>

    <!-- 配置JMS模板(Queue),Spring提供的JMS工具类,它发送、接收消息。 -->
    <bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">
        <property name="connectionFactory" ref="connectionFactory" />
        <property name="defaultDestination" ref="demoQueueDestination" />
        <property name="receiveTimeout" value="10000" />
        <!-- true是topic,false是queue,默认是false,此处显示写出false -->
        <property name="pubSubDomain" value="false" />
    </bean>


    <!-- 配置消息队列监听者(Queue) -->
    <bean id="queueMessageListener" class="com.Jayce.Filter.QueueMessageListener" />

    <!-- 显示注入消息监听容器(Queue),配置链接工厂,监听的目标是demoQueueDestination,监听器是上面定义的监听器 -->
    <bean id="queueListenerContainer"
          class="org.springframework.jms.listener.DefaultMessageListenerContainer">
        <property name="connectionFactory" ref="connectionFactory" />
        <property name="destination" ref="demoQueueDestination" />
        <property name="messageListener" ref="queueMessageListener" />
    </bean>

</beans>
复制代码

 

       这里和你们讲解一下这个配置文件,若是你们可以从上述配置文件中看懂,能够跳过。同窗们也能够在ActiveMQ官网中的查看。

       一、ActiveMq 中的DTD,咱们在声明相关配置以前,咱们须要先导入ActiveMq 中的DTD,否则Spring 并不理解咱们的标签是什么意思。

         http://activemq.apache.org/schema/core/activemq-core-5.12.1.xsd

        咱们在pom.xml 文件中有配置了activemq 的版本依赖咱们这里的版本,须要和依赖的版本同样,否则是找不到相关的dtd

       二、amq:connectionFactory:很直白的一个配置项,用于配置咱们连接工厂的地址和用户名密码,这里须要注意的是选择tcp链接而不是http链接

       三、jmsTemplate:比较重要的一个配置,这里指定了链接工厂,默认消息发送目的地,还有链接时长,发布消息的方式

 

三、项目结构


  3.1 ProducerService

复制代码
package com.Jayce.Service;

import org.springframework.jms.core.JmsTemplate;
import org.springframework.jms.core.MessageCreator;
import org.springframework.stereotype.Service;

import javax.annotation.Resource;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.Session;

/**
 * Created by Administrator on 2017/1/5.
 */
@Service
public class ProducerService {

    @Resource(name="jmsTemplate")
    private JmsTemplate jmsTemplate;

    public void sendMessage(Destination destination,final String msg){
        System.out.println(Thread.currentThread().getName()+" 向队列"+destination.toString()+"发送消息---------------------->"+msg);
        jmsTemplate.send(destination, new MessageCreator() {
            public Message createMessage(Session session) throws JMSException {
                return session.createTextMessage(msg);
            }
        });
    }

    public void sendMessage(final String msg){
        String destination = jmsTemplate.getDefaultDestinationName();
        System.out.println(Thread.currentThread().getName()+" 向队列"+destination+"发送消息---------------------->"+msg);
        jmsTemplate.send(new MessageCreator() {
            public Message createMessage(Session session) throws JMSException {
                return session.createTextMessage(msg);
            }
        });
    }
}
复制代码

 

     将消息生产者作成一个服务,当咱们须要发送消息的时候,只须要调用ProducerService实例中的sendMessage 方法就能够向默认目的发送一个消息。

    这里提供了两个发送方式,一个是发送到默认的目的地,一个是根据目的地发送消息。

    有兴趣的同窗能够和我上一篇文章《ActiveMq实战》中ActiveMq 发送消息的方式对比一下,能够发现一些不一样。

 

 

   3.2 ConsumerService

复制代码
package com.Jayce.Service;

import org.springframework.jms.core.JmsTemplate;
import org.springframework.stereotype.Service;

import javax.annotation.Resource;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.TextMessage;

/**
 * Created by Administrator on 2017/1/5.
 */
@Service
public class ConsumerService {
    @Resource(name="jmsTemplate")
    private JmsTemplate jmsTemplate;

    public TextMessage receive(Destination destination){
        TextMessage textMessage = (TextMessage) jmsTemplate.receive(destination);
        try{
            System.out.println("从队列" + destination.toString() + "收到了消息:\t"
                    + textMessage.getText());
        } catch (JMSException e) {
            e.printStackTrace();
        }
        return textMessage;
    }
}
复制代码

 

 

     由于咱们项目中并无什么业务,因此的话对消息的处理也就是打印输出。咱们只须要调用jmsTemplate中的 receive 方法,就能够从里面获取到一条消息。

     再和咱们上一篇博客对比一下,上一篇博客中,咱们接受到信息以后须要手动确认事务,这样ActiveMQ中才会肯定这条消息已经被正确读取了。而整合了Spring以后,事务将由Spring 来管理。

 

   3.3 MessageController

复制代码
package com.Jayce.Controller;

import com.Jayce.Service.ConsumerService;
import com.Jayce.Service.ProducerService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Controller;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestMethod;
import org.springframework.web.bind.annotation.ResponseBody;

import javax.annotation.Resource;
import javax.jms.Destination;
import javax.jms.TextMessage;

/**
 * Created by Administrator on 2017/1/5.
 */
@Controller
public class MessageController {
    private Logger logger = LoggerFactory.getLogger(MessageController.class);
    @Resource(name = "demoQueueDestination")
    private Destination destination;

    //队列消息生产者
    @Resource(name = "producerService")
    private ProducerService producer;

    //队列消息消费者
    @Resource(name = "consumerService")
    private ConsumerService consumer;

    @RequestMapping(value = "/SendMessage", method = RequestMethod.POST)
    @ResponseBody
    public void send(String msg) {
        logger.info(Thread.currentThread().getName()+"------------send to jms Start");
        producer.sendMessage(msg);
        logger.info(Thread.currentThread().getName()+"------------send to jms End");
    }

    @RequestMapping(value= "/ReceiveMessage",method = RequestMethod.GET)
    @ResponseBody
    public Object receive(){
        logger.info(Thread.currentThread().getName()+"------------receive from jms Start");
        TextMessage tm = consumer.receive(destination);
        logger.info(Thread.currentThread().getName()+"------------receive from jms End");
        return tm;
    }

}
复制代码

    控制层里面须要注入咱们的生产者和消费者(实际开发中,生产者和消费者确定不会在同一个项目中的,否则就消息服务这个东西就没有意义了)。

    如今服务层和控制层都好了,接下来咱们就进行一个简单的测试

 

四、项目测试


  4.1 启动ActiveMq

      先肯定你的ActiveMQ服务已经开启。

    

 

 

  4.2 启动项目

    项目使用了Tomcat 插件,避免了本地再下载Tomcat的麻烦,有须要的同窗可使用一下。

复制代码
<plugins>
      <plugin>
        <groupId>org.apache.tomcat.maven</groupId>
        <artifactId>tomcat7-maven-plugin</artifactId>
        <configuration>
          <port>8080</port>
          <path>/</path>
        </configuration>
      </plugin>
</plugins>
复制代码

 

 

 

  4.3 发送消息

  这里用了Chrome 的一个插件PostMan 有兴趣的同窗能够了解一下,在Chrome 拓展程序中能够找到,避免了后端的同窗去弄页面!

    

    咱们发送了一个post 请求以后,看一下服务器的效果:

    咱们能够看到,已经向队列发送了一条消息。咱们看一下ActiveMq如今的状态:

    咱们能够看到,一条消息已经成功发送到了ActiveMq中。

 

 

  4.4 接收消息

    使用get请求访问服务器后台:

  

 

     服务的输出:

 

     ActiveMq服务器状态:

    咱们能够看到,消费者已经消费了一条信息,而且没有断开与ActiveMq之间的连接。

  

  4.5 监听器

    在实际项目中,咱们不多会本身手动去获取消息,若是须要手动去获取消息,那就没有必要使用到ActiveMq了,能够用一个Redis 就足够了。

    不能手动去获取消息,那么咱们就能够选择使用一个监听器来监听是否有消息到达,这样子能够很快的完成对消息的处理。

   4.5.1 applicationContext-ActiveMQ.xml 配置

      在上面的配置文件中,咱们已经默认的添加了这段监听器的配置文件,若是同窗们不想使用这个监听器,能够直接注释掉。

复制代码
    <!-- 配置消息队列监听者(Queue) -->
    <bean id="queueMessageListener" class="com.Jayce.Filter.QueueMessageListener" />

    <!-- 显示注入消息监听容器(Queue),配置链接工厂,监听的目标是demoQueueDestination,监听器是上面定义的监听器 -->
    <bean id="queueListenerContainer"
          class="org.springframework.jms.listener.DefaultMessageListenerContainer">
        <property name="connectionFactory" ref="connectionFactory" />
        <property name="destination" ref="demoQueueDestination" />
        <property name="messageListener" ref="queueMessageListener" />
    </bean>
复制代码

 

 

   4.5.2 MessageListener

      咱们须要建立一个类实现MessageListener 接口:

复制代码
package com.Jayce.Filter;

import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.TextMessage;

/**
 * Created by Administrator on 2017/1/5.
 */
public class QueueMessageListener implements MessageListener {
    public void onMessage(Message message) {
        TextMessage tm = (TextMessage) message;
        try {
            System.out.println("QueueMessageListener监听到了文本消息:\t"
                    + tm.getText());
            //do something ...
        } catch (JMSException e) {
            e.printStackTrace();
        }
    }
}
复制代码

 

   实现接口的onMessage 方法,咱们将须要的业务操做在里面解决,这样子,就完成了咱们生产者-中间件-消费者,这样一个解耦的操做了。 

 

   4.5.3 测试

    和上面同样,使用postMan 发送post请求,咱们能够看到控制台里面,消息立刻就能打印出来:

   

    再看看ActiveMQ服务器的状态:

  咱们能够看到,使用监听器的效果,和手动接收消息的效果是同样的。

  这样子一整个项目下来,咱们已经成功的整合了Spring和ActiveMQ。

 

  4.6 压力测试

    这里其实也算不上什么压力测试,在配置pom.xml文件的时候,你们有看到一个 commons-httpclient 的依赖,接下来咱们使用httpClient 不停的想服务器发送消息,看一下服务器解决消息的速度如何:

复制代码
package com.Jaycekon.test;

import org.apache.commons.httpclient.HttpClient;
import org.apache.commons.httpclient.methods.PostMethod;
import org.junit.Test;

import java.io.IOException;
import java.util.concurrent.atomic.AtomicInteger;

/**
 * Created by Administrator on 2017/1/5.
 */
public class Client {

    @Test
    public void test() {
        HttpClient httpClient = new HttpClient();
        new Thread(new Sender(httpClient)).start();

    }

}

class Sender implements Runnable {
    public static AtomicInteger count = new AtomicInteger(0);
    HttpClient httpClient;

    public Sender(HttpClient client) {
        httpClient = client;
    }

    public void run() {
            try {
                System.out.println(Thread.currentThread().getName()+"---Send message-"+count.getAndIncrement());
                PostMethod post = new PostMethod("http://127.0.0.1:8080/SendMessage");
                post.addParameter("msg", "Hello world!");
                httpClient.executeMethod(post);
                System.out.println(Thread.currentThread().getName()+"---Send message Success-"+count.getAndIncrement());

            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }
复制代码

 

     这里面用了HttpClient 来向服务器发送Post 请求,而后计数输出,有兴趣的同窗能够本身测试一下,能够多开几个线程,这里只开了一个线程。

相关文章
相关标签/搜索