版权声明:本套技术专栏是做者(秦凯新)平时工做的总结和升华,经过从真实商业环境抽取案例进行总结和分享,并给出商业应用的调优建议和集群环境容量规划等内容,请持续关注本套博客。QQ邮箱地址:1120746959@qq.com,若有任何技术交流,可随时联系。数据库
预发送消息失败,消息未进存储,业务操做未执行(可能的缘由:主动方应用、网络、消息中间件、消息存储),数据此时保持一致。网络
预发送消息后,主动方业务没有收到返回消息存储结果,分为两种可能session
(1)消息未进存储,业务操做未执行 数据此时保持一致。
(2)消息已进存储(待确认),业务操做未执行 数据此时保持不一致。
复制代码
收到消息存储成功的返回结果,但未执行业务操做就失败并发
消息已进存储(待确认),业务操做未执行 数据此时保持不一致。
复制代码
消息中间件没有收到主动方应用的业务操做处理结果异步
(1)消息已进存储(待确认),业务操做未执行(或 业务操做出错回滚了) 数据此时保持不一致
(2)消息已进存储(待确认),业务操做成功 数据此时保持不一致
复制代码
消息中间件收到业务操做结果(成功/失败),但处理消息存储中的消息状态失败高并发
(1)消息已进存储(待确认),业务操做未执行(或业务操做出错回滚了) 数据此时保持不一致
(2)消息已进存储(待确认),业务操做成功 数据此时保持不一致
复制代码
消息中间件返回消息持久化结果(成功/失败),主动方应用根据返 回结果进行判断如何进行业务操做处理:大数据
a) 失败:放弃业务操做处理,结束(必要时向上层返回失败结果);
b) 成功:执行业务操做处理;
复制代码
消息中间件收到业务操做结果后,根据业务结果进行处理;优化
a) 失败:删除消息存储中的消息,结束;
b) 成功:更新消息存储中的消息状态为“待发送(可发送)”;
复制代码
消息服务子系统接口规范spa
public interface RpTransactionMessageService {
/**
* 预存储消息.
*/
public int saveMessageWaitingConfirm(RpTransactionMessage rpTransactionMessage) throws MessageBizException;
/**
* 确认并发送消息.
*/
public void confirmAndSendMessage(String messageId) throws MessageBizException;
/**
* 存储并发送消息.
*/
public int saveAndSendMessage(RpTransactionMessage rpTransactionMessage) throws MessageBizException;
/**
* 直接发送消息.
*/
public void directSendMessage(RpTransactionMessage rpTransactionMessage) throws MessageBizException;
/**
* 重发消息.
*/
public void reSendMessage(RpTransactionMessage rpTransactionMessage) throws MessageBizException;
/**
* 根据messageId重发某条消息.
*/
public void reSendMessageByMessageId(String messageId) throws MessageBizException;
/**
* 将消息标记为死亡消息.
*/
public void setMessageToAreadlyDead(String messageId) throws MessageBizException;
/**
* 根据消息ID获取消息
*/
public RpTransactionMessage getMessageByMessageId(String messageId) throws MessageBizException;
/**
* 根据消息ID删除消息
*/
public void deleteMessageByMessageId(String messageId) throws MessageBizException;
/**
* 重发某个消息队列中的所有已死亡的消息.
*/
public void reSendAllDeadMessageByQueueName(String queueName, int batchSize) throws MessageBizException;
复制代码
}code
消息类型及核心字段
public class RpTransactionMessage extends BaseEntity {
private static final long serialVersionUID = 1757377457814546156L;
private String messageId;
private String messageBody;
private String messageDataType;
private String consumerQueue;
private Integer messageSendTimes;
private String areadlyDead;
private String field1;
private String field2;
private String field3;
}
复制代码
消息服务子系统核心实现
预发送消息
public int saveMessageWaitingConfirm(RpTransactionMessage message) {
if (message == null) {
throw new MessageBizException(MessageBizException.SAVA_MESSAGE_IS_NULL, "保存的消息为空");
}
if (StringUtil.isEmpty(message.getConsumerQueue())) {
throw new MessageBizException(MessageBizException.MESSAGE_CONSUMER_QUEUE_IS_NULL, "消息的消费队列不能为空 ");
}
message.setEditTime(new Date());
message.setStatus(MessageStatusEnum.WAITING_CONFIRM.name());
message.setAreadlyDead(PublicEnum.NO.name());
message.setMessageSendTimes(0);
return rpTransactionMessageDao.insert(message);
}
消息确认并发送
public void confirmAndSendMessage(String messageId) {
final RpTransactionMessage message = getMessageByMessageId(messageId);
if (message == null) {
throw new MessageBizException(MessageBizException.SAVA_MESSAGE_IS_NULL, "根据消息id查找的消息为空");
}
message.setStatus(MessageStatusEnum.SENDING.name());
message.setEditTime(new Date());
rpTransactionMessageDao.update(message);
notifyJmsTemplate.setDefaultDestinationName(message.getConsumerQueue());
notifyJmsTemplate.send(new MessageCreator() {
public Message createMessage(Session session) throws JMSException {
return session.createTextMessage(message.getMessageBody());
}
});
}
保存并发送不进行预发送
public int saveAndSendMessage(final RpTransactionMessage message) {
if (message == null) {
throw new MessageBizException(MessageBizException.SAVA_MESSAGE_IS_NULL, "保存的消息为空");
}
if (StringUtil.isEmpty(message.getConsumerQueue())) {
throw new MessageBizException(MessageBizException.MESSAGE_CONSUMER_QUEUE_IS_NULL, "消息的消费队列不能为空 ");
}
message.setStatus(MessageStatusEnum.SENDING.name());
message.setAreadlyDead(PublicEnum.NO.name());
message.setMessageSendTimes(0);
message.setEditTime(new Date());
int result = rpTransactionMessageDao.insert(message);
notifyJmsTemplate.setDefaultDestinationName(message.getConsumerQueue());
notifyJmsTemplate.send(new MessageCreator() {
public Message createMessage(Session session) throws JMSException {
return session.createTextMessage(message.getMessageBody());
}
});
return result;
}
直接发送
public void directSendMessage(final RpTransactionMessage message) {
if (message == null) {
throw new MessageBizException(MessageBizException.SAVA_MESSAGE_IS_NULL, "保存的消息为空");
}
if (StringUtil.isEmpty(message.getConsumerQueue())) {
throw new MessageBizException(MessageBizException.MESSAGE_CONSUMER_QUEUE_IS_NULL, "消息的消费队列不能为空 ");
}
notifyJmsTemplate.setDefaultDestinationName(message.getConsumerQueue());
notifyJmsTemplate.send(new MessageCreator() {
public Message createMessage(Session session) throws JMSException {
return session.createTextMessage(message.getMessageBody());
}
});
}
重复发送
public void reSendMessage(final RpTransactionMessage message) {
if (message == null) {
throw new MessageBizException(MessageBizException.SAVA_MESSAGE_IS_NULL, "保存的消息为空");
}
if (StringUtil.isEmpty(message.getConsumerQueue())) {
throw new MessageBizException(MessageBizException.MESSAGE_CONSUMER_QUEUE_IS_NULL, "消息的消费队列不能为空 ");
}
message.addSendTimes();
message.setEditTime(new Date());
rpTransactionMessageDao.update(message);
notifyJmsTemplate.setDefaultDestinationName(message.getConsumerQueue());
notifyJmsTemplate.send(new MessageCreator() {
public Message createMessage(Session session) throws JMSException {
return session.createTextMessage(message.getMessageBody());
}
});
}
public void reSendMessageByMessageId(String messageId) {
final RpTransactionMessage message = getMessageByMessageId(messageId);
if (message == null) {
throw new MessageBizException(MessageBizException.SAVA_MESSAGE_IS_NULL, "根据消息id查找的消息为空");
}
int maxTimes = Integer.valueOf(PublicConfigUtil.readConfig("message.max.send.times"));
if (message.getMessageSendTimes() >= maxTimes) {
message.setAreadlyDead(PublicEnum.YES.name());
}
message.setEditTime(new Date());
message.setMessageSendTimes(message.getMessageSendTimes() + 1);
rpTransactionMessageDao.update(message);
notifyJmsTemplate.setDefaultDestinationName(message.getConsumerQueue());
notifyJmsTemplate.send(new MessageCreator() {
public Message createMessage(Session session) throws JMSException {
return session.createTextMessage(message.getMessageBody());
}
});
}
标记死亡
public void setMessageToAreadlyDead(String messageId) {
RpTransactionMessage message = getMessageByMessageId(messageId);
if (message == null) {
throw new MessageBizException(MessageBizException.SAVA_MESSAGE_IS_NULL, "根据消息id查找的消息为空");
}
message.setAreadlyDead(PublicEnum.YES.name());
message.setEditTime(new Date());
rpTransactionMessageDao.update(message);
}
复制代码
业务活动的主动方,在完成业务处理以后,向业务活动的被动方发送消息,容许消息丢失。
业务活动的被动方根据定时策略,向业务活动主动方查询,恢复丢失的业务消息。
适用范围
• 对业务最终一致性的时间敏感度低
• 跨企业的业务活动
复制代码
本文结合工业大数据高并发数据接入场景,经过弱化MQ的消息一致性,增强业务系统的一致性保证,实现了工业级大数据的数据仓库建设。
秦凯新 于深圳