目前规划原则:css
topic 建立基于业务 消费者基于模块 多对多关系 且消费本身的topic 不会影响别人 topic n↔n CIDhtml
topic | tag | 使用模块/消费者CID | 说明 | 消费方式 | 负责人 |
---|---|---|---|---|---|
主题:行程 topic: topic_dev_xxx_trip |
trip.* |
orderdisp/ CID_DEV_xxx_TRIP | 行程变动使用 |
集群消费 (无序) |
蒋飞 |
主题:坐标 topic: topic_dev_xxx_coord |
coord_notice |
coord/CID_DEV_xxx_COORD |
结算信息和语音播报 |
广播消费 (无序) |
付稳稳 |
主题: 首次登录 topic: topic_dev_xxx_broadcast |
uc_passenger_first_login |
|
已废弃 首登送券 |
广播消费(无序) |
孙金新 |
uc_passenger_first_login |
coupon/CID_DEV_xxx_COUPON (已建) |
代替上面 首登送券 |
集群消费 (无序) |
孙金新 |
|
主题: 支付完成 topic: topic_dev_xxx_pay_broadcast
|
pay_notice |
coupon / CID_DEV_xxx_COUPON (已建) |
支付完成 消费优惠券 |
集群消费 (无序) |
孙金新 |
pay_notice |
orderplace /CID_DEV_xxx_PLACE (已建) |
支付完成 更新订单 |
集群消费 (无序) |
张开文 |
|
pay_notice |
ordertaking /CID_DEV_xxx_TAKING(已建) |
支付完成 更新行程/订单状态→ 坐标 |
集群消费 (无序) |
栗东星 |
|
pay_notice |
bill /CID_DEV_xxx_BILL |
支付完成更新帐单状态 |
集群消费 (无序) |
刘明涛 |
|
pay_notice |
datawarehouse/CID_DEV_xxx_DATAWARE |
支付完成 数据统计 |
集群消费 (无序) |
王德成 |
|
refund_notice |
datawarehouse/CID_DEV_xxx_DATAWARE |
退款完成 数据统计 |
集群消费 (无序) |
王德成 |
|
主题: 顺序更新行程状态 topic: topic_dev_xxx_order |
update_trip |
orderplace/CID_DEV_xxx_ORDER |
更新行程信息 |
集群消费 (有序) |
张开文 |
MQ: 使用原则和规范:java
正确的顺序: 是先启动Consumer 后再启动producer。git
MQ 使用状况总结: github
未受权CID 为topic的消费者时 由于子帐号有订阅消费权限 因此 子帐号建立的 CID 能够订阅和消费 topic 可是不影响其余模块(其余 CID)消费
可行方案一: web
每一个模块实例都使用同一个子帐号(Access/Secret相同) 不一样模块使用使用同一个 CID时, 须要作到 消息订阅一致性( 同一 CustomerID 的全部使用的模块 订阅的 topic和 tag 数量须要彻底一致 )spring
可行方案二: (我的推荐方案)json
每一个模块实例都使用同一个子帐号(Access/Secret相同) 每一个模块单独分配本身的CID(同一子帐号dev 统一建立), 模块之间数据隔离, 要求各个模块自能用本身的 CID 且不要订阅本身模块不应订阅的 topic 和 tag (缺点:同一个子帐号dev [Access/Secret相同] 订阅谁能够消费谁(可是不响应其余模块) 只要子帐号被受权的 topic 每一个CID 均可以订阅该topic)api
1.主帐号登陆并建立topic服务器
2.受权订阅权限给子帐号(帐号不能访问未受权的 topic)
3.子帐号登陆 topic管理中建立本身帐号下的CID
4.程序中使用 同一子帐号(Access/Secret) 可是本身模块的 CID 消费消息 相互不影响
最强隔离方案:
每一个模块实例都使用不一样子帐号(Access/Secret不一样) 每一个模块单独分配本身子帐号建立的CID.这样模块之间能够保障不能相互订阅和消费.
1.主帐号登陆并建立topic
2.受权订阅权限给子帐号(帐号不能访问未受权的 topic)
3.子帐号登陆 topic管理中建立本身帐号下的CID
4.程序中使用 不一样子帐号(Access/Secret不一样)下本身模块的 CID 消费消息 相互不影响
topic n↔n CID 图解:
https://pic2.zhimg.com/80/v2-b6ed65f370a766620718ad4227d5d4e5_hd.jpg
下
奉上 官方文档: https://help.aliyun.com/document_detail/34411.html?spm=a2c4g.11186623.4.5.565f7b25vcsskW
官方DEMO: https://github.com/AliwareMQ/mq-demo?spm=a2c4g.11186623.2.14.578018aaaNZL17
RocketMQ源码分析辅助: https://www.processon.com/view/5a6eb653e4b05680c3e94fec
我的梳理有限:欢迎你们 丰富此文档
测试用例:
import com.alibaba.fastjson.JSON;
import com.aliyun.openservices.ons.api.*;
import com.xxx.engine.ui.controller.pay.base.BaseTest;
import lombok.extern.slf4j.Slf4j;
import org.junit.Test;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.context.annotation.ComponentScan;
import org.springframework.test.context.TestPropertySource;
import org.springframework.test.context.web.WebAppConfiguration;
import java.util.Properties;
/**
*
* topic:
*
* topic_dev_xxx_trip_tomas
* topic_dev_xxx_pay_tomas
*
* CID:
*
* CID_DEV_xxx_TRIP_TOMAS 做为 topic: trip_tomas 和pay_tomas 消费者
* CID_DEV_xxx_COORD_TOMAS 做为 topic: trip_tomas 和pay_tomas 消费者
* CID_DEV_xxx_TAKING_TOMAS 做为 topic: trip_tomas 消费者
* CID_DEV_xxx_ORDER_TOMAS 做为 topic: pay_tomas 消费者
* CID_DEV_xxx_COUPON_TOMAS 只创建不须要受权 topic
*/
@Slf4j
@SpringBootTest(classes={PayMQTest.class})
@ComponentScan(basePackages = {"com.xxx.engine"})
@TestPropertySource("classpath:engine_common.properties")
@WebAppConfiguration
public class PayMQTest extends BaseTest {
@Value("${xxx.mq.tag.pay.notice:pay_notice}")
private String MQ_TAG_PAY;
private void sendTestMQ(String topic, String tag) {
Properties properties = new Properties();
// 您在MQ控制台建立的Producer ID
properties.put(PropertyKeyConst.ProducerId, "PID_xxx_DEV");
// 鉴权用AccessKey,在阿里云服务器管理控制台建立
properties.put(PropertyKeyConst.AccessKey,"LTAIBvzNg84oa7bM");
// 鉴权用SecretKey,在阿里云服务器管理控制台建立
properties.put(PropertyKeyConst.SecretKey, "5VynbpH5JmRXTshkSbTlLdsbMErHF7");
// 设置 TCP 接入域名(此处以公共云的公网接入为例)
properties.put(PropertyKeyConst.ONSAddr,"http://onsaddr-internet.aliyun.com/rocketmq/nsaddr4client-internet");
Producer producer = ONSFactory.createProducer(properties);
// 在发送消息前,必须调用start方法来启动Producer,只需调用一次便可
producer.start();
//循环发送消息
for (int i=10000;i<10005;i++){
try {
Thread.sleep(100L);
} catch (InterruptedException e) {
e.printStackTrace();
}
String message = topic.concat(tag).concat("消息内容:").concat(String.valueOf(i));
Message msg = new Message( //
// 在控制台建立的Topic,即该消息所属的Topic名称
topic,
// Message Tag,
// 可理解为Gmail中的标签,对消息进行再归类,方便Consumer指定过滤条件在MQ服务器过滤
tag,
// Message Body
// 任何二进制形式的数据, MQ不作任何干预,
// 须要Producer与Consumer协商好一致的序列化和反序列化方式
message.getBytes());
// 设置表明消息的业务关键属性,请尽量全局惟一,以方便您在没法正常收到消息状况下,可经过MQ控制台查询消息并补发
// 发送消息,只要不抛异常就是成功
// 打印Message ID,以便用于消息发送状态查询
SendResult sendResult = producer.send(msg);
System.out.println("Send Message success. Message ID is: " + sendResult.getMessageId());
}
// 在应用退出前,能够销毁Producer对象
producer.shutdown();
}
/**
* 持续发送1w 条 MQ 消息
* @throws Exception
*/
@Test
public void sendMQ() throws Exception {
sendTestMQ("topic_dev_xxx_trip_tomas", MQ_TAG_PAY);
sendTestMQ("topic_dev_xxx_pay_tomas", MQ_TAG_PAY);
}
/**
* 子帐号dev 子帐号(普通权限用户) dev Access/Secret 的身份的登陆
* CID 由子帐号建立 并订阅topic (本例:topic_dev_xxx_trip_tomas和topic_dev_xxx_pay_tomas)
* 启动实例 消费者在线 且能够接收消息
* @throws Exception
*/
@Test
public void WithAuthCID() throws Exception {
Properties consumerProperties = new Properties();
consumerProperties.setProperty(PropertyKeyConst.ConsumerId, "CID_DEV_xxx_TRIP_TOMAS");
consumerProperties.setProperty(PropertyKeyConst.AccessKey, "xxx");
consumerProperties.setProperty(PropertyKeyConst.SecretKey, "xxx");
consumerProperties.setProperty(PropertyKeyConst.ONSAddr, "http://onsaddr-internet.aliyun.com/rocketmq/nsaddr4client-internet");
//consumerProperties.setProperty(PropertyKeyConst.MessageModel, PropertyValueConst.BROADCASTING);
Consumer consumer = ONSFactory.createConsumer(consumerProperties);
consumer.subscribe("topic_dev_xxx_trip_tomas", MQ_TAG_PAY, getMessageListener());
consumer.subscribe("topic_dev_xxx_pay_tomas", MQ_TAG_PAY, getMessageListener());
consumer.start();
System.out.println("reciveMQByDevWithAuthCID Started.");
Thread.sleep(1000000000000l);
}
/**
* 子帐号dev 子帐号(普通权限用户) dev Access/Secret 的身份的登陆
* CID 由子帐号建立 并订阅topic (本例:topic_dev_xxx_trip_tomas和topic_dev_xxx_pay_tomas)
* 启动实例 消费者在线 且能够接收消息
* PS. CID_DEV_xxx_TRIP_TOMAS 同一个 CID 能够启动多个实例 可是必须保证 每一个实例订阅的 topic 和 tag 一致 否则会违反消息一致性原则 致使消息消费混乱
* @throws Exception
*/
@Test
public void WithAuthCIDSecond() throws Exception {
Properties consumerProperties = new Properties();
consumerProperties.setProperty(PropertyKeyConst.ConsumerId, "CID_DEV_xxx_TRIP_TOMAS");
consumerProperties.setProperty(PropertyKeyConst.AccessKey, "xxx");
consumerProperties.setProperty(PropertyKeyConst.SecretKey, "xxx");
consumerProperties.setProperty(PropertyKeyConst.ONSAddr, "http://onsaddr-internet.aliyun.com/rocketmq/nsaddr4client-internet");
//consumerProperties.setProperty(PropertyKeyConst.MessageModel, PropertyValueConst.BROADCASTING);
Consumer consumer = ONSFactory.createConsumer(consumerProperties);
//正确方式(和上个实例同样) 消费成功
consumer.subscribe("topic_dev_xxx_trip_tomas", MQ_TAG_PAY, getMessageListener());
consumer.subscribe("topic_dev_xxx_pay_tomas", MQ_TAG_PAY, getMessageListener());
//错误方式(和上个实例不同) 消费失败 违反订阅关系一致性
consumer.subscribe("topic_dev_xxx_pay_tomas", MQ_TAG_PAY, getMessageListener());
consumer.start();
System.out.println("WithAuthCIDSecond Started.");
Thread.sleep(1000000000000l);
}
/**
* 子帐号dev 子帐号(普通权限用户) dev Access/Secret 的身份的登陆
* CID 由子帐号建立 并订阅topic (本例:topic_dev_xxx_trip_tomas)
* 启动实例 消费者在线 且能够接收消息
* PS. CID_DEV_xxx_TAKING_TOMAS 做为topic_dev_xxx_trip_tomas的消费者 不影响其余模块CID 订阅和消费任何topic
* @throws Exception
*/
@Test
public void WithAuthCID3() throws Exception {
Properties consumerProperties = new Properties();
consumerProperties.setProperty(PropertyKeyConst.ConsumerId, "CID_DEV_xxx_TAKING_TOMAS");
consumerProperties.setProperty(PropertyKeyConst.AccessKey, "xxx");
consumerProperties.setProperty(PropertyKeyConst.SecretKey, "xxx");
consumerProperties.setProperty(PropertyKeyConst.ONSAddr, "http://onsaddr-internet.aliyun.com/rocketmq/nsaddr4client-internet");
//consumerProperties.setProperty(PropertyKeyConst.MessageModel, PropertyValueConst.BROADCASTING);
Consumer consumer = ONSFactory.createConsumer(consumerProperties);
consumer.subscribe("topic_dev_xxx_trip_tomas", MQ_TAG_PAY, getMessageListener());
consumer.start();
System.out.println("reciveMQByDevWithOutAuthCID3 Started.");
Thread.sleep(1000000000000l);
}
/**
* 子帐号dev 以子帐号(普通权限用户) devAccess/Secret 的身份的登陆
* 启动实例 消费者在线 且能够接收消息 而且能够突破订阅限制 订阅谁能够消费谁 但仅限于消费(子帐号) 被受权的 topic
*
* 子帐号CID不是 topic指定的消费者 强制做为为topic的消费者时 由于子帐号有订阅消费权限 因此 子帐号建立的 CID 能够订阅和消费 topic 可是不影响其余模块(其余 CID)
* @throws Exception
*/
@Test
public void WithOutAuthCID() throws Exception {
Properties consumerProperties = new Properties();
consumerProperties.setProperty(PropertyKeyConst.ConsumerId, "CID_DEV_xxx_COUPON_TOMAS");
consumerProperties.setProperty(PropertyKeyConst.AccessKey, "xxx");
consumerProperties.setProperty(PropertyKeyConst.SecretKey, "xxx");
consumerProperties.setProperty(PropertyKeyConst.ONSAddr, "http://onsaddr-internet.aliyun.com/rocketmq/nsaddr4client-internet");
//consumerProperties.setProperty(PropertyKeyConst.MessageModel, PropertyValueConst.BROADCASTING);
Consumer consumer = ONSFactory.createConsumer(consumerProperties);
consumer.subscribe("topic_dev_xxx_trip_tomas", MQ_TAG_PAY, getMessageListener());
consumer.start();
System.out.println("reciveMQByDevWithOutAuthCID Started.");
Thread.sleep(1000000000000l);
}
/**
* 子帐号CID不是 topic指定的消费者 强制做为为topic的消费者时 由于子帐号有订阅消费权限 因此 子帐号建立的 CID 能够订阅和消费 topic 可是不影响其余模块(其余 CID)
* @throws Exception
*/
@Test
public void WithOutAuthCID2() throws Exception {
Properties consumerProperties = new Properties();
consumerProperties.setProperty(PropertyKeyConst.ConsumerId, "CID_DEV_xxx_ORDER_TOMAS");
consumerProperties.setProperty(PropertyKeyConst.AccessKey, "xxx");
consumerProperties.setProperty(PropertyKeyConst.SecretKey, "xxx");
consumerProperties.setProperty(PropertyKeyConst.ONSAddr, "http://onsaddr-internet.aliyun.com/rocketmq/nsaddr4client-internet");
consumerProperties.setProperty(PropertyKeyConst.MessageModel, PropertyValueConst.BROADCASTING);
Consumer consumer = ONSFactory.createConsumer(consumerProperties);
consumer.subscribe("topic_dev_xxx_trip_tomas", MQ_TAG_PAY, getMessageListener());
consumer.start();
System.out.println("reciveMQByDevWithOutAuthCID2 Started.");
Thread.sleep(1000000000000l);
}
/**
* 强隔离方案:
* 每一个模块实例都使用不一样子帐号(Access/Secret不一样) 每一个模块单独分配本身子帐号建立的CID.这样模块之间能够保障不能相互订阅和消费.
* 1.主帐号登陆并建立topic
* 2.受权订阅权限给子帐号(帐号不能访问未受权的 topic)
* 3.子帐号登陆 topic管理中建立本身帐号下的CID
* 4.程序中使用 不一样子帐号(Access/Secret不一样)下本身模块的 CID 消费消息 相互不影响
*
* @throws Exception
*/
@Test
public void recivePayMQByCoodAccountWithAuthCID() throws Exception {
Properties consumerProperties = new Properties();
consumerProperties.setProperty(PropertyKeyConst.ConsumerId, "CID_DEV_xxx_TEST");
consumerProperties.setProperty(PropertyKeyConst.AccessKey, "xxx");
consumerProperties.setProperty(PropertyKeyConst.SecretKey, "xxx");
consumerProperties.setProperty(PropertyKeyConst.ONSAddr, "http://onsaddr-internet.aliyun.com/rocketmq/nsaddr4client-internet");
//consumerProperties.setProperty(PropertyKeyConst.MessageModel, PropertyValueConst.BROADCASTING);
Consumer consumer = ONSFactory.createConsumer(consumerProperties);
consumer.subscribe("topic_dev_xxx_trip_tomas", MQ_TAG_PAY, getMessageListener());
consumer.start();
System.out.println("recivePayMQByCoodAccountWithAuthCID Started.");
//Thread.sleep(1000000000000l);
}
private MessageListener getMessageListener(){
return new MessageListener() {
@Override
public Action consume(Message message, ConsumeContext consumeContext) {
try {
log.info("receive message ={} ConsumeContext={}:", JSON.toJSONString(message), JSON.toJSONString(consumeContext));
} catch (Exception e) {
log.error("{}", e);
return Action.ReconsumeLater;
}
return Action.CommitMessage;
}
}; }