概念:html
消息队列(Message Queue,简称MQ),本质是个队列,FIFO先入先出,只不过队列中存放的内容是一些Message
。java
传统模式spring
使用队列解耦json
例如:淘宝秒杀活动等。服务器
一个线程负责生产,一个线程负责总队列里取出来消费。app
一个线程生产,多个线程取出来消费。异步
一个发布者发送消息,多个订阅者能够同时获取到发布的消息ide
生产者发送的消息会发往每一个与其绑定的队列。学习
AMQP协议: Advanced Message Queuing Protocol 高级消息队列协议,是一个异步消息传递所使用的应用层协议规范。fetch
组成:
服务主机:接收客户端请求,并做出相应处理
虚拟主机:一个服务器能够开启多个Virtual Host,每一个虚拟主机都能提供完整的服务,有本身的权限控制。
生产者:发送消息
消费者:接收消息并处理
交换器:RabbitMQ中,消息不是直接发往队列,而是要先给交换器,而后交换器按照必定的路由规则发送到相对应的队列上。
路由Key:发送消息时要指定交换器和路由Key
绑定:将队列和交换器绑定起来路由Key做为绑定时的关键字
队列:消息的载体,消费者从队列中获取消息,路由器根据路由规则把消息发往对应的队列。
消息:队列中存储的信息单元。
生产者发送消息时指定交换器名和路由Key,而后交换器根据路由Key与绑定信息进行比对,找到对应的队列后将信息发送出去。
消费者监听某个队列,若是有消息就取出来作对应操做,没有就阻塞。
Direct:固定名称匹配,只有路由Key与绑定的Key一致才会将消息发送到该队列。
Topic:主题模式,路由Key能够用*#来填充
#能够匹配任意多个单词,*只能匹配一个单词
好比bingdKey x.y x.y.z a.y a.b.z
x.*只能匹配x.y,而x.#能够匹配x.y 和 x.y.z
Fanout:广播模式
安装RabbitMQ,并启动服务。默认用户名密码guest。建立VirtualHost。
publicclass Recv { publicstaticvoid main(String[] args) throws IOException, TimeoutException {
ConnectionFactory factory = new ConnectionFactory(); factory.setUsername("guest"); factory.setPassword("guest"); factory.setHost("localhost"); //创建到代理服务器到链接 Connection conn = factory.newConnection(); //得到信道 final Channel channel = conn.createChannel(); //声明交换器 String exchangeName = "hello-exchange"; channel.exchangeDeclare(exchangeName, "direct", true); //声明队列 String queueName = channel.queueDeclare().getQueue(); String routingKey = "hola"; //绑定队列,经过键 hola将队列和交换器绑定起来 channel.queueBind(queueName, exchangeName, routingKey);
//消费消息 booleanautoAck = false; String consumerTag = ""; channel.basicConsume(queueName, autoAck, consumerTag, new DefaultConsumer(channel) { @Override publicvoid handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String routingKey = envelope.getRoutingKey(); String contentType = properties.getContentType(); System.out.println("消费的路由键:" + routingKey); System.out.println("消费的内容类型:" + contentType); longdeliveryTag = envelope.getDeliveryTag(); //确认消息 channel.basicAck(deliveryTag, false); System.out.println("消费的消息体内容:"); String bodyStr = new String(body, "UTF-8"); System.out.println(bodyStr);
} }); } } |
publicclass Send { publicstaticvoid main(String[] args) throws IOException, TimeoutException {
//建立链接工厂 ConnectionFactory factory = new ConnectionFactory(); factory.setUsername("guest"); factory.setPassword("guest"); //设置 RabbitMQ 地址 factory.setHost("localhost"); //创建到代理服务器到链接 Connection conn = factory.newConnection(); //得到信道 Channel channel = conn.createChannel(); //声明交换器 String exchangeName = "hello-exchange"; channel.exchangeDeclare(exchangeName, "direct", true);
String routingKey = "hola"; //发布消息 byte[] messageBodyBytes = "quit".getBytes(); channel.basicPublish(exchangeName, routingKey, null, messageBodyBytes);
channel.close(); conn.close();
} } |
与1:1的相似,只不过两个消费者共同消费一个队列内的信息,复制一份消费者便可。
生产者发送的消息,发往每一个订阅他的消费者那里。全部消费者均可以获取相同的信息。
发布者A
publicclass Send {
publicstaticvoid main(String[] args) throws IOException, TimeoutException {
//建立链接工厂 ConnectionFactory factory = new ConnectionFactory(); factory.setUsername("guest"); factory.setPassword("guest"); //设置 RabbitMQ 地址 factory.setHost("localhost"); //创建到代理服务器到链接 Connection conn = factory.newConnection(); //得到信道 Channel channel = conn.createChannel(); //声明交换器 String exchangeName = "王力宏"; channel.exchangeDeclare(exchangeName, "fanout", true);
//发布消息 byte[] messageBodyBytes = "王力宏发布的消息:啦啦啦啦啦".getBytes(); channel.basicPublish(exchangeName, "", null, messageBodyBytes);
channel.close(); conn.close(); } } |
发布者B
publicclass Send2 {
publicstaticvoid main(String[] args) throws IOException, TimeoutException {
//建立链接工厂 ConnectionFactory factory = new ConnectionFactory(); factory.setUsername("guest"); factory.setPassword("guest"); //设置 RabbitMQ 地址 factory.setHost("localhost"); //创建到代理服务器到链接 Connection conn = factory.newConnection(); //得到信道 Channel channel = conn.createChannel(); //声明交换器 String exchangeName = "赵薇"; channel.exchangeDeclare(exchangeName, "fanout", true);
//发布消息 byte[] messageBodyBytes = "赵薇发布的消息:啊啊啊啊".getBytes(); channel.basicPublish(exchangeName, "", null, messageBodyBytes);
channel.close(); conn.close(); } } |
订阅者A1、A2
publicclass Recv {
publicstaticvoid main(String[] args) throws IOException, TimeoutException {
ConnectionFactory factory = new ConnectionFactory(); factory.setUsername("guest"); factory.setPassword("guest"); factory.setHost("localhost"); //创建到代理服务器到链接 Connection conn = factory.newConnection(); //得到信道 final Channel channel = conn.createChannel(); //声明交换器 String exchangeName = "王力宏"; channel.exchangeDeclare(exchangeName, "fanout", true); //声明队列 String queueName = channel.queueDeclare().getQueue(); //绑定队列,经过键 hola将队列和交换器绑定起来 channel.queueBind(queueName, exchangeName, "");
while(true) { //消费消息 booleanautoAck = false; String consumerTag = ""; channel.basicConsume(queueName, autoAck, consumerTag, new DefaultConsumer(channel) { @Override publicvoid handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String routingKey = envelope.getRoutingKey(); String contentType = properties.getContentType(); System.out.println("消费的路由键:" + routingKey); System.out.println("消费的内容类型:" + contentType); longdeliveryTag = envelope.getDeliveryTag(); //确认消息 channel.basicAck(deliveryTag, false); System.out.println("消费的消息体内容:"); String bodyStr = new String(body, "UTF-8"); System.out.println(bodyStr); } }); } } } |
订阅者B1、B2
publicclass Recv3 {
publicstaticvoid main(String[] args) throws IOException, TimeoutException {
ConnectionFactory factory = new ConnectionFactory(); factory.setUsername("guest"); factory.setPassword("guest"); factory.setHost("localhost"); //创建到代理服务器到链接 Connection conn = factory.newConnection(); //得到信道 final Channel channel = conn.createChannel(); channel.exchangeDeclare("赵薇", "fanout", true); //声明交换器 String exchangeName = "赵薇"; //声明队列 String queueName = channel.queueDeclare().getQueue(); //绑定队列,经过键 hola将队列和交换器绑定起来 channel.queueBind(queueName, exchangeName, "");
while(true) { //消费消息 booleanautoAck = false; String consumerTag = ""; channel.basicConsume(queueName, autoAck, consumerTag, new DefaultConsumer(channel) { @Override publicvoid handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String routingKey = envelope.getRoutingKey(); String contentType = properties.getContentType(); System.out.println("消费的路由键:" + routingKey); System.out.println("消费的内容类型:" + contentType); longdeliveryTag = envelope.getDeliveryTag(); //确认消息 channel.basicAck(deliveryTag, false); System.out.println("消费的消息体内容:"); String bodyStr = new String(body, "UTF-8"); System.out.println(bodyStr);
} }); } }
} |
生产者
publicclass Send {
publicstaticvoid main(String[] args) throws IOException, TimeoutException {
//建立链接工厂 ConnectionFactory factory = new ConnectionFactory(); factory.setUsername("guest"); factory.setPassword("guest"); //设置 RabbitMQ 地址 factory.setHost("localhost"); //创建到代理服务器到链接 Connection conn = factory.newConnection(); //得到信道 Channel channel = conn.createChannel(); //声明交换器 String exchangeName = "topic-exchange"; channel.exchangeDeclare(exchangeName, "topic", true);
// String routingKey = "#.B";
//发布消息 for (inti = 0; i < 3; i++) { channel.basicPublish(exchangeName, routingKey, null, "ss".getBytes()); } //byte[] messageBodyBytes = "匹配消息".getBytes(); //channel.basicPublish(exchangeName, routingKey, null, messageBodyBytes); channel.close(); conn.close(); } } |
消费者A
publicclass Recv {
publicstaticvoid main(String[] args) throws IOException, TimeoutException {
ConnectionFactory factory = new ConnectionFactory(); factory.setUsername("guest"); factory.setPassword("guest"); factory.setHost("localhost"); //创建到代理服务器到链接 Connection conn = factory.newConnection(); //得到信道 final Channel channel = conn.createChannel(); //声明交换器 String exchangeName = "topic-exchange"; channel.exchangeDeclare(exchangeName, "topic", true); //声明队列 String queueName = channel.queueDeclare("X", false, false, false, null).getQueue();
String routingKey = "X.A"; //绑定队列,经过键 hola将队列和交换器绑定起来 channel.queueBind(queueName, exchangeName, routingKey);
while(true) { //消费消息 booleanautoAck = false; String consumerTag = ""; channel.basicConsume(queueName, autoAck, consumerTag, new DefaultConsumer(channel) { @Override publicvoid handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String routingKey = envelope.getRoutingKey(); String contentType = properties.getContentType(); System.out.println("消费的路由键:" + routingKey); System.out.println("消费的内容类型:" + contentType); longdeliveryTag = envelope.getDeliveryTag(); //确认消息 channel.basicAck(deliveryTag, false); System.out.println("消费的消息体内容:"); String bodyStr = new String(body); System.out.println(bodyStr); } }); } } } |
消费者B,routingKey routingKey.A
消费者C,routingKey routingKey.B
7-5、广播模式,不须要管routingKey和bindingKey是否匹配。
生产者
publicclass Send {
publicstaticvoid main(String[] args) throws IOException, TimeoutException {
//建立链接工厂 ConnectionFactory factory = new ConnectionFactory(); factory.setUsername("guest"); factory.setPassword("guest"); //设置 RabbitMQ 地址 factory.setHost("localhost"); //创建到代理服务器到链接 Connection conn = factory.newConnection(); //得到信道 Channel channel = conn.createChannel(); //声明交换器 String exchangeName = "fanout-exchange"; channel.exchangeDeclare(exchangeName, "fanout", true);
// String routingKey = "hola";
//发布消息 byte[] messageBodyBytes = "群发消息".getBytes(); channel.basicPublish(exchangeName, routingKey, null, messageBodyBytes);
channel.close(); conn.close(); } } |
消费者
publicclass Recv {
publicstaticvoid main(String[] args) throws IOException, TimeoutException {
ConnectionFactory factory = new ConnectionFactory(); factory.setUsername("guest"); factory.setPassword("guest"); factory.setHost("localhost"); //创建到代理服务器到链接 Connection conn = factory.newConnection(); //得到信道 final Channel channel = conn.createChannel(); //声明交换器 String exchangeName = "fanout-exchange"; channel.exchangeDeclare(exchangeName, "fanout", true); //声明队列 String queueName = channel.queueDeclare().getQueue();
String routingKey = "hola2"; //绑定队列,经过键 hola将队列和交换器绑定起来 channel.queueBind(queueName, exchangeName, routingKey);
while(true) { //消费消息 booleanautoAck = false; String consumerTag = ""; channel.basicConsume(queueName, autoAck, consumerTag, new DefaultConsumer(channel) { @Override publicvoid handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String routingKey = envelope.getRoutingKey(); String contentType = properties.getContentType(); System.out.println("消费的路由键:" + routingKey); System.out.println("消费的内容类型:" + contentType); longdeliveryTag = envelope.getDeliveryTag(); //确认消息 channel.basicAck(deliveryTag, false); System.out.println("消费的消息体内容:"); String bodyStr = new String(body); System.out.println(bodyStr);
} }); } } } |
8-1、添加依赖
<dependency> <groupId>com.rabbitmq</groupId> <artifactId>amqp-client</artifactId> <version>x.x.x</version> </dependency> <dependency> <groupId>org.springframework.amqp</groupId> <artifactId>spring-rabbit</artifactId> <version>x.x.xRELEASE</version> </dependency> |
8-2、配置文件中加入rabbit服务链接配置
mq.host=real_host mq.username=guest mq.password=guest mq.port=5672 mq.vhost=real_vhost |
8-3、新建application-mq.xml文件,添加配置信息
主要用来配置链接信息、Producer配置、队列声明、交换器声明、队列与交换器的绑定、队列的监听器配置(即消费者)等。
<?xml version="1.0" encoding="UTF-8"?> <beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:rabbit="http://www.springframework.org/schema/rabbit" xsi:schemaLocation="http://www.springframework.org/schema/rabbit http://www.springframework.org/schema/rabbit/spring-rabbit-1.4.xsd http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-4.1.xsd"> <!-- 全局配置 --> <!-- 定义RabbitMQ的链接工厂 --> <rabbit:connection-factory id="connectionFactory" host="${mq.host}" username="${mq.username}" password="${mq.password}" port="${mq.port}" virtual-host="${mq.vhost}"/> <!-- MQ的管理,包括队列、交换器等 --> <rabbit:admin connection-factory="connectionFactory"/>
<!-- Sender配置 --> <!-- spring template声明--> <!-- 能够不指定交换器,在每次发送请求时须要指明发给哪一个交换器 <rabbit:template id="amqpTemplate" connection-factory="connectionFactory"/> --> <rabbit:template exchange="test" id="amqpTemplate" connection-factory="connectionFactory"/><!-- message-converter="jsonMessageConverter" /> --> <!-- 消息对象json转换类 --> <!-- <bean id="jsonMessageConverter" class="org.springframework.amqp.support.converter.Jackson2JsonMessageConverter" /> -->
<!--定义queue 说明:durable:是否持久化 exclusive: 仅建立者可使用的私有队列,断开后自动删除 auto_delete: 当全部消费客户端链接断开后,是否自动删除队列--> <rabbit:queue name="mq.A" durable="true" auto-delete="false" exclusive="false" /> <rabbit:queue name="mq.B" durable="true" auto-delete="false" exclusive="false" /> <rabbit:queue name="mq.C" durable="true" auto-delete="false" exclusive="false" />
<!-- 定义交换机,而且完成队列和交换机的绑定 --> <rabbit:direct-exchange name="test" durable="true" auto-delete="false" id="test"> <rabbit:bindings> <rabbit:binding queue="mq.A" key="key.A"/> <rabbit:binding queue="mq.B" key="key.B"/> <rabbit:binding queue="mq.C" key="key.C"/> </rabbit:bindings> </rabbit:direct-exchange>
<!-- queues:监听的队列,多个的话用逗号(,)分隔 ref:监听器 --> <!-- 配置监听 acknowledeg = "manual" 设置手动应答 当消息处理失败时:会一直重发 直到消息处理成功 --> <rabbit:listener-container connection-factory="connectionFactory" acknowledge="manual" prefetch="1"> <!-- 配置监听器 --> <rabbit:listener queues="mq.A" ref="listenerA"/> <rabbit:listener queues="mq.B" ref="listenerB"/> <rabbit:listener queues="mq.C" ref="listenerC"/> </rabbit:listener-container> </beans> |
生产者:Spring提供的AmqpTemplate,使用注解注入便可使用
@Autowired private AmqpTemplate amqpTemplate; publicvoid sendMsg(String msg) { amqpTemplate.convertAndSend("routingKey", "发送了消息A"); } |
监听器:即收到队列的消息后做何处理,要实现ChannelAwareMessageListener
例如:监听器listenerA
@Component publicclass ListenerA implements ChannelAwareMessageListener {
privatefinalstatic Log logger = LogFactory.getLog(ListenerA.class);
@Override publicvoid onMessage(Message message, Channel channel) throws Exception { // TODO Auto-generated method stub String msg = new String(message.getBody()); System.out.println("A received : " + msg); logger.error(msg); } } |
有可能遇到程序崩溃或者Rabbit服务器宕机的状况,那么若是没有持久化机制,全部数据都会丢失。
Durable:是否持久化参数设为True便可
channel.exchangeDeclare(exchangeName, type, true);
在以前,消息分发给consumer后当即就会被标记为已消费,这时候若是consumber接到了一个消息可是尚未来的及处理就异常退出,那么这个消息的状态是已被消费的,因而就会形成消息丢失的问题。
处理的代码也很简单,一共有两个步骤。第一个把autoAck改为false
//消费结果须要进行确认
channel.BasicConsume("firstTest", false, consumer);
第二部分就是在咱们消费完成后进行确认
//进行交付,肯定此消息已经处理完成
channel.BasicAck(deliveryTag: e.DeliveryTag, multiple: false);
若是没有进行确认queue会把这个消息交给其它的consumer去处理,若是没有交付的代码,那么这个消息会一直存在。
消息持久化步骤:
void basicPublish(String exchange, String routingKey, BasicProperties props, byte[] body) throws IOException; void basicPublish(String exchange, String routingKey, boolean mandatory, BasicProperties props, byte[] body) throws IOException; void basicPublish(String exchange, String routingKey, boolean mandatory, boolean immediate, BasicProperties props, byte[] body) throws IOException; |
exchange表示exchange的名称
routingKey表示routingKey的名称
body表明发送的消息体
MessageProperties.PERSISTENT_TEXT_PLAIN 能够设置为持久化,类型为文本
MessageProperties.PERSISTENT_BASIC 类型为二进制数据
mandatory当mandatory标志位设置为true时,若是exchange没法找到一个队列取转发,就返回给生产者。
immediate当immediate标志位设置为true时,若是exchange要转发的队列上没有消费者时,就返回给生产者。
RabbitMQ可能会遇到的一个问题,即生成者不知道消息是否真正到达broker,那么有没有高效的解决方式呢?答案是采用Confirm模式。
生产者将信道设置成confirm模式,一旦信道进入confirm模式,全部在该信道上面发布的消息都会被指派一个惟一的ID(从1开始),一旦消息被投递到全部匹配的队列以后,broker就会发送一个确认给生产者(包含消息的惟一ID),这就使得生产者知道消息已经正确到达目的队列了,若是消息和队列是可持久化的,那么确认消息会将消息写入磁盘以后发出,broker回传给生产者的确认消息中deliver-tag域包含了确认消息的序列号,此外broker也能够设置basic.ack的multiple域,表示到这个序列号以前的全部消息都已经获得了处理。
confirm模式最大的好处在于他是异步的,一旦发布一条消息,生产者应用程序就能够在等信道返回确认的同时继续发送下一条消息,当消息最终获得确认以后,生产者应用即可以经过回调方法来处理该确认消息,若是RabbitMQ由于自身内部错误致使消息丢失,就会发送一条nack消息,生产者应用程序一样能够在回调方法中处理该nack消息。
在channel 被设置成 confirm 模式以后,全部被 publish 的后续消息都将被 confirm(即 ack) 或者被nack一次。可是没有对消息被 confirm 的快慢作任何保证,而且同一条消息不会既被 confirm又被nack 。
//开启Procedure确认机制 channel.confirmSelect(); //发布消息 channel.basicPublish(exchangeName, routingKey,null,message); //消息发送成功的确认,也能够设置超时时间 if (channel.waitForConfirms([long timeOut]) { System.out.println("send success..."); } else { System.out.println("send failed..."); } |
批量发送消息后再进行确认。
//待确认的序列 SortedSet<Long> confirmSet = Collections.synchronizedSortedSet(new TreeSet<Long>()); //开启确认机制 channel.confirmSelect(); //添加处理事件 channel.addConfirmListener(new ConfirmListener() { publicvoid handleAck(longdeliveryTag, booleanmultiple) throws IOException { if (multiple) { confirmSet.headSet(deliveryTag + 1).clear(); } else { confirmSet.remove(deliveryTag); } System.out.println("发送成功..."); } publicvoid handleNack(longdeliveryTag, booleanmultiple) throws IOException { System.out.println("Nack, SeqNo: " + deliveryTag + ", multiple: " + multiple); if (multiple) { confirmSet.headSet(deliveryTag + 1).clear(); } else { confirmSet.remove(deliveryTag); } System.err.println("发送失败..."); } });
//发送消息 for (inti = 0; i < 10; i++) { //获取下个发送序号 longnextSeqNo = channel.getNextPublishSeqNo(); channel.basicPublish(exchangeName, routingKey, null, ("车票ID:" + i).getBytes()); //加入待处理集合中 confirmSet.add(nextSeqNo); //休息0.2s Thread.sleep(200); } |
自动确认,默认是自动确认,即获取消息后,直接确认。
手动确认,给当前消息设置状态,当手动ack后服务端才会删除该消息,若是返回nack,从新入队。
//手动确认 booleanautoAck = false; channel.basicConsume(queueName, autoAck, new DefaultConsumer(channel) { @Override publicvoid handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { //…其余处理操做 longdeliveryTag = envelope.getDeliveryTag(); //确认消息 channel.basicAck(deliveryTag, false); } }); |
https://blog.csdn.net/wangshuminjava/article/details/80998992
深刻解读RabbitMQ工做原理及简单使用
http://www.javashuo.com/article/p-cabanpbf-gk.html
RabbitMQ深刻学习指导
概念:
消息队列(Message Queue,简称MQ),本质是个队列,FIFO先入先出,只不过队列中存放的内容是一些Message
。
传统模式
使用队列解耦
例如:淘宝秒杀活动等。
一个线程负责生产,一个线程负责总队列里取出来消费。
一个线程生产,多个线程取出来消费。
一个发布者发送消息,多个订阅者能够同时获取到发布的消息
生产者发送的消息会发往每一个与其绑定的队列。
AMQP协议: Advanced Message Queuing Protocol 高级消息队列协议,是一个异步消息传递所使用的应用层协议规范。
组成:
服务主机:接收客户端请求,并做出相应处理
虚拟主机:一个服务器能够开启多个Virtual Host,每一个虚拟主机都能提供完整的服务,有本身的权限控制。
生产者:发送消息
消费者:接收消息并处理
交换器:RabbitMQ中,消息不是直接发往队列,而是要先给交换器,而后交换器按照必定的路由规则发送到相对应的队列上。
路由Key:发送消息时要指定交换器和路由Key
绑定:将队列和交换器绑定起来路由Key做为绑定时的关键字
队列:消息的载体,消费者从队列中获取消息,路由器根据路由规则把消息发往对应的队列。
消息:队列中存储的信息单元。
生产者发送消息时指定交换器名和路由Key,而后交换器根据路由Key与绑定信息进行比对,找到对应的队列后将信息发送出去。
消费者监听某个队列,若是有消息就取出来作对应操做,没有就阻塞。
Direct:固定名称匹配,只有路由Key与绑定的Key一致才会将消息发送到该队列。
Topic:主题模式,路由Key能够用*#来填充
#能够匹配任意多个单词,*只能匹配一个单词
好比bingdKey x.y x.y.z a.y a.b.z
x.*只能匹配x.y,而x.#能够匹配x.y 和 x.y.z
Fanout:广播模式
安装RabbitMQ,并启动服务。默认用户名密码guest。建立VirtualHost。
publicclass Recv { publicstaticvoid main(String[] args) throws IOException, TimeoutException {
ConnectionFactory factory = new ConnectionFactory(); factory.setUsername("guest"); factory.setPassword("guest"); factory.setHost("localhost"); //创建到代理服务器到链接 Connection conn = factory.newConnection(); //得到信道 final Channel channel = conn.createChannel(); //声明交换器 String exchangeName = "hello-exchange"; channel.exchangeDeclare(exchangeName, "direct", true); //声明队列 String queueName = channel.queueDeclare().getQueue(); String routingKey = "hola"; //绑定队列,经过键 hola将队列和交换器绑定起来 channel.queueBind(queueName, exchangeName, routingKey);
//消费消息 booleanautoAck = false; String consumerTag = ""; channel.basicConsume(queueName, autoAck, consumerTag, new DefaultConsumer(channel) { @Override publicvoid handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String routingKey = envelope.getRoutingKey(); String contentType = properties.getContentType(); System.out.println("消费的路由键:" + routingKey); System.out.println("消费的内容类型:" + contentType); longdeliveryTag = envelope.getDeliveryTag(); //确认消息 channel.basicAck(deliveryTag, false); System.out.println("消费的消息体内容:"); String bodyStr = new String(body, "UTF-8"); System.out.println(bodyStr);
} }); } } |
publicclass Send { publicstaticvoid main(String[] args) throws IOException, TimeoutException {
//建立链接工厂 ConnectionFactory factory = new ConnectionFactory(); factory.setUsername("guest"); factory.setPassword("guest"); //设置 RabbitMQ 地址 factory.setHost("localhost"); //创建到代理服务器到链接 Connection conn = factory.newConnection(); //得到信道 Channel channel = conn.createChannel(); //声明交换器 String exchangeName = "hello-exchange"; channel.exchangeDeclare(exchangeName, "direct", true);
String routingKey = "hola"; //发布消息 byte[] messageBodyBytes = "quit".getBytes(); channel.basicPublish(exchangeName, routingKey, null, messageBodyBytes);
channel.close(); conn.close();
} } |
与1:1的相似,只不过两个消费者共同消费一个队列内的信息,复制一份消费者便可。
生产者发送的消息,发往每一个订阅他的消费者那里。全部消费者均可以获取相同的信息。
发布者A
publicclass Send {
publicstaticvoid main(String[] args) throws IOException, TimeoutException {
//建立链接工厂 ConnectionFactory factory = new ConnectionFactory(); factory.setUsername("guest"); factory.setPassword("guest"); //设置 RabbitMQ 地址 factory.setHost("localhost"); //创建到代理服务器到链接 Connection conn = factory.newConnection(); //得到信道 Channel channel = conn.createChannel(); //声明交换器 String exchangeName = "王力宏"; channel.exchangeDeclare(exchangeName, "fanout", true);
//发布消息 byte[] messageBodyBytes = "王力宏发布的消息:啦啦啦啦啦".getBytes(); channel.basicPublish(exchangeName, "", null, messageBodyBytes);
channel.close(); conn.close(); } } |
发布者B
publicclass Send2 {
publicstaticvoid main(String[] args) throws IOException, TimeoutException {
//建立链接工厂 ConnectionFactory factory = new ConnectionFactory(); factory.setUsername("guest"); factory.setPassword("guest"); //设置 RabbitMQ 地址 factory.setHost("localhost"); //创建到代理服务器到链接 Connection conn = factory.newConnection(); //得到信道 Channel channel = conn.createChannel(); //声明交换器 String exchangeName = "赵薇"; channel.exchangeDeclare(exchangeName, "fanout", true);
//发布消息 byte[] messageBodyBytes = "赵薇发布的消息:啊啊啊啊".getBytes(); channel.basicPublish(exchangeName, "", null, messageBodyBytes);
channel.close(); conn.close(); } } |
订阅者A1、A2
publicclass Recv {
publicstaticvoid main(String[] args) throws IOException, TimeoutException {
ConnectionFactory factory = new ConnectionFactory(); factory.setUsername("guest"); factory.setPassword("guest"); factory.setHost("localhost"); //创建到代理服务器到链接 Connection conn = factory.newConnection(); //得到信道 final Channel channel = conn.createChannel(); //声明交换器 String exchangeName = "王力宏"; channel.exchangeDeclare(exchangeName, "fanout", true); //声明队列 String queueName = channel.queueDeclare().getQueue(); //绑定队列,经过键 hola将队列和交换器绑定起来 channel.queueBind(queueName, exchangeName, "");
while(true) { //消费消息 booleanautoAck = false; String consumerTag = ""; channel.basicConsume(queueName, autoAck, consumerTag, new DefaultConsumer(channel) { @Override publicvoid handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String routingKey = envelope.getRoutingKey(); String contentType = properties.getContentType(); System.out.println("消费的路由键:" + routingKey); System.out.println("消费的内容类型:" + contentType); longdeliveryTag = envelope.getDeliveryTag(); //确认消息 channel.basicAck(deliveryTag, false); System.out.println("消费的消息体内容:"); String bodyStr = new String(body, "UTF-8"); System.out.println(bodyStr); } }); } } } |
订阅者B1、B2
publicclass Recv3 {
publicstaticvoid main(String[] args) throws IOException, TimeoutException {
ConnectionFactory factory = new ConnectionFactory(); factory.setUsername("guest"); factory.setPassword("guest"); factory.setHost("localhost"); //创建到代理服务器到链接 Connection conn = factory.newConnection(); //得到信道 final Channel channel = conn.createChannel(); channel.exchangeDeclare("赵薇", "fanout", true); //声明交换器 String exchangeName = "赵薇"; //声明队列 String queueName = channel.queueDeclare().getQueue(); //绑定队列,经过键 hola将队列和交换器绑定起来 channel.queueBind(queueName, exchangeName, "");
while(true) { //消费消息 booleanautoAck = false; String consumerTag = ""; channel.basicConsume(queueName, autoAck, consumerTag, new DefaultConsumer(channel) { @Override publicvoid handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String routingKey = envelope.getRoutingKey(); String contentType = properties.getContentType(); System.out.println("消费的路由键:" + routingKey); System.out.println("消费的内容类型:" + contentType); longdeliveryTag = envelope.getDeliveryTag(); //确认消息 channel.basicAck(deliveryTag, false); System.out.println("消费的消息体内容:"); String bodyStr = new String(body, "UTF-8"); System.out.println(bodyStr);
} }); } }
} |
生产者
publicclass Send {
publicstaticvoid main(String[] args) throws IOException, TimeoutException {
//建立链接工厂 ConnectionFactory factory = new ConnectionFactory(); factory.setUsername("guest"); factory.setPassword("guest"); //设置 RabbitMQ 地址 factory.setHost("localhost"); //创建到代理服务器到链接 Connection conn = factory.newConnection(); //得到信道 Channel channel = conn.createChannel(); //声明交换器 String exchangeName = "topic-exchange"; channel.exchangeDeclare(exchangeName, "topic", true);
// String routingKey = "#.B";
//发布消息 for (inti = 0; i < 3; i++) { channel.basicPublish(exchangeName, routingKey, null, "ss".getBytes()); } //byte[] messageBodyBytes = "匹配消息".getBytes(); //channel.basicPublish(exchangeName, routingKey, null, messageBodyBytes); channel.close(); conn.close(); } } |
消费者A
publicclass Recv {
publicstaticvoid main(String[] args) throws IOException, TimeoutException {
ConnectionFactory factory = new ConnectionFactory(); factory.setUsername("guest"); factory.setPassword("guest"); factory.setHost("localhost"); //创建到代理服务器到链接 Connection conn = factory.newConnection(); //得到信道 final Channel channel = conn.createChannel(); //声明交换器 String exchangeName = "topic-exchange"; channel.exchangeDeclare(exchangeName, "topic", true); //声明队列 String queueName = channel.queueDeclare("X", false, false, false, null).getQueue();
String routingKey = "X.A"; //绑定队列,经过键 hola将队列和交换器绑定起来 channel.queueBind(queueName, exchangeName, routingKey);
while(true) { //消费消息 booleanautoAck = false; String consumerTag = ""; channel.basicConsume(queueName, autoAck, consumerTag, new DefaultConsumer(channel) { @Override publicvoid handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String routingKey = envelope.getRoutingKey(); String contentType = properties.getContentType(); System.out.println("消费的路由键:" + routingKey); System.out.println("消费的内容类型:" + contentType); longdeliveryTag = envelope.getDeliveryTag(); //确认消息 channel.basicAck(deliveryTag, false); System.out.println("消费的消息体内容:"); String bodyStr = new String(body); System.out.println(bodyStr); } }); } } } |
消费者B,routingKey routingKey.A
消费者C,routingKey routingKey.B
7-5、广播模式,不须要管routingKey和bindingKey是否匹配。
生产者
publicclass Send {
publicstaticvoid main(String[] args) throws IOException, TimeoutException {
//建立链接工厂 ConnectionFactory factory = new ConnectionFactory(); factory.setUsername("guest"); factory.setPassword("guest"); //设置 RabbitMQ 地址 factory.setHost("localhost"); //创建到代理服务器到链接 Connection conn = factory.newConnection(); //得到信道 Channel channel = conn.createChannel(); //声明交换器 String exchangeName = "fanout-exchange"; channel.exchangeDeclare(exchangeName, "fanout", true);
// String routingKey = "hola";
//发布消息 byte[] messageBodyBytes = "群发消息".getBytes(); channel.basicPublish(exchangeName, routingKey, null, messageBodyBytes);
channel.close(); conn.close(); } } |
消费者
publicclass Recv {
publicstaticvoid main(String[] args) throws IOException, TimeoutException {
ConnectionFactory factory = new ConnectionFactory(); factory.setUsername("guest"); factory.setPassword("guest"); factory.setHost("localhost"); //创建到代理服务器到链接 Connection conn = factory.newConnection(); // |