Reading and writing S3 files from local Spark

1. Differences and relationship between S3, S3N, and S3A (wiki: https://wiki.apache.org/hadoop/AmazonS3)

S3 Native FileSystem (URI scheme: s3n) A native filesystem for reading and writing regular files on S3. The advantage of this filesystem is that you can access files on S3 that were written with other tools. Conversely, other tools can access files written using Hadoop. The disadvantage is the 5GB limit on file size imposed by S3.


S3A (URI scheme: s3a) A successor to the S3 Native, s3n fs, the S3a: system uses Amazon's libraries to interact with S3. This allows S3a to support larger files (no more 5GB limit), higher performance operations and more. The filesystem is intended to be a replacement for/successor to S3 Native: all objects accessible from s3n:// URLs should also be accessible from s3a simply by replacing the URL schema.


S3 Block FileSystem (URI scheme: s3) A block-based filesystem backed by S3. Files are stored as blocks, just like they are in HDFS. This permits efficient implementation of renames. This filesystem requires you to dedicate a bucket for the filesystem - you should not use an existing bucket containing files, or write other files to the same bucket. The files stored by this filesystem can be larger than 5GB, but they are not interoperable with other S3 tools.

2. How to choose an S3 access scheme

From the descriptions above: first, the three schemes differ in the file sizes they can handle; second, s3 is block-based while s3n/s3a are object-based; and finally, S3A is the access method recommended by Apache, the s3 scheme is gradually being phased out and AWS discourages its use, and S3A is more stable, secure, and efficient. Note that Hadoop 2.6.x has a bug in its S3A support, so if you access S3 over the s3a protocol, Hadoop 2.7.x is recommended.
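As a quick practical illustration (a sketch only; the bucket, path, and key values below are placeholders, not from this post), s3n and s3a also read their credentials from different Hadoop properties:

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().master("local[*]").appName("S3SchemeSketch").getOrCreate()
val hc = spark.sparkContext.hadoopConfiguration

// s3n: jets3t-based, 5GB object limit, credentials under fs.s3n.*
hc.set("fs.s3n.awsAccessKeyId", "<ACCESS_KEY>")
hc.set("fs.s3n.awsSecretAccessKey", "<SECRET_KEY>")
val viaS3n = spark.sparkContext.textFile("s3n://my-bucket/path/part-00000")

// s3a: AWS SDK based, no 5GB limit, credentials under fs.s3a.*
hc.set("fs.s3a.access.key", "<ACCESS_KEY>")
hc.set("fs.s3a.secret.key", "<SECRET_KEY>")
val viaS3a = spark.sparkContext.textFile("s3a://my-bucket/path/part-00000")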

3. Choosing the JAR dependencies

To access S3 files in Hadoop you need the aws-sdk package, which includes an S3 client for Java; the scheme handling is done by the hadoop-aws package. Before Hadoop 2.6.x this code was maintained inside hadoop-core, so Hadoop 2.4.x still contains a class such as org.apache.hadoop.fs.s3native.NativeS3FileSystem; from 2.6.x it is gone, because Apache decoupled the S3 access code from hadoop-core into the separate hadoop-aws module, which builds on the AWS SDK.
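A related pitfall (my own note, not from the original post): if hadoop-aws is missing from the classpath, or the scheme-to-class mapping is not picked up, Spark fails with "No FileSystem for scheme: s3a" (or s3n). A minimal sketch of the usual fix, assuming a SparkSession named spark as in the full example at the end; the class names are the ones shipped in hadoop-aws 2.7.x:

// Sketch: explicitly map the URI schemes to the FileSystem implementations in hadoop-aws
spark.sparkContext.hadoopConfiguration.set("fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")
spark.sparkContext.hadoopConfiguration.set("fs.s3n.impl", "org.apache.hadoop.fs.s3native.NativeS3FileSystem")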

AWS has several request-signing versions (v2, v3, v4). Only jets3t 0.9.4 and later support V4, while older Hadoop releases pull in an older jets3t jar, so when using s3n it is best to declare the jets3t dependency explicitly (hence the jets3t 0.9.4 entry in the POM below). With s3a this is not needed, because s3a goes through com.amazonaws.http.AmazonHttpClient rather than jets3t.

The JARs used for this test mainly include the following:

    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <scala.version>2.11.8</scala.version>
        <spark.version>2.2.0</spark.version>
        <hadoop.version>2.7.3</hadoop.version>
        <spark.pom.scope>compile</spark.pom.scope>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.scala-lang</groupId>
            <artifactId>scala-library</artifactId>
            <version>${scala.version}</version>
            <scope>${spark.pom.scope}</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_2.11</artifactId>
            <version>${spark.version}</version>
            <scope>${spark.pom.scope}</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-sql_2.11</artifactId>
            <version>${spark.version}</version>
            <scope>${spark.pom.scope}</scope>
        </dependency>
        <dependency>
            <groupId>com.fasterxml.jackson.core</groupId>
            <artifactId>jackson-core</artifactId>
            <version>2.6.3</version>
        </dependency>
        <dependency>
            <groupId>com.fasterxml.jackson.core</groupId>
            <artifactId>jackson-databind</artifactId>
            <version>2.6.3</version>
        </dependency>
        <dependency>
            <groupId>com.fasterxml.jackson.core</groupId>
            <artifactId>jackson-annotations</artifactId>
            <version>2.6.3</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-aws</artifactId>
            <version>${hadoop.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-client</artifactId>
            <version>${hadoop.version}</version>
        </dependency>
        <dependency>
            <groupId>net.java.dev.jets3t</groupId>
            <artifactId>jets3t</artifactId>
            <version>0.9.4</version>
        </dependency>
        <dependency>
            <groupId>org.apache.httpcomponents</groupId>
            <artifactId>httpcore</artifactId>
            <version>4.4</version>
        </dependency>
        <dependency>
            <groupId>org.apache.httpcomponents</groupId>
            <artifactId>httpclient</artifactId>
            <version>4.4</version>
        </dependency>
    </dependencies>

4. Pitfalls

The code for reading is very simple, as shown below. Note that it uses the s3a scheme, which is also the recommended access method.

val rdd = spark.sparkContext.textFile("s3a://xxx/part-00000")
println(rdd.count())

Even though the AWS CLI is already configured on my machine, running the code still hits the following problem (presumably because the s3a connector in this Hadoop version does not read the AWS CLI's credential file):

Exception in thread "main" com.amazonaws.AmazonClientException: Unable to load AWS credentials from any provider in the chain
So the credentials have to be set in code:

spark.sparkContext.hadoopConfiguration.set("fs.s3a.access.key", "")
spark.sparkContext.hadoopConfiguration.set("fs.s3a.secret.key", "")
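An alternative (my own sketch, not from the original post) is to pass the same properties through Spark's spark.hadoop.* prefix when building the session; Spark copies these into the Hadoop configuration. The key values below are placeholders:

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .master("local[*]")
  .appName("SparkS3CredentialSketch")
  // spark.hadoop.* entries are propagated into sparkContext.hadoopConfiguration
  .config("spark.hadoop.fs.s3a.access.key", "<ACCESS_KEY>")
  .config("spark.hadoop.fs.s3a.secret.key", "<SECRET_KEY>")
  .getOrCreate()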
With the keys configured (either way), re-running still fails:

com.amazonaws.services.s3.model.AmazonS3Exception: Status Code: 403, AWS Service: Amazon S3, AWS Request ID: xx, AWS Error Code: null, AWS Error Message: Forbidden, S3 Extended Request ID: xxx
It took quite a while to find the cause. The aws-sdk talks to S3's REST API, and that requires an endpoint (http://docs.aws.amazon.com/zh_cn/AmazonS3/latest/dev/VirtualHosting.html). The default endpoint is the HOSTNAME value in the class com.amazonaws.services.s3.internal.Constants, which is s3.amazonaws.com, whereas the hostname for the China (Beijing) region should be s3.cn-north-1.amazonaws.com.cn. There are two ways to handle this: one is to create a Constants class under the same package name to override the value (not recommended, it makes the code unfriendly); the other is to set the endpoint directly in the configuration:

spark.sparkContext.hadoopConfiguration.set("fs.s3a.endpoint", "s3.cn-north-1.amazonaws.com.cn")
At this point reading from S3 basically works locally, but the EMR environment is a bit different. First, on EMR you do not need to set the AWS key and secret at all. Second, Hadoop on an EMR cluster accesses S3 through EMRFS, which handles both the s3 and s3n schemes as s3 (equivalent to using s3n in local Hadoop), and the EMR environment does not support s3a at all (it returns a 403 error); AWS's recommendation on EMR is to access files with the s3 scheme.
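So on EMR the read reduces to something like this sketch (the bucket and key are placeholders, and spark is the same SparkSession as in the example below; credentials come from the cluster's IAM role, so nothing is set in code):

// Sketch for EMR: EMRFS serves the s3:// scheme and the instance role supplies credentials
val rdd = spark.sparkContext.textFile("s3://my-bucket/part-00000")
println(rdd.count())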

Complete example code (local):

import org.apache.spark.sql.SparkSession

object SparkS3Test {
  def main(args: Array[String]) {
    val spark = SparkSession.builder()
      .master("local[*]")
      .config("spark.eventLog.enabled", "false")
      .config("spark.driver.memory", "2g")
      .config("spark.executor.memory", "2g")
      .appName("SparkDemoFromS3")
      .getOrCreate()
    // s3a credentials (left blank here) and the China (Beijing) region endpoint, as discussed above
    spark.sparkContext.hadoopConfiguration.set("fs.s3a.access.key", "")
    spark.sparkContext.hadoopConfiguration.set("fs.s3a.secret.key", "")
    spark.sparkContext.hadoopConfiguration.set("fs.s3a.endpoint", "s3.cn-north-1.amazonaws.com.cn")
    val rdd = spark.sparkContext.textFile("s3a://xxx/part-00000")
    println(rdd.count())
  }
}
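The title also mentions writing; a minimal write sketch under the same configuration (the output path is a placeholder) could go inside the same main method, after the count:

// Sketch: write the RDD back to S3 over s3a; the target prefix must not already exist
rdd.saveAsTextFile("s3a://my-bucket/output/")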