ActiveMQ消息队列的使用和应用

1、什么是ActiveMQ

AciveMQ是Apache出品的目前最流行,能力强劲的开源消息总线java

消息列队有两种消息模式,一种是点对点的消息模式,还有一种就是订阅的模式.apache

主要功能:浏览器

  1. 解决服务器之间的耦合性
  2. 使用消息队列,增长系统并发处理量

主要应用场景:缓存

  1. 当系统使用短信平台、邮件平台的时候
  2. 当系统使用搜索平台、缓存平台的时候你

2、使用外置ActiveMQ流程:

1.官网地址:http://activemq.apache.org/服务器

2.安装包下载完成后解压后 就是这个样子了 (注意必定要解压到全英文路径下的包内)session

3.打开你的bin目录,打开你系统对应位数的目录并发

4.双击启动activemq.batmaven

5.点完以后会自动弹出来doc启动,稍等一下 若是你最后跟个人同样 说明启动成功了tcp

6.启动成功,在浏览器中访问http://localhost:8161/ 就能访问到activemq的页面ide

7.登陆成功后(用户名和密码都是admin),会有两个队列:topicqueue (后台建立队列,这边会实时显示)

3、两种消息模式

1.点对点的实现代码

项目使用MAVEN来构建(pom.xml)

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>
  <groupId>com.itsiji</groupId>
  <artifactId>activemq</artifactId>
  <version>0.0.1-SNAPSHOT</version>
  <!--activemq-->
  <dependencies>
  	<dependency>
		<groupId>org.apache.activemq</groupId>
		<artifactId>activemq-client</artifactId>
		<version>5.13.4</version>
	 </dependency>
  </dependencies>
  
  <build>
	<plugins>			
		<!-- java编译插件 -->
		<plugin>
			<groupId>org.apache.maven.plugins</groupId>
			<artifactId>maven-compiler-plugin</artifactId>
			<version>3.2</version>
			<configuration>
				<source>1.7</source>
				<target>1.7</target>
				<encoding>UTF-8</encoding>
			</configuration>
	     </plugin>
	</plugins>
  </build>
</project>

1.1点对点的发送方

须要注意的:链接地址的ip和端口号是 127.0.0.1:61616

                      在页面管理控制台查看消息队列的ip和端口号是 127.0.0.1:8161

package com.itsiji.test.queue;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.DeliveryMode;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;

import org.apache.activemq.ActiveMQConnectionFactory;

public class TestQueueSend {
	

	
	    //链接帐号
	    private String userName = "";
	    //链接密码
	    private String password = "";
	    //链接地址
	    private String brokerURL = "tcp://127.0.0.1:61616";
	    //connection的工厂
	    private ConnectionFactory factory;
	    //链接对象
	    private Connection connection;
	    //一个操做会话
	    private Session session;
	    //目的地,其实就是链接到哪一个队列,若是是点对点,那么它的实现是Queue,若是是订阅模式,那它的实现是Topic
	    private Destination destination;
	    //生产者,就是产生数据的对象
	    private MessageProducer producer;
	    
	    public static void main(String[] args) {
	         TestQueueSend send = new TestQueueSend();
	        send.start();
	    }
	    
	    public void start(){
	        try {
	            //根据用户名,密码,url建立一个链接工厂
	            factory = new ActiveMQConnectionFactory(userName, password, brokerURL);
	            //从工厂中获取一个链接
	            connection = factory.createConnection();
	            //测试过这个步骤不写也是能够的,可是网上的各个文档都写了
	            connection.start();
	            //建立一个session
	            //第一个参数:是否支持事务,若是为true,则会忽略第二个参数,被jms服务器设置为SESSION_TRANSACTED
	            //第二个参数为false时,paramB的值可为Session.AUTO_ACKNOWLEDGE,Session.CLIENT_ACKNOWLEDGE,DUPS_OK_ACKNOWLEDGE其中一个。
	            //Session.AUTO_ACKNOWLEDGE为自动确认,客户端发送和接收消息不须要作额外的工做。哪怕是接收端发生异常,也会被看成正常发送成功。
	            //Session.CLIENT_ACKNOWLEDGE为客户端确认。客户端接收到消息后,必须调用javax.jms.Message的acknowledge方法。jms服务器才会看成发送成功,并删除消息。
	            //DUPS_OK_ACKNOWLEDGE容许副本的确认模式。一旦接收方应用程序的方法调用从处理消息处返回,会话对象就会确认消息的接收;并且容许重复确认。
	            session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
	            //建立一个到达的目的地,其实想一下就知道了,activemq不可能同时只能跑一个队列吧,这里就是链接了一个名为"text-msg"的队列,这个会话将会到这个队列,固然,若是这个队列不存在,将会被建立
	            destination = session.createQueue("text-msg");
	            //从session中,获取一个消息生产者
	            producer = session.createProducer(destination);
	            //设置生产者的模式,有两种可选
	            //DeliveryMode.PERSISTENT 当activemq关闭的时候,队列数据将会被保存
	            //DeliveryMode.NON_PERSISTENT 当activemq关闭的时候,队列里面的数据将会被清空
	            producer.setDeliveryMode(DeliveryMode.PERSISTENT);
	            
	            //建立一条消息,固然,消息的类型有不少,如文字,字节,对象等,能够经过session.create..方法来建立出来
	            TextMessage textMsg = session.createTextMessage("呵呵");
	            for(int i = 0 ; i < 100 ; i ++){
	                //发送一条消息
	                producer.send(textMsg);
	            }
	            
	            System.out.println("发送消息成功");
	            //即使生产者的对象关闭了,程序还在运行哦
	            producer.close();
	            
	        } catch (JMSException e) {
	            e.printStackTrace();
	        }
	    }
	}

1.2点对点的接收方

package com.itsiji.test.queue;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.Session;
import javax.jms.TextMessage;

import org.apache.activemq.ActiveMQConnectionFactory;

public class TestQueueReceive {
	

	
	    //链接帐号
	    private String userName = "";
	    //链接密码
	    private String password = "";
	    //链接地址
	    private String brokerURL = "tcp://127.0.0.1:61616";
	    //connection的工厂
	    private ConnectionFactory factory;
	    //链接对象
	    private Connection connection;
	    //一个操做会话
	    private Session session;
	    //目的地,其实就是链接到哪一个队列,若是是点对点,那么它的实现是Queue,若是是订阅模式,那它的实现是Topic
	    private Destination destination;
	    //消费者,就是接收数据的对象
	    private MessageConsumer consumer;
	    public static void main(String[] args) {
	        TestQueueReceive receive = new TestQueueReceive();
	        receive.start();
	    }
	    
	    public void start(){
	        try {
	            //根据用户名,密码,url建立一个链接工厂
	            factory = new ActiveMQConnectionFactory(userName, password, brokerURL);
	            //从工厂中获取一个链接
	            connection = factory.createConnection();
	            //测试过这个步骤不写也是能够的,可是网上的各个文档都写了
	            connection.start();
	            //建立一个session
	            //第一个参数:是否支持事务,若是为true,则会忽略第二个参数,被jms服务器设置为SESSION_TRANSACTED
	            //第二个参数为false时,paramB的值可为Session.AUTO_ACKNOWLEDGE,Session.CLIENT_ACKNOWLEDGE,DUPS_OK_ACKNOWLEDGE其中一个。
	            //Session.AUTO_ACKNOWLEDGE为自动确认,客户端发送和接收消息不须要作额外的工做。哪怕是接收端发生异常,也会被看成正常发送成功。
	            //Session.CLIENT_ACKNOWLEDGE为客户端确认。客户端接收到消息后,必须调用javax.jms.Message的acknowledge方法。jms服务器才会看成发送成功,并删除消息。
	            //DUPS_OK_ACKNOWLEDGE容许副本的确认模式。一旦接收方应用程序的方法调用从处理消息处返回,会话对象就会确认消息的接收;并且容许重复确认。
	            session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
	            //建立一个到达的目的地,其实想一下就知道了,activemq不可能同时只能跑一个队列吧,这里就是链接了一个名为"text-msg"的队列,这个会话将会到这个队列,固然,若是这个队列不存在,将会被建立
	            destination = session.createQueue("text-msg");
	            //根据session,建立一个接收者对象
	            consumer = session.createConsumer(destination);
	            
	            
	            //实现一个消息的监听器
	            //实现这个监听器后,之后只要有消息,就会经过这个监听器接收到
	            consumer.setMessageListener(new MessageListener() {
	                @Override
	                public void onMessage(Message message) {
	                    try {
	                        //获取到接收的数据
	                        String text = ((TextMessage)message).getText();
	                        System.out.println(text);
	                    } catch (JMSException e) {
	                        e.printStackTrace();
	                    }
	                }
	            });
	            //关闭接收端,也不会终止程序哦
//	            consumer.close();
	        } catch (JMSException e) {
	            e.printStackTrace();
	        }
	    }
	}

2.订阅/发布模式的实现代码

2.1订阅模式的发布方

package com.itsiji.test.topic;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.DeliveryMode;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;

import org.apache.activemq.ActiveMQConnectionFactory;

public class TestTopicSend {
	
//链接帐号
private String userName = "";
//链接密码
private String password = "";
//链接地址
private String brokerURL = "tcp://127.0.0.1:61616";
//connection的工厂
private ConnectionFactory factory;
//链接对象
private Connection connection;
//一个操做会话
private Session session;
//目的地,其实就是链接到哪一个队列,若是是点对点,那么它的实现是Queue,若是是订阅模式,那它的实现是Topic
private Destination destination;
//生产者,就是产生数据的对象
private MessageProducer producer;

public static void main(String[] args) {
    TestTopicSend send = new TestTopicSend();
    send.start();
}

public void start(){
    try {
        //根据用户名,密码,url建立一个链接工厂
        factory = new ActiveMQConnectionFactory(userName, password, brokerURL);
        //从工厂中获取一个链接
        connection = factory.createConnection();
        //测试过这个步骤不写也是能够的,可是网上的各个文档都写了
        connection.start();
        //建立一个session
        //第一个参数:是否支持事务,若是为true,则会忽略第二个参数,被jms服务器设置为SESSION_TRANSACTED
        //第二个参数为false时,paramB的值可为Session.AUTO_ACKNOWLEDGE,Session.CLIENT_ACKNOWLEDGE,DUPS_OK_ACKNOWLEDGE其中一个。
        //Session.AUTO_ACKNOWLEDGE为自动确认,客户端发送和接收消息不须要作额外的工做。哪怕是接收端发生异常,也会被看成正常发送成功。
        //Session.CLIENT_ACKNOWLEDGE为客户端确认。客户端接收到消息后,必须调用javax.jms.Message的acknowledge方法。jms服务器才会看成发送成功,并删除消息。
        //DUPS_OK_ACKNOWLEDGE容许副本的确认模式。一旦接收方应用程序的方法调用从处理消息处返回,会话对象就会确认消息的接收;并且容许重复确认。
        session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        //建立一个到达的目的地,其实想一下就知道了,activemq不可能同时只能跑一个队列吧,这里就是链接了一个名为"text-msg"的队列,这个会话将会到这个队列,固然,若是这个队列不存在,将会被建立
        
        
        
        //=======================================================
        //点对点与订阅模式惟一不一样的地方,就是这一行代码,点对点建立的是Queue,而订阅模式建立的是Topic
        destination = session.createTopic("topic-text");
        //=======================================================
        
        
        
        
        //从session中,获取一个消息生产者
        producer = session.createProducer(destination);
        //设置生产者的模式,有两种可选
        //DeliveryMode.PERSISTENT 当activemq关闭的时候,队列数据将会被保存
        //DeliveryMode.NON_PERSISTENT 当activemq关闭的时候,队列里面的数据将会被清空
        producer.setDeliveryMode(DeliveryMode.PERSISTENT);
        
        //建立一条消息,固然,消息的类型有不少,如文字,字节,对象等,能够经过session.create..方法来建立出来
        TextMessage textMsg = session.createTextMessage("哈哈");
        long s = System.currentTimeMillis();
        for(int i = 0 ; i < 100 ; i ++){
            //发送一条消息
            producer.send(textMsg);
        }
        long e = System.currentTimeMillis();
        System.out.println("发送消息成功");
        System.out.println(e - s);
        //即使生产者的对象关闭了,程序还在运行哦
                producer.close();
                
            } catch (JMSException e) {
                e.printStackTrace();
            }
        }
}

2.2订阅模式的接收方

package com.itsiji.test.topic;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.DeliveryMode;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;

import org.apache.activemq.ActiveMQConnectionFactory;
public class TestTopicReceive {

    //链接帐号
    private String userName = "";
    //链接密码
    private String password = "";
    //链接地址
    private String brokerURL = "tcp://127.0.0.1:61616";
    //connection的工厂
    private ConnectionFactory factory;
    //链接对象
    private Connection connection;
    //一个操做会话
    private Session session;
    //目的地,其实就是链接到哪一个队列,若是是点对点,那么它的实现是Queue,若是是订阅模式,那它的实现是Topic
    private Destination destination;
    //生产者,就是产生数据的对象
    private MessageProducer producer;
    
    public static void main(String[] args) {
        TestTopicReceive send = new TestTopicReceive();
        send.start();
    }
    
    public void start(){
        try {
            //根据用户名,密码,url建立一个链接工厂
            factory = new ActiveMQConnectionFactory(userName, password, brokerURL);
            //从工厂中获取一个链接
            connection = factory.createConnection();
            //测试过这个步骤不写也是能够的,可是网上的各个文档都写了
            connection.start();
            //建立一个session
            //第一个参数:是否支持事务,若是为true,则会忽略第二个参数,被jms服务器设置为SESSION_TRANSACTED
            //第二个参数为false时,paramB的值可为Session.AUTO_ACKNOWLEDGE,Session.CLIENT_ACKNOWLEDGE,DUPS_OK_ACKNOWLEDGE其中一个。
            //Session.AUTO_ACKNOWLEDGE为自动确认,客户端发送和接收消息不须要作额外的工做。哪怕是接收端发生异常,也会被看成正常发送成功。
            //Session.CLIENT_ACKNOWLEDGE为客户端确认。客户端接收到消息后,必须调用javax.jms.Message的acknowledge方法。jms服务器才会看成发送成功,并删除消息。
            //DUPS_OK_ACKNOWLEDGE容许副本的确认模式。一旦接收方应用程序的方法调用从处理消息处返回,会话对象就会确认消息的接收;并且容许重复确认。
            session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            //建立一个到达的目的地,其实想一下就知道了,activemq不可能同时只能跑一个队列吧,这里就是链接了一个名为"text-msg"的队列,这个会话将会到这个队列,固然,若是这个队列不存在,将会被建立
            
            
            
            //=======================================================
            //点对点与订阅模式惟一不一样的地方,就是这一行代码,点对点建立的是Queue,而订阅模式建立的是Topic
            destination = session.createTopic("topic-text");
            //=======================================================
            
            
            
            
            //从session中,获取一个消息生产者
            producer = session.createProducer(destination);
            //设置生产者的模式,有两种可选
            //DeliveryMode.PERSISTENT 当activemq关闭的时候,队列数据将会被保存
            //DeliveryMode.NON_PERSISTENT 当activemq关闭的时候,队列里面的数据将会被清空
            producer.setDeliveryMode(DeliveryMode.PERSISTENT);
            
            //建立一条消息,固然,消息的类型有不少,如文字,字节,对象等,能够经过session.create..方法来建立出来
            TextMessage textMsg = session.createTextMessage("哈哈");
            long s = System.currentTimeMillis();
            for(int i = 0 ; i < 100 ; i ++){
                //发送一条消息
                textMsg.setText("哈哈" + i);
                producer.send(textMsg);
            }
            long e = System.currentTimeMillis();
            System.out.println("发送消息成功");
            System.out.println(e - s);
            //即使生产者的对象关闭了,程序还在运行哦
            producer.close();
            
        } catch (JMSException e) {
            e.printStackTrace();
        }
    }
	
}

3.发送消息的数据类型

javax.jms.Message 这个接口,只要是这个接口的数据,均可以被发送

//纯字符串的数据
session.createTextMessage();
//序列化的对象
session.createObjectMessage();
//流,能够用来传递文件等
session.createStreamMessage();
 //用来传递字节
session.createBytesMessage();
//这个方法建立出来的就是一个map,能够把它看成map来用,当你看了它的一些方法,你就懂了
session.createMapMessage();
//这个方法,拿到的是javax.jms.Message,是全部message的接口
session.createMessage();