相似于spark statCounter类的东西,处理缺失值java
import org.apache.spark.util.StatCounter class NAStatCounter extends Serializable { val stats:StatCounter =new StatCounter() var missing:Long=0 def add(x:Double):NAStatCounter={ if(java.lang.Double.isNaN(x)){ missing += 1 }else{ stats.merge(x) } this } def merge(other:NAStatCounter):NAStatCounter={ stats.merge(other.stats) missing+=other.missing this } override def toString: String = { "stats:"+stats.toString()+" NaN "+missing } } object NAStatCounter extends Serializable{ def apply(x:Double) = new NAStatCounter().add(x) }
编译:数据库
调用以前的方法parse:apache
val nas1=Array(1.0,Double.NaN).map(d=>NAStatCounter(d)) val nas2=Array(Double.NaN,2.0).map(d=>NAStatCounter(d)) //经过拉链进行聚合 val mergerd = nas1.zip(nas2).map(p=>p._1.merge(p._2))
集合之间作reduce数组
val nas=List(nas1,nas2) val mergerd=mas.reduce((n1,n2)=>{ n1.zip(n2).map{ case (a,b)=>a.merge(b) } })
/* 与map方法相似,map是对rdd中的每个元素进行操做,而mapPartitions(foreachPartition)则是对rdd中的每一个分区的迭代器进行操做。若是在map过程当中须要频繁建立额外的对象(例如将rdd中的数据经过jdbc写入数据库,map须要为每一个元素建立一个连接而mapPartition为每一个partition建立一个连接),则mapPartitions效率比map高的多。 */ def statswith(rdd:RDD[Array[Double]]):Array[NAStatCounter]={ val nastats = rdd.mapPartitions((iter:Iterator[Array[Double]])=>{ val nas=Array[NAStatCounter]=iter.next().map(d=>NAStatCounter(d)) iter.foreach(arr=>{ nas.zip(arr).foreach{case (n,d) => n.add(d)} }) Iterator(nas) }) }
变量的选择和评分简介app
val statsm = statswith(parsed.filter(_.matched).map(_.scores)) val statsn = statswith(parsed.filter(!_.matched).map(_.scores)) /* statsm和statsn这两个数组结构相同,但对应不一样的数据子集:statsm包含匹配记录匹配分值数组的概要统计信息,而statsn对应不匹配记录分值数组的概要统计信息。对匹配和不匹配记录列的值作简单差别分析 */ statsm.zip(statsn).map{ case(m,n)=>{ (m.missing+n.missing,m.stats.mean - n.stats.mean) }.foreach(println) }
定义一个简单的评分模型:ide
def naz(d:Double) = if (Double.NaN.equals(d)) 0.0 else d case class Scored(md:MatchData,score:Double) val ct = parsed.map(md=>{ val score=Array(2,5,6,7,8).map(i=>naz(md.scores(i))).sum Scored(md,score) })
过滤一个阈值 4.0this
ct.filter(s=>s.score>=4.0).map(s=>s.md.matched).countByValue()