用过sparkstreaming的人都知道,当使用sparkstreaming on yarn模式的时候,若是咱们想查看系统运行的log,是无法直接看的,就算能看也只是一部分。redis
这里的log分:apache
(1)spark自己运行的log微信
(2)代码里面业务产生的logapp
spark on yarn模式,若是你的hadoop集群有100台,那么意味着你的sparkstreaming的log有可能会随机分布在100台中,你想查看log必须登陆上每台机器上,一个个查看,若是经过Hadoop的8088页面查看,你也得打开可能几十个页面才能看到全部的log,那么问题来了?异步
能不能将这个job运行全部的log统一收集到某一个目录里面呢? 若是收集到一块儿的话排查log就很是方便了。oop
答案是很遗憾,在sparkstreaming里面无法作到,由于sparkstreaming程序永远不停机,就算你开启hadoop的log聚合也没用,只有当sparkstreaming程序停掉,hadoop的log聚合才能把全部的log收集到一个目录里面,因此其余的非sparkstreaming程序,好比MR,Spark 运行完后,若是开启log聚合,hadoop会负责把运行在各个节点上的log给统一收集到HDFS上,这样的话咱们查看log就很是方便了。性能
如今的问题是sparkstreaming不能停机,那么还能集中收集log到指定的地方吗?答案是能够的,咱们使用log4j收集日志而后异步发送至kafka里面,最后再经过logstash收集kafka里面的日志进入es便可,这样一条龙服务打通以后,出现任何异常均可以很是快和方便的在es中排查问题,效率大大提高。至于使用logstash从kafka收集到es里面,不是本文的重点,有兴趣的参考散仙前面的文章:http://qindongliang.iteye.com/blog/2278642。spa
下面会介绍下如何使用:调试
streaming项目中的log4j使用的是apache log4j日志
<dependency> <groupId>log4j</groupId> <artifactId>log4j</artifactId> <version>1.2.17</version> </dependency>
sparkstreaming项目能够单独提交某个job的log4j文件,这样就能定制每一个job的log输出格式,若是提交的时候不提交log4j文件,那么默认用的是spark安装目录下面的log4j文件。 看下咱们log4j文件的内容:
log4j.rootLogger=WARN,console,kafka #log4j.logger.com.demo.kafka=DEBUG,kafka # appender kafka log4j.appender.kafka=kafka.producer.KafkaLog4jAppender log4j.appender.kafka.topic=kp_diag_log # multiple brokers are separated by comma ",". log4j.appender.kafka.brokerList=192.168.201.6:9092,192.168.201.7:9092,192.168.201.8:9092 log4j.appender.kafka.compressionType=none log4j.appender.kafka.syncSend=false log4j.appender.kafka.layout=org.apache.log4j.PatternLayout #log4j.appender.kafka.layout.ConversionPattern=%d [%-5p] [%t] - [%l] %m%n log4j.appender.kafka.layout.ConversionPattern=[%d] [%p] [%t] %m%n # appender console log4j.appender.console=org.apache.log4j.ConsoleAppender log4j.appender.console.target=System.out log4j.appender.console.layout=org.apache.log4j.PatternLayout log4j.appender.console.layout.ConversionPattern=[%d] [%p] [%t] %m%n #log4j.appender.console.layout.ConversionPattern=%d [%-5p] [%t] - [%l] %m%n
最后看下提交脚本:
jars=`echo /home/spark/x_spark_job/streaming_lib/*jar | sed 's/ /,/g'` echo $jars #nohup /opt/bigdata/spark/bin/spark-submit --class com.bigdata.xuele.streaming.SparkStreamingKmd --master yarn --deploy-mode cluster --executor-cores 3 --driver-memory 4g --executor-memory 4g --num-executors 10 --conf "spark.executor.extraJavaOptions=-Dlog4j.configuration=logback.xml" --jars $jars kpdiag-stream-1.0.0-SNAPSHOT.jar &> streaming.log & nohup /opt/bigdata/spark/bin/spark-submit --class com.bigdata.xuele.streaming.SparkStreamingKmd --master yarn --deploy-mode cluster \ --files "/home/spark/x_spark_job/log4j.properties" \ --executor-cores 3 --driver-memory 3g --executor-memory 3g --num-executors 12 --jars $jars \ --conf "spark.driver.extraJavaOptions=-Dlog4j.configuration=log4j.properties" \ --driver-class-path /opt/bigdata/jars/spark/kafka-log4j-appender-0.9.0.0.jar:/opt/bigdata/jars/spark/kafka_2.11-0.8.2.1.jar:/opt/bigdata/jars/spark/metrics-core-2.2.0.jar:/opt/bigdata/jars/spark/kafka-clients-0.8.2.1.jar \ --driver-library-path /opt/bigdata/jars/spark/kafka-log4j-appender-0.9.0.0.jar:/opt/bigdata/jars/spark/kafka_2.11-0.8.2.1.jar:/opt/bigdata/jars/spark/metrics-core-2.2.0.jar:/opt/bigdata/jars/spark/kafka-clients-0.8.2.1.jar \ --conf spark.executor.extraClassPath=/opt/bigdata/jars/spark/kafka_2.11-0.8.2.1.jar:/opt/bigdata/jars/spark/metrics-core-2.2.0.jar:/opt/bigdata/jars/spark/kafka-clients-0.8.2.1.jar \ --conf spark.executor.extraLibraryPath=/opt/bigdata/jars/spark/kafka_2.11-0.8.2.1.jar:/opt/bigdata/jars/spark/metrics-core-2.2.0.jar:/opt/bigdata/jars/spark/kafka-clients-0.8.2.1.jar \ kpdiag-stream-1.0.0-SNAPSHOT.jar &> kp.log &
注意上面提交脚本中,/opt/bigdata/jars/spark/这个路径引用的jar包,必须在每台hadoop机器上都要存在,sparkstreaming运行过程当中,会从本地加载jar包,此外log4j.properties文件以及参数里面--jars 后面的依赖jar 能够在提交机器上放一份便可,不须要每台机器上都存放。
提交任务后,在kafka的节点上执行消费者命令就能看到对应的log输出: 执行命令:
kafka-console-consumer --zookeeper 192.168.201.5:2181 --topic kp_diag_log
收集到的log内容以下:
[2017-01-21 16:37:03,154] [WARN] [Driver] Support for Java 7 is deprecated as of Spark 2.0.0 [2017-01-21 16:37:19,714] [WARN] [Executor task launch worker-2] 非客观题跳过:类型:0 [2017-01-21 16:37:19,738] [WARN] [Executor task launch worker-2] 非客观题跳过:类型:0 [2017-01-21 16:37:19,739] [WARN] [Executor task launch worker-2] 非客观题跳过:类型:0 [2017-01-21 16:37:19,738] [WARN] [Executor task launch worker-2] 非客观题跳过:类型:0 [2017-01-21 16:37:19,739] [WARN] [Executor task launch worker-2] 非客观题跳过:类型:0 [2017-01-21 16:37:19,740] [WARN] [Executor task launch worker-2] 非客观题跳过:类型:0 [2017-01-21 16:37:19,738] [WARN] [Executor task launch worker-2] 非客观题跳过:类型:0 [2017-01-21 16:37:19,739] [WARN] [Executor task launch worker-2] 非客观题跳过:类型:0 [2017-01-21 16:37:19,842] [WARN] [Executor task launch worker-0] 题目id:b07e88feff464659ab5a351bf1e68ee0在redis不存在
至此,咱们的log就统一收集成功了,后续咱们能够把log从kafka导入到es中,就能够任意分析和查询了。
这里须要注意一点,sparkstreaming运行时候,系统自己也有大量的log,若是把这个系统log也收集到kafka里面自己的量是很是大的,并且好多信息不重要,其实 咱们只须要关注业务重点log便可,主要是WARN+ERROR级别的,调试的时候能够把info级别打开,代码里重点关注的log都放在warn级别,异常什么的放在ERROR便可 这样排查问题时候也容易并且了避免了大量log的产生从应用自己性能的影响。
有什么问题能够扫码关注微信公众号:我是攻城师(woshigcs),在后台留言咨询。 技术债不能欠,健康债更不能欠, 求道之路,与君同行。