上一篇介绍了 RabbitMQ 的安装以及监控,基础资料网上有很多。这里还是补充一点概念,我也没看官方文档,因此说得不对还请留言指正。
消息中间件最大的作用就是解耦、削峰,这个大家应该都知道。但是这个解耦就意味着解除依赖关系,所以有时候这种解耦会给业务带来一定的缺陷,比如:你希望消费者消费完消息以后及时通知生产者,这时候是做不到的,消费情况和推送情况只能通知到队列,消费者和生产者是不会直接通信的。如果你想消费完后能够通知到生产者,这样就违背了消息中间件的设计原理,也就说明你不适合选择消息中间件来解决这个业务问题。
具体操作:
一、引入 RabbitMQ 依赖,具体坐标如下:
<!-- RabbitMQ support: pulls in Spring AMQP + the rabbitmq-client -->
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
二、application 配置
# RabbitMQ configuration.
# NOTE: in .properties files a '#' is only a comment at the START of a line;
# the original put comments after the values ("...=true#手动确认"), which makes
# the comment text part of the property value and breaks parsing.
spring.application.name=springboot-rabbitmq
spring.rabbitmq.host=192.168.19.8
spring.rabbitmq.port=5672
spring.rabbitmq.username=admin
spring.rabbitmq.password=admin
# Enable publisher confirms (broker acks the publish; this is producer-side,
# unrelated to consumer manual ack).
spring.rabbitmq.publisher-confirms=true
spring.rabbitmq.virtual-host=/
# Deliver at most 5 unacked messages per consumer.
# NOTE(review): on Spring Boot 2.x this key moved to
# spring.rabbitmq.listener.simple.prefetch — confirm the Boot version in use.
spring.rabbitmq.listener.prefetch=5
# Consumer must acknowledge messages explicitly.
spring.rabbitmq.listener.acknowledge-mode=MANUAL
三、生产者
package com.mymall.crawlerTaskCenter.mq; import java.io.IOException; import java.util.UUID; import java.util.concurrent.TimeoutException; import javax.annotation.PostConstruct; import org.springframework.amqp.rabbit.connection.Connection; import org.springframework.amqp.rabbit.connection.ConnectionFactory; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; import com.alibaba.fastjson.JSONObject; import com.mymall.crawlerTaskCenter.domain.CrawlerJob; import com.rabbitmq.client.Channel; import com.rabbitmq.client.MessageProperties; import org.springframework.amqp.rabbit.support.CorrelationData; @Component public class RabbitMqService implements RabbitTemplate.ConfirmCallback{ @Autowired RabbitTemplate rabbitTemplate; public void sendCrawlerJob(CrawlerJob crawlerJob) { String sendMsg = JSONObject.toJSONString(crawlerJob); System.out.println("Sender1 : " + sendMsg); CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString()); this.rabbitTemplate.convertAndSend(this.rabbitTemplate.getExchange(),"crawlerJobs", sendMsg, correlationData); } @Override public void confirm(CorrelationData correlationData, boolean ack, String cause) { System.out.println(" 回调id:" + correlationData); if (ack) { System.out.println("消息成功消费"); } else { System.out.println("消息消费失败:" + cause); } } }
四、消费者
package com.crawlerTaskCenter.rabbitMq; import java.io.IOException; import java.lang.reflect.Constructor; import java.util.HashMap; import java.util.Map; import java.util.Set; import java.util.Timer; import java.util.concurrent.TimeUnit; import javax.annotation.PostConstruct; import org.apache.log4j.Logger; import org.reflections.Reflections; import org.reflections.util.ConfigurationBuilder; import org.springframework.amqp.core.AcknowledgeMode; import org.springframework.amqp.core.Message; import org.springframework.amqp.rabbit.annotation.RabbitHandler; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.amqp.rabbit.connection.CachingConnectionFactory; import org.springframework.amqp.rabbit.core.ChannelAwareMessageListener; import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Value; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.stereotype.Component; import com.alibaba.fastjson.JSON; import com.alibaba.fastjson.JSONObject; import com.crawlerTaskCenter.AppInit; import com.crawlerTaskCenter.Application; import com.crawlerTaskCenter.annotation.CrawlerAnnotation; import com.crawlerTaskCenter.crawler.CrawlerBase; import com.crawlerTaskCenter.model.CrawlerJob; import com.crawlerTaskCenter.util.InetAddressUtil; import com.crawlerTaskCenter.util.ThreadPoolUtil; import com.rabbitmq.client.Channel; import com.rabbitmq.client.QueueingConsumer; @Configuration public class Receiver { private static Logger log = Logger.getLogger(Application.class); private static Map<String, Class<?>> crawlerClassMap=new HashMap<String, Class<?>>(); private static String returnPath; private static ThreadPoolUtil threadPool ; @Value("${server.port}") private String serverport; public Receiver(){ System.out.println("init 
Receiver"); } @PostConstruct public void init(){ //初始化returnPath returnPath="http://" + InetAddressUtil.getHostIP() + ":"+serverport+"/task/getMessage"; log.info("returnPath:"+returnPath); initCrawlerClassMap(); threadPool = new ThreadPoolUtil("crawlerTaskPool"); threadPool.setPoolsize(30); threadPool.setMaximumPoolSize(50); threadPool.initPool(); } @Autowired CachingConnectionFactory connectionFactory; private void initCrawlerClassMap(){ Reflections reflections = new Reflections( ConfigurationBuilder.build("com.crawlerTaskCenter.crawler")); Set<Class<?>> set = reflections.getTypesAnnotatedWith(CrawlerAnnotation.class); for (Class<?> object : set) { try { CrawlerAnnotation geccoClass = (CrawlerAnnotation) object .getAnnotation(CrawlerAnnotation.class); String key = geccoClass.key(); String type=geccoClass.type(); //缓存全部爬虫class 方便在启动线程时直接实例化 crawlerClassMap.put(key+type, object); } catch (Exception e) { e.printStackTrace(); } } } @Bean public SimpleMessageListenerContainer messageContainer() { connectionFactory.setPublisherConfirms(true); SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(connectionFactory); container.setQueueNames("crawlerJobs");; container.setExposeListenerChannel(true); container.setMaxConcurrentConsumers(1); container.setConcurrentConsumers(1); container.setPrefetchCount(5); container.setAcknowledgeMode(AcknowledgeMode.MANUAL); //设置确认模式手工确认 container.setMessageListener(new ChannelAwareMessageListener() { @Override public void onMessage(Message message, Channel channel) throws Exception { byte[] body = message.getBody(); try{ JSONObject json=JSON.parseObject(new String(body)); System.out.println("receive msg : " + new String(body)); // channel.basicAck(message.getMessageProperties().getDeliveryTag(), false); //确认消息成功消费 new tt(channel, message.getMessageProperties().getDeliveryTag(),json).start(); }catch(Exception ex){ ex.printStackTrace(); channel.basicNack(message.getMessageProperties().getDeliveryTag(),false,false); 
//确认消息成功消费 } } }); return container; } class tt extends Thread{ long tag=0l; Channel channel=null; JSONObject jsonObject=null; tt(Channel channel,long tag,JSONObject jsonObject){ this.channel=channel; this.tag=tag; this.jsonObject=jsonObject; } @Override public void run() { try { Class[] paraTypes = { CrawlerJob.class }; // 从缓存中获取爬虫class Class<?> object = crawlerClassMap.get(jsonObject .getString("key") + jsonObject.getString("type")); if (object != null) { try { // 实例化 CrawlerJob CrawlerJob crawler = JSON.parseObject( jsonObject.toJSONString(), CrawlerJob.class); Object[] paras = { crawler }; Constructor<?> cons = object.getConstructor(paraTypes); CrawlerBase crawlerInstace = (CrawlerBase) cons .newInstance(paras); crawlerInstace.returnPath = returnPath; threadPool.executeCrawlerTask(crawlerInstace); } catch (Exception e) { e.printStackTrace(); } } else { log.error(jsonObject.getString("jobName") + "爬虫不存在"); } } catch (Exception e) { e.printStackTrace(); try { channel.basicNack(tag,false, false); } catch (IOException e1) { e1.printStackTrace(); } } try { channel.basicAck(tag, false); } catch (IOException e) { e.printStackTrace(); } } } }
核心代码是 messageContainer 方法,其余的是我真实项目中的一些逻辑(消息队列中存的是爬虫任务,根据任务启动爬虫线程),大家可以忽略。
总结:
我只用了最简单的消息发送、接收、手动确认,其实 RabbitMQ 还有比较强大的消息分发策略、响应策略、事务、重试机制等。
目前生产环境还没有上,因为生产环境的中间件必须高可用,还要进行集群部署才能发布到生产环境。
下一篇:rabbitmq集群搭建