kudu 学习知识点总结(二)

1、KUDU分区数必须预先预约 
    2、在内存中对每一个Tablet分区维护一个MemRowSet来管理最新更新的数据,默认是1G刷新一次或者是2分钟。后Flush到磁盘上造成DiskRowSet,
       多个DiskRowSet在适当的时候进行归并处理 
    3、和HBase采用的LSM(LogStructured Merge,很难对数据进行特殊编码,因此处理效率不高)方案不一样的是,Kudu对同一行的数据更新记录的合并工做,
       不是在查询的时候发生的(HBase会将多条更新记录前后Flush到不一样的Storefile中,因此读取时须要扫描多个文件,比较rowkey,比较版本等,而后进行更新操做),
       而是在更新的时候进行,在Kudu中一行数据只会存在于一个DiskRowSet中,避免读操做时的比较合并工做。
       那Kudu是怎么作到的呢? 对于列式存储的数据文件,要原地变动一行数据是很困难的,因此在Kudu中,对于Flush到磁盘上的DiskRowSet(DRS)数据,
       其实是分两种形式存在的,一种是Base的数据,按列式存储格式存在,一旦生成,就再也不修改,另外一种是Delta文件,存储Base数据中有变动的数据,
       一个Base文件能够对应多个Delta文件,这种方式意味着,插入数据时相比HBase,须要额外走一次检索流程来断定对应主键的数据是否已经存在。
       所以,Kudu是牺牲了写性能来换取读取性能的提高。 
       更新、删除操做须要记录到特殊的数据结构里,保存在内存中的DeltaMemStore或磁盘上的DeltaFIle里面。
       DeltaMemStore是B-Tree实现的,所以速度快,并且可修改。磁盘上的DeltaFIle是二进制的列式的块,和base数据同样都是不可修改的。
       所以当数据频繁删改的时候,磁盘上会有大量的DeltaFiles文件,Kudu借鉴了Hbase的方式,会按期对这些文件进行合并。 
impala操做界面里 能够执行 invalidate metadata; 命令刷新元数据。
 
==================== impala-shell ============================================
impala-shell
 
从Impala建立一个新的Kudu表
Kudu 1.9.0文档:https://kudu.apache.org/releases/1.9.0/docs/
Kudu 1.9.0 Java API文档:https://kudu.apache.org/releases/1.9.0/apidocs/
 
1.Kudu介绍:Kudu集HDFS的顺序读和HBASE的随机读于一身,同时具有高性能的随机写,以及很强大的可用性(单行事务,一致性协议),支持Impala spark计算引擎。
2.何时使用kudu
    大规模数据复杂的实时分析,例如大数据量的join。
    数据有更新
    查询准实时
3.存储
    Kudu的存储是不基于HDFS的,构建集群时,kudu颇有可能和HDFS共同占用物理磁盘或者云磁盘,理想状况是独立空间。
4.表设计
    10+G a tablet 
    10-100 tablets individual node
    在配置32C,64G机器上,跑过1000个tablet状况,能正常写入,但在大量查询状况下,出现tablet time out,rpc满了的状况,
 
5.分区设计
    hash 
    range
 
6.参数
    1、Kudu Tablet Server Maintenance Threads
       参数:maintenance_manager_num_threads
       解释:Kudu后台对数据进行维护操做,如写入数据时的并发线程数,通常设置为4,官网建议的是数据目录的3倍
        Kudu Tablet Server Maintenance Threads 这个参数决定了Kudu后台对数据进行维护操做,如写入数据时的并发线程数。并发数越大,吞吐量越高,
        但对集群计算能力的要求也越高。默认值为1,表示Kudu会采用单线程操做;对于须要大量数据进行快速写入/删除的集群,能够设置更大的值。
        该值能够设置跟计算节点的数据磁盘数量和CPU核数有关,通常来讲,建议设置为4以获取比较均衡的性能,最大不超过8。
 
    2、Kudu Tablet Server Block Cache Capacity Tablet
       参数:block_cache_capacity_mb
        解释:分配给Kudu Tablet Server块缓存的最大内存量,建议是2-4G
        Kudu Tablet Server Block Cache Capacity Tablet的Block buffer cache,根据集群内存配置和数据量规模设置。通常建议至少2GB~4GB。
 
 
    3、Kudu Tablet Server Hard Memory Limit Kudu
       参数:memory_limit_hard_bytes
       解释:Tablet Server能使用的最大内存量,有多大,设置多大,tablet Server在批量写入数据时并不是实时写入磁盘,而是先Cache在内存中,
        在flush到磁盘。这个值设置太小时,会形成Kudu数据写入性能显著降低。对于写入性能要求比较高的集群,建议设置更大的值(通常是机器内存的百分之80)
        Kudu Tablet Server Hard Memory Limit Kudu的Tablet Server能使用的最大内存。Tablet Server在批量写入数据时并不是实时写入磁盘,
        而是先Cache在内存中,在flush到磁盘。这个值设置太小时,会形成Kudu数据写入性能显著降低。对于写入性能要求比较高的集群,建议设置更大的值,好比32GB。
 
 
    4.Maximum Process File Descriptors 
        这个参数决定了Kudu可以同时打开的操做系统文件数。不设置则使用系统的ulimits值,设置后会覆盖系统的设置。
        须要根据集群的规模及并发处理能力,很是谨慎的设置这个值。
 
    5.Default Number of Replicas 
        这个参数设置了每一个Tablet的默认复制因子,默认值为3,表示每一个表的数据会在Kudu中存储3份副本。
        咱们能够根据须要修改这个全局默认值,也能够在建表语句中经过’kudu.num_tablet_replicas’属性来设置每一个表的副本数,
        好比:’kudu.num_tablet_replicas’ = ‘1’。
 
7.建议每一个表50columns左右,不能超过300个
8.hash分区数量 * range分区数量不能超过60个(1.7.0版本以后没限制了)
9.设置block的管理器为文件管理器(默认是日志服务器)
    解释:并不是全部文件系统格式都须要设置该选项。ext四、xfs格式支持hole punching(打孔),因此不须要设置block_manager=file,可是ext3 格式须要。
          能够经过df -Th命令来查看文件系统的格式。
    参数:--block_manager=file
 
10.设置ntp服务器的时间偏差不超过20s(默认是10s)
    参数:max_clock_sync_error_usec=20000000
 
11.设置rpc的链接时长(默认是3s,建议不要设置)
    参数:--rpc_negotiation_timeout_ms=300000
 
12.设置rpc一致性选择的链接时长(默认为1s,建议不要设置)
    参数:--consensus_rpc_timeout_ms=1000
 
13.记录kudu的crash的信息
    解释:
        Kudu在遇到崩溃时,使用Google Breakpad库来生成minidump。这些minidumps的大小一般只有几MB,即便禁用了核心转储生成,也会生成,
        生成minidumps只能在Linux上创建。
        minidump文件包含有关崩溃的进程的重要调试信息,包括加载的共享库及其版本,崩溃时运行的线程列表,处理器寄存器的状态和每一个线程的堆栈内存副本,
        以及CPU和操做系统版本信息。
        Minitump能够经过电子邮件发送给Kudu开发人员或附加到JIRA,以帮助Kudu开发人员调试崩溃。为了使其有用,
        开发人员将须要知道Kudu的确切版本和发生崩溃的操做系统。请注意,虽然minidump不包含堆内存转储,但它确实包含堆栈内存,
        所以能够将应用程序数据显示在minidump中。若是机密或我的信息存储在群集上,请不要共享minidump文件。
    参数:
        --minidump_path=minidumps              
        --max_minidumps=9
        (默认是在设置的log目录下生成minidumps目录,里边包含最多9个以dmp结尾的文件,没法设置为空值,须要注意的是若是自定义minidump文件,
        在master不能启动的状况下,须要将该目录中的文件删除)
 
14.Stack WatchLog
    解释:每一个Kudu服务器进程都有一个称为Stack Watchdog的后台线程,它监视服务器中的其余线程,以防它们被阻塞超过预期的时间段。
          这些跟踪能够指示操做系统问题或瓶颈存储。经过WARN日志信息的跟踪(Trace)能够用于诊断因为Kudu如下的系统(如磁盘控制器或文件系统)引发的根本缘由延迟问题。
 
15.cdh设置多master
    参数:--master_addresses=cdh01:7051,cdh02:7051cdh03:7051
 
 
 
16.kudu出现启动速度特别慢
    解决办法:
        1、取消全部配置参数(除了资源、时间同步)
        2、升级版本到kudu1.6.0
        3、client必须中止(client不占用io的状况,3台机器,每台机器60G,127分区数量,启动速度3分钟)
        4、查看io使用状况 iostat -d -x -k 1 200
 
17.单hash分区最大是60
 
18.安装kudu过程当中,会要求CPU支持ssc4.2指令集,可是咱们的虚拟机cpu没有这个执行集,因此没法安装
 
19.设置client长链接过时时间
    参数:--authn_token_validity_seconds=12960000(150天)
    注意:设置到tserver的配置文件中
 
20.tserver和master的wal和data目录要分隔(或者是目录设置为lvm卷轴)
    缘由:wal目录只能设置为1个
    参数:--fs_wal_dir_reserved_bytes
    解释:
        Number of bytes to reserve on the log directory filesystem for non-Kudu usage. The default,
        which is represented by -1, is that 1% of the disk space on each disk will be reserved.     
        Any other value specified represents the number of bytes reserved and must be greater than or equal to 0. 
        Explicit percentages to reserve are not currently supported
        用于非kudu都使用的日志目录文件系统的字节数,默认状况下是-1,每一个磁盘上的磁盘空间的1%将被保留,指定的任何其余值表示保留的字节数,必须大于或等于0。
 
21.设置用户权限,能移动tablet
    参数:--superuser_acl=*
 
22.tserver宕掉后,5分钟后没有恢复的状况下,该机器上的tablet会移动到其余机器
    参数:--follower_unavailable_considered_failed_sec=300
 
23.超过参数时间的历史数据会被清理,若是是base数据不会被清理。而真实运行时数据大小持续累加,没有被清理。
    参数:--tablet_history_max_age_sec=900
 
 

 


    从Impala在Kudu中建立新表相似于将现有Kudu表映射到Impala表,除了您须要本身指定模式和分区信息。
    使用如下示例做为指导。Impala首先建立表,而后建立映射。
 
        CREATE TABLE my_first_table
        (
          id BIGINT,
          name STRING,
          PRIMARY KEY(id)
        )
        PARTITION BY HASH PARTITIONS 16
        STORED AS KUDU;
 
    在CREATE TABLE语句中,必须首先列出组成主键的列。此外,隐式标记主键列NOT NULL。
    建立新的Kudu表时,您须要指定分发方案。请参阅分区表:https://kudu.apache.org/docs/kudu_impala_integration.html#partitioning_tables
    id为简单起见,上面的表建立示例经过散列列分布到16个分区中。有关分区的指导,请参阅 分区规则:https://kudu.apache.org/docs/kudu_impala_integration.html#partitioning_rules_of_thumb
 
    
在Impala中查询现有的Kudu表:Impala中建立映射Kudu表的外部映射表
    经过Kudu API或其余集成(如Apache Spark)建立的表在Impala中不会自动显示。要查询它们,必须首先在Impala中建立外部表,以将Kudu表映射到Impala数据库:
        CREATE EXTERNAL TABLE `bigData` STORED AS KUDU
        TBLPROPERTIES(
            'kudu.table_name' = 'bigData',
            'kudu.master_addresses' = 'node1:7051,node2:7051,node3:7051')
        
 
Kudu中的分区方法主要有两种:partition by hash 和 partition by range 
    kudu表基于其partition方法被拆分红多个分区,每一个分区就是一个tablet,一张kudu表所属的全部tablets均匀分布并存储在tablet servers的磁盘上。
    所以在建立kudu表的时候须要声明该表的partition方法,同时要指定primary key做为partition的依据。
    
基于hash的分区方法的基本原理是:
    基于primary key的hash值将每一个row(行)划分到相应的tablet当中,分区的个数即tablet的个数必须在建立表语句中指定,建表语句示例以下:
    注:若是未指定基于某个字段的hash值进行分区,默认以主键的hash值进行分区。
        create table test
        (
            name string,
            age int,
            primary key (name)
        )
        partition by hash (name) partitions 8
        stored as kudu;
 
基于range的分区方法的基本原理是:
    基于指定主键的取值范围将每一个row(行)划分到相应的tablet当中,用于range分区的主键以及各个取值范围都必须在建表语句中声明,建表语句示例以下:
    例子:有班级、姓名、年龄三个字段,表中的每一个row将会根据其所在的班级划分红四个分区,每一个分区就表明一个班级。
        create table test
        (
            classes int,
            name string,
            age int,
            primary key (classes,name)
        )
        partition by range (classes)
        (
            partition value = 1,
            partition value = 2,
            partition value = 3,
            partition value = 4
        )
        stored as kudu;
 
kudu表还能够采用基于hash和基于range相结合的分区方式,使用方法与上述相似    
        
kudu表支持3种insert语句:
    1.insert into test values(‘a’,12);
    2.insert into test values(‘a’,12),(‘b’,13),(‘c’,14);
    3.insert into test select * from other_table;
 
update语句
    kudu表的update操做不能更改主键的值,其余与标准sql语法相同。
 
upsert 语句
    对于 upsert into test values (‘a’,12) 
    若是指定的values中的主键值 在表中已经存在,则执行update语义,反之,执行insert语义。
 
delete语句
    与标准sql语法相同。
impala + kudu一些优化心得
    1.一开始须要全量导入kudu,这时候咱们先用sqoop把关系数据库数据导入临时表,再用impala从临时表导入kudu目标表
        因为sqoop从关系型数据直接以parquet格式导入hive会有问题,这里默认hive的表都是text格式;每次导完到临时表,
          须要作invalidate metadata 表操做,否则后面直接导入kudu的时候会查不到数据
    2.除了查询,建议全部impala操做都在impala-shell而不在hue上面执行
    3.impala并发写入kudu的时候,数据量比较大的时候
        这时候kudu配置参数 --memory_limit_hard_bytes能大点就大点,由于kudu写入首先保存再内存里面,到必定阀值才溢写到磁盘,这个是直接最能提升写的方法;
        固然不是全部机器都有那么多资源,能够把--maintenance_manager_num_threads 这个参数稍微调大,须要调试,提升数据从内存写入磁盘的效率
    4.impala查询kudu
        首先全部表作彻底量的etl操做,必须得执行compute stats 表名,否则impala执行sql生成的计划执行数评估的内存不许确,容易评估错误致使实际执行不了
        kudu表最好不要作任何压缩,保证原始扫描性能发挥最好;假如对查询性能要求比存储要求高的话;大部分企业对实时查询效率要求高,并且存储成本毕竟低;
        kudu针对大表要作好分区,最好range和hash一块儿使用,前提是主键列包含能hash的id,但range分区必定要作好,经验告诉我通常是基于时间;
        查询慢的sql,通常要拿出来;方便的话作下explain,看下kudu有没有过滤部分数据关键字kudu predicates;假如sql没问题,那在impala-shell执行这个sql,
        最后执行summray命令,重点查看单点峰值内存和时间比较大的点,对相关的表作优化,解决数据倾斜问题
    5.kudu数据删除
        大表不要delete,不要犹豫直接drop,在create吧;磁盘空间会释放的
    6.关于impala + kudu 和 impala + parquet
        网上不少分析impala + kudu 要比 impala + parquet 优越不少;谁信谁XB;
        首先两个解决的场景不同,kudu通常解决实时,hive解决的是离线(一般是T + 1或者 T -1)
        hive基于hdfs,hdfs已经提供一套较为完善的存储机制,底层数据和文件操做便利;安全性,可扩展性都比kudu强不少,
        最重要parquet + impala效率要比kudu高,数仓首选是它
        kudu最大优点是能作相似关系型数据库同样的操做,insert, update, delete,这样热点的数据能够存储在kudu里面并随时作更新
    7.最后谈到的实时同步工具
        同步工具咱们这里使用streamsets,一个拖拉拽的工具,很是好用;但内存使用率高,经过jconsole咱们发现,全部任务同时启动;
        JVM新生代的内容几乎都跑到老年代了,GC没来的及,就内存溢出了;后面单独拿几台服务器出来作这个ETL工具,jvm配置G1垃圾回收器
 

1.kudu的产生背景和应用场景

 

1.在 kudu 以前,大数据主要以两种方式存储:
    第一种是静态数据:以 HDFS 引擎做为存储引擎,适用于高吞吐量的离线大数据分析场景。
            这类存储的局限性是数据没法进行随机的读写和批量的更新操做。
    第二种是动态数据:以 HBase做为存储引擎,适用于大数据随机读写场景。这类存储的局限性是批量读取吞吐量远不如 HDFS、不适用于批量数据分析的场景。
 
2.从上面分析可知,这两种数据在存储方式上彻底不一样,进而致使使用场景彻底不一样,但在真实的场景中,边界可能没有那么清晰,面对既须要随机读写,
  又须要批量分析的大数据场景,该如何选择呢?
 
3.这个场景中,单种存储引擎没法知足业务需求,咱们须要经过多种大数据组件组合来知足这一需求,一个常见的方案是:
    数据实时写入 HBase,实时的数据更新也在 HBase 完成,为了应对 OLAP 需求,咱们定时(一般是 T+1 或者 T+H)将 HBase的 数据写成静态的文件(Parquet)
    导入到 OLAP 引擎(HDFS)。这一架构能知足既须要随机读写,又能够支持 OLAP 分析的场景。
  但他有以下缺点:
    第一:架构复杂。从架构上看,数据在 HBase、消息队列Kafka、HDFS 间流转,涉及环节太多,运维成本很高。
          而且每一个环节须要保证高可用、维护多个副本、存储空间浪费。最后数据在多个系统上,对数据安全策略、监控等都提出了挑战。
    第二:时效性低。数据从 HBase 导出成静态文件是周期性的,通常这个周期是一天(或一小时),在时效性上不是很高。
    第三:难以应对后续的更新。真实场景中,总会有数据是延迟到达的。若是这些数据以前已经从 HBase 导出到 HDFS,
          新到的变动数据就难以处理了,一个方案是把新变动的数据和原有数据进行对比,把不一样的数据从新进行更新操做,这时候代价就很大了。
          假如说,咱们想要sql实时对大量数据进行分析该怎么办?或者是我想让数据存储可以支持Upsert(更新插入操做),又该怎么办?因此这就是kudu的优点。
          kudu 的定位是 Fast Analytics on Fast Data,是一个既支持随机读写、又支持 OLAP 分析的大数据存储引擎。
 
4.KUDU在 HDFS 和 HBase 这两个中平衡了随机读写和批量分析的性能,既支持了SQL实时查询,也支持了数据更新插入操做。
  完美的和impala集成,统一了hdfs数据源和kudu数据源,从而使得开发人员可以高效的进行数据分析。

 

5.hdfs不支持批量更新操做,kudu支持
    hdfs适用于离线sql分析,kudu适用于实时sql分析    hbase不支持sql操做,kudu支持(hbase-hive表可支持sql操做,可是效率极低)
    hbase不支持结构化数据存储,kudu支持
    hbase开发语言使用的java,内存的释放经过gc来完成,在内存比较紧张时可能引起full gc进而致使服务不稳定;kudu核心模块用的c++来实现,没有full gc的风险
    hbase的timestamp是暴露的,kudu没有暴露
    hbase的插入和更新操做都是看成一条数据进行处理的,而kudu是分隔开的
 
6.适合于在线实时分析的应用
    适合大数据量更新操做的应用
    适合将mysql的数据同步到kudu,减轻备库mysql查询的压力
    适合存储ADS数据,包含用户标签、各种指标数据等
    适合于存储结构化数据
    适合于和Impala继承,SQL分析数据
    适合于和HDFS一块儿使用,聚合数据源
    实时预测模型的应用,支持根据全部历史数据周期地更新模型
 
7.kudu完美的和impala集成,统一了hdfs数据源和kudu数据源,从而使得开发人员可以高效的进行数据分析。
  impala-kudu 主要用于实时的分析海量数据,即海量的结构化数据不断更新到kudu中,底层是以列式结构分布式存储,查询是获取结构化数据,
  而后进行 OLAP 分析、数据挖掘、机器学习等分析型操做,这些分析型操做所涉及的数据延迟性很小。
  可是kudu对硬件资源要求很高,特别是IO这块,以前公司遇到的集群瓶颈是多台机器(写30m/s)IO使用率达到100%,从而使用rpc链接超时,致使数据丢失。                             
  impala-kudu 的应用适用于多个行业,凡是结构化数据分析的情景均可使用,从实时性方面来说,使用sql实时的查询结构化数据,使得分析操做快速和高效。
  从离线方面来说,能够查询hdfs的数据,从而保证了数据的统一化和多元化,而且有利于构建数据仓库模型。
 

2.kudu的基础架构

 

1.Kudu特色
    特色一:主从架构 主为master,从为tserver,一般为三主多从
    特色二:高可用性(High availability)
        Tablet server 和 Master 使用 Raft Consensus Algorithm 来保证节点的高可用,确保只要有一半以上的副本可用,
        该 tablet 即可用于读写。例如,若是3个副本中有2个可用 或 5个副本中的有3个可用,则该tablet可用。即便在 leader tablet 出现故障的状况下,
        读取功能也能够经过 read-only(只读的)follower tablets来进行服务,或者是leader宕掉的状况下,会根据raft机制从新选举leader
    特色三:水平可扩展
    特色四:OLAP 工做的快速处理。
    特色五:与 MapReduce,Spark ,Impala和其余 Hadoop 生态系统组件集成
    特色六:使用 Cloudera Manager 轻松维护和管理
    特色七:底层存储彻底是列式结构,每一列均可以自定义压缩
    特色八:查询出来的数据是结构化模型,支持sql操做
 
2.Kudu概念和术语
    1.开发语言 C++
    2.Columnar Data Store(列式数据存储)
    3.Read Efficiency(高效读取) 
        对于分析查询,容许读取单个列或该列的一部分同时忽略其余列
    4.Data Compression(数据压缩)
        因为给定的列只包含一种类型的数据,因此基于此模式的压缩会比压缩混合数据类型(在基于行的解决案中使用)时更有效几个数量级。
        结合从列读取数据的效率,压缩容许从磁盘读取更少的块时完成查询

 

5.Table(表)
        一张table是数据存储在 Kudu 的位置。表具备schema(结构)和全局有序的primary key(主键)。table被分红不少段,也就是称为tablets(Tablet的复数)
    6.Tablet(段)
         一个tablet 是 一张表table 连续的segment(段),与其它数据存储引擎或关系型数据库partition(分区)类似。
        在必定的时间范围内,tablet的副本冗余到多个tserver服务器上,其中一个副本被认为是leader tablet。
        任何副本均可以对读取进行服务,而且写入时须要为tablet服务的一组tablet server之间达成一致性。
        一张表分红多个tablet,分布在不一样的tablet server中,最大并行化操做,Tablet在Kudu中被切分为更小的单元,叫作RowSets,
        RowSets分为两种MemRowSets和DiskRowSet,MemRowSets每生成32M,就溢写到磁盘中,也就是DiskRowSet
    7.Tablet Server
        tablet server是 存储tablet 和 为tablet向client提供服务。对于给定的tablet,一个tablet server充当 leader,
        其余tablet server充当该tablet的follower副本。只有leader为每个服务提供写请求,leader和followers为每一个服务提供读请求。
        leader使用Raft协议来进行选举 。一个tablet server能够服务多个tablets,而且一个 tablet 能够被多个tablet servers服务着。
    8.Master
        保持跟踪全部的tablets、tablet servers、catalog tables(目录表)和其它与集群相关的metadata。在给定的时间点,只能有一个起做用的master(也就是 leader)。
        若是当前的leader消失,则选举出一个新的master,使用Raft协议来进行选举。master还协调客户端的metadata operations(元数据操做)。
        例如,当建立新表时,客户端内部将请求发送给master。 master将新表的元数据写入catalog table(目录表),并协调在tablet server上建立tablet的过程。
        全部master的元数据都存储在一个tablet中,能够复制到全部其余候选的master。tablet server以设定的间隔向master发出心跳(默认值为每秒一次)。
        master是以文件的形式存储在磁盘中。
    9.Raft Consensus Algorithm
        Kudu 使用 Raft consensus algorithm 做为确保常规 tablet 和 master 数据的容错性和一致性的手段。
        经过 Raft协议,tablet 的多个副本选举出 leader,它负责接受请求和复制数据写入到其余follower副本。
        一旦写入的数据在大多数副本中持久化后,就会向客户确认。给定的一组N副本(一般为 35 个)可以接受最多(N - 1)/2 错误的副本的写入。
    10.Catalog Table(目录表)
        catalog table是Kudu 的 metadata(元数据中)的中心位置。它存储有关tables和tablets的信息。
        该catalog table(目录表)可能不会被直接读取或写入。相反,它只能经过客户端 API中公开的元数据操做访问。
        catalog tables存储如下两类元数据。
            Tables:table schemas 表结构,locations 位置,states 状态
            Tablets:现有tablet 的列表,每一个 tablet 的副本所在哪些tablet server,tablet的当前状态以及开始和结束的keys(键)。
 
3.Kudu-Impala 集成特性
    CREATE/ALTER/DROP TABLE
        Impala 支持使用 Kudu 做为持久层来 creating(建立),altering(修改)和 dropping(删除)表。
        这些表遵循与 Impala 中其余表格相同的  Internal / external(内部 / 外部)方法,容许灵活的数据采集和查询。
    INSERT
        数据可使用“与那些使用 HDFS 或 HBase 持久性的任何其余 Impala 表相同的”语法插入 Impala 中的 Kudu 表。
    UPDATE / DELETE
        Impala 支持 UPDATE 和 DELETE SQL 命令逐行或批处理修改 Kudu 表中的已有的数据。
        选择 SQL 命令的语法与现有标准尽量兼容。除了简单 DELETE 或 UPDATE 命令以外,还能够 FROM 在子查询中指定带有子句的复杂链接。
    Flexible Partitioning(灵活分区)
        与 Hive 中的表分区相似,Kudu 容许您经过 hash 或 range 动态预分割成预约义数量的 tablets,以便在集群中均匀分布写入和查询。
        您能够经过任意数量的 primary key(主键)列,任意数量的 hashes 和可选的 list of split rows 来进行分区。
    Parallel Scan(并行扫描)
        为了在现代硬件上实现最高的性能,Impala 使用的 Kudu 客户端能够跨多个 tablets扫描。
    High-efficiency queries(高效查询)
        在可能的状况下,Impala 将谓词评估下推到 Kudu,以便使谓词(in,between and,>,<)评估为尽量接近数据。在许多任务中,查询性能与 Parquet 至关。
--------------------- 

3.kudu的底层存储原理

 

1.底层存储原理
    1.1个Table(表)包含多个Tablet,其中Tablet的数量是根据hash或者是range进行设置的。
    2.1个Tablet中包含MetaData信息和多个RowSet信息,其中MetaData信息是block和block在data中的位置。
    3.1个RowSet包含一个MemRowSet和多个DiskRowSet,其中MemRowSet用于存储insert数据和update后的数据,写满后会刷新到磁盘中也就是多个DiskRowSet中,
      默认是1G刷新一次或者是2分钟。
    4.1个DiskRowSet用于老数据的mutation(更新),好比说数据的更新操做,后台按期对DiskRowSet进行合并操做,删除历史数据和没有的数据,减小查询过程当中的IO开销。
    5.1个DiskRowSet包含BloomFilter、Ad_hoc Index、UndoFile、RedoFile、BaseData、DeltaMem。
        BloomFile:根据DiskRowSet中key生成一个bloom filter,用于快速模糊的定位某一个key是否在DiskRowSet中。
        Ad_hoc Index:是主键的索引,用于定位key在DiskRowSet中具体哪一个偏移位置。
        BaseData:是MemRowSet flush下来的数据,按照列存储,按照主键有序。
        UndoFile:是BaseData以前的数据历史数据。
        RedoFile:是BaseData以后的mutation(更新)记录,能够得到较新的数据。
        DeltaMem:用于在内存中存储mutation(更新)记录,先写到内存中,而后写满后flush到磁盘,造成deltafile。
--------------------- 

 

2.MemRowSet
    实现方式:B+Tree
 
3.DiskRowSet
    1.实现方式:二叉平衡树
    2.内部数据组织:DeltaMem 和 MemRowSet在存在中的组织方式是一致的,都是B+Tree,在磁盘上的存储都是放在CFile文件中的,右图为CFile的文件格式
    3.Cfile:包含Header,Data,Index,Footer四块,Index有两种,posidx index是根据rowId找到Data中的偏移,validx index是根据key的值找到data中的偏移,
        validx只针对只有一个column为key的状况下,这个时候DiskRowSet是没有Ad_hoc索引的,使用validx来代替。
        这两个index内部实现了B-Tree,index不必定是联系的,在达到必定长度后就会刷盘,Footer是记录CFile的元数据,
        包括posidx_index,validx_index两棵树根节点所在位置,数据条目、编码、压缩方式等
    4.压缩:对于ad_hoc文件使用的prefix,delta fle使用的是plain,bloomfile使用的是plain
    5.磁盘上每个DiskRowSet有若*.metadata和*.data文件,metadata文件记录的是DiskRowSet的元信息,主要包括哪些block和block在data中的位置,
      左图为block和DiskRowSet中各部分的映射关系,在写磁盘时是经过container来写入,每一个container能够写很大的一块连续的磁盘空间,
      用于给某一个CFile写数据,当一个CFile写完后会将container归还给BlockManager,这时container就能够用于下一个CFile写数据了,
      当BlockManager中没有container可用是会建立一个新的container给新的CFile使用。
    6.对应新建block先看看是否有container可用,若没有,目前默认的是在所在的配置中的data_dir中随机选取一个dir建一个新的metadata和data文件。
      先写data,block落盘后再写metadata
 

4.MVCC
    表的主键排序,受益于MVCC(Multi-Version Concurrency Control 多版本并发控制,一旦数据写入到MemRowSet,后续的reader能立马查询到
 
5.Compaction    minor compaction:多个deltafile进行合并。默认是1000个deltafile进行合并一次
    major compaction:deltafile文件的大小和basedata的文件的比例为0.1的时候,会进行合并操做

1.kudu中的Tablet是负责Table表的一部分的读写工做,Tablet是有多个或一个Rowset组成的,其中一个Rowset处于内存中,叫作MemRowSet,MemRowSet主要是负责处理新的数据写入请求。
 
2.DiskRowSet是MemRowSet达到1G刷新一次或者是时间超过2分钟后刷新到磁盘后生成的,实际底层存储是是有Base Data(一个CFile文件)、
  多个Delta file(Undo data、Redo data组成)的和Delta MemStore,其中位于磁盘中的Base data、Undo data、Redo data是不可修改的,
  Delta Memstore达到必定程度后会刷新到磁盘中的生成Redo data,其中kudu后台有一个相似HBase的compaction线程策略进行合并处理:
    1、Minor Compaction:多个DeltaFile进行合并生成一个大的DeltaFile。默认是1000个DeltaFile进行合并一次
    2、Major Compaction:DeltaFile文件的大小和Base data的文件的比例为0.1的时候,会进行合并操做,生成Undo data
    3、将多个DiskRowSet进行合并,减小DiskRowSet的数量
  Base Data:是MemRowSet flush下来的数据,按照列存储,按照主键有序
  Undo Data:是BaseData以前的数据历史数据
  Redo Data:是BaseData以后的mutation记录,能够得到较新的数据
  Delta Memstore:用于在内存中存储更新为磁盘中数据的mutation记录,先写到内存中,而后写满后flush到磁盘,造成DeltaFile
 
3.当建立Kudu客户端时,其会从主master上获取tablet位置信息,而后直接与服务于该tablet的服务器进行交谈。
  为了优化读取和写入路径,客户端将保留该信息的本地缓存,以防止他们在每一个请求时须要查询主机的tablet位置信息。
  随着时间的推移,客户端的缓存可能会变得过期,而且当写入被发送到不是领导者的tablet服务器时,则将被拒绝。
  而后,客户端将经过查询主服务器发现新领导者的位置来更新其缓存。
 
4.读流程
    1.客户端链接TMaster获取表的相关信息,包括分区信息,表中全部tablet的信息
    2.客户端找到须要读取的数据的tablet所在的TServer,Kudu接受读请求,并记录timestamp信息,若是没有显式指定,那么表示使用当前时间
    3.从内存中读取数据,也就是MemRowSet和DeltaRowSet中读取数据,根据timestamp来找到对应的mutation链表
    4.从磁盘中读取数据,从metadata文件中使用boom filter快速模糊的判断全部候选RowSet是否含有此key。
      而后从DiskRowSet中读取数据,实际是根据B+树,判断key在那些DiskRowSet的range范围内,而后从metadata文件中,获取index来判断rowId在Data中的偏移,
      或者是利用validex来判断数据的偏移(只有一个key),根据读操做中包含的timestamp信息判断是否须要将base data进行回滚操做从而获取数据
 
5.写流程
    1.Kudu插入一条新数据
        1.客户端链接TMaster获取表的相关信息,包括分区信息,表中全部tablet的信息
        2.客户端找到负责处理读写请求的tablet所负责维护的TServer。Kudu接受客户端的请求,检查请求是否符合要求(表结构)
        3.Kudu在Tablet中的全部rowset(memrowset,diskrowset)中进行查找,看是否存在与待插入数据相同主键的数据,若是存在就返回错误,不然继续
        4.写入操做先被提交到tablet的预写日志(WAL),并根据Raft一致性算法取得追随节点的赞成,而后才会被添加到其中一个tablet的内存中,
          插入会被添加到tablet的MemRowSet中。为了在MemRowSet中支持多版本并发控制(MVCC),对最近插入的行(即还没有刷新到磁盘的新的行)的更新和删除操做
          将被追加到MemRowSet中的原始行以后以生成REDO记录的列表
        5.Kudu在MemRowset中写入一行新数据,在MemRowset(1G或者是120s)数据达到必定大小时,MemRowset将数据落盘,并生成一个diskrowset用于持久化数据,
          还生成一个memrowset继续接收新数据的请求
 
    2.Kudu对原有数据的更新
        1.客户端链接TMaster获取表的相关信息,包括分区信息,表中全部tablet的信息
        2.Kudu接受请求,检查请求是否符合要求
        3.由于待更新数数据可能位于memrowset中,也可能已经flush到磁盘上,造成diskrowset。因 此根据待更新数据所处位置不一样,kudu有不一样的作法
        4.当待更新数据位于memrowset时,找到待更新数据所在行,而后将更新操做记录在所在行中一个mutation链表中;
          在memrowset将数据落盘时,Kudu会将更新合并到base data,并生成UNDO records用于查看历史版本的数据,REDO records实际上也是以DeltaFile的形式存放
 
6.应用场景
    1.当待更新数据位于DiskRowset时,找到待更新数据所在的DiskRowset,每一个DiskRowset都会在内存中设置一个DeltaMemStore,将更新操做记录在DeltaMemStore中,
      在DeltaMemStore达到必定大小时,flush在磁盘,造成DeltaFile中。
    2.实际上Kudu提交更新时会使用Raft协议将更新同步到其余replica(复制品)上去,固然若是在memrowset和DiskRowset中都没有找到这条数据,那么返回错误给客户端;
      另外当DiskRowset中的deltafile太多时,Kudu会采用必定的策略对一组deltafile进行合并。
    3.wal日志的做用是若是咱们在作真正的操做以前,先将这件事记录下来,持久化到可靠存储中(由于日志通常很小,而且是顺序写,效率很高),
      而后再去执行真正的操做。这样执行真正操做的时候也就不须要等待执行结果flush到磁盘再执行下一步,由于不管在哪一步出错,咱们都可以根据备忘录重作一遍,
      获得正确的结果。

5.kudu和hbase的对比

1.架构
    1.hbase的物理模型是master和regionserver,regionserver存储的是region,region里边颇有不少store,一个store对应一个列簇,
      一个store中有一个memstore和多个storefile,store的底层是hfile,hfile是hadoop的二进制文件,其中HFile和HLog是hbase两大文件存储格式,
      HFile用于存储数据,HLog保证能够数据写入到HFile中。
    2.kudu的物理模型是master和tserver,其中table根据hash和range分区,分为多个tablet存储到tserver中,tablet分为leader和follower,
      leader负责写请求,follower负责读请求,总结来讲,一个ts能够服务多个tablet,一个tablet能够被多个ts服务。
2.联系
    设计理念和想法是一致的。
    kudu的思想是基于hbase的,以前cloudera公司向对hbase改造,支持大数据量更新,但是因为改动源码太大,因此todd直接开发了kudu。
    hbase基于rowkey查询和kudu基于主键查询是很快的。
    底层存储架构都是以列式存储的。
--------------------- 
1.hbase的物理模型是master和regionserver,regionserver存储的是region,region里边颇有不少store,一个store对应一个列簇,
  一个store中有一个memstore和多个storefile,store的底层是hfile,hfile是hadoop的二进制文件,其中HFile和HLog是hbase两大文件存储格式,
  HFile用于存储数据,HLog保证能够写入到HFile中;
2.kudu的物理模型是master和tserver,其中table根据hash和range分区,分为多个tablet存储到tserver中,tablet分为leader和follower,
  leader负责写请求,follower负责读请求,总结来讲,一个ts能够服务多个tablet,一个tablet能够被多个ts服务(基于tablet的分区,最低为2个分区);
3.hbase基于rowkey查询和kudu基于主键查询是很快的;
4.Kudu结构看上去跟HBase差异并不大,主要的区别包括:
    1.Kudu将HBase中zookeeper的功能放进了TMaster内,Kudu中TMaster的功能比HBase中的Master任务要多一些,kudu全部集群的配置信息均存储在本地磁盘中,
        hbase的集群配置信息是存储在zookeeper中;
    2.Hbase将数据持久化这部分的功能交给了Hadoop中的HDFS,最终组织的数据存储在HDFS上。Kudu本身将存储模块集成在本身的结构中,
      内部的数据存储模块经过Raft协议来保证leader Tablet和replica Tablet内数据的强一致性,和数据的高可靠性。
      为何不像HBase同样利用HDFS来实现数据存储,因此Kudu本身从新完成了底层的数据存储模块,并将其集成在TServer中,
      可是kudu对磁盘的IO要求很高,它是以写的性能换取读的性能;
--------------------- 

5.数据存储方式
    1.HBase是面向列族式的存储,每一个列族都是分别存放的,HBase表设计时,不多使用设计多个列族,大多状况下是一个列族。
      这个时候的HBase的存储结构已经与行式存储无太大差异了。而Kudu,实现的是一个真正的面向列的存储方式,表中的每一列都是单独存放的;
      因此HBase与Kudu的差别主要在于相似于行式存储的列族式存储方式与典型的面向列式的存储方式的差别;
    2.HBase是一款NoSQL类型的数据库,对表的设计主要在于rowkey与列族的设计,列的类型能够不指定,由于HBase在实际存储中都会将全部的value字段转换成二进制的字节流。
      由于不须要指定类型,因此在插入数据的时候能够任意指定列名(列限定名),这样至关于能够在建表以后动态改变表的结构。
      Kudu由于选择了列式存储,为了更好的提升列式存储的效果,Kudu要求在建表时指定每一列的类型,这样的作法是为了根据每一列的类型设置合适的编码方式,
      实现更高的数据压缩比,进而下降数据读入时的IO压力;
    3.HBase对每个cell数据中加入了timestamp字段,这样可以实现记录同一rowkey和列名的多版本数据,另外HBase将数据更新操做、删除操做也是做为一条数据写入,
      经过timestamp来标记更新时间,type来区分数据是插入、更新仍是删除。HBase写入或者更新数据时能够指定timestamp,这样的设置能够完成某些特定的操做; 
      Kudu也在数据存储中加入了timestamp这个字段,不像HBase能够直接在插入或者更新数据时设置特殊的timestamp值,Kudu的作法是由Kudu内部来控制timestamp的写入。
      不过Kudu容许在scan的时候设置timestamp参数,使得客户端能够scan到历史数据;
    4.相对于HBase容许多版本的数据存在,Kudu为了提升批量读取数据时的效率,要求设计表时提供一列或者多列组成一个主键,主键惟一,
      不容许多个相同主键的数据存在。这样的设置下,Kudu不能像HBase同样将更新操做直接转换成插入一条新版本的数据,Kudu的选择是将写入的数据,更新操做分开存储;
      固然还有一些其余的行式存储与列式存储之间在不一样应用场景下的性能差别。
    5.hbase中,同一个主键数据是能够存在多个storefile里的,为了让mutation和磁盘的存在的key组合在一块儿,hbase须要基于rowkey执行merge。
      Rowkey能够是任意长度的字符串,所以对比rowkey是很是耗性能的。另外,在一个查询中,即便key列没有被使用(例如聚合计算),它们也要被读取出来,
      这致使了额外的IO。复合主键在hbase应用中很常见,主键的大小可能比你关注的列大一个数量级,特别是查询的列被压缩的状况下; 
    6.kudu中,读取一条数据或者执行非排序查询,不须要merge操做。例如,聚合必定范围内的key能够独立的查询每一个RowSet(甚至能够并行的),
      而后执行求和,由于key的顺序是不重要的,显然查询的效率更高,kudu中,mutation是与rowid绑定的。因此merge会更加高效,经过维护计数器的方式,
      给定下一个须要保存的mutation,咱们能够简单的相减,就能够获得从base data到当前版本有多少个mutation。
      或者,直接寻址能够用来高效的获取最新版本的数据。获取block也很是的高效,由于mutation直接指向了block的索引地址;
    7.hbase的系统中,每一个cell的timstamp都是暴露给用户的,本质上组成了这个cell的一个符合主键。意味着,这种方式能够高效的直接访问指定版本的cell,
      且它存储了一个cell的整个时间序列的全部版本; 而Kudu却不高效(须要执行多个mutation),它的timestamp是从MVCC实现而来的,它不是主键的另一个描述;
    8.hbase采用的LSM(LogStructured Merge,很难对数据进行特殊编码,因此处理效率不高),hbase会将多条更新记录前后Flush到不一样的Storefile中,
      因此读取时须要扫描多个文件,比较rowkey,比较版本等,而后进行更新操做,特别是major compaction操做的时候,会占用大量的性能; 
    9.Kudu对同一行的数据更新记录的合并工做,不是在查询的时候发生的,而是在更新的时候进行,在Kudu中一行数据只会存在于一个DiskRowSet中,
      避免读操做时的比较合并工做。对于列式存储的数据文件,要原地变动一行数据是很困难的,因此在Kudu中,对于Flush到磁盘上的DiskRowSet(DRS)数据,
      其实是分两种形式存在的,一种是Base的数据,按列式存储格式存在,一旦生成,就再也不修改,另外一种是Delta文件,存储Base数据中有变动的数据,
      一个Base文件能够对应多个Delta文件(Kudu用MVCC(多版本并发控制)来实现数据的删改功能。更新、删除操做须要记录到特殊的数据结构里,
      保存在内存中的DeltaMemStore或磁盘上的DeltaFIle里面。DeltaMemStore是B-Tree实现的,所以速度快,并且可修改。
      磁盘上的DeltaFIle是二进制的列式的块,和base数据同样都是不可修改的。所以当数据频繁删改的时候,磁盘上会有大量的DeltaFiles文件,
      Kudu借鉴了Hbase的方式,会按期对这些文件进行合并),这种方式意味着,插入数据时相比HBase,须要额外走一次检索流程来断定对应主键的数据是否已经存在。
      所以,Kudu是牺牲了写性能来换取读取性能的提高。另外,若是在查询中没有指定key,那执行计划就不会查阅key,除了须要肯定key边界状况;
    10.hbase中insert和mutation是相同的操做,直接存储到storefile中。 
       kudu中insert和mutation是不一样的操做:insert写入数据至MemRowSet,而mutation(delete、update)写入存在这条数据的RowSet的DeltaMemStore里,
       写入时必须肯定这是一条新数据。这会产生一个bloom filter查询全部RowSet。若是布隆过滤器获得一个可能的match(即计算出可能在一个RowSet里),
       接着为了肯定是不是insert仍是update,一个寻址就必须被执行。 假设,只要RowSet足够小,bloom filter的结果就会足够精确,
       那么大部分插入将不须要物理磁盘寻址。另外,若是插入的key是有序的,例如timeseries+“_”+xxx,因为频繁使用,key所在的block可能会被保存在数据块缓存中。
       Update时,须要肯定key在哪一个RowSet。与上雷同,须要执行bloom filter。 这有点相似于关系型数据库RDBMS,当插入一条主键存在的数据时会报错,
       且不会更新这条数据。相似的,更新一条数据时,若是这条数据不存在也会报错。hbase的语法却不是这样,它不存在主键的概念;
 
6.写入和读取过程
    1.写过程
        HBase写的时候,无论是新插入一条数据仍是更新数据,都看成插入一条新数据来进行;而Kudu将插入新数据与更新操做分别看待;
        Kudu表结构中必须设置一个惟一键,插入数据的时候必须判断一些该数据的主键是否惟一,因此插入的时候其实有一个读的过程;
        而HBase没有太多限制,待插入数据将直接写进memstore;
        HBase实现数据可靠性是经过将落盘的数据写入HDFS来实现,而Kudu是经过将数据写入和更新操做同步在其余副本上实现数据可靠性;
        结合以上几点,能够看出Kudu在写的性能上相对HBase有必定的劣势;
    2.读过程
        在HBase中,读取的数据可能有多个版本,因此须要结合多个storefile进行查询;Kudu数据只可能存在于一个DiskRowset或者MemRowset中,
        可是由于可能存在还未合并进原数据的更新,因此Kudu也须要结合多个DeltaFile进行查询;
        HBase写入或者更新时能够指定timestamp,致使storefile之间timestamp范围的规律性下降,增长了实际查询storefile的数量;
        Kudu不容许人为指定写入或者更新时的timestamp值,DeltaFile之间timestamp连续,能够更快的找到须要的DeltaFile;
        HBase经过timestamp值能够直接取出数据;而Kudu实现多版本是经过保留UNDO records(已经合并过的操做)和REDO records(未合并过的操做)完成的,
        在一些状况下Kudu须要将base data结合UNDO records进行回滚或者结合REDO records进行合并而后才能获得真正所须要的数据;
        结合以上三点能够得出,无论是HBase仍是Kudu,在读取一条数据时都须要从多个文件中搜寻相关信息。相对于HBase,Kudu选择将插入数据和更新操做分开,
        一条数据只可能存在于一个DiskRowset或者memRowset中,只须要搜寻到一个rowset中存在指定数据就不用继续往下找了,用户不能设置更新和插入时的timestamp值,
        减小了在rowset中DeltaFile的读取数量。这样在scan的状况下能够结合列式存储的优势实现较高的读性能,特别是在更新数量较少的状况下可以有效提升scan性能;
        另外,本文在描述HBase读写过程当中没有考虑读写中使用的优化技术如Bloomfilter、timestamp range等。其实Kudu中也有使用相似的优化技术来提升读写性能,
        本文只是简单的分析,所以就再也不详细讨论读写过程;
 
7.其余差别
    HBase:使用的java,内存的释放经过GC来完成,在内存比较紧张时可能引起full GC进而致使服务不稳定;
    Kudu:核心模块用的C++来实现,没有full gc的风险;
 
8.总结
    Kudu经过要求完整的表结构设置,主键的设定,以列式存储做为数据在磁盘上的组织方式,更新和数据分开等技巧,
    使得Kudu可以实现像HBase同样实现数据的随机读写以外,在HBase不太擅长的批量数据扫描(scan)具备较好的性能。
    而批量读数据正是olap型应用所关注的重点,正如Kudu官网主页上描述的,Kudu实现的是既能够实现数据的快速插入与实时更新,
    也能够实现数据的快速分析。Kudu的定位不是取代HBase,而是以下降写的性能为代价,提升了批量读的性能,使其可以实现快速在线分析。
 

6.kudu性能调优和报错方案解决

 

报错一:tablet初始化时长好久
解决方案:
    升级版本到kudu1.6.0以上版本 .参考:https://kudu.apache.org/2017/12/08/apache-kudu-1-6-0-released.html
    查看io使用状况 iostat -d -x -k 1 200.(多是IO瓶颈)
    Recommended maximum number of tablet servers is 100.
    Recommended maximum number of tablets per tablet server is 2000.

 

报错二:rpc链接超时(IO问题)
    RPC can not complete before timeout: KuduRpc(method=CreateTable, tablet=null, attempt=26, DeadlineTracker(timeout=30000, elapsed=29427)
解决方案:session.setTimeoutMillis(60000)
 
报错三:移动tablet,权限不能访问
解决方案:--superuser_acl=*
 

报错四:新增master找不到元数据
解决方案:
    由于master的存储所有在本地磁盘文件,若是额外的添加了一个master,会报错,找不到consensus-meta,也就是master的容错机制,须要对master的元数据数据格式化,
    初始化的时候直接设计好。
    Recommended maximum number of masters is 3.
--------------------- 

报错五:minidumps文件(存储crash信息)出错
    [New I/O worker #1] WARN org.apache.kudu.client.GetMasterRegistrationReceived - None of the provided masters (hadoop6:7051) is a leader, will retry.
解决方案:
    rm -rf /home/var/lib/kudu/master/log_dir/minidumps
补充:
    minidump文件包含有关崩溃的进程的重要调试信息,包括加载的共享库及其版本,崩溃时运行的线程列表,处理器寄存器的状态和每一个线程的堆栈内存副本,
    以及CPU和操做系统版本信息。Minitump能够经过电子邮件发送给Kudu开发人员或附加到JIRA,以帮助Kudu开发人员调试崩溃。
 
报错六:impala操做kudu超时
解决方案:kudu_operation_timeout_ms = 1800000
 

    报错七:CDH安装kudu设置master
    解决方案:
        --master_addresses=hadoop4:7051,hadoop5:7051,hadoop6:7051
     

1.Kudu Tablet Server Maintenance Threads
    解释:Kudu后台对数据进行维护操做,如flush、compaction、inserts、updates、and deletes,通常设置为4,官网建议的是数据目录的3倍
    参数:maintenance_manager_num_threads
 
2.Kudu Tablet Server Block Cache Capacity Tablet
    解释:分配给Kudu Tablet Server块缓存的最大内存量,建议是2-4G
    参数:block_cache_capacity_mb
3.数据插入都kudu中,使用manual_flush策略
4.设置ntp服务器的时间偏差不超过20s(默认是10s)
    参数:max_clock_sync_error_usec=20000000
 
5.Kudu Tablet Server Hard Memory Limit Kudu
    解释:写性能,Tablet Server能使用的最大内存量,建议是机器总内存的百分之80,master的内存量建议是2G,Tablet Server在批量写入数据时并不是实时写入磁盘,
          而是先Cache在内存中,在flush到磁盘。这个值设置太小时,会形成Kudu数据写入性能显著降低。对于写入性能要求比较高的集群,建议设置更大的值
    参数:memory_limit_hard_bytes
6.建议每一个表50columns左右,不能超过300个
7.kudu的wal只支持单目录,若是快达到极限了,就会初始化tablte失败。因此说在部署集群的时候要单独给wal设置一个单独的目录。
8.impala中建立表,底层使用kudu存储(Impala::TableName),经过kudu的client端读取数据,读取不出来。
9.kudu表若是不新建的状况下,在表中增长字段,对数据是没有影响的,kudu中增长一个字段user_id,以前impala已经和kudu进行关联操做了,
  impala读取kudu的数据按照以前的所定义的字段读取的。
10.设置client长链接过时时间,默认是7天(实际生产环境中设置的是180天)
    --authn_token_validity_seconds=604800
      注意:设置到tserver的配置文件中
11.tserver宕掉后,5分钟后没有恢复的状况下,该机器上的tablet会移动到其余机器,由于咱们一般设置的是3个副本,其中一个副本宕掉,也就是一台机器的tserver出现故障,
   实际状况下,还存在一个leader和follower,读写仍是可以正常进行的,因此说这个参数很重要,保证数据不会转移。
    --follower_unavailable_considered_failed_sec=300
12.超过参数时间的历史数据会被清理,若是是base数据不会被清理。而真实运行时数据大小持续累加,没有被清理,默认是900s。
    --tablet_history_max_age_sec=900

7.kudu性能测试报告

 

建立 hash分区 + range分区 二者同时使用 的表、删除表

 

package src.main.sample;
 
import com.google.common.collect.ImmutableList;
import org.apache.kudu.ColumnSchema;
import org.apache.kudu.Schema;
import org.apache.kudu.Type;
import org.apache.kudu.client.*;
 
import java.util.ArrayList;
import java.util.List;
 
public class CreateTable {
 
    public static void main(String[] args) {
        String tableName = "bigData";
        KuduClient client = new KuduClient.KuduClientBuilder("192.168.241.128,192.168.241.129,192.168.241.130").defaultAdminOperationTimeoutMs(60000).build();
        KuduSession session = client.newSession();
        // 此处所定义的是rpc链接超时
        session.setTimeoutMillis(60000);
 
        try {
            // 测试,若是table存在的状况下,就删除该表
            if(client.tableExists(tableName)) {
                client.deleteTable(tableName);
                System.out.println("delete the table!");
            }
 
            List<ColumnSchema> columns = new ArrayList();
 
            // 建立列
            columns.add(new ColumnSchema.ColumnSchemaBuilder("id", Type.INT64).key(true).build());
            columns.add(new ColumnSchema.ColumnSchemaBuilder("user_id", Type.INT64).key(true).build());
            columns.add(new ColumnSchema.ColumnSchemaBuilder("start_time", Type.INT64).key(true).build());
            columns.add(new ColumnSchema.ColumnSchemaBuilder("name", Type.STRING).nullable(true).build());
 
            // 建立schema
            Schema schema = new Schema(columns);
 
       /*
       建立 hash分区 + range分区 二者同时使用 的表
        addHashPartitions(ImmutableList.of("字段名1","字段名2",...), hash分区数量)  默认使用主键,也可另外指定联合主键
        setRangePartitionColumns(ImmutableList.of("字段名"))
        */
            // id,user_id至关于联合主键,三个条件都知足的状况下,才能够更新数据,不然就是插入数据
            ImmutableList<String> hashKeys = ImmutableList.of("id","user_id");
            CreateTableOptions tableOptions = new CreateTableOptions();
       /*
        建立 hash分区 + range分区 二者同时使用 的表
        addHashPartitions(ImmutableList.of("字段名1","字段名2",...), hash分区数量)  默认使用主键,也可另外指定联合主键
        setRangePartitionColumns(ImmutableList.of("字段名"))
       */
            // 设置hash分区,包括分区数量、副本数目
            tableOptions.addHashPartitions(hashKeys,3); //hash分区数量
            tableOptions.setNumReplicas(3); //副本数目
 
            // 设置range分区
            tableOptions.setRangePartitionColumns(ImmutableList.of("start_time"));
 
       // 设置range分区数量    
            // 规则:range范围为时间戳是1-10,10-20,20-30,30-40,40-50
            int count = 0;
            for(long i = 1 ; i <6 ; i++) {
                PartialRow lower = schema.newPartialRow();
                lower.addLong("start_time",count);
                PartialRow upper = schema.newPartialRow();
                count += 10;
                upper.addLong("start_time", count);
                tableOptions.addRangePartition(lower, upper);
            }
 
            System.out.println("create table is success!");
            // 建立table,并设置partition
            client.createTable(tableName, schema, tableOptions);
 
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            try {
//          client.deleteTable(tableName);
            } catch (Exception e) {
                e.printStackTrace();
            } finally {
                try {
                    client.shutdown();
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        }
    }
}
--------------------- 

 

修改表:增长字段、删除字段

 

package src.main.sample;
 
import org.apache.kudu.Type;
import org.apache.kudu.client.AlterTableOptions;
import org.apache.kudu.client.KuduClient;
 
public class AlterTable {
    public static void main(String[] args) {
        String tableName = "bigData";
        KuduClient client = new KuduClient.KuduClientBuilder("192.168.161.128,192.168.161.129,192.168.161.130").defaultAdminOperationTimeoutMs(60000).build();
        try {
            Object o = 0L;
            // 建立非空的列
            client.alterTable(tableName, new AlterTableOptions().addColumn("device_id", Type.INT64, o));
 
            // 建立列为空
            client.alterTable(tableName, new AlterTableOptions().addNullableColumn("site_id", Type.INT64));
       // 删除字段
//            client.alterTable(tableName, new AlterTableOptions().dropColumn("site_id"));
 
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

 

插入 表数据

 

package src.main.sample;
 
import org.apache.kudu.client.*;
 
public class InsertData {
    public static void main(String[] args) {
        try {
            String tableName = "bigData";
 
            KuduClient client = new KuduClient.KuduClientBuilder("192.168.241.128,192.168.241.129,192.168.241.130").defaultAdminOperationTimeoutMs(60000).build();
            // 获取table
            KuduTable table = client.openTable(tableName);
 
            // 获取一个会话
            KuduSession session = client.newSession();
            session.setTimeoutMillis(60000);
            /**
             * mode形式:
             * SessionConfiguration.FlushMode.AUTO_FLUSH_BACKGROUND 后台自动一次性批处理刷新提交N条数据
             * SessionConfiguration.FlushMode.AUTO_FLUSH_SYNC  每次自动同步刷新提交每条数据
             * SessionConfiguration.FlushMode.MANUAL_FLUSH     手动刷新一次性提交N条数据
             */
            session.setFlushMode(SessionConfiguration.FlushMode.MANUAL_FLUSH); //mode形式
            session.setMutationBufferSpace(10000);// 缓冲大小,也就是数据的条数
 
            // 插入时,初始时间
            long startTime = System.currentTimeMillis();
 
            int val = 0;
            // 插入数据
            for (int i = 0; i < 60; i++) {
                Insert insert = table.newInsert();
                PartialRow row = insert.getRow();
                // row.addString("字段名", 字段值)、row.addLong(第几列, 字段值) 
                row.addLong(0, i); //指第一个字段 "id"(hash分区的联合主键之一)
                row.addLong(1, i*100);//指第二个字段 "user_id"(hash分区的联合主键之一)
                row.addLong(2, i);//指第三个字段 "start_time"(range分区字段)
                row.addString(3, "bigData");//指第四个字段 "name"
                session.apply(insert);
                if (val % 10 == 0) {
                    session.flush(); //手动刷新提交
                    val = 0;
                }
                val++;
            }
            session.flush(); //手动刷新提交
            // 插入时结束时间
            long endTime = System.currentTimeMillis();
            System.out.println("the timePeriod executed is : " + (endTime - startTime) + "ms");
            session.close();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
 

 

3种刷新提交数据的模式

 

package src.main.sample;
 
import com.google.common.collect.ImmutableList;
import org.apache.kudu.ColumnSchema;
import org.apache.kudu.Schema;
import org.apache.kudu.Type;
import org.apache.kudu.client.*;
 
import java.sql.Timestamp;
import java.util.ArrayList;
import java.util.List;
import java.util.UUID;
 
/**
 * 数据刷新策略对比
 */
public class InsertFlushData {
    // 缓冲大小,也就是数据的条数
    private final static int OPERATION_BATCH = 2000;
 
    /**
     * mode形式:
     * SessionConfiguration.FlushMode.AUTO_FLUSH_BACKGROUND 后台自动一次性批处理刷新提交N条数据
     * SessionConfiguration.FlushMode.AUTO_FLUSH_SYNC  每次自动同步刷新提交每条数据
     * SessionConfiguration.FlushMode.MANUAL_FLUSH     手动刷新一次性提交N条数据
     */
 
    // 支持三个模式的测试用例
    public static void insertTestGeneric(KuduSession session, KuduTable table, SessionConfiguration.FlushMode mode, int recordCount) throws Exception {
        //设置 刷新提交模式
        session.setFlushMode(mode);
 
        //当刷新提交模式 不为 AUTO_FLUSH_SYNC(自动同步刷新)时,才设置缓冲大小(数据条数)
        if (SessionConfiguration.FlushMode.AUTO_FLUSH_SYNC != mode) {
            // 缓冲大小,也就是数据的条数
            session.setMutationBufferSpace(OPERATION_BATCH);
        }
 
        int commit = 0;
 
        for (int i = 0; i < recordCount; i++) {
            Insert insert = table.newInsert();
            PartialRow row = insert.getRow();
            UUID uuid = UUID.randomUUID();
            // row.addString("字段名", 字段值)、row.addLong(第几列, 字段值) 
            row.addString("id", uuid.toString());
            row.addInt("value1", 16);
            row.addLong("value2", 16);
 
            Long gtmMillis;
            /**
             * System.currentTimeMillis() 是从1970-01-01开始算的毫秒数(GMT), kudu API是采用纳秒数, 因此须要时间*1000
             * 另外, 考虑到咱们是东8区时间, 因此转成Long型须要再加8个小时, 不然存到Kudu的时间是GTM, 比东8区晚8个小时
             */
            // 第一步: 获取当前时间对应的GTM时区unix毫秒数
            // 第二步: 将timestamp转成对应的GTM时区unix毫秒数
            Timestamp localTimestamp = new Timestamp(System.currentTimeMillis());
            gtmMillis = localTimestamp.getTime();
            // 将GTM的毫秒数转成东8区的毫秒数量
            Long shanghaiTimezoneMillis = gtmMillis + 8 * 3600 * 1000;
            row.addLong("timestamp", shanghaiTimezoneMillis);
 
            session.apply(insert);
 
            // 对于在MANUAL_FLUSH(手动刷新)模式时,进行 手动刷新提交
            if (SessionConfiguration.FlushMode.MANUAL_FLUSH == mode) {
                commit = commit + 1;
                // 对于手工提交, 须要buffer在未满的时候flush,这里采用了buffer一半时即提交
                //若是要提交的数据条数 已经大于 缓冲大小(数据条数)除以2的值的话,则进行一次手动刷新提交
                if (commit > OPERATION_BATCH / 2) {
                    session.flush();//手动刷新提交
                    commit = 0;
                }
            }
        }
 
        // 对于在MANUAL_FLUSH(手动刷新)模式时,进行 手动刷新提交
        // 对于手工提交, 保证完成最后的提交
        if (SessionConfiguration.FlushMode.MANUAL_FLUSH == mode && commit > 0) {
            session.flush();//手动刷新提交
        }
 
        // 对于后台自动提交, 必须保证完成最后的提交, 并保证有错误时能抛出异常
        if (SessionConfiguration.FlushMode.AUTO_FLUSH_BACKGROUND == mode) {
            session.flush();//手动刷新提交
            RowErrorsAndOverflowStatus error = session.getPendingErrors();
            // 检查错误收集器是否有溢出和是否有行错误
            if (error.isOverflowed() || error.getRowErrors().length > 0) {
                if (error.isOverflowed()) {
                    throw new Exception("kudu overflow exception occurred.");
                }
                StringBuilder errorMessage = new StringBuilder();
                if (error.getRowErrors().length > 0) {
                    for (RowError errorObj : error.getRowErrors()) {
                        errorMessage.append(errorObj.toString());
                        errorMessage.append(";");
                    }
                }
                throw new Exception(errorMessage.toString());
            }
        }
    }
 
    // 支持手动flush的测试用例
    public static void insertTestManualFlush(KuduSession session, KuduTable table, int recordCount) throws Exception {
        SessionConfiguration.FlushMode mode = SessionConfiguration.FlushMode.MANUAL_FLUSH;
        session.setFlushMode(mode);
        session.setMutationBufferSpace(OPERATION_BATCH);
 
        int commit = 0;
        for (int i = 0; i < recordCount; i++) {
            Insert insert = table.newInsert();
            PartialRow row = insert.getRow();
            UUID uuid = UUID.randomUUID();
            row.addString("id", uuid.toString());
            row.addInt("value1", 17);
            row.addLong("value2", 17);
            Long gtmMillis;
            /**
             * System.currentTimeMillis() 是从1970-01-01开始算的毫秒数(GMT), kudu API是采用纳秒数, 因此须要时间*1000
             * 另外, 考虑到咱们是东8区时间, 因此转成Long型须要再加8个小时, 不然存到Kudu的时间是GTM, 比东8区晚8个小时
             */
            // 第一步: 获取当前时间对应的GTM时区unix毫秒数
            // 第二步: 将timestamp转成对应的GTM时区unix毫秒数
            Timestamp localTimestamp = new Timestamp(System.currentTimeMillis());
            gtmMillis = localTimestamp.getTime();
            // 将GTM的毫秒数转成东8区的毫秒数量
            Long shanghaiTimezoneMillis = gtmMillis + 8 * 3600 * 1000;
            row.addLong("timestamp", shanghaiTimezoneMillis);
            session.apply(insert);
            // 对于手工提交, 须要buffer在未满的时候flush,这里采用了buffer一半时即提交
            commit = commit + 1;
            //若是要提交的数据条数 已经大于 缓冲大小(数据条数)除以2的值的话,则进行一次手动刷新提交
            if (commit > OPERATION_BATCH / 2) {
                session.flush();//手动刷新提交
                commit = 0;
            }
        }
 
        // 对于手工提交, 保证完成最后的提交
        if (commit > 0) {
            session.flush();//手动刷新提交
        }
    }
 
    // 自动flush的测试案例
    public static void insertTestAutoFlushSync(KuduSession session, KuduTable table, int recordCount) throws Exception {
        SessionConfiguration.FlushMode mode = SessionConfiguration.FlushMode.AUTO_FLUSH_SYNC;
        session.setFlushMode(mode);
        for (int i = 0; i < recordCount; i++) {
            Insert insert = table.newInsert();
            PartialRow row = insert.getRow();
            UUID uuid = UUID.randomUUID();
            row.addString("id", uuid.toString());
            row.addInt("value1", 18);
            row.addLong("value2", 18);
 
            Long gtmMillis;
 
            /**
             * System.currentTimeMillis() 是从1970-01-01开始算的毫秒数(GMT), kudu API是采用纳秒数, 因此须要时间*1000
             * 另外, 考虑到咱们是东8区时间, 因此转成Long型须要再加8个小时, 不然存到Kudu的时间是GTM, 比东8区晚8个小时
             */
            // 第一步: 获取当前时间对应的GTM时区unix毫秒数
            gtmMillis = System.currentTimeMillis();
 
            // 第二步: 将timestamp转成对应的GTM时区unix毫秒数
            Timestamp localTimestamp = new Timestamp(System.currentTimeMillis());
            gtmMillis = localTimestamp.getTime();
 
            // 将GTM的毫秒数转成东8区的毫秒数量
            Long shanghaiTimezoneMillis = gtmMillis + 8 * 3600 * 1000;
            row.addLong("timestamp", shanghaiTimezoneMillis);
 
            // 对于AUTO_FLUSH_SYNC模式, apply()将当即完成数据写入,可是并非批处理
            session.apply(insert);
        }
    }
 
    /**
     * 测试案例
     */
    public static void testStrategy() throws KuduException {
        KuduClient client = new KuduClient.KuduClientBuilder("192.168.161.128,192.168.161.129,192.168.161.130").build();
        KuduSession session = client.newSession();
        KuduTable table = client.openTable("bigData2");
 
        SessionConfiguration.FlushMode mode;
        long d1;
        long d2;
        long timeMillis;
        long seconds;
        int recordCount = 200000;
 
        try {
            // 自动刷新策略(默认的刷新策略,同步刷新)
            mode = SessionConfiguration.FlushMode.AUTO_FLUSH_SYNC;
            System.out.println(mode + " is start!");
            d1 = System.currentTimeMillis();
            insertTestAutoFlushSync(session, table, recordCount);
            d2 = System.currentTimeMillis();
            timeMillis = d2 - d1;
            System.out.println(mode.name() + "花费毫秒数: " + timeMillis);
 
            // 后台刷新策略(后台批处理刷新)
            mode = SessionConfiguration.FlushMode.AUTO_FLUSH_BACKGROUND;
            System.out.println(mode + " is start!");
            d1 = System.currentTimeMillis();
            insertTestGeneric(session, table, mode, recordCount);
            d2 = System.currentTimeMillis();
            timeMillis = d2 - d1;
            System.out.println(mode.name() + "花费毫秒数: " + timeMillis);
 
            // 手动刷新
            mode = SessionConfiguration.FlushMode.MANUAL_FLUSH;
            System.out.println(mode + " is start!");
            d1 = System.currentTimeMillis();
            insertTestManualFlush(session, table, recordCount);
            d2 = System.currentTimeMillis();
            timeMillis = d2 - d1;
            System.out.println(mode.name() + "花费毫秒数: " + timeMillis);
 
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            if (!session.isClosed()) {
                session.close();
            }
        }
    }
 
    public static void createTable() {
        String tableName = "bigData2";
        KuduClient client = new KuduClient.KuduClientBuilder("192.168.161.128,192.168.161.129,192.168.161.130").defaultAdminOperationTimeoutMs(60000).build();
        KuduSession session = client.newSession();
        session.setTimeoutMillis(60000);
 
        try {
            // 测试,若是table存在的状况下,就删除该表
            if (client.tableExists(tableName)) {
                client.deleteTable(tableName);
                System.out.println("delete the table is success!");
            }
 
            List<ColumnSchema> columns = new ArrayList();
 
            // 建立列
            columns.add(new ColumnSchema.ColumnSchemaBuilder("id", Type.STRING).key(true).build());
            columns.add(new ColumnSchema.ColumnSchemaBuilder("value1", Type.INT32).key(true).build());
            columns.add(new ColumnSchema.ColumnSchemaBuilder("value2", Type.INT64).key(true).build());
            columns.add(new ColumnSchema.ColumnSchemaBuilder("timestamp", Type.INT64).key(true).build());
 
            // 建立schema
            Schema schema = new Schema(columns);
            /*
            建立 hash分区 + range分区 二者同时使用 的表
                addHashPartitions(ImmutableList.of("字段名1","字段名2",...), hash分区数量)  默认使用主键,也可另外指定联合主键
                setRangePartitionColumns(ImmutableList.of("字段名"))
            */
            // id和timestamp 组成 联合主键
            ImmutableList<String> hashKeys = ImmutableList.of("id", "timestamp");
            CreateTableOptions tableOptions = new CreateTableOptions();
 
            // 设置hash分区,包括分区数量、副本数目
            tableOptions.addHashPartitions(hashKeys, 20); //hash分区数量
            tableOptions.setNumReplicas(1);//副本数目
 
            System.out.println("create the table is success! ");
            // 建立table,并设置partition
            client.createTable(tableName, schema, tableOptions);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
 
    public static void main(String[] args) {
        try {
            createTable();
            testStrategy();
            /**
             *AUTO_FLUSH_SYNC is start!
              AUTO_FLUSH_SYNC花费毫秒数: 588863
              AUTO_FLUSH_BACKGROUND is start!
              AUTO_FLUSH_BACKGROUND花费毫秒数: 12284
              MANUAL_FLUSH is start!
              MANUAL_FLUSH花费毫秒数: 17231
             */
        } catch (KuduException e) {
            e.printStackTrace();
        }
    }
}
--------------------- 

 

查询 表数据

 

package src.main.sample;
 
import org.apache.kudu.Schema;
import org.apache.kudu.client.*;
 
import java.util.ArrayList;
import java.util.List;
 
public class SelectData {
    public static void main(String[] args) {
        try {
            String tableName = "bigData";
 
            KuduClient client = new KuduClient.KuduClientBuilder("192.168.161.128,192.168.161.129,192.168.161.130").defaultAdminOperationTimeoutMs(60000).build();
 
            // 获取须要查询数据的列
            List<String> projectColumns = new ArrayList<String>();
            projectColumns.add("id");
            projectColumns.add("user_id");
            projectColumns.add("start_time");
            projectColumns.add("name");
 
            KuduTable table = client.openTable(tableName);
 
            // 简单的读取
            KuduScanner scanner = client.newScannerBuilder(table).setProjectedColumnNames(projectColumns).build();
 
            // 根据主键设置读取的上限和下限
//            Schema schema = table.getSchema();
//            PartialRow lower = schema.newPartialRow();
//            lower.addLong("id", 10);
//            lower.addLong("user_id", 10);
//            lower.addLong("start_time", 50);
//            PartialRow upper = schema.newPartialRow();
//            upper.addLong("id", 50);
//            upper.addLong("user_id", 50);
//            upper.addLong("start_time", 50);
 
//            KuduScanner scanner = client.newScannerBuilder(table)
//                    .setProjectedColumnNames(projectColumns)
//                    .lowerBound(lower)
//                    .exclusiveUpperBound(upper)
//                    .build();
 
            while (scanner.hasMoreRows()) {
                RowResultIterator results = scanner.nextRows();
                // 15个tablet,每次从tablet中获取的数据的行数
                int numRows = results.getNumRows();
                System.out.println("numRows count is : " + numRows);
                while (results.hasNext()) {
                    RowResult result = results.next();
                    long id = result.getLong(0);
                    long user_id = result.getLong(1);
                    long start_time = result.getLong(2);
                    String name = result.getString(3);
                    System.out.println("id is : " + id + "  ===  user_id is : " + user_id + "  ===  start_time : " + start_time + "  ===  name is : " + name);
                }
                System.out.println("--------------------------------------");
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
 

 

修改 表数据

package src.main.sample;
 
import org.apache.kudu.client.*;
 
public class UpsertData {
    public static void main(String[] args) {
        try {
            String tableName = "bigData";
 
            KuduClient client = new KuduClient.KuduClientBuilder("192.168.161.128,192.168.161.129,192.168.161.130").defaultAdminOperationTimeoutMs(60000).build();
            // 获取table
            KuduTable table = client.openTable(tableName);
            /**
             * mode形式:
             * SessionConfiguration.FlushMode.AUTO_FLUSH_BACKGROUND 后台自动一次性批处理刷新提交N条数据
             * SessionConfiguration.FlushMode.AUTO_FLUSH_SYNC  每次自动同步刷新提交每条数据
             * SessionConfiguration.FlushMode.MANUAL_FLUSH     手动刷新一次性提交N条数据
             */
            // 获取一个会话
            KuduSession session = client.newSession();
            session.setTimeoutMillis(60000);
            session.setFlushMode(SessionConfiguration.FlushMode.MANUAL_FLUSH); //手动刷新一次性提交N条数据
            session.setMutationBufferSpace(10000); // 缓冲大小,也就是数据的条数
 
            // 插入时,初始时间
            long startTime = System.currentTimeMillis();
 
            int val = 0;
            // 在使用 upsert 语句时,当前须要 三个条件(key)都知足的状况下,才能够更新数据,不然就是插入数据
            // 三个条件(key) 分别指的是 hash分区的联合主键id、user_id,还有range分区字段 start_time
            for (int i = 0; i < 60; i++) {
                //upsert into 表名 values (‘xx’,123) 若是指定的values中的主键值 在表中已经存在,则执行update语义,反之,执行insert语义。
                Upsert upsert = table.newUpsert();
                PartialRow row = upsert.getRow();
                row.addLong(0, i); //指第一个字段 "id"(hash分区的联合主键之一)
                row.addLong(1, i*100); //指第二个字段 "user_id"(hash分区的联合主键之一)
                row.addLong(2, i); //指第三个字段 "start_time"(range分区字段)
                row.addString(3, "bigData"+i); //指第四个字段 "name"
                session.apply(upsert);
                if (val % 10 == 0) {
                    session.flush(); //手动刷新提交
                    val = 0;
                }
                val++;
            }
            session.flush(); //手动刷新提交
            // 插入时结束时间
            long endTime = System.currentTimeMillis();
            System.out.println("the timePeriod executed is : " + (endTime - startTime) + "ms");
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
 
impala命令刷新元数据
    1.impala-shell 命令进入交互界面 执行 invalidate metadata; 命令刷新元数据 
    2.Hue的wen页面中,在impala执行sql的窗口 执行 invalidate metadata; 命令刷新元数据 
    
--------------------------------------------------------------------------
从Impala建立一个新的Kudu表
    从Impala在Kudu中建立新表相似于将现有Kudu表映射到Impala表,除了您须要本身指定模式和分区信息。
    使用如下示例做为指导。Impala首先建立表,而后建立映射。
    Impala 中建立一个新的 Kudu 表
    
        CREATE TABLE my_first_table
        (
          id BIGINT,
          name STRING,
          PRIMARY KEY(id)
        )
        PARTITION BY HASH PARTITIONS 16
        STORED AS KUDU;
 
    在CREATE TABLE语句中,必须首先列出组成主键的列。此外,隐式标记主键列NOT NULL。
    建立新的Kudu表时,您须要指定分发方案。
    请参阅分区表:https://kudu.apache.org/docs/kudu_impala_integration.html#partitioning_tables
    为了为简单起见,上面的表建立示例经过散列 id 列分红 16 个分区。
    有关分区的指导,请参阅 分区规则:https://kudu.apache.org/docs/kudu_impala_integration.html#partitioning_rules_of_thumb
 
    
CREATE TABLE AS SELECT
    您可使用 CREATE TABLE ... AS SELECT 语句查询 Impala 中的任何其余表或表来建立表。
    如下示例将现有表 old_table 中的全部行导入到 Kudu 表 new_table 中。 
    new_table 中的列的名称和类型 将根据 SELECT 语句的结果集中的列肯定。
    请注意,您必须另外指定主键和分区。
 
        CREATE TABLE new_table
        PRIMARY KEY (ts, name)
        PARTITION BY HASH(name) PARTITIONS 8
        STORED AS KUDU
        AS SELECT ts, name, value FROM old_table;
        
--------------------------------------------------------------------------
 
在Impala中查询现有的Kudu表:Impala中建立映射Kudu表的外部映射表
    经过Kudu API或其余集成(如Apache Spark)建立的表在Impala中不会自动显示。
    要查询它们,必须首先在Impala中建立外部表,以将Kudu表映射到Impala数据库:
    
        CREATE EXTERNAL TABLE `bigData` STORED AS KUDU
        TBLPROPERTIES(
            'kudu.table_name' = 'bigData',
            'kudu.master_addresses' = 'node1:7051,node2:7051,node3:7051')
            
            
查询 Impala 中现有的 Kudu 表(Impala中建立映射表(外部表)映射Kudu中的表)
    经过 Kudu API 或其余集成(如 Apache Spark )建立的表不会在 Impala 中自动显示。
    要查询它们,您必须先在 Impala 中建立外部表以将 Kudu 表映射到 Impala 数据库中:
 
        CREATE EXTERNAL TABLE my_mapping_table
        STORED AS KUDU
        TBLPROPERTIES (
          'kudu.table_name' = 'my_kudu_table'
        );
 
--------------------------------------------------------------------------
 
内部和外部 Impala 表
    使用 Impala 建立新的 Kudu 表时,能够将表建立为内部表或外部表。
 
Internal ( 内部表 )
    内部表由 Impala 管理,当您从 Impala 中删除时,数据和表确实被删除。当您使用 Impala 建立新表时,一般是内部表。
 
External ( 外部表 )
    外部表(由 CREATE EXTERNAL TABLE 建立)不受 Impala 管理,而且删除此表不会将表从其源位置(此处为 Kudu)丢弃。
    相反,它只会去除 Impala 和 Kudu 之间的映射。这是 Kudu 提供的用于将现有表映射到 Impala 的语法。
            
--------------------------------------------------------------------------
 
Kudu中的分区方法主要有两种:partition by hash 和 partition by range 
    kudu表基于其partition方法被拆分红多个分区,每一个分区就是一个tablet,一张kudu表所属的全部tablets均匀分布并存储在tablet servers的磁盘上。
    所以在建立kudu表的时候须要声明该表的partition方法,同时要指定primary key做为partition的依据。
    
基于hash的分区方法的基本原理是:
    基于primary key的hash值将每一个row(行)划分到相应的tablet当中,分区的个数即tablet的个数必须在建立表语句中指定,建表语句示例以下:
    注:若是未指定基于某个字段的hash值进行分区,默认以主键的hash值进行分区。
        create table test
        (
            name string,
            age int,
            primary key (name)
        )
        partition by hash (name) partitions 8
        stored as kudu;
 
基于range的分区方法的基本原理是:
    基于指定主键的取值范围将每一个row(行)划分到相应的tablet当中,用于range分区的主键以及各个取值范围都必须在建表语句中声明,建表语句示例以下:
    例子:有班级、姓名、年龄三个字段,表中的每一个row将会根据其所在的班级划分红四个分区,每一个分区就表明一个班级。
        create table test
        (
            classes int,
            name string,
            age int,
            primary key (classes,name)
        )
        partition by range (classes)
        (
            partition value = 1,
            partition value = 2,
            partition value = 3,
            partition value = 4
        )
        stored as kudu;
 
        
kudu表还能够采用基于hash和基于range相结合的分区方式 
    /*
    建立 hash分区 + range分区 二者同时使用 的表
        addHashPartitions(ImmutableList.of("字段名1","字段名2",...), hash分区数量)  默认使用主键,也可另外指定联合主键
        setRangePartitionColumns(ImmutableList.of("字段名"))
    */
    // 设置hash分区,包括分区数量、副本数目
    tableOptions.addHashPartitions(hashKeys,3); //hash分区数量
    tableOptions.setNumReplicas(3); //副本数目
 
    // 设置range分区
    tableOptions.setRangePartitionColumns(ImmutableList.of("start_time"));
            
--------------------------------------------------------------------------
        
kudu表支持3种insert语句:
    1.insert into test values(‘a’,12);
    2.insert into test values(‘a’,12),(‘b’,13),(‘c’,14);
    3.insert into test select * from other_table;
 
update语句
    kudu表的update操做不能更改主键的值,其余与标准sql语法相同。
 
upsert 语句
    对于 upsert into test values (‘a’,12) 
    若是指定的values中的主键值 在表中已经存在,则执行update语义,反之,执行insert语义。
    注意:若是同时存在 主键/联合主键、hash分区字段、range分区字段时,那么便要求三个条件都符合的状况下,才能够更新数据,不然就是插入数据。
    
delete语句
    与标准sql语法相同。
 
--------------------------------------------------------------------------
 
Impala 中建立一个新的 Kudu 表
        create table test
        (
        classes int,
        name string,
        age int,
        primary key (classes,name)
        )
        partition by range (classes)
        (
        partition value = 1,
        partition value = 2,
        partition value = 3,
        partition value = 4
        )
        stored as kudu;
 
        insert into test values(1,"nagisa",16);
        select * from test;
        
kudu webUI页面显示
    impala::default.test
    
Impala中建立映射Kudu表的外部映射表
    CREATE EXTERNAL TABLE `EXTERNAL_test` STORED AS KUDU
    TBLPROPERTIES(
        'kudu.table_name' = 'impala::default.test',
        'kudu.master_addresses' = 'node1:7051,node2:7051,node3:7051');
        
    insert into test values(2,"ushio",5);
    select * from EXTERNAL_test;
    
--------------------------------------------------------------------------
 
指定 Tablet Partitioning ( Tablet 分区 )
    表分为每一个由一个或多个 tablet servers 提供的 tablets 。理想状况下,tablets 应该相对平等地拆分表的数据。 
    Kudu 目前没有自动(或手动)拆分预先存在的 tablets  的机制。在实现此功能以前,您必须在建立表时指定分区。
    在设计表格架构时,请考虑使用主键,您能够将表拆分红以相似速度增加的分区。
    使用 Impala 建立表时,可使用 PARTITION BY 子句指定分区:
    注意:Impala 关键字(如 group)在关键字意义上不被使用时,由背面的字符包围。
    
        CREATE TABLE cust_behavior 
        (
              _id BIGINT PRIMARY KEY,
              salary STRING,
              edu_level INT,
              usergender STRING,
              `group` STRING,
              city STRING,
              postcode STRING,
              last_purchase_price FLOAT,
              last_purchase_date BIGINT,
              category STRING,
              sku STRING,
              rating INT,
              fulfilled_date BIGINT
        )
        PARTITION BY RANGE (_id)
        (
            PARTITION VALUES < 1439560049342,
            PARTITION 1439560049342 <= VALUES < 1439566253755,
            PARTITION 1439566253755 <= VALUES < 1439572458168,
            PARTITION 1439572458168 <= VALUES < 1439578662581,
            PARTITION 1439578662581 <= VALUES < 1439584866994,
            PARTITION 1439584866994 <= VALUES < 1439591071407,
            PARTITION 1439591071407 <= VALUES
        )
        STORED AS KUDU;
    
    
若是您有多个主键列,则可使用元组语法指定分区边界:('va'1),('ab'2)。该表达式必须是有效的 JSON
 
Impala 数据库和 Kudu
    每一个 Impala 表都包含在称为数据库的命名空间中。默认数据库称为默认数据库,用户可根据须要建立和删除其余数据库
 
当从 Impala 中建立一个受管 Kudu 表时,相应的 Kudu 表将被命名为 my_database :: table_name
 
不支持 Kudu 表的 Impala 关键字
    建立 Kudu 表时不支持如下 Impala 关键字: - PARTITIONED - LOCATION - ROWFORMAT
    
    
--------------------------------------------------------------------------
 
优化评估 SQL 谓词的性能
    若是您的查询的 WHERE 子句包含与 operators = , <= , '\ , '\' , > = , BETWEEN 或 IN 的比较,则 Kudu 直接评估该条件,只返回相关结果。
    这提供了最佳性能,由于 Kudu 只将相关结果返回给 Impala 。
    对于谓词 != , LIKE 或 Impala 支持的任何其余谓词类型, Kudu 不会直接评估谓词,而是将全部结果返回给 Impala ,并依赖于 Impala 来评估剩余的谓词并相应地过滤结果。
    这可能会致使性能差别,这取决于评估 WHERE 子句以前和以后的结果集的增量。
分区表
    根据主键列上的分区模式将表格划分为 tablets 。每一个 tablet 由至少一台 tablet server 提供。
    理想状况下,一张表应该分红多个 tablets 中分布的 tablet servers ,以最大化并行操做。您使用的分区模式的详细信息将彻底取决于您存储的数据类型和访问方式。关于 Kudu 模式设计的全面讨论,请参阅 Schema Design。
Kudu 目前没有在建立表以后拆分或合并 tablets 的机制。建立表时,必须为表提供分区模式。在设计表格时,请考虑使用主键,这样您就能够将表格分为以相同速率增加的 tablets 。
您可使用 Impala 的 PARTITION BY 关键字对表进行分区,该关键字支持 RANGE 或 HASH 分发。分区方案能够包含零个或多个 HASH 定义,后面是可选的 RANGE 定义。 RANGE 定义能够引用一个或多个主键列。基本 和 高级分区 的示例以下所示。

--------------------- 
相关文章
相关标签/搜索