环境:Spark-1.5.0 HBase-1.0.0。java
场景:HBase中按天分表存数据,要求将任意时间段的数据合并成一个RDD以作后续计算。apache
尝试1: 寻找一次读取多个表的API,找到最接近的是一个叫MultiTableInputFormat的东西,它在MapReduce中使用良好,oop
但没有找到用于RDD读HBase的方法。spa
尝试2: 每一个表生成一个RDD,再用union合并,代码逻辑以下:命令行
var totalRDD = xxx // 读取第一张表 for { // 循环读表并合并到totalRDD val sRDD = xxx
totalRDD.union(sRDD) }
代码放到集群上执行,totalRDD并非正确的union结果,用var还真是不行。scala
尝试3: 思路相似2,但使用SparkContext.union来一次合并多个RDD,代码逻辑以下:日志
var rddSet: xxx = Set() // 建立RDD列表 dateSet.foreach(date => { // 将全部表的RDD放入列表中 val sRDD = xxx rddSet += sRDD } val totalRDD = sc.union(rddSet.toSeq) // 合并列表中的全部RDD
完整代码以下:code
import java.text.SimpleDateFormat import org.apache.hadoop.hbase.client.Result import org.apache.hadoop.hbase.io.ImmutableBytesWritable import org.apache.spark.rdd.RDD import org.apache.spark.{SparkContext, SparkConf} import org.apache.hadoop.hbase.HBaseConfiguration import org.apache.hadoop.hbase.mapreduce.TableInputFormat import scala.collection.mutable.Set /** * 时间处理类 */ object Htime { /** * 根据起止日期获取日期列表 * 例如起止时间为20160118,20160120,那么日期列表为(20160118,20160119,20160120) * * @param sDate 开始日期 * @param eDate 结束日期 * @return 日期列表 */ def getDateSet(sDate:String, eDate:String): Set[String] = { // 定义要生成的日期列表 var dateSet: Set[String] = Set() // 定义日期格式 val sdf = new SimpleDateFormat("yyyyMMdd") // 按照上边定义的日期格式将起止时间转化成毫秒数 val sDate_ms = sdf.parse(sDate).getTime val eDate_ms = sdf.parse(eDate).getTime // 计算一天的毫秒数用于后续迭代 val day_ms = 24*60*60*1000 // 循环生成日期列表 var tm = sDate_ms while (tm <= eDate_ms) { val dateStr = sdf.format(tm) dateSet += dateStr tm = tm + day_ms } // 日期列表做为返回 dateSet } } /** * 从HBase中读取行为数据计算人群分类 */ object Classify { /** * @param args 命令行参数,第一个参数为行为数据开始日期,第二个为结束日期,例如20160118 */ def main(args: Array[String]) { // 命令行参数个数必须为2 if (args.length != 2) { System.err.println("参数个数错误") System.err.println("Usage: Classify <开始日期> <结束日期>") System.exit(1) } // 获取命令行参数中的行为数据起止日期 val startDate = args(0) val endDate = args(1) // 根据起止日志获取日期列表 // 例如起止时间为20160118,20160120,那么日期列表为(20160118,20160119,20160120) val dateSet = Htime.getDateSet(startDate, endDate) // Spark上下文 val sparkConf = new SparkConf().setAppName("Classify") val sc = new SparkContext(sparkConf) // 初始化HBase配置 val conf = HBaseConfiguration.create() // 按照日期列表读出多个RDD存在一个Set中,再用SparkContext.union()合并成一个RDD var rddSet: Set[RDD[(ImmutableBytesWritable, Result)] ] = Set() dateSet.foreach(date => { conf.set(TableInputFormat.INPUT_TABLE, "behaviour_test_" + date) // 设置表名 val bRdd: RDD[(ImmutableBytesWritable, Result)] = sc.newAPIHadoopRDD(conf, classOf[TableInputFormat], classOf[org.apache.hadoop.hbase.io.ImmutableBytesWritable], classOf[org.apache.hadoop.hbase.client.Result]) rddSet += bRdd }) val behavRdd = sc.union(rddSet.toSeq) behavRdd.collect().foreach(println) } }