消息中间件利用高效可靠的消息传递机制进行平台无关的数据交流,并基于数据通讯来进行分布式系统的集成。经过提供消息传递和消息排队模型,它能够在分布式环境下扩展进程间的通讯。java
在这里面,关键的部分是“消息传递”和“消息排队”,能够保证事件的顺序性,也能够在高并发下使用。git
执行过程长,且不须要返回结果的功能,能够利用MQ传递(MQ的异步通讯特征)github
JMS(Java Message Service),是一套接口规范,在jdk中已定义好接口(相似于JDBC,只有JDBC没法操做数据库,须要具体的驱动来实现功能)。web
JMS的传递模式很是像观察者模式的思路:spring
定义对象间的一种一对多的依赖关系,让多个观察者同时监听某一个主题现象,当一个对象的状态发生改变时,会通知全部观察者对象,全部依赖于它的对象都获得通知并被自动更新。数据库
观察者模式——https://my.oschina.net/LinkedBear/blog/1791975apache
消息传递的方式有两种:数组
|
|
|
引用文章图片:http://www.javashuo.com/article/p-gqzlvfbx-p.html架构
|
选用阿里巴巴的RocketMQ(现已被Apache接手),搭建Demo工程并发
参考文档:http://rocketmq.apache.org/docs/simple-example/
7.1 安装RocketMQ
参考文章:https://www.jianshu.com/p/4a275e779afa
从Apache的官网上下载运行包 |
配置环境变量 |
依次运行mqnamesrv.cmd脚本和mqbroker.cmd脚本 |
从https://github.com/apache/rocketmq-externals.git下载监控插件,并解压 |
进入“rocketmq-console\src\main\resources”文件夹,打开“application.properties”进行配置 |
进入“rocketmq-console”文件夹,执行“mvn clean package -Dmaven.test.skip=true”,编译生成 |
进入“target”文件夹,执行“java -jar rocketmq-console-ng-1.0.0.jar”,启动“rocketmq-console-ng-1.0.0.jar”(此jar为SpringBoot项目) |
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <groupId>com.linkedbear</groupId> <artifactId>MQ-Demo</artifactId> <version>0.0.1-SNAPSHOT</version> <properties> <rocketmq.version>4.3.0</rocketmq.version> </properties> <parent> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-parent</artifactId> <version>2.0.0.RELEASE</version> </parent> <dependencies> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> </dependency> <!-- RocketMQ --> <dependency> <groupId>org.apache.rocketmq</groupId> <artifactId>rocketmq-client</artifactId> <version>${rocketmq.version}</version> </dependency> <!-- 热部署 --> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-devtools</artifactId> </dependency> </dependencies> <build> <plugins> <plugin> <artifactId>maven-compiler-plugin</artifactId> <configuration> <source>1.8</source> <target>1.8</target> </configuration> </plugin> </plugins> </build> </project>
/** * 生产者Controller * @Title ProducerController * @author LinkedBear * @Time 2018年8月2日 下午3:22:02 */ @Controller public class ProducerController { //此分组名必须保证全局惟一(考虑到负载均衡等后续问题),故封装为静态常量 public static final String PRODUCE_GROUP_NAME = "TestGroup"; //MQ的运行地址 public static final String MQ_IP = "127.0.0.1:9876"; @RequestMapping("/produceMessage") @ResponseBody public Map<String, Object> produceMessage() throws Exception { //1. 建立生产者链接(相似于JDBC中的Connection),要传入MQ的分组名 DefaultMQProducer producer = new DefaultMQProducer(PRODUCE_GROUP_NAME); //2. 设置MQ的运行地址 producer.setNamesrvAddr(MQ_IP); //3. 开启链接 producer.start(); //4. 构造消息(重载方法较多,此处选择topic, tag, message的三参数方法) Message message = new Message("test_topic", "test_tag", ("test_message。。。" + Math.random()).getBytes()); //5. 发送消息,该方法会返回一个发送结果的对象 SendResult result = producer.send(message); System.out.println(result.getSendStatus()); //6. 关闭链接 producer.shutdown(); //此处将发送结果显示在页面上,方便查看 Map<String, Object> map = new HashMap<>(); map.put("消息", result.getSendStatus()); return map; } }
/** * 消费者Controller * @Title ConsumerController * @author LinkedBear * @Time 2018年8月2日 下午3:22:11 */ @Controller public class ConsumerController { @RequestMapping("/getMessage") @ResponseBody public void getMessage() throws Exception { //1. 建立消费者链接,要传入MQ的分组名,该分组名在ProducerController中 //此处建立的是pushConsumer,它使用监听器,给人的感受是消息被推送的 //pullConsumer,取消息的过程须要本身写 DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(ProducerController.PRODUCE_GROUP_NAME); //2. 设置MQ的运行地址 consumer.setNamesrvAddr(ProducerController.MQ_IP); //3. 设置消息的提取顺序 consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET); //4. 设置消费者接收消息的Topic和Tag,此处对Tag不做限制 consumer.subscribe("test_topic", "*"); //5. 使用监听器接收消息 consumer.registerMessageListener(new MessageListenerConcurrently() { @Override public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) { try { for (MessageExt messageExt : msgs) { String message = new String(messageExt.getBody(), "utf-8"); System.out.println("收到消息【主题:" + messageExt.getTopic() + ", 正文:" + message + "】"); } return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } catch (Exception e) { //转换出现问题,稍后从新发送 return ConsumeConcurrentlyStatus.RECONSUME_LATER; } } }); //6. 启动消费者 consumer.start(); } }
执行http://localhost:8080/produceMessage:
|
|
执行http://localhost:8080/getMessage: |