消息队列 RocketMQ 是阿里巴巴集团自主研发的专业消息中间件。 产品基于高可用分布式集群技术,提供消息订阅和发布、消息轨迹查询、定时(延时)消息、资源统计、监控报警等一系列消息云服务,是企业级互联网架构的核心产品。 消息队列 RocketMQ 历史超过9年,为分布式应用系统提供异步解耦、削峰填谷的能力,同时具有海量消息堆积、高吞吐、可靠重试等互联网应用所需的特性,是阿里巴巴双11使用的核心产品。java
打开阿里云产品,找到 rocketMQspring
这里须要咱们根据须要开通包年仍是包月服务,开通成功后进入控制台json
根据提示建立实例、建立Topics、建立Groupspringboot
建立好了以后,打开 Topic 管理,手动发送一条消息架构
能够看到发送成功后会返回信息的 messageIDapp
首先引入 pom 异步
<!--消息队列 RocketMQ--> <dependency> <groupId>com.aliyun.openservices</groupId> <artifactId>ons-client</artifactId> <version>1.7.9.Final</version> </dependency>
定义 rocketMQ 配置分布式
@Configuration public class RocketMQConfig { public Properties getProperties(){ Properties properties=new Properties(); /** * 键的首字母必须大写 */ properties.setProperty("AccessKey","**"); // properties.setProperty("SecretKey","**"); // properties.setProperty(PropertyKeyConst.SendMsgTimeoutMillis, "3000"); // 顺序消息消费失败进行重试前的等待时间,单位(毫秒) properties.put(PropertyKeyConst.SuspendTimeMillis, "100"); // 消息消费失败时的最大重试次数 properties.put(PropertyKeyConst.MaxReconsumeTimes, "20"); // properties.put(PropertyKeyConst.NAMESRV_ADDR, "http://MQ_INST_1944503281593155_BaOTPbFU.mq-internet-access.mq-internet.aliyuncs.com:80"); return properties; } }
AccessKey、SecretKey 可在阿里云我的信息中找到ide
NAMESRV_ADDR 是实例的接入点测试
定义消息发送者
@Component public class RocketMQProducer { @Autowired private RocketMQConfig rocketMQConfig; /** * 一、发送普通消息 * * @param message * @return */ public boolean sendNormalMessage(Message message,String groupId) { Properties properties=rocketMQConfig.getProperties(); properties.setProperty(PropertyKeyConst.GROUP_ID,groupId); Producer producer = ONSFactory.createProducer(properties); // 在发送消息前,必须调用 start 方法来启动 Producer,只需调用一次便可 producer.start(); try { SendResult sendResult = producer.send(message); // 同步发送消息,只要不抛异常就是成功 if (sendResult != null) { System.out.println("消息发送成功:messageID:"+sendResult.getMessageId()); return true; } } catch (Exception e) { // 消息发送失败,须要进行重试处理,可从新发送这条消息或持久化这条数据进行补偿处理 e.printStackTrace(); } return false; } }
定义消息消费者
@Component public class RocketMQConsumer { @Autowired private RocketMQConfig rocketMQConfig; /** * 一、普通订阅 * * @param */ public void normalSubscribe( ) { Properties properties = rocketMQConfig.getProperties(); properties.put(PropertyKeyConst.GROUP_ID, "GID-test"); Consumer consumer = ONSFactory.createConsumer(properties); consumer.subscribe("test", "*", new MessageListener() { @Override public Action consume(Message message, ConsumeContext context) { System.out.println("Receive: " + new String(message.getBody())); //把消息转化为java对象 //JSONObject jsonObject=JSONObject.parseObject(jsonString); //Book book= jsonObject.toJavaObject(Book.class); return Action.CommitMessage; } }); consumer.start(); } }
测试类
@Autowired private RocketMQProducer rocketMQProducer; @Autowired private RocketMQConsumer rocketMQConsumer; //发送信息 @RequestMapping("/send") public String send(String msg){ // test 是建立的topic是名称, tag 是消息的二级分类,能够填空 Message message=new Message("test","tag",msg.getBytes()); // GID-test 是 发送信息组ID rocketMQProducer.sendNormalMessage(message,"GID-test"); return "ok"; } //接收信息 @RequestMapping("/receive") public String receive(){ rocketMQConsumer.normalSubscribe(); return "ok"; }
启动项目,访问 send 和 receive,控制台打印以下
消息发送成功:messageID:C0A83292361818B4AAC23C548787000F Receive: 测试
到这里说明整合成功。最后咱们只须要在启动项目的时候启动消费者。spring 监听器能够实现,或者能够经过实现接口 CommandLineRunner
@Component public class RocketConsumerListener implements CommandLineRunner { @Autowired private RocketMQConsumer rocketMQConsumer; @Override public void run(String... args) throws Exception { System.out.println("========rocketMQ消费者启动=========="); rocketMQConsumer.normalSubscribe(); } }
这样在启动项目的时候消费者也被启动。到此springboot和rocketMQ的整合就完成啦。