RabbitMQ-从基础到实战(1)— Hello RabbitMQhtml
RabbitMQ-从基础到实战(2)— 防止消息丢失java
RabbitMQ-从基础到实战(4)— 消息的交换(中)app
RabbitMQ-从基础到实战(5)— 消息的交换(下)ide
RabbitMQ-从基础到实战(6)— 与Spring集成函数
在前面的例子中,每一个消息都只对应一个消费者,即便有多个消费者在线,也只会有一个消费者接收并处理一条消息,这是消息中间件的一种经常使用方式。性能
另一种方式,生产者生产一条消息,广播给一个或多个队列,全部订阅了这个队列的消费者,均可以消费这条消息,这就是消息订阅。this
官方教程列举了这样一个场景,生产者发出一条记录日志的消息,消费者1接收到后写日志到硬盘,消费者2接收到后打印日志到屏幕。工做中还有不少这样的场景有待发掘,适当的使用消息订阅后能够成倍的增长效率。idea
在前两章的例子中,咱们涉及到了三个概念spa
这不由让咱们觉得,生产者生产消息后直接到发送到队列,消费者从队列中获取消息,再消费掉。debug
其实这是错误的,在RabbitMQ中,生产者不会直接把消息发送给队列,实际上,生产者甚至不知道一条消息会不会被发送到队列上。
正确的概念是,生产者会把消息发送给RabbitMQ的交换中心(Exchange),Exchange的一侧是生产者,另外一侧则是一个或多个队列,由Exchange决定一条消息的生命周期--发送给某些队列,或者直接丢弃掉。
这个概念在官方文档中被称做RabbitMQ消息模型的核心思想(core idea)
以下图,其中X表明的是Exchange。
RabbitMQ中,有4种类型的Exchange
更详细的介绍,请看官方文档
能够对一个队列命名是十分重要的,在消费者消费消息时,要指明消费哪一个队列的消息(下面的queue),这样就可让多个消费者同时分享一个队列
String basicConsume(String queue, boolean autoAck, Consumer callback) throws IOException;
上述记录日志的场景中,有如下几个特色
本身声明队列是比较麻烦的,所以,RabbitMQ提供了简便的获取临时队列的方法,该队列会在链接断开后销毁
String queueName = channel.queueDeclare().getQueue();
这行代码会获取一个名字相似于“amq.gen-JzTY20BRgKO-HjmUJj0wLg”的临时队列
再次注意,在RabbitMQ中,消息是发送到Exchange的,不是直接发送的Queue。所以,须要把Queue和Exchange进行绑定,告诉RabbitMQ把指定的Exchange上的消息发送的这个队列上来
绑定队列使用此方法
Queue.BindOk queueBind(String queue, String exchange, String routingKey) throws IOException;
其中,queue是队列名,exchange是要绑定的交换中心,routingKey就是这个queue的routingKey
下面来实现上述场景,生产者发送日志消息,消费者1记录日志,消费者2打印日志
下面的代码中,把链接工厂等方法放到了构造函数中,也就是说,每new一个对象,都会建立一个链接,在生产环境这样作是很浪费性能的,每次建立一个connection都会创建一次TCP链接,生产环境应使用链接池。而Channel又不同,多个Channel是共用一个TCP链接的,所以能够放心的获取Channel(本结论出自官方文档对Channel的定义)
AMQP 0-9-1 connections are multiplexed with channels that can be thought of as "lightweight connections that share a single TCP connection".
For applications that use multiple threads/processes for processing, it is very common to open a new channel per thread/process and not share channels between them.
日志消息发送类 LogSender
1 import java.io.IOException; 2 import java.util.concurrent.TimeoutException; 3 4 import org.slf4j.Logger; 5 import org.slf4j.LoggerFactory; 6 7 import com.rabbitmq.client.Channel; 8 import com.rabbitmq.client.Connection; 9 import com.rabbitmq.client.ConnectionFactory; 10 11 public class LogSender { 12 13 private Logger logger = LoggerFactory.getLogger(LogSender.class); 14 private ConnectionFactory factory; 15 private Connection connection; 16 private Channel channel; 17 18 /** 19 * 在构造函数中获取链接 20 */ 21 public LogSender(){ 22 super(); 23 try { 24 factory = new ConnectionFactory(); 25 factory.setHost("127.0.0.1"); 26 connection = factory.newConnection(); 27 channel = connection.createChannel(); 28 } catch (Exception e) { 29 logger.error(" [X] INIT ERROR!",e); 30 } 31 } 32 /** 33 * 提供个关闭方法,如今并无什么卵用 34 * @return 35 */ 36 public boolean closeAll(){ 37 try { 38 this.channel.close(); 39 this.connection.close(); 40 } catch (IOException | TimeoutException e) { 41 logger.error(" [X] CLOSE ERROR!",e); 42 return false; 43 } 44 return true; 45 } 46 47 /** 48 * 咱们更加关心的业务方法 49 * @param message 50 */ 51 public void sendMessage(String message) { 52 try { 53 //声明一个exchange,命名为logs,类型为fanout 54 channel.exchangeDeclare("logs", "fanout"); 55 //exchange是logs,表示发送到此Exchange上 56 //fanout类型的exchange,忽略routingKey,因此第二个参数为空 57 channel.basicPublish("logs", "", null, message.getBytes()); 58 logger.debug(" [D] message sent:"+message); 59 } catch (IOException e) { 60 e.printStackTrace(); 61 } 62 } 63 }
在LogSender中,和以前的例子不同的地方是,咱们没有直接声明一个Queue,取而代之的是声明了一个exchange
发布消息时,第一个参数填了咱们声明的exchange名字,routingKey留空,由于fanout类型忽略它。
在前面的例子中,咱们routingKey填的是队列名,由于默认的exchange(exchange位空字符串时使用默认交换中心)会把队列的routingKey设置为queueName(声明队列的时候设置的,不是发送消息的时候),又是direct类型,因此能够经过queueName当作routingKey找到队列。
消费类 LogConsumer
1 package com.liyang.ticktock.rabbitmq; 2 3 import java.io.IOException; 4 import java.util.concurrent.TimeoutException; 5 6 import org.slf4j.Logger; 7 import org.slf4j.LoggerFactory; 8 9 import com.rabbitmq.client.AMQP; 10 import com.rabbitmq.client.Channel; 11 import com.rabbitmq.client.Connection; 12 import com.rabbitmq.client.ConnectionFactory; 13 import com.rabbitmq.client.Consumer; 14 import com.rabbitmq.client.DefaultConsumer; 15 import com.rabbitmq.client.Envelope; 16 17 public class LogConsumer { 18 19 private Logger logger = LoggerFactory.getLogger(LogConsumer.class); 20 private ConnectionFactory factory; 21 private Connection connection; 22 private Channel channel; 23 24 /** 25 * 在构造函数中获取链接 26 */ 27 public LogConsumer() { 28 super(); 29 try { 30 factory = new ConnectionFactory(); 31 factory.setHost("127.0.0.1"); 32 connection = factory.newConnection(); 33 channel = connection.createChannel(); 34 // 声明exchange,防止生产者没启动,exchange不存在 35 channel.exchangeDeclare("logs","fanout"); 36 } catch (Exception e) { 37 logger.error(" [X] INIT ERROR!", e); 38 } 39 } 40 41 /** 42 * 提供个关闭方法,如今并无什么卵用 43 * 44 * @return 45 */ 46 public boolean closeAll() { 47 try { 48 this.channel.close(); 49 this.connection.close(); 50 } catch (IOException | TimeoutException e) { 51 logger.error(" [X] CLOSE ERROR!", e); 52 return false; 53 } 54 return true; 55 } 56 57 /** 58 * 咱们更加关心的业务方法 59 */ 60 public void consume() { 61 try { 62 // 获取一个临时队列 63 String queueName = channel.queueDeclare().getQueue(); 64 // 把刚刚获取的队列绑定到logs这个交换中心上,fanout类型忽略routingKey,因此第三个参数为空 65 channel.queueBind(queueName, "logs", ""); 66 //定义一个Consumer,消费Log消息 67 Consumer consumer = new DefaultConsumer(channel) { 68 @Override 69 public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, 70 byte[] body) throws IOException { 71 String message = new String(body, "UTF-8"); 72 logger.debug(" [D] 我是来打印日志的:"+message); 73 } 74 }; 75 //这里自动确认为true,接收到消息后该消息就销毁了 76 channel.basicConsume(queueName, true, consumer); 77 } catch (IOException e) { 78 e.printStackTrace(); 79 } 80 } 81 }
复制一个项目,把72行改成以下代码,表明两个作不一样工做的消费者
1 logger.debug(" [D] 我已经把消息写到硬盘了:"+message);
消费者App
1 public class App 2 { 3 public static void main( String[] args ) 4 { 5 LogConsumer consumer = new LogConsumer(); 6 consumer.consume(); 7 } 8 }
生产者App
1 public class App { 2 public static void main( String[] args ) throws InterruptedException{ 3 LogSender sender = new LogSender(); 4 while(true){ 5 sender.sendMessage(System.nanoTime()+""); 6 Thread.sleep(1000); 7 } 8 } 9 }
把消费者打包成两个可执行的jar包,方便观察控制台
用java -jar 命令执行,结果以下
本章介绍了RabbitMQ中消息的交换,再次强调,RabbitMQ中,消息是经过交换中心转发到队列的,不要被默认的exchange混淆,默认的exchange会自动把queue的名字设置为它的routingKey,因此消息发布时,才能经过queueName找到该队列,其实此时queueName扮演的角色就是routingKey。
本教程是参考官方文档写出来的,后续章节会介绍更多RabbitMQ的相关知识以及项目中的实战技巧