Spark Resource Requests: Determining Memory and CPU Counts in Practice - Spark Commercial Application Series

This series of posts distills and shares cases drawn from real production environments and offers practical guidance for commercial Spark applications; please stay tuned. Copyright notice: this Spark commercial application series belongs to the author (Qin Kaixin). Reproduction is prohibited; learning from it is welcome.

1 Spark Resource Requests in YARN Mode

1.1 Determining the Number of Executors

When launching in YARN mode (the Spark jar packages must be copied), the executor count is set directly with the --num-executors parameter.

As we know, when YARN is the cluster manager and an application is submitted with spark-submit (or run interactively in spark-shell) without any resource parameters, Spark (taking client mode as the example) requests container resources from YARN's ResourceManager using the following defaults:

  • spark.executor.memory     1g
  • spark.executor.cores      1
  • spark.executor.instances  2
  • spark.yarn.am.memory      512m
  • spark.yarn.am.cores       1

With these defaults, YARN creates 3 containers: the first, container0, runs the ApplicationMaster, and the other two are allocated to the Spark program's CoarseGrainedExecutorBackend processes. Adding up the defaults, we would expect the job to occupy 3 vcores and 3.5 GB of memory from the cluster. Here the first question arises: why is the memory actually used 5 GB, rather than the expected 3.5 GB?

It turns out that YARN applies two parameters to the memory an application requests, and together they determine the capacity actually granted. The first is yarn.scheduler.minimum-allocation-mb, the minimum allocatable memory, which defaults to 1024. The second is the normalization factor (under the FIFO and Capacity Schedulers it equals the minimum allocatable amount and cannot be configured separately; under the Fair Scheduler it is set via yarn.scheduler.increment-allocation-mb and also defaults to 1024): if the resources an application requests are not an integer multiple of this factor, the request is rounded up to the nearest multiple. Because every container also carries some extra memory overhead, each CoarseGrainedExecutorBackend container actually needs more than the 1 GB requested, and after normalization each of those containers is granted 2 GB. The ApplicationMaster's container requests less than 1 GB, so under the minimum-allocation rule it is granted 1 GB. That yields 2 + 2 + 1 = 5 GB in total.
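The arithmetic can be made concrete with a minimal spark-shell sketch. This is an illustration, not YARN's actual code; it assumes the common Spark overhead rule of max(384 MB, 10% of the requested memory) and the default 1024 MB minimum allocation and normalization factor described above:

// Minimal sketch of YARN's container-size rounding (assumptions, not YARN source):
// overhead = max(384 MB, 10% of request); min allocation and normalization factor = 1024 MB
val minAllocMb  = 1024L
val incrementMb = 1024L
def overheadMb(requestMb: Long): Long = math.max(384L, requestMb / 10)
def grantedMb(requestMb: Long): Long = {
  val needed  = requestMb + overheadMb(requestMb)                  // request + overhead
  val rounded = ((needed + incrementMb - 1) / incrementMb) * incrementMb
  math.max(rounded, minAllocMb)                                    // enforce the minimum
}
println(grantedMb(1024))                       // executor container: 1024 + 384 -> 2048 MB
println(grantedMb(512))                        // AM container:        512 + 384 -> 1024 MB
println(2 * grantedMb(1024) + grantedMb(512))  // 5120 MB -- the 5 GB observed above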

To request resources explicitly rather than relying on the defaults, pass them on the command line:

$SPARK_HOME/bin/spark-submit --master yarn --deploy-mode cluster --class you.class.Name --executor-memory 1g --executor-cores 1 --num-executors 8 --driver-memory 2g /you/jar.jar

1.2 Scheduling CPU Cores and Memory Together

By default the Capacity Scheduler sizes containers by memory alone (DefaultResourceCalculator); to make YARN account for both CPU cores and memory, select the Capacity Scheduler in yarn-site.xml and switch its resource calculator to DominantResourceCalculator.

yarn-site.xml:
<property>
    <name>yarn.resourcemanager.scheduler.class</name>   
    <value>org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler</value>
</property>

capacity-scheduler.xml:
<property>
    <name>yarn.scheduler.capacity.resource-calculator</name>
    <value>org.apache.hadoop.yarn.util.resource.DominantResourceCalculator</value>
</property>

1.3 Launch and Submission Scripts for YARN Mode

spark-shell --master yarn --executor-memory 512m --num-executors 4 --executor-cores 2

./bin/spark-submit --class org.apache.spark.examples.SparkPi --master yarn --executor-memory 512m --num-executors 3 ./examples/jars/spark-examples_2.11-2.3.0.jar 1000

2 Spark Resource Requests in Standalone Mode

2.1 Determining the Number of Executors

In standalone mode, memory and CPU counts are determined by the formula: executorNum = spark.cores.max / spark.executor.cores, where spark.cores.max is the maximum number of CPU cores the application may claim across the whole cluster. The relevant parameters are specified when the application is launched:

  • --total-executor-cores
  • --executor-cores

Together they determine how many executors the current application launches, so setting --total-executor-cores controls the executor count, as the sketch below shows.
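A quick restatement of the formula, using the numbers from the spark-shell example that follows (the values are just the command-line flags):

// executorNum = spark.cores.max / spark.executor.cores
val totalExecutorCores = 40   // --total-executor-cores
val executorCores      = 4    // --executor-cores
val executorNum        = totalExecutorCores / executorCores
println(executorNum)          // 10 executors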

(--num-executors applies to YARN; in standalone mode the executor count follows from the two core flags.)
./bin/spark-submit --class org.apache.spark.examples.SparkPi --master spark://bd-master:7077 --executor-memory 512m --total-executor-cores 3 --executor-cores 1 ./examples/jars/spark-examples_2.11-2.3.0.jar 1000

spark-shell --master spark://bd-master:7077 --total-executor-cores 40 --executor-memory 4096m --executor-cores 4

2.2 Spark Shell Test

sc.textFile("hdfs://bd-master:9000/user/root/input").flatMap(_.split(" ")).map(x=>(x,1)).reduceByKey(_+_).map(x=>(x._2,x._1)).sortByKey(false).map(x=>(x._2,x._1)).collect.foreach(println)

sc.textFile("hdfs://bd-master:9000/waflog").flatMap(_.split("|")).collect.take(10).foreach(println)

2.3 Log Model for the Case Study

The access log fields are |-delimited:

$remote_addr | $time_local | $request | $status | $body_bytes_sent | $bytes_sent | $gzip_ratio 
| $http_referer | $http_user_agent | $http_x_forwarded_for | $upstream_addr 
| $upstream_response_time | $upstream_status | $request_time | $host;
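Because the fields are |-delimited and "|" is a regex metacharacter, any split on it must be escaped. A quick check in spark-shell with a made-up sample line (all values here are hypothetical):

// Hypothetical sample line in the format above
val sample = "1.2.3.4 | 2018-06-20 09:15:02 | GET /index HTTP/1.1 | 200 | 512 | 640 | - | - | Mozilla/5.0 | - | 10.0.0.1:8080 | 0.01 | 200 | 0.02 | hcdt.dataserver.cn"
val fields = sample.split("\\|").map(_.trim)
println(fields.length)   // 15 fields, matching the model above
println(fields(14))      // host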

2.4 Offline ETL

import java.text.SimpleDateFormat
import java.util.Date

val DATE_FORMAT_ = new SimpleDateFormat("yyyy-MM-dd")

val lines = sc.textFile("/opendir/opendir/access.log-20180620")

val formatedLog = lines.map(log => {
  val logSplited = log.split("\\|")          // "|" is a regex metacharacter, so escape it
  val eventTime = logSplited(1)
  val todayDate = DATE_FORMAT_.format(new Date())
  val cutTime = eventTime.substring(13, eventTime.length - 7)  // keep the HH:mm:ss portion
  val dataTime = todayDate + " " + cutTime
  logSplited(1) = dataTime                   // rewrite $time_local as "yyyy-MM-dd HH:mm:ss"

  for (i <- 0 to (logSplited.length - 1)) {  // trim every field
    logSplited(i) = logSplited(i).trim
  }
  logSplited.mkString("@@")                  // re-join with an unambiguous delimiter
})

val outpath = "hdfs://bd-master:9000/waflog/access.log-20180620"
formatedLog.saveAsTextFile(outpath)          // fails if outpath already exists

2.5 Data Correction -- Finer Granularity

import java.sql.Timestamp
import java.text.SimpleDateFormat

val DATE_FORMAT_ = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss")

// Log model -- with hour and minute flags added
case class wafLogModel(remote_addr: String, time_local: Timestamp, request: String,
                       status: String, body_bytes_sent: String, bytes_sent: Long,
                       gzip_ratio: String, http_referer: String, http_user_agent: String,
                       http_x_forwarded_for: String, upstream_addr: String, upstream_response_time: String,
                       upstream_status: String, request_time: String, host: String,
                       hour_flag: String, minute_flag: String)

val fileRDD = sc.textFile("hdfs://bd-master:9000/waflog/access.log-20180620")
import spark.implicits._
val wafLogRDD = fileRDD
  .filter(x => !x.contains("\\xFAW\\xF7"))      // drop lines containing garbled escaped bytes
  .map(line => line.split("@@"))
  .map(x => {
    // Keep at most "METHOD path/one/two" of the request URL
    val urlDetails = x(2).split("/")
    var url = ""
    if (urlDetails.length == 1) url = urlDetails(0).trim
    else if (urlDetails.length == 2) url = urlDetails(0) + " " + urlDetails(1).trim
    else if (urlDetails.length == 3) url = urlDetails(0) + " " + urlDetails(1) + "/" + urlDetails(2).trim
    else if (urlDetails.length >= 4) url = urlDetails(0) + " " + urlDetails(1) + "/" + urlDetails(2) + "/" + urlDetails(3).trim

    val eventTime = Timestamp.valueOf(x(1))
    val format_date = DATE_FORMAT_.format(eventTime)
    val hourflag = format_date.substring(11, 13)                    // "HH"
    val minuteflag = hourflag + ":" + format_date.substring(14, 16) // "HH:mm"

    // Some exception lines are shifted by three fields; realign them
    var bytesSent = ""
    var host = ""
    if (x(5).trim.equals("/error= HTTP/1.1")) {
      bytesSent = x(8).trim
      host = x(17).trim
      url = "GET = ReportProductSn&LoginCode=LoginID&ProductSn=ZJSN/error= HTTP/1.1 (Exception Log)"
    } else {
      bytesSent = x(5).trim
      host = x(14).trim
    }
    val bytesSentMb: Long = bytesSent.toLong / 1024 / 1024L  // bytes -> MB

    wafLogModel(x(0), eventTime, url, x(3), x(4), bytesSentMb, x(6), x(7), x(8), x(9),
                x(10), x(11), x(12), x(13), host, hourflag, minuteflag)
  })

2.6 Data Analysis -- Access Counts by Domain

val wafLogDs = wafLogRDD.toDS()
wafLogDs.createOrReplaceTempView("wafLog")
val urlStat = spark.sql("SELECT host, remote_addr, count(*) as total_count FROM wafLog group by host, remote_addr order by total_count desc limit 10")
urlStat.show

+--------------------+--------------+-----------+                               
|                host|   remote_addr|total_count|
+--------------------+--------------+-----------+
|  hcdt.dataserver.cn|192.168.100.61|      73642|
|resource.dataserv...| 58.60.228.148|      61113|
|  hcdt.dataserver.cn| 58.60.228.148|      45858|
|testuweb.dataserv...| 58.60.228.148|      44042|
|hcautotestkyj.dat...| 58.60.228.148|      42827|
|gdlmdt.dataserver.cn| 14.157.120.63|      36587|
|resource.dataserv...| 14.157.120.63|      26947|
|   cbs.dataserver.cn|192.168.100.62|      26726|
|   cbs.dataserver.cn|192.168.100.61|      26503|
|message.dataserve...| 58.60.228.148|      25739|
+--------------------+--------------+-----------+
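For reference, the same aggregation can also be written with the DataFrame API instead of SQL; a sketch over the same wafLogDs dataset:

import org.apache.spark.sql.functions._

// DataFrame-API equivalent of the SQL query above (sketch)
val urlStatDf = wafLogDs
  .groupBy("host", "remote_addr")
  .agg(count(lit(1)).as("total_count"))
  .orderBy(desc("total_count"))
  .limit(10)
urlStatDf.show()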

To persist the result, convert the rows to |-delimited strings and save:

val urlStatStore = urlStat.map(row => row(0)+"|"+row(1)+"|"+row(2)).rdd
urlStatStore.saveAsTextFile("/wafResult/20180620")

hcdt.dataserver.cn|192.168.100.61|73642
resource.dataserver.cn|58.60.228.148|61113
hcdt.dataserver.cn|58.60.228.148|45858
testuweb.dataserver.cn|58.60.228.148|44042
hcautotestkyj.dataserver.cn|58.60.228.148|42827
gdlmdt.dataserver.cn|14.157.120.63|36587
resource.dataserver.cn|14.157.120.63|26947
cbs.dataserver.cn|192.168.100.62|26726
cbs.dataserver.cn|192.168.100.61|26503
message.dataserver.cn|58.60.228.148|25739

The same result via a typed Dataset:

case class urlStatModel(host: String, remote_addr: String, total_count: String)
urlStat.as[urlStatModel].map(urlStat => urlStat.host+"|"+urlStat.remote_addr+"|"+urlStat.total_count).rdd.saveAsTextFile("/wafResult2/20180620")

The saved files contain the same ten lines as the untyped version above.

2.7 Hive Data Modeling

create table accesslog(
host string, 
remote_addr string,
total_count bigint
)row format delimited fields terminated by '|';

Load the data from HDFS into Hive:
load data inpath '/wafResult/20180620' overwrite into table accesslog;
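Once loaded, the table can be queried from Hive or from Spark SQL. A hypothetical sanity check, assuming the SparkSession was built with Hive support (the table name matches the DDL above; the query itself is illustrative):

// Hypothetical check against the accesslog Hive table
val topHosts = spark.sql("SELECT host, sum(total_count) AS hits FROM accesslog GROUP BY host ORDER BY hits DESC")
topHosts.show()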

2.8 Data Analysis -- Traffic Statistics

val bytesStat = spark.sql("SELECT host, remote_addr, request, max(bytes_sent) as max_byte FROM wafLog group by host,remote_addr,request order by max_byte desc limit 10")
bytesStat.show
+--------------------+--------------+--------------------+--------+             
|                host|   remote_addr|             request|max_byte|
+--------------------+--------------+--------------------+--------+
|resource.dataserv...|27.123.214.103|GET  download/bro...|      42|
|  hcdt.dataserver.cn|61.178.233.112|GET  1.1/componen...|      40|
|qdakfhdt.dataserv...| 58.56.156.190|GET  1.1/componen...|      40|
|westdt.dataserver.cn|222.179.116.10|GET  1.1/componen...|      40|
|security.dataserv...| 119.23.123.17|GET  iosDeploy/el...|      28|
|bestlink.dataserv...|180.97.106.135|GET  /uploadfile/APP|      22|
|security.dataserv...| 112.17.244.69|GET  iosDeploy/uw...|      17|
|greatdt.dataserve...| 58.210.39.230|GET  monitor/webs...|      16|
|  rdts.dataserver.cn| 61.130.49.162|GET  rdts?ip=192....|      15|
|security.dataserv...| 119.23.123.25|GET  iosDeploy/ca...|      13|
+--------------------+--------------+--------------------+--------+

2.9 Data Analysis -- Access Counts by Hour

val urlStat = spark.sql("SELECT hour_flag, host, remote_addr, count(*) as total_count FROM wafLog group by hour_flag,host,remote_addr order by total_count desc limit 50")
urlStat.show
+---------+--------------------+--------------+-----------+                     
|hour_flag|                host|   remote_addr|total_count|
+---------+--------------------+--------------+-----------+
|       13|  hcdt.dataserver.cn| 58.60.228.148|       8650|
|       08|  hcdt.dataserver.cn| 58.60.228.148|       8606|
|       21|sdryer2.dataserve...|171.213.124.37|       8324|
|       04|  hcdt.dataserver.cn|192.168.100.61|       7162|
|       05|  hcdt.dataserver.cn|192.168.100.61|       7144|
|       12|  hcdt.dataserver.cn|192.168.100.61|       7131|
|       13|  hcdt.dataserver.cn|192.168.100.61|       7108|
|       20|  hcdt.dataserver.cn|192.168.100.61|       7106|
|       21|  hcdt.dataserver.cn|192.168.100.61|       7083|
|       11|  hcdt.dataserver.cn|192.168.100.61|       6068|
|       03|  hcdt.dataserver.cn|192.168.100.61|       6064|
|       19|  hcdt.dataserver.cn|192.168.100.61|       6029|
|       09|gdlmdt.dataserver.cn| 14.157.120.63|       5557|
|       10|gdlmdt.dataserver.cn| 14.157.120.63|       5297|
|       14|gdlmdt.dataserver.cn| 14.157.120.63|       4148|
|       13|gdlmdt.dataserver.cn| 14.157.120.63|       4140|
|       14|  hcdt.dataserver.cn|192.168.100.61|       3867|
|       12|gdlmdt.dataserver.cn| 14.157.120.63|       3789|
|       11|gdlmdt.dataserver.cn| 14.157.120.63|       3771|
|       15|gdlmdt.dataserver.cn| 14.157.120.63|       3756|
+---------+--------------------+--------------+-----------+

2.10 Data Analysis -- Interface Traffic by Hour

val bytesStat = spark.sql("SELECT hour_flag,host,remote_addr,request,max(bytes_sent) as max_byte FROM wafLog group by hour_flag, host, remote_addr, request order by max_byte desc limit 50")
 
 bytesStat.show
+---------+--------------------+---------------+--------------------+--------+  
|hour_flag|                host|    remote_addr|             request|max_byte|
+---------+--------------------+---------------+--------------------+--------+
|       15|resource.dataserv...| 27.123.214.103|GET  download/bro...|      42|
|       09|westdt.dataserver.cn| 222.179.116.10|GET  1.1/componen...|      40|
|       15|qdakfhdt.dataserv...|  58.56.156.190|GET  1.1/componen...|      40|
|       09|  hcdt.dataserver.cn| 61.178.233.112|GET  1.1/componen...|      40|
|       11|security.dataserv...|  119.23.123.17|GET  iosDeploy/el...|      28|
|       09|security.dataserv...|  119.23.123.17|GET  iosDeploy/el...|      28|
|       11|westdt.dataserver.cn| 222.179.116.10|GET  1.1/componen...|      27|
|       23|bestlink.dataserv...| 180.97.106.135|GET  /uploadfile/APP|      22|
|       11|security.dataserv...|  112.17.244.69|GET  iosDeploy/uw...|      17|
|       07|greatdt.dataserve...|  58.210.39.230|GET  monitor/webs...|      16|
|       16|  rdts.dataserver.cn|  61.130.49.162|GET  rdts?ip=192....|      15|
|       16|security.dataserv...|  119.23.123.25|GET  iosDeploy/ca...|      13|
|       16|  rdts.dataserver.cn|  61.130.49.162|GET  rdts?ip=192....|      11|
|       23|  rdts.dataserver.cn|  183.33.59.157|GET  rdts?ip=192....|      11|
|       14|  rdts.dataserver.cn|  61.130.49.162|GET  rdts?ip=192....|      11|
|       21|bestlink.dataserv...|  123.125.71.74|GET  uploadfile/A...|       9|
|       13|hcuweb.dataserver.cn| 27.123.214.107|GET  uploadfile/A...|       9|
|       18|  hnks.dataserver.cn| 122.192.15.137|GET  uploadfile/s...|       9|
|       16|hcuweb.dataserver.cn|   122.192.13.2|GET  /uploadfile/...|       9|
|       07|bestlink.dataserv...|211.138.116.246|GET  /uploadfile/...|       8|
+---------+--------------------+---------------+--------------------+--------+

2.11 Data Analysis -- Access Counts by Minute

val urlStat = spark.sql("SELECT minute_flag,host,remote_addr,request, count(*) as total_count FROM wafLog group by minute_flag, host, remote_addr, request order by total_count desc limit 50")

urlStat.show
+-----------+--------------------+--------------+--------------------+-----------+
|minute_flag|                host|   remote_addr|             request|total_count|
+-----------+--------------------+--------------+--------------------+-----------+
|      21:33|sdryer2.dataserve...|171.213.124.37|GET  itas-app/web...|        304|
|      21:37|sdryer2.dataserve...|171.213.124.37|GET  itas-app/web...|        302|
|      21:35|sdryer2.dataserve...|171.213.124.37|GET  itas-app/web...|        302|
|      21:34|sdryer2.dataserve...|171.213.124.37|GET  itas-app/web...|        302|
|      22:00|sdryer2.dataserve...|171.213.124.37|GET  itas-app/web...|        299|
|      22:01|sdryer2.dataserve...|171.213.124.37|GET  itas-app/web...|        298|
|      21:36|sdryer2.dataserve...|171.213.124.37|GET  itas-app/web...|        296|
|      22:02|sdryer2.dataserve...|171.213.124.37|GET  itas-app/web...|        293|
|      21:39|sdryer2.dataserve...|171.213.124.37|GET  itas-app/web...|        293|
|      21:40|sdryer2.dataserve...|171.213.124.37|GET  itas-app/web...|        292|
|      21:55|sdryer2.dataserve...|171.213.124.37|GET  itas-app/web...|        292|
|      21:53|sdryer2.dataserve...|171.213.124.37|GET  itas-app/web...|        292|
|      21:52|sdryer2.dataserve...|171.213.124.37|GET  itas-app/web...|        289|
|      21:31|sdryer2.dataserve...|171.213.124.37|GET  itas-app/web...|        288|
|      21:58|sdryer2.dataserve...|171.213.124.37|GET  itas-app/web...|        288|
|      21:38|sdryer2.dataserve...|171.213.124.37|GET  itas-app/web...|        286|
|      21:42|sdryer2.dataserve...|171.213.124.37|GET  itas-app/web...|        286|
|      21:48|sdryer2.dataserve...|171.213.124.37|GET  itas-app/web...|        284|
|      21:59|sdryer2.dataserve...|171.213.124.37|GET  itas-app/web...|        282|
|      21:54|sdryer2.dataserve...|171.213.124.37|GET  itas-app/web...|        280|
+-----------+--------------------+--------------+--------------------+-----------+

2.12 Data Analysis -- Interface Traffic by Minute

val bytesStat = spark.sql("SELECT minute_flag,host,remote_addr,request,max(bytes_sent) as max_byte FROM wafLog group by minute_flag, host, remote_addr, request order by max_byte desc limit 50")

bytesStat.show
+-----------+--------------------+---------------+--------------------+--------+
|minute_flag|                host|    remote_addr|             request|max_byte|
+-----------+--------------------+---------------+--------------------+--------+
|      15:21|resource.dataserv...| 27.123.214.103|GET  download/bro...|      42|
|      09:29|  hcdt.dataserver.cn| 61.178.233.112|GET  1.1/componen...|      40|
|      09:42|westdt.dataserver.cn| 222.179.116.10|GET  1.1/componen...|      40|
|      15:58|qdakfhdt.dataserv...|  58.56.156.190|GET  1.1/componen...|      40|
|      11:49|security.dataserv...|  119.23.123.17|GET  iosDeploy/el...|      28|
|      09:21|security.dataserv...|  119.23.123.17|GET  iosDeploy/el...|      28|
|      11:03|westdt.dataserver.cn| 222.179.116.10|GET  1.1/componen...|      27|
|      23:31|bestlink.dataserv...| 180.97.106.135|GET  /uploadfile/APP|      22|
|      11:06|security.dataserv...|  112.17.244.69|GET  iosDeploy/uw...|      17|
|      07:51|greatdt.dataserve...|  58.210.39.230|GET  monitor/webs...|      16|
|      16:35|  rdts.dataserver.cn|  61.130.49.162|GET  rdts?ip=192....|      15|
|      16:41|security.dataserv...|  119.23.123.25|GET  iosDeploy/ca...|      13|
|      14:01|  rdts.dataserver.cn|  61.130.49.162|GET  rdts?ip=192....|      11|
|      23:00|  rdts.dataserver.cn|  183.33.59.157|GET  rdts?ip=192....|      11|
|      16:35|  rdts.dataserver.cn|  61.130.49.162|GET  rdts?ip=192....|      11|
|      18:37|  hnks.dataserver.cn| 122.192.15.137|GET  uploadfile/s...|       9|
|      21:40|bestlink.dataserv...|  123.125.71.74|GET  uploadfile/A...|       9|
|      16:12|hcuweb.dataserver.cn|   122.192.13.2|GET  /uploadfile/...|       9|
|      13:02|hcuweb.dataserver.cn| 27.123.214.107|GET  uploadfile/A...|       9|
|      07:56|bestlink.dataserv...|211.138.116.246|GET  /uploadfile/...|       8|
+-----------+--------------------+---------------+--------------------+--------+

3 Summary

Qin Kaixin, in Shenzhen
