公司运维同事针对ActiveMQ提出了两个问题,其中一个是“队列长时间无人监听时,自动删除该队列”。
调研提出了三种方案。这里是相关记录和说明。java
运维同事对生产环境使用的ActiveMQ作了相关监控。这个监控在某个队列出现消息积压时(实际规则更复杂一些,而且正在调整)发送短信报警。运维接到短信后会通知开发负责人。开发负责人再检查系统是否在正常监听相关队列。
可是,从过往经验来看,只有一次消息积压是业务系统故障致使的;其它状况(没有统计到具体数据,大约五六次)都是业务系统已经再也不监听该队列致使的。这使得咱们的运维、开发同事半夜三更火急火燎检查问题,结果发现只须要删除那个队列就能够了。
尤为惹发起床气的是,因为线上ActiveMQ配置了消息持久化,这种消息积压其实并不会对ActiveMQ产生多大的影响,彻底能够在次日上班后再处理。
考虑到你们的睡眠质量和夫妻感情,在JIRA中,咱们调研、讨论了三个方案。 spring
在ActiveMQ官方提供的功能列表中,有这样一项功能:Delete Inactive Destination。它能够删除“没有未处理消息、而且没有消费者的Destination”。 apache
这个配置比较简单,在ActiveMQ的配置文件activemq.xml中,作以下改动便可。这里示例的是对queue的配置;topic配置是相似的。 运维
<!-- 在这里加上schedulePeriodForDestinationPurge属性。 --> <broker xmlns="http://activemq.apache.org/schema/core" schedulePeriodForDestinationPurge="10000" <destinationPolicy> <policyMap> <policyEntries> <!-- 在这里加上gcInactiveDestinations和inactiveTimoutBeforeGC两个属性 --> <policyEntry queue=">" gcInactiveDestinations="true" inactiveTimoutBeforeGC="30000"/> </policyEntries> </policyMap> </destinationPolicy> </broker>
上述示例配置的含义是:这个Broker会每隔10000ms(由schedulePeriodForDestinationPurge配置指定)扫描一次标记有“gcInactiveDestinations="true"”的Queue(因为这里配置的是queue=">",于是实际是扫描全部Queue),将其中“没有未处理消息、而且没有消费者、而且此状态已超过30000ms(由inactiveTimoutBeforeGC配置指定)”的队列删除掉。有点晕。各配置项的具体说明以下。ide
如下三个配置项中,schedulePeriodForDestinationPurge和gcInactiveDestinations是必填配置;inactiveTimoutBeforeGC是选填配置。spring-boot
这是针对Broker的配置,用于声明“扫描闲置队列的周期”,单位为毫秒。默认值为0,意为“不扫描”。
须要说明的是,这里只能配置扫描任务的启动周期、不能配置启动延迟。也就是说,配置好了以后,ActiveMQ服务启动时会当即扫描一次;而后再按照指定时间周期性扫描。this
这是针对Destination的配置,用于声明当Broker扫描闲置队列时,是否扫描这个Destination(由queue="xxxx"来指定)。默认值是false。 spa
这也是针对Destination的配置,用于声明这个Destination闲置多长时间后能够被删除。单位毫秒,默认时间60s。
这个配置必须在gcInactiveDestinations被设置为true的状况下才会生效。插件
虽然上面介绍了这么多,但实际上,从第一句话中就能够看出这个方案没法解决咱们的问题。由于咱们的问题是要处理“有消息积压、但没有消费者的Destination”,而这个方案只能删除“没有未处理消息、而且没有消费者的Destination”。
除此以外,这应该算是最简单可靠的一种方案了。实际上,对大多数原生Queue来讲,业务系统会同时下线其生产者与消费者。这个方案能够很好的应对这种状况。线程
ActiveMQ插件(plugin),也有文档中称为拦截器(Interceptor)。两者实际上是相辅相成的:配置时,咱们须要一个插件;执行时,咱们须要一个拦截器。
ActiveMQ官方提供了几个插件(日志、统计、时间戳等),能够参见官方说明和开发文档。咱们能够参考官方示例来自定义一个插件。
ActiveMQ经过解析activemq.xml中的配置,来加载一个插件的。所以咱们从配置入手,逐步搞清楚插件和拦截器是如何工做的。
activemq.xml中的配置其实很简单,以下所示:
<broker xmlns="http://activemq.apache.org/schema/core" brokerName="localhost" dataDirectory="${activemq.data}" advisorySupport="false"> <!-- 用plugins标签声明这是一个插件 --> <plugins> <!-- bea的语法来自spring,xml name space的声明已经说明了这一点 --> <bean xmlns="http://www.springframework.org/schema/beans" id="linjunPlugin" class="net.loyintean.blog.jms.manage.PlugIn"/> </plugins> <!-- 其它配置 --> </broker>
上述配置声明了一个插件,插件类名是net.loyintean.blog.jms.manage.PlugIn,id是linjunPlugin。
这个类必须包含在ActiveMQ的classpath路径下。咱们能够本身打一个jar包,并把jar包放到ActiveMQ的lib路径下;也能够修改相关类路径。总之要保证ActiveMQ可以加载到这个类(及其依赖类)。
其实按照上面的配置,并不须要为插件配置一个id。不过,插件声明还有其它方式,有些是须要使用id的。这里很少说,能够参考开发文档。
如配置中的注释所说,声明插件所使用的<bean />标签及语法来自spring。也就是说,spring中的<property />等其它标签,这里也是支持的。不过目前尚未找到对@Autowired等注解的支持方式。
因为我使用的是spring boot,只须要加上一个spring-boot-starter-activemq就能够引入所需依赖jar包了。不使用spring boot的话,须要引入activemq-broker-x.y.z.jar。
根据ActiveMQ规范,插件必须实现BrokerPlugin接口。这个接口只有一个方法:Broker installPlugin(Broker broker) throws Exception,用于在服务启动、加载插件时,获取当前启动的borker实例,并返回一个Broker实例。
例如,上文中声明的linjunPlugin代码以下:
public class PlugIn implements BrokerPlugin { /** * @author linjun * @since 2017年10月30日 * @param b * @return * @see org.apache.activemq.broker.BrokerPlugin#installPlugin(org.apache.activemq.broker.Broker) */ @Override public Broker installPlugin(Broker b) { return new RemoveDestination(b); } }
彷佛有些莫名其妙,但从“装饰者”的角度来理解就轻松愉快了:入参broker是原生实例(固然也多是其它插件“装饰”过的);出参则是被咱们本身的插件“装饰”过的、加强版的实例。
通常来讲,启动过程不会作太多处理;处理逻辑在咱们的“装饰者”中——如上面代码里的RemoveDestination。
如上文所说,咱们须要提供的是一个“装饰”过的Broker。可是Broker是一个接口,其中有超过50个方法,用于处理Broker在服务期间的各类事件(如服务启动、建立连接、消息收发、事务提交与回滚等等)。直接实现接口未免太丑陋了。ActiveMQ也考虑到了这一点,所以给咱们提供了一个适配器(其实同时也是一个装饰者):BrokerFilter。它的代码以下:
public class BrokerFilter implements Broker { // 被“装饰”的原生实例 protected final Broker next; public BrokerFilter(Broker next) { this.next = next; } // 省略其它接口方法,所有都直接委托给next处理。 }
借助这个适配器,咱们能够专一的处理咱们关注的事件。如咱们的RemoveDestination,它只须要在服务启动时注册一个定时器,按需删除无人监听的队列便可。代码以下:
public class RemoveDestination extends BrokerFilter { private Timer timer; /** * @param next */ public RemoveDestination(Broker next) { super(next); // 声明为守护线程,避免它阻塞关闭activeMQ的进程 this.timer = new Timer(true); } @Override public void start() throws Exception { super.start(); // DONE linjun 2017-11-01 改成定时调度 this.timer.schedule(new TimerTask() { @Override public void run() { RemoveDestination.this.remove(); } }, 3000, 3000); } private void remove() { Map<ActiveMQDestination, Destination> destinationMap = this .getDestinationMap(); ConnectionContext context = BrokerSupport.getConnectionContext(this); destinationMap.entrySet().forEach(entry -> { Destination destination = entry.getValue(); // 无人监听了 // DONE linjun 2017-11-01 只处理queue,不处理topic if (destination.getDestinationStatistics().getConsumers() .getCount() == 0) { ActiveMQDestination activeMQDestination = entry.getKey(); if (activeMQDestination.isQueue()) { try { this.removeDestination(context, activeMQDestination, 1); } catch (Exception e) { // 示例代码,不要喷我直接打印堆栈 e.printStackTrace(); } } } }); } }
除了BrokerFilter这个针对Broker事件作拦截、装饰的类以外,也有针对Destination的DestinationFilter,不赘述。
不管是BrokerFilter仍是DestinationFilter,在重写父类的某个方法时,要注意调用super中的对应方法。如RemoveDestination类在覆盖start()方法时,调用了super.start()方法。
这两个类中的每个方法,都对应Broker或Destination的一个事件的“处理栈”。若是不调用父类方法,极可能会致使一些基础的、或关键的代码没有执行到,进而出现异常。所以,若是不是很是肯定“执行到这里时必须中断当前事件”,不然必定要调用super相应方法。
上面的代码是示例用,还能够进一步完善。可是这个方案是能够知足需求的。
不过,这个方案存在一项风险:当咱们删除一个Destination时,其中全部未消费的消息也会随之被删除,即便这些消息已经作了持久化。若是有某个业务系统长时间出现故障、没法连上ActiveMQ,而ActiveMQ在此期间删除了它监听的Destination及其中消息……这个风险几率虽然小,可是影响太大。慎重起见,放弃方案二。
方案三属于运维的范畴。如JIRA中所讨论的,这个问题真正的“痛点”,并非废弃队列,而是非紧急状况却在半夜报警。所以,由运维同事修改一下脚本,调整“没有消费者”这种问题的监控报警时间就能够了。
最后选定的是方案三。方案一不能知足需求;方案二的风险较大。方案三直击痛点,干脆利落。这件事也启示咱们:作事情以前先想清楚目标,谋定然后动。