SparkOnYarn 调用System.exit(0)状态异常 与 scala获取当前活跃线程

一.引言:

在yarn-cluster模式下运行spark程序时,出现任务结束可是显示程序没有退出的状况,在本地和yarn上尝试System.exit(0),本地能够正常退出可是在集群模式下没法正常退出并显示Application状态为Failed。redis

 

二.本地运行

=> 不加入System.exit(x)bash

sc.stop()

在之加入 sc.stop() 的状况下,程序未直接退出,只能手动关闭任务。网络

 

=> 加入System.exit(x)app

sc.stop()
sys.exit(0)

加入sc.stop() 与 sys.exit(0),程序能够正常退出,因而决定将sys.exit(0)加入到集群代码中,看集群代码可否正常执行完毕。this

 

三.spark on yarn运行

在集群的主程序加入sys.exit(0)后,程序第一时间退出,可是结束状态显示为FAILED,看log的输出实际上是正常执行完毕的,可是结束状态却不是SUCCESS。spa

查找了yarn相关的介绍找到了缘由:线程

当使用Yarn集群进行集群管理并启动Spark程序并在脚本中选择 --deploy-mode: cluster 模式时,Spark应用程序代码不是在JVM中运行的,而是由 ApplicationMaster 即常说的 AM 在执行。当尝试在应用程序中调用System.exit(x),时,应用程序首先在 startUserApplication中启动,而后在应用程序返回后调用完成方法。当执行System.exit(0)时,执行的是shutdown hook ,他看到代码还没有成功完成,因此标记状态为Failed,并标记 EXIT_EARLY故障。能够看到spark日志中显示 Shutdown hook ... 。ShutdownHook能够理解为一个监听JVM关闭的底层接口,当检查到咱们用System.exit(x)结束JVM程序时,就会调用ShutdownHook,启动钩子线程结束程序。3d

// The default state of ApplicationMaster is failed if it is invoked by shut down hook.
  // This behavior is different compared to 1.x version.
  // If user application is exited ahead of time by calling System.exit(N), here mark
  // this application as failed with EXIT_EARLY. For a good shutdown, user shouldn't call
  // System.exit(0) to terminate the application.

这里说到若是经过ShutdownHoot调用来关闭ApplicationMaster,则Application默认状态为Failed。这里与1.x版本不一样。若是用户应用程序经过调用 System.exit(N) 提早退出用户应用程序,则在这里标记应用程序失败,并显示未EXIT_EARLY。为了更好的结束应用程序,用户不该该经过调用ShutdownHook来终止应用程序。综上分析咱们的程序总体结束时,还有一些活跃的线程没有结束从而致使咱们调用 exit(x) 提早结束而后显示为FAILED,因此接下来须要查一下还有哪些线程在程序结束时处于活跃并无退出。日志

 

四.活跃线程分析

在调用sc.stop()程序下面加入以下代码进行线程分析,经过遍历线程树获取当前全部活跃线程与线程名:
 
netty

......

    Main Function ...   
    
    ......

    sc.stop()

    var group = Thread.currentThread.getThreadGroup
    var topGroup = group
    // 遍历线程组树,获取根线程组
    while ( {
      group != null
    }) {
      topGroup = group
      group = group.getParent
    }
    // 激活的线程数再加一倍,防止枚举时有可能恰好有动态线程生成
    val slackSize = topGroup.activeCount * 2
    val slackThreads = new Array[Thread](slackSize)
    // 获取根线程组下的全部线程,返回的actualSize即是最终的线程数
    val actualSize = topGroup.enumerate(slackThreads)
    val atualThreads = new Array[Thread](actualSize)
    // 复制slackThreads中有效的值到atualThreads
    System.arraycopy(slackThreads, 0, atualThreads, 0, actualSize)
    System.out.println("Threads size is " + atualThreads.length)
    for (thread <- atualThreads) {
      System.out.println("Thread name : " + thread.getName)
    }

sc.stop()以后活跃线程共计:

Threads size is 364

经过检查发现了大量redission-netty的线程从而定位到程序没有正常结束的缘由是redission client启动后未调用close方法,从而一直在线程中活跃致使程序没法退出,调用client的close方法后,程序正常退出,问题解决:

Thread name : redisson-netty-2-1
Thread name : redisson-netty-2-2
Thread name : redisson-netty-2-3
Thread name : redisson-netty-2-4

 

五.总结

1.spark程序在local和yarn-cluster下运行状态控制模式,一个基于JVM,一个基于AppicationMaster,因此调用 System.exit(x) 结果不一样

2.启动一些基于网络的客户端在不使用的状况下及时关闭客户端防止有线程一直活跃,也能够用相似try-with-resources使用完毕后关闭客户端服务

3.尽可能不使用System.exit(x)来关闭程序

相关文章
相关标签/搜索