Spark Notes: Basic RDD Operations (Part 1)

  This article covers the basic operations on RDDs in Spark. The RDD is Spark's own data model; as soon as RDDs come up, so do terms like "resilient distributed dataset" and "directed acyclic graph". This article will not dig into those deeper concepts for now: while reading, you can simply treat an RDD as an array, and that mental model is very helpful for learning the RDD API. All example code in this article is written in Scala.

  All computation in Spark operates on RDDs, so the first question when learning RDDs is how to build one. By data source, RDDs are constructed in two ways: the first reads data directly from memory, the second reads from a file system. There are of course many kinds of file systems; the most common are HDFS and the local file system.

  The first way builds an RDD from data in memory, using the makeRDD and parallelize methods, as in the code below:

 

    /* Create an RDD with makeRDD */
    /* List */
    val rdd01 = sc.makeRDD(List(1,2,3,4,5,6))
    val r01 = rdd01.map { x => x * x }
    println(r01.collect().mkString(","))
    /* Array */
    val rdd02 = sc.makeRDD(Array(1,2,3,4,5,6))
    val r02 = rdd02.filter { x => x < 5}
    println(r02.collect().mkString(","))

    /* Create an RDD with parallelize */
    /* List */
    val rdd03 = sc.parallelize(List(1,2,3,4,5,6), 1)
    val r03 = rdd03.map { x => x + 1 }
    println(r03.collect().mkString(","))
    /* Array */
    val rdd04 = sc.parallelize(Array(1,2,3,4,5,6), 1)
    val r04 = rdd04.filter { x => x > 3 }
    println(r04.collect().mkString(","))

 

  As you can see, an RDD is essentially just like an array, which is why the data above is built from the List (linked list) and Array types.

  The second way constructs an RDD from a file system, as in the code below:

    val rdd:RDD[String] = sc.textFile("file:///D:/sparkdata.txt", 1)
    val r:RDD[String] = rdd.flatMap { x => x.split(",") }
    println(r.collect().mkString(","))

  This example uses the local file system, so the file path carries the file:// protocol prefix.
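
  Reading from HDFS works the same way, just with an hdfs:// prefix. Here is a minimal sketch (the namenode host, port, and file path below are placeholders, not values from this article):

    /* Hypothetical HDFS file; substitute your own namenode address and path */
    val hdfsRdd:RDD[String] = sc.textFile("hdfs://namenode:9000/data/sparkdata.txt", 1)
    println(hdfsRdd.count())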

  Once the RDD is constructed, the next question is how to operate on it. RDD operations fall into two categories: transformations and actions. This split exists because RDDs are evaluated lazily: when a transformation is applied, no computation actually runs; only when an action is invoked does Spark submit a job and perform the computation. Telling the two apart is simple: a transformation derives a new RDD from an existing one, while an action triggers the actual computation.
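
  A minimal sketch of this laziness (the println inside map is there only to show when the computation actually runs):

    /* map is a transformation: registering it runs no job, so nothing is printed yet */
    val lazyRdd = sc.makeRDD(List(1, 2, 3)).map { x =>
      println("computing " + x)
      x * 10
    }
    /* collect is an action: only now does Spark run the job, and the "computing ..." lines appear */
    println(lazyRdd.collect().mkString(","))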

  Below is an overview of the basic RDD API:

Transformations:

- map(): takes a function, applies it to every element of the RDD, and returns a new RDD.
- flatMap(): takes a function, applies it to every element, splits each element into an iterator of pieces, and returns a new RDD.
- filter(): takes a predicate function, drops the elements that do not satisfy it, and returns a new RDD.
- distinct(): takes no arguments; removes duplicate elements from the RDD.
- union(): takes another RDD; produces a new RDD containing all elements of both RDDs.
- intersection(): takes another RDD; returns the elements the two RDDs have in common.
- subtract(): takes another RDD; removes from the original RDD the elements that also appear in the argument RDD.
- cartesian(): takes another RDD; returns the Cartesian product of the two RDDs.

Actions:

- collect(): returns all elements of the RDD.
- count(): returns the number of elements in the RDD.
- countByValue(): returns how many times each element occurs in the RDD.
- reduce(): aggregates the RDD's elements in parallel, for example summing them.
- fold(0)(func): same as reduce, but takes an initial (zero) value.
- aggregate(0)(seqOp, combOp): like reduce, but the result type may differ from the element type of the original RDD.
- foreach(func): applies the given function to every element of the RDD.

  Below is example code for the API operations above.

  Transformations:

    val rddInt:RDD[Int] = sc.makeRDD(List(1,2,3,4,5,6,2,5,1))
    val rddStr:RDD[String] = sc.parallelize(Array("a","b","c","d","b","a"), 1)
    val rddFile:RDD[String] = sc.textFile(path, 1)
    
    val rdd01:RDD[Int] = sc.makeRDD(List(1,3,5,3))
    val rdd02:RDD[Int] = sc.makeRDD(List(2,4,5,1))

    /* map */
    println("======map operation======")
    println(rddInt.map(x => x + 1).collect().mkString(","))
    println("======map operation======")
    /* filter */
    println("======filter operation======")
    println(rddInt.filter(x => x > 4).collect().mkString(","))
    println("======filter operation======")
    /* flatMap */
    println("======flatMap operation======")
    println(rddFile.flatMap { x => x.split(",") }.first())
    println("======flatMap operation======")
    /* distinct (dedupe) */
    println("======distinct operation======")
    println(rddInt.distinct().collect().mkString(","))
    println(rddStr.distinct().collect().mkString(","))
    println("======distinct operation======")
    /* union */
    println("======union operation======")
    println(rdd01.union(rdd02).collect().mkString(","))
    println("======union operation======")
    /* intersection */
    println("======intersection operation======")
    println(rdd01.intersection(rdd02).collect().mkString(","))
    println("======intersection operation======")
    /* subtract */
    println("======subtract operation======")
    println(rdd01.subtract(rdd02).collect().mkString(","))
    println("======subtract operation======")
    /* cartesian */
    println("======cartesian operation======")
    println(rdd01.cartesian(rdd02).collect().mkString(","))
    println("======cartesian operation======")

  Actions:

    val rddInt:RDD[Int] = sc.makeRDD(List(1,2,3,4,5,6,2,5,1))
    val rddStr:RDD[String] = sc.parallelize(Array("a","b","c","d","b","a"), 1)
    
    /* count */
    println("======count operation======")
    println(rddInt.count())
    println("======count operation======")
    /* countByValue */
    println("======countByValue operation======")
    println(rddInt.countByValue())
    println("======countByValue operation======")
    /* reduce */
    println("======reduce operation======")
    println(rddInt.reduce((x, y) => x + y))
    println("======reduce operation======")
    /* fold */
    println("======fold operation======")
    println(rddInt.fold(0)((x, y) => x + y))
    println("======fold operation======")
    /* aggregate */
    println("======aggregate operation======")
    val res:(Int,Int) = rddInt.aggregate((0,0))((x,y) => (x._1 + x._2,y),(x,y) => (x._1 + x._2,y._1 + y._2))
    println(res._1 + "," + res._2)
    println("======aggregate operation======")
    /* foreach */
    println("======foreach operation======")
    rddStr.foreach { x => println(x) }
    println("======foreach operation======")

  That is as far as we will take RDD operations for now; the rest will be covered in the next post. Next, a few words on how to develop Spark programs. Installing Spark will be covered in a dedicated article later; here we assume Spark is already installed, so we can run spark-shell on the Spark server to interact with Spark and type code directly. But spark-shell is clumsy to use: it effectively serves only one user at a time, and when another user starts spark-shell the previous session gets kicked off; the shell also lacks IDE niceties such as code completion and code validation, which makes it painful to work with.
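
  For reference, here is a minimal sketch of what interacting with spark-shell looks like (spark-shell provides the SparkContext as sc automatically; the expression and result below are illustrative):

    scala> sc.parallelize(List(1, 2, 3, 4, 5, 6)).filter(_ % 2 == 0).collect()
    res0: Array[Int] = Array(2, 4, 6)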

  That said, Spark really is remarkable in one respect: local development and debugging is very simple and requires no installed Spark system at all. We only need to create a project, which can be Java or Scala, put a jar such as spark-assembly-1.6.1-hadoop2.6.0.jar on the project's classpath, and we can develop and debug Spark programs locally.
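
  As an alternative to copying the assembly jar by hand, and assuming an sbt-based project targeting the same Spark 1.6.1 / Scala 2.10 combination as that jar (an assumption, not part of the original setup), the dependency can be declared in build.sbt:

    // build.sbt sketch: pulls in the Spark core classes that spark-assembly-1.6.1-hadoop2.6.0.jar provides
    scalaVersion := "2.10.6"
    libraryDependencies += "org.apache.spark" %% "spark-core" % "1.6.1"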

  Here is the complete code, as it looks in Eclipse with the Scala plugin installed:

package cn.com.sparktest

import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD

object SparkTest {
  val conf:SparkConf = new SparkConf().setAppName("xtq").setMaster("local[2]")
  val sc:SparkContext = new SparkContext(conf)
  
  /**
   * Ways to create an RDD: constructing the data in memory (the basics)
   */
  def createDataMethod():Unit = {
    /* Create an RDD with makeRDD */
    /* List */
    val rdd01 = sc.makeRDD(List(1,2,3,4,5,6))
    val r01 = rdd01.map { x => x * x }
    println("===================createDataMethod:makeRDD:List=====================")
    println(r01.collect().mkString(","))
    println("===================createDataMethod:makeRDD:List=====================")
    /* Array */
    val rdd02 = sc.makeRDD(Array(1,2,3,4,5,6))
    val r02 = rdd02.filter { x => x < 5}
    println("===================createDataMethod:makeRDD:Array=====================")
    println(r02.collect().mkString(","))
    println("===================createDataMethod:makeRDD:Array=====================")
    
    /* Create an RDD with parallelize */
    /* List */
    val rdd03 = sc.parallelize(List(1,2,3,4,5,6), 1)
    val r03 = rdd03.map { x => x + 1 }
    println("===================createDataMethod:parallelize:List=====================")
    println(r03.collect().mkString(","))
    println("===================createDataMethod:parallelize:List=====================")
    /* Array */
    val rdd04 = sc.parallelize(Array(1,2,3,4,5,6), 1)
    val r04 = rdd04.filter { x => x > 3 }
    println("===================createDataMethod:parallelize:Array=====================")
    println(r04.collect().mkString(","))
    println("===================createDataMethod:parallelize:Array=====================")
  }
  
  /**
   * Create a pair RDD (key/value pairs)
   */
  def createPairRDD():Unit = {
    val rdd:RDD[(String,Int)] = sc.makeRDD(List(("key01",1),("key02",2),("key03",3)))
    val r:RDD[String] = rdd.keys
    println("===========================createPairRDD=================================")
    println(r.collect().mkString(","))
    println("===========================createPairRDD=================================")
  }
  
  /**
   * Create an RDD from a file
   * File contents:
   *   key01,1,2.3
   *   key02,5,3.7
   *   key03,23,4.8
   *   key04,12,3.9
   *   key05,7,1.3
   */
  def createDataFromFile(path:String):Unit = {
    val rdd:RDD[String] = sc.textFile(path, 1)
    val r:RDD[String] = rdd.flatMap { x => x.split(",") }
    println("=========================createDataFromFile==================================")
    println(r.collect().mkString(","))
    println("=========================createDataFromFile==================================")
  }
  
  /**
   * Basic RDD transformations
   */
  def basicTransformRDD(path:String):Unit = {
    val rddInt:RDD[Int] = sc.makeRDD(List(1,2,3,4,5,6,2,5,1))
    val rddStr:RDD[String] = sc.parallelize(Array("a","b","c","d","b","a"), 1)
    val rddFile:RDD[String] = sc.textFile(path, 1)
    
    val rdd01:RDD[Int] = sc.makeRDD(List(1,3,5,3))
    val rdd02:RDD[Int] = sc.makeRDD(List(2,4,5,1))

    /* map */
    println("======map operation======")
    println(rddInt.map(x => x + 1).collect().mkString(","))
    println("======map operation======")
    /* filter */
    println("======filter operation======")
    println(rddInt.filter(x => x > 4).collect().mkString(","))
    println("======filter operation======")
    /* flatMap */
    println("======flatMap operation======")
    println(rddFile.flatMap { x => x.split(",") }.first())
    println("======flatMap operation======")
    /* distinct (dedupe) */
    println("======distinct operation======")
    println(rddInt.distinct().collect().mkString(","))
    println(rddStr.distinct().collect().mkString(","))
    println("======distinct operation======")
    /* union */
    println("======union operation======")
    println(rdd01.union(rdd02).collect().mkString(","))
    println("======union operation======")
    /* intersection */
    println("======intersection operation======")
    println(rdd01.intersection(rdd02).collect().mkString(","))
    println("======intersection operation======")
    /* subtract */
    println("======subtract operation======")
    println(rdd01.subtract(rdd02).collect().mkString(","))
    println("======subtract operation======")
    /* cartesian */
    println("======cartesian operation======")
    println(rdd01.cartesian(rdd02).collect().mkString(","))
    println("======cartesian operation======")
  }
  
  /**
   * Basic RDD actions
   */
  def basicActionRDD():Unit = {
    val rddInt:RDD[Int] = sc.makeRDD(List(1,2,3,4,5,6,2,5,1))
    val rddStr:RDD[String] = sc.parallelize(Array("a","b","c","d","b","a"), 1)
    
    /* count */
    println("======count operation======")
    println(rddInt.count())
    println("======count operation======")
    /* countByValue */
    println("======countByValue operation======")
    println(rddInt.countByValue())
    println("======countByValue operation======")
    /* reduce */
    println("======reduce operation======")
    println(rddInt.reduce((x, y) => x + y))
    println("======reduce operation======")
    /* fold */
    println("======fold operation======")
    println(rddInt.fold(0)((x, y) => x + y))
    println("======fold operation======")
    /* aggregate */
    println("======aggregate operation======")
    val res:(Int,Int) = rddInt.aggregate((0,0))((x,y) => (x._1 + x._2,y),(x,y) => (x._1 + x._2,y._1 + y._2))
    println(res._1 + "," + res._2)
    println("======aggregate operation======")
    /* foreach */
    println("======foreach operation======")
    rddStr.foreach { x => println(x) }
    println("======foreach operation======")
  }
  
  def main(args: Array[String]): Unit = {
    println(System.getenv("HADOOP_HOME"))
    createDataMethod()
    createPairRDD()
    createDataFromFile("file:///D:/sparkdata.txt")
    basicTransformRDD("file:///D:/sparkdata.txt")
    basicActionRDD()
    /* Printed output: */
    /*D://hadoop
===================createDataMethod:makeRDD:List=====================
1,4,9,16,25,36
===================createDataMethod:makeRDD:List=====================
===================createDataMethod:makeRDD:Array=====================
1,2,3,4
===================createDataMethod:makeRDD:Array=====================
===================createDataMethod:parallelize:List=====================
2,3,4,5,6,7
===================createDataMethod:parallelize:List=====================
===================createDataMethod:parallelize:Array=====================
4,5,6
===================createDataMethod:parallelize:Array=====================
===========================createPairRDD=================================
key01,key02,key03
===========================createPairRDD=================================
=========================createDataFromFile==================================
key01,1,2.3,key02,5,3.7,key03,23,4.8,key04,12,3.9,key05,7,1.3
=========================createDataFromFile==================================
======map operation======
2,3,4,5,6,7,3,6,2
======map operation======
======filter operation======
5,6,5
======filter operation======
======flatMap operation======
key01
======flatMap operation======
======distinct operation======
4,6,2,1,3,5
======distinct operation======
======union operation======
1,3,5,3,2,4,5,1
======union operation======
======intersection operation======
1,5
======intersection operation======
======subtract operation======
3,3
======subtract operation======
======cartesian operation======
(1,2),(1,4),(3,2),(3,4),(1,5),(1,1),(3,5),(3,1),(5,2),(5,4),(3,2),(3,4),(5,5),(5,1),(3,5),(3,1)
======cartesian operation======
======count operation======
9
======count operation======
======countByValue operation======
Map(5 -> 2, 1 -> 2, 6 -> 1, 2 -> 2, 3 -> 1, 4 -> 1)
======countByValue operation======
======reduce operation======
29
======reduce operation======
======fold operation======
29
======fold operation======
======aggregate operation======
19,10
======aggregate operation======
======foreach operation======
a
b
c
d
b
a
======foreach operation======*/
  }
}

  When a Spark program runs we need to construct a SparkContext, and constructing one requires a SparkConf object, for example: setAppName("xtq").setMaster("local[2]")

  appName is the name of the Spark application; a master of local[2] means local mode, using 2 worker threads to run the Spark job.
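
  Other commonly used master settings, for comparison (the host and port below are placeholders):

    // new SparkConf().setMaster("local")                    // local mode, a single worker thread
    // new SparkConf().setMaster("local[*]")                  // local mode, one thread per available core
    // new SparkConf().setMaster("spark://master-host:7077")  // connect to a standalone cluster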

  When the Spark program is run inside Eclipse, it reports the following error:

java.io.IOException: Could not locate executable null\bin\winutils.exe in the Hadoop binaries.
	at org.apache.hadoop.util.Shell.getQualifiedBinPath(Shell.java:355)
	at org.apache.hadoop.util.Shell.getWinUtilsPath(Shell.java:370)
	at org.apache.hadoop.util.Shell.<clinit>(Shell.java:363)
	at org.apache.hadoop.util.StringUtils.<clinit>(StringUtils.java:79)
	at org.apache.hadoop.security.Groups.parseStaticMapping(Groups.java:104)
	at org.apache.hadoop.security.Groups.<init>(Groups.java:86)
	at org.apache.hadoop.security.Groups.<init>(Groups.java:66)
	at org.apache.hadoop.security.Groups.getUserToGroupsMappingService(Groups.java:280)
	at org.apache.hadoop.security.UserGroupInformation.initialize(UserGroupInformation.java:271)
	at org.apache.hadoop.security.UserGroupInformation.ensureInitialized(UserGroupInformation.java:248)
	at org.apache.hadoop.security.UserGroupInformation.loginUserFromSubject(UserGroupInformation.java:763)
	at org.apache.hadoop.security.UserGroupInformation.getLoginUser(UserGroupInformation.java:748)
	at org.apache.hadoop.security.UserGroupInformation.getCurrentUser(UserGroupInformation.java:621)
	at org.apache.spark.util.Utils$$anonfun$getCurrentUserName$1.apply(Utils.scala:2160)
	at org.apache.spark.util.Utils$$anonfun$getCurrentUserName$1.apply(Utils.scala:2160)
	at scala.Option.getOrElse(Option.scala:120)
	at org.apache.spark.util.Utils$.getCurrentUserName(Utils.scala:2160)
	at org.apache.spark.SparkContext.<init>(SparkContext.scala:322)
	at cn.com.sparktest.SparkTest$.<init>(SparkTest.scala:10)
	at cn.com.sparktest.SparkTest$.<clinit>(SparkTest.scala)
	at cn.com.sparktest.SparkTest.main(SparkTest.scala)

  This error does not affect the program's results, but it is still unsettling. The cause is that Spark depends on Hadoop, and Hadoop cannot really be installed natively on Windows (in the past you could only simulate an installation with Cygwin); newer Hadoop versions need winutils.exe to work on Windows. The fix is simple: download a winutils.exe that matches your operating system (32-bit or 64-bit) and place it in a directory such as:

  D:\hadoop\bin\winutils.exe

  Then define HADOOP_HOME=D:\hadoop as an environment variable.

  Restart Eclipse so that the environment-variable change takes effect; after that the program runs without reporting this error.
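
  If you would rather not change system environment variables, a common workaround (a sketch; the path must be whatever directory on your machine actually contains bin\winutils.exe) is to set the hadoop.home.dir system property before the SparkContext is created:

    // Must run before Hadoop's Shell class is initialized, i.e. before the SparkContext is built
    System.setProperty("hadoop.home.dir", "D:\\hadoop")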
