//39932,40902,1,?,1,?,1,1,1,1,1,TRUE /* 前两个字段是整数型ID,表明记录中匹配的两个病人; 后面9个值,表明病人记录中不一样字段(姓名,生日,地址)的匹配值 最后一个字段:布尔。表明该行病人记录是否匹配。 咱们用‘,’切割一下 */ val p = head(5).split(',') //p: Array[String] = Array(36950, 42116, 1, ?, 1, 1, 1, 1, 1, 1, 1, TRUE) /*隐士类型转换:当调用scala对象方法时,若是定义该对象的类型中找不到方法定义,Scala编译器就将该对象转换成响应的方法定义的类的实例*/ val id1 = p(0).toInt
须要对9个字段值进行转换,能够先用Scala Array 类的slice方法提取一部分元素,而后调用map函数,将slice中每一个元素的类型从String转成Doublejava
val raws=p.slice(2,11) //raws: Array[String] = Array(1, ?, 1, 1, 1, 1, 1, 1, 1) raws.map(s=>s.toDouble) /* 出错,主要是由于遇到了?,因此咱们写一个函数来对它进行处理 java.lang.NumberFormatException: For input string: "?" */ def toDouble(s:String)={ if("?".equals(s)) Double.NaN else s.toDouble } //toDouble: (s: String)Double val sorce = raws.map(toDouble) //sorce: Array[Double] = Array(1.0, NaN, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0)
结合全部,咱们写一个方法,将以前的总结起来缓存
def parse(line:String)={ val pieces = line.split(',') val id1 = pieces(0).toInt val id2 = pieces(1).toInt val scores = pieces.slice(2,11).map(toDouble) val matched = pieces(11).toBoolean (id1,id2,scores,matched) } val tup = parse(line)
咱们建立一个case class,方便取值:函数
case class MatchData(id1:Int,id2:Int,scores:Array[Double],matched:Boolean)
而后以后返回的时候,就不是元祖类型,而是MatchData类型;spa
def parse(line:String)={ val pieces = line.split(',') val id1 = pieces(0).toInt val id2 = pieces(1).toInt val scores = pieces.slice(2,11).map(toDouble) val matched = pieces(11).toBoolean MatchData(id1,id2,scores,matched) } val tup = parse(line) //这块的tup就已是MatchData类型 //能够直接用过tup.id1 拿值了
接下来咱们就能够调用函数。scala
val mds=head.filter(x=>!isHeader(x)).map(x=>parse(x)) //解析集群数据,再noheader上调用map函数: val parsed = noheader.map(line=>parse(line)) //若是须要缓存,能够直接使用缓存,spark有本身的缓存机制 parsed.cache()
可是数据不必定在一台机器上,因此咱们须要聚合,对其聚合时,数据传输的效率确定是担忧的一个问题。code
val group = mds.groupBy(md=>md.matched) /* 获得grouped变量中的值之后,就能够经过在grouped上调用mapValues方法获得计数。 */ group.mapValues(x=>x.size).foreach(println)
建立直方图orm
连续变量的概要统计,如如下代码:对象
val stats =(0 until 9).map(i=>{ parsed.map(md=>md.scores(i)).filter(!isNaN(_)).stats() })