ActiveMQ入门练习

  ActiveMQ 是Apache出品,最流行的,能力强劲的开源消息总线。ActiveMQ 是一个彻底支持JMS1.1和J2EE 1.4规范的 JMS Provider实现,尽管JMS规范出台已是好久的事情了,可是JMS在当今的J2EE应用中间仍然扮演着特殊的地位。java

  JMS即Java消息服务(Java Message Service)应用程序接口,是一个Java平台中关于面向消息中间件(MOM)的API,用于在两个应用程序之间,或分布式系统中发送消息,进行异步通讯。Java消息服务是一个与具体平台无关的API,绝大多数MOM提供商都对JMS提供支持。git

  JMS是一种与厂商无关的 API,用来访问消息收发系统消息,它相似于JDBC(Java Database Connectivity)。这里,JDBC 是能够用来访问许多不一样关系数据库的 API,而 JMS 则提供一样与厂商无关的访问方法,以访问消息收发服务。许多厂商都支持 JMS,包括 IBM 的 MQSeries、BEA的 Weblogic JMS service和 Progress 的 SonicMQ。 JMS 使您可以经过消息收发服务(有时称为消息中介程序或路由器)从一个 JMS 客户机向另外一个 JMS客户机发送消息。
  本篇我将和你们一块儿分享一下Apache JMS的使用,ApacheMQ: http://activemq.apache.org/,下载后解压,这里须要说明,使用以前请先配置Java环境变量。启动后打开浏览器输入:http://localhost:8161/admin,这里须要经过密码登陆,默认用户名:admin 密码:admin(用户名密码是在conf/users.properties中配置的)
  这样咱们的环境就配置好了。更详细的解说: http://blog.csdn.net/clj198606061111/article/details/38145597
  下面就是咱们如何进行消息的发送和接收,这里有两种方式:点对点收发;发布订阅。下面咱们就一一进行探讨。
  首先搭建一下测试环境,这里我经过Maven进行项目的建立,咱们须要引入activemq的核心activemq-all-5.15.2.jar,以及单元测试
  <dependencies>
      <dependency>
      <groupId>org.apache.activemq</groupId>
      <artifactId>activemq-all</artifactId>
      <version>${active-mq-version}</version>
    </dependency>
    
    <!-- https://mvnrepository.com/artifact/junit/junit -->
    <dependency>
        <groupId>junit</groupId>
        <artifactId>junit</artifactId>
        <version>4.12</version>
        <scope>test</scope>
    </dependency>
  </dependencies>

  下面咱们就开始消息的收发,先来探讨一下点对点的消息收发:github

  一、消息的发送数据库

  原理上咱们经过ConnectionFactory会话工厂 ---> 建立一个会话链接 ---> 经过会话链接建立一个会话线程 ---> 经过会话线程建立一个消息队列 ---> 经过会话线程和消息队列建立一个消息生产者(MessageProducer) ---> 最后由消息生产者进行消息的发送。apache

private static ConnectionFactory factory;//会话链接工厂
private static Connection connection;//会话链接
private static Session session;//会话接收或发送消息线程
private static Destination destination;//消息队列
private static MessageProducer messageProducer;//消息发送者

  经过ActiveMQConnectionFactory建立会话工厂浏览器

factory = new ActiveMQConnectionFactory(USERNAME, PASSWORD, BROKER_URL);//建立会话链接工厂

  设置会话链接额度地址和用户名密码在(这里我使用的是默认的地址和用户名密码)服务器

private static final String USERNAME = ActiveMQConnection.DEFAULT_USER;
private static final String PASSWORD = ActiveMQConnection.DEFAULT_PASSWORD;
private static final String BROKER_URL = ActiveMQConnection.DEFAULT_BROKER_URL;

  下面封装了一下上面的过程:session

    /**
     * @Description 发送消息
     * @param queryName 消息队列名称
     * @param msg 消息内容
     * @return
     *
     * @author 高尚
     * @version 1.0
     * @date 建立时间:2018年1月31日 下午3:54:22
     */
    public static boolean producerSendQueryMessage(final String queryName, final String msg){
        boolean flag = true;
        try {
            connection = factory.createConnection();//建立会话链接
            connection.start();//启动会话链接
            session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);//建立会话线程
            destination = session.createQueue(queryName);//建立消息队列
            messageProducer = session.createProducer(destination);//建立会话生成者
            Message message = session.createTextMessage(msg);//建立消息对象
            messageProducer.send(message);//发送消息
            session.commit();
        } catch (JMSException e) {
            e.printStackTrace();
            flag = false;
        }finally {
            if(null != connection){
                try {
                    connection.close();
                } catch (JMSException e) {
                    e.printStackTrace();
                }
            }
        }
        return flag;
    }    

  这里单独说一下connection.createSession生成会话线程:异步

Session session = connection.createSession(paramA,paramB);
paramA是设置事务,paramB是设置acknowledgment mode

paramA 取值有:
一、true:支持事务
为true时:paramB的值忽略, acknowledgment mode被jms服务器设置为SESSION_TRANSACTED 。  
二、false:不支持事务 
为false时:paramB的值可为Session.AUTO_ACKNOWLEDGE、Session.CLIENT_ACKNOWLEDGE、DUPS_OK_ACKNOWLEDGE其中一个。

paramB 取值有:
1、Session.AUTO_ACKNOWLEDGE:为自动确认,客户端发送和接收消息不须要作额外的工做。
2、Session.CLIENT_ACKNOWLEDGE:为客户端确认。客户端接收到消息后,必须调用javax.jms.Message的acknowledge方法。jms服务器才会删除消息。 
3、DUPS_OK_ACKNOWLEDGE:容许副本的确认模式。一旦接收方应用程序的方法调用从处理消息处返回,会话对象就会确认消息的接收;并且容许重复确认。在须要考虑资源使用时,这种模式很是有效。
四、SESSION_TRANSACTED

  到这里咱们的消息发送模块就封装完成,下面咱们看一下消息的接收:分布式

  原理上咱们经过ConnectionFactory会话工厂 ---> 建立一个会话链接 ---> 经过会话链接建立一个会话线程 ---> 经过会话线程建立一个消息队列 ---> 经过会话线程和消息队列建立一个消息消费者(MessageConsumer) ---> 最后由消息消费者进行消息获取

    /**
     * @Description 接收消息队列中的消息
     * @param queryName 消息队列名称
     * @return
     *
     * @author 高尚
     * @version 1.0
     * @date 建立时间:2018年1月31日 下午4:24:14
     */
    public static void consumerGetQueryMessage(final String queryName){
        try {
            connection = factory.createConnection();//建立会话链接
            connection.start();//启动会话链接
            session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            destination = session.createQueue(queryName);
            messageConsumer = session.createConsumer(destination);
            while(true){
                TextMessage message = (TextMessage) messageConsumer.receive();
                if(null != message){
                    System.out.println(queryName+"发送消息:"+message.getText());
                }
            }
        } catch (Exception e) {
            // TODO: handle exception
            System.out.println("消费消息异常");
        }finally {
            if(null != connection){
                try {
                    connection.close();
                } catch (JMSException e) {
                    e.printStackTrace();
                }
            }
        }
    }

  最后单独说一下messageConsumer.receive方法:

messageConsumer.receive();//一直等到消息
messageConsumer.receive(1000);//等到消息1秒钟
messageConsumer.receiveNoWait();//不等待消息

  固然这样的接收消息方式是否是感受很low,固然咱们还能够经过监听器来实现消息接收的监听(MessageListener),咱们实现MessageListener接口,自定义消息接收监听器:

/**
 * @Description 消息监听器
 * @author 高尚
 * @version 1.0
 * @date 建立时间:2018年1月31日 下午4:52:36
 */
public class MsgListener implements MessageListener {
    
    /**
     * 监听的会话队列
     */
    private static String queryName;

    @Override
    public void onMessage(Message msg) {
        TextMessage textMsg = (TextMessage) msg;
        try {
            if(null != textMsg){
                System.out.println("【" + queryName + "】发送的消息:" + textMsg.getText());
            }
        } catch (JMSException e) {
            e.printStackTrace();
            System.out.println("获取会话消息异常");
        }
    }

    public MsgListener(String queryName) {
        super();
        // TODO Auto-generated constructor stub
        this.queryName = queryName;
    }
    
}

  而后咱们须要简单修改一个消息接收策略:

    /**
     * @Description 经过Listener接收消息队列中的消息
     * @param queryName 消息队列名称
     * @return
     *
     * @author 高尚
     * @version 1.0
     * @date 建立时间:2018年1月31日 下午4:24:14
     */
    public static void consumerGetQueryMessageListener(String queryName) {
        try {
            connection = factory.createConnection();//建立会话链接
            connection.start();//启动会话链接
            session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            destination = session.createQueue(queryName);
            messageConsumer = session.createConsumer(destination);
            MsgListener msgListener = new MsgListener(queryName);
            messageConsumer.setMessageListener(msgListener);
            while(true){
                Thread.sleep(10000);
            }
        } catch (Exception e) {
            // TODO: handle exception
            System.out.println("消费消息异常");
        }finally {
            if(null != connection){
                try {
                    connection.close();
                } catch (JMSException e) {
                    e.printStackTrace();
                }
            }
        }
    }

  最后咱们一块儿看一下消息的发布和订阅,首先消息的发布和订阅与点点的消息收发基本一致,不一样点在于会话线程的建立:

destination = session.createTopic(queryName);//建立消息发布线程

  其余部分一致,没有什么难度,这里再也不阐述,你们能够自行测试。固然我也为你们提供了测试参考代码:https://github.com/hpugs/ActiveMQ

  到这里关于ActiveMQ入门部分就和你们探讨完毕,若有错误还有指教。谢谢

相关文章
相关标签/搜索