# Launch spark-shell against a Spark HA cluster: when the leader Master goes down, the standby Master becomes ALIVE
./bin/spark-shell --master spark://xupan001:7070,xupan002:7070
# With two partitions specified, the job runs two tasks and writes two files to HDFS
val rdd1 = sc.parallelize(Array(1, 2, 3, 4, 5, 6, 7, 8, 9), 2)
rdd1.partitions.length //2
#saveAsTextFile
rdd1.saveAsTextFile("hdfs://xupan001:8020/user/root/spark/output/partition")
Permission | Owner | Group | Size | Replication | Block Size | Name |
---|---|---|---|---|---|---|
-rw-r--r-- | root | supergroup | 0 B | 1 | 128 MB | _SUCCESS |
-rw-r--r-- | root | supergroup | 8 B | 1 | 128 MB | part-00000 |
-rw-r--r-- | root | supergroup | 10 B | 1 | 128 MB | part-00001 |
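A quick way to check which elements landed in each partition is the standard mapPartitionsWithIndex API. A minimal sketch, run in the same spark-shell session (the exact placement shown is what parallelize's slicing produced here):

rdd1.mapPartitionsWithIndex((idx, it) => Iterator((idx, it.toList))).collect()
// e.g. Array((0,List(1, 2, 3, 4)), (1,List(5, 6, 7, 8, 9)))
// 4 lines of "n\n" = 8 B in part-00000 and 5 lines = 10 B in part-00001, matching the listing above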
If no partition count is specified, the number of output files is tied to the number of cores, i.e. the total number of cores available to the application:
val rdd1 = sc.parallelize(Array(1, 2, 3, 4, 5, 6, 7, 8, 9))
rdd1.partitions.length //6
rdd1.saveAsTextFile("hdfs://xupan001:8020/user/root/spark/output/partition2")
Permission | Owner | Group | Size | Replication | Block Size | Name |
---|---|---|---|---|---|---|
-rw-r--r-- | root | supergroup | 0 B | 1 | 128 MB | _SUCCESS |
-rw-r--r-- | root | supergroup | 2 B | 1 | 128 MB | part-00000 |
-rw-r--r-- | root | supergroup | 4 B | 1 | 128 MB | part-00001 |
-rw-r--r-- | root | supergroup | 2 B | 1 | 128 MB | part-00002 |
-rw-r--r-- | root | supergroup | 4 B | 1 | 128 MB | part-00003 |
-rw-r--r-- | root | supergroup | 2 B | 1 | 128 MB | part-00004 |
-rw-r--r-- | root | supergroup | 4 B | 1 | 128 MB | part-00005 |
Workers (from the Master web UI)
Worker Id | Address | State | Cores | Memory |
---|---|---|---|---|
worker-20171211031717-192.168.0.118-7071 | 192.168.0.118:7071 | ALIVE | 2 (2 Used) | 2.0 GB (1024.0 MB Used) |
worker-20171211031718-192.168.0.119-7071 | 192.168.0.119:7071 | ALIVE | 2 (2 Used) | 2.0 GB (1024.0 MB Used) |
worker-20171211031718-192.168.0.120-7071 | 192.168.0.120:7071 | ALIVE | 2 (2 Used) | 2.0 GB (1024.0 MB Used) |
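In standalone mode, when spark.default.parallelism is not set, defaultParallelism is the total number of cores granted to the application; the Workers table above shows 3 workers x 2 cores = 6, which is why the un-partitioned parallelize produced 6 partitions and 6 output files. A minimal check (a sketch, assuming the same session and no explicit spark.default.parallelism):

sc.defaultParallelism                               // 6 in this setup
sc.getConf.getOption("spark.default.parallelism")   // None unless set explicitly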
======================================================
When reading a file from HDFS without specifying the number of partitions, you get 2 partitions by default:
scala> val rdd = sc.textFile("hdfs://xupan001:8020/user/root/spark/input/zookeeper.out")
scala> rdd.partitions.length
res3: Int = 2
/**
 * Default min number of partitions for Hadoop RDDs when not given by user
 * Notice that we use math.min so the "defaultMinPartitions" cannot be higher than 2.
 * The reasons for this are discussed in https://github.com/mesos/spark/pull/718
 */
def defaultMinPartitions: Int = math.min(defaultParallelism, 2)
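Because of the math.min above, calling textFile without an explicit minPartitions uses min(defaultParallelism, 2) = 2 here even though 6 cores are available, while an explicit value is passed to Hadoop's split computation as a hint (for a splittable text file it usually yields at least that many partitions). A sketch against the same file:

sc.textFile("hdfs://xupan001:8020/user/root/spark/input/zookeeper.out").partitions.length    // 2 (the default)
sc.textFile("hdfs://xupan001:8020/user/root/spark/input/zookeeper.out", 4).partitions.length // typically 4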
If the HDFS file is large, there will be roughly fileSize / 128 MB partitions; if there is a remainder smaller than 128 MB, it becomes fileSize / 128 MB + 1 partitions.
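That rule can be written as a tiny helper for estimating the partition count of a single file (a hypothetical function for illustration, not part of Spark; Hadoop's actual split logic also applies a small slop tolerance near block boundaries, so borderline files may differ):

// hypothetical helper mirroring the fileSize / 128 MB rule above
def estimatePartitions(fileSizeBytes: Long, blockSizeBytes: Long = 128L * 1024 * 1024): Long =
  if (fileSizeBytes % blockSizeBytes == 0) fileSizeBytes / blockSizeBytes
  else fileSizeBytes / blockSizeBytes + 1

estimatePartitions(300L * 1024 * 1024)  // 300 MB -> 2 full blocks + remainder = 3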
Summary: the results above are from my tests on Spark 2.2.0:
1. If the RDD is created by parallelizing a Scala collection on the driver side and no partition count is given, the number of partitions equals the total number of cores allocated to the application.
2. If the data is read from HDFS:
2.1 A single file smaller than 128 MB:
scala> val rdd = sc.textFile("hdfs://xupan001:8020/user/root/spark/input/zookeeper.out",1)
scala> rdd.partitions.length
res0: Int = 1
2.2 Multiple files, one of which is larger than a block; the sizes are:
Permission | Owner | Group | Size | Replication | Block Size | Name |
---|---|---|---|---|---|---|
-rw-r--r-- | root | supergroup | 4.9 KB | 1 | 128 MB | userLog.txt |
-rw-r--r-- | root | supergroup | 284.35 MB | 1 | 128 MB | userLogBig.txt |
-rw-r--r-- | root | supergroup | 51.83 KB | 1 | 128 MB | zookeeper.out |
scala> val rdd = sc.textFile("hdfs://xupan001:8020/user/root/spark/input")
rdd: org.apache.spark.rdd.RDD[String] = hdfs://xupan001:8020/user/root/spark/input MapPartitionsRDD[3] at textFile at <console>:24
scala> rdd.partitions.length
res1: Int = 5
userLogBig.txt spans 3 HDFS blocks (284.35 MB = 2 full 128 MB blocks plus a remainder), so it contributes 3 partitions; userLog.txt and zookeeper.out contribute 1 each, which gives the 5 partitions above.
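Applying the hypothetical estimatePartitions helper defined earlier to the three files reproduces that count:

estimatePartitions((4.9 * 1024).toLong)            // userLog.txt,    4.9 KB    -> 1
estimatePartitions((284.35 * 1024 * 1024).toLong)  // userLogBig.txt, 284.35 MB -> 3
estimatePartitions((51.83 * 1024).toLong)          // zookeeper.out,  51.83 KB  -> 1
// 1 + 3 + 1 = 5, matching res1 = 5 above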