阿里云 消息队列mq

使用阿里云消息队列服务器

控制台地址:http://ons.console.aliyun.com/#/home/topicapp

Demo:

支付消息 MQ 工厂类:

/**
 * Builds, subscribes and starts the Aliyun ONS consumer used for payment messages.
 *
 * <p>Topic, tags and credentials are supplied through the setters. {@link #initConsumer()}
 * then creates the consumer, subscribes the injected listener to the configured topic/tag,
 * starts the consumer on a background thread after a fixed delay, and registers a JVM
 * shutdown hook that calls {@link #shutDown()}.
 */
public class DfacePayConsumerFactory {

    /** Prefix prepended to the topic when building the consumer id. */
    public static String CID = "CID-";

    /**
     * Delay, in milliseconds, between subscribing and starting the consumer.
     * NOTE(review): presumably gives the application time to finish starting - confirm.
     */
    private static final long START_DELAY_MILLIS = 90000L;

    /** Listener instance that handles the received messages. */
    @Autowired
    private DfacePayConsumerListener dfacePayConsumerListener;

    private String topic;
    private String pTag;
    private String accessKey;
    private String secretKey;
    private String tag;

    private Consumer consumer;

    /**
     * @return the topic
     */
    public String getTopic() {
        return topic;
    }

    /**
     * @param topic the topic to set
     */
    public void setTopic(String topic) {
        this.topic = topic;
    }

    /**
     * @return the tag
     */
    public String getTag() {
        return tag;
    }

    /**
     * @param tag the tag to set
     */
    public void setTag(String tag) {
        this.tag = tag;
    }

    /**
     * @return the pTag
     */
    public String getpTag() {
        return pTag;
    }

    /**
     * @param pTag the pTag to set
     */
    public void setpTag(String pTag) {
        this.pTag = pTag;
    }

    /**
     * @return the accessKey
     */
    public String getAccessKey() {
        return accessKey;
    }

    /**
     * @param accessKey the accessKey to set
     */
    public void setAccessKey(String accessKey) {
        this.accessKey = accessKey;
    }

    /**
     * @return the secretKey
     */
    public String getSecretKey() {
        return secretKey;
    }

    /**
     * @param secretKey the secretKey to set
     */
    public void setSecretKey(String secretKey) {
        this.secretKey = secretKey;
    }

    /**
     * Creates the consumer, subscribes the listener and schedules the delayed start.
     *
     * <p>The consumer id is {@code CID + topic}, followed by {@code "-" + pTag} when
     * {@code pTag} has text. The consumer is started on a separate thread after
     * {@link #START_DELAY_MILLIS}; if that thread is interrupted while waiting, the
     * consumer is not started. A shutdown hook is registered on every call.
     */
    public void initConsumer() {
        Properties properties = new Properties();
        String consumerId = CID + this.topic
                + (StringUtils.hasText(this.pTag) ? "-" + this.pTag : "");
        properties.put(PropertyKeyConst.ConsumerId, consumerId);
        properties.put(PropertyKeyConst.AccessKey, this.accessKey);
        properties.put(PropertyKeyConst.SecretKey, this.secretKey);

        /*
         * Other properties that can be set (listed for reference, not applied here):
         *   PropertyKeyConst.ConsumerId        - the consumer id
         *   PropertyKeyConst.AccessKey         - Aliyun credential, created in the Aliyun
         *                                        management console
         *   PropertyKeyConst.SecretKey         - Aliyun credential, created in the Aliyun
         *                                        management console
         *   PropertyKeyConst.SuspendTimeMillis - how long to wait before a message whose
         *                                        processing failed is sent again
         *   PropertyKeyConst.MaxReconsumeTimes - number of redelivery attempts
         *   PropertyKeyConst.ConsumeThreadNums - number of consumer threads, e.g. "1"
         *   PropertyKeyConst.ONSAddr           - access address used by the consumer
         */

        this.consumer = ONSFactory.createConsumer(properties);
        this.consumer.subscribe(this.topic, this.tag, this.dfacePayConsumerListener);

        new Thread(new Runnable() {
            @Override
            public void run() {
                try {
                    Thread.sleep(START_DELAY_MILLIS);
                } catch (InterruptedException e) {
                    // An interrupt is a cancellation request: keep the flag visible to the
                    // rest of the thread and do not start the consumer ahead of the delay.
                    Thread.currentThread().interrupt();
                    System.err.println(
                            "Interrupted while waiting to start the MQ consumer; consumer not started");
                    return;
                }
                consumer.start();
            }
        }).start();

        Runtime.getRuntime().addShutdownHook(new Thread() {
            @Override
            public void run() {
                shutDown();
            }
        });
    }

    /**
     * Stops listening.
     *
     * @return {@code true} if a consumer had been created and its shutdown was requested,
     *         {@code false} if no consumer exists
     */
    public boolean shutDown() {
        if (null != this.consumer) {
            this.consumer.shutdown();
            return true;
        }
        return false;
    }
}

//dPay支付监听  执行mq consume消息接收(经过topic订阅)@Component("dfacePayConsumerListener")public class DfacePayConsumerListener implements MessageListener {    private static Logger logger = LoggerFactory.getLogger(DfacePayConsumerListener.class);    @Autowired    private ...;    @Override    public Action consume(Message message, ConsumeContext context) {        String msg = new String(message.getBody());        String tag = message.getTag();        logger.info(LogUtils.builder().append("mq", "接受mq").append("mqTag", tag)                .append("mqMsg", msg).toString());        return tagHandle(tag, msg, message);    }    public Action tagHandle(String tag, String msg, Message message) {        if (MqTagEnum.PAY.name().equals(tag)) {            try {                PayBackBo payBackBo = JSON.parse(msg, PayBackBo.class);                //检查订单号                if (!payBackBo.getOrderNo().startsWith(ApplicationConstant.APP_NO)) {                    logger.info(LogUtils.format("订单支付失败: orderNo 前缀 ", payBackBo.getOrderNo()));                    return Action.CommitMessage;                }                return giftOrderHandle(payBackBo);            } catch (Exception e) {                e.printStackTrace();                logger.info(LogUtils.format("paid_error", e.getMessage()));                return Action.ReconsumeLater;            }        } else {            logger.info(LogUtils.builder().append("tag error", tag).append("msg", msg)                    .append("message", message).toString());        }        return Action.CommitMessage;    }    /**     * 支付订单处理     *     * @param payBackBo     * @return     */    private Action giftOrderHandle(PayBackBo payBackBo) {        //处理支付业务逻辑    }}
相关文章
相关标签/搜索