【ActiveMQ】使用学习
转载:web
一、启动
activemq start
二、停止
activemq stop
http://localhost:8161
admin / admin
Queue - Point-to-Point (点对点)
一条消息只能被一个消费者消费,且是持久化消息 - 当没有可用的消费者时,该消息保存直到被消费为止;当消息被消费者收到但不响应时(具体等待响应时间是多久,如何设置,暂时还没去了解),该消息会一直保留,或者在有多个消费者的情况下转给另外一个消费者。当一个Queue有多个可用消费者时,可以在这些消费者中起到负载均衡的作用。
Topic - Publisher/Subscriber Model (发布/订阅者)
一条消息发布时,所有的订阅者都会收到。Topic有2种模式:Nondurable subscription(非持久订阅)和durable subscription(持久化订阅 - 每一个持久订阅者,都相当于一个持久化的queue的客户端),默认是非持久订阅。
持久化:消息产生后,会保存到文件/DB中,直到消息被消费, 如上述Queue的持久化消息。默认保存在ActiveMQ中:%ActiveMQ_Home%/data/kahadb
非持久化:消息不会保存,若当下没有可用的消费者时,消息丢失。
Spring Boot 中使用
配置 JmsTemplate 和 DefaultJmsListenerContainerFactory
package ycx.activemq.config; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.jms.annotation.EnableJms; import org.springframework.jms.config.DefaultJmsListenerContainerFactory; import org.springframework.jms.core.JmsTemplate; import javax.jms.ConnectionFactory; @Configuration @EnableJms public class ActiveMQConfig { public static final String MY_QUEUE = "ycx.queue"; public static final String MY_TOPIC = "ycx.topic"; @Bean("queueJmsTemplate") public JmsTemplate queueJmsTemplate(ConnectionFactory connectionFactory) { JmsTemplate jmsTemplate = new JmsTemplate(connectionFactory); jmsTemplate.setDefaultDestinationName(MY_QUEUE); return jmsTemplate; } @Bean("queueContainerFactory") public DefaultJmsListenerContainerFactory queueContainerFactory(ConnectionFactory connectionFactory) { DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory(); factory.setConnectionFactory(connectionFactory); factory.setSessionTransacted(true); factory.setConcurrency("1"); factory.setRecoveryInterval(1000L); return factory; } @Bean("topicJmsTemplate") public JmsTemplate topicJmsTemplate(ConnectionFactory connectionFactory) { JmsTemplate jmsTemplate = new JmsTemplate(connectionFactory); jmsTemplate.setDefaultDestinationName(MY_QUEUE); jmsTemplate.setPubSubDomain(true); return jmsTemplate; } @Bean("topicContainerFactory") public DefaultJmsListenerContainerFactory topicContainerFactory(ConnectionFactory connectionFactory) { DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory(); factory.setConnectionFactory(connectionFactory); factory.setSessionTransacted(true); factory.setConcurrency("1"); factory.setRecoveryInterval(1000L); factory.setPubSubDomain(true); return factory; } }
连接工厂使用自动注入进来的,如果不想使用默认的,可以在配置文件中自行配置
# Connection settings for the ActiveMQ broker.
spring:
  activemq:
    broker-url: tcp://localhost:61616
    user: admin
    password: admin
或者在 java中指定
/**
 * Declares the connection factory explicitly instead of relying on the injected default.
 *
 * @return an ActiveMQ connection factory built from MY_USERNAME, MY_PASSWORD and MY_BROKER_URL
 */
@Bean
public ConnectionFactory connectionFactory() {
    ActiveMQConnectionFactory activeMqFactory =
            new ActiveMQConnectionFactory(MY_USERNAME, MY_PASSWORD, MY_BROKER_URL);
    return activeMqFactory;
}
定义监听器
Queue监听器A
package ycx.activemq.listener;

import org.springframework.jms.annotation.JmsListener;
import org.springframework.stereotype.Component;
import ycx.activemq.config.ActiveMQConfig;

/**
 * Listener "A" on the {@link ActiveMQConfig#MY_QUEUE} destination; prints each
 * message it receives to standard output.
 */
@Component
public class QueueMessageListenerA {

    /** Text printed in front of every received message. */
    private static final String PREFIX = "queue A >>> ";

    /**
     * Prints the received message, prefixed with this listener's tag.
     *
     * @param msg the message payload
     */
    @JmsListener(destination = ActiveMQConfig.MY_QUEUE, containerFactory = "queueContainerFactory")
    public void handleMessage(String msg) {
        String line = PREFIX + msg;
        System.out.println(line);
    }
}
Queue监听器B
package ycx.activemq.listener;

import org.springframework.jms.annotation.JmsListener;
import org.springframework.stereotype.Component;
import ycx.activemq.config.ActiveMQConfig;

/**
 * Listener "B" on the {@link ActiveMQConfig#MY_QUEUE} destination; prints each
 * message it receives to standard output.
 */
@Component
public class QueueMessageListenerB {

    /** Text printed in front of every received message. */
    private static final String PREFIX = "queue B >>> ";

    /**
     * Prints the received message, prefixed with this listener's tag.
     *
     * @param msg the message payload
     */
    @JmsListener(destination = ActiveMQConfig.MY_QUEUE, containerFactory = "queueContainerFactory")
    public void handleMessage(String msg) {
        String line = PREFIX + msg;
        System.out.println(line);
    }
}
Topic监听器A
package ycx.activemq.listener;

import org.springframework.jms.annotation.JmsListener;
import org.springframework.stereotype.Component;
import ycx.activemq.config.ActiveMQConfig;

/**
 * Listener "A" on the {@link ActiveMQConfig#MY_TOPIC} destination; prints each
 * message it receives to standard output.
 */
@Component
public class TopicMessageListenerA {

    /** Text printed in front of every received message. */
    private static final String PREFIX = "topic A >>> ";

    /**
     * Prints the received message, prefixed with this listener's tag.
     *
     * @param msg the message payload
     */
    @JmsListener(destination = ActiveMQConfig.MY_TOPIC, containerFactory = "topicContainerFactory")
    public void handleMessage(String msg) {
        String line = PREFIX + msg;
        System.out.println(line);
    }
}
Topic监听器B
package ycx.activemq.listener;

import org.springframework.jms.annotation.JmsListener;
import org.springframework.stereotype.Component;
import ycx.activemq.config.ActiveMQConfig;

/**
 * Listener "B" on the {@link ActiveMQConfig#MY_TOPIC} destination; prints each
 * message it receives to standard output.
 */
@Component
public class TopicMessageListenerB {

    /** Text printed in front of every received message. */
    private static final String PREFIX = "topic B >>> ";

    /**
     * Prints the received message, prefixed with this listener's tag.
     *
     * @param msg the message payload
     */
    @JmsListener(destination = ActiveMQConfig.MY_TOPIC, containerFactory = "topicContainerFactory")
    public void handleMessage(String msg) {
        String line = PREFIX + msg;
        System.out.println(line);
    }
}
Topic监听器C
package ycx.activemq.listener;

import org.springframework.jms.annotation.JmsListener;
import org.springframework.stereotype.Component;
import ycx.activemq.config.ActiveMQConfig;

/**
 * Listener "C" on the {@link ActiveMQConfig#MY_TOPIC} destination; prints each
 * message it receives to standard output.
 */
@Component
public class TopicMessageListenerC {

    /** Text printed in front of every received message. */
    private static final String PREFIX = "topic C >>> ";

    /**
     * Prints the received message, prefixed with this listener's tag.
     *
     * @param msg the message payload
     */
    @JmsListener(destination = ActiveMQConfig.MY_TOPIC, containerFactory = "topicContainerFactory")
    public void handleMessage(String msg) {
        String line = PREFIX + msg;
        System.out.println(line);
    }
}
测试
package ycx.activemq;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import ycx.activemq.config.ActiveMQConfig;

import java.time.LocalTime;
import java.util.HashMap;
import java.util.Map;

/**
 * Application entry point that also exposes two HTTP endpoints, {@code /queue}
 * and {@code /topic}, each of which sends a timestamped text message to the
 * corresponding destination and echoes the sent text back to the caller.
 */
@SpringBootApplication
@RestController
public class ActivemqServerApplication {

    public static void main(String[] args) {
        SpringApplication.run(ActivemqServerApplication.class, args);
    }

    /** Template used by the {@code /queue} endpoint. */
    @Autowired
    @Qualifier("queueJmsTemplate")
    JmsTemplate queueJmsTemplate;

    /** Template used by the {@code /topic} endpoint. */
    @Autowired
    @Qualifier("topicJmsTemplate")
    JmsTemplate topicJmsTemplate;

    /**
     * Sends a timestamped message to {@link ActiveMQConfig#MY_QUEUE}.
     *
     * @return a map with the single key {@code content} holding the sent text
     */
    @RequestMapping("/queue")
    public Object queue() {
        return sendAndDescribe(queueJmsTemplate, ActiveMQConfig.MY_QUEUE, "queue send: ");
    }

    /**
     * Sends a timestamped message to {@link ActiveMQConfig#MY_TOPIC}.
     *
     * @return a map with the single key {@code content} holding the sent text
     */
    @RequestMapping("/topic")
    public Object topic() {
        return sendAndDescribe(topicJmsTemplate, ActiveMQConfig.MY_TOPIC, "topic send : ");
    }

    /** Sends {@code prefix} plus the current local time to the destination and wraps the text in a response map. */
    private static Map<String, String> sendAndDescribe(JmsTemplate template, String destination, String prefix) {
        String payload = prefix + LocalTime.now();
        template.convertAndSend(destination, payload);
        Map<String, String> response = new HashMap<>();
        response.put("content", payload);
        return response;
    }
}
访问地址: http://localhost:8080/topic
订阅 收到消息,所有的监听器都收到消息
topic B >>> topic send : 12:22:05.024
topic C >>> topic send : 12:22:05.024
topic A >>> topic send : 12:22:05.024
访问地址:http://localhost:8080/queue
队列 收到消息,只有一个监听器收到消息
queue B >>> queue send: 12:22:58.491