RabbitMQ 之topics (通配符)篇 初学

官网地址:https://www.rabbitmq.com/getstarted.htmlhtml

1、RabbitMQ简介java

  MQ全称为Message Queue,即消息队列, RabbitMQ是由erlang语言开发,基于AMQP(Advanced Message
Queue 高级消息队列协议)协议实现的消息队列,它是一种应用程序之间的通讯方法,消息队列在分布式系统开
发中应用很是普遍。并发

2、开发中消息队列一般有以下应用场景:
  一、任务异步处理。
将不须要同步处理的而且耗时长的操做由消息队列通知消息接收方进行异步处理。提升了应用程序的响应时间。
  二、应用程序解耦合
  MQ至关于一个中介,生产方经过MQ与消费方交互,它将应用程序进行解耦合。
  市场上还有哪些消息队列?
    ActiveMQ,RabbitMQ,ZeroMQ,Kafka,MetaMQ,RocketMQ、Redis。
3、为何使用RabbitMQ呢?
  一、使得简单,功能强大。
  二、基于AMQP协议。
  三、社区活跃,文档完善。
  四、高并发性能好,这主要得益于Erlang语言。
  五、Spring Boot默认已集成RabbitMQ异步

4、RabbitMQ的全部模式(如下截图都是官方连接里面的)maven

 

 

 

 

5、代码举例Topics实现tcp

准备工做:下载RabbitMQ客户端: http://www.rabbitmq.com/download.html分布式

     下载erlang,由于RabbitMQ是erlang语言开发的,因此须要下载:  http://erlang.org/download/otp_win64_20.3.exe高并发

下载好之后进行安装。安装完成后进行开发测出:性能

一、首先第一步须要导入RabbitMQ的java客户端,我这里是建立的maven项目,因此直接导入依赖,以下:测试

<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>5.1.2</version>
<scope>test</scope>
</dependency>

注意:通配符有两个:
  1. # :表明没有或一个或多个单词(单词与单词之间用“.”分割);
  2. * :表明一个零个或一个单词;
例如:
  aa.#.bb.* :
      匹配的: aa.HH.CC.bb 、 aa.bb 、aa.bb.cc
      不匹配: aa.cc 、 aa.bb.cc.dd
二、建立工程项目以及生产者P和消费者C1与C2(假设C1是短信SMS接收者,C2是邮件Email接收者)

 

三、编写生产者P类:

package com.test.rabbitmq;

import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

public class P {
//队列名称
private static final String QUEUE_INFORM_EMAIL = "queue_inform_email";
private static final String QUEUE_INFORM_SMS = "queue_inform_sms";
//交换机名称
private static final String EXCHANGE_TOPICS_INFORM = "exchange_topics_inform";

public static void main(String[] args) {
Connection connection = null;
Channel channel = null;
//建立链接工厂
try {
//建立一个与MQ的链接
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("127.0.0.1");
//建立一个tcp链接
connection = factory.newConnection();
//经过链接建立一个通道,每一个通道表明一个会话
channel = connection.createChannel();
/**
* 声明交换机:参数以下
* 一、交换机名称
* 二、交换机类型,fanout、topic、direct、headers
*/
channel.exchangeDeclare(EXCHANGE_TOPICS_INFORM, BuiltinExchangeType.TOPIC);
/**
* 声明队列:参数以下:
* 一、队列名称
* 二、是否持久化
* 三、是否独占此队列
* 四、队列不用是否自动删除
* 五、参数
*/
channel.queueDeclare(QUEUE_INFORM_EMAIL, true, false, false, null);
channel.queueDeclare(QUEUE_INFORM_SMS, true, false, false, null);

//发送邮件Email消息
for (int i = 0; i < 10; i++) {
String message = "email inform to user : " + i;
/**
* 向交换机发送消息 :参数明细
* 一、交换机名称,不指令使用默认交换机名称 Default Exchange
* 二、routingKey(路由key),根据key名称将消息转发到具体的队列,这里填写队列名称表示消息将发到此队列
* 三、消息属性
* 四、消息内容
*/
channel.basicPublish(EXCHANGE_TOPICS_INFORM, "inform.email", null, message.getBytes());
System.out.println("Send Message is:'" + message + "'");
}
//发送短信消息
for (int i = 0; i < 10; i++) {
String message = "sms inform to user : " + i;
channel.basicPublish(EXCHANGE_TOPICS_INFORM, "inform.sms", null, message.getBytes());
System.out.println("Send Message is:'" + message + "'");
}
//发送短信和邮件消息
for (int i = 0; i < 10; i++) {
String message = "sms and email inform to user" + i;
channel.basicPublish(EXCHANGE_TOPICS_INFORM, "inform.sms.email", null, message.getBytes());
System.out.println("Send Message is:'" + message + "'");
}
} catch (Exception e) {
e.printStackTrace();
}
}
}
4.编写消费者C1类:email接受者
package com.test.rabbitmq;

import com.rabbitmq.client.*;

public class C1 {
//队列名称
private static final String QUEUE_INFORM_EMAIL = "queue_inform_email";
//交换机名称
private static final String EXCHANGE_TOPICS_INFORM = "exchange_topics_inform";

public static void main(String[] args) {
Connection connection = null;
Channel channel = null;

try {
//建立一个与MQ的链接
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("127.0.0.1");
//建立一个tcp链接
connection = factory.newConnection();
//经过链接建立一个通道,每一个通道表明一个会话
channel = connection.createChannel();
channel.exchangeDeclare(EXCHANGE_TOPICS_INFORM, BuiltinExchangeType.TOPIC);
channel.queueDeclare(QUEUE_INFORM_EMAIL, true, false, false, null);

//绑定email通知队列
channel.queueBind(QUEUE_INFORM_EMAIL, EXCHANGE_TOPICS_INFORM, "inform.#.email.#");

//消费消息
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
String message = new String(delivery.getBody(), "UTF-8");
System.out.println(" [x] Received Email:'" +
delivery.getEnvelope().getRoutingKey() + "':'" + message + "'");
};
channel.basicConsume(QUEUE_INFORM_EMAIL, true, deliverCallback, consumerTag -> {
});

} catch (Exception e) {
e.printStackTrace();
}
}
}
5.编写消费者C2类 SMS接收者
package com.test.rabbitmq;

import com.rabbitmq.client.*;

public class C2 {
//队列名称
private static final String QUEUE_INFORM_SMS = "queue_inform_sms";
//交换机名称
private static final String EXCHANGE_TOPICS_INFORM = "exchange_topics_inform";

public static void main(String[] args) {
Connection connection = null;
Channel channel = null;

try {
//建立一个与MQ的链接
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("127.0.0.1");
//建立一个tcp链接
connection = factory.newConnection();
//经过链接建立一个通道,每一个通道表明一个会话
channel = connection.createChannel();
channel.exchangeDeclare(EXCHANGE_TOPICS_INFORM, BuiltinExchangeType.TOPIC);
channel.queueDeclare(QUEUE_INFORM_SMS, true, false, false, null);

//绑定email通知队列
channel.queueBind(QUEUE_INFORM_SMS, EXCHANGE_TOPICS_INFORM, "inform.#.sms.#");

//消费消息
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
String message = new String(delivery.getBody(), "UTF-8");
System.out.println(" [x] Received SMS:'" +
delivery.getEnvelope().getRoutingKey() + "':'" + message + "'");
};
channel.basicConsume(QUEUE_INFORM_SMS, true, deliverCallback, consumerTag -> {
});

} catch (Exception e) {
e.printStackTrace();
}
}
}
6.测试:
启动所有服务:先发送ssm消息:

 

 

 同时生成ssm消息和email消息并发送:

 

 

测试结果:
相关文章
相关标签/搜索