This blog series summarizes and shares cases drawn from real production environments and offers practical guidance for commercial Spark applications; please stay tuned. Copyright notice: this Spark commercial-application series belongs to its author (Qin Kaixin); reproduction is prohibited, but you are welcome to learn from it.
This post focuses on the most technically demanding data-skew handling techniques; the methods below are offered for reference only. The source data used throughout looks like this (tab-separated log records):
20111230000005 57375476989eea12893c0c3811607bcf 奇艺高清 1 1 http://www.qiyi.com/
20111230000005 66c5bb7774e31d0a22278249b26bc83a 凡人修仙传 3 1 http://www.booksky.org/BookDetail.aspx?BookID=1050804&Level=1
20111230000007 b97920521c78de70ac38e3713f524b50 本本联盟 1 1 http://www.bblianmeng.com/
20111230000008 6961d0c97fe93701fc9c0d861d096cd9 华南师范大学图书馆 1 1 http://lib.scnu.edu.cn/
For more detail, this blog post is very good: https://blog.csdn.net/zyp13781913772/article/details/81428862
First, prepend a row index to each record of the raw data:
val sourceRdd = sc.textFile("hdfs://bd-master:9000/opendir/source.txt")
sourceRdd.zipWithIndex.take(2)
Array(
(20111230000005 57375476989eea12893c0c3811607bcf 奇艺高清 1 1 http://www.qiyi.com/, 0),
(20111230000005 66c5bb7774e31d0a22278249b26bc83a 凡人修仙传 3 1 http://www.booksky.org/BookDetail.aspx?BookID=1050804&Level=1, 1)
)
Notes on the ArrayBuffer operators used below:
++= appends all elements of a collection to the buffer
+=: prepends a single element to the buffer
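A quick standalone illustration of these two operators (a minimal sketch, independent of the dataset above):

import scala.collection.mutable.ArrayBuffer
val buf = ArrayBuffer[String]()
buf ++= Seq("a", "b")   // append all elements of a collection: ArrayBuffer(a, b)
"0" +=: buf             // prepend a single element: ArrayBuffer(0, a, b)

The indexing logic below uses them in exactly this way: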
val sourceWithIndexRdd = sourceRdd.zipWithIndex.map { tuple =>
  val array = scala.collection.mutable.ArrayBuffer[String]()
  array ++= tuple._1.split("\t")
  tuple._2.toString +=: array
  array.toArray
}
Array(
Array(0, 20111230000005, 57375476989eea12893c0c3811607bcf, 奇艺高清, 1, 1, http://www.qiyi.com/),
Array(1, 20111230000005, 66c5bb7774e31d0a22278249b26bc83a, 凡人修仙传, 3, 1, http://www.booksky.org/BookDetail.aspx?BookID=1050804&Level=1)
)
sourceWithIndexRdd.map(_.mkString("\t")).saveAsTextFile("hdfs://bd-master:9000/source_index")
Contents of source_index:
Array[String] = Array(
0 20111230000005 57375476989eea12893c0c3811607bcf 奇艺高清 1 1 http://www.qiyi.com/,
1 20111230000005 66c5bb7774e31d0a22278249b26bc83a 凡人修仙传 3 1 http://www.booksky.org/BookDetail.aspx?BookID=1050804&Level=1
)
Data simulation:
val sourceRdd = sc.textFile("hdfs://bd-master:9000/source_index/p*")
val kvRdd = sourceRdd.map(x =>{ val parm=x.split("\t");(parm(0).trim().toInt,parm(1).trim()) })
// sample element: (Int, String) = (479936,20111230000005)
// the large dataset (~1,000,000 records): keys below 900001 are all collapsed onto the hot key 900001
val kvRdd2 = kvRdd.map(x=>{if(x._1 < 900001) (900001,x._2) else x})
kvRdd2.map(x=>x._1 +","+x._2).saveAsTextFile("hdfs://bd-master:9000/big_data/")
// the small dataset: only the keys above 900000
val joinRdd2 = kvRdd.filter(_._1 > 900000)
joinRdd2.map(x=>x._1 +","+x._2).saveAsTextFile("hdfs://bd-master:9000/small_data/")
Reduce-side join (baseline):
val sourceRdd = sc.textFile("hdfs://bd-master:9000/big_data/p*")
val sourceRdd2 = sc.textFile("hdfs://bd-master:9000/small_data/p*")
val joinRdd = sourceRdd.map(x =>{ val parm=x.split(",");(parm(0).trim().toInt, parm(1).trim) })
val joinRdd2 = sourceRdd2.map(x =>{ val parm=x.split(",");(parm(0).trim().toInt, parm(1).trim) })
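To actually exercise this baseline join, a plain shuffle join can be run on the two prepared RDDs (a one-line sketch; both sides are shuffled by key, so the hot key 900001 lands on a single task):

joinRdd.join(joinRdd2).count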
Map-side (broadcast) join:
val sourceRdd = sc.textFile("hdfs://bd-master:9000/big_data/p*")
val sourceRdd2 = sc.textFile("hdfs://bd-master:9000/small_data/p*")
// the large (skewed) dataset
val joinRdd = sourceRdd.map(x =>{ val parm=x.split(",");(parm(0).trim().toInt, parm(1).trim) })
// the small dataset
val joinRdd2 = sourceRdd2.map(x =>{ val parm=x.split(",");(parm(0).trim().toInt, parm(1).trim) })
val broadcastVar = sc.broadcast(joinRdd2.collectAsMap)
joinRdd.map(x => (x._1,(x._2,broadcastVar.value.getOrElse(x._1,"")))).count
Data simulation: all ids below 900,000 are rewritten to 8 plus a random multiple of 12 (i.e. congruent to 8 mod 12), so with a parallelism of 12 the skew lands on the task with id 8:
val sourceRdd = sc.textFile("hdfs://bd-master:9000/source_index")
case class brower(id:Int, time:Long, uid:String, keyword:String, url_rank:Int, click_num:Int, click_url:String) extends Serializable
val ds = sourceRdd.map(_.split("\t")).map(attr => brower(attr(0).toInt, attr(1).toLong, attr(2), attr(3), attr(4).toInt, attr(5).toInt, attr(6))).toDS
ds.createOrReplaceTempView("sourceTable")
val newSource = spark.sql("SELECT CASE WHEN id < 900000 THEN (8 + (CAST (RAND() * 50000 AS bigint)) * 12 ) ELSE id END, time, uid, keyword, url_rank, click_num, click_url FROM sourceTable")
newSource.rdd.map(_.mkString("\t")).saveAsTextFile("hdfs://bd-master:9000/test_data")
val sourceRdd = sc.textFile("hdfs://bd-master:9000/test_data/p*")
val kvRdd = sourceRdd.map(x =>{ val parm=x.split("\t");(parm(0).trim().toInt,parm(1).trim()) })
kvRdd.groupByKey(12).count
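As an illustrative check (not part of the original run), counting records per partition makes the skew visible; with 12 partitions almost everything piles up in the task the synthetic keys hash to. Changing the parallelism, as with groupByKey(17) below, redistributes the keys across tasks:

kvRdd.groupByKey(12).mapPartitionsWithIndex { (idx, it) => Iterator((idx, it.map(_._2.size).sum)) }.collect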
kvRdd.groupByKey(17).count
val sourceRdd = sc.textFile("hdfs://bd-master:9000/source_index/p*",13)
val kvRdd = sourceRdd.map(x =>{ val parm=x.split("\t");(parm(0).trim().toInt,parm(4).trim().toInt) })
The skewed key will be 20001, with about 980,000 records in total, so it can later be evened out by adding a random offset to the key:
val kvRdd2 = kvRdd.map(x=>{if(x._1 > 20000) (20001,x._2) else x})
kvRdd2.groupByKey().collect
val kvRdd3 = kvRdd2.map(x=>{if (x._1 ==20001) (x._1 + scala.util.Random.nextInt(100),x._2) else x})
kvRdd3.sortByKey(false).collect
package skewTuring;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import scala.Tuple2;
import java.util.Random;
/**
 * Applicable scenario: aggregation-type shuffle operators such as reduceByKey on an RDD,
 * or GROUP BY aggregations in Spark SQL.
 * Two-stage aggregation (local aggregation + global aggregation).
 */
public class SkewTuring11 {
public static void main(String[] args) throws Exception{
// Build the Spark context
SparkConf conf = new SparkConf().setAppName("SkewTuring11");
conf.setMaster("local[8]");
JavaSparkContext sc = new JavaSparkContext(conf);
//0 20111230000005 57375476989eea12893c0c3811607bcf 奇艺高清 1 1 http://www.qiyi.com/
JavaRDD<String> sourceRdd = sc.textFile("hdfs://master:9000/skewtestdata/source_index");
// Step 1: attach a random prefix to every key in the RDD.
JavaPairRDD<String, Long> randomPrefixRdd = sourceRdd.mapToPair(new PairFunction<String,String,Long>() {
@Override
public Tuple2<String, Long> call(String s) throws Exception {
String[] splits = s.split("\t");
Random random = new Random();
int prefix = random.nextInt(10);
Long key = Long.valueOf(splits[0]);
if(key > 10000) {
return new Tuple2<String, Long>(prefix + "_" + 10001L, 1L);
} else {
return new Tuple2<String, Long>(prefix + "_" + key,1L);
}
}
});
// Step 2: locally aggregate the randomly prefixed keys.
JavaPairRDD<String, Long> localAggrRdd = randomPrefixRdd.reduceByKey(new Function2<Long, Long, Long>() {
@Override
public Long call(Long v1, Long v2) throws Exception {
return v1 + v2;
}
});
// Step 3: strip the random prefix from every key.
JavaPairRDD<Long, Long> removedRandomPrefixRdd = localAggrRdd.mapToPair(
new PairFunction<Tuple2<String,Long>, Long, Long>() {
private static final long serialVersionUID = 1L;
@Override
public Tuple2<Long, Long> call(Tuple2<String, Long> tuple)
throws Exception {
long originalKey = Long.valueOf(tuple._1.split("_")[1]);
return new Tuple2<Long, Long>(originalKey, tuple._2);
}
});
// Step 4: globally aggregate the RDD with the prefixes removed.
JavaPairRDD<Long, Long> globalAggrRdd = removedRandomPrefixRdd.reduceByKey(
new Function2<Long, Long, Long>() {
private static final long serialVersionUID = 1L;
@Override
public Long call(Long v1, Long v2) throws Exception {
return v1 + v2;
}
});
System.out.println("*********************************************");
System.out.println(globalAggrRdd.first());
}
}
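For comparison, a condensed spark-shell sketch of the same two-stage aggregation against kvRdd2 from the earlier simulation (the RDD with the artificial hot key 20001); the names and the prefix range of 10 are illustrative:

import scala.util.Random
val salted = kvRdd2.map { case (k, _) => (Random.nextInt(10) + "_" + k, 1L) }  // step 1: random prefix
val localAgg = salted.reduceByKey(_ + _)                                       // step 2: local aggregation
val globalAgg = localAgg
  .map { case (k, v) => (k.split("_")(1).toInt, v) }                           // step 3: strip the prefix
  .reduceByKey(_ + _)                                                          // step 4: global aggregation
globalAgg.take(3)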
package skewTuring;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFlatMapFunction;
import org.apache.spark.api.java.function.PairFunction;
import scala.Tuple2;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Random;
/**
 * Applicable scenario: the skew comes from a small number of keys with very large volumes in one
 * RDD/Hive table, while the keys in the other RDD/Hive table are evenly distributed.
 * Sample the skewed keys and split the join.
 */
public class SkewTuring22 {
public static void main(String[] args) throws Exception{
// Build the Spark context
SparkConf conf = new SparkConf().setAppName("SkewTuring22");
conf.setMaster("local[8]");
JavaSparkContext sc = new JavaSparkContext(conf);
// Primary source data -- a small number of skewed keys
//0 20111230000005 57375476989eea12893c0c3811607bcf 奇艺高清 1 1 http://www.qiyi.com/
JavaRDD<String> sourceRdd = sc.textFile("hdfs://master:9000/skewtestdata/source_index");
JavaPairRDD<Long, String> mapdSourceRdd= sourceRdd.mapToPair(new PairFunction<String,Long,String>() {
@Override
public Tuple2<Long,String> call(String s) throws Exception {
String[] splits = s.split("\t");
Long key = Long.valueOf(splits[0]);
String value = splits[6];
if(key > 10000) {
return new Tuple2<Long,String>(10001L, value);
} else {
return new Tuple2<Long,String>(key, value);
}
}
});
// Secondary source data -- evenly distributed keys
JavaPairRDD<Long,String> mapdSourceRdd2 = sourceRdd.mapToPair(new PairFunction<String,Long,String>() {
@Override
public Tuple2<Long,String> call(String s) throws Exception {
String[] splits = s.split("\t");
Long key = Long.valueOf(splits[0]);
String value = splits[6];
return new Tuple2<Long,String>(key, value);
}
});
// First, sample 10% of the data from mapdSourceRdd, which contains the few keys that cause the skew.
JavaPairRDD<Long,String> sampledRDD = mapdSourceRdd.sample(false, 0.1);
System.out.println("Random sample: " + sampledRDD.first());
// Count how many times each key appears in the sampled RDD and sort by count in descending order.
// From the sorted data, take the top 1 (or top N) keys, i.e. the keys with the most records.
// How many of the heaviest keys to take is up to you; here we take 1 as a demonstration.
JavaPairRDD<Long, Long> mappedSampledRDD = sampledRDD.mapToPair(
new PairFunction<Tuple2<Long,String>, Long, Long>() {
private static final long serialVersionUID = 1L;
@Override
public Tuple2<Long, Long> call(Tuple2<Long, String> tuple)
throws Exception {
return new Tuple2<Long, Long>(tuple._1, 1L);
}
});
JavaPairRDD<Long, Long> countedSampledRDD = mappedSampledRDD.reduceByKey(
new Function2<Long, Long, Long>() {
private static final long serialVersionUID = 1L;
@Override
public Long call(Long v1, Long v2) throws Exception {
return v1 + v2;
}
});
JavaPairRDD<Long, Long> reversedSampledRDD = countedSampledRDD.mapToPair(
new PairFunction<Tuple2<Long,Long>, Long, Long>() {
private static final long serialVersionUID = 1L;
@Override
public Tuple2<Long, Long> call(Tuple2<Long, Long> tuple)
throws Exception {
return new Tuple2<Long, Long>(tuple._2, tuple._1);
}
});
final Long skewedUserid = reversedSampledRDD.sortByKey(false).take(1).get(0)._2;
System.out.println("数据倾斜id"+skewedUserid);
/**
 * Primary source: filter the skewed key out into a separate RDD.
 */
JavaPairRDD<Long, String> skewedRDD = mapdSourceRdd.filter(
new Function<Tuple2<Long,String>, Boolean>() {
private static final long serialVersionUID = 1L;
@Override
public Boolean call(Tuple2<Long, String> tuple) throws Exception {
return tuple._1.equals(skewedUserid);
}
});
System.out.println("主源数据 倾斜数据 rdd:"+ skewedRDD.take(100));
// 从mapdSourceRdd中分拆出不致使数据倾斜的普通key,造成独立的RDD。
JavaPairRDD<Long, String> commonRDD = mapdSourceRdd.filter(
new Function<Tuple2<Long,String>, Boolean>() {
private static final long serialVersionUID = 1L;
@Override
public Boolean call(Tuple2<Long, String> tuple) throws Exception {
return !tuple._1.equals(skewedUserid);
}
});
System.out.println("主源数据 常规数据 rdd:"+ commonRDD.take(100));
/**
* sourceRdd2 副源数据 过滤倾斜数据 随机扩容N倍
*/
// rdd2,就是那个全部key的分布相对较为均匀的rdd。
// 这里将rdd2中,前面获取到的key对应的数据,过滤出来,分拆成单独的rdd,并对rdd中的数据使用flatMap算子都扩容100倍。
// 对扩容的每条数据,都打上0~100的前缀。
JavaPairRDD<String, String> skewedRandomRDD2 = mapdSourceRdd2.filter(
new Function<Tuple2<Long,String>, Boolean>() {
private static final long serialVersionUID = 1L;
@Override
public Boolean call(Tuple2<Long, String> tuple) throws Exception {
return tuple._1.equals(skewedUserid);
}
}).flatMapToPair(new PairFlatMapFunction<Tuple2<Long, String>, String, String>() {
@Override
public Iterator<Tuple2<String, String>> call(Tuple2<Long, String> tuple) throws Exception {
List<Tuple2<String, String>> list = new ArrayList<Tuple2<String, String>>();
for (int i = 0; i < 10; i++) {
list.add(new Tuple2<String, String>(i + "_" + tuple._1, tuple._2));
}
return list.iterator();
}
});
System.out.println("副源数据 扩容表处理:" + skewedRandomRDD2.take(100));
/**
* 主源倾斜数据 key+随机数
*/
// 将rdd1中分拆出来的致使倾斜的key的独立rdd,每条数据都打上100之内的随机前缀。
// 而后将这个rdd1中分拆出来的独立rdd,与上面rdd2中分拆出来的独立rdd,进行join。
final JavaPairRDD<String, String> skewedRandomRDD = skewedRDD.mapToPair(new PairFunction<Tuple2<Long, String>, String, String>() {
private static final long serialVersionUID = 1L;
@Override
public Tuple2<String, String> call(Tuple2<Long, String> tuple)
throws Exception {
Random random = new Random();
int prefix = random.nextInt(10);
return new Tuple2<String, String>(prefix + "_" + tuple._1, tuple._2);
}
});
System.out.println("主源数据 随机数处理:" + skewedRandomRDD.take(100));
JavaPairRDD<Long, Tuple2<String, String>> joinedRDD1 = skewedRandomRDD
.join(skewedRandomRDD2)
.mapToPair(new PairFunction<Tuple2<String,Tuple2<String,String>>, Long, Tuple2<String, String>>() {
private static final long serialVersionUID = 1L;
@Override
public Tuple2<Long, Tuple2<String, String>> call(Tuple2<String, Tuple2<String, String>> tuple) throws Exception {
long key = Long.valueOf(tuple._1.split("_")[1]);
return new Tuple2<Long, Tuple2<String, String>>(key, tuple._2);
}
});
System.out.println("主 副源数据 倾斜数据 join 处理:" + joinedRDD1.take(100));
// 将rdd1中分拆出来的包含普通key的独立rdd,直接与rdd2进行join。
JavaPairRDD<Long, Tuple2<String, String>> joinedRDD2 = commonRDD.join(mapdSourceRdd2);
System.out.println("主 副源数据 常规数据 join 处理:" + joinedRDD2.take(100));
// 将倾斜key join后的结果与普通key join后的结果,uinon起来。
// 就是最终的join结果。
JavaPairRDD<Long, Tuple2<String, String>> resultRDD = joinedRDD1.union(joinedRDD2);
System.out.println("最终join结果:"+ resultRDD.sample(false, 0.1).take(100));
}
}
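The same sample-and-split flow, condensed into a spark-shell sketch (illustrative only: rdd1 is the side with a few heavily skewed keys, rdd2 the evenly distributed side, both assumed to be RDD[(Long, String)]):

import scala.util.Random

// 1. Sample rdd1 and find the heaviest key.
val skewedKey = rdd1.sample(false, 0.1)
  .map { case (k, _) => (k, 1L) }
  .reduceByKey(_ + _)
  .map(_.swap)
  .sortByKey(ascending = false)
  .first()._2

// 2. Split rdd1 into the skewed part and the common part.
val skewedRdd = rdd1.filter(_._1 == skewedKey)
val commonRdd = rdd1.filter(_._1 != skewedKey)

// 3. Prefix the skewed part with a random number in [0, 10); expand rdd2's matching rows 10x.
val prefixedSkew = skewedRdd.map { case (k, v) => (Random.nextInt(10) + "_" + k, v) }
val expanded = rdd2.filter(_._1 == skewedKey)
  .flatMap { case (k, v) => (0 until 10).map(i => (i + "_" + k, v)) }

// 4. Join the two parts separately, strip the prefix, then union the results.
val joinedSkew = prefixedSkew.join(expanded).map { case (k, v) => (k.split("_")(1).toLong, v) }
val joinedCommon = commonRdd.join(rdd2)
val result = joinedSkew.union(joinedCommon)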
One side of the join: when there are many skewed keys, splitting that large number of keys out as in the previous method is of little benefit. In that case it is better to add a random prefix to every record of the skewed dataset.
The other side of the join: take the Cartesian product of the entire non-skewed dataset with the set of random prefixes (i.e. expand its data N times).
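A minimal spark-shell sketch of this whole-dataset variant (illustrative: skewRdd is the heavily skewed side, evenRdd the even side, both assumed to be RDD[(Long, String)]; N is the expansion factor):

import scala.util.Random

val N = 100
// Skewed side: add a random prefix in [0, N) to every record.
val prefixed = skewRdd.map { case (k, v) => (Random.nextInt(N) + "_" + k, v) }
// Even side: replicate every record N times, once per possible prefix.
val expanded = evenRdd.flatMap { case (k, v) => (0 until N).map(i => (i + "_" + k, v)) }
// Join on the prefixed keys, then strip the prefix.
val joined = prefixed.join(expanded).map { case (k, v) => (k.split("_")(1).toLong, v) }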
package skewTuring;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFlatMapFunction;
import org.apache.spark.api.java.function.PairFunction;
import scala.Tuple2;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Random;
/**
 * Applicable scenario: a join where a large number of keys in one RDD cause data skew.
 * Join using random prefixes and an expanded RDD.
 */
public class SkewTuring33 {
public static void main(String[] args) throws Exception{
// Build the Spark context
SparkConf conf = new SparkConf().setAppName("SkewTuring33");
conf.setMaster("local[8]");
JavaSparkContext sc = new JavaSparkContext(conf);
// Primary source data -- contains a large number of skewed keys
//0 20111230000005 57375476989eea12893c0c3811607bcf 奇艺高清 1 1 http://www.qiyi.com/
JavaRDD<String> sourceRdd = sc.textFile("hdfs://master:9000/skewtestdata/source_index");
JavaPairRDD<Long, String> mapdSourceRdd= sourceRdd.mapToPair(new PairFunction<String,Long,String>() {
@Override
public Tuple2<Long,String> call(String s) throws Exception {
String[] splits = s.split("\t");
Long key = Long.valueOf(splits[0]);
String value = splits[6];
if(key > 10000) {
return new Tuple2<Long,String>(10001L, value);
} else {
return new Tuple2<Long,String>(key, value);
}
}
});
// Secondary source data -- evenly distributed keys
JavaPairRDD<Long, String> mapdSourceRdd2 = sourceRdd.mapToPair(new PairFunction<String,Long,String>() {
@Override
public Tuple2<Long,String> call(String s) throws Exception {
String[] splits = s.split("\t");
Long key = Long.valueOf(splits[0]);
String value = splits[6];
return new Tuple2<Long,String>(key, value);
}
});
/**
 * Primary skewed data: key + random prefix.
 */
// Every record of the skewed RDD (rdd1) gets a random prefix within 100.
// This prefixed RDD is then joined with the expanded RDD built from rdd2 below.
final JavaPairRDD<String, String> skewedRandomRDD = mapdSourceRdd.mapToPair(new PairFunction<Tuple2<Long, String>, String, String>() {
private static final long serialVersionUID = 1L;
@Override
public Tuple2<String, String> call(Tuple2<Long, String> tuple) throws Exception {
Random random = new Random();
int prefix = random.nextInt(100);
return new Tuple2<String, String>(prefix + "_" + tuple._1, tuple._2);
}
});
System.out.println("主源数据 倾斜数据 rdd:"+ skewedRandomRDD.take(100));
/**
* sourceRdd2 均匀key 扩容N倍
*/
// rdd2,就是那个全部key的分布相对较为均匀的rdd。
// 这里将rdd2中,前面获取到的key对应的数据,过滤出来,分拆成单独的rdd,并对rdd中的数据使用flatMap算子都扩容100倍。
// 对扩容的每条数据,都打上0~100的前缀。
JavaPairRDD<String, String> expandedRDD = mapdSourceRdd2.flatMapToPair(new PairFlatMapFunction<Tuple2<Long, String>, String, String>() {
@Override
public Iterator<Tuple2<String, String>> call(Tuple2<Long, String> tuple) throws Exception {
List<Tuple2<String, String>> list = new ArrayList<Tuple2<String, String>>();
for (int i = 0; i < 100; i++) {
list.add(new Tuple2<String, String>(i + "_" + tuple._1, tuple._2));
}
return list.iterator();
}
});
System.out.println("副源数据 扩容表处理 :" + expandedRDD.take(100));
// 将两个处理后的RDD进行join便可。
JavaPairRDD<String, Tuple2<String, String>> joinedRDD = skewedRandomRDD.join(expandedRDD);
System.out.println("最终join结果:"+ joinedRDD.take(100));
}
}
This post analyzed the problem from the angle of data skew and distilled the lessons through hands-on case testing. Writing a complete post is no small effort; please treasure it!
Qin Kaixin, Shenzhen