MQ在全部项目里面都很常见,java
一、减小非紧急性任务对整个业务流程形成的延时;spring
二、减小高并发对系统所形成的性能上的影响;docker
举例几个场景:springboot
一、给注册完成的用户派发优惠券、加积分、发消息等(派发优惠券、加积分、发消息这些属于非紧急性任务,可交由MQ进行处理,先让用户完成注册)并发
二、实时收集用户运动数据,而且收集数据后还须要比较复杂和耗时的操做才能完成业务处理(实时的数据采集任务通常并发量都是很高的,咱们就应该先发送到MQ,再进行有序的处理)app
另外说明一下,高并发的问题有不少种处理手段,而MQ是我认为的最稳健、简单的手段之一,因此我会优先使用分布式
首先用docker安装RabbitMQ,快捷ide
进入控制台,下图简单介绍下各个功能模块高并发
大体流程性能
一、发送消息到MQ
二、MQ接收并保存消息等待消费
三、消费者有序地进行消息处理
springboot中使用MQ超级简单
一、配置
#RabbitMq
spring.rabbitmq.host=localhost
spring.rabbitmq.port=5672
spring.rabbitmq.username=test
spring.rabbitmq.password=123456
spring.rabbitmq.virtual-host=test
spring.rabbitmq.listener.simple.retry.max-attempts=5
spring.rabbitmq.listener.simple.retry.enabled=true
spring.rabbitmq.listener.simple.retry.initial-interval=5000
spring.rabbitmq.listener.simple.default-requeue-rejected=false
spring.rabbitmq.listener.simple.prefetch=1 #这个根据本身业务状况来定
二、发送
@Autowired private AmqpTemplate amqpTemplate; @Test public void testMq(){ User user = new User(); user.setName("cjh陈"); amqpTemplate.convertAndSend("exchange_01","que_01", JSON.toJSONString(JSONResult.SUCC("",user))); }
三、消费
@RabbitListener(containerFactory = "containerFactory",concurrency = "10",bindings = {@QueueBinding(declare = "false",value = @Queue(value = "que_01"), exchange = @Exchange(declare = "false",value = "exchange_01",type = "fanout"))}) public void process(Message message, com.rabbitmq.client.Channel channel) { Long deliveryTag = null; String data = null; try { deliveryTag = message.getMessageProperties().getDeliveryTag(); data = new String(message.getBody(), "UTF-8"); //你的业务处理 test01(data); } catch (Exception e) { logger.warn("MQ异常 {} , {} , {}" , e.getMessage(),e.getCause(),data); }finally { logger.warn("======消息处理结束"); } try { channel.basicAck(deliveryTag, false); // 确认消息成功消费(配置须要开启消费确认模式) } catch (IOException e) { logger.warn("======MQ应答出错,请检查"); } }
3步完成使用
特别说明几点
一、AmqpTemplate 好像作不到发布确认,要用RabbitTemplate,发布确认我主要用在分布式事务的场景
二、containerFactory能够不配置,根据实际状况来,下面再说明
三、concurrency指并发量,实时性要求很高的话,prefetch应该设低点或者设置成1,concurrency的值调高点
简单画了一下,,,留着本身看,,
四、declare = "false",很经常使用,意思就是说加入你MQ控制台里面已经新建好队列或者交换机了,这里就应该配false表示程序再也不进行从新定义,否则容易发生报错(当从新定义的参数与已定义的参数不一致时就会报错)
五、权限方面的处理之后再记录,,
回到上面第2点,containerFactory何时会用到?
某些配置须要自定义,好比线程池的大小,
当concurrency数值放大的时候,好比100,我发现大部分的消费者并无工做,这是由于被线程池的大小所限制,网上的人说线程池大小默认是50,我也没去查估计差很少也就这个数,那么这个时候咱们就须要自定义的containerFactory:
package xxx; import org.springframework.amqp.rabbit.annotation.EnableRabbit; import org.springframework.amqp.rabbit.annotation.RabbitListenerConfigurer; import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory; import org.springframework.amqp.rabbit.connection.ConnectionFactory; import org.springframework.amqp.rabbit.listener.RabbitListenerEndpointRegistrar; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.messaging.converter.MappingJackson2MessageConverter; import org.springframework.messaging.handler.annotation.support.DefaultMessageHandlerMethodFactory; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; /** * @author cjh * @Package xxx * @Description: * @date: 2019/6/30 20:19 */ @Configuration @EnableRabbit public class MqContainerFactory implements RabbitListenerConfigurer { /** * containerFactory * @Description: 自定义配置 * @param * @return */ @Bean public SimpleRabbitListenerContainerFactory containerFactory(ConnectionFactory connectionFactory) throws Exception{ SimpleRabbitListenerContainerFactory factory=new SimpleRabbitListenerContainerFactory(); factory.setConnectionFactory(connectionFactory); //必须是concurrency的两倍以上 ExecutorService service=Executors.newFixedThreadPool(200); factory.setTaskExecutor(service); factory.setPrefetchCount(1); return factory; } @Bean public DefaultMessageHandlerMethodFactory myHandlerMethodFactory() { DefaultMessageHandlerMethodFactory factory = new DefaultMessageHandlerMethodFactory(); factory.setMessageConverter(new MappingJackson2MessageConverter()); return factory; } @Override public void configureRabbitListeners(RabbitListenerEndpointRegistrar registrar) { registrar.setMessageHandlerMethodFactory(myHandlerMethodFactory()); } }
有错欢迎指正,转载请注明博客出处:http://www.cnblogs.com/cjh-notes/