前面有一篇文章中,博主为你们介绍了**xxchat系统线程监控方案。今天博主为你们分享的是**微xxxx系统三大队列任务执行时所用到的线程池的不停服务器监控、暂停、启用、更改参数等等操做。java
固然,在大多数状况下,小伙伴们可能都遇到的是容许线上停机改配置重启的操做。可是,若是大家的业务有不少人在用,老板要求不宕机的状况下对队列线程池进行动态的修改、重启、监控,那么此时咱们就须要使用一些技术(Jmx、http等)来远程操控。博主今天主要分享的使用jmx来进行动态操做线程池的方案,在分享以前咱们首先得了解jmx这项技术是什么?有什么用途?在何时用?下面博主就开始带你们一块儿去领略jmx高级监控技术的世界。spring
Jmx(Java Managerment Extensions,即"Java管理扩展"),是一个为应用程序、设备、系统等植入管理功能的框架。Jmx能够跨越一系列异构操做系统平台、系统体系结构和网络传输协议,灵活的开发无缝隙集成的系统、网络和服务管理应用。 Jmx在Java编程语言中定义了应用程序以及网络管理和监控的体系结构、设计模式、应用程序接口以及服务。一般使用Jmx来监控系统的运行状态或管理系统的某些方面,好比清空缓存、从新加载配置文件等。apache
此处,**系统主要使用Jmx来注册自定义线程池组件,线程池组件上暴露了一系列的方法给Jmx;它们包括:线程池的中止,修改线程池的核心线程数、最大线程数、超时时间、队列大小等等参数,重启线程池等等方法。而后,客户可使用Jmx管理工具(如:jconsole/ jvisualvm/程序Agent等等)链接上组件暴露的Jmx端口,而后调用其暴露的方法以实现具体的业务功能。编程
如下是**项目的自定义线程池组件代码:windows
1.Jmx组件接口CommonTaskManagerMBean设计模式
package com.xxxxxx.job.queue; /** * * 类CommonTaskManagerMBean.java的实现描述:jmx组件接口 * @author arron 2018年x月2x日 下午x:x2:x9 */ public interface CommonTaskManagerMBean { int getPageSize(); void setPageSize(int pageSize); int getCorePoolSize(); int getQueueSize(); void setQueueSize(int queueSize); void setCorePoolSize(int corePoolSize); int getMaximumPoolSize() ; void setMaximumPoolSize(int maximumPoolSize); boolean isKill() ; public boolean isStop(); /** * 马上停掉 * @return */ String stopTaskNow(); /** * 优雅停掉 * @return */ String stopTaskDely(); /** * 开启任务线程池 * @return */ String startTask(); }
2.Jmx组件接口CommonTaskManagerMBean的具体实现类,也是线程池组件实现类; CommonTaskManager缓存
package com.xxxxxx.job.queue; import java.lang.management.ManagementFactory; import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.Callable; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Future; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; import javax.management.MBeanServer; import javax.management.ObjectName; import org.apache.commons.lang3.StringUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.InitializingBean; /** * * 类CommonTaskManager.java的实现描述:通用线程池管理组件 * * @author arron 2018年x月2x日 下午x:x2:x9 */ public class CommonTaskManager implements InitializingBean,CommonTaskManagerMBean { private static final Logger logger = LoggerFactory.getLogger(CommonTaskManager.class); private int corePoolSize=10; private int maximumPoolSize=20; private int pageSize=1000; private int queueSize=1000; //queue size 是否生效 private boolean iseffect=true; //第几回设置queuesize private int opcount=0; //以前生效的queuesize大小 private int oldQueueSize=0; //任务管理器名称 private String taskManagerName="未命名"; private ThreadPoolExecutor executor=null; private volatile boolean kill=false; private volatile boolean stop=false; public boolean isKill() { return kill; } public void setKill(boolean kill) { this.kill = kill; } public boolean isStop() { return stop; } public String getTaskManagerName() { return taskManagerName; } public void setTaskManagerName(String taskManagerName) { if(StringUtils.isNotBlank(taskManagerName)){ this.taskManagerName = taskManagerName; }else{ logger.error("任务线程池管理器[{"+taskManagerName+"}]--setTaskManagerName()->设置线程池设置任务管理器名称执行失败,任务管理器名称不能为空"); } } public int getPageSize() { return pageSize; } public void setPageSize(int pageSize) { if(pageSize>0){ this.pageSize = pageSize; }else{ logger.error("任务线程池管理器[{"+taskManagerName+"}]--setPageSize()->设置线程池分页处理size数执行失败,分页处理size数必须大于0"); } } public int getCorePoolSize() { return corePoolSize; } public int getQueueSize() { return queueSize; } public void setQueueSize(int queueSize) { if(queueSize>0){ this.queueSize = queueSize; }else{ logger.error("任务线程池管理器[{"+taskManagerName+"}]--setQueueSize()->设置线程池队列size数执行失败,队列size数必须大于0"); } opcount=opcount+1; if(opcount>1){ iseffect=false; } } public void setCorePoolSize(int corePoolSize) { if(corePoolSize>=0){ this.corePoolSize = corePoolSize; if(executor!=null&&!executor.isShutdown()){ try{ executor.setCorePoolSize(corePoolSize); }catch(Exception e){ logger.error("任务线程池管理器[{"+taskManagerName+"}]--setCorePoolSize()->马上设置线程池核心线程数执行失败",e); } } }else{ logger.error("任务线程池管理器[{"+taskManagerName+"}]--setCorePoolSize()->马上设置线程池核心线程数执行失败,最大线程数必须大于或者等于0"); } } public int getMaximumPoolSize() { return maximumPoolSize; } public void setMaximumPoolSize(int maximumPoolSize) { if(maximumPoolSize>0){ this.maximumPoolSize = maximumPoolSize; if(executor!=null&&!executor.isShutdown()){ if(maximumPoolSize>0){ try{ executor.setMaximumPoolSize(maximumPoolSize); }catch(Exception e){ logger.error("任务线程池管理器[{"+taskManagerName+"}]--setMaximumPoolSize()->马上设置线程池最大线程数执行失败",e); } } } }else{ logger.error("任务线程池管理器[{"+taskManagerName+"}]--setMaximumPoolSize()->马上设置线程池最大线程数执行失败,最大线程数必须大于0"); } } protected ThreadPoolExecutor getExecutor() { return executor; } /** * 当spring xml中定义的属性初始化完成后,执行该方法 */ @Override public void afterPropertiesSet() throws Exception { logger.warn("任务线程池管理器[{"+taskManagerName+"}]--执行初始化消息管理组件CommonTaskManager: init-afterPropertiesSet"); //初始化线程池 executor = new ThreadPoolExecutor(corePoolSize, maximumPoolSize, 0L, TimeUnit.MILLISECONDS, new ArrayBlockingQueue<Runnable>(queueSize)); oldQueueSize=queueSize; } private ExecutorService pool; /** * 初始化加载上次未完成发送的消息继续发送; */ public void initMethod() { logger.warn("任务线程池管理器[{"+taskManagerName+"}]--执行初始化消息管理组件CommonTaskManager[{}]: init-method",taskManagerName); //注册到jmx管理 MBeanServer beanServer = ManagementFactory.getPlatformMBeanServer(); try { ObjectName objectName = new ObjectName("CommonTaskManager:type=CommonTaskManager-"+taskManagerName); beanServer.registerMBean(this, objectName); pool = Executors.newSingleThreadExecutor(); //建立实现了Runnable接口对象,Thread对象固然也实现了Runnable接口 pool.submit(new Runnable() { @Override public void run() { while (true) { try { logger.warn("任务线程池管理器[{"+taskManagerName+"}]--任务管理器中参数详情:corePoolSize:{},maximumPoolSize:{},pageSize:{},queueSize:{}",corePoolSize ,maximumPoolSize,pageSize,queueSize); if(executor!=null&&!executor.isShutdown()){ if(!iseffect){ logger.warn("任务线程池管理器[{"+taskManagerName+"}]--任务管理器中线程池的queueSize设置后还未重启,当前处于未生效状态,"); } logger.warn("任务线程池管理器[{"+taskManagerName+"}]--任务管理器中线程池中参数详情:corePoolSize:{},maximumPoolSize:{},pageSize:{},queueSize:{},queue队列中任务数:{}",executor.getCorePoolSize() ,executor.getMaximumPoolSize(),pageSize,oldQueueSize,executor.getQueue().size()); }else{ logger.error("任务线程池管理器[{"+taskManagerName+"}]--任务管理器中线程池处于非工做状态!"); } //休眠5秒钟 Thread.sleep(30000l); } catch (InterruptedException e) { e.printStackTrace(); } } } }); } catch (Exception e) { logger.error("任务线程池管理器[{"+taskManagerName+"}]--initMethod()->马上中止线程池执行失败",e); } } /** * 执行销毁线程池管理组件代码; */ public void destroyMethod() { logger.warn("任务线程池管理器[{"+taskManagerName+"}]--执行销毁消息管理组件destroy-method"); shutdownAndAwaitTermination(executor); pool.shutdownNow(); logger.warn("任务线程池管理器[{"+taskManagerName+"}]--执行销毁消息管理组件destroy-method...任务管理器完成销毁动做..."); } /** * 线程池暴力销毁 * @param pool */ private void shutdownNow() { try{ executor.shutdownNow(); logger.warn("任务线程池管理器[{"+taskManagerName+"}]--shutdownNow()->马上中止线程池执行成功"); }catch(Exception e){ logger.error("任务线程池管理器[{"+taskManagerName+"}]--shutdownNow()->马上中止线程池执行失败",e); } executor=null; } /** * 执行销毁线程池管理组件代码; */ private void shuntdownDely() { logger.warn("任务线程池管理器[{"+taskManagerName+"}]--执行销毁消息管理组件shuntdownDely-method"); shutdownAndAwaitTermination(executor); logger.warn("任务线程池管理器[{"+taskManagerName+"}]--执行销毁消息管理组件shuntdownDely-method...任务管理器完成销毁动做..."); } /** * 线程池优雅销毁 * * @param pool */ private void shutdownAndAwaitTermination(ExecutorService pool) { try { if(pool.isShutdown()){ logger.warn("任务线程池管理器[{"+taskManagerName+"}]--pool has been shutdown"); }else{ //让线程池没法提交新的任务 pool.shutdown(); // Disable new tasks from being submitted // Wait a while for existing tasks to terminate //等待以前的存在的任务执行60s,而后中止 if (!pool.awaitTermination(60, TimeUnit.SECONDS)) { //当即中止正在执行的任务 pool.shutdownNow(); // Cancel currently executing tasks // Wait a while for tasks to respond to being cancelled if (!pool.awaitTermination(60, TimeUnit.SECONDS)) { logger.warn("任务线程池管理器[{"+taskManagerName+"}]--Pool did not terminate"); } } } } catch (InterruptedException ie) { // (Re-)Cancel if current thread also interrupted logger.error("任务线程池管理器[{"+taskManagerName+"}]-中止时发生异常",ie); try{ pool.shutdownNow(); // Preserve interrupt status //Thread.currentThread().interrupt(); }catch(Exception e){ logger.error("任务线程池管理器[{"+taskManagerName+"}]-中止时发生异常",e); } } pool=null; } /** * 发起任务请求 submit Callable task request * @param <V> */ public <V> Future<V> submitCallableTaskRequest(Callable<V> callTask) { Future<V> result=null; try{ result = submitTask(callTask); logger.warn("任务线程池管理器[{"+taskManagerName+"}]--submitCallableTaskRequest()->任务提交成功"); }catch(Exception e){ logger.error("任务线程池管理器[{"+taskManagerName+"}]--submitCallableTaskRequest()->任务提交失败",e); } return result; } /** * 发起任务请求 submit Runnable task request * @param <V> */ public void submitRunableTaskRequest(Runnable runTask) { try{ excuteTask(runTask); logger.warn("任务线程池管理器[{"+taskManagerName+"}]--submitRunableTaskRequest()->任务提交成功"); }catch(Exception e){ logger.error("任务线程池管理器[{"+taskManagerName+"}]--submitRunableTaskRequest()->任务提交失败",e); } } protected <V> Future<V> submitTask(Callable<V> callTask) { Future<V> rs= executor.submit(callTask); return rs; } protected void excuteTask(Runnable runTask) { executor.execute(runTask); } @Override public String stopTaskNow() { if(!this.kill){ this.kill=true; } stop=true; shutdownNow(); return "success"; } @Override public String stopTaskDely() { if(!this.kill){ this.kill=true; } stop=true; shuntdownDely(); return "success"; } @Override public String startTask() { String rs="success"; if(executor!=null&&!executor.isShutdown()){ rs ="fail"; logger.warn("任务线程池管理器[{"+taskManagerName+"}]--startTask()->线程池开启失败"); }else{ //初始化线程池 executor = new ThreadPoolExecutor(corePoolSize, maximumPoolSize, 0L, TimeUnit.MILLISECONDS, new ArrayBlockingQueue<Runnable>(queueSize)); stop=false; iseffect=true; oldQueueSize=queueSize; logger.warn("任务线程池管理器[{"+taskManagerName+"}]--startTask()->线程池开启成功"); } return rs; } }
3.spring Jmx线程池组件配置文件(可定义多个实例,进行不一样的操做)tomcat
<bean id="synMemberTaskManager" class="com.xxxxxx.job.queue.CommonTaskManager" init-method="initMethod" destroy-method="destroyMethod"> <property name="corePoolSize" value="10" /> <property name="maximumPoolSize" value="20" /> <property name="pageSize" value="1000" /> <property name="queueSize" value="1000" /> <property name="taskManagerName" value="member-taskManager" /> </bean> <bean id="qrcodeTaskManager" class="com.xxxxxxx.job.queue.CommonTaskManager" init-method="initMethod" destroy-method="destroyMethod"> <property name="corePoolSize" value="10" /> <property name="maximumPoolSize" value="20" /> <property name="pageSize" value="1000" /> <property name="queueSize" value="1000" /> <property name="taskManagerName" value="qrcode-taskManager" /> </bean> <bean id="templateMsgTaskManager" class="com.xxxxxx.job.queue.CommonTaskManager" init-method="initMethod" destroy-method="destroyMethod"> <property name="corePoolSize" value="10" /> <property name="maximumPoolSize" value="20" /> <property name="pageSize" value="1000" /> <property name="queueSize" value="1000" /> <property name="taskManagerName" value="templateMsg-taskManager" /> </bean>
4.定时任务队列中使用该线程池组件的案例SynMember 服务器
package com.xxxxxx.jobImpl; import java.util.ArrayList; import java.util.Date; import java.util.List; import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.Callable; import java.util.concurrent.CompletionService; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorCompletionService; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Future; import java.util.concurrent.FutureTask; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledFuture; import java.util.concurrent.SynchronousQueue; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; import javax.annotation.Resource; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.context.annotation.Lazy; import org.springframework.scheduling.annotation.Async; import org.springframework.scheduling.annotation.Scheduled; import org.springframework.stereotype.Component; import com.xxxxxx.job.queue.CommonTaskManager; import com.xxxxxx.model.ThreadLastRuntime; import com.xxxxxx.model.WccMemberCommon; import com.xxxxxx.utils.HessianHelper; /** * 定时同步中间表的信息到会员表 * */ @Lazy(false) @Component public class SynMember { private static final Logger logger = LoggerFactory.getLogger(SynMember.class); @Resource private CommonTaskManager synMemberTaskManager; private static ArrayBlockingQueue queue= new ArrayBlockingQueue(1); //初始化线程池 //每1分钟执行一次 @Async @Scheduled(cron = "0 0/1 * * * ? ") public void synMember(){ //获取中间表未处理的数据 if(queue.size()>0){ logger.warn("前一次任务未执行完成跳过本次任务"); return; } synMemberTaskManager.setKill(false); if(synMemberTaskManager.isStop()){ logger.warn("会员同步线程池为启动,跳过本次任务"); return; } try{ queue.add(new Object()); updateLastThreaRunTime(); Integer size=synMemberTaskManager.getPageSize(); List<WccMemberCommon> memberCommons = HessianHelper.getAppService().findMemberCommon(size); boolean rssiz=memberCommons != null && memberCommons.size() > 0; logger.warn("会员同步中间表未处理的大小==="+memberCommons.size()); while(rssiz){ updateLastThreaRunTime(); if(synMemberTaskManager.isKill()||synMemberTaskManager.isStop()){ logger.warn("synMember()-会员同步处理任务退出本次任务,"); queue.clear(); return ; } List<Future<Boolean>> futureList=new ArrayList<Future<Boolean>>(); logger.warn("synMember()-会员同步处理开始提交任务"); for(WccMemberCommon memberCommon:memberCommons){ if(synMemberTaskManager.isKill()||synMemberTaskManager.isStop()){ break ; } Future<Boolean> future=synMemberTaskManager.submitCallableTaskRequest(new SynMemberCallableThread(memberCommon)); if(future!=null){ futureList.add( future); } } updateLastThreaRunTime(); logger.warn("synMember()-会员同步处理提交任务-结束-开始获取执行结果"); if(futureList.size()>0){ for(Future<Boolean> item:futureList){ try { if(synMemberTaskManager.isKill()||synMemberTaskManager.isStop()){ logger.warn("synMember()-会员同步处理任务退出本次任务,"); queue.clear(); return ; } if(item!=null){ if(item.isCancelled()){ queue.clear(); return; } if(item.isDone()){ //若是已经完成,就不作任何事情 }else{ boolean r=item.get(30, TimeUnit.SECONDS); } } } catch (InterruptedException | ExecutionException e) { e.printStackTrace(); logger.error("synMember--会员同步出现错误",e); }catch (Exception e) { logger.error("synMember--会员同步任务出现错误",e); } } }else{ logger.warn("synMember()-会员同步处理提交任务成功数为0,线程池拒绝了任务"); break; } updateLastThreaRunTime(); memberCommons = HessianHelper.getAppService().findMemberCommon(size); logger.warn("synMember()-会员同步处理获取执行结果结束"); logger.warn("synMember()-会员同步处理完成一轮,剩余size大小"+memberCommons.size()); rssiz=memberCommons != null && memberCommons.size() > 0; } logger.warn("会员同步中间表未处理的大小==="+memberCommons.size()+",任务结束"); }catch(Exception e){ logger.error("会员同步中间表任务报错",e); } queue.clear(); } public void updateLastThreaRunTime(){ int rscount= HessianHelper.getAppService().updateByThreadType("memberSync"); ThreadLastRuntime threadLastRuntime=new ThreadLastRuntime(); threadLastRuntime.setThreadType("memberSync"); threadLastRuntime.setLastThreadRuntime(new Date()); if(rscount==0){ HessianHelper.getAppService().save(threadLastRuntime); }else if (rscount>1){ HessianHelper.getAppService().deleteByThreadType("memberSync"); HessianHelper.getAppService().save(threadLastRuntime); } } }
5.Jmx配置(此处服务器是windows server 2012,因此配置是在tomcat注册文件中修改)网络
-Dcom.sun.management.jmxremote -Dcom.sun.management.jmxremote.port=333111 -Dcom.sun.management.jmxremote.ssl=false -Dcom.sun.management.jmxremote.authenticate=false
经过注册表修改
win+r --> regedit 打开注册表【有道深刻理解jvm虚拟机】
打开 注册表HKEY_LOCAL_MACHINE\SOFTWARE\Wow6432Node\Apache Software Foundation\Procrun 2.0\Tomcat6\Parameters\Java(路径可能有一点点差异)中的Options。
6.项目组件启动效果图
7.Jmx链接效果图(此处我就用本地截图了)
最后,以上就是**项目系统监控操做线程池的全过程了,若是小伙伴们对Jmx这块的知识感兴趣,能够联系博主或者留言。后期博主将推出Jmx的整个技术系列的教程,敬请期待。