package dayo1 import org.apache.spark.{SparkConf, SparkContext} import scala.collection.mutable.ArrayBuffer object MapAndPartitions { def main(args: Array[String]): Unit = { val cof = new SparkConf ().setAppName ( this.getClass.getSimpleName ).setMaster ( "local[*]" ) val sc = new SparkContext ( cof ) //建立RDD(并列化方法) val arrayRDD = sc.parallelize ( Array ( 1, 2, 3, 4, 5, 6, 7, 8, 9 ) ) //map数据每次处理一行数据 arrayRDD.map ( elements => elements ).foreach ( println ) arrayRDD.mapPartitions(tp=>{ val result=new ArrayBuffer[Int]() tp.foreach(tp=>{ result+=tp }) result.iterator } ).foreach(println) sc.stop () } /** * 两个函数最终处理获得的结果是同样的 * * mapPartitions比较适合须要分批处理数据的状况,好比将数据插入某个表,每批数据只须要开启一次数据库链接,大大减小了链接开支,伪代码以下: * * 复制代码 * arrayRDD.mapPartitions(datas=>{ * dbConnect = getDbConnect() //获取数据库链接 * datas.foreach(data=>{ * dbConnect.insert(data) //循环插入数据 * }) * dbConnect.commit() //提交数据库事务 * dbConnect.close() //关闭数据库链接 * }) * 复制代码 */ }