Spark输出自定义文件目录踩坑(Java)

最近项目中,使用Spark作离线计算,结果须要输出一份结果到文件中保存,而且须要按Key来放置不一样的目录。由于spark经过saveAsTextFile()方法默认输出是以part-0000的形式。java

解决方法

经过搜索,很轻易的就能搜索到使用saveAsHadoopFile()方法能够将文件输出到自定义文件目录。网上大部分都是scala的写法,java的具体操做以下:json

//首先,构造出一个PariRDD形式的RDD
JavaPairRDD<String, JSONObject> javaPairRDD =xxx
//使用saveAsHadoopFile方法输出到目标目录下,方法参数分别为(目标目录,key的class类型,value的class类型,输出format类)
javaPairRDD.saveAsHadoopFile("D:\\Test",String.class,JSONObject.class,RDDMultipleTextOutputFormat2.class);

//自定义一个RDDMultipleTextOutputFormat2继承MultipleTextOutputFormat
public static class RDDMultipleTextOutputFormat2 extends MultipleTextOutputFormat<String, JSONObject> {

    @Override
    public String generateFileNameForKeyValue(String key, JSONObject value,
                                              String name) {
        String object_type = value.getString("object_type");
        String object_id = value.getString("object_id");
        return object_type + "/" + object_id+".json";
    }

}
复制代码

最后输出的结果就是按"D:\Test\object_type\object_id.json来区分保存。bash

新的问题

美滋滋的打开按咱们要求输出目录的输出文件。结果却发现,输出文件中,并非仅仅将value写入了文件中,同时把key也写了进去。可是咱们的文件格式是不须要key写入文件的。ide

//输出文件内容形式以下
key  value
复制代码

解决办法一

遇事不决百度Google,经过搜索引擎,能够找到咱们经过设置rdd的key为NullWritable,使得输出文件中不包含key,网上一样大多数是scala的,下面是java的具体操做:oop

/首先,构造出一个PariRDD形式的RDD
JavaPairRDD<String, JSONObject> javaPairRDD =xxx

//将PariRDD转为<NullWritable,T>的形式
JavaPairRDD<NullWritable, JSONObject> nullKeyJavaPairRDD = javaPairRDD.mapToPair(tuple2 -> {
    return new Tuple2(NullWritable.get(),tuple2._2);
});

//接下来的操做和上面同样
nullKeyJavaPairRDD.saveAsHadoopFile("D:\\Test",NullWritable.class,JSONObject.class,RDDMultipleTextOutputFormat.class);

public static class RDDMultipleTextOutputFormat extends MultipleTextOutputFormat<NullWritable, JSONObject> {

    @Override
    public String generateFileNameForKeyValue(NullWritable key, JSONObject value,
                                              String name) {
        String object_type = value.getString("object_type");
        String object_id = value.getString("object_id");
        return object_type + "/" + object_id+".json";
    }
}
复制代码

如上方法确实是能够解决问题,可是若是咱们须要的输出目录与key有关系,想将key使用在自定义目录中,这就办不到了。因此这个解决方法仍是有缺陷的,仅仅能知足输出目录只与value有关系的状况。ui

解决办法二

这里想到另外一个解决思路,往文件写内容老是须要一个类的,咱们找到这个类,重写它,把key的输出去掉,不就能够了。this

因而咱们跟进咱们继承的MultipleTextOutputFormat类中搜索引擎

public class MultipleTextOutputFormat<K, V>
    extends MultipleOutputFormat<K, V> {

  private TextOutputFormat<K, V> theTextOutputFormat = null;

  @Override
  protected RecordWriter<K, V> getBaseRecordWriter(FileSystem fs, JobConf job,
      String name, Progressable arg3) throws IOException {
    if (theTextOutputFormat == null) {
      theTextOutputFormat = new TextOutputFormat<K, V>();
    }
    return theTextOutputFormat.getRecordWriter(fs, job, name, arg3);
  }
}
复制代码

并无发现有相关的方法,咱们继续跟进父类MultipleOutputFormat,在这个类中,咱们发现了一个write方法:spa

public void write(K key, V value) throws IOException {

        // get the file name based on the key
        String keyBasedPath = generateFileNameForKeyValue(key, value, myName);

        // get the file name based on the input file name
        String finalPath = getInputFileBasedOutputFileName(myJob, keyBasedPath);

        // get the actual key
        K actualKey = generateActualKey(key, value);
        V actualValue = generateActualValue(key, value);

        RecordWriter<K, V> rw = this.recordWriters.get(finalPath);
        if (rw == null) {
          // if we don't have the record writer yet for the final path, create // one // and add it to the cache rw = getBaseRecordWriter(myFS, myJob, finalPath, myProgressable); this.recordWriters.put(finalPath, rw); } rw.write(actualKey, actualValue); }; 复制代码

感受真相就在眼前了,咱们继续跟进rw.write(actualKey, actualValue);方法,经过断点咱们能够知道他进入的是TextOutPutFormat #write()方法:scala

public synchronized void write(K key, V value)
      throws IOException {

      boolean nullKey = key == null || key instanceof NullWritable;
      boolean nullValue = value == null || value instanceof NullWritable;
      if (nullKey && nullValue) {
        return;
      }
      if (!nullKey) {
        writeObject(key);
      }
      if (!(nullKey || nullValue)) {
        out.write(keyValueSeparator);
      }
      if (!nullValue) {
        writeObject(value);
      }
      out.write(newline);
    }
复制代码

这段代码就很简单了,只要key不是null或者NullWritable类,他就会往文件里输出。这也解释了为何上面方法一中将key转换为NullWritable类就不会输出到文件中了。

了解到这里,咱们就很容易得出解决方案,咱们只要将传入write()方法中的key传入null就能够了。回到MultipleOutputFormat类中,咱们看到传入的key是由这个方法获取的K actualKey = generateActualKey(key, value);,继续跟进:

protected K generateActualKey(K key, V value) {
    return key;
  }
复制代码

接下很简单了,咱们在自定义的format类中重写这个方法,改成返回null便可。

public static class RDDMultipleTextOutputFormat3 extends MultipleTextOutputFormat<String, JSONObject> {

        @Override
        public String generateFileNameForKeyValue(String key, JSONObject value,
                                                  String name) {
            String object_id = value.getString("object_id");
            return key + "/" + object_id+".json";
        }
        
        @Override
        public String generateActualKey(String key, JSONObject value) {
            return null;
        }
    }
复制代码

总结

其实此次踩坑仍是挺简单的,循序渐进的一路跟进就找到了,其中还有很多点是能够延伸的,好比saveAsHadoopFile方法的的底层实现,和传统的saveAsTextFile方法的异同,我在查资料的过程当中也发现了不少延伸的知识点,不过这些应该就是另外一篇博客了。

相关文章
相关标签/搜索