Flume + Kafka + Spark Streaming integration: cluster fails with org.apache.kafka.clients.consumer.KafkaConsumer.subscribe(Ljava/util/Collection;)V

Introduction
The project runs on CDH: Flume collects data into Kafka, and Spark Streaming consumes it from there (Flume 1.7, Kafka 0.10, Spark 2.1). Local-mode tests passed without issue, but once the job was deployed to the cluster it failed with the following error:

Exception in thread "streaming-start" java.lang.NoSuchMethodError: org.apache.kafka.clients.consumer.KafkaConsumer.subscribe(Ljava/util/Collection;)V
        at org.apache.spark.streaming.kafka010.Subscribe.onStart(ConsumerStrategy.scala:84)
        at org.apache.spark.streaming.kafka010.DirectKafkaInputDStream.consumer(DirectKafkaInputDStream.scala:75)
        at org.apache.spark.streaming.kafka010.DirectKafkaInputDStream.start(DirectKafkaInputDStream.scala:243)
        at org.apache.spark.streaming.DStreamGraph$$anonfun$start$5.apply(DStreamGraph.scala:49)
        at org.apache.spark.streaming.DStreamGraph$$anonfun$start$5.apply(DStreamGraph.scala:49)
        at scala.collection.parallel.mutable.ParArray$ParArrayIterator.foreach_quick(ParArray.scala:143)
        at scala.collection.parallel.mutable.ParArray$ParArrayIterator.foreach(ParArray.scala:136)
        at scala.collection.parallel.ParIterableLike$Foreach.leaf(ParIterableLike.scala:972)
        at scala.collection.parallel.Task$$anonfun$tryLeaf$1.apply$mcV$sp(Tasks.scala:49)
        at scala.collection.parallel.Task$$anonfun$tryLeaf$1.apply(Tasks.scala:48)
        at scala.collection.parallel.Task$$anonfun$tryLeaf$1.apply(Tasks.scala:48)
        at scala.collection.parallel.Task$class.tryLeaf(Tasks.scala:51)
        at scala.collection.parallel.ParIterableLike$Foreach.tryLeaf(ParIterableLike.scala:969)
        at scala.collection.parallel.AdaptiveWorkStealingTasks$WrappedTask$class.compute(Tasks.scala:152)
        at scala.collection.parallel.AdaptiveWorkStealingForkJoinTasks$WrappedTask.compute(Tasks.scala:443)
        at scala.concurrent.forkjoin.RecursiveAction.exec(RecursiveAction.java:160)
        at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
        at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
        at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
        at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
17/08/15 20:09:30 ERROR yarn.ApplicationMaster: RECEIVED SIGNAL TERM
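
The consumer side of this setup is the standard spark-streaming-kafka-0-10 direct stream. For context, here is a minimal sketch of the kind of job that hits this error; the broker list, group id, and topic name are hypothetical placeholders, not values from the actual project:

import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe
import org.apache.spark.streaming.kafka010.KafkaUtils
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent

object FlumeKafkaStreaming {
  def main(args: Array[String]): Unit = {
    val ssc = new StreamingContext(new SparkConf().setAppName("flume-kafka-demo"), Seconds(5))

    // Kafka 0.10 consumer settings; all values here are placeholders.
    val kafkaParams = Map[String, Object](
      "bootstrap.servers" -> "broker1:9092,broker2:9092",
      "key.deserializer" -> classOf[StringDeserializer],
      "value.deserializer" -> classOf[StringDeserializer],
      "group.id" -> "flume-kafka-demo",
      "auto.offset.reset" -> "latest",
      "enable.auto.commit" -> (false: java.lang.Boolean)
    )

    // Subscribe.onStart (ConsumerStrategy.scala:84 in the trace above) is where
    // KafkaConsumer.subscribe(Collection) gets called, i.e. the method that is
    // missing when an older Kafka client ends up on the cluster classpath.
    val stream = KafkaUtils.createDirectStream[String, String](
      ssc, PreferConsistent, Subscribe[String, String](Array("flume-events"), kafkaParams))

    stream.map(_.value).print()

    ssc.start()
    ssc.awaitTermination()
  }
}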


Cause analysis
This is actually covered in the official documentation. In short, the Spark 2 parcel on CDH ships with both the Kafka 0.9 and 0.10 client jars and uses the 0.9 client by default; in Kafka 0.9, KafkaConsumer.subscribe takes a List, while spark-streaming-kafka-0-10 calls the subscribe(Collection) overload introduced in 0.10, hence the NoSuchMethodError on the cluster even though the locally assembled classpath worked fine. Integrating Kafka 0.10 with Spark 2 therefore requires telling CDH which client version to use, and the documentation describes two ways to do it. I went with the second, changing the configuration in CDH; the steps follow after a quick note on the first.
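For reference, the first way (per the Cloudera documentation for Spark 2 on CDH; hedged, since I did not use it here) is per-job rather than cluster-wide: export the environment variable at submit time, e.g. SPARK_KAFKA_VERSION=0.10 spark2-submit <your usual arguments>. That can be handy when different jobs on the same cluster need different Kafka client versions.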

Open the Spark 2 service's configuration page in Cloudera Manager and enter SPARK_KAFKA_VERSION in the search box; a version selector appears. Select the version matching the cluster's Kafka, 0.10 in this case, then save the configuration and restart for it to take effect. After resubmitting, the Spark Streaming job ran without the error, and the problem was solved.
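
One related thing worth checking at the same time: the application itself must be built against the 0-10 integration, and should not bundle a conflicting Kafka client into its assembly jar. A build.sbt sketch under those assumptions (versions chosen to match the Spark 2.1 stack above; marking the Spark artifacts as provided is deliberate, so the cluster's own jars win at runtime):

// build.sbt (sketch): compile against Spark 2.1 and the Kafka 0.10 integration.
// spark-core and spark-streaming are "provided" because the CDH cluster supplies them.
scalaVersion := "2.11.8"

libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-core"                 % "2.1.0" % "provided",
  "org.apache.spark" %% "spark-streaming"            % "2.1.0" % "provided",
  "org.apache.spark" %% "spark-streaming-kafka-0-10" % "2.1.0"
)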