建议简单看看上一篇文章再往下阅读java
咱们的项目就基于这个模型:git
接下来就到了咱们的实战时刻~github
项目基于spring cloud编写,没有spring cloud基础看起来可能有一点点费力。web
package com.anur.messageapi.api; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RequestMethod; import org.springframework.web.bind.annotation.RequestParam; import java.util.Map; /** * Created by Anur IjuoKaruKas on 2018/5/8 */ public interface TransactionMsgApi { /** * 预发送消息,先将消息保存到消息中心 */ @RequestMapping(value = "prepare", method = RequestMethod.GET) int prepareMsg( @RequestParam("id") String id, @RequestParam("msg") String msg, @RequestParam("routingKey") String routingKey, @RequestParam("exchange") String exchange, @RequestParam("paramMap") String paramMap, @RequestParam("artist") String artist); /** * 生产者确认消息可投递 */ @RequestMapping(value = "confirm", method = RequestMethod.GET) int confirmMsgToSend(@RequestParam("id") String id, @RequestParam("caller") String caller); /** * 向队列投递消息 */ @RequestMapping(value = "send", method = RequestMethod.GET) void sendMsg(@RequestParam("id") String id); /** * 消费者确认消费成功 */ @RequestMapping(value = "ack", method = RequestMethod.GET) int acknowledgement(@RequestParam("id") String id, @RequestParam("artist") String artist); }
咱们先忽略后面的两个接口,先看第一个,一共有六个参数spring
咱们首先生成一条消息,咱们往paramMap中指定了,咱们这个订单的订单id是orderId,消息内容我瞎写的,这条消息要保存到数据库(它的做用是保证消息必定被可靠消息接收并持久化)数据库
String routingKey = "test.key.testing"; Map<String, String> map = new HashMap<>(); String orderId = UUID.randomUUID().toString() + System.currentTimeMillis(); map.put("id", orderId); String mapStr = JSON.toJSONString(map); TestMsg testMsg = new TestMsg(); testMsg.setContent("这是一条测试消息"); String testMsgStr = JSON.toJSONString(testMsg); // =============================== // 要保存到数据库(它的做用是保证消息必定被可靠消息接收并持久化) PrepareMsg prepareMsg = prepareMsgService.genMsg(orderId, testMsgStr, routingKey, Constant.TEST_EXCHANGE, mapStr);
异步发送了一条**【预发送】**消息给消息可靠消息服务api
Future<Integer> future = prepareMsgService.prepareMsg(prepareMsg); // 下面是prepareMsg的实现 @Async @Override public Future<Integer> prepareMsg(PrepareMsg prepareMsg) { // 调用咱们刚才在【准备阶段】定义的接口 int result = transactionMsgService.prepareMsg(prepareMsg.getId(), prepareMsg.getMsg(), prepareMsg.getRoutingKey(), prepareMsg.getExchange(), prepareMsg.getParamMap(), artistConfiguration.getArtist()); // 若是调用成功,删除刚才本地保存的数据库 if (result == 1) { prepareMsgMapper.deleteByPrimaryKey(prepareMsg.getId()); } return new AsyncResult<>(result); }
你能够把下面那些想象成处理订单状态,上面的这个步骤是有事务的,也就是说:app
///////////// 事务 ProviderOrder providerOrder = new ProviderOrder(); providerOrder.setId(orderId); providerOrderService.save(providerOrder); ///////////// 事务
// 确认消息能够被发送 if (future.get() == 1) { prepareMsgService.confirmMsgToSend(orderId, this.getClass().getSimpleName()); }
######一、执行成功,但没有及时向可靠消息服务发送通知。dom
这时候咱们的artist和paramMap就发挥做用了,咱们的可靠消息服务,能够拿着这两个东西,定时向生产者查询那些没有被标记为【待发送】的消息。好比说这样:异步
// 这里是可靠消息服务 String url = String.format("http://%s/check?", transactionMsg.getCreater()); Map<String, String> paramMap = JSON.parseObject(transactionMsg.getParamMap(), new TypeReference<HashMap<String, String>>() { }); StringBuilder sb = new StringBuilder(); for (Map.Entry<String, String> stringStringEntry : paramMap.entrySet()) { sb.append(stringStringEntry.getKey()).append("=").append(stringStringEntry.getValue()).append("&"); } sb.deleteCharAt(sb.length() - 1); // 结果为true,表明这条消息的业务执行成功了,可自助将消息状态标记为【待发送】 // 反之执行失败 resultBoolean = restTemplate.getForObject(url + sb, boolean.class);
######二、执行失败,也没有及时向可靠消息服务发送通知。
这个状况并不影响,由于可靠消息服务会回查,发现消息没有执行成功,不会将消息投递出去。
这里要注意,每条消息最好设置一个查询次数的限制
######三、预发送失败,业务执行成功
这时候咱们在第一步事先存储的消息就发挥做用了,这里只要写一个定时任务,向可靠消息服务定时投递便可。这里要注意可靠消息服务的幂等性。
因为消息id是由生产者指定,因此即便可靠消息服务收到了重复的建立【预发送】的消息,插入数据库也是会失败的。
@Scheduled(cron = "*/1 * * * * *") public void checkPrepareMsg() { List<PrepareMsg> prepareMsgList = prepareMsgService.getUnConfirmList(); if (prepareMsgList.size() > 0) { System.out.println("消息重发中"); } for (PrepareMsg prepareMsg : prepareMsgList) { prepareMsgService.prepareMsg(prepareMsg); } }
Github -- > 可靠消息服务 example