本系列主要讲解RabbitMQ,讲解其特性,例如消息持久化、消息TTL、消息的优先、延迟消息、消息可靠性、消费模式以及在Spring Boot中使用RabbitMQ,代码在个人Github上java
RabbitMQ使用Erlang语言开发基于AQMP协议的开源消息队列,RabbitMQ主要有如下特色:git
RabbitMQ基于AQMP协议开发的消息队列,AQMP协议在以前消息队列(一)中已经简单的介绍了,这里就简单的介绍一下:github
须要注意的地方:编程
RabbitMQ-Java官方提供了简单的使用教程,这里就简单的提一下,具体可见其网友翻译版本:RabbitMQ入门教程服务器
这里展现的是RabbitMQ发送消息网络
public class Sender { private static final String EXCHANGE_NAME = "log"; private static final String ROUTING_KEY = "info"; private static final String MESSAGE = "hello world!"; public static void main(String[] args) throws IOException, TimeoutException { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("47.100.20.186"); factory.setPort(5672); factory.setUsername("guest"); factory.setPassword("guest"); // 经过链接工厂建立链接 Connection connection = factory.newConnection(); // 经过Connection建立Channel Channel channel = connection.createChannel(); // 声明Exchange:名称及其类型,该操做一样是幂等的,如何声明对队列同样 channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT); // 经过Channel向Exchange发送消息和Routing Key,而且配置了BasicProperties(消息属性) channel.basicPublish(EXCHANGE_NAME, ROUTING_KEY, MessageProperties.PERSISTENT_BASIC, MESSAGE.getBytes(StandardCharsets.UTF_8)); // 关闭Channel和Connection channel.close(); connection.close(); } }
这里展现使用RabbitMQ接收消息app
public class Receiver { private static final String QUEUE_NAME = "log_info_queue"; private static final String EXCHANGE_NAME = "log"; private static final String ROUTING_KEY = "info"; public static void main(String[] args) throws IOException, TimeoutException { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("47.100.20.186"); factory.setPort(5672); factory.setUsername("guest"); factory.setPassword("guest"); // 经过链接工厂建立链接 Connection connection = factory.newConnection(); // 经过Connection建立Channel Channel channel = connection.createChannel(); // 声明一个队列 -- 在RabbitMQ中,队列声明是幂等性的 // 一个幂等操做的特色是其任意屡次执行所产生的影响均与一次执行的影响相同 // 也就是说,若是不存在,就建立,若是存在,不会对已经存在的队列产生任何影响 // 可是若是声明时修改已存在队列的属性,则会抛出异常 channel.queueDeclare(QUEUE_NAME, false, false , false, null); // 把Queue和Exchange经过Routing Key绑定 channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, ROUTING_KEY); // 设置该消费者预读取消息数量:这里主要是考虑到慢消费的问题,这里使用PUSH模型,服务器推消息给客户端, // 可能会致使消息堆积,设置预读取数量后,服务器会发送指定数量消息后等待前面消息ACK后才会继续发送消息 channel.basicQos(1); // 接收消息:这里使用自动ACK,固然也能够获取消息后手动ACK String s = channel.basicConsume(QUEUE_NAME, true, new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties, byte[] body) throws IOException { String message = new String(body, StandardCharsets.UTF_8); } }); } }
RabbitMQ主要经过ExchangeType来设置消息传递模型,主要有下面4种模型,其中Header模型用的少:异步
Direct模型顾名思义指的是直接链接,只有当消息中的Routing Key与Queue绑定到Exchange的Routing Key一致,才会转发消息给该Queue
分布式
Fanout模型相似于订阅/发布模型,Exchange会把消息转发给全部绑定到该Exchange上的Queue
ide
Topic模型类型与Servlet的URL匹配模型,其会匹配消息的Routing Key和Queue绑定到Exchange的Routing Key,使用通配符匹配。有#和两种通配符,#表明0个或多个字符,表明1个字符
首先RabbitMQ的持久化是异步持久化模型,也就是说在特定状况下,可能形成消息丢失。好比在RabbitMQ Server回调RabbitMQ Producer Client的接口代表已经接收到该消息,可是因为是异步持久化可能尚未把消息持久化到磁盘中,这时候MQ-Server断电就会致使消息的丢失
RabbitMQ中消息的持久化须要保证Exchange、Queue、Message都进行持久化操做。须要注意的是:Exchange、Queue的声明时幂等的。幂等指说屡次声明产生的结果都是同样,也就是说若是其不存在则建立,存在则返回且不会对其产生任何影响,可是若是声明已存在的队列,且其属性不一样则会抛出异常。
RabbitMQ声明Exchange有几种方法,但主要使用下面方法,其中第三个参数表示是否将该Exchange持久化
/** * Actively declare a non-autodelete exchange with no extra arguments * @param exchange the name of the exchange * @param type the exchange type * @param durable true if we are declaring a durable exchange (the exchange will survive a server restart) */ Exchange.DeclareOk exchangeDeclare(String exchange, String type, boolean durable) throws IOException;
RabbitMQ声明Queue与Exchange的方法类型,一样使用durable参数表示是否将该Queue进行持久化操做,下面是其中一个方法
/** * Declare a queue * @param queue the name of the queue * @param durable true if we are declaring a durable queue (the queue will survive a server restart) * @param exclusive true if we are declaring an exclusive queue (restricted to this connection) * @param autoDelete true if we are declaring an autodelete queue (server will delete it when no longer in use) * @param arguments other properties (construction arguments) for the queue */ Queue.DeclareOk queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments) throws IOException;
部分参数说明:
消息的持久化须要在生产者发送消息时设置消息属性,以代表该消息时持久化消息。下面是消息发送的一个API
/** * Publish a message. * * Publishing to a non-existent exchange will result in a channel-level * protocol exception, which closes the channel. * * @param exchange the exchange to publish the message to * @param routingKey the routing key * @param props other properties for the message - routing headers etc * @param body the message body */ void basicPublish(String exchange, String routingKey, BasicProperties props, byte[] body) throws IOException;
部分参数说明:
BasicProperties定义以下:
public BasicProperties( String contentType,//消息类型如:text/plain String contentEncoding,//编码 Map<String,Object> headers, Integer deliveryMode,//1:nonpersistent 2:persistent Integer priority,//优先级 String correlationId, String replyTo,//反馈队列 String expiration,//expiration到期时间 String messageId, Date timestamp, String type, String userId, String appId, String clusterId)
其中RabbitMQ提供了属性实现已经更简单的配置消息属性:
/** Empty basic properties, with no fields set */ BasicProperties.MINIMAL_BASIC /** Empty basic properties, with only deliveryMode set to 2 (persistent) */ BasicProperties.MINIMAL_PERSISTENT_BASIC /** Content-type "application/octet-stream", deliveryMode 1 (nonpersistent), priority zero */ BasicProperties.BASIC /** Content-type "application/octet-stream", deliveryMode 2 (persistent), priority zero */ BasicProperties.PERSISTENT_BASIC /** Content-type "text/plain", deliveryMode 1 (nonpersistent), priority zero */ BasicProperties.TEXT_PLAIN /** Content-type "text/plain", deliveryMode 2 (persistent), priority zero */ BasicProperties.PERSISTENT_TEXT_PLAIN
固然可使用时编程自定义设置消息属性:
AMQP.BasicProperties.Builder builder = new Builder(); BasicProperties properties = builder .deliveryMode(2) .build(); channel.basicPublish(EXCHANGE_NAME, ROUTING_KEY, properties, MESSAGE.getBytes(StandardCharsets.UTF_8));
写入文件前会有一个Buffer,大小为1M,数据在写入文件时,首先会写入到这个Buffer,若是Buffer已满,则会将Buffer写入到文件(未必刷到磁盘)。
有个固定的刷盘时间:25ms,也就是无论Buffer满不满,每一个25ms,Buffer里的数据及未刷新到磁盘的文件内容一定会刷到磁盘。
每次消息写入后,若是没有后续写入请求,则会直接将已写入的消息刷到磁盘:使用Erlang的receive x after 0实现,只要进程的信箱里没有消息,则产生一个timeout消息,而timeout会触发刷盘操做。
TTL(Time To Live)表示存活时间。RabbitMQ中能够对Queue和Message设置TTL,以控制Queue和Message的存活时间。
队列的存活时间指的是Queue在自动删除前能够处于未使用状态的时间。未使用状态指的是Queue上没有Consumer、Queue没有被从新声明。队列的存活时间在队列第一次声明时经过指定队列的属性"x-expires"指定,单位是毫秒,代码以下:
Map<String, Object> queueArgs = new HashMap<>(); // 设置1分钟过时 queueArgs.put("x-expires", 60000); channel.queueDeclare("queue", false, false, false, queueArgs);
消息的存活时间指的是消息在队列中的存活时间,超过该时间消息将被删除或者不能传递给消费者。消息的存活时间能够经过设置每条消息的存活时间或者设置某条队列中的因此存活时间,当二者都有时,时间小的有效。
设置消息属性
针对每条消息能够在发送消息时设置消息属性
// 设置消息属性-TTL为30s BasicProperties properties = new BasicProperties.Builder() .expiration("30000").build(); channel.basicPublish("exchange", "kanyuxia", properties, "hello".getBytes(StandardCharsets.UTF_8));
设置队列属性
经过设置队列中消息的TTL属性,而后传入该队列的全部消息都有该TTL属性
Map<String, Object> queueArgs = new HashMap<>(); queueArgs.put("x-message-ttl", 30000); channel.queueDeclare("queue", false, false, false, queueArgs);
https://www.jianshu.com/p/64357bf35808?utm_campaign=maleskine&utm_content=note&utm_medium=seo_notes&utm_source=recommendation http://blog.csdn.net/wanbf123/article/details/78052419 http://blog.csdn.net/u013256816/article/details/60875666 http://blog.csdn.net/u013256816/article/details/54916011