嫌Hive导数据到Hbase太慢?试试buckloader吧

1

前言javascript


   若是咱们离线计算好的hive数据须要同步到hbase中,你们会用什么方法呢?java

     若是是明细数据,上千万乃至上亿行的数据,导入到hbase中确定是须要考虑效率问题的mysql


     若是是直接使用hbase客户端的API进行数据插入,效率是很是低的
web


     因此咱们选择了bulkloader工具进行操做(原理:利用hbase以外的计算引擎将源数据加工成hbase的底层文件格式:Hfile,而后通知hbase导入便可)面试


测试数据redis

CREATE TABLE wedw_dw.t_user_order_info(                                   user_id string                                     ,user_name string  ,order_id  string  ,order_amt decimal(16,2))ROW FORMAT SERDE                                        'org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe'WITH SERDEPROPERTIES (                                  'field.delim'=',',                                    'serialization.format'=',')         ;+----------+------------+-----------+------------+--+| user_id  | user_name  | order_id  | order_amt  |+----------+------------+-----------+------------+--+| 1        | 小红         | 001       | 100.32     || 2        | 小明         | 002       | 34.76      || 3        | 小花         | 003       | 39.88      || 4        | 小牛         | 004       | 22.22      || 5        | 小刘         | 005       | 98765.34   |+----------+------------+-----------+------------+--+# /data/hive/warehouse/wedw/dw/t_user_order_info/



2

 利用hbasse自带程序导入sql

# hbase建表typescript

hbase(main):009:0* create 'user_order_info','user_info','order_info'


# 执行hbase自带的importtsv程序(mapreduce程序),将原始文件转成hfileapache

/usr/local/hadoop-current/bin/yarn jar \/usr/local/hbase-current/lib/hbase-server-1.2.0-cdh5.8.2.jar  \importtsv -Dimporttsv.columns=HBASE_ROW_KEY,user_info:user_name,order_info:order_id,order_info:order_amt \'-Dimporttsv.separator=,' \-Dmapreduce.job.queuename='root.test' \-Dimporttsv.bulk.output=hdfs://cluster/data/hive/output1 user_order_info \hdfs://cluster/data/hive/warehouse/wedw/dw/t_user_order_info


完整参数:ruby

  •   -Dimporttsv.bulk.output=/path/for/output   输出目录

  •   -Dimporttsv.skip.bad.lines=false   是否跳过脏数据行

  •   -Dimporttsv.separator=|'   指定分隔符

  •   -Dimporttsv.timestamp=currentTimeAsLong 是否指定时间戳

  •   -Dimporttsv.mapper.class=my.Mapper  替换默认的Mapper类


# 移动数据到hbase表中

hadoop jar hbase-server-1.2.0-cdh5.8.2.jar completebulkload  hdfs://cluster/data/hive/output1 user_order_info



3


编写代码导入

hbase建表

hbase(main):018:0> create 'user_info','info'
<?xml version="1.0" encoding="UTF-8"?><project xmlns="http://maven.apache.org/POM/4.0.0"         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">    <modelVersion>4.0.0</modelVersion>    <groupId>com.wedoctor.spark</groupId>    <artifactId>spark-0708</artifactId>    <version>1.0-SNAPSHOT</version>    <properties>        <maven.compiler.source>1.8</maven.compiler.source>        <maven.compiler.target>1.8</maven.compiler.target>        <scala.version>2.11.8</scala.version>        <spark.version>2.2.0</spark.version>        <hadoop.version>2.8.1</hadoop.version>        <encoding>UTF-8</encoding>    </properties>    <dependencies>                <dependency>            <groupId>org.scala-lang</groupId>            <artifactId>scala-library</artifactId>            <version>${scala.version}</version>        </dependency>                <dependency>            <groupId>org.apache.spark</groupId>            <artifactId>spark-core_2.11</artifactId>            <version>${spark.version}</version>        </dependency>        <dependency>            <groupId>org.apache.spark</groupId>            <artifactId>spark-hive_2.11</artifactId>            <version>${spark.version}</version>        </dependency>        <dependency>            <groupId>log4j</groupId>            <artifactId>log4j</artifactId>            <version>1.2.17</version>        </dependency>                <dependency>            <groupId>org.apache.spark</groupId>            <artifactId>spark-sql_2.11</artifactId>            <version>${spark.version}</version>        </dependency>        <dependency>            <groupId>mysql</groupId>            <artifactId>mysql-connector-java</artifactId>            <version>5.1.41</version>        </dependency>                <dependency>            <groupId>com.typesafe</groupId>            <artifactId>config</artifactId>            <version>1.3.0</version>        </dependency>                <dependency>            <groupId>org.apache.spark</groupId>            <artifactId>spark-streaming_2.11</artifactId>            <version>${spark.version}</version>        </dependency>                <dependency>            <groupId>redis.clients</groupId>            <artifactId>jedis</artifactId>            <version>2.8.1</version>        </dependency>        <dependency>            <groupId>org.scalikejdbc</groupId>            <artifactId>scalikejdbc_2.11</artifactId>            <version>2.5.0</version>        </dependency>                <dependency>            <groupId>org.scalikejdbc</groupId>            <artifactId>scalikejdbc-config_2.11</artifactId>            <version>2.5.0</version>        </dependency>                <dependency>            <groupId>org.apache.spark</groupId>            <artifactId>spark-streaming-kafka-0-10_2.11</artifactId>            <version>${spark.version}</version>        </dependency>                <dependency>            <groupId>org.apache.kafka</groupId>            <artifactId>kafka-clients</artifactId>            <version>0.10.2.1</version>        </dependency>        <dependency>            <groupId>org.apache.hbase</groupId>            <artifactId>hbase-server</artifactId>            <version>1.2.0-cdh5.8.2</version>        </dependency>        <dependency>            <groupId>org.apache.hbase</groupId>            <artifactId>hbase-client</artifactId>            <version>1.2.0-cdh5.8.2</version>            <exclusions>                <exclusion>                    <groupId>org.apache.httpcomponents</groupId>                    <artifactId>httpclient</artifactId>                </exclusion>                <exclusion>                    <groupId>org.apache.httpcomponents</groupId>                    <artifactId>httpcore</artifactId>                </exclusion>            </exclusions>        </dependency>    </dependencies>    <repositories>        <repository>            <id>cloudera</id>            <url>https://repository.cloudera.com/artifactory/cloudera-repos/</url>        </repository>    </repositories>    <build>        <plugins>                      <plugin>                <groupId>org.apache.maven.plugins</groupId>                <artifactId>maven-compiler-plugin</artifactId>                <version>3.5.1</version>            </plugin>                        <plugin>                <groupId>net.alchim31.maven</groupId>                <artifactId>scala-maven-plugin</artifactId>                <version>3.2.2</version>                <executions>                    <execution>                        <goals>                            <goal>compile</goal>                            <goal>testCompile</goal>                        </goals>                        <configuration>                            <args>                                <arg>-dependencyfile</arg>                                <arg>${project.build.directory}/.scala_dependencies</arg>                            </args>                        </configuration>                    </execution>                </executions>            </plugin>        </plugins>    </build></project>


spark程序编写

package com.hbase.bulkloaderimport org.apache.hadoop.fs.{Path}import org.apache.hadoop.hbase.client.ConnectionFactoryimport org.apache.hadoop.hbase.{HBaseConfiguration, KeyValue, TableName}import org.apache.hadoop.hbase.io.ImmutableBytesWritableimport org.apache.hadoop.hbase.mapreduce.{HFileOutputFormat2, LoadIncrementalHFiles}import org.apache.hadoop.hbase.util.Bytesimport org.apache.hadoop.mapreduce.Jobimport org.apache.spark.rdd.RDDimport org.apache.spark.sql.{DataFrame, SparkSession}object BulkLoader {  //Logger.getLogger("org").setLevel(Level.ERROR)  def main(args: Array[String]): Unit = {    System.setProperty("HADOOP_USER_NAME", "pgxl")    val spark: SparkSession = SparkSession.builder()      .master("local[*]")      .config("hive.metastore.uris""thrift://10.11.3.44:9999")      .appName("bulkloaderTest")      .enableHiveSupport()      .getOrCreate()    val re: DataFrame = spark.sql("select * from wedw_dw.t_user_order_info")    val dataRdd: RDD[(String, (String, String, String))] = re.rdd.flatMap(row => {      val rowkey: String = row.getAs[String]("user_id").toString      Array(        (rowkey, ("info", "user_id", row.getAs[String]("user_id"))),        (rowkey, ("info", "user_name", row.getAs[String]("user_name"))),        (rowkey, ("info", "order_id", row.getAs[String]("order_id"))),        (rowkey, ("info", "order_amt", row.get(3).toString))      )    })    val output = dataRdd.filter(x=>x._1 != null).sortBy(x=>(x._1,x._2._1,x._2._2)).map {      x => {        val rowKey = Bytes.toBytes(x._1)        val immutableRowKey = new ImmutableBytesWritable(rowKey)        val colFam = x._2._1        val colName = x._2._2        val colValue = x._2._3        val kv = new KeyValue(          rowKey,          Bytes.toBytes(colFam),          Bytes.toBytes(colName),          Bytes.toBytes(colValue.toString)        )        (immutableRowKey, kv)      }    }    val conf = HBaseConfiguration.create()    conf.set("fs.defaultFS", "hdfs://cluster")    conf.set("hbase.zookeeper.quorum""10.11.3.43")    val job = Job.getInstance(conf)    val conn = ConnectionFactory.createConnection(conf)    val table = conn.getTable(TableName.valueOf("user_info"))    val locator = conn.getRegionLocator(TableName.valueOf("user_info"))    // 将咱们本身的数据保存为HFile    HFileOutputFormat2.configureIncrementalLoad(job, table, locator)    output.saveAsNewAPIHadoopFile("/data/hive/test/", classOf[ImmutableBytesWritable], classOf[KeyValue], classOf[HFileOutputFormat2], job.getConfiguration)    // 构造一个导入hfile的工具类    new LoadIncrementalHFiles(job.getConfiguration).doBulkLoad(new Path("/data/hive/test/"),conn.getAdmin,table,locator)    conn.close()    spark.close()  }}


hbase表结果:


2020大数据面试题真题总结(附答案)

微信交流群

多值维度及交叉维度最佳解决方案

深刻探究order by,sort by,distribute by,cluster by

Hive调优,数据工程师成神之路

数据质量那点事

简述元数据管理

你真的了解全量表,增量表及拉链表吗?

缓慢变化维(SCD)常看法决方案

全方位解读星型模型,雪花模型及星座模型

Sqoop or Datax

left join(on&where)

ID-Mapping

大家公司还在用SparkOnYan吗?

大厂高频面试题-连续登陆问题

朋友面试数据研发岗遇到的面试题

数据仓库分层架构

clickhouse实践篇-SQL语法

clickhouse实践篇-表引擎

简单聊一聊大数据学习之路

朋友面试数据专家岗遇到的面试题

HADOOP快速入门

数仓工程师的利器-HIVE详解

OLAP引擎—Kylin介绍

Hbase从入门到入坑

Kafka

Datax-数据抽取同步利器

Spark数据倾斜解决方案

Spark统一内存管理机制



本文分享自微信公众号 - 大数据私房菜(datagogogo)。
若有侵权,请联系 support@oschina.cn 删除。
本文参与“OSC源创计划”,欢迎正在阅读的你也加入,一块儿分享。

相关文章
相关标签/搜索