Spark Core提供了三种建立RDD的方式,包括:使用程序中的集合建立RDD;使用本地文件建立RDD;使用HDFS文件建立RDD。
一、并行化集合
若是要经过并行化集合来建立RDD,须要针对程序中的集合,调用SparkContext的parallelize()方法。Spark会将集合中的数据拷贝到集群上去,造成一个分布式的数据集合,也就是一个RDD。至关因而,集合中的部分数据会到一个节点上,而另外一部分数据会到其余节点上。而后就能够用并行的方式来操做这个分布式数据集合,即RDD。
// 案例:1到10累加求和
val arr = Array(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
val rdd = sc.parallelize(arr)
val sum = rdd.reduce(_ + _)
调用parallelize()时,有一个重要的参数能够指定,就是要将集合切分红多少个partition。Spark会为每个partition运行一个task来进行处理。Spark官方的建议是,为集群中的每一个CPU建立2~4个partition。Spark默认会根据集群的状况来设置partition的数量。可是也能够在调用parallelize()方法时,传入第二个参数,来设置RDD的partition数量。好比parallelize(arr, 10)
1.一、Java
package sparkcore;
import java.util.Arrays;
import java.util.List;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function2;
/**
* 并行化集合建立RDD 案例:累加1到10
*/
public class ParallelizeCollection {
public static void main(String[] args) {
// 建立SparkConf
SparkConf conf = new SparkConf().setAppName("ParallelizeCollection").setMaster("local");
// 建立JavaSparkContext
JavaSparkContext sc = new JavaSparkContext(conf);
// 要经过并行化集合的方式建立RDD,那么就调用SparkContext以及其子类,的parallelize()方法
List<Integer> numbers = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);
JavaRDD<Integer> numberRDD = sc.parallelize(numbers);
// 执行reduce算子操做
// 至关于,先进行1 + 2 = 3;而后再用3 + 3 = 6;而后再用6 + 4 = 10。。。以此类推
int sum = numberRDD.reduce(new Function2<Integer, Integer, Integer>() {
private static final long serialVersionUID = 1L;
public Integer call(Integer num1, Integer num2) throws Exception {
return num1 + num2;
}
});
// 输出累加的和
System.out.println("1到10的累加和:" + sum);
// 关闭JavaSparkContext
sc.close();
}
}
1.二、Scala
package sparkcore.scala
import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
object ParallelizeCollection {
def main(args: Array[String]) {
val conf = new SparkConf()
.setAppName("ParallelizeCollection")
.setMaster("local")
val sc = new SparkContext(conf)
val numbers = Array(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
val numberRDD = sc.parallelize(numbers, 5)
val sum = numberRDD.reduce(_ + _)
println("1到10的累加和:" + sum)
}
}
二、使用本地文件和HDFS建立RDD
Spark是支持使用任何Hadoop支持的存储系统上的文件建立RDD的,好比说HDFS、Cassandra、HBase以及本地文件。经过调用SparkContext的textFile()方法,能够针对本地文件或HDFS文件建立RDD。
有几个事项是须要注意的:
一、若是是针对本地文件的话,若是是在windows上本地测试,windows上有一份文件便可;若是是在spark集群上针对linux本地文件,那么须要将文件拷贝到全部worker节点上。
二、Spark的textFile()方法支持针对目录、压缩文件以及通配符进行RDD建立。
三、Spark默认会为hdfs文件的每个block建立一个partition,可是也能够经过textFile()的第二个参数手动设置分区数量,只能比block数量多,不能比block数量少。
// 案例:文件字数统计
val rdd = sc.textFile("data.txt")
val wordCount = rdd.map(line => line.length).reduce(_ + _)
Spark的textFile()除了能够针对上述几种普通的文件建立RDD以外,还有一些特列的方法来建立RDD:
一、SparkContext.
wholeTextFiles()方法,能够针对一个目录中的大量小文件,返回<filename, fileContent>组成的
pair,做为一个PairRDD,而不是普通的RDD。普通的textFile()返回的RDD中,每一个元素就是文件中的一行文本。
二、SparkContext.
sequenceFile[K, V]()方法,能够针对SequenceFile建立RDD,K和V泛型类型就是SequenceFile的key和value的类型。K和V要求必须是Hadoop的序列化类型,好比IntWritable、Text等。
三、SparkContext.
hadoopRDD()方法,对于Hadoop的自定义输入类型,能够建立RDD。该方法接收JobConf、InputFormatClass、Key和Value的Class。
四、SparkContext.objectFile()方法,能够针对以前调用RDD.saveAsObjectFile()建立的对象序列化的文件,反序列化文件中的数据,并建立一个RDD。
2.一、Java
package sparkcore.java;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.Function2;
/**
* 使用本地文件建立RDD 案例:统计文本文件字数
*/
public class LocalFile {
public static void main(String[] args) {
// 建立SparkConf
SparkConf conf = new SparkConf().setAppName("LocalFile").setMaster("local");
// 建立JavaSparkContext
JavaSparkContext sc = new JavaSparkContext(conf);
// 使用SparkContext以及其子类的textFile()方法,针对本地文件建立RDD
JavaRDD<String> lines = sc.textFile("D:/eclipse-jee-neon-3/workspace/sparkcore-java/test.txt");
// 统计文本文件内的字数
JavaRDD<Integer> lineLength = lines.map(new Function<String, Integer>() {
private static final long serialVersionUID = 1L;
public Integer call(String v1) throws Exception {
return v1.length();
}
});
int count = lineLength.reduce(new Function2<Integer, Integer, Integer>() {
private static final long serialVersionUID = 1L;
public Integer call(Integer v1, Integer v2) throws Exception {
return v1 + v2;
}
});
System.out.println("文件总字数是:" + count);
// 关闭JavaSparkContext
sc.close();
}
}
package sparkcore.java;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.Function2;
/**
* 使用HDFS文件建立RDD
* 案例:统计文本文件字数
*/
public class HDFSFile {
public static void main(String[] args) {
// 建立SparkConf
// 修改:去除setMaster()设置,修改setAppName()
SparkConf conf = new SparkConf()
.setAppName("HDFSFile");
// 建立JavaSparkContext
JavaSparkContext sc = new JavaSparkContext(conf);
// 使用SparkContext以及其子类的textFile()方法,针对HDFS文件建立RDD
// 只要把textFile()内的路径修改成hdfs文件路径便可
JavaRDD<String> lines = sc.textFile("hdfs://node1:8020/test.txt");
// 统计文本文件内的字数
JavaRDD<Integer> lineLength = lines.map(new Function<String, Integer>() {
private static final long serialVersionUID = 1L;
@Override
public Integer call(String v1) throws Exception {
return v1.length();
}
});
int count = lineLength.reduce(new Function2<Integer, Integer, Integer>() {
private static final long serialVersionUID = 1L;
@Override
public Integer call(Integer v1, Integer v2) throws Exception {
return v1 + v2;
}
});
System.out.println("文件总字数是:" + count);
// 关闭JavaSparkContext
sc.close();
}
}
2.二、Scala
package sparkcore.scala
import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
object LocalFile {
def main(args: Array[String]) {
val conf = new SparkConf()
.setAppName("LocalFile")
.setMaster(
"local"
);
val sc = new SparkContext(conf)
val lines = sc.textFile("D:/eclipse-jee-neon-3/workspace/sparkcore-scala/test.txt", 1);
val count = lines.map { line => line.length() }.reduce(_ + _)
println("file's count is " + count)
}
}
package sparkcore.scala
import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
object HDFSFile {
def main(args: Array[String]) {
val conf = new SparkConf()
.setAppName("HDFSFile").setMaster("local")
val sc = new SparkContext(conf)
val lines = sc.textFile("hdfs://node1:8020/test.txt", 1);
val count = lines.map { _.length() }.reduce(_ + _)
println("file's count is " + count)
}
}