一、业务需求
在拥有手机号在每一个基站处停留时间日志 和 基站信息的 算出某个手机号的(所在基站,停留时间),(当前所在经度,当前所在纬度)apache
其中手机链接基站产生的日志信息相似以下:网络
18688888888,20160327082400,16030401EAFB68F1E3CDF819735E1C66,1 18611132889,20160327082500,16030401EAFB68F1E3CDF819735E1C66,1 18688888888,20160327170000,16030401EAFB68F1E3CDF819735E1C66,0 18611132889,20160327180000,16030401EAFB68F1E3CDF819735E1C66,0
上面的含义表示的是:手机号,时间,基站ID,接入网络的类型(0:unknow,1:3G,2:2G,6:4G)spa
基站信息:日志
9F36407EAD0629FC166F14DDE7970F68,116.304864,40.050645,6 CC0710CC94ECC657A8561DE549D940E0,116.303955,40.041935,6 16030401EAFB68F1E3CDF819735E1C66,116.296302,40.032296,6
上面的含义表示的是:基站ID,经度,纬度,接入网络的类型(0:unknow,1:3G,2:2G,6:4G)code
编写Scale代码:it
package com.Hive import org.apache.spark.rdd.RDD import org.apache.spark.{SparkConf, SparkContext} object FD { def main(args: Array[String]): Unit = { val conf = new SparkConf().setAppName("FD").setMaster("local[2]") val sc = new SparkContext(conf) //1.读取数据文件 val user =sc.textFile("src/main/data/log/")//用户数据 val base = sc.textFile("src/main/data/base_info.txt")//基站数据 //2.数据清洗工做,数据维度提取 // 用户数据清洗 val splited = user.map(line =>{ val fields = line.split(",") val phone = fields(0) val base = fields(2) val envet = fields(3).toInt val time = { if (envet == 1){ -fields(1).toLong//赋值- }else{ fields(1).toLong//正值+ } } ((phone,base),time) }) // splited.collect().foreach(println(_)) // 基站数据清洗 val alcsplited = base.map(line =>{ val fields = line.split(",") val id = fields(0) val x = fields(1) val y = fields(2) (id,(x,y)) }) // splited.collect().foreach(println(_)) //3.统计每一个用户在每一个基站中停留的时间 val reducted = splited.reduceByKey(_+_) // reducted.collect().foreach(println(_)) //((phone,base),time) val pmt = reducted.map(x=>{ //(基站ID,(手机号,时间)) //x._1对应的是元组((mobile,lac),time)中的(mobile,lac) //x._2对应的是元组((mobile,lac),time)中的time ((x._1._2),(x._1._1,x._2)) }) //链接join 以后的结果[(基站ID,((手机号,时间),(经度,纬度)))] val joined:RDD[(String, ((String, Long), (String, String)))] = pmt.join(alcsplited) //按照手机号进行分组 //_. :表明的是基站 手机号,时间,经度,纬度 //_._2 :表明的是 手机号,时间 经度,纬度 //_._2_1 :表明的是 手机号,时间 //_._2._1._ :表明的是 手机号 val MobileGroupBykey = joined.groupBy(_._2._1._1) val result = MobileGroupBykey.mapValues(_.toList.sortBy(_._2._1._2).reverse.take(2)) println(result.collect().toBuffer) sc.stop() } }