**xxchat系统之线程池实时监控方案(Jmx远程后台监控、暂停、启用)

    前面有一篇文章中,博主为你们介绍了**xxchat系统线程监控方案。今天博主为你们分享的是**微xxxx系统三大队列任务执行时所用到的线程池的不停服务器监控、暂停、启用、更改参数等等操做。java

    固然,在大多数状况下,小伙伴们可能都遇到的是容许线上停机改配置重启的操做。可是,若是大家的业务有不少人在用,老板要求不宕机的状况下对队列线程池进行动态的修改、重启、监控,那么此时咱们就须要使用一些技术(Jmx、http等)来远程操控。博主今天主要分享的使用jmx来进行动态操做线程池的方案,在分享以前咱们首先得了解jmx这项技术是什么?有什么用途?在何时用?下面博主就开始带你们一块儿去领略jmx高级监控技术的世界。spring

    Jmx(Java Managerment Extensions,即"Java管理扩展"),是一个为应用程序、设备、系统等植入管理功能的框架。Jmx能够跨越一系列异构操做系统平台、系统体系结构和网络传输协议,灵活的开发无缝隙集成的系统、网络和服务管理应用。 Jmx在Java编程语言中定义了应用程序以及网络管理和监控的体系结构、设计模式、应用程序接口以及服务。一般使用Jmx来监控系统的运行状态或管理系统的某些方面,好比清空缓存、从新加载配置文件等。apache

    此处,**系统主要使用Jmx来注册自定义线程池组件,线程池组件上暴露了一系列的方法给Jmx;它们包括:线程池的中止,修改线程池的核心线程数、最大线程数、超时时间、队列大小等等参数,重启线程池等等方法。而后,客户可使用Jmx管理工具(如:jconsole/ jvisualvm/程序Agent等等)链接上组件暴露的Jmx端口,而后调用其暴露的方法以实现具体的业务功能。编程

    如下是**项目的自定义线程池组件代码:windows

1.Jmx组件接口CommonTaskManagerMBean设计模式

package com.xxxxxx.job.queue;
/**
 * 
 * 类CommonTaskManagerMBean.java的实现描述:jmx组件接口
 * @author arron 2018年x月2x日 下午x:x2:x9
 */
public interface CommonTaskManagerMBean {
    int getPageSize();

    void setPageSize(int pageSize);

    int getCorePoolSize();
    
    int getQueueSize();

    void setQueueSize(int queueSize);

    void setCorePoolSize(int corePoolSize);
    
    int getMaximumPoolSize() ;

    void setMaximumPoolSize(int maximumPoolSize);
    
    boolean isKill() ;
    
    public boolean isStop();
    /**
     * 马上停掉
     * @return
     */
    String stopTaskNow();
    /**
     * 优雅停掉
     * @return
     */
    String stopTaskDely();
    /**
     * 开启任务线程池
     * @return
     */
    String startTask();
}

2.Jmx组件接口CommonTaskManagerMBean的具体实现类,也是线程池组件实现类;       CommonTaskManager缓存

package com.xxxxxx.job.queue;

import java.lang.management.ManagementFactory;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

import javax.management.MBeanServer;
import javax.management.ObjectName;

import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.InitializingBean;


/**
 * 
 * 类CommonTaskManager.java的实现描述:通用线程池管理组件
 * 
 * @author arron 2018年x月2x日 下午x:x2:x9
 */
public class CommonTaskManager implements InitializingBean,CommonTaskManagerMBean {
    private static final Logger      logger        = LoggerFactory.getLogger(CommonTaskManager.class);
    private int                      corePoolSize=10;

    private int                      maximumPoolSize=20;
    
    private int                      pageSize=1000;
    
    private int                      queueSize=1000;
    //queue size 是否生效
    private boolean              iseffect=true;
    //第几回设置queuesize
    private int                      opcount=0;
    //以前生效的queuesize大小
    private int                      oldQueueSize=0;
    //任务管理器名称
    private String                 taskManagerName="未命名"; 
    private ThreadPoolExecutor executor=null;
    private volatile boolean kill=false;
   
    private volatile boolean stop=false;
    public boolean isKill() {
        return kill;
    }
    public void setKill(boolean kill) {
        this.kill = kill;
    }
    
    public boolean isStop() {
        return stop;
    }
    
    public String getTaskManagerName() {
        return taskManagerName;
    }

    public void setTaskManagerName(String taskManagerName) {
        if(StringUtils.isNotBlank(taskManagerName)){
            this.taskManagerName = taskManagerName;
        }else{
            logger.error("任务线程池管理器[{"+taskManagerName+"}]--setTaskManagerName()->设置线程池设置任务管理器名称执行失败,任务管理器名称不能为空");
        }
    }

    public int getPageSize() {
        return pageSize;
    }

    public void setPageSize(int pageSize) {
        if(pageSize>0){
            this.pageSize = pageSize;
        }else{
            logger.error("任务线程池管理器[{"+taskManagerName+"}]--setPageSize()->设置线程池分页处理size数执行失败,分页处理size数必须大于0");
        }
    }
    
    public int getCorePoolSize() {
        return corePoolSize;
    }
    
    public int getQueueSize() {
        return queueSize;
    }

    public void setQueueSize(int queueSize) {
        if(queueSize>0){
            this.queueSize = queueSize;
        }else{
            logger.error("任务线程池管理器[{"+taskManagerName+"}]--setQueueSize()->设置线程池队列size数执行失败,队列size数必须大于0");
        }
        opcount=opcount+1;
        if(opcount>1){
            iseffect=false;
        }
    }

    public void setCorePoolSize(int corePoolSize) {
        if(corePoolSize>=0){
            this.corePoolSize = corePoolSize;
            if(executor!=null&&!executor.isShutdown()){
                    try{
                        executor.setCorePoolSize(corePoolSize);
                    }catch(Exception e){
                        logger.error("任务线程池管理器[{"+taskManagerName+"}]--setCorePoolSize()->马上设置线程池核心线程数执行失败",e);
                    }               
            }
        }else{
            logger.error("任务线程池管理器[{"+taskManagerName+"}]--setCorePoolSize()->马上设置线程池核心线程数执行失败,最大线程数必须大于或者等于0");
        }
    }

    public int getMaximumPoolSize() {
        return maximumPoolSize;
    }

    public void setMaximumPoolSize(int maximumPoolSize) {
        if(maximumPoolSize>0){
            this.maximumPoolSize = maximumPoolSize;
            if(executor!=null&&!executor.isShutdown()){
                if(maximumPoolSize>0){
                    try{
                        executor.setMaximumPoolSize(maximumPoolSize);
                    }catch(Exception e){
                        logger.error("任务线程池管理器[{"+taskManagerName+"}]--setMaximumPoolSize()->马上设置线程池最大线程数执行失败",e);
                    }     
                }
            }
        }else{
            logger.error("任务线程池管理器[{"+taskManagerName+"}]--setMaximumPoolSize()->马上设置线程池最大线程数执行失败,最大线程数必须大于0");
        }
    }

   

    protected ThreadPoolExecutor getExecutor() {
        return executor;
    }


    /**
     * 当spring xml中定义的属性初始化完成后,执行该方法
     */
    @Override
    public void afterPropertiesSet() throws Exception {
       logger.warn("任务线程池管理器[{"+taskManagerName+"}]--执行初始化消息管理组件CommonTaskManager: init-afterPropertiesSet");
        //初始化线程池
       executor = new ThreadPoolExecutor(corePoolSize, maximumPoolSize, 0L, TimeUnit.MILLISECONDS,
                new ArrayBlockingQueue<Runnable>(queueSize));
       oldQueueSize=queueSize;
    }
    private ExecutorService pool;
    /**
     * 初始化加载上次未完成发送的消息继续发送;
     */
    public void initMethod() {
        logger.warn("任务线程池管理器[{"+taskManagerName+"}]--执行初始化消息管理组件CommonTaskManager[{}]: init-method",taskManagerName);
        //注册到jmx管理
        MBeanServer beanServer = ManagementFactory.getPlatformMBeanServer();  
        try {
            ObjectName objectName = new ObjectName("CommonTaskManager:type=CommonTaskManager-"+taskManagerName);  
            beanServer.registerMBean(this, objectName);  
            pool = Executors.newSingleThreadExecutor();
            //建立实现了Runnable接口对象,Thread对象固然也实现了Runnable接口
            pool.submit(new Runnable() {
                @Override
                public void run() {
                    while (true) {
                        try {
                            logger.warn("任务线程池管理器[{"+taskManagerName+"}]--任务管理器中参数详情:corePoolSize:{},maximumPoolSize:{},pageSize:{},queueSize:{}",corePoolSize ,maximumPoolSize,pageSize,queueSize);
                            if(executor!=null&&!executor.isShutdown()){
                                if(!iseffect){
                                    logger.warn("任务线程池管理器[{"+taskManagerName+"}]--任务管理器中线程池的queueSize设置后还未重启,当前处于未生效状态,");
                                }
                                logger.warn("任务线程池管理器[{"+taskManagerName+"}]--任务管理器中线程池中参数详情:corePoolSize:{},maximumPoolSize:{},pageSize:{},queueSize:{},queue队列中任务数:{}",executor.getCorePoolSize() ,executor.getMaximumPoolSize(),pageSize,oldQueueSize,executor.getQueue().size());
                            }else{
                                logger.error("任务线程池管理器[{"+taskManagerName+"}]--任务管理器中线程池处于非工做状态!");
                            }
                            //休眠5秒钟
                            Thread.sleep(30000l);
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }
                       
                    }
                }
            });
        } catch (Exception e) {
            logger.error("任务线程池管理器[{"+taskManagerName+"}]--initMethod()->马上中止线程池执行失败",e);
        }
    }

    /**
     * 执行销毁线程池管理组件代码;
     */
    public void destroyMethod() {
        logger.warn("任务线程池管理器[{"+taskManagerName+"}]--执行销毁消息管理组件destroy-method");
        shutdownAndAwaitTermination(executor);
        pool.shutdownNow();
        logger.warn("任务线程池管理器[{"+taskManagerName+"}]--执行销毁消息管理组件destroy-method...任务管理器完成销毁动做...");
    }
    /**
     * 线程池暴力销毁
     * @param pool
     */
    private void shutdownNow() {
        try{
            executor.shutdownNow();
            logger.warn("任务线程池管理器[{"+taskManagerName+"}]--shutdownNow()->马上中止线程池执行成功");
        }catch(Exception e){
            logger.error("任务线程池管理器[{"+taskManagerName+"}]--shutdownNow()->马上中止线程池执行失败",e);
        }
        executor=null;
    }
    /**
     * 执行销毁线程池管理组件代码;
     */
    private void shuntdownDely() {
        logger.warn("任务线程池管理器[{"+taskManagerName+"}]--执行销毁消息管理组件shuntdownDely-method");
        shutdownAndAwaitTermination(executor);
        logger.warn("任务线程池管理器[{"+taskManagerName+"}]--执行销毁消息管理组件shuntdownDely-method...任务管理器完成销毁动做...");
    }
    /**
     * 线程池优雅销毁
     * 
     * @param pool
     */
    private void shutdownAndAwaitTermination(ExecutorService pool) {
        try {
            if(pool.isShutdown()){
                logger.warn("任务线程池管理器[{"+taskManagerName+"}]--pool has been shutdown");
            }else{
                //让线程池没法提交新的任务
                pool.shutdown(); // Disable new tasks from being submitted
                // Wait a while for existing tasks to terminate
                //等待以前的存在的任务执行60s,而后中止
                if (!pool.awaitTermination(60, TimeUnit.SECONDS)) {
                    //当即中止正在执行的任务
                    pool.shutdownNow(); // Cancel currently executing tasks
                    // Wait a while for tasks to respond to being cancelled
                    if (!pool.awaitTermination(60, TimeUnit.SECONDS)) {
                        logger.warn("任务线程池管理器[{"+taskManagerName+"}]--Pool did not terminate");
                    }
                }
            }
        } catch (InterruptedException ie) {
            // (Re-)Cancel if current thread also interrupted
            logger.error("任务线程池管理器[{"+taskManagerName+"}]-中止时发生异常",ie);
            try{
                pool.shutdownNow();
                // Preserve interrupt status
                //Thread.currentThread().interrupt();
            }catch(Exception e){
                logger.error("任务线程池管理器[{"+taskManagerName+"}]-中止时发生异常",e);
            }
           
        }
        pool=null;
    }
    /**
     * 发起任务请求 submit Callable task request
     * @param <V>
     */
    public <V> Future<V> submitCallableTaskRequest(Callable<V> callTask) {
        Future<V> result=null;
        try{
            result = submitTask(callTask);
            logger.warn("任务线程池管理器[{"+taskManagerName+"}]--submitCallableTaskRequest()->任务提交成功");
        }catch(Exception e){
            logger.error("任务线程池管理器[{"+taskManagerName+"}]--submitCallableTaskRequest()->任务提交失败",e);
        }
        return result;
    }
    /**
     * 发起任务请求 submit Runnable task request
     * @param <V>
     */
    public void submitRunableTaskRequest(Runnable runTask) {
        try{
            excuteTask(runTask);
            logger.warn("任务线程池管理器[{"+taskManagerName+"}]--submitRunableTaskRequest()->任务提交成功");
        }catch(Exception e){
            logger.error("任务线程池管理器[{"+taskManagerName+"}]--submitRunableTaskRequest()->任务提交失败",e);
        }
         
    }
    protected <V> Future<V> submitTask(Callable<V> callTask) {
         Future<V> rs= executor.submit(callTask);
         return rs;
    }
    protected void excuteTask(Runnable runTask) {
         executor.execute(runTask);
    }

    @Override
    public String stopTaskNow() {
        if(!this.kill){
            this.kill=true;
        }
        stop=true;
        shutdownNow();
        return  "success";
    }

    @Override
    public String stopTaskDely() {
        if(!this.kill){
            this.kill=true;
        }
        stop=true;
        shuntdownDely();
        return "success";
    }

    @Override
    public String startTask() {
        String rs="success";
        if(executor!=null&&!executor.isShutdown()){
            rs ="fail";
            logger.warn("任务线程池管理器[{"+taskManagerName+"}]--startTask()->线程池开启失败");
        }else{
            //初始化线程池
            executor = new ThreadPoolExecutor(corePoolSize, maximumPoolSize, 0L, TimeUnit.MILLISECONDS,
                    new ArrayBlockingQueue<Runnable>(queueSize));
            stop=false;
            iseffect=true;
            oldQueueSize=queueSize;
            logger.warn("任务线程池管理器[{"+taskManagerName+"}]--startTask()->线程池开启成功");
        }
        return  rs;
    }
    
}

3.spring Jmx线程池组件配置文件(可定义多个实例,进行不一样的操做)tomcat

<bean id="synMemberTaskManager" class="com.xxxxxx.job.queue.CommonTaskManager" init-method="initMethod" destroy-method="destroyMethod">
    	<property name="corePoolSize"  value="10" />
		<property name="maximumPoolSize" value="20" />
		<property name="pageSize" value="1000" />
		<property name="queueSize" value="1000"  />
		<property name="taskManagerName" value="member-taskManager"  />
</bean> 
<bean id="qrcodeTaskManager" class="com.xxxxxxx.job.queue.CommonTaskManager" init-method="initMethod" destroy-method="destroyMethod">
    	<property name="corePoolSize"  value="10" />
		<property name="maximumPoolSize" value="20" />
		<property name="pageSize" value="1000" />
		<property name="queueSize" value="1000"  />
		<property name="taskManagerName" value="qrcode-taskManager"  />
</bean> 
<bean id="templateMsgTaskManager" class="com.xxxxxx.job.queue.CommonTaskManager" init-method="initMethod" destroy-method="destroyMethod">
    	<property name="corePoolSize"  value="10" />
		<property name="maximumPoolSize" value="20" />
		<property name="pageSize" value="1000" />
		<property name="queueSize" value="1000"  />
		<property name="taskManagerName" value="templateMsg-taskManager"  />
</bean>

4.定时任务队列中使用该线程池组件的案例SynMember 服务器

package com.xxxxxx.jobImpl;

import java.util.ArrayList;
import java.util.Date;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.FutureTask;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

import javax.annotation.Resource;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Lazy;
import org.springframework.scheduling.annotation.Async;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;

import com.xxxxxx.job.queue.CommonTaskManager;
import com.xxxxxx.model.ThreadLastRuntime;
import com.xxxxxx.model.WccMemberCommon;
import com.xxxxxx.utils.HessianHelper;

/**
 * 定时同步中间表的信息到会员表
 * 
*/
@Lazy(false)
@Component
public class SynMember {
    private static final Logger logger = LoggerFactory.getLogger(SynMember.class);
    @Resource
    private CommonTaskManager synMemberTaskManager;
    private  static ArrayBlockingQueue queue=  new ArrayBlockingQueue(1);
    //初始化线程池
    //每1分钟执行一次
    @Async
    @Scheduled(cron = "0 0/1 * * * ? ")
    public  void synMember(){
        //获取中间表未处理的数据
        if(queue.size()>0){
            logger.warn("前一次任务未执行完成跳过本次任务");
            return;
        }
        synMemberTaskManager.setKill(false);
        if(synMemberTaskManager.isStop()){
            logger.warn("会员同步线程池为启动,跳过本次任务");
            return;
        }
        try{
            queue.add(new Object());
            updateLastThreaRunTime();
            Integer size=synMemberTaskManager.getPageSize();
            List<WccMemberCommon> memberCommons = HessianHelper.getAppService().findMemberCommon(size);
            boolean rssiz=memberCommons != null && memberCommons.size() > 0;
            logger.warn("会员同步中间表未处理的大小==="+memberCommons.size());
            while(rssiz){
                updateLastThreaRunTime();
                if(synMemberTaskManager.isKill()||synMemberTaskManager.isStop()){
                    logger.warn("synMember()-会员同步处理任务退出本次任务,");
                    queue.clear();
                    return ;
                }
                 List<Future<Boolean>>  futureList=new ArrayList<Future<Boolean>>();
                 logger.warn("synMember()-会员同步处理开始提交任务");
                 for(WccMemberCommon memberCommon:memberCommons){
                     if(synMemberTaskManager.isKill()||synMemberTaskManager.isStop()){
                         break ;
                     }
                     Future<Boolean>  future=synMemberTaskManager.submitCallableTaskRequest(new SynMemberCallableThread(memberCommon));
                     if(future!=null){
                         futureList.add( future);
                     }
                 }
                 updateLastThreaRunTime();
                 logger.warn("synMember()-会员同步处理提交任务-结束-开始获取执行结果");
                 if(futureList.size()>0){
                     for(Future<Boolean>  item:futureList){
                         try {
                             if(synMemberTaskManager.isKill()||synMemberTaskManager.isStop()){
                                 logger.warn("synMember()-会员同步处理任务退出本次任务,");
                                 queue.clear();
                                 return ;
                             }
                             if(item!=null){
                                 if(item.isCancelled()){
                                     queue.clear();
                                     return;
                                 }
                                 if(item.isDone()){
                                     //若是已经完成,就不作任何事情
                                 }else{
                                     boolean r=item.get(30, TimeUnit.SECONDS);
                                 }
                             }
                          } catch (InterruptedException | ExecutionException e) {
                              e.printStackTrace();
                              logger.error("synMember--会员同步出现错误",e);
                          }catch (Exception e) {
                              logger.error("synMember--会员同步任务出现错误",e);
                          }
                     }
                 }else{
                     logger.warn("synMember()-会员同步处理提交任务成功数为0,线程池拒绝了任务");
                     break;
                 }
                 updateLastThreaRunTime();
                 memberCommons = HessianHelper.getAppService().findMemberCommon(size);
                 logger.warn("synMember()-会员同步处理获取执行结果结束");
                 logger.warn("synMember()-会员同步处理完成一轮,剩余size大小"+memberCommons.size());
                 rssiz=memberCommons != null && memberCommons.size() > 0;
            }
            logger.warn("会员同步中间表未处理的大小==="+memberCommons.size()+",任务结束");
        }catch(Exception e){
            logger.error("会员同步中间表任务报错",e);
        }
        queue.clear();
    }
    public  void updateLastThreaRunTime(){
        int rscount= HessianHelper.getAppService().updateByThreadType("memberSync");
        ThreadLastRuntime threadLastRuntime=new ThreadLastRuntime();
        threadLastRuntime.setThreadType("memberSync");
        threadLastRuntime.setLastThreadRuntime(new Date());
        if(rscount==0){
            HessianHelper.getAppService().save(threadLastRuntime);
        }else if (rscount>1){
            HessianHelper.getAppService().deleteByThreadType("memberSync");
            HessianHelper.getAppService().save(threadLastRuntime);
        }
    }
}

5.Jmx配置(此处服务器是windows server 2012,因此配置是在tomcat注册文件中修改)网络

-Dcom.sun.management.jmxremote
-Dcom.sun.management.jmxremote.port=333111
-Dcom.sun.management.jmxremote.ssl=false
-Dcom.sun.management.jmxremote.authenticate=false

经过注册表修改
win+r   -->  regedit 打开注册表【有道深刻理解jvm虚拟机】
打开 注册表HKEY_LOCAL_MACHINE\SOFTWARE\Wow6432Node\Apache Software Foundation\Procrun 2.0\Tomcat6\Parameters\Java(路径可能有一点点差异)中的Options。

6.项目组件启动效果图

 

7.Jmx链接效果图(此处我就用本地截图了)

 

最后,以上就是**项目系统监控操做线程池的全过程了,若是小伙伴们对Jmx这块的知识感兴趣,能够联系博主或者留言。后期博主将推出Jmx的整个技术系列的教程,敬请期待。

相关文章
相关标签/搜索