即向多个消费者传递同一条信息
算法
RabbitMQ消息传递模型的核心思想是,生产者永远不会将任何消息直接发送到队列。安全
相反,生产者只能向交换机(Exchange)发送消息。交换机是一个很是简单的东西。一边接收来自生产者的消息,另外一边将消息推送到队列。交换器必须确切地知道如何处理它接收到的消息。它应该被添加到一个特定的队列中吗?它应该添加到多个队列中吗?或者它应该被丢弃。这些规则由exchange的类型定义。dom
有几种可用的交换类型:direct、topic、header和fanout。
建立fanout交换机logs: c.exchangeDeclare("logs", "fanout");
或c.exchangeDeclare("logs", BuiltinExchangeType.FANOUT);
ide
fanout交换机很是简单。它只是将接收到的全部消息广播给它所知道的全部队列。ui
建立了一个fanout交换机和一个队列。如今咱们须要告诉exchange向指定队列发送消息。exchange和队列之间的关系称为绑定。spa
//指定的队列,与指定的交换机关联起来 //称为绑定 -- binding //第三个参数时 routingKey, 因为是fanout交换机, 这里忽略 routingKey ch.queueBind(queueName, "logs", "");
1.生产者
最重要的更改是,咱们如今但愿将消息发布到logs交换机,而不是无名的日志交换机。咱们须要在发送时提供一个routingKey,可是对于fanout交换机类型,该值会被忽略。日志
public class Producer { public static void main(String[] args) throws Exception { //创建链接 ConnectionFactory f = new ConnectionFactory(); f.setHost("192.168.64.140"); f.setPort(5672); f.setUsername("admin"); f.setPassword("admin"); Connection con = f.newConnection(); Channel c = con.createChannel(); //定义fanout类型交换机:logs //c.exchangeDeclare("logs", "fanout"); c.exchangeDeclare("logs", BuiltinExchangeType.FANOUT); //向交换机发送信息 while (true){ System.out.println("输入消息:"); String msg = new Scanner(System.in).nextLine(); c.basicPublish("logs", "", null, msg.getBytes()); } } }
2.消费者
若是尚未队列绑定到交换器,消息就会丢失,但这对咱们来讲没有问题;若是尚未消费者在听,咱们能够安全地丢弃这些信息。code
public class Consumer { public static void main(String[] args) throws Exception { //创建链接 ConnectionFactory f = new ConnectionFactory(); f.setHost("192.168.64.140"); f.setPort(5672); f.setUsername("admin"); f.setPassword("admin"); Connection con = f.newConnection(); Channel c = con.createChannel(); //1.定义随机队列 2.定义交换机 3.绑定 //随机命名,非持久,独占,自动删除 String queue = UUID.randomUUID().toString(); c.queueDeclare(queue, false, true, true, null); c.exchangeDeclare("logs", BuiltinExchangeType.FANOUT); //第三个参数对发布订阅模式fanout交换机无效 c.queueBind(queue, "logs", ""); DeliverCallback deliverCallback = new DeliverCallback() { @Override public void handle(String consumerTag, Delivery message) throws IOException { String msg = new String(message.getBody()); System.out.println("收到:"+msg); } }; CancelCallback cancelCallback = new CancelCallback() { @Override public void handle(String consumerTag) throws IOException { } }; //正常的消费数据 c.basicConsume(queue, true, deliverCallback, cancelCallback); } }
路由模式与订阅模式不一样之处在于,咱们将向其添加一个特性—咱们将只订阅全部消息中的一部分.本文中已添加err/info/warning等报错提示来示范.
blog
绑定是交换机和队列之间的关系。这能够简单地理解为:队列对来自此交换的消息感兴趣。rabbitmq
绑定可使用额外的routingKey参数。为了不与basic_publish参数混淆,咱们将其称为bindingKey。这是咱们如何建立一个键绑定:
ch.queueBind(queueName, EXCHANGE_NAME, "black");
bindingKey的含义取决于交换机类型。咱们前面使用的fanout交换机彻底忽略它。
上一节中的日志系统向全部消费者广播全部消息。咱们但愿扩展它,容许根据消息的严重性过滤消息。
前面咱们使用的是fanout交换机,这并无给咱们太多的灵活性——它只能进行简单的广播。
咱们将用直连交换机(Direct exchange)代替。它背后的路由算法很简单——消息传递到bindingKey与routingKey彻底匹配的队列。
使用相同的bindingKey绑定多个队列是彻底容许的。可使用binding key "black"将X与Q1和Q2绑定。在这种状况下,直连交换机的行为相似于fanout,并将消息广播给全部匹配的队列。一条路由键为black的消息将同时发送到Q1和Q2。
1.发送消息
咱们将提供日志级别做为routingKey,这样,接收程序将可以选择它但愿接收的级别
//参数1: 交换机名 //参数2: routingKey, 路由键,这里咱们用日志级别,如"error","info","warning" //参数3: 其余配置属性 //参数4: 发布的消息数据 ch.basicPublish("direct_logs", "error", null, message.getBytes());
2.接收消息
咱们将为感兴趣的每一个日志级别建立一个新的绑定
ch.queueBind(queueName, "logs", "info"); ch.queueBind(queueName, "logs", "warning");
1.生产者
public class Producer { public static void main(String[] args) throws Exception { //创建链接 ConnectionFactory f = new ConnectionFactory(); f.setHost("192.168.64.140"); f.setPort(5672); f.setUsername("admin"); f.setPassword("admin"); Connection con = f.newConnection(); Channel c = con.createChannel(); //定义fanout类型交换机:logs //c.exchangeDeclare("logs", "fanout"); c.exchangeDeclare("direct_logs", BuiltinExchangeType.DIRECT); //向交换机发送信息 while (true){ System.out.println("输入消息:"); String msg = new Scanner(System.in).nextLine(); System.out.println("输入路由键:"); String key = new Scanner(System.in).nextLine(); c.basicPublish("direct_logs", key, //路由键关键词 null, msg.getBytes()); } } }
2.消费者
public class Consumer { public static void main(String[] args) throws Exception { //创建链接 ConnectionFactory f = new ConnectionFactory(); f.setHost("192.168.64.140"); f.setPort(5672); f.setUsername("admin"); f.setPassword("admin"); Connection con = f.newConnection(); Channel c = con.createChannel(); //1.定义随机队列 2.定义交换机 3.绑定 //随机命名,非持久,独占,自动删除 String queue = UUID.randomUUID().toString(); c.queueDeclare(queue, false, true, true, null); c.exchangeDeclare("direct_logs", BuiltinExchangeType.DIRECT); //用输入绑定键进行绑定 System.out.println("输入绑定键,用空格隔开:"); String s = new Scanner(System.in).nextLine(); String[] a = s.split(" "); //["aaa","bbb","ccc"] for (String key:a){ c.queueBind(queue, "direct_logs", key); } DeliverCallback deliverCallback = new DeliverCallback() { @Override public void handle(String consumerTag, Delivery message) throws IOException { String msg = new String(message.getBody()); String key = message.getEnvelope().getRoutingKey(); System.out.println("收到:"+msg+" - "+key); } }; CancelCallback cancelCallback = new CancelCallback() { @Override public void handle(String consumerTag) throws IOException { } }; //正常的消费数据 c.basicConsume(queue, true, deliverCallback, cancelCallback); } }