在前面的两篇博客中html
遇到的实例都是一个消息只发送给一个消费者(工做者),他们的消息模型分别为(P表明生产者,C表明消费者,红色表明队列):ide
此次咱们来看下将一个消息发送给多个消费者(工做者),这种模式通常被称为“发布/订阅”模式。其工做模型为(P表明生产者,X表明Exchange(路由器/交换机),C表明消费者,红色表明队列):ui
咱们发现,工做模型中首次出现路由器,而且每一个消费者有单独的队列。生产者生成消息后将其发送给路由器,而后路由器转送到队列,消费者各自到本身的队列里面获取消息进行消费。在实际的应用场景中,生产者通常不会直接将消息发送给队列,而是发送给路由器进行中转,Exchange必须清楚的知道怎么处理收到的消息:是将消息发送到一个特定队列仍是多有队列,或者直接废弃消息。这种才符合RabbitMQ消息模型的核心思想。this
接下来咱们详细展开今天的话题:spa
1、Exchange3d
Exchange在咱们的工做模型中首次出现,所以须要详细介绍下。日志
Exchange分为4种类型:code
Direct:彻底根据key进行投递的,例如,绑定时设置了routing key为”abc”,那么客户端提交的消息,只有设置了key为”abc”的才会投递到队列。 Topic:对key进行模式匹配后进行投递,符号”#”匹配一个或多个词,符号”*”匹配正好一个词。例如”abc.#”匹配”abc.def.ghi”,”abc.*”只匹配”abc.def”。 Fanout:不须要key,它采起广播模式,一个消息进来时,投递到与该交换机绑定的全部队列。 Headers:咱们能够不考虑它。
今天咱们的实例采用fanout类型的exchange。htm
尽管首次出现,可是其实咱们前面的案例中也有用到exchange,只是咱们没有给他名字,用的是RabbitMQ默认的,好比下面这段代码,咱们将路由器名这个参数传入了“”,若是咱们须要本身声明exchange的话,这个就不能传入“”了,而是传入本身定义好的值。blog
2、临时队列
前面两篇博客中,咱们都在使用队列的时候给出了定义好的名字,这在生产者和消费者共用相同队列的时候颇有必要,可是咱们有了exchange,生产者不须要知道有哪些队列,所以队列名字能够不用指定了,而是经过RabbitMQ 接口本身去生成临时队列,队列名字也由RabbitMQ自动生成。经过
能够声明一个非持久的、通道独占的、自动删除的队列,getQueue()方法能够获取随机队列名字。这个名字用来在队列和exchange之间创建binding关系的时候使用:
3、代码实现
基于上面exchange和临时队列的知识铺垫,能够展开今天的代码实现了。
public class Product { //exchange名字 public static String EXCHANGE_NAME = "exchange"; public static void main(String[] args) { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); Connection connection = null; Channel channel = null; try { // 1.建立链接和通道 connection = factory.newConnection(); channel = connection.createChannel(); // 2.为通道声明exchange和exchange的类型 channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.FANOUT); String msg = " hello rabbitmq, this is publish/subscribe mode"; // 3.发送消息到指定的exchange,队列指定为空,由exchange根据状况判断须要发送到哪些队列 channel.basicPublish(EXCHANGE_NAME, "", null, msg.getBytes()); System.out.println("product send a msg: " + msg); } catch (IOException e) { e.printStackTrace(); } catch (TimeoutException e) { e.printStackTrace(); } finally { // 4.关闭链接 if (channel != null) { try { channel.close(); } catch (IOException e) { e.printStackTrace(); } catch (TimeoutException e) { e.printStackTrace(); } } if (connection != null) { try { connection.close(); } catch (IOException e) { e.printStackTrace(); } } } } }
public class Consumer1 { public static void main(String[] args) { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); Connection connection = null; Channel channel = null; try { // 1.建立链接和通道 connection = factory.newConnection(); channel = connection.createChannel(); // 2.为通道声明exchange以及exchange类型 channel.exchangeDeclare(Product.EXCHANGE_NAME, BuiltinExchangeType.FANOUT); // 3.建立随机名字的队列 String queueName = channel.queueDeclare().getQueue(); // 4.创建exchange和队列的绑定关系 channel.queueBind(queueName, Product.EXCHANGE_NAME, ""); System.out.println(" **** Consumer1 keep alive ,waiting for messages, and then deal them"); // 5.经过回调生成消费者并进行监听 Consumer consumer = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, com.rabbitmq.client.AMQP.BasicProperties properties, byte[] body) throws IOException { // 获取消息内容而后处理 String msg = new String(body, "UTF-8"); System.out.println("*********** Consumer1" + " get message :[" + msg + "]"); } }; // 6.消费消息 channel.basicConsume(queueName, true, consumer); } catch (IOException e) { e.printStackTrace(); } catch (TimeoutException e) { e.printStackTrace(); } } }
生产者: product send a msg: hello rabbitmq, this is publish/subscribe mode 消费者1: **** Consumer1 keep alive ,waiting for messages, and then deal them *********** Consumer1 get message :[ hello rabbitmq, this is publish/subscribe mode] 消费者2: **** Consumer2 keep alive ,waiting for messages, and then deal them *********** Consumer2 get message :[ hello rabbitmq, this is publish/subscribe mode]
能够看到,当生产者发出消息后,两个消费者最终都收到了消息。
在Exchanges 标签页里面多了一个名为“exchange”的路由器,他的类型是fanout。点exchange 的link进入详细页面:
发如今binding项目中有了两条绑定关系,队列的名字也能够看到。将页面切换到Queues标签页:
出现了两个新的队列,队列名字和绑定关系中的同样,而且队列都是自动删除的、通道独占的。