Learning Spark中文版--第五章--加载保存数据(1)

  开发工程师和数据科学家都会受益于本章的部份内容。工程师可能但愿探索更多的输出格式,看看有没有一些适合他们下游用户的格式。数据科学家可能会更关注他们已经使用的数据格式。<br>数据库

Motivation

  咱们已经介绍了大量分布式程序使用的Spark操做。目前为止,咱们的例子都是从一个本地集合和规整文件中加载数据,可是有可能你的数据不是规整的或者不在一台机器上,那么就跟着我一块儿探索加载和保存数据的操做用法。<br>   Spark支持普遍的输出输入源,部分缘由是由于Spark构建在Haddoop生态环境之上。Spark能够经过Hadoop中MapReduce的InputFormatOutPutForamt接口来访问数据,这些接口可用于不少经常使用的文件格式和存储系统(如,S3,HDFS,Cassadra,HBase等等)。第84页上的“Hadoop输入和输出格式”展现了如何直接使用这些格式。<br>   更常见的状况是,你会想使用对原始接口封装的更高级API。Spark和它的生态系统提供了不少使用方式,这一章,咱们会详细介绍三个经常使用数据源集合:<br>json

  • 文件格式和文件系统<br> 对于数据存储在本地或分布式的文件系统,如NFS,HDFS,或亚马逊的S3,Spark能够访问各类文件格式,包括文本,JSON,SequnceFile和 Protocol buffer。咱们会介绍如何使用这几种经常使用文件格式,以及Spark如何与不一样的文件系统对接和配置压缩。<br>数组

  • 经过Spark SQL构建结构化数据源<br> 第九章介绍的Spark SQL模块,提供了很是高效的结构化数据源API,包括JSON和Apache的Hive。咱们会简要介绍一些Spark SQL,可是大部份内容留在第九章<br>app

  • 数据库和键值对存储<br> 咱们会简要了解如何链接Cassandra,HBase,Elasticsearch和JDBC数据库的内置或第三方库。<br>分布式

  Spark支持的语言可使用咱们选择的大多数方法,但有些库只有Java和Scala可使用。咱们遇到这样的例子会进行提示。<br>函数

文件格式<br>

  对于大多数文件格式,Spark加载和保存的方法都很是简单。从非结构化的格式,如text,到半结构化数据,如JSON,再到结构化数据,如SequenceFile(见表5-1)。The input formats that Spark wraps all transparently handle compressed formats based on the file extension.<br> (我翻不出来:(,对于输入格式,Spark会对其包装,包装了的输入格式会基于文件扩展名透明地处理压缩格式)。<br>oop

格式名 是否结构化 评价
文本文件 普通文本文件。假定每行一条记录
JSON 半结构 常见的基于文本格式,半结构;大多数库须要每行一条记录
CSV 很是经常使用的基于文本的格式,一般用于电子表格应用
SequnceFiles Hadoop中一种经常使用的键值对数据格式
Protocol buffer 一种快速的,空间利用率高的多语言格式
Object file Spark job用来共享代码的保存数据格式,很是有用

  除了Spark直接支持的输出机制以外,咱们能够将Hadoop的新老文件API用于键值对数据。能够在键值对上使用这些API,由于Hadoop的接口要求键值对数据,即便是忽略键的数据。在忽略键的状况下,一般使用虚拟键(如null)。性能

Text Files(文本文件)

  Spark中加载保存文本文件很是简单。当咱们加载一个简单的文本文件做为RDD时,每一行输入变成了RDD的一个基本元素。咱们能够把多个文本文件同时加载到一个键值对RDD中,键做为文件名,值做为文件内容。<br>大数据

loading text files(加载文本文件)<br>

  加载文本文件就和在SparkContext调用textFile(文件路径)同样简单,Example5-1到5-3展现了例子。若是咱们想控制分区的数量咱们也能够明确设定minPartitions。<br>编码

Example 5-1. Loading a text file in Python

input = sc.textFile("file:///home/holden/repos/spark/README.md")

Example 5-2. Loading a text file in Scala

val input = sc.textFile("file:///home/holden/repos/spark/README.md")

Example 5-3. Loading a text file in Java

JavaRDD<String> input = sc.textFile("file:///home/holden/repos/spark/README.md")

  参数是目录形式的多文件输入可使用两种方式处理。咱们能够直接使用textFile方法并把目录地址做为参数传入,这就会把目录中全部文件传进咱们的RDD。有时候须要知道某个文件的某一部分的来源(例如时间和文件的键结合做为区分标志)或者咱们须要一次处理整个文件。若是咱们的文件足够小,咱们可使用SparkContext.wholeTextFiles()方法,这会返回一个以输入文件的名称做为键的键值对RDD。<br>

   当每一个文件能表示一个肯定的时间数据,wholeTextFiles()就会变得颇有用。若是咱们有可以表示不一样时间段销售数据的文件,咱们能够很轻易地计算出每一个时间段的平均值。示例:<br>

Example 5-4. Average value per file in Scala

val input = sc.wholeTextFiles("file://home/holden/salesFiles")
val result = input.mapValues{y =>
val nums = y.split(" ").map(x => x.toDouble)
nums.sum / nums.size.toDouble
}

Spark支持读取指定目录的全部文件,而且也支持通配符输入(如,part-*.txt)。这个用处很是大,由于较大数据集一般分布在多个文件中,特别是若是其余文件(如成功标志)和要处理的文件在同一目录中。

Saving text files(保存文本文件)<br>

   输出文本文件也很简单。saveAsTextFile()方法,如Example5-5所示,以一个输出的路径做为参数,并将RDD内容输入到该路径之中。这个路径会做为一个目录而且Spark会把多个文件输入到该目录之下。Spark能够把输出结果写入多个节点。这个方法不能控制输出数据分段的开始和结束位置。但有其余的方法能够控制。<br>

Example 5-5. Saving as a text file in Python

result.saveAsTextFile(outputFile)

JSON

  JSON是一个很流行的半结构化数据格式。加载JSON数据最简单的方式就是把JSON当作文本文件而后用JSON解析器解析映射值。一样的,咱们可使用偏好的JSON序列化库把值写入字符串,而后再给写出。在Java和Scala中,咱们可使用定制的Hadoop格式来处理JSON。172也也介绍了Spark SQL怎么加载JSON数据。<br>

loading JSON(加载JSON)<br>

  像文本文件同样加载而后转换JSON数据是Spark全部支持的语言均可以使用的一种方法。这是假定你的JSON数据每条记录都在一行之中,若是你的JSON数据是多行的,你可能必须加载整个文件而后转换每一个文件。若是你使用的语言构建JSON解析器的代价比较大,你可使用mapPartitions()来重用解析器;107页"Working on a Per-Partition Basis"有这方面的细节。   咱们经常使用的这三种语言有大量的JSON库可使用,但为了简单起见,每种语言只介绍一种库。在Python中咱们使用内置库(Example5-6),Java和Scala中咱们使用Jackson(Examples5-7和5-8)。选择这几个库是由于他们运行效果很好而且使用相对简单。若是你在解析过程当中花了大量时间,那能够看一下Java和Scala其余的JSON库。<br>

Example 5-6. Loading unstructured JSON in Python

import json
data = input.map(lambda x: json.loads(x))

在Scala和Java中,一般把JSON数据转化为表示JSON格式的类。解析时,咱们可能但愿跳过无效的记录。下面展现了一个把JSON数据转换成Person实例的一个例子。<br>

Example 5-7. Loading JSON in Scala
import com.fasterxml.jackson.module.scala.DefaultScalaModule
import com.fasterxml.jackson.module.scala.experimental.ScalaObjectMapper
import com.fasterxml.jackson.databind.ObjectMapper
import com.fasterxml.jackson.databind.DeserializationFeature
...
case class Person(name: String, lovesPandas: Boolean) // Must be a top-level class
...
// Parse it into a specific case class. We use flatMap to handle errors
// by returning an empty list (None) if we encounter an issue and a
// list with one element if everything is ok (Some(_)).
//把JSON转化为样例类。使用flatMap转化,若是遇到错误就返回list(None),不然就返回一条记录的Some(_)
val result = input.flatMap(record => {
    try {
        Some(mapper.readValue(record, classOf[Person]))
    } catch {
        case e: Exception => None
    }})
    
Example 5-8. Loading JSON in Java

class ParseJson implements FlatMapFunction<Iterator<String>, Person> {
    public Iterable<Person> call(Iterator<String> lines) throws Exception {
        ArrayList<Person> people = new ArrayList<Person>();
        ObjectMapper mapper = new ObjectMapper();
        while (lines.hasNext()) {
            String line = lines.next();
            try {
                people.add(mapper.readValue(line, Person.class));
            } catch (Exception e) {
            // skip records on failure
            }
        }
        return people;
    }
}
JavaRDD<String> input = sc.textFile("file.json");
JavaRDD<Person> result = input.mapPartitions(new ParseJson());

处理格式不正确的记录可能很麻烦,特别是半结构化的数据,如JSON。对于小数据集因格式不正确的输入致使程序崩溃还能够接受,可是,处理大数据集时遇到格式畸形输入也是屡见不鲜。若是选择跳过错误格式的数据,你可能会想使用累加器来记录并追踪错误的数量。<br>

Saving JSON(保存JSON)<br>

  输出JSON比加载JSON简单不少,由于不用担忧错误格式的数据,而且知道咱们输出数据的类型。咱们能够直接使用把字符串RDD转换成JSON的相同的库,把对象转换成JSON后再使用Spark文本文件API将其输出。<br>   假如咱们正在举行一个对喜好熊猫的人的促销。使用第一步中的输入,过滤出喜欢熊猫的人,示例以下:<br>

Example 5-9. Saving JSON in Python

(data.filter(lambda x: x['lovesPandas']).map(lambda x: json.dumps(x))
    .saveAsTextFile(outputFile))
Example 5-10. Saving JSON in Scala

result.filter(p => P.lovesPandas).map(mapper.writeValueAsString(_))
    .saveAsTextFile(outputFile)
Example 5-11. Saving JSON in Java
class WriteJson implements FlatMapFunction<Iterator<Person>, String> {
    public Iterable<String> call(Iterator<Person> people) throws Exception {
        ArrayList<String> text = new ArrayList<String>();
        ObjectMapper mapper = new ObjectMapper();
        while (people.hasNext()) {
            Person person = people.next();
            text.add(mapper.writeValueAsString(person));
        }
        return text;
    }
}
JavaRDD<Person> result = input.mapPartitions(new ParseJson()).filter(
    new LikesPandas());
JavaRDD<String> formatted = result.mapPartitions(new WriteJson());
formatted.saveAsTextFile(outfile)

  使用已有的文本文件的机制再添加上须要的JSON库,在Spark中加载和保存JSON变得多么简单。<br>

Comma-Separated Values and Tab-Separated Values(逗号分割和Tab分割)

  逗号分割值文件要求每行字段数量固定,而且这些字段被逗号分割(TSV文件是根据tab键进行分割)。一般是每行储存一条记录,可是实际上并不是老是如此,由于记录常常超过一行。CSV和TSV文件有时可能不一致,常常出如今处理换行符,转义,显示非ASCII字符,或非整数数字时。CSV天生不能处理嵌套类型字段,因此咱们必须手动处理。<br>

  不像JSON的字段,每条记录都没有字段名字与其关联,只取行号就好了。在CSV文件中有个不成文的规定,第一行的每一列的值就是每一个字段的名称。<br>

Loading CSV(加载CSV)<br>

  加载CSV/TSV数据和加载JSON很像,按照文本文件那样加载而后再进行处理。若是格式缺乏标准化,会致使相同库的不一样版本之间处理的方式不一样。<br>

  和JSON同样,有不少CSV的库可使用,不过咱们每一个语言只介绍一种。在Python中咱们使用csv库。在Scala和Java中咱们使用opencsv。<br>

固然也有Hadoop的输入格式,CSVInputFormat,能够用来在Scala或Java中加载CSV数据,它也不支持换行符的记录。<br>

  若是你的CSV数据的任何字段都不包含换行符,你能够直接使用textFile()加载你的数据并进行转换。示例以下:<br>

Example 5-12. Loading CSV with textFile() in Python

import csv
import StringIO
...
def loadRecord(line):
    """Parse a CSV line"""
    input = StringIO.StringIO(line)
    reader = csv.DictReader(input, fieldnames=["name", "favouriteAnimal"])
    return reader.next()
input = sc.textFile(inputFile).map(loadRecord)

Example 5-13. Loading CSV with textFile() in Scala

import Java.io.StringReader
import au.com.bytecode.opencsv.CSVReader
...
val input = sc.textFile(inputFile)
val result = input.map{ line =>
    val reader = new CSVReader(new StringReader(line));
    reader.readNext();
}


Example 5-14. Loading CSV with textFile() in Java
import au.com.bytecode.opencsv.CSVReader;
import Java.io.StringReader;
...
public static class ParseLine implements Function<String, String[]> {
    public String[] call(String line) throws Exception {
        CSVReader reader = new CSVReader(new StringReader(line));
        return reader.readNext();
    }
}
JavaRDD<String> csvFile1 = sc.textFile(inputFile);
JavaPairRDD<String[]> csvData = csvFile1.map(new ParseLine());

  若是字段中内嵌了换行符,咱们须要加载整个文件而且解析整段,就如Examples5-15到5-17中所展现。很不幸的是若是每一个文件都很是大,那加载解析过程可能出现性能瓶颈。文本文件不一样的加载方法在73页"Loading text files"有描述。<br>

Example 5-15. Loading CSV in full in Python

def loadRecords(fileNameContents):
    """Load all the records in a given file"""
    input = StringIO.StringIO(fileNameContents[1])
    reader = csv.DictReader(input, fieldnames=["name", "favoriteAnimal"])
    return reader
fullFileData = sc.wholeTextFiles(inputFile).flatMap(loadRecords)

Example 5-16. Loading CSV in full in Scala

case class Person(name: String, favoriteAnimal: String)

val input = sc.wholeTextFiles(inputFile)
val result = input.flatMap{ case (_, txt) =>
    val reader = new CSVReader(new StringReader(txt));
    reader.readAll().map(x => Person(x(0), x(1)))
}

Example 5-17. Loading CSV in full in Java

public static class ParseLine
    implements FlatMapFunction<Tuple2<String, String>, String[]> {
    public Iterable<String[]> call(Tuple2<String, String> file) throws Exception {
        CSVReader reader = new CSVReader(new StringReader(file._2()));
        return reader.readAll();
    }
}
JavaPairRDD<String, String> csvData = sc.wholeTextFiles(inputFile);
JavaRDD<String[]> keyedRDD = csvData.flatMap(new ParseLine());

若是只有少许的输入文件,而且你须要使用wholeFile()方法,则可能须要从新分区输入使得Spark高效并行化执行你的后续操做。<br>

Saving CSV(保存CSV)

  就像JSON同样,输出CSV/TSV数据也很简单而且重用输出的编码对象也有不少优势。由于在CSV中咱们不用输出每一个记录的字段名,咱们须要建立一个映射来确保输出的一致性。一个简单的方法就是写一个函数,把字段转换到数组的指定位置。在Python中,若是咱们想输出字典,CSV writer按照咱们提供的字段名顺序构建writer,而后将字典输出。<br>

  咱们使用的CSV库输出到文件或writer,因此咱们可使用StringWriter/StringIO来把结果输入到RDD之中,示例以下:<br>

Example 5-18. Writing CSV in Python

def writeRecords(records):
    """Write out CSV lines"""
    output = StringIO.StringIO()
    writer = csv.DictWriter(output, fieldnames=["name", "favoriteAnimal"])
    for record in records:
        writer.writerow(record)
    return [output.getvalue()]
    
pandaLovers.mapPartitions(writeRecords).saveAsTextFile(outputFile)

Example 5-19. Writing CSV in Scala

pandaLovers.map(person => List(person.name, person.favoriteAnimal).toArray)
.mapPartitions{people =>
    val stringWriter = new StringWriter();
    val csvWriter = new CSVWriter(stringWriter);
    csvWriter.writeAll(people.toList)
    Iterator(stringWriter.toString)
}.saveAsTextFile(outFile)

  你可能注意到了,例子中咱们知道要输出内容的全部的字段。若是有些输入的字段是在运行时肯定的,那咱们只能换个方式了。最简单的办法就是先遍历全部的数据来提取不一样的字段,字段肯定以后就能够输出数据了。<br>

相关文章
相关标签/搜索