笔记摘抄自 [美] Holden Karau 等著的《Spark快速大数据分析》python
弹性分布式数据集(Resilient Distributed Dataset,简称 RDD)shell
在 Spark 中,对数据的全部操做不外乎 建立 RDD、转化已有 RDD 以及 调用 RDD 操做 进行求值。而在这一切背后,Spark 会自动将 RDD 中的数据分发到集群上,并将操做并行化执行。apache
建立 RDD 的两种方式:api
- 读取一个外部数据集
- 驱动器程序里分发驱动器程序中的对象集合(好比 list 和 set)
这里经过读取文本文件做为一个字符串 RDD:分布式
>>> lines = sc.textFile("README.md")
RDD 的两种操做:oop
- 转化操做(transformation):由一个RDD 生成一个新的RDD,例如筛选数据
- 行动操做(action):对RDD 计算出一个结果,并把结果返回到驱动器程序中,或把结果存储到外部存储系统(如HDFS)中
调用转化操做 filter() :学习
>>> pythonLines = lines.filter(lambda line: "Python" in line)
调用 first() 行动操做 :大数据
>>> pythonLines.first() u'high-level APIs in Scala, Java, Python, and R, and an optimized engine that'
@Noticespa
把RDD 持久化3到内存中
>>> pythonLines.persist <bound method PipelinedRDD.persist of PythonRDD[3] at RDD at PythonRDD.scala:53> >>> pythonLines.count() 3 >>> pythonLines.first() u'high-level APIs in Scala, Java, Python, and R, and an optimized engine that'
把程序中一个已有的集合传给 SparkContext 的 parallelize() 方法,这种方式须要把整个数据集先放到一台机器的内存中,故不经常使用
JavaRDD<String> lines = sc.parallelize(Arrays.asList("pandas", "i like pandas"));
JavaRDD<String> lines = sc.textFile("/path/to/README.md");
RDD 的转化操做是返回一个新的RDD 的操做,好比 map() 和 filter()
展现日志文件中全部错误记录
import org.apache.spark.SparkConf; import org.apache.spark.api.java.JavaRDD; import org.apache.spark.api.java.JavaSparkContext; import org.apache.spark.api.java.function.Function; import java.util.List; public class CountError { public static void main(String[] args) { SparkConf sparkConf = new SparkConf().setAppName("CountError"); JavaSparkContext javaSparkContext = new JavaSparkContext(sparkConf); JavaRDD<String> log = javaSparkContext.textFile(args[0]); JavaRDD<String> errorsRDD = log.filter( new Function<String, Boolean>() { public Boolean call(String x) { return x.contains("ERROR"); } }); List<String> errors = errorsRDD.collect(); for (String output : errors) { System.out.println(output); } javaSparkContext.stop(); } }
日志文件内容
INFO:everything gonna be ok... ERROR:something is wrong! INFO:everything gonna be ok... ERROR:something is wrong! INFO:everything gonna be ok... INFO:everything gonna be ok... INFO:everything gonna be ok... INFO:everything gonna be ok... ERROR:something is wrong! INFO:everything gonna be ok... INFO:everything gonna be ok... INFO:everything gonna be ok... ERROR:something is wrong! INFO:everything gonna be ok... INFO:everything gonna be ok...
运行效果
[root@server1 spark-2.4.4-bin-hadoop2.7]# bin/spark-submit --class CountError ~/SparkTest2.jar ~/SparkTest2.log ... 19/09/10 16:33:10 INFO DAGScheduler: Job 0 finished: collect at CountError.java:20, took 0.423698 s ERROR:something is wrong! ERROR:something is wrong! ERROR:something is wrong! ERROR:something is wrong! ...
>>> lines = sc.textFile("/root/SparkTest2.log") >>> errorsRDD = lines.filter(lambda lines: "ERROR" in lines) >>> infoRDD = lines.filter(lambda lines: "INFO" in lines) >>> totalRDD = errorsRDD.union(infoRDD) >>> lines.count() 21 >>> errorsRDD.count() 4 >>> infoRDD.count() 17 >>> totalRDD.count() 21
@Notice
转化操做能够操做任意数量的输入 RDD
Spark 会使用谱系图(lineage graph)来记录这些不一样 RDD 之间的依赖关系,以此按需计算每一个 RDD
@P.s.
也能够依靠谱系图在持久化的RDD 丢失部分数据时恢复所丢失的数据
把最终求得的结果返回到驱动器程序,或者写入外部存储系统中的 RDD 操做
上文例程中的 count() 即是一个行动操做,另外还有 take() 、collect() 等操做
下面以 take() 为例,获取 union 后的 totalRDD 的前 10 条
>>> for line in totalRDD.take(10):print line ... ERROR:something is wrong! ERROR:something is wrong! ERROR:something is wrong! ERROR:something is wrong! INFO:everything gonna be ok... INFO:everything gonna be ok... INFO:everything gonna be ok... INFO:everything gonna be ok... INFO:everything gonna be ok... INFO:everything gonna be ok... >>>
@P.s.
程序把RDD 筛选到一个很小的规模单台机器内存足以放下时才可使用 collect()
RDD 的转化操做都是惰性求值的,在被调用行动操做以前 Spark 不会开始计算