ActiveMQ 是Apache出品,最流行的,能力强劲的开源消息总线。由多种语言和协议编写客户端。语言: Java, C, C++, C#, Ruby, Perl, Python, PHP。应用协议: OpenWire,Stomp REST,WS Notification,XMPP,AMQP。
ActiveMQ也对Spring支持,ActiveMQ可以很容易内嵌到使用Spring的系统里面去,而且也支持Spring2.0的特性。
一种是点对点的(Queue),即一个生产者和一个消费者一一对应; 另一种是发布/订阅模式(Topic),即一个生产者产生消息并进行发送后,可以由多个消费者进行接收。
ActiveMQ定义了五种不同的消息正文格式,以及调用的消息类型,允许你发送并接收以一些不同形式的数据,提供现有消息格式的一些级别的兼容性。
• StreamMessage – Java原始值的数据流
• MapMessage–一套名称-值对
• TextMessage–一个字符串对象
• ObjectMessage–一个序列化的 Java对象
• BytesMessage–一个字节的数据流
最常用的就是TextMessage
1.进入http://activemq.apache.org/下载ActiveMQ。
2. 安装环境:需要jdk,这里还用centOS系统进行安装。
3. 将ActiveMQ 的压缩包上传到Linux系统,解压缩。使用bin目录下的activemq命令启动:
[[email protected] bin]# ./activemq start 关闭: [[email protected] bin]# ./activemq stop 查看状态: [[email protected] bin]# ./activemq status
注意:如果ActiveMQ整合spring使用不要使用activemq-all-5.12.0.jar包。建议使用5.11.2
4. 启动之后,就可以进行访问ActiveMQ的管理后台
http://192.168.25.129:8161/
8186是ActiveMQ默认的启动时占用的端口号
第一次进入需要输入用户名和密码,都是admin,这也是ActiveMQ自己设置的。
Producer表示消息的生产者,用于发送消息,它的使用如下:
先添加activeMq的jar包,使用5.11.2版本的jar包
<dependency> <groupId>org.apache.activemq</groupId> <artifactId>activemq-all</artifactId> <version>5.11.2</version> </dependency>
/** * 点到点形式发送消息 * @throws Exception */ @Test public void testQueueProducer() throws Exception{ //1.创建一个链接工厂对象,需要指定activemq的服务id和端口 //tcp://192.168.25.129 这种ip形式是activeMq规定的,61616是默认端口号 ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://192.168.25.129:61616"); //2.使用工厂对象,创建一个Connection对象 Connection connection = connectionFactory.createConnection(); //3.开启连接,就是调用Connection对象的start方法 connection.start(); //4.创建一个Session对象,需要指定两个参数 //参数一:是否开启事务,如果true开始事务,则第二个参数就没有意义,一般不开启 //参数二:应答模式。自动应答或者手动应答,一般自动应答 Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); //5.使用Session对象创建Destination对象,它有两个子接口,Queue和Topic,这里使用Queue // 参数:队列的名称。 Queue queue = session.createQueue("test-queue"); //6.使用Session对象创建一个Producer对象(生产者),这里使用它的子接口MessageProducer MessageProducer messageProducer = session.createProducer(queue); //7.创建一个Message对象,这是消息内容,这里使用TextMessage,这也是最常用的 /* 方式一: * TextMessage message = new ActiveMQTextMessage(); message.setText("hello ActiveMq");*/ //方式二:使用session直接创建 TextMessage message = session.createTextMessage("hello ActiveMq"); //8.发送消息 messageProducer.send(message); //9.关闭资源,要关闭所有的资源 messageProducer.close(); session.close(); connection.close(); }
执行之后
因为此时还没有消费者启动,所以消息一直持久化在服务中。
Consumer表示消息的消费者,接收消息
/** * 点对点形式接受消息 * @throws Exception */ @Test public void testQueueConsumer() throws Exception{ //1.创建一个链接工厂对象,需要指定activemq的服务id和端口 //tcp://192.168.25.129 这种ip形式是activeMq规定的,61616是默认端口号 ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://192.168.25.129:61616"); //2.使用工厂对象,创建一个Connection对象 Connection connection = connectionFactory.createConnection(); //3.开启连接,就是调用Connection对象的start方法 connection.start(); //4.创建一个Session对象,需要指定两个参数 //参数一:是否开启事务,如果true开始事务,则第二个参数就没有意义,一般不开启 //参数二:应答模式。自动应答或者手动应答,一般自动应答 Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); //5.使用Session对象创建Destination对象,它有两个子接口,Queue和Topic,这里使用Queue // 参数:队列的名称。(注意:这里的队列的名称,要与消息生产者指定的名称一样,才能接收到指定队列的消息) Queue queue = session.createQueue("test-queue"); //6.使用session创建一个消费者对象 MessageConsumer consumer = session.createConsumer(queue); //7.接收消息,这里是让消费者对象consumer监听一个MessageListener事件,当有消息从生产者发送出来时,这里就马上接收到消息 consumer.setMessageListener(new MessageListener() { @Override public void onMessage(Message message) { //接受内容,打印 TextMessage textMessage = (TextMessage) message; try { String text = textMessage.getText(); System.out.println(text); } catch (JMSException e) { e.printStackTrace(); } } }); //等待接受消息,因为不知道消息什么时候发送,所以这里为了测试,不直接关闭,在这里一直等待消息 System.in.read(); //表示用户敲击在控制台输入任意键,才继续往下执行 //9.关闭资源,要关闭所有的资源 consumer.close(); session.close(); connection.close(); }
执行之后,可以在控制台看到输出 hello ActiveMq 字符串,此时如果不在控制台操作,则消费者会一直启动者,当再去运行消息的发送方法时,消息就会马上打印在控制台,因为消费者一直在监听消息的产生。
执行消息接收之后,ActivaMq的管理后台为就能看到,队列中的消息数变成了0,所以使用Queue形式,服务会默认保存没有被消费的消息,直到有消费方接收消息。
Producer表示消息的生产者
/** * 发布&订阅形式(Topic)发送消息 * @throws Exception */ @Test public void testTopicProducer() throws Exception{ //1.创建一个链接工厂对象,需要指定activemq的服务id和端口 //tcp://192.168.25.129 这种ip形式是activeMq规定的,61616是默认端口号 ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://192.168.25.129:61616"); //2.使用工厂对象,创建一个Connection对象 Connection connection = connectionFactory.createConnection(); //3.开启连接,就是调用Connection对象的start方法 connection.start(); //4.创建一个Session对象,需要指定两个参数 //参数一:是否开启事务,如果true开始事务,则第二个参数就没有意义,一般不开启 //参数二:应答模式。自动应答或者手动应答,一般自动应答 Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); //5.使用Session对象创建Destination对象,它有两个子接口,Queue和Topic,这里使用Topic // 参数:话题的名称 Topic topic = session.createTopic("test-topic1"); //6.使用Session对象创建一个Producer对象(生产者),这里使用它的子接口MessageProducer MessageProducer messageProducer = session.createProducer(topic); //7.创建一个Message对象,这是消息内容,这里使用TextMessage,这也是最常用的 /* 方式一: * TextMessage message = new ActiveMQTextMessage(); message.setText("hello ActiveMq");*/ //方式二:使用session直接创建 TextMessage message = session.createTextMessage("hello ActiveMq"); //8.发送消息 messageProducer.send(message); //9.关闭资源,要关闭所有的资源 messageProducer.close(); session.close(); connection.close(); }
执行发布消息之后
从这个可以看到,使用Topic形式时,消息发送出来,如果没有消费者接收,则消息不会保存在服务中。
Consumer表示消息的消费者,接收消息
/** * 发布&订阅形式(Topic)接受消息 * @throws Exception */ @Test public void testTopicConsumer() throws Exception{ //1.创建一个链接工厂对象,需要指定activemq的服务id和端口 //tcp://192.168.25.129 这种ip形式是activeMq规定的,61616是默认端口号 ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://192.168.25.129:61616"); //2.使用工厂对象,创建一个Connection对象 Connection connection = connectionFactory.createConnection(); //3.开启连接,就是调用Connection对象的start方法 connection.start(); //4.创建一个Session对象,需要指定两个参数 //参数一:是否开启事务,如果true开始事务,则第二个参数就没有意义,一般不开启 //参数二:应答模式。自动应答或者手动应答,一般自动应答 Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); //5.使用Session对象创建Destination对象,它有两个子接口,Queue和Topic,这里使用Queue // 参数:话题的名称。(注意:这里的话题的名称,要与消息生产者指定的名称一样,才能接收到指定队列的消息) Topic topic = session.createTopic("test-topic1"); //6.使用session创建一个消费者对象 MessageConsumer consumer = session.createConsumer(topic); //7.接收消息,这里是让消费者对象consumer监听一个MessageListener事件,当有消息从生产者发送出来时,这里就马上接收到消息 consumer.setMessageListener(new MessageListener() { @Override public void onMessage(Message message) { //接受内容,打印 TextMessage textMessage = (TextMessage) message; try { String text = textMessage.getText(); System.out.println(text); } catch (JMSException e) { e.printStackTrace(); } } }); //这里显示当前启动的消费者的index,为了测试,这里启动三个消费者,index分别是1,2,3 System.out.println("topic消费者1启动..."); //等待接受消息,因为不知道消息什么时候发送,所以这里为了测试,不直接关闭,在这里一直等待消息 System.in.read(); //表示用户敲击在控制台输入任意键,才继续往下执行 //9.关闭资源,要关闭所有的资源 consumer.close(); session.close(); connection.close(); }
当启动三个消息消费者之后,再次执行发送消息的方法,结果如下
可以看到三个消费者都接受到了消息,而ActivaMq的管理后台中,也能看到test-topic1这个话题,有了三个消费者,同时,也有了三个消息的出队