Spark Streaming是一个新的实时计算的利器,并且还在快速的发展。它将输入流切分红一个个的DStream转换为RDD,从而能够使用Spark来处理。它直接支持多种数据源:Kafka, Flume, Twitter, ZeroMQ , TCP sockets等,有一些能够操做的函数:map
, reduce
, join
, window等。
html
本文将Spark Streaming和Flume-NG进行对接,而后以官方内置的JavaFlumeEventCount做参考,稍做修改而后放到集群上去运行。 java
1、下载spark streaming的flume插件包,咱们这里的spark版本是1.0.0(standlone),这个插件包的版本选择spark-streaming-flume_2.10-1.0.1.jar,这个版本修复了一个重要的bug,参考下面参考中的7。git
2、把spark的编译后的jar包以及上面flume的插件,放入工程,编写以下类(参考8中的例子修改而来),代码以下:github
1 package com.spark_streaming; 2 3 import org.apache.spark.SparkConf; 4 import org.apache.spark.api.java.function.Function; 5 import org.apache.spark.streaming.*; 6 import org.apache.spark.streaming.api.java.*; 7 import org.apache.spark.streaming.flume.FlumeUtils; 8 import org.apache.spark.streaming.flume.SparkFlumeEvent; 9 10 public final class JavaFlumeEventCount { 11 private JavaFlumeEventCount() { 12 } 13 14 public static void main(String[] args) { 15 16 String host = args[0]; 17 int port = Integer.parseInt(args[1]); 18 19 Duration batchInterval = new Duration(Integer.parseInt(args[2])); 20 SparkConf sparkConf = new SparkConf().setAppName("JavaFlumeEventCount"); 21 JavaStreamingContext ssc = new JavaStreamingContext(sparkConf, batchInterval); 22 JavaReceiverInputDStream<SparkFlumeEvent> flumeStream = FlumeUtils.createStream(ssc, host, port); 23 24 flumeStream.count(); 25 26 flumeStream.count().map(new Function<Long, String>() { 27 @Override 28 public String call(Long in) { 29 return "Received " + in + " flume events."; 30 } 31 }).print(); 32 33 ssc.start(); 34 ssc.awaitTermination(); 35 } 36 }
这个和官方的区别是删除了参数个数检查和增长了自定义时间间隔(分割流),也就是第三个参数。这个类并无作太多处理,入门为主。apache
3、打包这个类到ifeng_spark.jar,连同spark-streaming-flume_2.10-1.0.1.jar一块儿上传到spark集群中的节点上。api
4、启动flume,这个flume的sink要用avro,指定要发送到的spark集群中的一个节点,咱们这里是10.32.21.165:11000。socket
5、在spark安装根目录下执行以下命令:maven
./bin/spark-submit --master spark://10.32.21.165:8070 --driver-memory 4G --executor-memory 4G --jars /usr/lib/spark-1.0.0-cdh4/lib/spark-streaming-flume_2.10-1.0.1.jar,/usr/lib/flume-ng-1.4-cdh4.6.0/lib/flume-ng-sdk-1.4.0-cdh6.0.jar /usr/lib/spark-1.0.0-cdh4/ifeng_spark.jar --class com.spark_streaming.JavaFlumeEventCount 10.32.21.165 11000 2000ide
这个命令中的参数解释请参考下面参考3中的解释,也能够本身增长一些参数,须要注意的是配置内存,本身根据须要自行增长内存(driver、executor)防止OOM。另外jars能够同时加载多个jar包,逗号分隔。记得指定类后须要指定3个参数。函数
若是没有指定Flume的sdk包,会爆以下错误:
java.lang.NoClassDefFoundError: Lorg/apache/flume/source/avro/AvroFlumeEvent;没有找到类。这个类在flume的sdk包内,在jars参数中指定jar包位置就能够。
还有就是要将本身定义的业务类的jar单独列出,不要放在jars参数指定,不然也会有错误抛出。
运行后能够看到大量的输出信息,而后能够看到有数据的RDD会统计出这个RDD有多少行,截图以下,最后的部分就是这2秒(上面命令最后的参数设定的)统计结果:
至此,flume-ng与spark的对接成功,这只是一个入门实验。可根据须要灵活编写相关的业务类来实现实时处理Flume传输的数据。
spark streaming和一些数据传输工具对接能够达到实时处理的目的。
参考:
一、https://spark.apache.org/docs/0.9.0/streaming-programming-guide.html
二、http://www.cnblogs.com/cenyuhai/p/3577204.html
三、http://blog.csdn.net/book_mmicky/article/details/25714545 , 重要的参数解释
四、http://blog.csdn.net/lskyne/article/details/37561235 , 这是一个例子
五、http://search.maven.org/#search%7Cga%7C1%7Cg%3A%22org.apache.spark%22%20 , spark-flume插件下载
六、http://outofmemory.cn/spark/configuration , spark一些可配置参数说明
七、https://issues.apache.org/jira/browse/SPARK-1916 ,这是1.0.1以前版本中spark streaming与flume对接的一个bug信息
八、https://github.com/apache/spark/tree/master/examples/src/main/java/org/apache/spark/examples/streaming , 这是java版本的spark streaming的一些例子,里面有flume的一个