病人记录spark建立完整代码

相似于spark statCounter类的东西,处理缺失值java

import org.apache.spark.util.StatCounter

class NAStatCounter extends Serializable {
  val stats:StatCounter =new StatCounter()
  var missing:Long=0
  def add(x:Double):NAStatCounter={
    if(java.lang.Double.isNaN(x)){
      missing += 1
    }else{
      stats.merge(x)
    }
    this
  }
  def merge(other:NAStatCounter):NAStatCounter={
    stats.merge(other.stats)
    missing+=other.missing
    this
  }

  override def toString: String = {
    "stats:"+stats.toString()+" NaN "+missing
  }
}
object NAStatCounter extends Serializable{
  def apply(x:Double) = new NAStatCounter().add(x)
}

编译:数据库

调用以前的方法parse:apache

val nas1=Array(1.0,Double.NaN).map(d=>NAStatCounter(d))
val nas2=Array(Double.NaN,2.0).map(d=>NAStatCounter(d))
//经过拉链进行聚合
val mergerd = nas1.zip(nas2).map(p=>p._1.merge(p._2))

集合之间作reduce数组

val nas=List(nas1,nas2)
val mergerd=mas.reduce((n1,n2)=>{
 n1.zip(n2).map{
  case (a,b)=>a.merge(b)
 }
})
/*
与map方法相似,map是对rdd中的每个元素进行操做,而mapPartitions(foreachPartition)则是对rdd中的每一个分区的迭代器进行操做。若是在map过程当中须要频繁建立额外的对象(例如将rdd中的数据经过jdbc写入数据库,map须要为每一个元素建立一个连接而mapPartition为每一个partition建立一个连接),则mapPartitions效率比map高的多。
*/
def statswith(rdd:RDD[Array[Double]]):Array[NAStatCounter]={
    val nastats = rdd.mapPartitions((iter:Iterator[Array[Double]])=>{
      val nas=Array[NAStatCounter]=iter.next().map(d=>NAStatCounter(d))
      iter.foreach(arr=>{
        nas.zip(arr).foreach{case (n,d) => n.add(d)}
      })
      Iterator(nas)
    })
}

变量的选择和评分简介app

val statsm = statswith(parsed.filter(_.matched).map(_.scores))
val statsn = statswith(parsed.filter(!_.matched).map(_.scores))
/*
statsm和statsn这两个数组结构相同,但对应不一样的数据子集:statsm包含匹配记录匹配分值数组的概要统计信息,而statsn对应不匹配记录分值数组的概要统计信息。对匹配和不匹配记录列的值作简单差别分析
*/
statsm.zip(statsn).map{
 case(m,n)=>{
  (m.missing+n.missing,m.stats.mean - n.stats.mean)
 }.foreach(println)
}

定义一个简单的评分模型:ide

def naz(d:Double) = if (Double.NaN.equals(d)) 0.0 else d
  case class Scored(md:MatchData,score:Double)
  val ct = parsed.map(md=>{
    val score=Array(2,5,6,7,8).map(i=>naz(md.scores(i))).sum
    Scored(md,score)
  })

过滤一个阈值 4.0this

ct.filter(s=>s.score>=4.0).map(s=>s.md.matched).countByValue()
相关文章
相关标签/搜索