RabbitMQ--高级特性

生产端可靠性投递

  • 保证消息成功发出
  • 保障MQ节点的成功接收
  • 发送端收到MQ节点(Broker)确认应答
  • 完善消息进行补偿机制

大厂处理方案java

  • 消息落库,对消息状态打标
  • 消息延迟投递,作二次确认,回调检查

 冥等性概念

就是相似原子性,高并发下,不出现重复消费。redis

方案:乐观锁、惟一id+指纹锁机制,利用数据库去重、redis原子性数据库

通常在消费端处理并发

Confirm 确认消息

  • 消息投递后,若是Broker收到消息,则会给咱们一个生产者应答。
  • channel.confirmSelect(); 开启确认消息
  • channel.addConfirmListener(new ConfirmListener() {})  添加消息接收对象

(producer) ide

public static void main(String[] args) throws Exception {
		//1 建立ConnectionFactory
		ConnectionFactory connectionFactory = new ConnectionFactory();
		connectionFactory.setHost("192.168.11.76");
		connectionFactory.setPort(5672);
		connectionFactory.setVirtualHost("/");
		
		//2 获取C	onnection
		Connection connection = connectionFactory.newConnection();
		//3 经过Connection建立一个新的Channel
		Channel channel = connection.createChannel();
		
		//4 指定咱们的消息投递模式: 消息的确认模式 
		channel.confirmSelect();
		String exchangeName = "test_confirm_exchange";
		String routingKey = "confirm.save";
		
		//5 发送一条消息
		String msg = "Hello RabbitMQ Send confirm message!";
		channel.basicPublish(exchangeName, routingKey, null, msg.getBytes());
		
		//6 添加一个确认监听
		channel.addConfirmListener(new ConfirmListener() {
			@Override
			public void handleNack(long deliveryTag, boolean multiple) throws IOException {
				System.err.println("-------no ack!--返回失败---------");
			}
			@Override
			public void handleAck(long deliveryTag, boolean multiple) throws IOException {
				System.err.println("-------ack!---成功返回--------");
			}
		});
	}

Return 消息机制

  • Return Listener 用于处理一下不可路由的消息
  • 若是发送的消息指定的 Exchange或者路由Key找不到,这种不可达的消息,须要Return Listener
  • Mandatory :若是设置成True才会接收不可达消息,false的话,那么broker会自动删除该消息!

(producer)  高并发

public static void main(String[] args) throws Exception {
		ConnectionFactory connectionFactory = new ConnectionFactory();
		connectionFactory.setHost("192.168.11.76");
		connectionFactory.setPort(5672);
		connectionFactory.setVirtualHost("/");
		
		Connection connection = connectionFactory.newConnection();
		Channel channel = connection.createChannel();
		
		String exchange = "test_return_exchange";
		String routingKey = "return.save";
		String msg = "Hello RabbitMQ Return Message";

		channel.addReturnListener(new ReturnListener() {
			@Override
			public void handleReturn(int replyCode, String replyText, String exchange,
					String routingKey, AMQP.BasicProperties properties, byte[] body) throws IOException {
				System.err.println("---------handle  return----------");
				System.err.println("replyCode: " + replyCode);
				System.err.println("replyText: " + replyText);
				System.err.println("exchange: " + exchange);
				System.err.println("routingKey: " + routingKey);
				System.err.println("properties: " + properties);
				System.err.println("body: " + new String(body));
			}
		});
		//这里的 true 就是 Mandatory
		channel.basicPublish(exchange, routingKeyError, true, null, msg.getBytes());
	}

自定义消费监听

消息接收类 fetch

public class MyConsumer extends DefaultConsumer {
	
	public MyConsumer(Channel channel) {
		super(channel);
	}

	@Override
	public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
		System.err.println("-----------consume message----------");
		System.err.println("consumerTag: " + consumerTag);
		System.err.println("envelope: " + envelope);
		System.err.println("properties: " + properties);
		System.err.println("body: " + new String(body));
	}
}

添加绑定(consumer)ui

public static void main(String[] args) throws Exception {
		ConnectionFactory connectionFactory = new ConnectionFactory();
		connectionFactory.setHost("192.168.11.76");
		connectionFactory.setPort(5672);
		connectionFactory.setVirtualHost("/");
		
		Connection connection = connectionFactory.newConnection();
		Channel channel = connection.createChannel();
		String exchangeName = "test_consumer_exchange";
		String routingKey = "consumer.#";
		String queueName = "test_consumer_queue";
		
		channel.exchangeDeclare(exchangeName, "topic", true, false, null);
		channel.queueDeclare(queueName, true, false, false, null);
		channel.queueBind(queueName, exchangeName, routingKey);
		
		channel.basicConsume(queueName, true, new MyConsumer(channel));
	}

消费限流

  • prefetchSize:0
  • prefetchCount : 消费者接收消息上限,一旦有N个消息还没ack(处理完成),该consumer将block(阻塞)掉,直到有ack
  • global : 是否将上面设置应用于channel。就是上面限制是channel级别仍是consumer级别
  • prefetchSize 和 global这两个,rabbitmq没实现。prefetchCount  在 no_ask=false状况下生效,自动应答的状况下,这两个值不生效的。
public class MyConsumer extends DefaultConsumer {
	private Channel channel ;
	
	public MyConsumer(Channel channel) {
		super(channel);
		this.channel = channel;
	}

	@Override
	public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
		System.err.println("-----------consume message----------");
		System.err.println("consumerTag: " + consumerTag);
		System.err.println("envelope: " + envelope);
		System.err.println("properties: " + properties);
		System.err.println("body: " + new String(body));
		//手动ack ,false不支持多条签收
		channel.basicAck(envelope.getDeliveryTag(), false);
	}
}

添加绑定(consumer)this

public static void main(String[] args) throws Exception {
		ConnectionFactory connectionFactory = new ConnectionFactory();
		connectionFactory.setHost("192.168.11.76");
		connectionFactory.setPort(5672);
		connectionFactory.setVirtualHost("/");
		
		Connection connection = connectionFactory.newConnection();
		Channel channel = connection.createChannel();
		
		
		String exchangeName = "test_qos_exchange";
		String queueName = "test_qos_queue";
		String routingKey = "qos.#";
		
		channel.exchangeDeclare(exchangeName, "topic", true, false, null);
		channel.queueDeclare(queueName, true, false, false, null);
		channel.queueBind(queueName, exchangeName, routingKey);
		
		//1 限流方式  第一件事就是 autoAck设置为 false
		//一次只接受1条消息
		channel.basicQos(0, 1, false);
        //关闭自动签收
		channel.basicConsume(queueName, false, new MyConsumer(channel));
	}

消费端ACK与重回队列

  • 消费端重回队列是为了对没有处理成功的消息,把消息传递给Broker!
  • 实际应用中,通常都会关闭重回队列。设置成false

 添加绑定(consumer)spa

// 手工签收 必需要关闭 autoAck = false
		channel.basicConsume(queueName, false, new MyConsumer(channel));
public class MyConsumer extends DefaultConsumer {
	private Channel channel ;
	
	public MyConsumer(Channel channel) {
		super(channel);
		this.channel = channel;
	}

	@Override
	public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
		System.err.println("-----------consume message----------");
		System.err.println("body: " + new String(body));
		try {
			Thread.sleep(2000);
		} catch (InterruptedException e) {
			e.printStackTrace();
		}
		if((Integer)properties.getHeaders().get("num") == 0) {
            //第一个false 是否支持多条签收,第二个true是否重回队列
			channel.basicNack(envelope.getDeliveryTag(), false, true);
		} else {
			channel.basicAck(envelope.getDeliveryTag(), false);
		}
	}
}

TTL队列/消息

  • 就是队列/消息 的生命周期
  • 设置最大数量和过时时间

Arguments 设置queue 

  • x-max-length   3000 (最大长度3000)
  • x-message-ttl   1000   (过时时间10秒)

(producer)   

public static void main(String[] args) throws Exception {
		ConnectionFactory connectionFactory = new ConnectionFactory();
		connectionFactory.setHost("192.168.11.76");
		connectionFactory.setPort(5672);
		connectionFactory.setVirtualHost("/");
		Connection connection = connectionFactory.newConnection();
		Channel channel = connection.createChannel();
		
		String exchange = "test_dlx_exchange";
		String routingKey = "dlx.save";
		String msg = "Hello RabbitMQ DLX Message";
		
		for(int i =0; i<1; i ++){
			AMQP.BasicProperties properties = new AMQP.BasicProperties.Builder()
					.deliveryMode(2)
					.contentEncoding("UTF-8")
					.expiration("10000")//过时时间
					.build();
			channel.basicPublish(exchange, routingKey, true, properties, msg.getBytes());
		}
	}

死信队列

  • DLX,Dead-Letter-Exchange
  • 当消息在一个队列中变成死信以后,他会被从新publish到另外一个Exchange,这个Exchange就是DLX
  • 变成死信队列的几种状况
    • 消息被拒绝(basic.reject/basic.nack) 而且requeue=false (手动签收)
    • TTL过时
    • 队列达到最大长度
  • 设置死信队列
    • 就是绑定一个死信队列
    • Exchange: dlx.exchange
    • Queue: dlx.queue
    • RoutingKey: #
    • arguments.put("x-dead-letter-exchange","dlx.exchange")  其实上面名称均可以随便取,只要这里绑定对应交换机就行。

(consumer) 

public static void main(String[] args) throws Exception {
		ConnectionFactory connectionFactory = new ConnectionFactory();
		connectionFactory.setHost("192.168.11.76");
		connectionFactory.setPort(5672);
		connectionFactory.setVirtualHost("/");
		
		Connection connection = connectionFactory.newConnection();
		Channel channel = connection.createChannel();
		
		// 这就是一个普通的交换机 和 队列 以及路由
		String exchangeName = "test_dlx_exchange";
		String routingKey = "dlx.#";
		String queueName = "test_dlx_queue";
		channel.exchangeDeclare(exchangeName, "topic", true, false, null);
		
		Map<String, Object> agruments = new HashMap<String, Object>();
		agruments.put("x-dead-letter-exchange", "dlx.exchange");
		//这个agruments属性,要设置到声明队列上
		channel.queueDeclare(queueName, true, false, false, agruments);
		channel.queueBind(queueName, exchangeName, routingKey);
		
		//要进行死信队列的声明:
		channel.exchangeDeclare("dlx.exchange", "topic", true, false, null);
		channel.queueDeclare("dlx.queue", true, false, false, null);
		channel.queueBind("dlx.queue", "dlx.exchange", "#");
		channel.basicConsume(queueName, true, new MyConsumer(channel));
	}