MQ 使用状况梳理

我的梳理有限:欢迎你们 丰富此文档

目前规划原则:css

         topic 建立基于业务  消费者基于模块 多对多关系 且消费本身的topic 不会影响别人  topic n↔n  CIDhtml

基于业务topic 的分布表格: (后续有模块更新请自行更新文档或者联系我补上)

topic tag 使用模块/消费者CID 说明 消费方式 负责人
主题:行程
topic: topic_dev_xxx_trip

trip.*

orderdisp/ CID_DEV_xxx_TRIP

行程变动使用

集群消费 (无序)

蒋飞

主题:坐标
topic: topic_dev_xxx_coord

coord_notice

coord/CID_DEV_xxx_COORD

结算信息和语音播报

广播消费 (无序)

付稳稳

主题: 首次登录
topic: topic_dev_xxx_broadcast

uc_passenger_first_login

coupon/CID_DEV_xxx_BROADCAST(废弃)

已废弃 首登送券

广播消费(无序)

孙金新

uc_passenger_first_login

coupon/CID_DEV_xxx_COUPON (已建)

代替上面 首登送券

集群消费 (无序)

孙金新


主题: 支付完成
topic: topic_dev_xxx_pay_broadcast

pay_notice

coupon / CID_DEV_xxx_COUPON (已建)

支付完成 消费优惠券

集群消费 (无序)

孙金新

pay_notice

orderplace /CID_DEV_xxx_PLACE (已建)

支付完成 更新订单

集群消费 (无序)

张开文

pay_notice

ordertaking /CID_DEV_xxx_TAKING(已建)

支付完成 更新行程/订单状态→ 坐标

集群消费 (无序)

栗东星

pay_notice

bill /CID_DEV_xxx_BILL

支付完成更新帐单状态

集群消费 (无序)

刘明涛

pay_notice

datawarehouse/CID_DEV_xxx_DATAWARE

支付完成 数据统计

集群消费 (无序)

王德成

refund_notice

datawarehouse/CID_DEV_xxx_DATAWARE

退款完成 数据统计

集群消费 (无序)

王德成

主题: 顺序更新行程状态
topic: topic_dev_xxx_order

update_trip

orderplace/CID_DEV_xxx_ORDER

更新行程信息

集群消费 (有序)

张开文


MQ: 使用原则和规范:java

      正确的顺序: 是先启动Consumer 后再启动producer。git

  1.   全部业务目前使用同一个生产者 PID
  2.   全部topic 由主帐号建立 并受权给子帐号(dev/prod)
  3.   topic 的建立基于业务(首次登录,支付成功,行程结束等等)  
  4.   CID(消费者 ID)的建立基于应用 (每一个应用若是须要建立一个CID 若须要(广播和集群)两种消费模式则建立两个CID  广播方式后缀加_BROADCAST区分)
  5.   Consumer ID 和 Topic 的关系是 N:N。 同一个 Consumer ID 能够订阅多个 Topic,同一个 Topic 也能够对应多个 Consumer ID。
  6.   消息订阅一致性( 同一 CustomerID 的全部使用的模块 订阅的 topic tag 数量须要彻底一致 )
  7.   CID只消费本身受权订阅的 topic. 

MQ 使用状况总结: github

  1. 主帐号建立的CustomerID  以主帐号(或有最高权限的受权用户)的Access/Secret 的身份的登陆  启动实例   消费者在线 且能够接收消息 而且能够突破订阅限制 订阅谁能够消费谁  (前提是订阅关系一致性 同一 CustomerID 的全部使用的模块 订阅的 topic tag 数量须要彻底一致 )

  2. 主帐号建立的CustomerID 以子帐号dev(普通权限用户)Access/Secret 的身份的登陆  启动实例  会出现 topic 消费者 不在线状态 (同当日线上状态)  6月以前创建的topic 和 CID 因为阿里云有补偿机制 仍旧能够运行. (这也是致使上线失败的缘由: 当时线下用的 dev 具备最高权限 ,线上 prod 是普通用户权限)

  3. 子帐号dev 登陆阿里云,建立不一样CustomerID 后 ,以子帐号(普通权限用户) devAccess/Secret 的身份的登陆  启动实例    消费者在线  且能够接收消息 而且能够突破订阅限制 订阅谁能够消费谁 但仅限于消费(子帐号)  被受权的 topic.  
    未受权CID 为topic的消费者时 由于子帐号有订阅消费权限 因此 子帐号建立的 CID 能够订阅和消费 topic 可是不影响其余模块(其余 CID)消费



可行方案一web

    每一个模块实例都使用同一个子帐号(Access/Secret相同)  不一样模块使用使用同一个 CID时, 须要作到 消息订阅一致性( 同一 CustomerID 的全部使用的模块 订阅的 topic tag 数量须要彻底一致 )spring


可行方案二: (我的推荐方案)json

    每一个模块实例都使用同一个子帐号(Access/Secret相同)  每一个模块单独分配本身的CID(同一子帐号dev 统一建立), 模块之间数据隔离, 要求各个模块自能用本身的 CID 且不要订阅本身模块不应订阅的 topic 和 tag    (缺点:同一个子帐号dev [Access/Secret相同] 订阅谁能够消费谁(可是不响应其余模块) 只要子帐号被受权的 topic  每一个CID 均可以订阅该topic)api

     1.主帐号登陆并建立topic服务器

     2.受权订阅权限给子帐号(帐号不能访问未受权的 topic)

     3.子帐号登陆 topic管理中建立本身帐号下的CID

     4.程序中使用 同一子帐号(Access/Secret) 可是本身模块的 CID 消费消息 相互不影响


最强隔离方案:

  每一个模块实例都使用不一样子帐号(Access/Secret不一样)  每一个模块单独分配本身子帐号建立的CID.这样模块之间能够保障不能相互订阅和消费.

     1.主帐号登陆并建立topic

     2.受权订阅权限给子帐号(帐号不能访问未受权的 topic)

     3.子帐号登陆 topic管理中建立本身帐号下的CID

     4.程序中使用 不一样子帐号(Access/Secret不一样)下本身模块的 CID 消费消息 相互不影响



topic n↔n  CID 图解:


https://pic2.zhimg.com/80/v2-b6ed65f370a766620718ad4227d5d4e5_hd.jpg

 奉上  官方文档:  https://help.aliyun.com/document_detail/34411.html?spm=a2c4g.11186623.4.5.565f7b25vcsskW

          官方DEMO:  https://github.com/AliwareMQ/mq-demo?spm=a2c4g.11186623.2.14.578018aaaNZL17

          RocketMQ源码分析辅助: https://www.processon.com/view/5a6eb653e4b05680c3e94fec


我的梳理有限:欢迎你们 丰富此文档




测试用例:

import com.alibaba.fastjson.JSON;
import com.aliyun.openservices.ons.api.*;
import com.xxx.engine.ui.controller.pay.base.BaseTest;
import lombok.extern.slf4j.Slf4j;
import org.junit.Test;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.context.annotation.ComponentScan;
import org.springframework.test.context.TestPropertySource;
import org.springframework.test.context.web.WebAppConfiguration;

import java.util.Properties;


/**
*
* topic:
*
* topic_dev_xxx_trip_tomas
* topic_dev_xxx_pay_tomas
*
* CID:
*
* CID_DEV_xxx_TRIP_TOMAS 做为 topic: trip_tomas 和pay_tomas 消费者
* CID_DEV_xxx_COORD_TOMAS 做为 topic: trip_tomas 和pay_tomas 消费者
* CID_DEV_xxx_TAKING_TOMAS 做为 topic: trip_tomas 消费者
* CID_DEV_xxx_ORDER_TOMAS 做为 topic: pay_tomas 消费者
* CID_DEV_xxx_COUPON_TOMAS 只创建不须要受权 topic
*/

@Slf4j
@SpringBootTest(classes={PayMQTest.class})
@ComponentScan(basePackages = {"com.xxx.engine"})
@TestPropertySource("classpath:engine_common.properties")
@WebAppConfiguration
public class PayMQTest extends BaseTest {


@Value("${xxx.mq.tag.pay.notice:pay_notice}")
private String MQ_TAG_PAY;

private void sendTestMQ(String topic, String tag) {
Properties properties = new Properties();
// 您在MQ控制台建立的Producer ID
properties.put(PropertyKeyConst.ProducerId, "PID_xxx_DEV");
// 鉴权用AccessKey,在阿里云服务器管理控制台建立
properties.put(PropertyKeyConst.AccessKey,"LTAIBvzNg84oa7bM");
// 鉴权用SecretKey,在阿里云服务器管理控制台建立
properties.put(PropertyKeyConst.SecretKey, "5VynbpH5JmRXTshkSbTlLdsbMErHF7");
// 设置 TCP 接入域名(此处以公共云的公网接入为例)
properties.put(PropertyKeyConst.ONSAddr,"http://onsaddr-internet.aliyun.com/rocketmq/nsaddr4client-internet");
Producer producer = ONSFactory.createProducer(properties);
// 在发送消息前,必须调用start方法来启动Producer,只需调用一次便可
producer.start();
//循环发送消息
for (int i=10000;i<10005;i++){
try {
Thread.sleep(100L);
} catch (InterruptedException e) {
e.printStackTrace();
}
String message = topic.concat(tag).concat("消息内容:").concat(String.valueOf(i));
Message msg = new Message( //
// 在控制台建立的Topic,即该消息所属的Topic名称
topic,
// Message Tag,
// 可理解为Gmail中的标签,对消息进行再归类,方便Consumer指定过滤条件在MQ服务器过滤
tag,
// Message Body
// 任何二进制形式的数据, MQ不作任何干预,
// 须要Producer与Consumer协商好一致的序列化和反序列化方式
message.getBytes());
// 设置表明消息的业务关键属性,请尽量全局惟一,以方便您在没法正常收到消息状况下,可经过MQ控制台查询消息并补发
// 发送消息,只要不抛异常就是成功
// 打印Message ID,以便用于消息发送状态查询
SendResult sendResult = producer.send(msg);
System.out.println("Send Message success. Message ID is: " + sendResult.getMessageId());
}
// 在应用退出前,能够销毁Producer对象
producer.shutdown();
}

/**
* 持续发送1w 条 MQ 消息
* @throws Exception
*/
@Test
public void sendMQ() throws Exception {
sendTestMQ("topic_dev_xxx_trip_tomas", MQ_TAG_PAY);
sendTestMQ("topic_dev_xxx_pay_tomas", MQ_TAG_PAY);
}
/**
* 子帐号dev 子帐号(普通权限用户) dev Access/Secret 的身份的登陆
* CID 由子帐号建立 并订阅topic (本例:topic_dev_xxx_trip_tomas和topic_dev_xxx_pay_tomas)
* 启动实例 消费者在线 且能够接收消息
* @throws Exception
*/
@Test
public void WithAuthCID() throws Exception {
Properties consumerProperties = new Properties();
consumerProperties.setProperty(PropertyKeyConst.ConsumerId, "CID_DEV_xxx_TRIP_TOMAS");
consumerProperties.setProperty(PropertyKeyConst.AccessKey, "xxx");
consumerProperties.setProperty(PropertyKeyConst.SecretKey, "xxx");
consumerProperties.setProperty(PropertyKeyConst.ONSAddr, "http://onsaddr-internet.aliyun.com/rocketmq/nsaddr4client-internet");
//consumerProperties.setProperty(PropertyKeyConst.MessageModel, PropertyValueConst.BROADCASTING);
Consumer consumer = ONSFactory.createConsumer(consumerProperties);
consumer.subscribe("topic_dev_xxx_trip_tomas", MQ_TAG_PAY, getMessageListener());
consumer.subscribe("topic_dev_xxx_pay_tomas", MQ_TAG_PAY, getMessageListener());
consumer.start();
System.out.println("reciveMQByDevWithAuthCID Started.");
Thread.sleep(1000000000000l);
}
/**
* 子帐号dev 子帐号(普通权限用户) dev Access/Secret 的身份的登陆
* CID 由子帐号建立 并订阅topic (本例:topic_dev_xxx_trip_tomas和topic_dev_xxx_pay_tomas)
* 启动实例 消费者在线 且能够接收消息
* PS. CID_DEV_xxx_TRIP_TOMAS 同一个 CID 能够启动多个实例 可是必须保证 每一个实例订阅的 topic 和 tag 一致 否则会违反消息一致性原则 致使消息消费混乱
* @throws Exception
*/
@Test
public void WithAuthCIDSecond() throws Exception {
Properties consumerProperties = new Properties();
consumerProperties.setProperty(PropertyKeyConst.ConsumerId, "CID_DEV_xxx_TRIP_TOMAS");
consumerProperties.setProperty(PropertyKeyConst.AccessKey, "xxx");
consumerProperties.setProperty(PropertyKeyConst.SecretKey, "xxx");
consumerProperties.setProperty(PropertyKeyConst.ONSAddr, "http://onsaddr-internet.aliyun.com/rocketmq/nsaddr4client-internet");
//consumerProperties.setProperty(PropertyKeyConst.MessageModel, PropertyValueConst.BROADCASTING);
Consumer consumer = ONSFactory.createConsumer(consumerProperties);
//正确方式(和上个实例同样) 消费成功
consumer.subscribe("topic_dev_xxx_trip_tomas", MQ_TAG_PAY, getMessageListener());
consumer.subscribe("topic_dev_xxx_pay_tomas", MQ_TAG_PAY, getMessageListener());

//错误方式(和上个实例不同) 消费失败 违反订阅关系一致性
consumer.subscribe("topic_dev_xxx_pay_tomas", MQ_TAG_PAY, getMessageListener());

consumer.start();
System.out.println("WithAuthCIDSecond Started.");
Thread.sleep(1000000000000l);
}

/**
* 子帐号dev 子帐号(普通权限用户) dev Access/Secret 的身份的登陆
* CID 由子帐号建立 并订阅topic (本例:topic_dev_xxx_trip_tomas)
* 启动实例 消费者在线 且能够接收消息
* PS. CID_DEV_xxx_TAKING_TOMAS 做为topic_dev_xxx_trip_tomas的消费者 不影响其余模块CID 订阅和消费任何topic
* @throws Exception
*/
@Test
public void WithAuthCID3() throws Exception {
Properties consumerProperties = new Properties();
consumerProperties.setProperty(PropertyKeyConst.ConsumerId, "CID_DEV_xxx_TAKING_TOMAS");
consumerProperties.setProperty(PropertyKeyConst.AccessKey, "xxx");
consumerProperties.setProperty(PropertyKeyConst.SecretKey, "xxx");
consumerProperties.setProperty(PropertyKeyConst.ONSAddr, "http://onsaddr-internet.aliyun.com/rocketmq/nsaddr4client-internet");
//consumerProperties.setProperty(PropertyKeyConst.MessageModel, PropertyValueConst.BROADCASTING);
Consumer consumer = ONSFactory.createConsumer(consumerProperties);
consumer.subscribe("topic_dev_xxx_trip_tomas", MQ_TAG_PAY, getMessageListener());
consumer.start();
System.out.println("reciveMQByDevWithOutAuthCID3 Started.");
Thread.sleep(1000000000000l);
}

/**
* 子帐号dev 以子帐号(普通权限用户) devAccess/Secret 的身份的登陆
* 启动实例 消费者在线 且能够接收消息 而且能够突破订阅限制 订阅谁能够消费谁 但仅限于消费(子帐号) 被受权的 topic
*
* 子帐号CID不是 topic指定的消费者 强制做为为topic的消费者时 由于子帐号有订阅消费权限 因此 子帐号建立的 CID 能够订阅和消费 topic 可是不影响其余模块(其余 CID)
* @throws Exception
*/
@Test
public void WithOutAuthCID() throws Exception {
Properties consumerProperties = new Properties();
consumerProperties.setProperty(PropertyKeyConst.ConsumerId, "CID_DEV_xxx_COUPON_TOMAS");
consumerProperties.setProperty(PropertyKeyConst.AccessKey, "xxx");
consumerProperties.setProperty(PropertyKeyConst.SecretKey, "xxx");
consumerProperties.setProperty(PropertyKeyConst.ONSAddr, "http://onsaddr-internet.aliyun.com/rocketmq/nsaddr4client-internet");
//consumerProperties.setProperty(PropertyKeyConst.MessageModel, PropertyValueConst.BROADCASTING);
Consumer consumer = ONSFactory.createConsumer(consumerProperties);
consumer.subscribe("topic_dev_xxx_trip_tomas", MQ_TAG_PAY, getMessageListener());
consumer.start();
System.out.println("reciveMQByDevWithOutAuthCID Started.");
Thread.sleep(1000000000000l);
}

/**
* 子帐号CID不是 topic指定的消费者 强制做为为topic的消费者时 由于子帐号有订阅消费权限 因此 子帐号建立的 CID 能够订阅和消费 topic 可是不影响其余模块(其余 CID)
* @throws Exception
*/
@Test
public void WithOutAuthCID2() throws Exception {
Properties consumerProperties = new Properties();
consumerProperties.setProperty(PropertyKeyConst.ConsumerId, "CID_DEV_xxx_ORDER_TOMAS");
consumerProperties.setProperty(PropertyKeyConst.AccessKey, "xxx");
consumerProperties.setProperty(PropertyKeyConst.SecretKey, "xxx");
consumerProperties.setProperty(PropertyKeyConst.ONSAddr, "http://onsaddr-internet.aliyun.com/rocketmq/nsaddr4client-internet");
consumerProperties.setProperty(PropertyKeyConst.MessageModel, PropertyValueConst.BROADCASTING);
Consumer consumer = ONSFactory.createConsumer(consumerProperties);
consumer.subscribe("topic_dev_xxx_trip_tomas", MQ_TAG_PAY, getMessageListener());
consumer.start();
System.out.println("reciveMQByDevWithOutAuthCID2 Started.");
Thread.sleep(1000000000000l);
}


/**
* 强隔离方案:
* 每一个模块实例都使用不一样子帐号(Access/Secret不一样) 每一个模块单独分配本身子帐号建立的CID.这样模块之间能够保障不能相互订阅和消费.
* 1.主帐号登陆并建立topic
* 2.受权订阅权限给子帐号(帐号不能访问未受权的 topic)
* 3.子帐号登陆 topic管理中建立本身帐号下的CID
* 4.程序中使用 不一样子帐号(Access/Secret不一样)下本身模块的 CID 消费消息 相互不影响
*
* @throws Exception
*/
@Test
public void recivePayMQByCoodAccountWithAuthCID() throws Exception {
Properties consumerProperties = new Properties();
consumerProperties.setProperty(PropertyKeyConst.ConsumerId, "CID_DEV_xxx_TEST");
consumerProperties.setProperty(PropertyKeyConst.AccessKey, "xxx");
consumerProperties.setProperty(PropertyKeyConst.SecretKey, "xxx");
consumerProperties.setProperty(PropertyKeyConst.ONSAddr, "http://onsaddr-internet.aliyun.com/rocketmq/nsaddr4client-internet");
//consumerProperties.setProperty(PropertyKeyConst.MessageModel, PropertyValueConst.BROADCASTING);
Consumer consumer = ONSFactory.createConsumer(consumerProperties);
consumer.subscribe("topic_dev_xxx_trip_tomas", MQ_TAG_PAY, getMessageListener());
consumer.start();
System.out.println("recivePayMQByCoodAccountWithAuthCID Started.");
//Thread.sleep(1000000000000l);
}

private MessageListener getMessageListener(){
return new MessageListener() {
@Override
public Action consume(Message message, ConsumeContext consumeContext) {
try {
log.info("receive message ={} ConsumeContext={}:", JSON.toJSONString(message), JSON.toJSONString(consumeContext));
} catch (Exception e) {
log.error("{}", e);
return Action.ReconsumeLater;
}
return Action.CommitMessage;
}
}; }
相关文章
相关标签/搜索