Reading External Data in Spark: wholeTextFiles

The wholeTextFiles function

/**
   * Read a directory of text files from HDFS, a local file system (available on all nodes), or any
   * Hadoop-supported file system URI. Each file is read as a single record and returned in a
   * key-value pair, where the key is the path of each file, the value is the content of each file.
   *
   * <p> For example, if you have the following files:
   * {{{
   *   hdfs://a-hdfs-path/part-00000
   *   hdfs://a-hdfs-path/part-00001
   *   ...
   *   hdfs://a-hdfs-path/part-nnnnn
   * }}}
   *
   * Do `val rdd = sparkContext.wholeTextFiles("hdfs://a-hdfs-path")`,
   *
   * <p> then `rdd` contains
   * {{{
   *   (a-hdfs-path/part-00000, its content)
   *   (a-hdfs-path/part-00001, its content)
   *   ...
   *   (a-hdfs-path/part-nnnnn, its content)
   * }}}
   *
   * @note Small files are preferred, large file is also allowable, but may cause bad performance.
   * @note On some filesystems, `.../path/*` can be a more efficient way to read all files
   *       in a directory rather than `.../path/` or `.../path`
   *
   * @param path Directory to the input data files, the path can be comma separated paths as the
   *             list of inputs.
   * @param minPartitions A suggestion value of the minimal splitting number for input data.
   */
  def wholeTextFiles(
      path: String,
      minPartitions: Int = defaultMinPartitions): RDD[(String, String)] = withScope {
    assertNotStopped()
    val job = NewHadoopJob.getInstance(hadoopConfiguration)
    // Use setInputPaths so that wholeTextFiles aligns with hadoopFile/textFile in taking
    // comma separated files as input. (see SPARK-7155)
    NewFileInputFormat.setInputPaths(job, path)
    val updateConf = job.getConfiguration
    new WholeTextFileRDD(
      this,
      classOf[WholeTextFileInputFormat],
      classOf[Text],
      classOf[Text],
      updateConf,
      minPartitions).map(record => (record._1.toString, record._2.toString)).setName(path)
  }

Parameter analysis:

path: String is a URI, which can point to HDFS, a local file system (available on all nodes), or any other Hadoop-supported file system. Each file is read as a single record, and the result is a key-value pair RDD whose key is the path of each file.

value is the complete content of that file, so internally the RDD looks like Iterator[(String, String)].

minPartitions defaults to defaultMinPartitions = math.min(defaultParallelism, 2); it is a suggested minimum number of partitions for the input data. If you do not set it, the value is 2 even when more than 2 cores are available.
When the data is larger than 128 MB, Spark creates one split per block (one block is 128 MB since Hadoop 2.x).
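As a minimal sketch of how the two parameters are used (the local master setting, the application name, and the /tmp/input/* paths below are hypothetical), the following reads a comma-separated list of directories and prints the suggested minimum partition count:

import org.apache.spark.{SparkConf, SparkContext}

object WholeTextFilesParamsDemo {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("wholeTextFiles-params").setMaster("local[4]")
    val sc = new SparkContext(conf)

    // defaultMinPartitions = math.min(defaultParallelism, 2), so even with 4 local cores it is 2
    println(s"defaultMinPartitions = ${sc.defaultMinPartitions}")

    // path can be a comma-separated list of directories (hypothetical paths)
    val rdd = sc.wholeTextFiles("/tmp/input/dir1,/tmp/input/dir2", minPartitions = 4)
    println(s"requested minPartitions = 4, actual partitions = ${rdd.partitions.length}")

    sc.stop()
  }
}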

Notes:

(1) wholeTextFiles is efficient for a large number of small files; large files still work, but performance is not as good.

(2) On some file systems, expressing the path with a wildcard is more efficient than listing each file name individually. For example,

sc.wholeTextFiles("/root/application/temp/*",2)
is more efficient than the following:

sc.wholeTextFiles("/root/application/temp/people1.txt,/root/application/temp/people2.txt",2)


1. Reading a single file from HDFS

val path = "hdfs://master:9000/examples/examples/src/main/resources/people.txt"
val rdd1 = sc.wholeTextFiles(path, 3)
rdd1.foreach(println)
(hdfs://master:9000/examples/examples/src/main/resources/people.txt,Michael, 29
Andy, 30
Justin, 19
)
The key is hdfs://master:9000/examples/examples/src/main/resources/people.txt

and the value is:
Michael, 29
Andy, 30
Justin, 19
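Building on the pair RDD above, here is a short sketch (assuming the same people.txt content of "name, age" lines) of separating the keys from the values and parsing each file's content line by line:

val path = "hdfs://master:9000/examples/examples/src/main/resources/people.txt"
val pairs = sc.wholeTextFiles(path, 3)

// keys are the full file paths
pairs.keys.collect().foreach(println)

// each value holds the whole file content; split it into lines and parse "name, age"
val people = pairs.values
  .flatMap(_.split("\n"))
  .map(_.split(","))
  .filter(_.length == 2)
  .map(a => (a(0).trim, a(1).trim.toInt))

people.collect().foreach(println)   // (Michael,29), (Andy,30), (Justin,19)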


2. Reading multiple files from the local file system

val path = "/usr/local/spark/spark-1.6.0-bin-hadoop2.6/data/*/*.txt"  //local file
val rdd1 = sc.wholeTextFiles(path,2)

Reading multiple files from the local file system works, but the partitioning differs somewhat from textFile.

The RDD produced by wholeTextFiles here has only 2 partitions: it is as if the data of all the files were gathered together first and then partitioned according to the minPartitions value passed in, which is 2 in this example.

Although there are only 2 partitions, the number of key-value pairs inside the RDD is the same as the number of files.
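A quick way to check this behaviour, using the same local path as above (the actual partition count can vary with the total data size, so treat the numbers as illustrative):

val path = "/usr/local/spark/spark-1.6.0-bin-hadoop2.6/data/*/*.txt"
val rdd1 = sc.wholeTextFiles(path, 2)

println(s"partitions = ${rdd1.partitions.length}")   // requested minimum is 2
println(s"records (files) = ${rdd1.count()}")        // one (path, content) pair per file

// for comparison, textFile produces one record per line instead of per file
val rdd2 = sc.textFile(path, 2)
println(s"textFile records (lines) = ${rdd2.count()}")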