AciveMQ是Apache出品的目前最流行,能力强劲的开源消息总线java
消息列队有两种消息模式,一种是点对点的消息模式,还有一种就是订阅的模式.apache
主要功能:浏览器
主要应用场景:缓存
1.官网地址:http://activemq.apache.org/服务器
2.安装包下载完成后解压后 就是这个样子了 (注意必定要解压到全英文路径下的包内)session
4.双击启动activemq.batmaven
5.点完以后会自动弹出来doc启动,稍等一下 若是你最后跟个人同样 说明启动成功了tcp
6.启动成功,在浏览器中访问http://localhost:8161/ 就能访问到activemq的页面ide
7.登陆成功后(用户名和密码都是admin),会有两个队列:topic和queue (后台建立队列,这边会实时显示)
项目使用MAVEN来构建(pom.xml)
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <groupId>com.itsiji</groupId> <artifactId>activemq</artifactId> <version>0.0.1-SNAPSHOT</version> <!--activemq--> <dependencies> <dependency> <groupId>org.apache.activemq</groupId> <artifactId>activemq-client</artifactId> <version>5.13.4</version> </dependency> </dependencies> <build> <plugins> <!-- java编译插件 --> <plugin> <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-compiler-plugin</artifactId> <version>3.2</version> <configuration> <source>1.7</source> <target>1.7</target> <encoding>UTF-8</encoding> </configuration> </plugin> </plugins> </build> </project>
须要注意的:链接地址的ip和端口号是 127.0.0.1:61616
在页面管理控制台查看消息队列的ip和端口号是 127.0.0.1:8161
package com.itsiji.test.queue; import javax.jms.Connection; import javax.jms.ConnectionFactory; import javax.jms.DeliveryMode; import javax.jms.Destination; import javax.jms.JMSException; import javax.jms.MessageProducer; import javax.jms.Session; import javax.jms.TextMessage; import org.apache.activemq.ActiveMQConnectionFactory; public class TestQueueSend { //链接帐号 private String userName = ""; //链接密码 private String password = ""; //链接地址 private String brokerURL = "tcp://127.0.0.1:61616"; //connection的工厂 private ConnectionFactory factory; //链接对象 private Connection connection; //一个操做会话 private Session session; //目的地,其实就是链接到哪一个队列,若是是点对点,那么它的实现是Queue,若是是订阅模式,那它的实现是Topic private Destination destination; //生产者,就是产生数据的对象 private MessageProducer producer; public static void main(String[] args) { TestQueueSend send = new TestQueueSend(); send.start(); } public void start(){ try { //根据用户名,密码,url建立一个链接工厂 factory = new ActiveMQConnectionFactory(userName, password, brokerURL); //从工厂中获取一个链接 connection = factory.createConnection(); //测试过这个步骤不写也是能够的,可是网上的各个文档都写了 connection.start(); //建立一个session //第一个参数:是否支持事务,若是为true,则会忽略第二个参数,被jms服务器设置为SESSION_TRANSACTED //第二个参数为false时,paramB的值可为Session.AUTO_ACKNOWLEDGE,Session.CLIENT_ACKNOWLEDGE,DUPS_OK_ACKNOWLEDGE其中一个。 //Session.AUTO_ACKNOWLEDGE为自动确认,客户端发送和接收消息不须要作额外的工做。哪怕是接收端发生异常,也会被看成正常发送成功。 //Session.CLIENT_ACKNOWLEDGE为客户端确认。客户端接收到消息后,必须调用javax.jms.Message的acknowledge方法。jms服务器才会看成发送成功,并删除消息。 //DUPS_OK_ACKNOWLEDGE容许副本的确认模式。一旦接收方应用程序的方法调用从处理消息处返回,会话对象就会确认消息的接收;并且容许重复确认。 session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); //建立一个到达的目的地,其实想一下就知道了,activemq不可能同时只能跑一个队列吧,这里就是链接了一个名为"text-msg"的队列,这个会话将会到这个队列,固然,若是这个队列不存在,将会被建立 destination = session.createQueue("text-msg"); //从session中,获取一个消息生产者 producer = session.createProducer(destination); //设置生产者的模式,有两种可选 //DeliveryMode.PERSISTENT 当activemq关闭的时候,队列数据将会被保存 //DeliveryMode.NON_PERSISTENT 当activemq关闭的时候,队列里面的数据将会被清空 producer.setDeliveryMode(DeliveryMode.PERSISTENT); //建立一条消息,固然,消息的类型有不少,如文字,字节,对象等,能够经过session.create..方法来建立出来 TextMessage textMsg = session.createTextMessage("呵呵"); for(int i = 0 ; i < 100 ; i ++){ //发送一条消息 producer.send(textMsg); } System.out.println("发送消息成功"); //即使生产者的对象关闭了,程序还在运行哦 producer.close(); } catch (JMSException e) { e.printStackTrace(); } } }
package com.itsiji.test.queue; import javax.jms.Connection; import javax.jms.ConnectionFactory; import javax.jms.Destination; import javax.jms.JMSException; import javax.jms.Message; import javax.jms.MessageConsumer; import javax.jms.MessageListener; import javax.jms.Session; import javax.jms.TextMessage; import org.apache.activemq.ActiveMQConnectionFactory; public class TestQueueReceive { //链接帐号 private String userName = ""; //链接密码 private String password = ""; //链接地址 private String brokerURL = "tcp://127.0.0.1:61616"; //connection的工厂 private ConnectionFactory factory; //链接对象 private Connection connection; //一个操做会话 private Session session; //目的地,其实就是链接到哪一个队列,若是是点对点,那么它的实现是Queue,若是是订阅模式,那它的实现是Topic private Destination destination; //消费者,就是接收数据的对象 private MessageConsumer consumer; public static void main(String[] args) { TestQueueReceive receive = new TestQueueReceive(); receive.start(); } public void start(){ try { //根据用户名,密码,url建立一个链接工厂 factory = new ActiveMQConnectionFactory(userName, password, brokerURL); //从工厂中获取一个链接 connection = factory.createConnection(); //测试过这个步骤不写也是能够的,可是网上的各个文档都写了 connection.start(); //建立一个session //第一个参数:是否支持事务,若是为true,则会忽略第二个参数,被jms服务器设置为SESSION_TRANSACTED //第二个参数为false时,paramB的值可为Session.AUTO_ACKNOWLEDGE,Session.CLIENT_ACKNOWLEDGE,DUPS_OK_ACKNOWLEDGE其中一个。 //Session.AUTO_ACKNOWLEDGE为自动确认,客户端发送和接收消息不须要作额外的工做。哪怕是接收端发生异常,也会被看成正常发送成功。 //Session.CLIENT_ACKNOWLEDGE为客户端确认。客户端接收到消息后,必须调用javax.jms.Message的acknowledge方法。jms服务器才会看成发送成功,并删除消息。 //DUPS_OK_ACKNOWLEDGE容许副本的确认模式。一旦接收方应用程序的方法调用从处理消息处返回,会话对象就会确认消息的接收;并且容许重复确认。 session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); //建立一个到达的目的地,其实想一下就知道了,activemq不可能同时只能跑一个队列吧,这里就是链接了一个名为"text-msg"的队列,这个会话将会到这个队列,固然,若是这个队列不存在,将会被建立 destination = session.createQueue("text-msg"); //根据session,建立一个接收者对象 consumer = session.createConsumer(destination); //实现一个消息的监听器 //实现这个监听器后,之后只要有消息,就会经过这个监听器接收到 consumer.setMessageListener(new MessageListener() { @Override public void onMessage(Message message) { try { //获取到接收的数据 String text = ((TextMessage)message).getText(); System.out.println(text); } catch (JMSException e) { e.printStackTrace(); } } }); //关闭接收端,也不会终止程序哦 // consumer.close(); } catch (JMSException e) { e.printStackTrace(); } } }
package com.itsiji.test.topic; import javax.jms.Connection; import javax.jms.ConnectionFactory; import javax.jms.DeliveryMode; import javax.jms.Destination; import javax.jms.JMSException; import javax.jms.MessageProducer; import javax.jms.Session; import javax.jms.TextMessage; import org.apache.activemq.ActiveMQConnectionFactory; public class TestTopicSend { //链接帐号 private String userName = ""; //链接密码 private String password = ""; //链接地址 private String brokerURL = "tcp://127.0.0.1:61616"; //connection的工厂 private ConnectionFactory factory; //链接对象 private Connection connection; //一个操做会话 private Session session; //目的地,其实就是链接到哪一个队列,若是是点对点,那么它的实现是Queue,若是是订阅模式,那它的实现是Topic private Destination destination; //生产者,就是产生数据的对象 private MessageProducer producer; public static void main(String[] args) { TestTopicSend send = new TestTopicSend(); send.start(); } public void start(){ try { //根据用户名,密码,url建立一个链接工厂 factory = new ActiveMQConnectionFactory(userName, password, brokerURL); //从工厂中获取一个链接 connection = factory.createConnection(); //测试过这个步骤不写也是能够的,可是网上的各个文档都写了 connection.start(); //建立一个session //第一个参数:是否支持事务,若是为true,则会忽略第二个参数,被jms服务器设置为SESSION_TRANSACTED //第二个参数为false时,paramB的值可为Session.AUTO_ACKNOWLEDGE,Session.CLIENT_ACKNOWLEDGE,DUPS_OK_ACKNOWLEDGE其中一个。 //Session.AUTO_ACKNOWLEDGE为自动确认,客户端发送和接收消息不须要作额外的工做。哪怕是接收端发生异常,也会被看成正常发送成功。 //Session.CLIENT_ACKNOWLEDGE为客户端确认。客户端接收到消息后,必须调用javax.jms.Message的acknowledge方法。jms服务器才会看成发送成功,并删除消息。 //DUPS_OK_ACKNOWLEDGE容许副本的确认模式。一旦接收方应用程序的方法调用从处理消息处返回,会话对象就会确认消息的接收;并且容许重复确认。 session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); //建立一个到达的目的地,其实想一下就知道了,activemq不可能同时只能跑一个队列吧,这里就是链接了一个名为"text-msg"的队列,这个会话将会到这个队列,固然,若是这个队列不存在,将会被建立 //======================================================= //点对点与订阅模式惟一不一样的地方,就是这一行代码,点对点建立的是Queue,而订阅模式建立的是Topic destination = session.createTopic("topic-text"); //======================================================= //从session中,获取一个消息生产者 producer = session.createProducer(destination); //设置生产者的模式,有两种可选 //DeliveryMode.PERSISTENT 当activemq关闭的时候,队列数据将会被保存 //DeliveryMode.NON_PERSISTENT 当activemq关闭的时候,队列里面的数据将会被清空 producer.setDeliveryMode(DeliveryMode.PERSISTENT); //建立一条消息,固然,消息的类型有不少,如文字,字节,对象等,能够经过session.create..方法来建立出来 TextMessage textMsg = session.createTextMessage("哈哈"); long s = System.currentTimeMillis(); for(int i = 0 ; i < 100 ; i ++){ //发送一条消息 producer.send(textMsg); } long e = System.currentTimeMillis(); System.out.println("发送消息成功"); System.out.println(e - s); //即使生产者的对象关闭了,程序还在运行哦 producer.close(); } catch (JMSException e) { e.printStackTrace(); } } }
package com.itsiji.test.topic; import javax.jms.Connection; import javax.jms.ConnectionFactory; import javax.jms.DeliveryMode; import javax.jms.Destination; import javax.jms.JMSException; import javax.jms.MessageProducer; import javax.jms.Session; import javax.jms.TextMessage; import org.apache.activemq.ActiveMQConnectionFactory; public class TestTopicReceive { //链接帐号 private String userName = ""; //链接密码 private String password = ""; //链接地址 private String brokerURL = "tcp://127.0.0.1:61616"; //connection的工厂 private ConnectionFactory factory; //链接对象 private Connection connection; //一个操做会话 private Session session; //目的地,其实就是链接到哪一个队列,若是是点对点,那么它的实现是Queue,若是是订阅模式,那它的实现是Topic private Destination destination; //生产者,就是产生数据的对象 private MessageProducer producer; public static void main(String[] args) { TestTopicReceive send = new TestTopicReceive(); send.start(); } public void start(){ try { //根据用户名,密码,url建立一个链接工厂 factory = new ActiveMQConnectionFactory(userName, password, brokerURL); //从工厂中获取一个链接 connection = factory.createConnection(); //测试过这个步骤不写也是能够的,可是网上的各个文档都写了 connection.start(); //建立一个session //第一个参数:是否支持事务,若是为true,则会忽略第二个参数,被jms服务器设置为SESSION_TRANSACTED //第二个参数为false时,paramB的值可为Session.AUTO_ACKNOWLEDGE,Session.CLIENT_ACKNOWLEDGE,DUPS_OK_ACKNOWLEDGE其中一个。 //Session.AUTO_ACKNOWLEDGE为自动确认,客户端发送和接收消息不须要作额外的工做。哪怕是接收端发生异常,也会被看成正常发送成功。 //Session.CLIENT_ACKNOWLEDGE为客户端确认。客户端接收到消息后,必须调用javax.jms.Message的acknowledge方法。jms服务器才会看成发送成功,并删除消息。 //DUPS_OK_ACKNOWLEDGE容许副本的确认模式。一旦接收方应用程序的方法调用从处理消息处返回,会话对象就会确认消息的接收;并且容许重复确认。 session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); //建立一个到达的目的地,其实想一下就知道了,activemq不可能同时只能跑一个队列吧,这里就是链接了一个名为"text-msg"的队列,这个会话将会到这个队列,固然,若是这个队列不存在,将会被建立 //======================================================= //点对点与订阅模式惟一不一样的地方,就是这一行代码,点对点建立的是Queue,而订阅模式建立的是Topic destination = session.createTopic("topic-text"); //======================================================= //从session中,获取一个消息生产者 producer = session.createProducer(destination); //设置生产者的模式,有两种可选 //DeliveryMode.PERSISTENT 当activemq关闭的时候,队列数据将会被保存 //DeliveryMode.NON_PERSISTENT 当activemq关闭的时候,队列里面的数据将会被清空 producer.setDeliveryMode(DeliveryMode.PERSISTENT); //建立一条消息,固然,消息的类型有不少,如文字,字节,对象等,能够经过session.create..方法来建立出来 TextMessage textMsg = session.createTextMessage("哈哈"); long s = System.currentTimeMillis(); for(int i = 0 ; i < 100 ; i ++){ //发送一条消息 textMsg.setText("哈哈" + i); producer.send(textMsg); } long e = System.currentTimeMillis(); System.out.println("发送消息成功"); System.out.println(e - s); //即使生产者的对象关闭了,程序还在运行哦 producer.close(); } catch (JMSException e) { e.printStackTrace(); } } }
javax.jms.Message 这个接口,只要是这个接口的数据,均可以被发送
//纯字符串的数据 session.createTextMessage(); //序列化的对象 session.createObjectMessage(); //流,能够用来传递文件等 session.createStreamMessage(); //用来传递字节 session.createBytesMessage(); //这个方法建立出来的就是一个map,能够把它看成map来用,当你看了它的一些方法,你就懂了 session.createMapMessage(); //这个方法,拿到的是javax.jms.Message,是全部message的接口 session.createMessage();