开发工程师和数据科学家都会受益于本章的部份内容。工程师可能但愿探索更多的输出格式,看看有没有一些适合他们下游用户的格式。数据科学家可能会更关注他们已经使用的数据格式。<br>数据库
Motivation
咱们已经介绍了大量分布式程序使用的Spark操做。目前为止,咱们的例子都是从一个本地集合和规整文件中加载数据,可是有可能你的数据不是规整的或者不在一台机器上,那么就跟着我一块儿探索加载和保存数据的操做用法。<br> Spark支持普遍的输出输入源,部分缘由是由于Spark构建在Haddoop生态环境之上。Spark能够经过Hadoop中MapReduce的InputFormat
和OutPutForamt
接口来访问数据,这些接口可用于不少经常使用的文件格式和存储系统(如,S3,HDFS,Cassadra,HBase等等)。第84页上的“Hadoop输入和输出格式”展现了如何直接使用这些格式。<br> 更常见的状况是,你会想使用对原始接口封装的更高级API。Spark和它的生态系统提供了不少使用方式,这一章,咱们会详细介绍三个经常使用数据源集合:<br>json
-
文件格式和文件系统<br> 对于数据存储在本地或分布式的文件系统,如NFS,HDFS,或亚马逊的S3,Spark能够访问各类文件格式,包括文本,JSON,SequnceFile和 Protocol buffer。咱们会介绍如何使用这几种经常使用文件格式,以及Spark如何与不一样的文件系统对接和配置压缩。<br>数组
-
经过Spark SQL构建结构化数据源<br> 第九章介绍的Spark SQL模块,提供了很是高效的结构化数据源API,包括JSON和Apache的Hive。咱们会简要介绍一些Spark SQL,可是大部份内容留在第九章<br>app
-
数据库和键值对存储<br> 咱们会简要了解如何链接Cassandra,HBase,Elasticsearch和JDBC数据库的内置或第三方库。<br>分布式
Spark支持的语言可使用咱们选择的大多数方法,但有些库只有Java和Scala可使用。咱们遇到这样的例子会进行提示。<br>函数
文件格式<br>
对于大多数文件格式,Spark加载和保存的方法都很是简单。从非结构化的格式,如text,到半结构化数据,如JSON,再到结构化数据,如SequenceFile(见表5-1)。The input formats that Spark wraps all transparently handle compressed formats based on the file extension.<br> (我翻不出来:(,对于输入格式,Spark会对其包装,包装了的输入格式会基于文件扩展名透明地处理压缩格式)。<br>oop
格式名 | 是否结构化 | 评价 |
---|---|---|
文本文件 | 非 | 普通文本文件。假定每行一条记录 |
JSON | 半结构 | 常见的基于文本格式,半结构;大多数库须要每行一条记录 |
CSV | 是 | 很是经常使用的基于文本的格式,一般用于电子表格应用 |
SequnceFiles | 是 | Hadoop中一种经常使用的键值对数据格式 |
Protocol buffer | 是 | 一种快速的,空间利用率高的多语言格式 |
Object file | 是 | Spark job用来共享代码的保存数据格式,很是有用 |
除了Spark直接支持的输出机制以外,咱们能够将Hadoop的新老文件API用于键值对数据。能够在键值对上使用这些API,由于Hadoop的接口要求键值对数据,即便是忽略键的数据。在忽略键的状况下,一般使用虚拟键(如null)。性能
Text Files(文本文件)
Spark中加载保存文本文件很是简单。当咱们加载一个简单的文本文件做为RDD时,每一行输入变成了RDD的一个基本元素。咱们能够把多个文本文件同时加载到一个键值对RDD中,键做为文件名,值做为文件内容。<br>大数据
loading text files(加载文本文件)<br>
加载文本文件就和在SparkContext
调用textFile(文件路径)
同样简单,Example5-1到5-3展现了例子。若是咱们想控制分区的数量咱们也能够明确设定minPartitions
。<br>编码
Example 5-1. Loading a text file in Python input = sc.textFile("file:///home/holden/repos/spark/README.md") Example 5-2. Loading a text file in Scala val input = sc.textFile("file:///home/holden/repos/spark/README.md") Example 5-3. Loading a text file in Java JavaRDD<String> input = sc.textFile("file:///home/holden/repos/spark/README.md")
参数是目录形式的多文件输入可使用两种方式处理。咱们能够直接使用textFile
方法并把目录地址做为参数传入,这就会把目录中全部文件传进咱们的RDD。有时候须要知道某个文件的某一部分的来源(例如时间和文件的键结合做为区分标志)或者咱们须要一次处理整个文件。若是咱们的文件足够小,咱们可使用SparkContext.wholeTextFiles()
方法,这会返回一个以输入文件的名称做为键的键值对RDD。<br>
当每一个文件能表示一个肯定的时间数据,wholeTextFiles()
就会变得颇有用。若是咱们有可以表示不一样时间段销售数据的文件,咱们能够很轻易地计算出每一个时间段的平均值。示例:<br>
Example 5-4. Average value per file in Scala val input = sc.wholeTextFiles("file://home/holden/salesFiles") val result = input.mapValues{y => val nums = y.split(" ").map(x => x.toDouble) nums.sum / nums.size.toDouble }
Spark支持读取指定目录的全部文件,而且也支持通配符输入(如,part-*.txt)。这个用处很是大,由于较大数据集一般分布在多个文件中,特别是若是其余文件(如成功标志)和要处理的文件在同一目录中。
Saving text files(保存文本文件)<br>
输出文本文件也很简单。saveAsTextFile()
方法,如Example5-5所示,以一个输出的路径做为参数,并将RDD内容输入到该路径之中。这个路径会做为一个目录而且Spark会把多个文件输入到该目录之下。Spark能够把输出结果写入多个节点。这个方法不能控制输出数据分段的开始和结束位置。但有其余的方法能够控制。<br>
Example 5-5. Saving as a text file in Python result.saveAsTextFile(outputFile)
JSON
JSON是一个很流行的半结构化数据格式。加载JSON数据最简单的方式就是把JSON当作文本文件而后用JSON解析器解析映射值。一样的,咱们可使用偏好的JSON序列化库把值写入字符串,而后再给写出。在Java和Scala中,咱们可使用定制的Hadoop格式来处理JSON。172也也介绍了Spark SQL怎么加载JSON数据。<br>
loading JSON(加载JSON)<br>
像文本文件同样加载而后转换JSON数据是Spark全部支持的语言均可以使用的一种方法。这是假定你的JSON数据每条记录都在一行之中,若是你的JSON数据是多行的,你可能必须加载整个文件而后转换每一个文件。若是你使用的语言构建JSON解析器的代价比较大,你可使用mapPartitions()
来重用解析器;107页"Working on a Per-Partition Basis"有这方面的细节。 咱们经常使用的这三种语言有大量的JSON库可使用,但为了简单起见,每种语言只介绍一种库。在Python中咱们使用内置库(Example5-6),Java和Scala中咱们使用Jackson(Examples5-7和5-8)。选择这几个库是由于他们运行效果很好而且使用相对简单。若是你在解析过程当中花了大量时间,那能够看一下Java和Scala其余的JSON库。<br>
Example 5-6. Loading unstructured JSON in Python import json data = input.map(lambda x: json.loads(x))
在Scala和Java中,一般把JSON数据转化为表示JSON格式的类。解析时,咱们可能但愿跳过无效的记录。下面展现了一个把JSON数据转换成Person实例的一个例子。<br>
Example 5-7. Loading JSON in Scala import com.fasterxml.jackson.module.scala.DefaultScalaModule import com.fasterxml.jackson.module.scala.experimental.ScalaObjectMapper import com.fasterxml.jackson.databind.ObjectMapper import com.fasterxml.jackson.databind.DeserializationFeature ... case class Person(name: String, lovesPandas: Boolean) // Must be a top-level class ... // Parse it into a specific case class. We use flatMap to handle errors // by returning an empty list (None) if we encounter an issue and a // list with one element if everything is ok (Some(_)). //把JSON转化为样例类。使用flatMap转化,若是遇到错误就返回list(None),不然就返回一条记录的Some(_) val result = input.flatMap(record => { try { Some(mapper.readValue(record, classOf[Person])) } catch { case e: Exception => None }}) Example 5-8. Loading JSON in Java class ParseJson implements FlatMapFunction<Iterator<String>, Person> { public Iterable<Person> call(Iterator<String> lines) throws Exception { ArrayList<Person> people = new ArrayList<Person>(); ObjectMapper mapper = new ObjectMapper(); while (lines.hasNext()) { String line = lines.next(); try { people.add(mapper.readValue(line, Person.class)); } catch (Exception e) { // skip records on failure } } return people; } } JavaRDD<String> input = sc.textFile("file.json"); JavaRDD<Person> result = input.mapPartitions(new ParseJson());
处理格式不正确的记录可能很麻烦,特别是半结构化的数据,如JSON。对于小数据集因格式不正确的输入致使程序崩溃还能够接受,可是,处理大数据集时遇到格式畸形输入也是屡见不鲜。若是选择跳过错误格式的数据,你可能会想使用累加器来记录并追踪错误的数量。<br>
Saving JSON(保存JSON)<br>
输出JSON比加载JSON简单不少,由于不用担忧错误格式的数据,而且知道咱们输出数据的类型。咱们能够直接使用把字符串RDD转换成JSON的相同的库,把对象转换成JSON后再使用Spark文本文件API将其输出。<br> 假如咱们正在举行一个对喜好熊猫的人的促销。使用第一步中的输入,过滤出喜欢熊猫的人,示例以下:<br>
Example 5-9. Saving JSON in Python (data.filter(lambda x: x['lovesPandas']).map(lambda x: json.dumps(x)) .saveAsTextFile(outputFile)) Example 5-10. Saving JSON in Scala result.filter(p => P.lovesPandas).map(mapper.writeValueAsString(_)) .saveAsTextFile(outputFile) Example 5-11. Saving JSON in Java class WriteJson implements FlatMapFunction<Iterator<Person>, String> { public Iterable<String> call(Iterator<Person> people) throws Exception { ArrayList<String> text = new ArrayList<String>(); ObjectMapper mapper = new ObjectMapper(); while (people.hasNext()) { Person person = people.next(); text.add(mapper.writeValueAsString(person)); } return text; } } JavaRDD<Person> result = input.mapPartitions(new ParseJson()).filter( new LikesPandas()); JavaRDD<String> formatted = result.mapPartitions(new WriteJson()); formatted.saveAsTextFile(outfile)
使用已有的文本文件的机制再添加上须要的JSON库,在Spark中加载和保存JSON变得多么简单。<br>
Comma-Separated Values and Tab-Separated Values(逗号分割和Tab分割)
逗号分割值文件要求每行字段数量固定,而且这些字段被逗号分割(TSV文件是根据tab键进行分割)。一般是每行储存一条记录,可是实际上并不是老是如此,由于记录常常超过一行。CSV和TSV文件有时可能不一致,常常出如今处理换行符,转义,显示非ASCII字符,或非整数数字时。CSV天生不能处理嵌套类型字段,因此咱们必须手动处理。<br>
不像JSON的字段,每条记录都没有字段名字与其关联,只取行号就好了。在CSV文件中有个不成文的规定,第一行的每一列的值就是每一个字段的名称。<br>
Loading CSV(加载CSV)<br>
加载CSV/TSV数据和加载JSON很像,按照文本文件那样加载而后再进行处理。若是格式缺乏标准化,会致使相同库的不一样版本之间处理的方式不一样。<br>
和JSON同样,有不少CSV的库可使用,不过咱们每一个语言只介绍一种。在Python中咱们使用csv库。在Scala和Java中咱们使用opencsv。<br>
固然也有Hadoop的输入格式,CSVInputFormat,能够用来在Scala或Java中加载CSV数据,它也不支持换行符的记录。<br>
若是你的CSV数据的任何字段都不包含换行符,你能够直接使用textFile()
加载你的数据并进行转换。示例以下:<br>
Example 5-12. Loading CSV with textFile() in Python import csv import StringIO ... def loadRecord(line): """Parse a CSV line""" input = StringIO.StringIO(line) reader = csv.DictReader(input, fieldnames=["name", "favouriteAnimal"]) return reader.next() input = sc.textFile(inputFile).map(loadRecord) Example 5-13. Loading CSV with textFile() in Scala import Java.io.StringReader import au.com.bytecode.opencsv.CSVReader ... val input = sc.textFile(inputFile) val result = input.map{ line => val reader = new CSVReader(new StringReader(line)); reader.readNext(); } Example 5-14. Loading CSV with textFile() in Java import au.com.bytecode.opencsv.CSVReader; import Java.io.StringReader; ... public static class ParseLine implements Function<String, String[]> { public String[] call(String line) throws Exception { CSVReader reader = new CSVReader(new StringReader(line)); return reader.readNext(); } } JavaRDD<String> csvFile1 = sc.textFile(inputFile); JavaPairRDD<String[]> csvData = csvFile1.map(new ParseLine());
若是字段中内嵌了换行符,咱们须要加载整个文件而且解析整段,就如Examples5-15到5-17中所展现。很不幸的是若是每一个文件都很是大,那加载解析过程可能出现性能瓶颈。文本文件不一样的加载方法在73页"Loading text files"有描述。<br>
Example 5-15. Loading CSV in full in Python def loadRecords(fileNameContents): """Load all the records in a given file""" input = StringIO.StringIO(fileNameContents[1]) reader = csv.DictReader(input, fieldnames=["name", "favoriteAnimal"]) return reader fullFileData = sc.wholeTextFiles(inputFile).flatMap(loadRecords) Example 5-16. Loading CSV in full in Scala case class Person(name: String, favoriteAnimal: String) val input = sc.wholeTextFiles(inputFile) val result = input.flatMap{ case (_, txt) => val reader = new CSVReader(new StringReader(txt)); reader.readAll().map(x => Person(x(0), x(1))) } Example 5-17. Loading CSV in full in Java public static class ParseLine implements FlatMapFunction<Tuple2<String, String>, String[]> { public Iterable<String[]> call(Tuple2<String, String> file) throws Exception { CSVReader reader = new CSVReader(new StringReader(file._2())); return reader.readAll(); } } JavaPairRDD<String, String> csvData = sc.wholeTextFiles(inputFile); JavaRDD<String[]> keyedRDD = csvData.flatMap(new ParseLine());
若是只有少许的输入文件,而且你须要使用
wholeFile()
方法,则可能须要从新分区输入使得Spark高效并行化执行你的后续操做。<br>
Saving CSV(保存CSV)
就像JSON同样,输出CSV/TSV数据也很简单而且重用输出的编码对象也有不少优势。由于在CSV中咱们不用输出每一个记录的字段名,咱们须要建立一个映射来确保输出的一致性。一个简单的方法就是写一个函数,把字段转换到数组的指定位置。在Python中,若是咱们想输出字典,CSV writer
按照咱们提供的字段名顺序构建writer
,而后将字典输出。<br>
咱们使用的CSV库输出到文件或writer
,因此咱们可使用StringWriter/StringIO
来把结果输入到RDD之中,示例以下:<br>
Example 5-18. Writing CSV in Python def writeRecords(records): """Write out CSV lines""" output = StringIO.StringIO() writer = csv.DictWriter(output, fieldnames=["name", "favoriteAnimal"]) for record in records: writer.writerow(record) return [output.getvalue()] pandaLovers.mapPartitions(writeRecords).saveAsTextFile(outputFile) Example 5-19. Writing CSV in Scala pandaLovers.map(person => List(person.name, person.favoriteAnimal).toArray) .mapPartitions{people => val stringWriter = new StringWriter(); val csvWriter = new CSVWriter(stringWriter); csvWriter.writeAll(people.toList) Iterator(stringWriter.toString) }.saveAsTextFile(outFile)
你可能注意到了,例子中咱们知道要输出内容的全部的字段。若是有些输入的字段是在运行时肯定的,那咱们只能换个方式了。最简单的办法就是先遍历全部的数据来提取不一样的字段,字段肯定以后就能够输出数据了。<br>