Message Queue,消息队列,FIFO 结构。java
例如电商平台,在用户支付订单后执行对应的操做;git
优势:github
缺点spring
Java Message Service,Java消息服务,相似 JDBC 提供了访问数据库的标准,JMS 也制定了一套系统间消息通讯的规范;数据库
区别于 JDBC,JDK 原生包中并未定义 JMS 相关接口。
协做方式图示为;apache
ActiveMQ | RabbitMQ | RocketMQ | kafka | |
---|---|---|---|---|
单机吞吐量 | 万级 | 万级 | 10 万级 | 10 万级 |
可用性 | 高 | 高 | 很是高 | 很是高 |
可靠性 | 较低几率丢失消息 | 基本不丢 | 能够作到 0 丢失 | 能够作到 0 丢失 |
功能支持 | 较为完善 | 基于 erlang,并发强,性能好,延时低 | 分布式,拓展性好,支持分布式事务 | 较为简单,主要应用与大数据实时计算,日志采集等 |
社区活跃度 | 低 | 中 | 高 | 高 |
做为 Apache 下的开源项目,彻底支持 JMS 规范。而且 Spring Boot 内置了 ActiveMQ 的自动化配置,做为入门再适合不过。segmentfault
添加依赖;服务器
<dependency> <groupId>org.apache.activemq</groupId> <artifactId>activemq-core</artifactId> <version>5.7.0</version> </dependency>
消息发送;session
// 1. 建立链接工厂 ConnectionFactory factory = new ActiveMQConnectionFactory("tcp://localhost:61616"); // 2. 工厂建立链接 Connection connection = factory.createConnection(); // 3. 启动链接 connection.start(); // 4. 建立链接会话session,第一个参数为是否在事务中处理,第二个参数为应答模式 Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); // 5. 根据session建立消息队列目的地 Destination queue = session.createQueue("test-queue"); // 6. 根据session和目的地queue建立生产者 MessageProducer producer = session.createProducer(queue); // 7. 根据session建立消息实体 Message message = session.createTextMessage("hello world!"); // 8. 经过生产者producer发送消息实体 producer.send(message); // 9. 关闭链接 connection.close();
自动注入参考:org.springframework.boot.autoconfigure.jms.activemq.ActiveMQConnectionFactoryConfiguration.SimpleConnectionFactoryConfiguration
添加依赖;多线程
<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-activemq</artifactId> </dependency>
添加 yaml 配置;
spring: activemq: broker-url: tcp://localhost:61616 jms: #消息模式 true:广播(Topic),false:队列(Queue),默认时false pub-sub-domain: true
收发消息;
@Autowired private JmsTemplate jmsTemplate; // 接收消息 @JmsListener(destination = "test") public void receiveMsg(String msg) { System.out.println(msg); } // 发送消息 public void sendMsg(String destination, String msg) { jmsTemplate.convertAndSend(destination, msg); }
基于 zookeeper 实现主从架构,修改 activemq.xml 节点 persistenceAdapter 配置;
<persistenceAdapter> <replicatedLevelDB directory="${activemq.data}/levelDB" replicas="3" bind="tcp://0.0.0.0:0" zkAddress="172.17.0.4:2181,172.17.0.4:2182,172.17.0.4:2183" zkPath="/activemq/leveldb-stores" hostname="localhost" /> </persistenceAdapter>
broker 地址为:failover:(tcp://192.168.4.19:61616,tcp://192.168.4.19:61617,tcp://192.168.4.19:61618)?randomize=false
在高可用集群节点 activemq.xml 添加节点 networkConnectors;
<networkConnectors> <networkConnector uri="static:(tcp://192.168.0.103:61616,tcp://192.168.0.103:61617,tcp://192.168.0.103:61618)" duplex="false"/> </networkConnectors>
更多详细信息可参考: https://blog.csdn.net/haoyuya...
因为发布订阅模式,全部订阅者都会接收到消息,在生产环境,消费者集群会产生消息重复消费问题。
ActiveMQ 提供 VirtualTopic 功能,解决多消费端接收同一条消息的问题。于生产者而言,VirtualTopic 就是一个 topic,对消费而言则是 queue。
在 activemq.xml 添加节点 destinationInterceptors;
<destinationInterceptors> <virtualDestinationInterceptor> <virtualDestinations> <virtualTopic name="testTopic" prefix="consumer.*." selectorAware="false"/> </virtualDestinations> </virtualDestinationInterceptor> </destinationInterceptors>
生产者正常往 testTopic 中发送消息,订阅者可修改订阅主题为相似 consumer.A.testTopic 这样来消费。
更多详细信息可参考: https://blog.csdn.net/java_co...
是一个队列模型的消息中间件,具备高性能、高可靠、高实时、分布式特色。
Name Server
名称服务器,相似于 Zookeeper 注册中心,提供 Broker 发现;
Broker
RocketMQ 的核心组件,绝大部分工做都在 Broker 中完成,接收请求,处理消费,消息持久化等;
Producer
消息生产方;
Consumer
消息消费方;
安装后,依次启动 nameserver 和 broker,能够用 mqadmin 管理主题、集群和 broker 等信息;
添加依赖;
<dependency> <groupId>org.apache.rocketmq</groupId> <artifactId>rocketmq-client</artifactId> <version>4.5.2</version> </dependency>
消息发送;
DefaultMQProducer producer = new DefaultMQProducer("producer-group"); producer.setNamesrvAddr("127.0.0.1:9876"); producer.setInstanceName("producer"); producer.start(); Message msg = new Message( "producer-topic", "msg", "hello world".getBytes() ); //msg.setDelayTimeLevel(1); SendResult sendResult = producer.send(msg); System.out.println(sendResult.toString()); producer.shutdown();
delayLevel 从 1 开始默认依次是:1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h。
参考 org.apache.rocketmq.store.schedule.ScheduleMessageService#parseDelayLevel。
消息接收;
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("consumer-group"); consumer.setNamesrvAddr("127.0.0.1:9876"); consumer.setInstanceName("consumer"); consumer.subscribe("producer-topic", "msg"); consumer.registerMessageListener((MessageListenerConcurrently) (list, consumeConcurrentlyContext) -> { for (MessageExt msg : list) { System.out.println(new String(msg.getBody())); } return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; }); consumer.start();
.\mqadmin.cmd sendMessage -t producer-topic -c msg -p "hello rocketmq" -n localhost:9876
添加依赖;
<dependency> <groupId>org.apache.rocketmq</groupId> <artifactId>rocketmq-spring-boot-starter</artifactId> <version>2.0.4</version> </dependency>
添加 yaml 配置;
rocketmq: name-server: 127.0.0.1:9876 producer: group: producer
发送消息;
@Autowired private RocketMQTemplate mqTemplate; public void sendMessage(String topic, String tag, String message) { SendResult result = mqTemplate.syncSend(topic + ":" + tag, message); System.out.println(JSON.toJSONString(result)); }
接收消息;
@Component @RocketMQMessageListener(consumerGroup = "consumer", topic = "topic-test", selectorExpression = "tag-test") public class MsgListener implements RocketMQListener<String> { @Override public void onMessage(String message) { System.out.println(message); } }
RocketMQ 拓展包提供了管理控制台;
https://github.com/apache/rocketmq-externals/tree/master/rocketmq-console
产生缘由:
怎么解决重复消费的问题,换句话怎么保证消息消费的幂等性。
一般基于本地消息表的方案实现,消息处理过便再也不处理。
消息错乱的缘由:
要保证消息的顺序消费,有三个关键点:
参考 RocketMq 中的 MessageQueueSelector 和 MessageListenerOrderly。
在分布式系统中,一个事务由多个本地事务组成。这里介绍一个基于 MQ 的分布式事务解决方案。
经过 broker 的 HA 高可用,和定时回查 prepare 消息的状态,来保证最终一致性。