MQ(消息队列)做为现代比较流行的技术,在互联网应用平台中做为中间件,主要解决了应用解耦、异步通讯、流量削锋、服务总线等问题,为实现高并发、高可用、高伸缩的企业应用提供了条件。html
目前市面比较流行的消息队列有ActiveMQ,RabbitMQ,ZeroMQ,Kafka,MetaMQ,RocketMQ等,而每种产品又有着独特的着重点,可根据业务须要进行选择。git
这里有对主流MQ的优缺点的一些描述与比较。github
- 安装
一、下载Erlangredis
官网下载,安装完成配置环境变量spring
二、安装RabbitMqmongodb
官网下载,直接安装浏览器
启用管理界面:rabbitmq-plugins.bat enable rabbitmq_management服务器
进入%RM%/sbin双击 rabbitmq-server.bat,启动成功后以下:并发
启动完成浏览器访问http://localhost:15672,用户名/密码:guest/guest,进入界面后以下:app
- 介绍
RabbitMq是遵循了AMQP协议的开源消息队列的代理服务软件。
- 跨平台,支持多种语言
- 实现了AMQP协议;
- 知足高并发需求
- 支持集群部署
- 支持多插件,可视化视图
- 社区活跃
- 等等
核心元件包括:
ConnectionFactory(链接管理器):应用程序与Rabbit之间创建链接的管理器,程序代码中使用;
Channel(信道):消息推送使用的通道;
Exchange(交换器):用于接受、分配消息;
Queue(队列):用于存储生产者的消息;
RoutingKey(路由键):用于把生成者的数据分配到交换器上;
BindingKey(绑定键):用于把交换器的消息绑定到队列上;
- 使用
先看一段简单的代码,来理解下消息队列的工做原理:
Connection connection = AmqpConnectionFactory.getConnection(new AmqpConfig()); final Channel channel = connection.createChannel(); channel.queueDeclare("q.demo",false,false,true,null); channel.basicPublish("","q.demo",null,"消息".getBytes()); channel.basicConsume("q.demo",false,new DefaultConsumer(channel){ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { System.out.println(new String(body)); channel.basicAck(envelope.getDeliveryTag(),false); } });
- 首先获取链接,以前下载的rabbitmq server,与之创建链接
- 打开通讯channel,能够理解为打开一个tcp通讯,只不过为节省资源虚拟了一个通道
- 定义一个队列
- 向队列中发送消息
- 从队列中消费消息
其实这里还有个过程被忽略了,实际上是使用了默认处理,在第4步中,实际上是向消息路由发布消息,且该消息路由的routingKey与队列名称相同,
The default exchange is implicitly bound to every queue, with a routing key equal to the queue name. It is not possible to explicitly bind to, or unbind from the default exchange. It also cannot be deleted.
而实际中的用法可能更为复杂些,可是 原理上其实都同样,只不过在消息的处理过程当中会添加一些策略来应对不一样的应用场景,同时为了保证消息的可靠性,会引入一些确认机制,固然这些都是后话,先在刚的基础上看一下另外一个示例,一样有生产者-消费做为模型:
生成者:
public void produce(String message) throws IOException, TimeoutException, InterruptedException { Connection connection = AmqpConnectionFactory.getConnection(new AmqpConfig()); //基于信道的通讯 Channel channel = connection.createChannel(); /** * 交换机名称、交换机类型、是否持久化、是否自动删除、是否内部使用、参数 */ channel.exchangeDeclare(Common.EXCHANGE_X1,ExType.Direct.value(),false,false,false,null);//申明交换机 /** * 消息发送到指定交换机、routing key、是否重发、是否、基础属性、消息内容 * mandatory:(true)没有队列,消息返回;(false)没有队列,消息丢弃 * immediate:(true)没有消费者,消息返回;(false) */ int count = 0; while (count++ <100){ TimeUnit.SECONDS.sleep(1); channel.basicPublish(Common.EXCHANGE_X1,Common.ROUTING_KEY1,false,false,null,(message+ new Date()).getBytes()); } channel.close(); AmqpConnectionFactory.close(connection); } public static void main(String[] args) throws IOException, TimeoutException, InterruptedException { Producer producer = new Producer(); producer.produce("消息 "); }
大体意思就是讲生产的消息绑定到指定的交换机上,而且指定交换机类型以及routing key。
消费者:
public void consume() throws IOException { Connection connection = AmqpConnectionFactory.getConnection(new AmqpConfig()); final Channel channel = connection.createChannel();//创建通讯通道 /** * 队列名称、是否持久化、是否被该链接独占、自动删除、参数 */ channel.queueDeclare(Common.QUEUE_Q1,false,false,false,null);//申明队列 /** * 交换机名称、交换机类型、是否持久化、是否自动删除、是否内部使用、参数 */ channel.exchangeDeclare(Common.EXCHANGE_X1,ExType.Direct.value(),false,false,false,null);//申明交换机 /** * 队列名称、交换机名称、binding Key */ channel.queueBind(Common.QUEUE_Q1,Common.EXCHANGE_X1,Common.BINDING_KEY1);//将消息交换机与队列绑定 /** * 队列名称、自动ACK、消费者标记、非本地、是否被该链接独占、参数 */ channel.basicConsume(Common.QUEUE_Q1,false,"c1",false,false,null,new DefaultConsumer(channel){ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { // super.handleDelivery(consumerTag, envelope, properties, body); System.out.println(new String(body)); /** * 消息索引、批量确认 */ channel.basicAck(envelope.getDeliveryTag(),false); } }); } public static void main(String[] args) throws IOException { Consumer consumer = new Consumer(); consumer.consume(); }
相对要复杂些,主要是从队列中读取消息并显示,那么队列的消息则是经过交换机与设置的binding key来决定。
如今来梳理下整个流程:
- 在生产者中创建与mq服务的链接,建立通道
- 定义消息交换机,注意次数有不少参数,如今咱们仅关注其名称与类型
- 循环100次向指定交换机中发布消息,并设置routing key
- 在消费者中创建链接,建立通道
- 定义消息队列
- 定义消息交换机
- 将消息队列绑定到交换机上,并定义binding key
- 从队列中读取消息
其中有些方面须要注意的:
- 生产者与消费者启动不分前后
- 两个地方都定义了消息交换机,与上一个对应。由于若是有该交换已经存在,则不会从新建立,但若是属性不一样,则会报错
- 其中涉及到了routing key、binding key,主要是完成消息从匹配的交换机拿取消息,固然这个不是必须的,由于受到exchange的type影像
- 生产者的的消息都是发送到交换机,而消费的消息都是从队列中拿
到这里有必要说下Exchange的type,主要有如下类型:
- Fanout:转发消息到全部绑定队列
- Direct:direct 类型的行为是"先匹配, 再投送". 即在绑定时设定一个 routing key, 消息的routing key 匹配时, 才会被交换器投送到绑定的队列中去.
- Topic:按规则转发消息(最灵活)
- Headers:设置 header attribute 参数类型的交换机
经过这几种类型的exchange,能够定制出很是复杂的业务模型。
上面能够说从简单的应用层面了解了Rabbit Mq,由于网上有太多的知识,对一些组件与工做模型等都讲解,而且图文并茂,全部不必作过多重复的工做,下面从可靠性的角度来学习,同时以前咱们在声明队列、交换机等会有一些参数,具体干吗用的,可能有些会说到,毕竟是学习阶段,不可能那么全面,也不会那么准确,权做参考。
下面看一个简单的im,仅仅实现两我的间的固化通讯,jack and rose:
首先说下思路,两个用户间的通讯,会涉及到每一个用户的接收和发送,咱们知道,通常聊天时,接收和发送是互不影响的,也不会有任何依赖关系,也就是发送和接收的消息是两条线,那么咱们就须要为每一个用户分别开通一个发送和接收的线程,这样两个行为就不会有任何影响。而后看下怎么发送信息,就是经过mq开通一个channel,将消息发送到对应的exchange,进而讲消息推送到匹配的消息队列中,而另外一方接收,则从指定的队列中取得消息并展示出来。那么接收发送就造成了完整的过程。固然这个和以前说的接收发送不一样,刚才指的是同一用户,如今指用户到用户,这个发送与接收本就是一个过程的两个阶段。下面看下代码实现,既然两个用户的行为如此相像,咱们就提出一个抽象类来实现共同的部分:
public abstract class AbstractUser { private static String EXCHANGE = "x.user"; private String id = UUID.randomUUID().toString(); private String name; private Connection connection = AmqpConnectionFactory.getConnection(new AmqpConfig()); public AbstractUser(String name){ this.name = name; } public void start() throws IOException { System.out.println(name +" 上线了.."); Channel channel = connection.createChannel(); channel.basicQos(1);//流量控制 String queueName = getResQueue(); channel.exchangeDeclare(EXCHANGE,ExType.Direct.value()); channel.queueDeclare(queueName,false,false,false,null); channel.queueBind(queueName,EXCHANGE,getResBindingKey()); //专门接收的线程 new Thread(()->{ try { channel.basicConsume(queueName,false,new DefaultConsumer(channel){ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { if(!id.equals(properties.getCorrelationId())){ System.out.println(properties.getAppId()+" : "+new String(body)); channel.basicAck(envelope.getDeliveryTag(),false); }else{ channel.basicNack(envelope.getDeliveryTag(),false,true); } } }); } catch (IOException e) { e.printStackTrace(); } }).start(); //专门发送的线程 new Thread(()->{ Scanner scanner = new Scanner(System.in); AMQP.BasicProperties properties = new AMQP.BasicProperties().builder().correlationId(id).appId(name).build(); while (true){ try { channel.basicPublish(EXCHANGE,getPusRoutingKey(),properties,scanner.nextLine().getBytes()); } catch (IOException e) { e.printStackTrace(); } } }).start(); } /** * 发送消息时指定routing key,在接收方binding key须要与之对应 * @return */ public abstract String getPusRoutingKey(); /** * 接收消息的队列 * @return */ public abstract String getResQueue(); /** * 接收消息的binding key,与发送方的routing key对应 * @return */ public abstract String getResBindingKey(); }
JACK:
public class Jack extends AbstractUser{ public Jack(String name) { super(name); } @Override public String getPusRoutingKey() { return "toRose"; } @Override public String getResQueue() { return "q.rose"; } @Override public String getResBindingKey() { return "toJack"; } public static void main(String[] args) throws IOException { Jack jack = new Jack("jack"); jack.start(); } }
ROSE:
public class Rose extends AbstractUser{ public Rose(String name) { super(name); } @Override public String getPusRoutingKey() { return "toJack"; } @Override public String getResQueue() { return "q.jack"; } @Override public String getResBindingKey() { return "toRose"; } public static void main(String[] args) throws IOException { Rose rose = new Rose("rose"); rose.start(); } }
其中要注意的是,在接收的时候,开始设计时是共用了一个队列,因此会出现本身给本身发信息,因此在发送消息时,为消息添加了属性,标识该消息的来源,那么在读取消息时,根据该属性判断是否为本身的消息,若是是,则确认并消费该消息,若是不是,须要作一次nack的处理,并将消息从新放回队列中,直到被其余用户消费为止。咱们能够看到,如今是两我的的通讯,有一些固化的元素,好比routing key,两个用户通讯是须要优先肯定的,那么真实的IM系统,会涉及到不少繁琐的内容,好比消息发送失败,消息发送超时、重发、多人聊天等等,会存在不少须要解决的问题。
jack和rose的聊天也结束了,那么咱们在来看看其余的一些知识点,一样以消息的发送与消息接收为一条线来进行下去。
在发送消息前,毫无疑问是先创建链接,打开虚拟通道,以后才是定义交换机,发送消息(不用申明队列)。那么在申明交换机的时候,其实有不少个参数:
Exchange.DeclareOk exchangeDeclare(String exchange, String type, boolean durable, boolean autoDelete, boolean internal, Map<String, Object> arguments) throws IOException;
- exchange:交换机名称
- type:交换机类型,上面说到了, direct, topic, fanout, headers
- durable:是否持久化,也就是断开链接后是否还存在
- autoDelete:自动删除,当与该exchange上的队列所有删除后, 自动删除,和上一个参数比较一下,好比durable=true,那么若是该参数配置true,其实也会删除(没有queue)
- internal:是否内部交换机,不太知道应用场景
- arguments:其余参数,好比DLX
在发送消息时,一样有一些可配置参数:
void basicPublish(String exchange, String routingKey, boolean mandatory, boolean immediate, BasicProperties props, byte[] body) throws IOException;
- exchange:消息发送到的交换机
- routingKey:交换机路由key
- mandatory:true,若是交换机没有匹配到对应的队列,会将调用basic.return将该消费返回生成者;false,上述情形直接丢弃消息
- immediate:true,若是交换机关联队列没有消费者,则不会将消息加入队列;false,上述情形将调用basic.return将消息返回生产者。3.0后去掉了
- props:为消息添加一些参数,好比过时时间
- body:消息主体
那么这些参数主要干吗的?当时是保证系统的可靠性了。
那么在消息的发送端,如何保证可靠性:
- 事务
try { channel.txSelect(); channel.basicPublish("exchange", "routingKey", MessageProperties.PERSISTENT_TEXT_PLAIN, msg); channel.txCommit(); } catch (Exception ex) { channel.txRollback(); }
- 确认机制(推荐)
该机制主要是经过注册一些事件来处理的,好比上面提到过的basic.return
channel.confirmSelect(); channel.basicPublish("yu.exchange", "yu.1", MessageProperties.PERSISTENT_TEXT_PLAIN, msg); boolean success = channel.waitForConfirms(10);
channel.addConfirmListener(new ConfirmListener() { public void handleAck(long l, boolean b) throws IOException { } public void handleNack(long l, boolean b) throws IOException { } }); channel.addReturnListener(new ReturnListener() { public void handleReturn(int i, String s, String s1, String s2, AMQP.BasicProperties basicProperties, byte[] bytes) throws IOException{ } }); channel.confirmSelect(); channel.basicPublish("exchange", "routingKey",true, MessageProperties.PERSISTENT_TEXT_PLAIN, msg);
下面主要对第二种状况验证一下,记得在测试前,讲相关的exchange与queue进行删除,不然会影响测试结果:
public class Producer { public void produce(String message) throws IOException, TimeoutException, InterruptedException { Connection connection = AmqpConnectionFactory.getConnection(new AmqpConfig()); //基于信道的通讯 Channel channel = connection.createChannel(); /** * 交换机名称、交换机类型、是否持久化、是否自动删除、是否内部使用、参数 */ channel.exchangeDeclare(Common.EXCHANGE_X1,ExType.Direct.value(),false,false,false,null);//申明交换机 channel.confirmSelect();//确认机制 channel.addConfirmListener(new ConfirmListener() { @Override public void handleAck(long deliveryTag, boolean multiple) throws IOException { System.out.println("消息发送成功!"); } @Override public void handleNack(long deliveryTag, boolean multiple) throws IOException { System.out.println("消息发送失败!"); } }); /** * mandatory=ture */ channel.addReturnListener(new ReturnListener() { @Override public void handleReturn(int replyCode, String replyText, String exchange, String routingKey, AMQP.BasicProperties properties, byte[] body) throws IOException { System.out.println("从新处理消息!"); } }); /** * 消息发送到指定交换机、routing key、是否重发、是否、基础属性、消息内容 * mandatory:(true)没有队列,消息返回;(false)没有队列,消息丢弃 * immediate:(true)没有消费者,消息返回;(false) */ int count = 0; while (count++ <10){ TimeUnit.SECONDS.sleep(1); channel.basicPublish(Common.EXCHANGE_X1,Common.ROUTING_KEY1,false,false,null,(new Date()+message).getBytes()); } channel.close(); AmqpConnectionFactory.close(connection); } public static void main(String[] args) throws IOException, TimeoutException, InterruptedException { Producer producer = new Producer(); producer.produce("消息 "); } }
咱们发送10条消息到交换机,控制台打印以下,若是关闭链接可能最后一条消息打印不出来:
而后启动消费者,恰好也消费了10条消息(须要先声明下队列,否则生产的消息都会被丢弃,mandatory=false)。
如今咱们作一些修改,将上面说到的修改mandatory=true,也就是没有与交换机匹配的队列时,将会重发,也就是调用上面咱们定义的ReturnListener:
与预期的同样,只不过会发现,在调用handleReturn后会再次调用handleAck,也就是发送成功!
上面说的这些也就是消息发布者的ack机制。
接下来看下消费者的ack:
咱们定义消费者时,通常会先定义队列、交换机、将队列与交换机绑定、发送消息。
声明队列:
Queue.DeclareOk queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments) throws IOException;
- queue:队列名称
- durable:是否持久化
- exclusive:排他队列,与该链接绑定,决定多个消费者是否能够访问这一队列
- autoDelete:自动删除,没有消费者时自动删除
- arguments:队列参数,好比队列过时时间
消息接收:
String basicConsume(String queue, boolean autoAck, String consumerTag, boolean noLocal, boolean exclusive, Map<String, Object> arguments, Consumer callback) throws IOException;
- queue:绑定的队列名
- autoAck:自动ack
- consumerTag:消费者标记
- noLocal:ture,不将同一链接中生产的消息传递给该消费者
- exclusive:排他
- arguments:扩展参数
- callback:消费回调
首先要知道,消息一段被消费,就会被移除,那么咱们如何肯定消息是否被真实消费?由于从拿到消息到正则消掉该消息,都是有一个过程,可能任何环境都出现问题,可是被认为消费而致使消息被移除,则可靠性就没法获得保证,做为消费者和生产者同样会有事务与ack两种方式保证,只不过须要注意的是:
- autoAck=false手动应对的时候是支持事务的,也就是说即便你已经手动确认了消息已经收到了,但在确认消息会等事务的返回解决以后,在作决定是确认消息仍是从新放回队列,若是你手动确认如今以后,又回滚了事务,那么已事务回滚为主,此条消息会从新放回队列;
- autoAck=true若是自定确认为true的状况是不支持事务的,也就是说你即便在收到消息以后在回滚事务也是于事无补的,队列已经把消息移除了;
那么针对ack机制,主要有如下相关方法:
//消息索引,批量ack,只对小于该DeliveryTag的消息 // 批量确认 channel.basicAck(envelope.getDeliveryTag(),false); //其中 deliveryTag 和 requeue 的含义能够参考 basicReject 方法。 multiple 参数 //设置为 false 则表示拒绝编号为 deliveryT坷的这一条消息,这时候 basicNack 和 basicReject 方法同样; // multiple 参数设置为 true 则表示拒绝 deliveryTag 编号以前所 有未被当前消费者确认的消息。 channel.basicNack(envelope.getDeliveryTag(),false,false); //一次只能拒绝一条 //其中 deliveryTag 能够看做消息的编号 ,它是一个 64 位的长整型值,最大值是 9223372036854775807 // requeue 参数设置为 true,则 RabbitMQ 会从新将这条消息存入队列,以即可以发送给下一个订阅的消费者; // requeue 参数设置为 false,则 RabbitMQ 当即会把消息从队列中移除,而不会把它发送给新的消费者 channel.basicReject(envelope.getDeliveryTag(),false);
在结束ack前,须要说明一点的是,消费者和生产者消息发送的成功与消费是否成功,并非消费者向生产者进行ack,而是针对mq服务器。对于生产者只是确保消息发送到服务器是否成功;对于消费者,只是确保消息是否从服务器被消费掉。
若是咱们对某条消息nack,有没有requeue,那么这条消息是否是真的就丢失了呢?这里不得不引入另一个概念,死信,那么与死信对应的有死信队列XLD,同时死信的条件不仅刚说到的,在如下状况都会触发:
- 消息被拒绝(basic.reject/ basic.nack)而且requeue=false
- 消息TTL过时
- 队列达到最大长度
关于第一点很少说,上面已经提到了,关于第二点TTL(time to live)关系到消息的过时时间,通常会从两个角度分析,咱们知道,消息没有消费前是在队列中,那么队列的过时时间也会影响消息的过时时间,全部这个时间会从队列过时时间()消息过时时间中取小。
队列过时时间设置:
//申明队列时设置 args.put("x-expires", 10000);//ms 队列过时时间
消息过时时间设置:
//申明队列时设置 args.put("x-message-ttl", 6000);//消息过时时间 //发布消息时设置 AMQP.BasicProperties properties = new AMQP.BasicProperties().builder().expiration("6000").build();
首先如何定义死信队列,这个是在队列申明的时候,以参数的形式加入的(x-dead-letter-exchange):
channel.exchangeDeclare(Common.EXCHANGE_DLX_X1,ExType.Direct.value()); Map<String, Object> args = new HashMap<>(); args.put("x-dead-letter-exchange",Common.EXCHANGE_DLX_X1);//死信交换机 channel.queueDeclare(Common.QUEUE_Q1,false,false,false,args);
下面来看下示例:
消息生产者:
public void produce(String message) throws IOException, TimeoutException, InterruptedException { Connection connection = AmqpConnectionFactory.getConnection(new AmqpConfig()); //基于信道的通讯 Channel channel = connection.createChannel(); /** * 交换机名称、交换机类型、是否持久化、是否自动删除、是否内部使用、参数 */ channel.exchangeDeclare(Common.EXCHANGE_X1,ExType.Direct.value(),false,false,false,null);//申明交换机 //为了保证先启动该类,交换机没有绑定队列致使消息丢失,优先处理,在消费者中也会有如下内容 channel.exchangeDeclare(Common.EXCHANGE_DLX_X1,ExType.Direct.value()); channel.queueDeclare(Common.QUEUE_DLX_Q1, false, false, false, null);//申明死信队列 channel.queueBind(Common.QUEUE_DLX_Q1, Common.EXCHANGE_DLX_X1, Common.ROUTING_DLX_KEY1);//将消息交换机与队列绑定 Map<String, Object> args = new HashMap<>(); args.put("x-dead-letter-exchange",Common.EXCHANGE_DLX_X1);//死信交换机 args.put("x-dead-letter-routing-key", Common.ROUTING_DLX_KEY1); args.put("x-expires", 30000);//ms 队列过时时间 args.put("x-message-ttl", 12000);//消息过时时间 channel.queueDeclare(Common.QUEUE_Q1,false,false,false ,args); channel.queueBind(Common.QUEUE_Q1,Common.EXCHANGE_X1,Common.BINDING_KEY1);//将消息交换机与队列绑定 channel.confirmSelect();//确认机制 channel.addConfirmListener(new ConfirmListener() { @Override public void handleAck(long deliveryTag, boolean multiple) throws IOException { System.out.println("消息发送成功!"); } @Override public void handleNack(long deliveryTag, boolean multiple) throws IOException { System.out.println("消息发送失败!"); } }); /** * mandatory=ture */ channel.addReturnListener(new ReturnListener() { @Override public void handleReturn(int replyCode, String replyText, String exchange, String routingKey, AMQP.BasicProperties properties, byte[] body) throws IOException { System.out.println("从新处理消息!"); } }); /** * 消息发送到指定交换机、routing key、是否重发、是否、基础属性、消息内容 * mandatory:(true)没有队列,消息返回;(false)没有队列,消息丢弃 * immediate:(true)没有消费者,消息返回;(false) */ int count = 0; // AMQP.BasicProperties properties = new AMQP.BasicProperties().builder().expiration("12000").build(); while (count++ <10){ TimeUnit.MILLISECONDS.sleep(500); channel.basicPublish(Common.EXCHANGE_X1,Common.ROUTING_KEY1,false,false,null,(new Date()+message).getBytes()); } channel.close(); AmqpConnectionFactory.close(connection); } public static void main(String[] args) throws IOException, TimeoutException, InterruptedException { Producer producer = new Producer(); producer.produce("消息 "); }
消息消费者:
public void consume() throws IOException { Connection connection = AmqpConnectionFactory.getConnection(new AmqpConfig()); final Channel channel = connection.createChannel();//创建通讯通道 channel.exchangeDeclare(Common.EXCHANGE_DLX_X1,ExType.Direct.value()); Map<String, Object> args = new HashMap<>(); args.put("x-dead-letter-exchange",Common.EXCHANGE_DLX_X1);//死信队列 args.put("x-dead-letter-routing-key", Common.ROUTING_DLX_KEY1);//死信routing key 默认取 args.put("x-expires", 30000);//ms 队列过时时间 args.put("x-message-ttl", 12000);//消息过时时间 /** * 队列名称、是否持久化、是否被该链接独占(只对申明链接可见,断开链接删除)、自动删除、参数 */ channel.queueDeclare(Common.QUEUE_Q1,false,false,false,args);//申明队列 /** * 交换机名称、交换机类型、是否持久化、是否自动删除、是否内部使用、参数 */ channel.exchangeDeclare(Common.EXCHANGE_X1,ExType.Direct.value(),false,false,false,null);//申明交换机 /** * 队列名称、交换机名称、binding Key */ channel.queueBind(Common.QUEUE_Q1,Common.EXCHANGE_X1,Common.BINDING_KEY1);//将消息交换机与队列绑定 /** * 队列名称、自动ACK、消费者标记、非本地、是否被该链接独占、参数 * 与basicGet对比,get 只取了队列里面的第一条消息 * 一种是主动去取,一种是监听模式 */ channel.basicConsume(Common.QUEUE_Q1,false,"c1",false,false,null,new DefaultConsumer(channel){ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { System.out.println(new String(body)); channel.basicAck(envelope.getDeliveryTag(),false); } }); } public static void main(String[] args) throws IOException { Consumer consumer = new Consumer(); consumer.consume(); }
死信消费者:
public void consume() throws IOException { Connection connection = AmqpConnectionFactory.getConnection(new AmqpConfig()); final Channel channel = connection.createChannel();//创建通讯通道 channel.queueDeclare(Common.QUEUE_DLX_Q1, false, false, false, null);//申明队列 channel.exchangeDeclare(Common.EXCHANGE_DLX_X1, ExType.Direct.value(), false, false, false, null);//申明交换机 channel.queueBind(Common.QUEUE_DLX_Q1, Common.EXCHANGE_DLX_X1, Common.ROUTING_DLX_KEY1);//将消息交换机与队列绑定 channel.basicConsume(Common.QUEUE_DLX_Q1, true, "c2", false, false, null, new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { System.out.println("死信:"+new String(body)); } }); } public static void main(String[] args) throws IOException { DlxConsumer consumer = new DlxConsumer(); consumer.consume(); }
测试过程当中可观察rabbitmq服务台数据变化,主要步骤大体以下:
- 执行消费者,测试会生成10条信息在正常的队列中
- 在12秒内执行消息消费者,发现会打印出10条信息
- 执行步骤1
- 过12秒后执行消息消费者,发现不会打印任何信息
- 执行死信消费者,发现打印出10条信息
至此,关于rabbitmq的知识也差很少了,可是若是想搭建一个比较稳健的消息系统来处理系统中的各类异步任务,仍是须要将各类知识进行搭配。
扩展:
- 延迟队列:经过死信实现,其实上面的例子中,去掉消费者,将死信消费者看作正常消费者,那么就是延迟队列了
- 重试机制:包含发送失败重试与消费故障重试
- 队列属性:
Message TTL(x-message-ttl):设置队列中的全部消息的生存周期(统一为整个队列的全部消息设置生命周期), 也能够在发布消息的时候单独为某个消息指定剩余生存时间,单位毫秒, 相似于redis中的ttl,生存时间到了,消息会被从队里中删除,注意是消息被删除,而不是队列被删除, 特性Features=TTL, 单独为某条消息设置过时时间AMQP.BasicProperties.Builder properties = new AMQP.BasicProperties().builder().expiration(“6000”); Auto Expire(x-expires): 当队列在指定的时间没有被访问(consume, basicGet, queueDeclare…)就会被删除,Features=Exp Max Length(x-max-length): 限定队列的消息的最大值长度,超过指定长度将会把最先的几条删除掉, 相似于mongodb中的固定集合,例如保存最新的100条消息, Feature=Lim Max Length Bytes(x-max-length-bytes): 限定队列最大占用的空间大小, 通常受限于内存、磁盘的大小, Features=Lim B Dead letter exchange(x-dead-letter-exchange): 当队列消息长度大于最大长度、或者过时的等,将从队列中删除的消息推送到指定的交换机中去而不是丢弃掉,Features=DLX Dead letter routing key(x-dead-letter-routing-key):将删除的消息推送到指定交换机的指定路由键的队列中去, Feature=DLK Maximum priority(x-max-priority):优先级队列,声明队列时先定义最大优先级值(定义最大值通常不要太大),在发布消息的时候指定该消息的优先级, 优先级更高(数值更大的)的消息先被消费 Lazy mode(x-queue-mode=lazy): Lazy Queues: 先将消息保存到磁盘上,不放在内存中,当消费者开始消费的时候才加载到内存中Master locator(x-queue-master-locator)