大厂处理方案java
就是相似原子性,高并发下,不出现重复消费。redis
方案:乐观锁、惟一id+指纹锁机制,利用数据库去重、redis原子性数据库
通常在消费端处理并发
(producer) ide
public static void main(String[] args) throws Exception { //1 建立ConnectionFactory ConnectionFactory connectionFactory = new ConnectionFactory(); connectionFactory.setHost("192.168.11.76"); connectionFactory.setPort(5672); connectionFactory.setVirtualHost("/"); //2 获取C onnection Connection connection = connectionFactory.newConnection(); //3 经过Connection建立一个新的Channel Channel channel = connection.createChannel(); //4 指定咱们的消息投递模式: 消息的确认模式 channel.confirmSelect(); String exchangeName = "test_confirm_exchange"; String routingKey = "confirm.save"; //5 发送一条消息 String msg = "Hello RabbitMQ Send confirm message!"; channel.basicPublish(exchangeName, routingKey, null, msg.getBytes()); //6 添加一个确认监听 channel.addConfirmListener(new ConfirmListener() { @Override public void handleNack(long deliveryTag, boolean multiple) throws IOException { System.err.println("-------no ack!--返回失败---------"); } @Override public void handleAck(long deliveryTag, boolean multiple) throws IOException { System.err.println("-------ack!---成功返回--------"); } }); }
(producer) 高并发
public static void main(String[] args) throws Exception { ConnectionFactory connectionFactory = new ConnectionFactory(); connectionFactory.setHost("192.168.11.76"); connectionFactory.setPort(5672); connectionFactory.setVirtualHost("/"); Connection connection = connectionFactory.newConnection(); Channel channel = connection.createChannel(); String exchange = "test_return_exchange"; String routingKey = "return.save"; String msg = "Hello RabbitMQ Return Message"; channel.addReturnListener(new ReturnListener() { @Override public void handleReturn(int replyCode, String replyText, String exchange, String routingKey, AMQP.BasicProperties properties, byte[] body) throws IOException { System.err.println("---------handle return----------"); System.err.println("replyCode: " + replyCode); System.err.println("replyText: " + replyText); System.err.println("exchange: " + exchange); System.err.println("routingKey: " + routingKey); System.err.println("properties: " + properties); System.err.println("body: " + new String(body)); } }); //这里的 true 就是 Mandatory channel.basicPublish(exchange, routingKeyError, true, null, msg.getBytes()); }
消息接收类 fetch
public class MyConsumer extends DefaultConsumer { public MyConsumer(Channel channel) { super(channel); } @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { System.err.println("-----------consume message----------"); System.err.println("consumerTag: " + consumerTag); System.err.println("envelope: " + envelope); System.err.println("properties: " + properties); System.err.println("body: " + new String(body)); } }
添加绑定(consumer)ui
public static void main(String[] args) throws Exception { ConnectionFactory connectionFactory = new ConnectionFactory(); connectionFactory.setHost("192.168.11.76"); connectionFactory.setPort(5672); connectionFactory.setVirtualHost("/"); Connection connection = connectionFactory.newConnection(); Channel channel = connection.createChannel(); String exchangeName = "test_consumer_exchange"; String routingKey = "consumer.#"; String queueName = "test_consumer_queue"; channel.exchangeDeclare(exchangeName, "topic", true, false, null); channel.queueDeclare(queueName, true, false, false, null); channel.queueBind(queueName, exchangeName, routingKey); channel.basicConsume(queueName, true, new MyConsumer(channel)); }
public class MyConsumer extends DefaultConsumer { private Channel channel ; public MyConsumer(Channel channel) { super(channel); this.channel = channel; } @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { System.err.println("-----------consume message----------"); System.err.println("consumerTag: " + consumerTag); System.err.println("envelope: " + envelope); System.err.println("properties: " + properties); System.err.println("body: " + new String(body)); //手动ack ,false不支持多条签收 channel.basicAck(envelope.getDeliveryTag(), false); } }
添加绑定(consumer)this
public static void main(String[] args) throws Exception { ConnectionFactory connectionFactory = new ConnectionFactory(); connectionFactory.setHost("192.168.11.76"); connectionFactory.setPort(5672); connectionFactory.setVirtualHost("/"); Connection connection = connectionFactory.newConnection(); Channel channel = connection.createChannel(); String exchangeName = "test_qos_exchange"; String queueName = "test_qos_queue"; String routingKey = "qos.#"; channel.exchangeDeclare(exchangeName, "topic", true, false, null); channel.queueDeclare(queueName, true, false, false, null); channel.queueBind(queueName, exchangeName, routingKey); //1 限流方式 第一件事就是 autoAck设置为 false //一次只接受1条消息 channel.basicQos(0, 1, false); //关闭自动签收 channel.basicConsume(queueName, false, new MyConsumer(channel)); }
添加绑定(consumer)spa
// 手工签收 必需要关闭 autoAck = false channel.basicConsume(queueName, false, new MyConsumer(channel));
public class MyConsumer extends DefaultConsumer { private Channel channel ; public MyConsumer(Channel channel) { super(channel); this.channel = channel; } @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { System.err.println("-----------consume message----------"); System.err.println("body: " + new String(body)); try { Thread.sleep(2000); } catch (InterruptedException e) { e.printStackTrace(); } if((Integer)properties.getHeaders().get("num") == 0) { //第一个false 是否支持多条签收,第二个true是否重回队列 channel.basicNack(envelope.getDeliveryTag(), false, true); } else { channel.basicAck(envelope.getDeliveryTag(), false); } } }
Arguments 设置queue
(producer)
public static void main(String[] args) throws Exception { ConnectionFactory connectionFactory = new ConnectionFactory(); connectionFactory.setHost("192.168.11.76"); connectionFactory.setPort(5672); connectionFactory.setVirtualHost("/"); Connection connection = connectionFactory.newConnection(); Channel channel = connection.createChannel(); String exchange = "test_dlx_exchange"; String routingKey = "dlx.save"; String msg = "Hello RabbitMQ DLX Message"; for(int i =0; i<1; i ++){ AMQP.BasicProperties properties = new AMQP.BasicProperties.Builder() .deliveryMode(2) .contentEncoding("UTF-8") .expiration("10000")//过时时间 .build(); channel.basicPublish(exchange, routingKey, true, properties, msg.getBytes()); } }
(consumer)
public static void main(String[] args) throws Exception { ConnectionFactory connectionFactory = new ConnectionFactory(); connectionFactory.setHost("192.168.11.76"); connectionFactory.setPort(5672); connectionFactory.setVirtualHost("/"); Connection connection = connectionFactory.newConnection(); Channel channel = connection.createChannel(); // 这就是一个普通的交换机 和 队列 以及路由 String exchangeName = "test_dlx_exchange"; String routingKey = "dlx.#"; String queueName = "test_dlx_queue"; channel.exchangeDeclare(exchangeName, "topic", true, false, null); Map<String, Object> agruments = new HashMap<String, Object>(); agruments.put("x-dead-letter-exchange", "dlx.exchange"); //这个agruments属性,要设置到声明队列上 channel.queueDeclare(queueName, true, false, false, agruments); channel.queueBind(queueName, exchangeName, routingKey); //要进行死信队列的声明: channel.exchangeDeclare("dlx.exchange", "topic", true, false, null); channel.queueDeclare("dlx.queue", true, false, false, null); channel.queueBind("dlx.queue", "dlx.exchange", "#"); channel.basicConsume(queueName, true, new MyConsumer(channel)); }