源码层面整理下咱们经常使用的操做RDD数据处理与分析的函数,从而能更好的应用于工做中。apache
链接Hbase,读取hbase的过程,首先代码以下:函数
def tableInitByTime(sc : SparkContext,tableName : String,columns : String,fromdate: Date,todate : Date) : RDD[(ImmutableBytesWritable,Result)] = { val configuration = HBaseConfiguration.create() configuration.addResource("hbase-site.xml ") configuration.set(TableInputFormat.INPUT_TABLE,tableName ) val scan = new Scan //scan.setTimeRange(fromdate.getTime,todate.getTime) val column = columns.split(",") for(columnName <- column){ scan.addColumn("f1".getBytes(),columnName.getBytes()) } val hbaseRDD = sc.newAPIHadoopRDD(configuration,classOf[TableInputFormat],classOf[org.apache.hadoop.hbase.io.ImmutableBytesWritable],classOf[org.apache.hadoop.hbase.client.Result]) System.out.println(hbaseRDD.count()) hbaseRDD }
咱们来一点一点解析整个过程。oop
一、val configuration = HBaseConfiguration.create()ui
这个用过hbase的伙伴们都知道,加载配置文件,其实调用的是HBase的API,返回的RDD是个Configuration。加载的配置文件信息包含core-default.xml,core-site.xml,mapred-default.xml等。加载源码以下:spa
二、随之设置表名信息,并声明scan对象,而且set读取的列有哪些,随后调用newAPIHadoopRDD,加载指定hbase的数据,固然你能够加上各类filter。那么下来 咱们看看newAPIHadoopRDD是干了什么呢?咱们来阅读下里面的实现。code
能够看到咱们调用API,其实就是一个input过程,建立了一个newHadoopRDD对象,那么后台是一个input数据随后转化为RDD的过程。节点之间的数据传输是经过序列化数据,经过broadCast传输的conf信息。orm
三、随之进行count验证操做,查找数据的partition个数,hbase的数据固然是以block块的形式存储于HDFS。xml
四、下来开始map遍历,取出以前咱们设置的字段,存入新的transRDD中,那么这个map函数干了什么呢?它实际上是将原RDD所作的操做组织成一个function,建立一个MapPartitionsRDD。对象
五、下来咱们看下filter函数干了什么呢?blog
val calculateRDD = transRDD.filter(_._1 != null).filter(_._2 != null).filter(_._3 != null).filter(_._4 !=null) //map转换为字段((身份证号,经度(保留两位小数),纬度(保留两位小数),电话号码,时间段标志),1),最后的1表明出现一次,用于后边作累加 .map(data => { val locsp = data._2.split(",").take(2) val df = new DecimalFormat("######0.000") val hour = data._4.split(":")(0).toInt val datarange = if(hour >= 9 && hour <= 18) 1 else 0 ((data._1,df.format(locsp(0).toDouble),df.format(locsp(1).toDouble),data._3,datarange),1) })
这里的filter是进行为空判断,咱们从源码中能够看到传入的是一个布尔类型的变量,与map相同经过MapPartitionsRDD进行function的条件过滤,那么也就是说,其实咱们能够在map中直接提取咱们须要的数据,或者用filter进行为空过滤,条件过滤。
六、随后咱们要进行相同key值的合并,那么,咱们开始使用reduceByKey:
//按key作reduce,value作累加 .reduceByKey(_ + _)
底层调用了combineByKeyWithClassTag,这里的Partitioner参数咱们之因此没有传入,是由于在map的RDD中已包含该RDD的partitioner的信息。它内部的实现将map的结果调用了require先进行merge,随后建立shuffleRDD.shuffleRDD就是最终reduce后的RDD。而后看不懂了。。。由于须要与整个流程相结合。因此后续继续深刻~