一:介绍java
1.模式ide
2.使用场景spa
一个生产者,多个消费者code
每个消费者都有本身的队列blog
生产者没有直接把消息发送给队列,而是发送到了交换机rabbitmq
每个队列都要绑定到交换机队列
能够实现一个消息被多个消费者消费。utf-8
二:程序get
1.生产者it
1 package com.mq.PubSubFanout; 2 3 import com.mq.utils.ConnectionUtil; 4 import com.rabbitmq.client.Channel; 5 import com.rabbitmq.client.Connection; 6 7 public class FanoutSend { 8 private static final String EXCHANGE_NAME="text_exchange_fanout"; 9 public static void main(String[] args) throws Exception { 10 //获取一个链接 11 Connection connection= ConnectionUtil.getConnection(); 12 //从链接中获取一个通道 13 Channel channel=connection.createChannel(); 14 //建立交换机 15 channel.exchangeDeclare(EXCHANGE_NAME,"fanout"); 16 17 //消息 18 String msg="hello pubsub"; 19 20 //发送 21 channel.basicPublish(EXCHANGE_NAME,"",null,msg.getBytes()); 22 23 System.out.println("send msg:"+msg); 24 //关闭链接 25 channel.close(); 26 connection.close(); 27 } 28 }
2.消费者一
1 package com.mq.PubSubFanout; 2 3 import com.mq.utils.ConnectionUtil; 4 import com.rabbitmq.client.*; 5 6 import java.io.IOException; 7 8 public class FanoutReceive1 { 9 private static final String EXCHANGE_NAME="text_exchange_fanout"; 10 private static final String QUENE_NAME="test_fanout_queue_email"; 11 public static void main(String[] args)throws Exception{ 12 //获取一个链接 13 Connection connection = ConnectionUtil.getConnection(); 14 //建立通道 15 final Channel channel = connection.createChannel(); 16 //建立队列声明 17 channel.queueDeclare(QUENE_NAME,false,false,false,null); 18 19 //绑定交换机 20 channel.queueBind(QUENE_NAME,EXCHANGE_NAME,""); 21 22 //一次只能发送一个消息 23 channel.basicQos(1); 24 25 //建立消费者 26 DefaultConsumer consumer=new DefaultConsumer(channel){ 27 @Override 28 public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { 29 String msg=new String(body,"utf-8"); 30 System.out.println("[1]receive msg:"+msg); 31 try { 32 Thread.sleep(200); 33 } catch (InterruptedException e) { 34 e.printStackTrace(); 35 }finally { 36 System.out.println("done"); 37 //手动应答 38 channel.basicAck(envelope.getDeliveryTag(),false); 39 } 40 } 41 }; 42 //监听队列,不是自动应答 43 boolean autoAck=false; 44 channel.basicConsume(QUENE_NAME,autoAck,consumer); 45 } 46 }
3.消费者二
1 package com.mq.PubSubFanout; 2 3 import com.mq.utils.ConnectionUtil; 4 import com.rabbitmq.client.*; 5 6 import java.io.IOException; 7 8 public class FanoutReceive2 { 9 private static final String EXCHANGE_NAME="text_exchange_fanout"; 10 private static final String QUENE_NAME="test_fanout_queue_ems"; 11 public static void main(String[] args)throws Exception{ 12 //获取一个链接 13 Connection connection = ConnectionUtil.getConnection(); 14 //建立通道 15 final Channel channel = connection.createChannel(); 16 //建立队列声明 17 channel.queueDeclare(QUENE_NAME,false,false,false,null); 18 19 //绑定交换机 20 channel.queueBind(QUENE_NAME,EXCHANGE_NAME,""); 21 22 //一次只能发送一个消息 23 channel.basicQos(1); 24 25 //建立消费者 26 DefaultConsumer consumer=new DefaultConsumer(channel){ 27 @Override 28 public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { 29 String msg=new String(body,"utf-8"); 30 System.out.println("[2]receive msg:"+msg); 31 try { 32 Thread.sleep(200); 33 } catch (InterruptedException e) { 34 e.printStackTrace(); 35 }finally { 36 System.out.println("done"); 37 //手动应答 38 channel.basicAck(envelope.getDeliveryTag(),false); 39 } 40 } 41 }; 42 //监听队列,不是自动应答 43 boolean autoAck=false; 44 channel.basicConsume(QUENE_NAME,autoAck,consumer); 45 } 46 }
4.效果
send:
receive1:
receive2:
5.运行注意点
若是之间运行receive类,会发现报错,由于没有交换机。
因此,能够先运行send类,虽然交换机不能存储发送的消息,可是能够建立交换机。
而后,就能够按照原来的方式。
先启动两个消费者进行监听,而后启动生产者。
现象:就是消费者都获取到了生产者生产的消息。