前两天阅读公司代码看到了用JMX监控定时任务信息和状态,JMX这个单词感受很熟因而便去查阅了一下,并写了监控线程池的Demojava
JMX(Java Management Extensions)
,监控管理框架,经过使用JMX
能够监控和管理应用程序。JMX
最多见的场景是监控Java
程序的基本信息和运行状况,任何Java
程序均可以开启JMX
,而后使用JConsole
或Visual VM
进行预览git
HTTP
链接、
RMI
链接、
SNMP
链接
MBean
,经过将
MBean
注册到代理层实现
MBean
的管理,除了注册
MBean
,还能够注册
Adapter
,代理层在应用中通常都是
MBeanService
应用中通常使用Standard MBean
比较多,因此这里只介绍Standard MBean
,使用Standard MBean
须要知足必定的规则,规则以下:github
XXXXMBean
的格式,必须以MBean
结尾XXXXMBean
,则接口实现类必须为MBean
,不然程序将报错get
和set
方法表示监控指标是否可读、可写。好比getXXX()
抽象方法,则XXX
就是监控的指标,getXXX()
表示XXX
性能指标可读,setXXX()
方法表示该监控指标可写String
)和基本数据类型,不能够是自定义类型,若是返回值为自定义类型能够选择MXBean
线程池是线程的管理工具,经过使用线程池能够复用线程下降资源消耗、提升响应速度、提升线程的可管理性。若是在系统中大量使用线程池,就必须对线程池进行监控方便出错时定位问题。能够经过线程池提供的参数进行监控,线程池提供的参数以下:bash
方法 | 含义 |
---|---|
getActiveCount | 线程池中正在执行任务的线程数量 |
getCompletedTaskCount | 线程池已完成的任务数量 |
getCorePoolSize | 线程池的核心线程数量 |
getLargestPoolSize | 线程池曾经建立过的最大线程数量 |
getMaximumPoolSize | 线程池的最大线程数量 |
getPoolSize | 线程池当前的线程数量 |
getTaskCount | 线程池须要执行的任务数量 |
介绍完JMX
及线程池之后,写一个JMX
监控线程池的Demo
,总不能纸上谈兵吧架构
定义线程池监控类:ThreadPoolMonitor.java
框架
public class ThreadPoolMonitor extends ThreadPoolExecutor {
private final Logger logger = LoggerFactory.getLogger(getClass());
/**
* ActiveCount
* */
int ac = 0;
/**
* 当前全部线程消耗的时间
* */
private AtomicLong totalCostTime = new AtomicLong();
/**
* 当前执行的线程总数
* */
private AtomicLong totalTasks = new AtomicLong();
/**
* 线程池名称
*/
private String poolName;
/**
* 最短 执行时间
* */
private long minCostTime;
/**
* 最长执行时间
* */
private long maxCostTime;
/**
* 保存任务开始执行的时间
*/
private ThreadLocal<Long> startTime = new ThreadLocal<>();
public ThreadPoolMonitor(int corePoolSize, int maximumPoolSize, long keepAliveTime,
TimeUnit unit, BlockingQueue<Runnable> workQueue, String poolName) {
this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
Executors.defaultThreadFactory(), poolName);
}
public ThreadPoolMonitor(int corePoolSize, int maximumPoolSize, long keepAliveTime,
TimeUnit unit, BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory, String poolName) {
super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory);
this.poolName = poolName;
}
public static ExecutorService newFixedThreadPool(int nThreads, String poolName) {
return new ThreadPoolMonitor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>(), poolName);
}
public static ExecutorService newCachedThreadPool(String poolName) {
return new ThreadPoolMonitor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue<>(), poolName);
}
public static ExecutorService newSingleThreadExecutor(String poolName) {
return new ThreadPoolMonitor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>(), poolName);
}
/**
* 线程池延迟关闭时(等待线程池里的任务都执行完毕),统计线程池状况
*/
@Override
public void shutdown() {
// 统计已执行任务、正在执行任务、未执行任务数量
logger.info("{} Going to shutdown. Executed tasks: {}, Running tasks: {}, Pending tasks: {}",
this.poolName, this.getCompletedTaskCount(), this.getActiveCount(), this.getQueue().size());
super.shutdown();
}
@Override
public List<Runnable> shutdownNow() {
// 统计已执行任务、正在执行任务、未执行任务数量
logger.info("{} Going to immediately shutdown. Executed tasks: {}, Running tasks: {}, Pending tasks: {}",
this.poolName, this.getCompletedTaskCount(), this.getActiveCount(), this.getQueue().size());
return super.shutdownNow();
}
/**
* 任务执行以前,记录任务开始时间
*/
@Override
protected void beforeExecute(Thread t, Runnable r) {
startTime.set(System.currentTimeMillis());
}
/**
* 任务执行以后,计算任务结束时间
*/
@Override
protected void afterExecute(Runnable r, Throwable t) {
long costTime = System.currentTimeMillis() - startTime.get();
startTime.remove(); //删除,避免占用太多内存
//设置最大最小执行时间
maxCostTime = maxCostTime > costTime ? maxCostTime : costTime;
if (totalTasks.get() == 0) {
minCostTime = costTime;
}
minCostTime = minCostTime < costTime ? minCostTime : costTime;
totalCostTime.addAndGet(costTime);
totalTasks.incrementAndGet();
logger.info("{}-pool-monitor: " +
"Duration: {} ms, PoolSize: {}, CorePoolSize: {}, ActiveCount: {}, " +
"Completed: {}, Task: {}, Queue: {}, LargestPoolSize: {}, " +
"MaximumPoolSize: {}, KeepAliveTime: {}, isShutdown: {}, isTerminated: {}",
this.poolName,
costTime, this.getPoolSize(), this.getCorePoolSize(), super.getActiveCount(),
this.getCompletedTaskCount(), this.getTaskCount(), this.getQueue().size(), this.getLargestPoolSize(),
this.getMaximumPoolSize(), this.getKeepAliveTime(TimeUnit.MILLISECONDS), this.isShutdown(), this.isTerminated());
}
public int getAc() {
return ac;
}
/**
* 线程平均耗时
*
* @return
* */
public float getAverageCostTime() {
return totalCostTime.get() / totalTasks.get();
}
/**
* 线程最大耗时
* */
public long getMaxCostTime() {
return maxCostTime;
}
/**
* 线程最小耗时
* */
public long getMinCostTime() {
return minCostTime;
}
/**
* 生成线程池所用的线程,改写了线程池默认的线程工厂
*/
static class EventThreadFactory implements ThreadFactory {
private static final AtomicInteger poolNumber = new AtomicInteger(1);
private final ThreadGroup group;
private final AtomicInteger threadNumber = new AtomicInteger(1);
private final String namePrefix;
/**
* 初始化线程工厂
*
* @param poolName 线程池名称
*/
EventThreadFactory(String poolName) {
SecurityManager s = System.getSecurityManager();
group = Objects.nonNull(s) ? s.getThreadGroup() : Thread.currentThread().getThreadGroup();
namePrefix = poolName + "-pool-" + poolNumber.getAndIncrement() + "-thread-";
}
@Override
public Thread newThread(Runnable r) {
Thread t = new Thread(group, r, namePrefix + threadNumber.getAndIncrement(), 0);
if (t.isDaemon())
t.setDaemon(false);
if (t.getPriority() != Thread.NORM_PRIORITY)
t.setPriority(Thread.NORM_PRIORITY);
return t;
}
}
}
复制代码
经过继承线程池来自定义线程池,并在构造函数中加入了poolName
标明是哪个线程池,同时重写了beforeExecute
、afterExecute
、terminated
等方法,在beforeExecute
方法中记录线程池执行的时间,在afterExecute
方法中计算线程执行的耗时、最大耗时、最小耗时、平均耗时。重写线程池生成线程的方法,指定了生成的线程名dom
定义一个MBean
:ThreadPoolParamMBean.java
MBean
其实并非一个实体类而是一个接口,里面定义了监控的指标ide
public interface ThreadPoolParamMBean {
/**
* 线程池中正在执行任务的线程数量
*
* @return
*/
int getActiveCount();
/**
* 线程池已完成的任务数量
*
* @return
*/
long getCompletedTaskCount();
/**
* 线程池的核心线程数量
*
* @return
*/
int getCorePoolSize();
/**
* 线程池曾经建立过的最大线程数量
*
* @return
*/
int getLargestPoolSize();
/**
* 线程池的最大线程数量
*
* @return
*/
int getMaximumPoolSize();
/**
* 线程池当前的线程数量
*
* @return
*/
int getPoolSize();
/**
* 线程池须要执行的任务数量
*
* @return
*/
long getTaskCount();
/**
* 线程最大耗时
*
* @return
* */
long getMaxCostTime();
/**
* 线程最小耗时
*
* @return
* */
long getMinCostTime();
/**
* 线程平均耗时
*
* @return
* */
float getAverageCostTime();
}
复制代码
定义一个MBean
实现类:ThreadPoolParam.java
定义的是静态MBean,因此接口实现类必须知足规定,即xxxMBean
,实现类为xxx
函数
public class ThreadPoolParam implements ThreadPoolParamMBean {
private ThreadPoolMonitor threadPoolMonitor;
public ThreadPoolParam(ExecutorService es) {
this.threadPoolMonitor = (ThreadPoolMonitor) es;
}
/**
* 线程池中正在执行任务的线程数量
*
* @return
*/
@Override
public int getActiveCount() {
return threadPoolMonitor.getAc();
}
/**
* 线程池已完成的任务数量
*
* @return
*/
@Override
public long getCompletedTaskCount() {
return threadPoolMonitor.getCompletedTaskCount();
}
/**
* 线程池的核心线程数量
*
* @return
*/
@Override
public int getCorePoolSize() {
return threadPoolMonitor.getCorePoolSize();
}
/**
* 线程池曾经建立过的最大线程数量
*
* @return
*/
@Override
public int getLargestPoolSize() {
return threadPoolMonitor.getLargestPoolSize();
}
/**
* 线程池的最大线程数量
*
* @return
*/
@Override
public int getMaximumPoolSize() {
return threadPoolMonitor.getMaximumPoolSize();
}
/**
* 线程池当前的线程数量
*
* @return
*/
@Override
public int getPoolSize() {
return threadPoolMonitor.getPoolSize();
}
/**
* 线程池须要执行的任务数量
*
* @return
*/
@Override
public long getTaskCount() {
return threadPoolMonitor.getTaskCount();
}
/**
* 线程最大耗时
*
* @return
* */
@Override
public long getMaxCostTime() {
return threadPoolMonitor.getMaxCostTime();
}
/**
* 线程最小耗时
*
* @return
* */
@Override
public long getMinCostTime() {
return threadPoolMonitor.getMinCostTime();
}
/**
* 线程平均耗时
*
* @return
* */
@Override
public float getAverageCostTime() {
return threadPoolMonitor.getAverageCostTime();
}
}
复制代码
监控的参数指标经过线程池获得工具
测试类:Test.java
public class Test {
private static Random random = new Random();
public static void main(String[] args) throws MalformedObjectNameException, InterruptedException {
ExecutorService es1 = ThreadPoolMonitor.newCachedThreadPool("test-pool-1");
ThreadPoolParam threadPoolParam1 = new ThreadPoolParam(es1);
ExecutorService es2 = ThreadPoolMonitor.newCachedThreadPool("test-pool-2");
ThreadPoolParam threadPoolParam2 = new ThreadPoolParam(es2);
MBeanServerUtil.registerMBean(threadPoolParam1, new ObjectName("test-pool-1:type=threadPoolParam"));
MBeanServerUtil.registerMBean(threadPoolParam2, new ObjectName("test-pool-2:type=threadPoolParam"));
//http链接的方式查看监控任务
HtmlAdaptor.start();
executeTask(es1);
executeTask(es2);
Thread.sleep(1000 * 60 * 60);
}
private static void executeTask(ExecutorService es) {
new Thread(() -> {
for (int i = 0; i < 10; i++) {
int temp = i;
es.submit(() -> {
//随机睡眠时间
try {
Thread.sleep(random.nextInt(60) * 1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(temp);
});
}
}).start();
}
}
复制代码
说明:
MBeanServerUtil.registerMBean()
注册监控的类HtmlAdaptor.start()
开启HTTP
链接的方式查看监控任务启动程序后打开http://localhost:8082/
以下图:
点击test-pool-1
下的type=threadPoolParam
经过刷新获取线程池最新的监控指标 test-pool-1
和type=threadPoolParam
这些属性是在ObjectName
中定义的属性值
使用JMX
监控线程池只是JMX
一个功能,本篇文章只是学以至用,更多有关JMX以及线程池的内容能够查阅其余资料。文章如有错误欢迎指正
最后附:项目代码,欢迎fork与star,【我都划重点了就star一下】