Reference article: How to submit a Spark job remotely to a YARN cluster from IDEA
There are several modes in which a Spark job can run:
1. Local mode: write the code in IDEA and run it directly.
2. Standalone mode: you need to package the program as a jar, upload it to the cluster, and submit it with spark-submit.
3. YARN mode (client or cluster): same as above, you also need to build a jar and submit it to the cluster.
If you are just testing on your own, all of the methods above are fairly cumbersome: every time you change the code you need to rebuild the jar, upload it to the cluster, and submit it with spark-submit, which wastes a lot of time. Below is how to submit to the YARN cluster remotely from your local IDEA.
Let's look at the demo directly (the code is kept simple).
package spark

import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.streaming.kafka010.{ConsumerStrategies, KafkaUtils, LocationStrategies}
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.SparkConf
import spark.wordcount.kafkaStreams

object RemoteSubmitApp {
  def main(args: Array[String]) {
    // Set the user that submits the job
    System.setProperty("HADOOP_USER_NAME", "root")
    val conf = new SparkConf()
      .setAppName("WordCount")
      // Submit in yarn-client mode
      .setMaster("yarn")
      // Set the ResourceManager hostname
      .set("yarn.resourcemanager.hostname", "master")
      // Set the number of executors
      .set("spark.executor.instances", "2")
      // Set the executor memory
      .set("spark.executor.memory", "1024M")
      // Set the YARN queue to submit to
      .set("spark.yarn.queue", "spark")
      // Set the driver IP address
      .set("spark.driver.host", "192.168.17.1")
      // Set the path of the application jar; other dependency jars can be added here, comma separated
      .setJars(List("D:\\develop_soft\\idea_workspace_2018\\sparkdemo\\target\\sparkdemo-1.0-SNAPSHOT.jar"))
    conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")

    val scc = new StreamingContext(conf, Seconds(1))
    scc.sparkContext.setLogLevel("WARN")
    //scc.checkpoint("/spark/checkpoint")
    val topic = "jason_flink"
    val topicSet = Set(topic)
    val kafkaParams = Map[String, Object](
      "auto.offset.reset" -> "latest",
      "value.deserializer" -> classOf[StringDeserializer],
      "key.deserializer" -> classOf[StringDeserializer],
      "bootstrap.servers" -> "master:9092,storm1:9092,storm2:9092",
      "group.id" -> "jason_",
      "enable.auto.commit" -> (true: java.lang.Boolean)
    )
    kafkaStreams = KafkaUtils.createDirectStream[String, String](
      scc,
      LocationStrategies.PreferConsistent,
      ConsumerStrategies.Subscribe[String, String](topicSet, kafkaParams))
    kafkaStreams.foreachRDD(rdd => {
      if (!rdd.isEmpty()) {
        rdd.foreachPartition(fp => {
          fp.foreach(f => {
            println(f.value().toString)
          })
        })
      }
    })
    scc.start()
    scc.awaitTermination()
  }
}
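Note that the demo imports kafkaStreams from spark.wordcount, which lives elsewhere in my project. If you copy the demo on its own, a minimal holder object along these lines is enough to make it compile (this object is a hypothetical sketch, not part of the original project):

package spark

import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.spark.streaming.dstream.InputDStream

object wordcount {
  // Shared reference to the Kafka stream; RemoteSubmitApp assigns it in main()
  var kafkaStreams: InputDStream[ConsumerRecord[String, String]] = _
}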
Then just right-click and run it, and look at the log that gets printed:
...
19/08/16 23:17:35 INFO Client:
     client token: N/A
     diagnostics: AM container is launched, waiting for AM container to Register with RM
     ApplicationMaster host: N/A
     ApplicationMaster RPC port: -1
     queue: spark
     start time: 1565997454105
     final status: UNDEFINED
     tracking URL: http://master:8088/proxy/application_1565990507758_0020/
     user: root
19/08/16 23:17:36 INFO Client: Application report for application_1565990507758_0020 (state: ACCEPTED)
19/08/16 23:17:37 INFO Client: Application report for application_1565990507758_0020 (state: ACCEPTED)
19/08/16 23:17:38 INFO Client: Application report for application_1565990507758_0020 (state: ACCEPTED)
19/08/16 23:17:39 INFO Client: Application report for application_1565990507758_0020 (state: ACCEPTED)
19/08/16 23:17:40 INFO YarnSchedulerBackend$YarnSchedulerEndpoint: ApplicationMaster registered as NettyRpcEndpointRef(spark-client://YarnAM)
19/08/16 23:17:40 INFO YarnClientSchedulerBackend: Add WebUI Filter. org.apache.hadoop.yarn.server.webproxy.amfilter.AmIpFilter, Map(PROXY_HOSTS -> master, PROXY_URI_BASES -> http://master:8088/proxy/application_1565990507758_0020), /proxy/application_1565990507758_0020
19/08/16 23:17:40 INFO JettyUtils: Adding filter: org.apache.hadoop.yarn.server.webproxy.amfilter.AmIpFilter
19/08/16 23:17:40 INFO Client: Application report for application_1565990507758_0020 (state: ACCEPTED)
19/08/16 23:17:41 INFO Client: Application report for application_1565990507758_0020 (state: RUNNING)
19/08/16 23:17:41 INFO Client:
     client token: N/A
     diagnostics: N/A
     ApplicationMaster host: 192.168.17.145
     ApplicationMaster RPC port: 0
     queue: spark
     start time: 1565997454105
     final status: UNDEFINED
     tracking URL: http://master:8088/proxy/application_1565990507758_0020/
     user: root
...
能够看到提交成功了,而后咱们打开yarn的监控页面看下有没有joboop
能够看到有一个spark程序在运行,而后咱们点进去,看下具体的运行状况测试
能够看到运行的正常,选择一下job,看下executor打印的日志
This is exactly the data we wrote to Kafka, so everything is fine. To stop, just click the stop button in IDEA. Testing becomes much more convenient this way.
Problems you may run into while running:
1. First, you need to put yarn-site.xml, core-site.xml, and hdfs-site.xml under the resources directory, because the program needs these configurations at runtime (a layout sketch follows below).
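For reference, this is roughly what the project looks like with the three files in place (a sketch, assuming a standard Maven/SBT layout; copy the files from the cluster's Hadoop configuration directory):

sparkdemo/
└── src/main/resources/
    ├── core-site.xml
    ├── hdfs-site.xml
    └── yarn-site.xml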
2. Permission problem
Caused by: org.apache.hadoop.ipc.RemoteException(org.apache.hadoop.security.AccessControlException): Permission denied: user=JasonLee, access=WRITE, inode="/user":root:supergroup:drwxr-xr-x
        at org.apache.hadoop.hdfs.server.namenode.FSPermissionChecker.check(FSPermissionChecker.java:342)
        at org.apache.hadoop.hdfs.server.namenode.FSPermissionChecker.checkPermission(FSPermissionChecker.java:251)
        at org.apache.hadoop.hdfs.server.namenode.FSPermissionChecker.checkPermission(FSPermissionChecker.java:189)
        at org.apache.hadoop.hdfs.server.namenode.FSDirectory.checkPermission(FSDirectory.java:1744)
        at org.apache.hadoop.hdfs.server.namenode.FSDirectory.checkPermission(FSDirectory.java:1728)
        at org.apache.hadoop.hdfs.server.namenode.FSDirectory.checkAncestorAccess(FSDirectory.java:1687)
        at org.apache.hadoop.hdfs.server.namenode.FSDirMkdirOp.mkdirs(FSDirMkdirOp.java:60)
        at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.mkdirs(FSNamesystem.java:2980)
        at org.apache.hadoop.hdfs.server.namenode.NameNodeRpcServer.mkdirs(NameNodeRpcServer.java:1096)
        at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolServerSideTranslatorPB.mkdirs(ClientNamenodeProtocolServerSideTranslatorPB.java:652)
        at org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos$ClientNamenodeProtocol$2.callBlockingMethod(ClientNamenodeProtocolProtos.java)
        at org.apache.hadoop.ipc.ProtobufRpcEngine$Server$ProtoBufRpcInvoker.call(ProtobufRpcEngine.java:503)
        at org.apache.hadoop.ipc.RPC$Server.call(RPC.java:989)
        at org.apache.hadoop.ipc.Server$RpcCall.run(Server.java:868)
        at org.apache.hadoop.ipc.Server$RpcCall.run(Server.java:814)
        at java.security.AccessController.doPrivileged(Native Method)
        at javax.security.auth.Subject.doAs(Subject.java:422)
        at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1886)
        at org.apache.hadoop.ipc.Server$Handler.run(Server.java:2603)
This happens because the job is submitted locally as user JasonLee, who has no permission to write to HDFS. The simplest fix is to set the user to root in the code (an alternative follows after the snippet):
System.setProperty("HADOOP_USER_NAME", "root")
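If you would rather not impersonate root, an alternative is to give your own user a writable home directory on HDFS (run on the cluster as an HDFS superuser; the user name here is just the one from the error above):

hdfs dfs -mkdir -p /user/JasonLee
hdfs dfs -chown JasonLee /user/JasonLee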
3. Missing environment variable
Exception in thread "main" java.lang.IllegalStateException: Library directory 'D:\develop_soft\idea_workspace_2018\sparkdemo\assembly\target\scala-2.11\jars' does not exist; make sure Spark is built.
        at org.apache.spark.launcher.CommandBuilderUtils.checkState(CommandBuilderUtils.java:248)
        at org.apache.spark.launcher.CommandBuilderUtils.findJarsDir(CommandBuilderUtils.java:347)
        at org.apache.spark.launcher.YarnCommandBuilderUtils$.findJarsDir(YarnCommandBuilderUtils.scala:38)
        at org.apache.spark.deploy.yarn.Client.prepareLocalResources(Client.scala:526)
        at org.apache.spark.deploy.yarn.Client.createContainerLaunchContext(Client.scala:814)
        at org.apache.spark.deploy.yarn.Client.submitApplication(Client.scala:169)
        at org.apache.spark.scheduler.cluster.YarnClientSchedulerBackend.start(YarnClientSchedulerBackend.scala:56)
        at org.apache.spark.scheduler.TaskSchedulerImpl.start(TaskSchedulerImpl.scala:173)
        at org.apache.spark.SparkContext.<init>(SparkContext.scala:509)
        at org.apache.spark.streaming.StreamingContext$.createNewSparkContext(StreamingContext.scala:839)
        at org.apache.spark.streaming.StreamingContext.<init>(StreamingContext.scala:85)
        at spark.RemoteSubmitApp$.main(RemoteSubmitApp.scala:31)
        at spark.RemoteSubmitApp.main(RemoteSubmitApp.scala)
This error occurs because the SPARK_HOME environment variable is not configured. Just set SPARK_HOME under Environment variables in IDEA's Run/Debug Configurations, as shown in the figure below (an alternative that avoids depending on a local Spark installation is sketched afterwards):
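A commonly used alternative is to upload the jars from $SPARK_HOME/jars to HDFS once and point the job at them, so the launcher no longer looks for a local jars directory. This is only a sketch; the HDFS path and port are assumptions about my cluster layout:

// Upload the jars once, e.g. hdfs dfs -put $SPARK_HOME/jars/* /spark/jars/, then reference them here (globs are allowed)
conf.set("spark.yarn.jars", "hdfs://master:9000/spark/jars/*.jar")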
4. Driver IP not set
19/08/17 07:52:45 ERROR ApplicationMaster: Failed to connect to driver at 169.254.42.204:64010, retrying ...
19/08/17 07:52:48 ERROR ApplicationMaster: Failed to connect to driver at 169.254.42.204:64010, retrying ...
19/08/17 07:52:48 ERROR ApplicationMaster: Uncaught exception:
org.apache.spark.SparkException: Failed to connect to driver!
        at org.apache.spark.deploy.yarn.ApplicationMaster.waitForSparkDriver(ApplicationMaster.scala:577)
        at org.apache.spark.deploy.yarn.ApplicationMaster.runExecutorLauncher(ApplicationMaster.scala:433)
        at org.apache.spark.deploy.yarn.ApplicationMaster.run(ApplicationMaster.scala:256)
        at org.apache.spark.deploy.yarn.ApplicationMaster$$anonfun$main$1.apply$mcV$sp(ApplicationMaster.scala:764)
        at org.apache.spark.deploy.SparkHadoopUtil$$anon$2.run(SparkHadoopUtil.scala:67)
        at org.apache.spark.deploy.SparkHadoopUtil$$anon$2.run(SparkHadoopUtil.scala:66)
        at java.security.AccessController.doPrivileged(Native Method)
        at javax.security.auth.Subject.doAs(Subject.java:422)
        at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1692)
        at org.apache.spark.deploy.SparkHadoopUtil.runAsSparkUser(SparkHadoopUtil.scala:66)
        at org.apache.spark.deploy.yarn.ApplicationMaster$.main(ApplicationMaster.scala:762)
        at org.apache.spark.deploy.yarn.ExecutorLauncher$.main(ApplicationMaster.scala:785)
        at org.apache.spark.deploy.yarn.ExecutorLauncher.main(ApplicationMaster.scala)
This error occurs because the driver host was not set. Since we run in yarn-client mode, the driver is our local machine, so the local IP has to be set, otherwise the ApplicationMaster cannot find the driver (a sketch for looking it up automatically follows the line below).
.set("spark.driver.host","192.168.17.1")
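If you do not want to hard-code the address, a rough sketch is to look it up at runtime. Be aware that on a machine with several network adapters (VMware host-only networks, VPNs, and so on) this may return the wrong interface, in which case hard-coding the IP as above is safer:

import java.net.InetAddress

// May pick the wrong interface on a multi-homed machine; verify the cluster can reach this address
conf.set("spark.driver.host", InetAddress.getLocalHost.getHostAddress)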
5. Finally, make sure your computer and the virtual machines are on the same network segment, and turn off the firewall on your computer, otherwise the connection may fail. A quick connectivity check is sketched below.
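One way to verify connectivity from the development machine is to try opening a socket to the ResourceManager. This is only a sketch, assuming the default client port 8032 (yarn.resourcemanager.address); adjust host and port to your cluster:

import java.net.{InetSocketAddress, Socket}

val socket = new Socket()
try {
  // Fails quickly if the host is unreachable or a firewall blocks the port
  socket.connect(new InetSocketAddress("master", 8032), 3000)
  println("ResourceManager is reachable")
} finally {
  socket.close()
}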
I submitted in yarn-client mode; my YARN cluster has two queues, so the queue name has to be set when submitting. Many other parameters can also be set in code, such as executor memory, number of executors, driver memory, and so on; adjust them to your own situation. Of course, you can also submit to a standalone cluster the same way. The sketch below pulls these knobs together.
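A minimal sketch of the configuration knobs mentioned above (the values are illustrative, not recommendations; the standalone master URL is an assumption about a typical setup):

import org.apache.spark.SparkConf

val conf = new SparkConf()
  .setAppName("WordCount")
  .setMaster("yarn")                      // or e.g. "spark://master:7077" for a standalone cluster
  .set("spark.yarn.queue", "spark")       // the YARN queue to submit to
  .set("spark.executor.instances", "2")   // number of executors
  .set("spark.executor.memory", "1024m")  // memory per executor
  // In yarn-client mode the driver JVM (the IDEA process) is already running,
  // so its heap is better controlled via the run configuration's VM options (-Xmx)
  // than via spark.driver.memory here.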