yml 配置
# Spring Boot application settings for the RabbitMQ demo.
server:
  port: 8154
  tomcat:
    uri-encoding: UTF-8
    # max-http-post-size:
    # accept-count:  # requests queued once all threads are busy; default 100
    # max-connections:
    # max-threads:  # maximum threads, i.e. tasks handled concurrently; default 200
  servlet:
    context-path: /demo
    # session:
    #   timeout: 800  # seconds

logging:
  config: classpath:dev-logback.xml

spring:
  profiles:
    active: dev
  http:
    encoding:
      charset: UTF-8
      enabled: true
      force: true
  messages:
    encoding: UTF-8
  thymeleaf:
    cache: true
    encoding: UTF-8
    suffix: .html
    servlet:
      content-type: text/html
  rabbitmq:
    # NOTE(review): the xxxx values look like redacted placeholders —
    # replace them with the real broker settings before use.
    host: xxxx
    port: xxxx
    username: xxxx
    password: xxxx
    virtual-host: /
    publisher-confirms: true
    # publisher-returns: false
    # listener:
    #   # type: simple
    #   direct:
    #     prefetch: 5  # messages prefetched per request
    #     acknowledge-mode: auto  # Spring acks automatically; rolled back if an exception is thrown
    #   simple:
    #     acknowledge-mode: auto  # Spring acks automatically; rolled back if an exception is thrown
    #     prefetch: 5  # messages prefetched per request (read in one batch)
    #     concurrency: 5  # minimum consumer concurrency (initial pool size)
    #     max-concurrency: 10  # maximum consumer concurrency (maximum pool size)

demo:
  test-queue-name: canaan-test-queue
java config
@Configuration @EnableRabbit public class RabbitMQConfigurer { private static final Logger LOGGER = LoggerFactory.getLogger(RabbitMQConfigurer.class); @Autowired private ConnectionFactory connectionFactory; /** * 默认的线程池 * * @return */ @Bean @Primary @Qualifier("rabbitExecutor") public Executor rabbitExecutor() { ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor(); executor.setCorePoolSize(3);/*核心线程数*/ executor.setMaxPoolSize(20);/*最大线程数*/ executor.setQueueCapacity(30000);/*队列大小*/ executor.setKeepAliveSeconds(60);/* 某线程空闲超过1分钟,就回收该线程*/ //executor.setAllowCoreThreadTimeOut(true); // KeepAliveSeconds 设置也做用于【核心线程数】 executor.setThreadNamePrefix("rabbitExecutor-"); //executor.setThreadFactory((r) -> { // LOGGER.info("-------:run"); // return new Thread(r); //}); //executor.setThreadFactory(traceableThreadFactory); executor.initialize(); return executor; } @Bean @Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE) public RabbitTemplate rabbitTemplate() { //必须是prototype类型 RabbitTemplate template = new RabbitTemplate(this.connectionFactory); template.setEncoding("utf-8"); template.setTaskExecutor(this.rabbitExecutor()); //doSendAndReceive 时用到 template.setMessageConverter(new Jackson2JsonMessageConverter()); //template.setReceiveTimeout(); //template.setReplyTimeout(); return template; } /** * 【交易事件】交换机 * * @author Canaan * @date 2019/4/24 21:04 */ @Bean public TopicExchange testExchange() { return new TopicExchange("canaan-test-exchange", false, true); } @Bean public Queue testQueue() { return new Queue("canaan-test-queue", false, false, true); } @Bean public Binding noticeReconciliationBinding() { return BindingBuilder.bind(testQueue()).to(testExchange()).with("test.#"); } @Bean public RabbitListenerContainerFactory<?> rabbitListenerContainerFactory(ConnectionFactory connectionFactory) { SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory(); factory.setConnectionFactory(connectionFactory); factory.setMessageConverter(new 
Jackson2JsonMessageConverter()); return factory; } }
消息生产
@Component public class RabbitPub implements RabbitTemplate.ConfirmCallback { private final RabbitTemplate rabbitTemplate; @Autowired private TopicExchange testExchange; @Autowired public RabbitPub(RabbitTemplate rabbitTemplate) { this.rabbitTemplate = rabbitTemplate; rabbitTemplate.setConfirmCallback(this); } public void pub() throws InterruptedException { for (int i = 0; i < 20; i++) { String uuid = UUID.randomUUID().toString().replace("-", ""); MyMsg myMsg = new MyMsg(); myMsg.setCode("currentTime:" + LocalDateTime.now().toString()); myMsg.setText(String.valueOf(i)); final CorrelationData correlationId = new CorrelationData(uuid); this.rabbitTemplate.convertAndSend(this.testExchange.getName(), "test." + uuid.substring(0, 3), myMsg, correlationId); //TimeUnit.SECONDS.sleep(1); } } @Override public void confirm(CorrelationData correlationData, boolean ack, String cause) { if (ack) { System.out.println("消息成功消费"); } else { System.out.println("消息消费失败:" + cause); } //System.out.println("RabbitPub== correlationData - " + correlationData + " ack - " + ack + " cause - " + cause); } }
消息消费
/** * rabbit 订阅交易事件 * <br> * <p> * channel.basicAck(tag, multiple); * <p> * channel.basicNack(tag, multiple, requeue); * <p> * channel.basicReject(envelope.getDeliveryTag(), false); * <p> * channel.basicAck(message.getMessageProperties().getDeliveryTag(),false); * channel.basicNack(message.getMessageProperties().getDeliveryTag(), false,false); */ //@RabbitListener(queues = "canaan-test-queue") //queue name //@RabbitListener(queues = "#{'canaan-test-queue'}") //SpEL queue name //@RabbitListener(queues = "#{testQueue}") //SpEL queue bean //@RabbitListener(queues = "${demo.test-queue-name}") //property-placeholder keys queue name @Component @RabbitListener(queues = "#{testQueue}") public class RabbitSubI { // String payload , Channel channel ,Message message // import org.springframework.amqp.core.Message; // @Header //@Headers Map<String,Object> headers @RabbitHandler public void process(@Payload MyMsg myMsg, Channel channel, @Header(AmqpHeaders.DELIVERY_TAG) long tag) throws Exception { //System.out.println("RabbitSubI object handle...."); System.out.println(myMsg); TimeUnit.SECONDS.sleep(3); } /** * 根据不一样的内容,来处理不一样的消息 * * @author Canaan * @date 2019/6/25 11:20 */ @RabbitHandler public void process(@Payload String payload, Channel channel, @Header(AmqpHeaders.DELIVERY_TAG) long tag) throws Exception { System.out.println("RabbitSubI string handle...."); System.out.println(payload); } /** * 根据不一样的内容,来处理不一样的消息 * * @author Canaan * @date 2019/6/25 11:20 */ @RabbitHandler public void processMessage2(@Payload byte[] message) { System.out.println("RabbitSubI byte handle...."); System.out.println(new String(message)); } }