springboot中activemq消息队列同时支持queue和topic消息

上一篇文章《SpringBoot2 集成 activeMQ 消息队列》中讲到springboot2集成activemq,使用queue和topic发送消息。java

最后实现topic发布订阅消息时,遇到一个问题,调用http://127.0.0.1:8080/queue接口,消费者没有收到消息。web

是由于咱们的配置致使的,咱们设定了系统中的消息类别为topic:spring

pub-sub-domain: true

为了同时支持topic和queue,只须要在消费者customer端改造一下。
新增一个配置类,我这里加了线程池,你们测试的时候能够去掉。springboot

/** * @author linyun * @date 2018/12/3 13:51 */
@Slf4j
@Configuration
public class ActiveConfig {

    private static ThreadFactory namedThreadFactory = new ThreadFactoryBuilder()
            .setNameFormat("activemq-pool-%d").setDaemon(true).build();
    private static ExecutorService pool;

    static {
        if (pool == null) {
            pool = new ThreadPoolExecutor(6, 6,
                    0L, TimeUnit.MILLISECONDS,
                    new LinkedBlockingQueue<>(6), namedThreadFactory, new ThreadPoolExecutor.AbortPolicy());
        }
    }

    /** * 处理topic消息 * @param connectionFactory * @return */
    @Bean
    public JmsListenerContainerFactory<?> topicListenerFactory(ConnectionFactory connectionFactory) {
        DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
        factory.setPubSubDomain(true);
        factory.setConnectionFactory(connectionFactory);
        factory.setTaskExecutor(pool);
        return factory;
    }

    /** * 处理queue消息 * @param connectionFactory * @return */
    @Bean
    public JmsListenerContainerFactory<?> queueListenerFactory(ConnectionFactory connectionFactory) {
        DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
        factory.setPubSubDomain(false);
        factory.setConnectionFactory(connectionFactory);
        factory.setTaskExecutor(pool);
        return factory;
    }

}

而后改造一下Customer,为每一个监听,加上containerFactory解析消息。dom

@Slf4j
@EnableJms
@Component
public class Customer {
    @JmsListener(destination = "queue01", containerFactory = "queueListenerFactory")
    public void customer(String msg) {
        System.out.println("接收到的消息:");
        System.out.println(msg);
    }

    @JmsListener(destination = "delaySend01", containerFactory = "queueListenerFactory")
    public void customer2(String msg) {
        log.info("接收延时消息:" + msg);
    }

    @JmsListener(destination = "topic01", containerFactory = "topicListenerFactory")
    public void customer3(String msg) {
        log.info("收到订阅消息:" + msg);
    }

}

效果:
在这里插入图片描述
在这里插入图片描述svg

以上。测试