SpringBoot集成activeMQ实现Topic发布/订阅模式通信

    上一期我们讲了SpringBoot集成activeMQ实现Queue模式点对点通信,这一期我们接着讲SpringBoot集成activeMQ实现Topic发布/订阅模式通信

    发布/订阅模式通信是对点对点通信模式的扩展。Queue模式下一个人发送的消息只能由一个人接收,而Topic模式下,A发送的消息,可以被所有监听A的对象的接收,即:

    ①:一个消息可以被多个服务接收

    ②:订阅一个主题的消费者,只能消费自它订阅之后发布的消息。

    ③:消费端如果在生产端发送消息之后启动,是接收不到消息的,除非生产端对消息进行了持久化(例如广播,只有当时听到的人能听到信息)

下面我们来看看代码中的实现:

①:pom.xml和Queue模式下的配置相同

②:application.yml的配置

较Queue模式下多配置了一个topic: wucy-topic

spring:

  activemq:

    broker-url: tcp://127.0.0.1:61616

    user: admin

    password: admin

queue: wucy-queue

topic: wucy-topic

server:

  port: 8080

③:关于queue的配置ActivemqConfig.java

因为activeMQ默认不接受Topic消息,所以需要配置topicListenerContainer来开启

@Configuration

public class ActivemqConfig {

@Value("${queue}")

private String queue;

 

@Value("${topic}")

private String topic;

 

@Bean

public Queue wucyQueue() {

return new ActiveMQQueue(queue);

}

 

@Bean

public Topic wucyTopic() {

return new ActiveMQTopic(topic);

}

 

/**

 * JmsListener注解默认只接收queue消息,如果要接收topic消息,需要设置containerFactory

 */

@Bean

    public JmsListenerContainerFactory<?> topicListenerContainer(ConnectionFactory activeMQConnectionFactory) {

        DefaultJmsListenerContainerFactory topicListenerContainer = new DefaultJmsListenerContainerFactory();

        topicListenerContainer.setPubSubDomain(true);

        topicListenerContainer.setConnectionFactory(activeMQConnectionFactory);

        return topicListenerContainer;

    }

}

④:创建生产者

@RequestMapping("/registerTopic")

public String registerTopic(String name) {

long startTime = System.currentTimeMillis();

 

// 数据库的操作

try {

Thread.sleep(50);

// 为了提高用户体验

// 发短信,去调用别人的API

// mqServer.send("发送短信*******");

// Thread.sleep(1000);

// 发邮件,qq发邮件的smtp

JSONObject json = new JSONObject();

json.put("type", "email");

json.put("to", "[email protected]");

json.put("content", "恭喜你注册Topic成功,"+name);

 

mqServer.send(topic, json.toJSONString());

// Thread.sleep(1000);

 

} catch (InterruptedException e) {

}

long endTime = System.currentTimeMillis();

return "你注册Topic成功,用户名为:" + name + ",耗时:" + (endTime - startTime);

}

⑤:创建消费者

@JmsListener(destination = "${topic}", containerFactory = "topicListenerContainer")

public void receive(String msg) {

System.out.println("topic监听器2收到msg:" + msg);

 

JSONObject parseObject = JSONObject.parseObject(msg);

String type = (String) parseObject.get("type");

String to = (String) parseObject.get("to");

String content = (String) parseObject.get("content");

if ("email".equals(type)) {

System.out.println("发送邮件到:"+to+",内容为:"+content);

}

}

 

@JmsListener(destination = "${topic}", containerFactory = "topicListenerContainer")

public void receive3(String msg) {

System.out.println("topic监听器3收到msg:" + msg);

 

JSONObject parseObject = JSONObject.parseObject(msg);

String type = (String) parseObject.get("type");

String to = (String) parseObject.get("to");

String content = (String) parseObject.get("content");

if ("email".equals(type)) {

System.out.println("发送邮件到:"+to+",内容为:"+content);

}

}

实现完成了,启动BootMQApp,通过访问http://127.0.0.1:8080/user/registerTopic?name=wucy实现生产消息,我们可以看到:

 

 

WX公众号关注码技术秘圈并后台私信“ActiveMQ”获取源码