metaq生产者发送消息找不到指定partition调查

前记java

metaq,(metamorphosis),一款设计起源于kafka的,高可用、高性能、可扩展、分布式的消息中间件,MetaQ具备消息存储顺序写、吞吐量大和支持本地和XA事务等特性,适合大规模分布式系统应用的特色深受一些互联网企业的喜好。spring

      而最近,项目中用到metaq做为消息队列框架来处理消息,可是,最近出现消息没有被消费的问题,查了消费端服务器日志,没有查到异常、又查了服务端发送消息的日志,也没有发现异常,还觉得是消息消费延迟,由于最近活动比较多,消息也多。后来等了大半天,仍是么有结果。因而,我又从新检查了一遍日志,最终仍是发现了问题的蛛丝马迹:
 - 2017-01-10 12:59:40,204 [// -  - ] INFO  com.api.pub.mq.MetaQUtils - topic info :api-quancart-log has send {"xxx":"x","xxx":xx,"xx":xx,"xx":xx} to metaq null

怀疑是否是异常被吞没了,查找源码,发现此方法的catch里有可能被吃掉异常,因而调试,重写MetaqTemplate类,加日志以下:api

    /**
     * Send message built by message builder.Returns the sent result.
     * 
     * @param builder
     * @return
     * @throws InterruptedException
     * @since 1.4.5
     */
    private static final Logger logger = LoggerFactory.getLogger(MetaqTemplate.class);

    public SendResult send(MessageBuilder builder) throws InterruptedException {
        Message msg = builder.build(this.messageBodyConverter);
        final String topic = msg.getTopic();
        MessageProducer producer = this.getOrCreateProducer(topic);
        try {
            return producer.sendMessage(msg);
        } catch (MetaClientException e) {
            logger.error(e.getMessage(), e); return new SendResult(false, null, -1, ExceptionUtils.getFullStackTrace(e));
        }
    }

因而,就有了一下的异常栈:服务器

 - ................. 
[// - - ] ERROR com.taobao.metamorphosis.client.extension.spring.MetaqTemplate - Send message timeout in 3000 mills com.taobao.metamorphosis.exception.MetaOpeartionTimeoutException: Send message timeout in 3000 mills at com.taobao.metamorphosis.client.producer.SimpleMessageProducer.send0(SimpleMessageProducer.java:456) at com.taobao.metamorphosis.client.producer.SimpleMessageProducer.sendMessageToServer(SimpleMessageProducer.java:194) at com.taobao.metamorphosis.client.producer.SimpleMessageProducer.sendMessage(SimpleMessageProducer.java:171) at com.taobao.metamorphosis.client.producer.SimpleMessageProducer.sendMessage(SimpleMessageProducer.java:585) at com.taobao.metamorphosis.client.extension.spring.MetaqTemplate.send(MetaqTemplate.java:220) ..........

看来,果真是异常被吃掉了,SimpleMessageProducer 的456行以下图:网络

缘由算是找到了,怎么解决呢?session

初步分析结果:网络和超时时间这是超时时间致使,由于这段时间活动多网站流量高,进而影响了mq消息服务器的网络,解决办法,改善网络情况,这一点有点可望而不可即,虽然说这是最根本的方法,可是鉴于公司的情况如今这上边改善可能会花点钱,或者至少折腾下运维的兄弟们;再者,就是调大一点超时时间。框架

带着这个疑问,看了下配置文件:运维

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
     xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:context="http://www.springframework.org/schema/context"
     xsi:schemaLocation="
            http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
            http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context.xsd">

     <bean id="sessionFactory"
          class="com.taobao.metamorphosis.client.extension.spring.MetaqMessageSessionFactoryBean">
          <property name="zkConnect" value="${pom.metaq.zkConnect}" />
          <property name="zkSessionTimeoutMs" value="15000" />
          <property name="zkConnectionTimeoutMs" value="15000" />
          <property name="zkSyncTimeMs" value="15000" />
     </bean>

     <bean id="messageBodyConverter"
          class="com.taobao.metamorphosis.client.extension.spring.JavaSerializationMessageBodyConverter" />

     <bean id="metaqTemplate"
          class="com.taobao.metamorphosis.client.extension.spring.MetaqTemplate">
          <property name="messageSessionFactory" ref="sessionFactory" />
          <property name="messageBodyConverter" ref="messageBodyConverter" />
          <!--以共享一个MessageProducer来发送多个topic的消息 -->
          <property name="shareProducer" value="true" />
     </bean>

     <bean class="com.api.pub.mq.MetaQInitQBean" init-method="init">
          <property name="metaqTemplate" ref="metaqTemplate" />
     </bean>

</beans>

自定义的工具类没有定义超时时间,用的默认的超时时间:分布式

最后,解决办法,在工具类com.api.pub.mq.MetaQInitQBean中增长变量,配置文件注入自定义超时时间,工具

private int customTimeout = 30;

    public int getCustomTimeout() {
        return customTimeout;
    }

    public void setCustomTimeout(int customTimeout) {
        this.customTimeout= customTimeout;
    }

改用MetaqTemplate中带超时时间的方法

    /**
     * Send message built by message builder.Returns the sent result.
     * 
     * @param builder
     * @return
     * @throws InterruptedException
     * @since 1.4.5
     */
    public SendResult send(MessageBuilder builder, long timeout, TimeUnit unit) throws InterruptedException {
        Message msg = builder.build(this.messageBodyConverter);
        final String topic = msg.getTopic();
        MessageProducer producer = this.getOrCreateProducer(topic);
        try {
            return producer.sendMessage(msg, timeout, unit);
        } catch (MetaClientException e) {
            return new SendResult(false, null, -1, ExceptionUtils.getFullStackTrace(e));
        }
    }

问题解决!

相关文章
相关标签/搜索