关于Java:选用1.7或者1.8.为了通用性,本章内容使用1.7进行编写。
html
关于Scala:工程不须要增长scala nature,即不需Add Scala Nature。若增长在java代码中调用scala library会有异常。java
关于Spark版本:使用1.6.3进行编写。python
maven 依赖shell
<dependencies> <dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-core_2.10</artifactId> <version>1.6.3</version> </dependency> </dependencies> |
Spark 编程的第一步是须要建立一个JavaSparkContext对象,用来告诉 Spark 如何访问集群。在建立 JavaSparkContext以前,你须要构建一个 SparkConf对象, SparkConf 对象包含了一些你应用程序的信息。apache
SparkConf conf = new SparkConf().setAppName("JavaApiLearn").setMaster("local"); @SuppressWarnings("resource") JavaSparkContext jsc = new JavaSparkContext(conf) |
其中编程
setAppName方法指定了spark程序的名称。api
setMaster("local")指定了spark程序运行的方式,分为以下几种。
local |
Run Spark locally with one worker thread (i.e. no parallelism at all). |
local[K] |
Run Spark locally with K worker threads (ideally, set this to the number of cores on your machine). |
local[*] |
Run Spark locally with as many worker threads as logical cores on your machine. |
spark://HOST:PORT |
Connect to the given Spark standalone cluster master. The port must be whichever one your master is configured to use, which is 7077 by default. |
mesos://HOST:PORT |
Connect to the given Mesos cluster. The port must be whichever one your is configured to use, which is 5050 by default. Or, for a Mesos cluster using ZooKeeper, use mesos://zk://... . To submit with --deploy-mode cluster , the HOST:PORT should be configured to connect to the MesosClusterDispatcher. |
yarn |
Connect to a YARN cluster in client or cluster mode depending on the value of --deploy-mode . The cluster location will be found based on the HADOOP_CONF_DIR or YARN_CONF_DIR variable. |
yarn-client |
Equivalent to yarn with --deploy-mode client , which is preferred to `yarn-client` |
yarn-cluster |
Equivalent to yarn with --deploy-mode cluster , which is preferred to `yarn-cluster` |
除了在eclipse、Intellij中运行local模式的任务,也能够打成jar包,使用spark-submit来进行任务提交。数组
经过List来进行转化RDDbash
// List to RDD List<String> list = new ArrayList<String>(); list.add("11,22,33,44,55"); list.add("aa,bb,cc,dd,ee"); JavaRDD<String> jrl = jsc.parallelize(list); |
JavaRDD<String> jrf = jsc.textFile("data/README.md"); //JavaRDD<String> jrfFromHDFS = jsc.textFile("hdfs:///data/README.md"); //from hdfs to rdd //JavaRDD<String> jrfFromLocal = jsc.textFile("file:///data/README.md"); //from localfile to rdd |
测试代码:网络
List<String> list = new ArrayList<String>(); list.add("11 22,33,44,55"); list.add("aa bb,cc,dd,ee"); list.add("aa bb,cc,dd,ee"); JavaRDD<String> jRDD = jsc.parallelize(list,1); JavaPairRDD<String, String> jPRDD = jRDD.mapToPair(new PairFunction<String, String, String>() { public Tuple2<String, String> call(String s) throws Exception { return new Tuple2<String, String>(s.split("\\s+")[0], s.substring(s.indexOf(" ")+1)); } }); PrintUtilPro.printList(jPRDD.collect()); |
输出:
11,22,33,44,55 aa,bb,cc,dd,ee aa,bb,cc,dd,ee |
备注:输出中第一个逗号是tuple中key和value分隔符。
关于Transformation和Actions的操做解释能够参照培训手册。
The following table lists some of the common transformations supported by Spark. Refer to the RDD API doc (Scala, Java, Python, R) and pair RDD functions doc (Scala, Java) for details.
Transformation | Meaning |
---|---|
map(func) | Return a new distributed dataset formed by passing each element of the source through a function func. 将原来RDD的每一个数据项,使用map中用户自定义的函数func进行映射,转变为一个新的元素,并返回一个新的RDD。 |
filter(func) | Return a new dataset formed by selecting those elements of the source on which funcreturns true. 使用函数func对原RDD中数据项进行过滤,将符合func中条件的数据项组成新的RDD返回。 |
flatMap(func) | Similar to map, but each input item can be mapped to 0 or more output items (so funcshould return a Seq rather than a single item). 相似于map,可是输入数据项能够被映射到0个或多个输出数据集合中,因此函数func的返回值是一个数据项集合而不是一个单一的数据项。 |
mapPartitions(func) | Similar to map, but runs separately on each partition (block) of the RDD, so func must be of type Iterator<T> => Iterator<U> when running on an RDD of type T. 相似于map,可是该操做是在每一个分区上分别执行,因此当操做一个类型为T的RDD时func的格式必须是 |
mapPartitionsWithIndex(func) | Similar to mapPartitions, but also provides func with an integer value representing the index of the partition, so func must be of type (Int, Iterator<T>) => Iterator<U> when running on an RDD of type T. 相似于mapPartitions,可是须要提供给func一个整型值,这个整型值是分区的索引,因此当处理T类型的RDD时,func的格式必须为 |
sample(withReplacement, fraction, seed) | Sample a fraction fraction of the data, with or without replacement, using a given random number generator seed. 对数据采样。用户能够设定是否有放回(withReplacement)、采样的百分比(fraction)、随机种子(seed)。 |
union(otherDataset) | Return a new dataset that contains the union of the elements in the source dataset and the argument. 返回原数据集和参数指定的数据集合并后的数据集。使用union函数时须要保证两个RDD元素的数据类型相同,返回的RDD数据类型和被合并的RDD元素数据类型相同。该操做不进行去重操做,返回的结果会保存全部元素。若是想去重,可使用distinct()。 |
intersection(otherDataset) | Return a new RDD that contains the intersection of elements in the source dataset and the argument. 返回两个数据集的交集。 |
subtract(otherDataset) | Return an RDD with the elements from `this` that are not in `other`. 返回this RDD中但不在other RDD中的元素 |
distinct([numTasks])) | Return a new dataset that contains the distinct elements of the source dataset. 将RDD中的元素进行去重操做。 |
groupByKey([numTasks]) | When called on a dataset of (K, V) pairs, returns a dataset of (K, Iterable<V>) pairs. 操做(K,V)格式的数据集,返回 (K, Iterable)格式的数据集。 |
reduceByKey(func, [numTasks]) | When called on a dataset of (K, V) pairs, returns a dataset of (K, V) pairs where the values for each key are aggregated using the given reduce function func, which must be of type (V,V) => V. Like in 使用给定的func,将(K,V)对格式的数据集中key相同的值进行汇集,其中func的格式必须为(V,V) => V。可选参数numTasks能够指定reduce任务的数目。 |
aggregateByKey(zeroValue)(seqOp, combOp, [numTasks]) | When called on a dataset of (K, V) pairs, returns a dataset of (K, U) pairs where the values for each key are aggregated using the given combine functions and a neutral "zero" value. Allows an aggregated value type that is different than the input value type, while avoiding unnecessary allocations. Like in 对(K,V)格式的数据按key进行聚合操做,聚合时使用给定的合并函数和一个初试值,返回一个(K,U)对格式数据。须要指定的三个参数:zeroValue为在每一个分区中,对key值第一次读取V类型的值时,使用的U类型的初始变量;seqOp用于在每一个分区中,相同的key中V类型的值合并到zeroValue建立的U类型的变量中。combOp是对从新分区后两个分区中传入的U类型数据的合并函数。 |
sortByKey([ascending], [numTasks]) | When called on a dataset of (K, V) pairs where K implements Ordered, returns a dataset of (K, V) pairs sorted by keys in ascending or descending order, as specified in the boolean (K,V)格式的数据集,其中K已实现了Ordered,通过sortByKey操做返回排序后的数据集。指定布尔值参数ascending来指定升序或降序排列。 |
join(otherDataset, [numTasks]) | When called on datasets of type (K, V) and (K, W), returns a dataset of (K, (V, W)) pairs with all pairs of elements for each key. Outer joins are supported through 用于操做两个键值对格式的数据集,操做两个数据集(K,V)和(K,W)返回(K, (V, W))格式的数据集。经过 |
cogroup(otherDataset, [numTasks]) | When called on datasets of type (K, V) and (K, W), returns a dataset of (K, (Iterable<V>, Iterable<W>)) tuples. This operation is also called 用于操做两个键值对格式数据集(K,V)和(K,W),返回数据集格式为 (K,(Iterable, Iterable)) 。这个操做也称为 |
cartesian(otherDataset) | When called on datasets of types T and U, returns a dataset of (T, U) pairs (all pairs of elements). 对类型为T和U的两个数据集进行操做,返回包含两个数据集全部元素对的(T,U)格式的数据集。即对两个RDD内的全部元素进行笛卡尔积操做。 |
pipe(command, [envVars]) | Pipe each partition of the RDD through a shell command, e.g. a Perl or bash script. RDD elements are written to the process's stdin and lines output to its stdout are returned as an RDD of strings. 以管道(pipe)方式将 RDD的各个分区(partition)使用 shell命令处理(好比一个 Perl或 bash脚本)。 RDD的元素会被写入进程的标准输入(stdin),将进程返回的一个字符串型 RDD(RDD of strings),以一行文本的形式写入进程的标准输出(stdout)中。 |
coalesce(numPartitions) | Decrease the number of partitions in the RDD to numPartitions. Useful for running operations more efficiently after filtering down a large dataset. 把RDD的分区数下降到经过参数numPartitions指定的值。在获得的更大一些数据集上执行操做,会更加高效。 |
repartition(numPartitions) | Reshuffle the data in the RDD randomly to create either more or fewer partitions and balance it across them. This always shuffles all data over the network. 随机地对RDD的数据从新洗牌(Reshuffle),从而建立更多或更少的分区,以平衡数据。老是对网络上的全部数据进行洗牌(shuffles)。
|
repartitionAndSortWithinPartitions(partitioner) | Repartition the RDD according to the given partitioner and, within each resulting partition, sort records by their keys. This is more efficient than calling 根据给定的分区器对RDD进行从新分区,在每一个结果分区中,按照key值对记录排序。这在每一个分区中比先调用repartition再排序效率更高,由于它能够将排序过程在shuffle操做的机器上进行。 |
The following table lists some of the common actions supported by Spark. Refer to the RDD API doc (Scala, Java, Python, R)
and pair RDD functions doc (Scala, Java) for details.
Action | Meaning |
---|---|
reduce(func) | Aggregate the elements of the dataset using a function func (which takes two arguments and returns one). The function should be commutative and associative so that it can be computed correctly in parallel. 使用函数func汇集数据集中的元素,这个函数func输入为两个元素,返回为一个元素。这个函数应该符合结合律和交换了,这样才能保证数据集中各个元素计算的正确性。 |
collect() | Return all the elements of the dataset as an array at the driver program. This is usually useful after a filter or other operation that returns a sufficiently small subset of the data. 在驱动程序中,以数组的形式返回数据集的全部元素。一般用于filter或其它产生了大量小数据集的状况。 |
count() | Return the number of elements in the dataset. 返回数据集中元素的个数。 |
first() | Return the first element of the dataset (similar to take(1)). 返回数据集中的第一个元素(相似于 |
take(n) | Return an array with the first n elements of the dataset. 返回数据集中的前n个元素。 |
takeSample(withReplacement,num, [seed]) | Return an array with a random sample of num elements of the dataset, with or without replacement, optionally pre-specifying a random number generator seed. 对一个数据集随机抽样,返回一个包含num个随机抽样元素的数组,参数 |
takeOrdered(n, [ordering]) | Return the first n elements of the RDD using either their natural order or a custom comparator. 返回RDD按天然顺序或自定义顺序排序后的前n个元素。 |
saveAsTextFile(path) | Write the elements of the dataset as a text file (or set of text files) in a given directory in the local filesystem, HDFS or any other Hadoop-supported file system. Spark will call toString on each element to convert it to a line of text in the file. 将数据集中的元素以文本文件(或文本文件集合)的形式保存到指定的本地文件系统、HDFS或其它Hadoop支持的文件系统中。Spark将在每一个元素上调用 |
saveAsSequenceFile(path) (Java and Scala) |
Write the elements of the dataset as a Hadoop SequenceFile in a given path in the local filesystem, HDFS or any other Hadoop-supported file system. This is available on RDDs of key-value pairs that implement Hadoop's Writable interface. In Scala, it is also available on types that are implicitly convertible to Writable (Spark includes conversions for basic types like Int, Double, String, etc). 将数据集中的元素以Hadoop Sequence文件的形式保存到指定的本地文件系统、HDFS或其它Hadoop支持的文件系统中。该操做只支持对实现了Hadoop的 |
saveAsObjectFile(path) (Java and Scala) |
Write the elements of the dataset in a simple format using Java serialization, which can then be loaded using 将数据集中的元素以简单的Java序列化的格式写入指定的路径。这些保存该数据的文件,可使用SparkContext.objectFile()进行加载。 |
countByKey() | Only available on RDDs of type (K, V). Returns a hashmap of (K, Int) pairs with the count of each key. 仅支持对(K,V)格式的键值对类型的RDD进行操做。返回(K,Int)格式的Hashmap,(K,Int)为每一个key值对应的记录数目。 |
foreach(func) | Run a function func on each element of the dataset. This is usually done for side effects such as updating anAccumulator or interacting with external storage systems. 对数据集中每一个元素使用函数func进行处理。该操做一般用于更新一个累加器(Accumulator)或与外部数据源进行交互。注意:在foreach()以外修改累加器变量可能引发不肯定的后果。详细介绍请阅读Understanding closures部分。 |
类型 | 算子 |
---|---|
输入分区与输出分区一对一型 | map、flatMap、mapPartitions |
输入分区与输出分区多对一型 | union、cartesian、intersection |
输入分区与输出分区多对多型 | groupBy、groupByKey |
输出分区为输入分区子集型 | filter、distinct、subtract、sample、takeSample |
Cache型 | cache、persist |
类型 | 算子 |
---|---|
输入分区与输出分区一对一 | mapValues |
对单个RDD | combineByKey、reduceByKey、partitionBy、aggregateByKey、SortByKey |
两个RDD汇集 | Cogroup |
链接 | join、leftOutJoin、rightOutJoin |
类型 | 算子 |
---|---|
无输出 | foreach |
HDFS | saveAsTextFile、saveAsObjectFile |
Scala集合和数据类型 | collect、collectAsMap、reduceByKeyLocally、lookup、count、top、reduce、fold、aggregate |
/* * Function<T,R> * 接收一个输入值并返回一个输出值,用于相似map()和filter()的操做中 * R call(T) */ //过滤RDD数据集中包含result的表项,新建RDD数据集resultLines JavaRDD<String> resultLines=lines.filter( new Function<String, Boolean>() { public Boolean call(String v1)throws Exception { return v1.contains("result"); } } ); |
/* * Function<T1,T2,R> * 接收两个输入值并返回一个输出值,用于相似aggregate()和fold()等操做中 * R call(T1,T2) */ List<String> strLine=new ArrayList<String>(); strLine.add("hello world"); strLine.add("This is Spark"); strLine.add("This is JavaRDD") JavaRDD<String> input=sc.parallelize(strLine); //如下代码的功能是wordcount,其中的reduceByKey操做的Function2函数定义了遇到相同的key时,value是如何reduce的————直接将二者的value相加。 //将文本行的单词过滤出来,并将全部的单词保存在RDD数据集words中。切分为单词,扁平化处理。见FlatMapFunction< T,R> JavaRDD<String> words=input.flatMap( new FlatMapFunction<String, String>() { public Iterable<String> call(String s) throws Exception { return Arrays.asList(s.split(" ")); } } ); //转化为键值对 JavaPairRDD<String,Integer> counts=words.mapToPair( new PairFunction<String, String, Integer>() { public Tuple2<String, Integer> call(String s) throws Exception { return new Tuple2(s, 1); } } ); //对每一个词语进行计数 JavaPairRDD <String,Integer> results=counts.reduceByKey( new Function2<Integer, Integer, Integer>() { public Integer call(Integer v1, Integer v2) throws Exception { return v1 + v2; } } ) ; |
/* * FlatMapFunction<T,R> * 接收一个输入值并返回任意个输出,用于相似flatMap()这样的操做中 * Iterable <R> call(T) */ List<String> strLine=new ArrayList<String>(); strLine.add("hello world"); strLine.add("This is Spark"); strLine.add("This is JavaRDD") JavaRDD<String> input=sc.parallelize(strLine); //将文本行的单词过滤出来,并将全部的单词保存在RDD数据集words中。 JavaRDD<String> words=input.flatMap( new FlatMapFunction<String, String>() { public Iterable<String> call(String s) throws Exception { return Arrays.asList(s.split(" ")); } } ); |
/* * PairFunction<T,K,R> * 接收一个输入值并返回一个Tuple,用于相似mapToPair()这样的操做中,将一个元素变为一个键值对(PairRDD) * Tuple2<K, V> call(T) */ //转化为键值对 JavaPairRDD<String,Integer> counts=words.mapToPair( new PairFunction<String, String, Integer>() { public Tuple2<String, Integer> call(String s) throws Exception { return new Tuple2(s, 1); } } ); |