Reading HBase with Spark

Reading HBase via the HBase API

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.util.Bytes;

import java.io.IOException;
import java.util.List;

/**
 * Fetch data via the HBase API
 */
public class DataAchieveFromHbaseApi {
    public static void main(String[] args) throws IOException {
        // HBase configuration
        Configuration conf = HBaseConfiguration.create();
        conf.set("hbase.zookeeper.property.clientPort", "2181");// ZooKeeper port
        conf.set("hbase.zookeeper.quorum", "hdh1,hdh2,hdh3");// HBase ZooKeeper quorum
        // Scan configuration
        Scan scan = new Scan();
        scan.addFamily(Bytes.toBytes("cf"));// column family; more than one can be added
        // HBase table
        HTable hTable = new HTable(conf, Bytes.toBytes("test"));// table name
        // Fetch the scan results
        ResultScanner rs = hTable.getScanner(scan);
        // Column-family information of the table
        HColumnDescriptor[] hColDes = hTable.getTableDescriptor().getColumnFamilies();
        for (HColumnDescriptor hColDe : hColDes) {
            System.out.println(Bytes.toString(hColDe.getName()));
        }
        // Print the single column of each row (this table has only one column)
        for (Result r : rs) {
            byte[] bytes = r.getValue(Bytes.toBytes("cf"), Bytes.toBytes("SSID"));// column family and qualifier
            if (bytes == null) {
                continue;// the row has no value for this column
            }
            String str = new String(bytes, "UTF-8");
            if (str.trim().length() > 0) {
                System.out.println(str.trim());
            }
        }
        System.out.println("end<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<");
    }
}
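The example above reads a single known column from each row. When every column of a row is needed, the Result can instead be walked cell by cell, which is what the otherwise unused Cell import allows. A minimal sketch under that assumption; PrintAllCells and printRow are hypothetical names, not part of the original code:

import java.util.List;

import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.util.Bytes;

public class PrintAllCells {
    // Print every qualifier/value pair contained in one scanned row
    public static void printRow(Result r) {
        List<Cell> cells = r.listCells();// all cells of the row; null for an empty Result
        if (cells == null) {
            return;
        }
        for (Cell cell : cells) {
            String qualifier = Bytes.toString(CellUtil.cloneQualifier(cell));
            String value = Bytes.toString(CellUtil.cloneValue(cell));
            System.out.println(qualifier + " = " + value);
        }
    }
}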

Reading HBase via the interface Spark provides

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableInputFormat;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.VoidFunction;
import scala.Tuple2;

import java.io.IOException;

/**
 * Fetch data via HFile (Spark newAPIHadoopRDD)
 */
public class DataAchieveFromHfile {
    private static JavaPairRDD<ImmutableBytesWritable, Result> rdd;

    public static void main(String[] args) throws IOException {
        Configuration conf= HBaseConfiguration.create();
        conf.set("hbase.zookeeper.property.clientPort", "2181");
        conf.set("hbase.zookeeper.quorum","hdh1,hdh2,hdh3");
        conf.set(TableInputFormat.INPUT_TABLE, "test");
        SparkConf conf1 = new SparkConf().setAppName("test").setMaster("local");// set the Spark app name and run mode (local mode here)
        JavaSparkContext sc = new JavaSparkContext(conf1);
        // Load the data
        rdd = sc.newAPIHadoopRDD(conf, TableInputFormat.class, ImmutableBytesWritable.class, Result.class);
        System.out.println("Rows read: " + rdd.count());
        rdd.foreach(new VoidFunction<Tuple2<ImmutableBytesWritable, Result>>() {
            @Override
            public void call(Tuple2<ImmutableBytesWritable, Result> result) throws Exception {
                byte[] bytes = result._2().getValue(Bytes.toBytes("cf"), Bytes.toBytes("SSID"));// column family and qualifier
                if (bytes == null) {
                    return;// the row has no value for this column
                }
                String str = new String(bytes, "UTF-8");
                if (str.trim().length() > 0) {
                    System.out.println(str.trim());
                }
            }
        });
        sc.stop();
    }
}

With the second approach, reading the HBase table directly means a full table scan; reading a snapshot instead does not go through the RegionServers.
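To avoid the full table scan, a Scan carrying a row-key range (or filters) can be serialized into the configuration under TableInputFormat.SCAN before the RDD is built. A minimal sketch, assuming the same "test" table as above; the row keys row-0001 and row-0100 are hypothetical:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.mapreduce.TableInputFormat;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.util.Bytes;

public class RestrictedScanConf {
    // Build a TableInputFormat configuration that only covers a row-key range
    public static Configuration build() throws Exception {
        Configuration conf = HBaseConfiguration.create();
        conf.set("hbase.zookeeper.property.clientPort", "2181");
        conf.set("hbase.zookeeper.quorum", "hdh1,hdh2,hdh3");
        conf.set(TableInputFormat.INPUT_TABLE, "test");

        Scan scan = new Scan();
        scan.addFamily(Bytes.toBytes("cf"));
        scan.setStartRow(Bytes.toBytes("row-0001"));// hypothetical start key (inclusive)
        scan.setStopRow(Bytes.toBytes("row-0100"));// hypothetical stop key (exclusive)

        // TableInputFormat reads the Scan back from this property as a Base64 string
        conf.set(TableInputFormat.SCAN, TableMapReduceUtil.convertScanToString(scan));
        return conf;
    }
}

The returned configuration can be passed to sc.newAPIHadoopRDD exactly as in the example above.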

Spark reading a snapshot: https://www.cnblogs.com/kwzblog/p/9007713.html
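A minimal sketch of a snapshot read with TableSnapshotInputFormat, along the lines the linked post covers; the snapshot name test_snapshot and the restore directory /tmp/snapshot_restore are hypothetical, and the snapshot must already exist:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableSnapshotInputFormat;
import org.apache.hadoop.mapreduce.Job;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaSparkContext;

public class DataAchieveFromSnapshot {
    public static void main(String[] args) throws Exception {
        Configuration conf = HBaseConfiguration.create();
        conf.set("hbase.zookeeper.property.clientPort", "2181");
        conf.set("hbase.zookeeper.quorum", "hdh1,hdh2,hdh3");

        // The snapshot files are read from HDFS directly; no RegionServer is involved
        Job job = Job.getInstance(conf);
        TableSnapshotInputFormat.setInput(job, "test_snapshot", new Path("/tmp/snapshot_restore"));

        SparkConf sparkConf = new SparkConf().setAppName("snapshot-read").setMaster("local");
        JavaSparkContext sc = new JavaSparkContext(sparkConf);
        JavaPairRDD<ImmutableBytesWritable, Result> rdd = sc.newAPIHadoopRDD(
                job.getConfiguration(), TableSnapshotInputFormat.class,
                ImmutableBytesWritable.class, Result.class);
        System.out.println("Rows read from snapshot: " + rdd.count());
        sc.stop();
    }
}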

Spark generating HFiles and writing them into HBase: https://www.cnblogs.com/luckuan/p/5142203.html
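A minimal sketch of that HFile route, assuming the older HTable-based API used elsewhere in this post, an input RDD already sorted by row key, and a hypothetical staging directory /tmp/hfiles:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.HFileOutputFormat2;
import org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles;
import org.apache.hadoop.mapreduce.Job;
import org.apache.spark.api.java.JavaPairRDD;

public class BulkLoadSketch {
    // hfileRdd must already be sorted by row key, as HFileOutputFormat2 requires
    public static void bulkLoad(JavaPairRDD<ImmutableBytesWritable, KeyValue> hfileRdd) throws Exception {
        Configuration conf = HBaseConfiguration.create();
        conf.set("hbase.zookeeper.property.clientPort", "2181");
        conf.set("hbase.zookeeper.quorum", "hdh1,hdh2,hdh3");

        HTable table = new HTable(conf, "test");
        Job job = Job.getInstance(conf);
        job.setMapOutputKeyClass(ImmutableBytesWritable.class);
        job.setMapOutputValueClass(KeyValue.class);
        HFileOutputFormat2.configureIncrementalLoad(job, table);// sets up partitioning and output settings for HFiles

        // Write the RDD out as HFiles under the staging directory, then hand them over to HBase
        hfileRdd.saveAsNewAPIHadoopFile("/tmp/hfiles", ImmutableBytesWritable.class,
                KeyValue.class, HFileOutputFormat2.class, job.getConfiguration());
        new LoadIncrementalHFiles(conf).doBulkLoad(new Path("/tmp/hfiles"), table);
        table.close();
    }
}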

https://blog.csdn.net/wl044090432/article/details/50821313
